/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance.distributed;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.DistConsumerApp;
import pt.com.broker.performance.distributed.DistProducerApp;
import pt.com.broker.performance.distributed.DistTestCliArgs;
import pt.com.broker.performance.distributed.MachineConfiguration;
import pt.com.broker.performance.distributed.TestManager;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

public class LocalManager
implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(LocalManager.class);
    private ExecutorService executer;
    private BrokerClient brokerClient;
    private String hostname;
    private int port;
    private String machineName;

    public static void main(String[] args) throws Throwable {
        DistTestCliArgs cargs = (DistTestCliArgs)CliFactory.parseArguments(DistTestCliArgs.class, (String[])args);
        LocalManager testManager = new LocalManager();
        testManager.machineName = cargs.getMachineName();
        testManager.hostname = cargs.getHost();
        testManager.port = cargs.getPort();
        testManager.brokerClient = new BrokerClient(testManager.hostname, testManager.port);
        if (cargs.warmup()) {
            testManager.warmUp();
        }
        NetSubscribe subscribe = new NetSubscribe(String.format("%s%s", TestManager.TEST_MANAGEMENT_LOCAL_MANAGERS, testManager.machineName), NetAction.DestinationType.QUEUE);
        testManager.brokerClient.addAsyncConsumer(subscribe, (BrokerListener)testManager);
        while (true) {
            Sleep.time((long)5000L);
        }
    }

    private void warmUp() {
        System.out.println("Warming up!");
        System.out.println(" - Publishing messages...");
        int nrOfMsg = 20000;
        int msgSize = 8192;
        String topic = "/topic/warmup/" + RandomStringUtils.randomAlphanumeric((int)10);
        String content = RandomStringUtils.random((int)8192);
        NetBrokerMessage brokerMsg = new NetBrokerMessage(content);
        for (int i = 0; i != 20000; ++i) {
            this.brokerClient.publishMessage(brokerMsg, topic);
        }
        System.out.println(" - Enqueueing and consuming messages...");
        String queueName = "/queue/warmup/" + RandomStringUtils.randomAlphanumeric((int)10);
        NetSubscribe subscribe = new NetSubscribe(queueName, NetAction.DestinationType.QUEUE);
        final AtomicLong count = new AtomicLong(20000L);
        final CountDownLatch latch = new CountDownLatch(1);
        try {
            this.brokerClient.addAsyncConsumer(subscribe, new BrokerListener(){

                public void onMessage(NetNotification message) {
                    if (count.decrementAndGet() == 0L) {
                        latch.countDown();
                    }
                }

                public boolean isAutoAck() {
                    return true;
                }
            });
        }
        catch (Throwable e) {
            latch.countDown();
        }
        for (int i = 0; i != 20000; ++i) {
            this.brokerClient.enqueueMessage(brokerMsg, queueName);
        }
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            // empty catch block
        }
        System.out.println("Warming up ended");
    }

    private void startConsumer(String name) {
        final String consumerName = name;
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    System.out.println("Consumer started: " + consumerName);
                    new DistConsumerApp(LocalManager.this.hostname, LocalManager.this.port, consumerName);
                }
                catch (Throwable e) {
                    log.error("Consumer initialization failed", e);
                }
            }
        };
        this.executer.submit(runnable);
    }

    private void startProducer(String name) {
        final String producerName = name;
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    System.out.println("Producer started: " + producerName);
                    new DistProducerApp(LocalManager.this.hostname, LocalManager.this.port, producerName);
                }
                catch (Throwable e) {
                    log.error("Consumer initialization failed", e);
                }
            }
        };
        this.executer.submit(runnable);
    }

    private void startTest(MachineConfiguration machineConfiguration) {
        System.out.println(String.format("Starting test, using %s consumers and %s producers.", machineConfiguration.getConsumers().size(), machineConfiguration.getProducers().size()));
        int actors = 0;
        if (machineConfiguration.getConsumers() != null) {
            actors += machineConfiguration.getConsumers().size();
        }
        if (machineConfiguration.getProducers() != null) {
            actors += machineConfiguration.getProducers().size();
        }
        this.executer = Executors.newFixedThreadPool(actors);
        for (String consumerName : machineConfiguration.getConsumers()) {
            this.startConsumer(consumerName);
        }
        for (String producerName : machineConfiguration.getProducers()) {
            this.startProducer(producerName);
        }
    }

    private void stopTest(MachineConfiguration machineConfiguration) {
    }

    public boolean isAutoAck() {
        return true;
    }

    public void onMessage(NetNotification message) {
        try {
            MachineConfiguration machineConfiguration = MachineConfiguration.deserialize(message.getMessage().getPayload());
            if (!machineConfiguration.isStop()) {
                this.startTest(machineConfiguration);
            } else {
                this.stopTest(machineConfiguration);
            }
        }
        catch (Throwable t) {
            log.error("Failed to process received message", t);
        }
    }
}

