/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.caudexorigo.concurrent.Sleep;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

public class QueueLatencyTestMain {
    public static void main(String[] args) throws Throwable {
        QueueLatencyTestMain.runTest();
        QueueLatencyTestMain.runTest();
    }

    private static void runTest() throws Throwable {
        int NUMBER_OF_MESSAGES = 1000;
        String BROKER_HOST = "127.0.0.1";
        int BROKER_PORT = 3323;
        String DESTINATION_NAME = "/Queue/LatencyTest";
        NetAction.DestinationType DESTINATION_TYPE = NetAction.DestinationType.QUEUE;
        AtomicInteger testCount = new AtomicInteger(0);
        final AtomicInteger receivedMessages = new AtomicInteger(0);
        final AtomicLong minLatency = new AtomicLong(Long.MAX_VALUE);
        final AtomicLong maxLatency = new AtomicLong(0L);
        final AtomicLong totalLantecy = new AtomicLong(0L);
        BrokerClient brokerClientConsumer = new BrokerClient("127.0.0.1", 3323);
        final CountDownLatch latch = new CountDownLatch(1);
        NetSubscribe netSubscribe = new NetSubscribe("/Queue/LatencyTest", DESTINATION_TYPE);
        netSubscribe.addHeader("ACK_REQUIRED", "false");
        brokerClientConsumer.addAsyncConsumer(netSubscribe, new BrokerListener(){

            public void onMessage(NetNotification message) {
                long now = System.nanoTime();
                long send_ts = Long.parseLong(new String(message.getMessage().getPayload()));
                long elapsed = now - send_ts;
                if (receivedMessages.get() % 100 == 0) {
                    System.out.println("Received: " + receivedMessages.get());
                }
                if (elapsed < minLatency.get()) {
                    minLatency.set(elapsed);
                }
                if (elapsed > maxLatency.get()) {
                    maxLatency.set(elapsed);
                }
                totalLantecy.addAndGet(elapsed);
                if (receivedMessages.incrementAndGet() == 1000) {
                    latch.countDown();
                }
            }

            public boolean isAutoAck() {
                return false;
            }
        });
        System.out.println();
        System.out.println("[Starting test]");
        long global_start = System.currentTimeMillis();
        Random rnd = new Random();
        BrokerClient brokerClientProducer = new BrokerClient("127.0.0.1", 3323);
        do {
            if (testCount.get() % 100 == 0) {
                System.out.println("Sent: " + testCount.get());
            }
            NetBrokerMessage message = new NetBrokerMessage(Long.toString(System.nanoTime()));
            if (DESTINATION_TYPE.equals((Object)NetAction.DestinationType.TOPIC)) {
                brokerClientProducer.publishMessage(message, "/Queue/LatencyTest");
            } else {
                brokerClientProducer.enqueueMessage(message, "/Queue/LatencyTest");
            }
            long rnd_slepp = Math.abs(rnd.nextLong());
            Sleep.time((long)(rnd_slepp % 20L));
        } while (testCount.incrementAndGet() != 1000);
        latch.await();
        long global_end = System.currentTimeMillis();
        long global_elapsed = global_end - global_start;
        brokerClientConsumer.close();
        brokerClientProducer.close();
        System.out.println("Test results:");
        System.out.println(String.format("Min latency:\t\t%s ms", minLatency.doubleValue() / 1000000.0));
        System.out.println(String.format("Max latency:\t\t%s ms", maxLatency.doubleValue() / 1000000.0));
        System.out.printf("Average latency:\t%s ms%n", totalLantecy.doubleValue() / 1000.0 / 1000000.0);
        System.out.printf("Test Duration: %sms%n", global_elapsed);
    }
}

