package pt.com.broker.performance.distributed;

import java.io.UnsupportedEncodingException;
import org.apache.commons.lang3.RandomStringUtils;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

/* loaded from: input_file:pt/com/broker/performance/distributed/DistProducerApp.class */
public class DistProducerApp implements BrokerListener {
    public static final byte STOP_MESSAGE = 0;
    public static final byte REGULAR_MESSAGE = 1;
    private static final Logger log = LoggerFactory.getLogger(DistProducerApp.class);
    private String host;
    private int port;
    private String actorName;
    private BrokerClient brokerClient;

    public DistProducerApp(String str, int i, String str2) throws Throwable {
        this.host = str;
        this.port = i;
        this.actorName = str2;
        String str3 = TestManager.TEST_MANAGEMENT_ACTION + str2;
        this.brokerClient = new BrokerClient(str, i, "Producer");
        this.brokerClient.addAsyncConsumer(new NetSubscribe(str3, NetAction.DestinationType.QUEUE), this);
        System.out.println(String.format("Producer '%s' running...", str2));
    }

    public static void main(String[] strArr) throws Throwable {
        DistTestCliArgs distTestCliArgs = (DistTestCliArgs) CliFactory.parseArguments(DistTestCliArgs.class, strArr);
        new DistProducerApp(distTestCliArgs.getHost(), distTestCliArgs.getPort(), distTestCliArgs.getActorName());
        while (true) {
            Sleep.time(5000L);
        }
    }

    private void sendLoop(BrokerClient brokerClient, int i, int i2, NetAction.DestinationType destinationType, String str, TestResult testResult) throws Throwable {
        System.out.println(String.format("Producing %s messages with %s chars.", Integer.valueOf(i2), Integer.valueOf(i)));
        byte[] message = getMessage((byte) 1, RandomStringUtils.randomAlphanumeric(i - 1));
        byte[] message2 = getMessage((byte) 0, i2 + "");
        NetBrokerMessage netBrokerMessage = new NetBrokerMessage(message);
        NetBrokerMessage netBrokerMessage2 = new NetBrokerMessage(message2);
        long currentTimeMillis = System.currentTimeMillis();
        for (int i3 = 0; i3 != i2; i3++) {
            if (destinationType == NetAction.DestinationType.QUEUE) {
                brokerClient.enqueueMessage(netBrokerMessage, str);
            } else {
                brokerClient.publishMessage(netBrokerMessage, str);
            }
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        System.out.println(this.actorName + " sending stop messages");
        for (int i4 = 0; i4 != 150; i4++) {
            if (destinationType == NetAction.DestinationType.QUEUE) {
                brokerClient.enqueueMessage(netBrokerMessage2, str);
            } else {
                brokerClient.publishMessage(netBrokerMessage2, str);
            }
            if (destinationType == NetAction.DestinationType.TOPIC) {
                Sleep.time(50L);
            }
        }
        brokerClient.close();
        testResult.setMessages(i2);
        testResult.setStartTime(currentTimeMillis);
        testResult.setStopTime(currentTimeMillis2);
    }

    private byte[] getMessage(byte b, String str) {
        byte[] bArr = new byte[0];
        try {
            bArr = str.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
        }
        byte[] bArr2 = new byte[bArr.length + 1];
        bArr2[0] = b;
        System.arraycopy(bArr, 0, bArr2, 1, bArr.length);
        return bArr2;
    }

    private void performTest(DistTestParams distTestParams) {
        System.out.println(this.actorName + " starting new test: " + distTestParams.getTestName());
        try {
            TestResult testResult = new TestResult(TestResult.ActorType.Procucer, this.actorName, distTestParams.getTestName());
            BrokerClient brokerClient = new BrokerClient(distTestParams.getClientInfo().getAgentHost(), distTestParams.getClientInfo().getPort(), "ProducerActor", distTestParams.getEncoding());
            sendLoop(brokerClient, distTestParams.getMessageSize(), distTestParams.getNumberOfMessagesToSend(), distTestParams.getDestinationType(), distTestParams.getDestination(), testResult);
            brokerClient.close();
            this.brokerClient.enqueueMessage(new NetBrokerMessage(testResult.serialize()), TestManager.TEST_MANAGEMENT_RESULT);
            System.out.println(this.actorName + " ended test " + distTestParams.getTestName());
        } catch (Throwable th) {
            log.error("Test failed", th);
        }
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public boolean isAutoAck() {
        return false;
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public void onMessage(NetNotification netNotification) {
        try {
            this.brokerClient.acknowledge(netNotification);
        } catch (Throwable th) {
            log.error("Acknowledge failed", th);
        }
        System.out.println("DistProducerApp.onMessage()");
        performTest(DistTestParams.deserialize(netNotification.getMessage().getPayload()));
    }
}
