/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.performance.distributed;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.performance.distributed.DistTestCliArgs;
import pt.com.broker.performance.distributed.DistTestParams;
import pt.com.broker.performance.distributed.TestManager;
import pt.com.broker.performance.distributed.TestResult;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

public class DistConsumerApp
implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(DistConsumerApp.class);
    private String host;
    private int port;
    private String dname;
    private String actorName;
    private BrokerClient brokerClient;

    public DistConsumerApp(String host, int port, String actorName) throws Throwable {
        this.host = host;
        this.port = port;
        this.actorName = actorName;
        this.dname = TestManager.TEST_MANAGEMENT_ACTION + actorName;
        this.brokerClient = new BrokerClient(host, port);
        NetSubscribe subscribe = new NetSubscribe(this.dname, NetAction.DestinationType.QUEUE);
        this.brokerClient.addAsyncConsumer(subscribe, (BrokerListener)this);
        System.out.println(String.format("Consumer '%s' running...", actorName));
    }

    public static void main(String[] args) throws Throwable {
        DistTestCliArgs cargs = (DistTestCliArgs)CliFactory.parseArguments(DistTestCliArgs.class, (String[])args);
        DistConsumerApp consumer = new DistConsumerApp(cargs.getHost(), cargs.getPort(), cargs.getActorName());
        while (true) {
            Sleep.time((long)5000L);
        }
    }

    private void performTest(final DistTestParams testParams) {
        try {
            System.out.println("########## Consumer using: " + testParams.getEncoding());
            BrokerClient bk = new BrokerClient(testParams.getClientInfo().getAgentHost(), testParams.getClientInfo().getPort(), "ConsumerActor", testParams.getEncoding());
            final NetAction.DestinationType destType = testParams.getDestinationType();
            NetSubscribe subscribe = new NetSubscribe(testParams.getDestination(), destType);
            if (testParams.isNoAckConsumer()) {
                subscribe.addHeader("ACK_REQUIRED", "false");
            }
            final AtomicInteger counter = new AtomicInteger(0);
            final AtomicLong startTime = new AtomicLong(0L);
            final AtomicLong stopTime = new AtomicLong(0L);
            System.out.println(this.actorName + " starting new test: " + testParams.getTestName());
            if (!testParams.isSyncConsumer()) {
                final CountDownLatch countDown = new CountDownLatch(1);
                bk.addAsyncConsumer(subscribe, new BrokerListener(){

                    public void onMessage(NetNotification notification) {
                        startTime.compareAndSet(0L, System.currentTimeMillis());
                        if (notification.getMessage().getPayload()[0] == 1) {
                            counter.incrementAndGet();
                        } else {
                            stopTime.compareAndSet(0L, System.currentTimeMillis());
                            countDown.countDown();
                        }
                    }

                    public boolean isAutoAck() {
                        if (testParams.isNoAckConsumer()) {
                            return false;
                        }
                        return destType != NetAction.DestinationType.TOPIC;
                    }
                });
                countDown.await();
                bk.unsubscribe(subscribe.getDestinationType(), subscribe.getDestination());
            } else {
                boolean stop = false;
                System.out.println(this.actorName + " Sync consumer");
                startTime.set(System.currentTimeMillis());
                do {
                    NetNotification notification;
                    if ((notification = bk.poll(testParams.getDestination())).getMessage().getPayload()[0] == 1) {
                        counter.incrementAndGet();
                    } else {
                        stopTime.compareAndSet(0L, System.currentTimeMillis());
                        stop = true;
                    }
                    bk.acknowledge(notification);
                } while (!stop);
            }
            Sleep.time((long)500L);
            bk.close();
            TestResult testResult = new TestResult(TestResult.ActorType.Consumer, this.actorName, testParams.getTestName(), counter.get(), startTime.get(), stopTime.get());
            byte[] data = testResult.serialize();
            NetBrokerMessage netBrokerMessage = new NetBrokerMessage(data);
            String destination = TestManager.TEST_MANAGEMENT_RESULT;
            System.out.println(this.actorName + " Sending results");
            this.brokerClient.enqueueMessage(netBrokerMessage, destination);
            System.out.println(this.actorName + " Test ended with sucess");
        }
        catch (Throwable t) {
            log.error("Test failed", t);
        }
    }

    public boolean isAutoAck() {
        return false;
    }

    public void onMessage(NetNotification notification) {
        byte[] testParams = notification.getMessage().getPayload();
        DistTestParams distTestParams = DistTestParams.deserialize(testParams);
        try {
            this.brokerClient.acknowledge(notification);
        }
        catch (Throwable t) {
            log.error("Failed to ack test message", t);
        }
        this.performTest(distTestParams);
    }
}

