/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance.distributed;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang3.RandomStringUtils;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.DistTestCliArgs;
import pt.com.broker.performance.distributed.DistTestParams;
import pt.com.broker.performance.distributed.MachineConfiguration;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.performance.distributed.conf.ConfigurationInfo;
import pt.com.broker.performance.distributed.conf.Consumers;
import pt.com.broker.performance.distributed.conf.Machines;
import pt.com.broker.performance.distributed.conf.Producers;
import pt.com.broker.performance.distributed.conf.Tests;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetProtocolType;
import pt.com.broker.types.NetSubscribe;

/**
 * Coordinator of a distributed broker performance run.
 *
 * <p>The manager loads the machine and test definitions from {@link ConfigurationInfo},
 * pushes each machine's configuration to its local manager queue, runs every configured
 * test in declaration order, and collects the {@link TestResult}s that participants
 * publish on {@link #TEST_MANAGEMENT_RESULT}. A textual report is printed after each
 * test, printed again at the end of the run, and finally written to {@code results.txt}.
 *
 * <p>Results are delivered through {@link #onMessage(NetNotification)} (registered with
 * {@code addAsyncConsumer}, so presumably on a broker client thread); shared state is
 * therefore guarded by locking on {@link #results} and by a volatile latch.
 */
public class TestManager
implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(TestManager.class);

    /** Common prefix of every management destination used by the test framework. */
    private static final String TEST_MANAGEMENT_BASE = "/perf-test/management";
    /** Prefix of the per-actor queue on which test parameters are delivered. */
    public static final String TEST_MANAGEMENT_ACTION = TEST_MANAGEMENT_BASE + "/action/";
    /** Queue on which actors publish their {@link TestResult}. */
    public static final String TEST_MANAGEMENT_RESULT = TEST_MANAGEMENT_BASE + "/result";
    /** Prefix of the per-machine queue on which machine configurations are delivered. */
    public static final String TEST_MANAGEMENT_LOCAL_MANAGERS = TEST_MANAGEMENT_BASE + "/localmanager/";

    /** Number of milliseconds in one second; result timestamps are treated as milliseconds. */
    private static final double MILLIS_PER_SECOND = 1000.0;

    private BrokerClient brokerClient;
    private String hostname;
    private int port;
    /** Tests to run, keyed by test name, in configuration order. */
    private final Map<String, DistTestParams> tests = new LinkedHashMap<String, DistTestParams>();
    /** Results received so far, keyed by test name. Also used as the lock guarding itself. */
    private final TreeMap<String, List<TestResult>> results = new TreeMap<String, List<TestResult>>();
    private HashMap<String, ConfigurationInfo.AgentInfo> agents;
    private final HashMap<String, MachineConfiguration> machineConfigurations = new HashMap<String, MachineConfiguration>();
    /** Accumulated textual report of every finished test. */
    private final StringBuilder testResults = new StringBuilder();
    /** Command-line override for the number of messages; {@code -1} means "use the configured value". */
    private int numberOfMessages;
    /** Command-line override for the message size; {@code -1} means "use the configured value". */
    private int messageSize;
    /** Latch counted down once per received result of the test currently running. */
    private volatile CountDownLatch testsCountDown;

    /**
     * Command-line entry point: parses the arguments, runs every configured test and
     * writes the collected report.
     *
     * @param args command-line arguments understood by {@link DistTestCliArgs}
     * @throws Throwable if argument parsing or initialisation fails
     */
    public static void main(String[] args) throws Throwable {
        DistTestCliArgs cargs = (DistTestCliArgs)CliFactory.parseArguments(DistTestCliArgs.class, (String[])args);
        TestManager testManager = new TestManager();
        testManager.numberOfMessages = cargs.getNumberOfMessages();
        testManager.messageSize = cargs.getMessageLength();
        testManager.hostname = cargs.getHost();
        testManager.port = cargs.getPort();
        testManager.init();
        System.out.println("Test manger running...");
        testManager.start();
        System.out.println("\n\nTests ended!\n\nShowing all results");
        testManager.showTotalResults();
        testManager.stop();
    }

    /** Initialises both configuration holders and loads the agents, machines and tests. */
    private void init() {
        pt.com.broker.performance.conf.ConfigurationInfo.init();
        ConfigurationInfo.init();
        this.agents = ConfigurationInfo.getAgents();
        this.loadMachineConfiguration();
        this.addTests();
    }

    /**
     * Connects to the broker, configures the participant machines, runs every test in
     * order and finally tells the machines to stop. Any failure is logged, not rethrown.
     */
    private void start() {
        try {
            this.brokerClient = new BrokerClient(this.hostname, this.port);
            NetSubscribe netSubscribe = new NetSubscribe(TEST_MANAGEMENT_RESULT, NetAction.DestinationType.QUEUE);
            this.brokerClient.addAsyncConsumer(netSubscribe, (BrokerListener)this);

            System.out.println("Starting to configure participant machines");
            for (MachineConfiguration machineConfiguration : this.machineConfigurations.values()) {
                System.out.println(String.format("Test: %s", machineConfiguration.getMachineName()));
                System.out.println("Consumers: ");
                for (String consumerName : machineConfiguration.getConsumers()) {
                    System.out.println(" - " + consumerName);
                }
                System.out.println("Producers: ");
                for (String producerName : machineConfiguration.getProducers()) {
                    System.out.println(" - " + producerName);
                }
                this.sendToLocalManager(machineConfiguration, machineConfiguration.serialize());
            }

            // Pause after publishing the configurations and before the first test
            // (presumably to let the local managers apply them).
            Sleep.time(2000L);
            for (DistTestParams distTestParams : this.tests.values()) {
                this.executeTest(distTestParams);
            }

            System.out.println("Stoping remote machines");
            for (MachineConfiguration machineConfiguration : this.machineConfigurations.values()) {
                byte[] stopData;
                // The stop flag is only meant for the serialized copy; always restore it.
                machineConfiguration.setStop(true);
                try {
                    stopData = machineConfiguration.serialize();
                }
                finally {
                    machineConfiguration.setStop(false);
                }
                this.sendToLocalManager(machineConfiguration, stopData);
            }
        }
        catch (Throwable e) {
            log.error("Tests failed!", e);
        }
    }

    /**
     * Enqueues an already serialized machine configuration on that machine's local
     * manager queue.
     *
     * @param machineConfiguration machine whose name selects the destination queue
     * @param data serialized configuration to send
     */
    private void sendToLocalManager(MachineConfiguration machineConfiguration, byte[] data) {
        String destination = String.format("%s%s", TEST_MANAGEMENT_LOCAL_MANAGERS, machineConfiguration.getMachineName());
        this.brokerClient.enqueueMessage(new NetBrokerMessage(data), destination);
    }

    /**
     * Closes the broker connection (if one was established) and writes the report file.
     * The report is written even when closing the connection fails.
     */
    private void stop() {
        try {
            // start() may have failed before the client was created.
            if (this.brokerClient != null) {
                this.brokerClient.close();
            }
        }
        finally {
            this.writeResult();
        }
    }

    /** Registers one {@link MachineConfiguration} per machine declared in the configuration. */
    private void loadMachineConfiguration() {
        for (Machines.Machine machine : ConfigurationInfo.getConfiguration().getMachines().getMachine()) {
            MachineConfiguration machineConfiguration = new MachineConfiguration(machine.getMachineName(), machine.getProducers(), machine.getConsumers());
            this.machineConfigurations.put(machineConfiguration.getMachineName(), machineConfiguration);
            System.out.println(String.format("Added machine info for machine : '%s'", machineConfiguration.getMachineName()));
        }
    }

    /**
     * Builds a {@link DistTestParams} for every configured test.
     *
     * <p>Explicitly configured consumers and producers are bound to the agent they
     * reference. The remaining slots up to the configured count are filled with
     * {@code consumerN} / {@code producerN} clients bound to the default agent.
     * Command-line overrides for message size and count (anything other than
     * {@code -1}) take precedence over the per-test configuration.
     *
     * @throws IllegalStateException if a client references an agent id that is not configured
     */
    private void addTests() {
        ConfigurationInfo.AgentInfo defaultAgent = ConfigurationInfo.getDefaultAgent();
        NetProtocolType encoding = ConfigurationInfo.getEncoding();
        for (Tests.Test t : ConfigurationInfo.getConfiguration().getTests().getTest()) {
            String testName = t.getTestName();
            NetAction.DestinationType destinationType = NetAction.DestinationType.valueOf((String)t.getDestination().getDestinationType());
            boolean isSyncConsumer = t.getDestination().isSyncConsumer();
            boolean isNoAckConsumer = t.getDestination().isNoAckConsumer();
            int testMessageSize = this.messageSize != -1 ? this.messageSize : t.getMessages().getMessageSize().intValue();
            int nrMessages = this.numberOfMessages != -1 ? this.numberOfMessages : t.getMessages().getNumberOfMessages().intValue();

            // A random suffix gives each test its own destination. The lower-casing is
            // locale-independent so the path does not change with the default locale.
            String randName = RandomStringUtils.randomAlphanumeric(15);
            String destination = String.format("/perf/%s/%s", destinationType.toString().toLowerCase(java.util.Locale.ENGLISH), randName);
            DistTestParams distTestParams = new DistTestParams(testName, destination, destinationType, testMessageSize, nrMessages, isSyncConsumer, isNoAckConsumer, encoding);

            for (Consumers.Consumer consumer : t.getConsumers().getConsumer()) {
                ConfigurationInfo.AgentInfo agentInfo = TestManager.requireAgent(consumer.getAgentId(), consumer.getName());
                DistTestParams.ClientInfo clientInfo = new DistTestParams.ClientInfo(consumer.getName(), agentInfo.hostname, agentInfo.tcpPort);
                distTestParams.getConsumers().put(consumer.getName(), clientInfo);
            }
            int consumerCount = t.getConsumers().getCount().intValue();
            for (int i = 1; i <= consumerCount; ++i) {
                String consumerName = "consumer" + i;
                if (distTestParams.getConsumers().get(consumerName) != null) {
                    continue;
                }
                DistTestParams.ClientInfo clientInfo = new DistTestParams.ClientInfo(consumerName, defaultAgent.hostname, defaultAgent.tcpPort);
                distTestParams.getConsumers().put(clientInfo.getName(), clientInfo);
            }

            for (Producers.Producer producer : t.getProducers().getProducer()) {
                ConfigurationInfo.AgentInfo agentInfo = TestManager.requireAgent(producer.getAgentId(), producer.getName());
                DistTestParams.ClientInfo clientInfo = new DistTestParams.ClientInfo(producer.getName(), agentInfo.hostname, agentInfo.tcpPort);
                distTestParams.getProducers().put(producer.getName(), clientInfo);
            }
            int producersCount = t.getProducers().getCount().intValue();
            for (int i = 1; i <= producersCount; ++i) {
                String producerName = "producer" + i;
                if (distTestParams.getProducers().get(producerName) != null) {
                    continue;
                }
                DistTestParams.ClientInfo clientInfo = new DistTestParams.ClientInfo(producerName, defaultAgent.hostname, defaultAgent.tcpPort);
                distTestParams.getProducers().put(clientInfo.getName(), clientInfo);
            }

            this.tests.put(distTestParams.getTestName(), distTestParams);
            System.out.println(String.format("Test added: %s", distTestParams.getTestName()));
        }
    }

    /**
     * Looks up a configured agent, failing with a descriptive message instead of a bare
     * {@link NullPointerException} when the id is unknown.
     *
     * @param agentId key of the agent in {@link ConfigurationInfo#getAgents()}
     * @param clientName name of the consumer/producer referencing the agent (for the message)
     * @return the matching agent, never {@code null}
     * @throws IllegalStateException if no agent is registered under {@code agentId}
     */
    private static ConfigurationInfo.AgentInfo requireAgent(Object agentId, String clientName) {
        ConfigurationInfo.AgentInfo agentInfo = ConfigurationInfo.getAgents().get(agentId);
        if (agentInfo == null) {
            throw new IllegalStateException(String.format("Client '%s' references unknown agent '%s'", clientName, agentId));
        }
        return agentInfo;
    }

    /**
     * Records the result reported by a finished consumer.
     *
     * @param result result published by the consumer
     */
    private void consumerEnded(TestResult result) {
        this.recordResult("Consumer", result);
    }

    /**
     * Records the result reported by a finished producer.
     *
     * @param result result published by the producer
     */
    private void producerEnded(TestResult result) {
        this.recordResult("Producer", result);
    }

    /**
     * Stores a result under its test name and counts down the latch of the running test.
     * Results for a test that was never registered are logged and ignored, so that a
     * stray message cannot kill the listener with a {@link NullPointerException}.
     *
     * @param role label printed to the console ({@code "Consumer"} or {@code "Producer"})
     * @param result result to record
     */
    private void recordResult(String role, TestResult result) {
        System.out.println(role + " ended: " + result.getActorName());
        boolean recorded = false;
        synchronized (this.results) {
            List<TestResult> resultsList = this.results.get(result.getTestName());
            if (resultsList != null) {
                resultsList.add(result);
                recorded = true;
            }
        }
        if (!recorded) {
            log.warn("Ignoring result from '{}' for unknown test '{}'", result.getActorName(), result.getTestName());
            return;
        }
        // Read the volatile field once; it is replaced at the start of every test.
        CountDownLatch latch = this.testsCountDown;
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * {@link BrokerListener} callback.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isAutoAck() {
        return true;
    }

    /**
     * {@link BrokerListener} callback: deserializes the payload into a {@link TestResult}
     * and records it as a consumer or producer result depending on its actor type.
     *
     * @param message notification whose payload is a serialized {@link TestResult}
     */
    @Override
    public void onMessage(NetNotification message) {
        byte[] payload = message.getMessage().getPayload();
        TestResult result = TestResult.deserialize(payload);
        if (result.getActorType() == TestResult.ActorType.Consumer) {
            this.consumerEnded(result);
        } else {
            this.producerEnded(result);
        }
    }

    /**
     * Runs a single test: sends the parameters to every consumer, then to every producer,
     * and blocks until one result per participant has been received. For queue
     * destinations the test queue is afterwards deleted on every configured agent.
     * The results collected so far are reported even if the wait is interrupted.
     *
     * @param distTestParams parameters of the test to run
     */
    private void executeTest(DistTestParams distTestParams) {
        String testName = distTestParams.getTestName();
        int participants = distTestParams.getConsumers().size() + distTestParams.getProducers().size();
        CountDownLatch latch = new CountDownLatch(participants);
        this.testsCountDown = latch;
        System.out.println(String.format("\nStarting test '%s'", testName));
        synchronized (this.results) {
            this.results.put(testName, new ArrayList<TestResult>(participants));
        }

        for (String consumer : distTestParams.getConsumers().keySet()) {
            DistTestParams.ClientInfo clientInfo = distTestParams.getConsumers().get(consumer);
            NetBrokerMessage netBrokerMsg = new NetBrokerMessage(distTestParams.serialize(clientInfo));
            this.brokerClient.enqueueMessage(netBrokerMsg, TEST_MANAGEMENT_ACTION + consumer);
        }
        // Pause between notifying the consumers and notifying the producers
        // (presumably so the consumers are ready before messages start flowing).
        Sleep.time(1000L);
        for (String producer : distTestParams.getProducers().keySet()) {
            DistTestParams.ClientInfo clientInfo = distTestParams.getProducers().get(producer);
            NetBrokerMessage netBrokerMsg = new NetBrokerMessage(distTestParams.serialize(clientInfo));
            this.brokerClient.enqueueMessage(netBrokerMsg, TEST_MANAGEMENT_ACTION + producer);
        }

        boolean completed = false;
        try {
            latch.await();
            completed = true;
        }
        catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            log.error("InterruptedException while waiting on testCountDown", e);
        }

        if (completed) {
            System.out.println(String.format("Test '%s' ended", testName));
            if (distTestParams.getDestinationType() == NetAction.DestinationType.QUEUE) {
                for (ConfigurationInfo.AgentInfo agentInfo : this.agents.values()) {
                    TestManager.deleteQueue(agentInfo.hostname, agentInfo.httpPort, distTestParams.getDestination());
                }
            }
        }

        // Snapshot under the lock: after an interruption results may still be arriving.
        List<TestResult> collected;
        synchronized (this.results) {
            collected = new ArrayList<TestResult>(this.results.get(testName));
        }
        this.showTestResult(testName, distTestParams, collected);
    }

    /**
     * Formats the report of one test, prints it and appends it to the accumulated report.
     * Per-actor throughput is computed from each actor's own start/stop times; the total
     * throughput per role uses the earliest start and latest stop across that role.
     *
     * @param testname name of the test
     * @param testParams parameters the test was run with
     * @param testResults results received for the test
     */
    private void showTestResult(String testname, DistTestParams testParams, List<TestResult> testResults) {
        StringBuilder sb = new StringBuilder();
        sb.append("\n--------------------------------------------------\n");
        sb.append("TEST: " + testname);
        sb.append(String.format("\nConsumers: %s, Producers: %s, Destination Type: %s, Sync Consumer: %s, No-Ack Consumer: %s\nMessage size: %s, Number of messages: %s\n\n", testParams.getConsumers().size(), testParams.getProducers().size(), testParams.getDestinationType(), testParams.isSyncConsumer(), testParams.isNoAckConsumer(), testParams.getMessageSize(), testParams.getNumberOfMessagesToSend()));

        double consumerEarliestStart = Long.MAX_VALUE;
        double consumerLatestStop = 0.0;
        long consumerMessages = 0;
        double producerEarliestStart = Long.MAX_VALUE;
        double producerLatestStop = 0.0;
        long producerMessages = 0;
        for (TestResult tRes : testResults) {
            if (tRes.getActorType() == TestResult.ActorType.Consumer) {
                consumerEarliestStart = Math.min(consumerEarliestStart, tRes.getStartTime());
                consumerLatestStop = Math.max(consumerLatestStop, (long)tRes.getStopTime());
                consumerMessages += tRes.getMessages();
            } else {
                producerEarliestStart = Math.min(producerEarliestStart, tRes.getStartTime());
                producerLatestStop = Math.max(producerLatestStop, (long)tRes.getStopTime());
                producerMessages += tRes.getMessages();
            }
            double actorTestTime = tRes.getStopTime() - (double)tRes.getStartTime();
            double actorRate = TestManager.messagesPerSecond(tRes.getMessages(), actorTestTime);
            sb.append(String.format("%s: %s, Messages: %s, Time: %.2f, Messages/second: %.2f\n", tRes.getActorType(), tRes.getActorName(), tRes.getMessages(), actorTestTime / MILLIS_PER_SECOND, actorRate));
        }

        double consumerRate = TestManager.messagesPerSecond(consumerMessages, consumerLatestStop - consumerEarliestStart);
        sb.append(String.format("\nTOTAL CONSUMER: Messages/second: %.3f", consumerRate));
        double producerRate = TestManager.messagesPerSecond(producerMessages, producerLatestStop - producerEarliestStart);
        sb.append(String.format("\nTOTAL PRODUCER: Messages/second: %.3f", producerRate));

        System.out.println(sb.toString());
        this.testResults.append((CharSequence)sb);
    }

    /**
     * Computes a throughput in messages per second.
     *
     * @param messages number of messages handled
     * @param elapsedMillis elapsed time in milliseconds
     * @return the rate, or {@code 0.0} when it cannot be measured (no messages, or a
     *         non-positive elapsed time such as when no result of that role was received)
     */
    private static double messagesPerSecond(double messages, double elapsedMillis) {
        if (messages <= 0.0 || elapsedMillis <= 0.0) {
            return 0.0;
        }
        double secondsPerMessage = elapsedMillis / messages / MILLIS_PER_SECOND;
        return 1.0 / secondsPerMessage;
    }

    /** Prints the accumulated report of every finished test. */
    private void showTotalResults() {
        System.out.println(this.testResults.toString());
    }

    /**
     * Writes the accumulated report to {@code results.txt} in the working directory.
     * I/O failures are logged, not rethrown.
     */
    private void writeResult() {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter("results.txt"));
            try {
                out.write(this.testResults.toString());
            }
            finally {
                // Close (and thereby flush) even when the write fails.
                out.close();
            }
        }
        catch (IOException e) {
            log.error("Failed to write file", e);
        }
    }

    /**
     * Asks an agent to delete a queue by posting {@code QUEUE:<name>} to its
     * {@code /broker/admin} HTTP endpoint. This is best-effort: failures are logged only.
     *
     * @param hostname agent host
     * @param port agent HTTP port
     * @param queueName name of the queue to delete
     */
    private static void deleteQueue(String hostname, int port, String queueName) {
        log.info("Deleting queue '{}', from host '{}'", queueName, hostname + ":" + port);
        HttpURLConnection httpUrlconn = null;
        try {
            URL url = new URL(String.format("http://%s:%s/broker/admin", hostname, port + ""));
            httpUrlconn = (HttpURLConnection)url.openConnection();
            httpUrlconn.setDoOutput(true);
            httpUrlconn.setConnectTimeout(500);
            httpUrlconn.setReadTimeout(60000);

            // Explicit charset so the request body does not depend on the platform default.
            OutputStreamWriter wr = new OutputStreamWriter(httpUrlconn.getOutputStream(), "UTF-8");
            try {
                wr.write("QUEUE:" + queueName);
                wr.flush();
            }
            finally {
                wr.close();
            }

            int respCode = httpUrlconn.getResponseCode();
            if (respCode == HttpURLConnection.HTTP_OK) {
                log.debug("Queue '{}' deleted", queueName);
            } else {
                log.warn("Failed to delete queue '{}' (HTTP status {})", queueName, Integer.valueOf(respCode));
            }
        }
        catch (Exception e) {
            log.error(String.format("Failed to connect to agent '%s:%s' to delete queue '%s':.", hostname, port, queueName), e);
        }
        finally {
            if (httpUrlconn != null) {
                httpUrlconn.disconnect();
            }
        }
    }
}

