package pt.com.broker.performance.distributed;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

/* loaded from: input_file:pt/com/broker/performance/distributed/DistConsumerApp.class */
/**
 * Consumer-side actor of the distributed performance test.
 *
 * <p>On construction the actor subscribes to its own management queue
 * ({@code TestManager.TEST_MANAGEMENT_ACTION + actorName}). Every notification received on
 * that queue is deserialized into a {@link DistTestParams}, acknowledged, and executed as one
 * test run by {@code performTest}. A run counts the messages whose first payload byte is
 * {@code 1}; the first message with any other leading byte ends the run. The outcome is
 * published as a serialized {@link TestResult} to {@code TestManager.TEST_MANAGEMENT_RESULT}.
 *
 * <p>A test run executes on the thread that delivers the management notification and blocks
 * it until the run finishes.
 */
public class DistConsumerApp implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(DistConsumerApp.class);

    /** Value of the first payload byte that marks a message to be counted. */
    private static final byte COUNTED_MESSAGE_MARKER = 1;

    private final String host;
    private final int port;
    /** Name of the management queue this actor listens on. */
    private final String dname;
    private final String actorName;
    /** Long-lived management connection: receives test requests and publishes results. */
    private final BrokerClient brokerClient;

    /**
     * Connects to the broker and registers this instance as asynchronous consumer of the
     * actor's management queue.
     *
     * @param host      broker agent host used for the management connection
     * @param port      broker agent port used for the management connection
     * @param actorName name of this actor; used to derive the management queue name and
     *                  reported in test results
     * @throws Throwable if the broker client cannot be created or the subscription fails
     */
    public DistConsumerApp(String host, int port, String actorName) throws Throwable {
        this.host = host;
        this.port = port;
        this.actorName = actorName;
        this.dname = TestManager.TEST_MANAGEMENT_ACTION + actorName;
        this.brokerClient = new BrokerClient(host, port);
        this.brokerClient.addAsyncConsumer(new NetSubscribe(this.dname, NetAction.DestinationType.QUEUE), this);
        System.out.println(String.format("Consumer '%s' running...", actorName));
    }

    /**
     * Command-line entry point: parses the arguments, starts the consumer and then keeps the
     * main thread alive indefinitely while notifications are handled by the listener.
     *
     * @param strArr command-line arguments, parsed into {@link DistTestCliArgs}
     * @throws Throwable if argument parsing or consumer start-up fails
     */
    public static void main(String[] strArr) throws Throwable {
        DistTestCliArgs cliArgs = (DistTestCliArgs) CliFactory.parseArguments(DistTestCliArgs.class, strArr);
        new DistConsumerApp(cliArgs.getHost(), cliArgs.getPort(), cliArgs.getActorName());
        while (true) {
            Sleep.time(5000L);
        }
    }

    /**
     * Runs one test and publishes its result on the management connection.
     *
     * <p>Failures are logged and never propagated to the caller; in that case no result is
     * published.
     *
     * @param distTestParams description of the test to run
     */
    private void performTest(final DistTestParams distTestParams) {
        try {
            TestResult result = runTest(distTestParams);
            NetBrokerMessage resultMessage = new NetBrokerMessage(result.serialize());
            String resultDestination = TestManager.TEST_MANAGEMENT_RESULT;
            System.out.println(this.actorName + " Sending results");
            this.brokerClient.enqueueMessage(resultMessage, resultDestination);
            System.out.println(this.actorName + " Test ended with sucess");
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            log.error("Test failed", th);
        }
    }

    /**
     * Opens a dedicated broker connection for the test, consumes messages until the first
     * message whose leading payload byte is not {@code 1}, and closes the connection again.
     *
     * <p>The dedicated connection is closed on the failure path as well, so an aborted test
     * does not leave it open.
     *
     * @param params description of the test to run
     * @return the counted messages together with the recorded start and stop timestamps
     * @throws Throwable if connecting, consuming or closing fails
     */
    private TestResult runTest(final DistTestParams params) throws Throwable {
        System.out.println("########## Consumer using: " + params.getEncoding());
        // Deliberately not named "brokerClient": that is the management connection field.
        final BrokerClient testClient = new BrokerClient(
                params.getClientInfo().getAgentHost(),
                params.getClientInfo().getPort(),
                "ConsumerActor",
                params.getEncoding());
        final AtomicInteger received = new AtomicInteger(0);
        final AtomicLong startMillis = new AtomicLong(0L);
        final AtomicLong stopMillis = new AtomicLong(0L);
        try {
            final NetAction.DestinationType destinationType = params.getDestinationType();
            NetSubscribe subscription = new NetSubscribe(params.getDestination(), destinationType);
            if (params.isNoAckConsumer()) {
                subscription.addHeader("ACK_REQUIRED", "false");
            }
            System.out.println(this.actorName + " starting new test: " + params.getTestName());
            if (params.isSyncConsumer()) {
                consumeSync(testClient, params, received, startMillis, stopMillis);
            } else {
                consumeAsync(testClient, subscription, params, destinationType, received, startMillis, stopMillis);
            }
            // NOTE(review): purpose of this pause is not visible here; presumably it lets
            // in-flight traffic settle before the connection is closed - confirm.
            Sleep.time(500L);
        } catch (Throwable failure) {
            closeQuietly(testClient);
            throw failure;
        }
        testClient.close();
        return new TestResult(TestResult.ActorType.Consumer, this.actorName, params.getTestName(),
                received.get(), startMillis.get(), stopMillis.get());
    }

    /**
     * Polls the destination in a loop, acknowledging every message, until a message whose
     * first payload byte is not {@code 1} arrives. The start time is taken before the first
     * poll.
     */
    private void consumeSync(BrokerClient testClient, DistTestParams params, AtomicInteger received,
            AtomicLong startMillis, AtomicLong stopMillis) throws Throwable {
        System.out.println(this.actorName + " Sync consumer");
        startMillis.set(System.currentTimeMillis());
        boolean stopSeen = false;
        do {
            NetNotification notification = testClient.poll(params.getDestination());
            // NOTE(review): an empty payload would throw here - confirm producers always
            // send at least one byte.
            if (notification.getMessage().getPayload()[0] == COUNTED_MESSAGE_MARKER) {
                received.incrementAndGet();
            } else {
                stopMillis.compareAndSet(0L, System.currentTimeMillis());
                stopSeen = true;
            }
            testClient.acknowledge(notification);
        } while (!stopSeen);
    }

    /**
     * Subscribes an asynchronous listener, blocks until it has seen a message whose first
     * payload byte is not {@code 1}, then unsubscribes. The start time is taken when the
     * first message is delivered.
     */
    private void consumeAsync(BrokerClient testClient, NetSubscribe subscription, final DistTestParams params,
            final NetAction.DestinationType destinationType, final AtomicInteger received,
            final AtomicLong startMillis, final AtomicLong stopMillis) throws Throwable {
        final CountDownLatch stopSeen = new CountDownLatch(1);
        testClient.addAsyncConsumer(subscription, new BrokerListener() {
            public void onMessage(NetNotification netNotification) {
                // Only the first delivery wins the compare-and-set, fixing the start time.
                startMillis.compareAndSet(0L, System.currentTimeMillis());
                // NOTE(review): an empty payload would throw here and the latch would never
                // be released - confirm producers always send at least one byte.
                if (netNotification.getMessage().getPayload()[0] == COUNTED_MESSAGE_MARKER) {
                    received.incrementAndGet();
                } else {
                    stopMillis.compareAndSet(0L, System.currentTimeMillis());
                    stopSeen.countDown();
                }
            }

            public boolean isAutoAck() {
                // Auto-ack only for acknowledged, non-topic subscriptions.
                return !params.isNoAckConsumer() && destinationType != NetAction.DestinationType.TOPIC;
            }
        });
        stopSeen.await();
        testClient.unsubscribe(subscription.getDestinationType(), subscription.getDestination());
    }

    /**
     * Closes a test connection after a failed run. A failure to close is only logged so that
     * it cannot replace the exception that aborted the test.
     */
    private static void closeQuietly(BrokerClient client) {
        try {
            client.close();
        } catch (Throwable closeFailure) {
            log.warn("Failed to close test client after a failed test", closeFailure);
        }
    }

    /**
     * Management notifications are acknowledged explicitly in {@link #onMessage}.
     *
     * @return always {@code false}
     */
    public boolean isAutoAck() {
        return false;
    }

    /**
     * Handles a test request from the management queue: deserializes the parameters,
     * acknowledges the notification (an acknowledge failure is logged, not propagated) and
     * runs the test on the calling thread.
     *
     * @param netNotification notification whose payload is a serialized {@link DistTestParams}
     */
    public void onMessage(NetNotification netNotification) {
        DistTestParams testParams = DistTestParams.deserialize(netNotification.getMessage().getPayload());
        try {
            this.brokerClient.acknowledge(netNotification);
        } catch (Throwable th) {
            log.error("Failed to ack test message", th);
        }
        performTest(testParams);
    }
}
