/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import org.caudexorigo.Shutdown;
import org.caudexorigo.concurrent.Sleep;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

public class PubSubPerformanceTestV1 {
    private static String hostname = "localhost";
    private static int port = 3323;
    private static final int NUMBER_OF_CONSUMERS = 4;
    private static final int MESSAGES_PRODUCED = 50;
    private static final String DESTINATION_NAME = "/topic/performance";
    private static boolean produceBefore = true;
    private static Object producingSynObj = new Object();
    private static volatile long initTime = 0L;
    private static volatile long endTime = 0L;
    private static Object endSyncObj = new Object();
    private static CountDownLatch consumerThreadRegistered = new CountDownLatch(0);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) {
        System.out.println("Test starting!");
        int consumerCount = 4;
        while (consumerCount-- != 0) {
            new AsynConsumer().start();
        }
        try {
            consumerThreadRegistered.await();
        }
        catch (InterruptedException e1) {
            e1.printStackTrace();
            Shutdown.now();
        }
        new Producer().start();
        Object object = endSyncObj;
        synchronized (object) {
            try {
                endSyncObj.wait();
                endTime = System.nanoTime();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("  Test Ended!");
        System.out.println("     Messages produced: 50");
        System.out.println("     Number of consumer: 4");
        System.out.println("     Time (in milliseconds): " + (endTime - initTime) / 1000000L);
    }

    public static class AsynConsumer
    extends Thread {
        private volatile int messagesReceived = 0;
        private BrokerClient bk = null;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            System.out.println("AsynConsumer Thread started");
            try {
                this.bk = new BrokerClient(hostname, port);
                if (produceBefore) {
                    Object object = producingSynObj;
                    synchronized (object) {
                        producingSynObj.wait();
                    }
                }
                if (initTime == 0L) {
                    initTime = System.nanoTime();
                }
                try {
                    NetSubscribe subscribe = new NetSubscribe(PubSubPerformanceTestV1.DESTINATION_NAME, NetAction.DestinationType.TOPIC);
                    BrokerListener listener = new BrokerListener(){

                        public boolean isAutoAck() {
                            return true;
                        }

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        public void onMessage(NetNotification message) {
                            System.out.print("C");
                            if (++AsynConsumer.this.messagesReceived == 50) {
                                Object object = endSyncObj;
                                synchronized (object) {
                                    endSyncObj.notifyAll();
                                    AsynConsumer.this.bk.close();
                                }
                            }
                        }
                    };
                    this.bk.addAsyncConsumer(subscribe, listener);
                    consumerThreadRegistered.countDown();
                }
                catch (TimeoutException te) {
                    te.printStackTrace();
                }
            }
            catch (Throwable e) {
                e.printStackTrace();
                this.bk.close();
                System.out.println("Consumer Thread ended (exception):");
                return;
            }
            System.out.println("AsynConsumer.run() END");
        }
    }

    public static class Producer
    extends Thread {
        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            System.out.println("Producer Thread started");
            try {
                BrokerClient bk = new BrokerClient(hostname, port);
                int count = 50;
                NetBrokerMessage brokerMessage = new NetBrokerMessage("This is a test!");
                while (count-- != 0) {
                    System.out.print("P");
                    bk.publishMessage(brokerMessage, PubSubPerformanceTestV1.DESTINATION_NAME);
                    Sleep.time((long)10L);
                }
                if (produceBefore) {
                    Object object = producingSynObj;
                    synchronized (object) {
                        producingSynObj.notifyAll();
                    }
                }
                bk.close();
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
            System.out.println("Producer Thread ended");
        }
    }
}

