package pt.com.broker.performance.distributed;

import com.sleepycat.je.rep.utilint.HostPortPair;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import org.apache.http.HttpStatus;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.caudexorigo.text.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.DistTestParams;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.performance.distributed.conf.ConfigurationInfo;
import pt.com.broker.performance.distributed.conf.Consumers;
import pt.com.broker.performance.distributed.conf.Machines;
import pt.com.broker.performance.distributed.conf.Producers;
import pt.com.broker.performance.distributed.conf.Tests;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetProtocolType;
import pt.com.broker.types.NetSubscribe;

/* loaded from: input_file:pt/com/broker/performance/distributed/TestManager.class */
public class TestManager implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(TestManager.class);
    private static String TEST_MANAGEMENT_BASE = "/perf-test/management";
    public static String TEST_MANAGEMENT_ACTION = TEST_MANAGEMENT_BASE + "/action/";
    public static String TEST_MANAGEMENT_RESULT = TEST_MANAGEMENT_BASE + "/result";
    public static String TEST_MANAGEMENT_LOCAL_MANAGERS = TEST_MANAGEMENT_BASE + "/localmanager/";
    private BrokerClient brokerClient;
    private String hostname;
    private int port;
    private HashMap<String, ConfigurationInfo.AgentInfo> agents;
    private int numberOfMessages;
    private int messageSize;
    private volatile CountDownLatch testsCountDown;
    private Map<String, DistTestParams> tests = new LinkedHashMap();
    private TreeMap<String, List<TestResult>> results = new TreeMap<>();
    private HashMap<String, MachineConfiguration> machineConfigurations = new HashMap<>();
    private StringBuilder testResults = new StringBuilder();

    public static void main(String[] strArr) throws Throwable {
        DistTestCliArgs distTestCliArgs = (DistTestCliArgs) CliFactory.parseArguments(DistTestCliArgs.class, strArr);
        TestManager testManager = new TestManager();
        testManager.numberOfMessages = distTestCliArgs.getNumberOfMessages();
        testManager.messageSize = distTestCliArgs.getMessageLength();
        testManager.hostname = distTestCliArgs.getHost();
        testManager.port = distTestCliArgs.getPort();
        testManager.init();
        System.out.println(String.format("Test manger running...", new Object[0]));
        testManager.start();
        System.out.println(String.format("\n\nTests ended!\n\nShowing all results", new Object[0]));
        testManager.showTotalResults();
        testManager.stop();
    }

    private void init() {
        pt.com.broker.performance.conf.ConfigurationInfo.init();
        ConfigurationInfo.init();
        this.agents = ConfigurationInfo.getAgents();
        loadMachineConfiguration();
        addTests();
    }

    private void start() {
        try {
            this.brokerClient = new BrokerClient(this.hostname, this.port);
            this.brokerClient.addAsyncConsumer(new NetSubscribe(TEST_MANAGEMENT_RESULT, NetAction.DestinationType.QUEUE), this);
            System.out.println("Starting to configure participant machines");
            Iterator<String> it = this.machineConfigurations.keySet().iterator();
            while (it.hasNext()) {
                MachineConfiguration machineConfiguration = this.machineConfigurations.get(it.next());
                System.out.println(String.format("Test: %s", machineConfiguration.getMachineName()));
                System.out.println("Consumers: ");
                Iterator<String> it2 = machineConfiguration.getConsumers().iterator();
                while (it2.hasNext()) {
                    System.out.println(" - " + it2.next());
                }
                System.out.println("Producers: ");
                Iterator<String> it3 = machineConfiguration.getProducers().iterator();
                while (it3.hasNext()) {
                    System.out.println(" - " + it3.next());
                }
                this.brokerClient.enqueueMessage(new NetBrokerMessage(machineConfiguration.serialize()), String.format("%s%s", TEST_MANAGEMENT_LOCAL_MANAGERS, machineConfiguration.getMachineName()));
            }
            Sleep.time(2000L);
            Iterator<String> it4 = this.tests.keySet().iterator();
            while (it4.hasNext()) {
                executeTest(this.tests.get(it4.next()));
            }
            System.out.println("Stoping remote machines");
            Iterator<String> it5 = this.machineConfigurations.keySet().iterator();
            while (it5.hasNext()) {
                MachineConfiguration machineConfiguration2 = this.machineConfigurations.get(it5.next());
                machineConfiguration2.setStop(true);
                byte[] serialize = machineConfiguration2.serialize();
                machineConfiguration2.setStop(false);
                this.brokerClient.enqueueMessage(new NetBrokerMessage(serialize), String.format("%s%s", TEST_MANAGEMENT_LOCAL_MANAGERS, machineConfiguration2.getMachineName()));
            }
        } catch (Throwable th) {
            log.error("Tests failed!", th);
        }
    }

    private void stop() {
        this.brokerClient.close();
        writeResult();
    }

    private void loadMachineConfiguration() {
        for (Machines.Machine machine : ConfigurationInfo.getConfiguration().getMachines().getMachine()) {
            MachineConfiguration machineConfiguration = new MachineConfiguration(machine.getMachineName(), machine.getProducers(), machine.getConsumers());
            this.machineConfigurations.put(machineConfiguration.getMachineName(), machineConfiguration);
            System.out.println(String.format("Added machine info for machine : '%s'", machineConfiguration.getMachineName()));
        }
    }

    private void addTests() {
        ConfigurationInfo.AgentInfo defaultAgent = ConfigurationInfo.getDefaultAgent();
        NetProtocolType encoding = ConfigurationInfo.getEncoding();
        for (Tests.Test test2 : ConfigurationInfo.getConfiguration().getTests().getTest()) {
            String testName = test2.getTestName();
            NetAction.DestinationType valueOf = NetAction.DestinationType.valueOf(test2.getDestination().getDestinationType());
            boolean isSyncConsumer = test2.getDestination().isSyncConsumer();
            boolean isNoAckConsumer = test2.getDestination().isNoAckConsumer();
            int intValue = test2.getMessages().getMessageSize().intValue();
            if (this.messageSize != -1) {
                intValue = this.messageSize;
            }
            int intValue2 = test2.getMessages().getNumberOfMessages().intValue();
            if (this.numberOfMessages != -1) {
                intValue2 = this.numberOfMessages;
            }
            DistTestParams distTestParams = new DistTestParams(testName, String.format("/perf/%s/%s", valueOf.toString().toLowerCase(), RandomStringUtils.randomAlphanumeric(15)), valueOf, intValue, intValue2, isSyncConsumer, isNoAckConsumer, encoding);
            int intValue3 = test2.getConsumers().getCount().intValue();
            for (Consumers.Consumer consumer : test2.getConsumers().getConsumer()) {
                ConfigurationInfo.AgentInfo agentInfo = ConfigurationInfo.getAgents().get(consumer.getAgentId());
                distTestParams.getConsumers().put(consumer.getName(), new DistTestParams.ClientInfo(consumer.getName(), agentInfo.hostname, agentInfo.tcpPort));
            }
            for (int i = 1; i <= intValue3; i++) {
                String format = String.format("consumer%s", i + "");
                if (distTestParams.getConsumers().get(format) == null) {
                    DistTestParams.ClientInfo clientInfo = new DistTestParams.ClientInfo(format, defaultAgent.hostname, defaultAgent.tcpPort);
                    distTestParams.getConsumers().put(clientInfo.getName(), clientInfo);
                }
            }
            int intValue4 = test2.getProducers().getCount().intValue();
            for (Producers.Producer producer : test2.getProducers().getProducer()) {
                ConfigurationInfo.AgentInfo agentInfo2 = ConfigurationInfo.getAgents().get(producer.getAgentId());
                distTestParams.getProducers().put(producer.getName(), new DistTestParams.ClientInfo(producer.getName(), agentInfo2.hostname, agentInfo2.tcpPort));
            }
            for (int i2 = 1; i2 <= intValue4; i2++) {
                String format2 = String.format("producer%s", i2 + "");
                if (distTestParams.getProducers().get(format2) == null) {
                    DistTestParams.ClientInfo clientInfo2 = new DistTestParams.ClientInfo(format2, defaultAgent.hostname, defaultAgent.tcpPort);
                    distTestParams.getProducers().put(clientInfo2.getName(), clientInfo2);
                }
            }
            this.tests.put(distTestParams.getTestName(), distTestParams);
            System.out.println(String.format("Test added: %s", distTestParams.getTestName()));
        }
    }

    private void consumerEnded(TestResult testResult) {
        System.out.println("Consumer ended: " + testResult.getActorName());
        synchronized (this.results) {
            this.results.get(testResult.getTestName()).add(testResult);
        }
        this.testsCountDown.countDown();
    }

    private void producerEnded(TestResult testResult) {
        System.out.println("Producer ended: " + testResult.getActorName());
        synchronized (this.results) {
            this.results.get(testResult.getTestName()).add(testResult);
        }
        this.testsCountDown.countDown();
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public boolean isAutoAck() {
        return true;
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public void onMessage(NetNotification netNotification) {
        TestResult deserialize = TestResult.deserialize(netNotification.getMessage().getPayload());
        if (deserialize.getActorType() == TestResult.ActorType.Consumer) {
            consumerEnded(deserialize);
        } else {
            producerEnded(deserialize);
        }
    }

    private void executeTest(DistTestParams distTestParams) {
        this.testsCountDown = new CountDownLatch(distTestParams.getConsumers().size() + distTestParams.getProducers().size());
        System.out.println(String.format("\nStarting test '%s'", distTestParams.getTestName()));
        synchronized (this.results) {
            this.results.put(distTestParams.getTestName(), new ArrayList(distTestParams.getConsumers().size()));
        }
        for (String str : distTestParams.getConsumers().keySet()) {
            this.brokerClient.enqueueMessage(new NetBrokerMessage(distTestParams.serialize(distTestParams.getConsumers().get(str))), TEST_MANAGEMENT_ACTION + str);
        }
        Sleep.time(1000L);
        for (String str2 : distTestParams.getProducers().keySet()) {
            this.brokerClient.enqueueMessage(new NetBrokerMessage(distTestParams.serialize(distTestParams.getProducers().get(str2))), TEST_MANAGEMENT_ACTION + str2);
        }
        try {
            this.testsCountDown.await();
            System.out.println(String.format("Test '%s' ended", distTestParams.getTestName()));
            if (distTestParams.getDestinationType() == NetAction.DestinationType.QUEUE) {
                Iterator<String> it = this.agents.keySet().iterator();
                while (it.hasNext()) {
                    ConfigurationInfo.AgentInfo agentInfo = this.agents.get(it.next());
                    deleteQueue(agentInfo.hostname, agentInfo.httpPort, distTestParams.getDestination());
                }
            }
        } catch (InterruptedException e) {
            log.error("InterruptedException while waiting on testCountDown", (Throwable) e);
        }
        String testName = distTestParams.getTestName();
        showTestResult(testName, this.tests.get(testName), this.results.get(testName));
    }

    private void showTestResult(String str, DistTestParams distTestParams, List<TestResult> list) {
        StringBuilder sb = new StringBuilder();
        sb.append("\n--------------------------------------------------\n");
        sb.append("TEST: " + str);
        sb.append(String.format("\nConsumers: %s, Producers: %s, Destination Type: %s, Sync Consumer: %s, No-Ack Consumer: %s\nMessage size: %s, Number of messages: %s\n\n", Integer.valueOf(distTestParams.getConsumers().size()), Integer.valueOf(distTestParams.getProducers().size()), distTestParams.getDestinationType(), Boolean.valueOf(distTestParams.isSyncConsumer()), Boolean.valueOf(distTestParams.isNoAckConsumer()), Integer.valueOf(distTestParams.getMessageSize()), Integer.valueOf(distTestParams.getNumberOfMessagesToSend())));
        double d = 9.223372036854776E18d;
        double d2 = 0.0d;
        int i = 0;
        double d3 = 9.223372036854776E18d;
        double d4 = 0.0d;
        int i2 = 0;
        for (TestResult testResult : list) {
            if (testResult.getActorType() == TestResult.ActorType.Consumer) {
                if (testResult.getStartTime() < d) {
                    d = testResult.getStartTime();
                }
                if (testResult.getStopTime() > d2) {
                    d2 = (long) testResult.getStopTime();
                }
                i += testResult.getMessages();
            } else {
                if (testResult.getStartTime() < d3) {
                    d3 = testResult.getStartTime();
                }
                if (testResult.getStopTime() > d4) {
                    d4 = (long) testResult.getStopTime();
                }
                i2 += testResult.getMessages();
            }
            double stopTime = testResult.getStopTime() - testResult.getStartTime();
            sb.append(String.format("%s: %s, Messages: %s, Time: %.2f, Messages/second: %.2f\n", testResult.getActorType(), testResult.getActorName(), Integer.valueOf(testResult.getMessages()), Double.valueOf(stopTime / 1000.0d), Double.valueOf(1.0d / ((stopTime / testResult.getMessages()) / 1000.0d))));
        }
        sb.append(String.format("\nTOTAL CONSUMER: Messages/second: %.3f", Double.valueOf(1.0d / (((d2 - d) / i) / 1000.0d))));
        sb.append(String.format("\nTOTAL PRODUCER: Messages/second: %.3f", Double.valueOf(1.0d / (((d4 - d3) / i2) / 1000.0d))));
        System.out.println(sb.toString());
        this.testResults.append((CharSequence) sb);
    }

    private void showTotalResults() {
        System.out.println(this.testResults.toString());
    }

    private void writeResult() {
        try {
            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter("results.txt"));
            bufferedWriter.write(this.testResults.toString());
            bufferedWriter.close();
        } catch (IOException e) {
            log.error("Failed to write file", (Throwable) e);
        }
    }

    private static void deleteQueue(String str, int i, String str2) {
        log.info("Deleting queue '{}', from host '{}'", str2, str + HostPortPair.SEPARATOR + i);
        try {
            HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(String.format("http://%s:%s/broker/admin", str, i + "")).openConnection();
            httpURLConnection.setDoOutput(true);
            httpURLConnection.setConnectTimeout(HttpStatus.SC_INTERNAL_SERVER_ERROR);
            httpURLConnection.setReadTimeout(60000);
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(httpURLConnection.getOutputStream());
            outputStreamWriter.write("QUEUE:" + str2);
            outputStreamWriter.flush();
            if (httpURLConnection.getResponseCode() == 200) {
                log.debug("Queue '{}' deleted", str2);
            } else {
                log.debug("Failed to delete queue '{}'", str2);
            }
            outputStreamWriter.close();
        } catch (Throwable th) {
            log.error(String.format("Failed to connect to agent '%s:%s' to delete queue '%s':.", str, Integer.valueOf(i), str2), th);
        }
    }
}
