/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance.distributed;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.lang3.RandomStringUtils;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.DistTestCliArgs;
import pt.com.broker.performance.distributed.DistTestParams;
import pt.com.broker.performance.distributed.TestManager;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

/**
 * Producer actor of the distributed performance test.
 *
 * <p>On construction it subscribes to the queue named
 * {@code TestManager.TEST_MANAGEMENT_ACTION + actorName}. Every notification
 * received there is deserialized into a {@link DistTestParams}, the described
 * test is executed (a burst of regular messages followed by a series of stop
 * messages), and the serialized {@link TestResult} is enqueued on
 * {@code TestManager.TEST_MANAGEMENT_RESULT}.
 */
public class DistProducerApp
implements BrokerListener {
    /** Header byte marking a stop message; its content is the number of regular messages sent. */
    public static final byte STOP_MESSAGE = 0;
    /** Header byte marking a regular (payload) message. */
    public static final byte REGULAR_MESSAGE = 1;
    /**
     * How many times the stop message is sent after the regular messages.
     * NOTE(review): presumably repeated so that every consumer sees at least one - confirm.
     */
    private static final int STOP_MESSAGE_REPETITIONS = 150;
    /** Pause, in milliseconds, between stop messages when the destination is a topic. */
    private static final long TOPIC_STOP_MESSAGE_PAUSE_MS = 50L;
    private static final Logger log = LoggerFactory.getLogger(DistProducerApp.class);
    private final String host;
    private final int port;
    private final String actorName;
    /** Management connection: receives test requests and sends test results. */
    private final BrokerClient brokerClient;

    /**
     * Connects to the broker and subscribes this instance to its management queue.
     *
     * @param host broker host used for the management connection
     * @param port broker port used for the management connection
     * @param actorName name of this producer; appended to the management action prefix
     *        to form the queue name this producer listens on
     * @throws Throwable if the broker client cannot be created or the subscription fails
     */
    public DistProducerApp(String host, int port, String actorName) throws Throwable {
        this.host = host;
        this.port = port;
        this.actorName = actorName;
        String destination = TestManager.TEST_MANAGEMENT_ACTION + actorName;
        this.brokerClient = new BrokerClient(host, port, "Producer");
        NetSubscribe subscribe = new NetSubscribe(destination, NetAction.DestinationType.QUEUE);
        this.brokerClient.addAsyncConsumer(subscribe, (BrokerListener)this);
        System.out.println(String.format("Producer '%s' running...", actorName));
    }

    /**
     * Command-line entry point: parses host, port and actor name, starts the
     * producer and then keeps the main thread alive forever.
     *
     * @param args command-line arguments, parsed into a {@link DistTestCliArgs}
     * @throws Throwable if argument parsing or producer start-up fails
     */
    public static void main(String[] args) throws Throwable {
        DistTestCliArgs cargs = (DistTestCliArgs)CliFactory.parseArguments(DistTestCliArgs.class, args);
        new DistProducerApp(cargs.getHost(), cargs.getPort(), cargs.getActorName());
        // All work happens in onMessage(); the main thread only has to stay alive.
        while (true) {
            Sleep.time(5000L);
        }
    }

    /**
     * Sends {@code nrOfMessages} regular messages followed by
     * {@link #STOP_MESSAGE_REPETITIONS} stop messages and records the count and
     * the start/stop timestamps of the regular burst in {@code testResult}.
     *
     * <p>The caller owns {@code bk} and is responsible for closing it.
     *
     * @param bk client used to send the test messages
     * @param messageLength total size of a regular message in characters, header byte included;
     *        values below 1 produce a header-only message
     * @param nrOfMessages number of regular messages to send; non-positive values send none
     * @param destinationType {@code QUEUE} messages are enqueued, anything else is published
     * @param destination destination name
     * @param testResult receives the message count and the start/stop timestamps
     * @throws Throwable if sending fails
     */
    private void sendLoop(BrokerClient bk, int messageLength, int nrOfMessages, NetAction.DestinationType destinationType, String destination, TestResult testResult) throws Throwable {
        System.out.println(String.format("Producing %s messages with %s chars.", nrOfMessages, messageLength));
        // One character of the requested length is taken by the header byte.
        // Clamp so that a length of 0 (or less) does not make randomAlphanumeric throw.
        String regularMsgContent = RandomStringUtils.randomAlphanumeric(Math.max(0, messageLength - 1));
        String stopMsgContent = String.valueOf(nrOfMessages);
        NetBrokerMessage brokerMessage = new NetBrokerMessage(this.getMessage(REGULAR_MESSAGE, regularMsgContent));
        NetBrokerMessage stopBrokerMessage = new NetBrokerMessage(this.getMessage(STOP_MESSAGE, stopMsgContent));
        boolean toQueue = destinationType == NetAction.DestinationType.QUEUE;

        long startTime = System.currentTimeMillis();
        // '<' rather than '!=' so a negative count cannot spin until the counter wraps around.
        for (int i = 0; i < nrOfMessages; ++i) {
            this.send(bk, stopBrokerMessage == null ? null : brokerMessage, destination, toQueue);
        }
        long stopTime = System.currentTimeMillis();

        System.out.println(this.actorName + " sending stop messages");
        for (int i = 0; i < STOP_MESSAGE_REPETITIONS; ++i) {
            this.send(bk, stopBrokerMessage, destination, toQueue);
            if (destinationType == NetAction.DestinationType.TOPIC) {
                Sleep.time(TOPIC_STOP_MESSAGE_PAUSE_MS);
            }
        }
        testResult.setMessages(nrOfMessages);
        testResult.setStartTime(startTime);
        testResult.setStopTime(stopTime);
    }

    /** Enqueues {@code message} when {@code toQueue} is set, publishes it otherwise. */
    private void send(BrokerClient bk, NetBrokerMessage message, String destination, boolean toQueue) throws Throwable {
        if (toQueue) {
            bk.enqueueMessage(message, destination);
        } else {
            bk.publishMessage(message, destination);
        }
    }

    /**
     * Builds a wire message: one header byte followed by the UTF-8 bytes of the content.
     *
     * @param headerByte message type marker ({@link #STOP_MESSAGE} or {@link #REGULAR_MESSAGE})
     * @param messageContent text placed after the header byte
     * @return a new array of length {@code 1 + UTF-8 length of messageContent}
     */
    private byte[] getMessage(byte headerByte, String messageContent) {
        // StandardCharsets.UTF_8 cannot fail, unlike getBytes("UTF-8"), whose
        // checked exception used to be swallowed and would have produced a header-only message.
        byte[] serializedContent = messageContent.getBytes(StandardCharsets.UTF_8);
        byte[] serializedMessage = new byte[serializedContent.length + 1];
        serializedMessage[0] = headerByte;
        System.arraycopy(serializedContent, 0, serializedMessage, 1, serializedContent.length);
        return serializedMessage;
    }

    /**
     * Runs one test: opens a dedicated client to the agent named in {@code testParams},
     * sends the messages, and reports the serialized result on the management result queue.
     * Any failure is logged and not propagated.
     *
     * @param testParams description of the test to run
     */
    private void performTest(DistTestParams testParams) {
        System.out.println(this.actorName + " starting new test: " + testParams.getTestName());
        try {
            TestResult testResult = new TestResult(TestResult.ActorType.Procucer, this.actorName, testParams.getTestName());
            BrokerClient bk = new BrokerClient(testParams.getClientInfo().getAgentHost(), testParams.getClientInfo().getPort(), "ProducerActor", testParams.getEncoding());
            try {
                this.sendLoop(bk, testParams.getMessageSize(), testParams.getNumberOfMessagesToSend(), testParams.getDestinationType(), testParams.getDestination(), testResult);
            }
            finally {
                // Close exactly once, and also when sending failed, so the connection never leaks.
                this.closeQuietly(bk);
            }
            NetBrokerMessage netBrokerMessage = new NetBrokerMessage(testResult.serialize());
            this.brokerClient.enqueueMessage(netBrokerMessage, TestManager.TEST_MANAGEMENT_RESULT);
            System.out.println(this.actorName + " ended test " + testParams.getTestName());
        }
        catch (Throwable t) {
            log.error("Test failed", t);
        }
    }

    /** Closes {@code bk}, logging instead of propagating any failure so it cannot mask an earlier error. */
    private void closeQuietly(BrokerClient bk) {
        try {
            bk.close();
        }
        catch (Throwable t) {
            log.warn("Failed to close test broker client", t);
        }
    }

    /**
     * @return {@code false}; notifications are acknowledged explicitly in {@link #onMessage}
     */
    public boolean isAutoAck() {
        return false;
    }

    /**
     * Handles a test request: acknowledges the notification, deserializes its payload
     * into a {@link DistTestParams} and runs the test.
     *
     * @param notification management notification carrying serialized test parameters
     */
    public void onMessage(NetNotification notification) {
        try {
            this.brokerClient.acknowledge(notification);
        }
        catch (Throwable t) {
            // Best effort: the test is still executed even if the acknowledge failed.
            log.error("Acknowledge failed", t);
        }
        System.out.println("DistProducerApp.onMessage()");
        DistTestParams distTestParams;
        try {
            distTestParams = DistTestParams.deserialize(notification.getMessage().getPayload());
        }
        catch (Throwable t) {
            // A malformed request must not escape into the broker's callback thread.
            log.error("Could not deserialize test parameters", t);
            return;
        }
        if (distTestParams == null) {
            log.error("Could not deserialize test parameters: result was null");
            return;
        }
        this.performTest(distTestParams);
    }
}

