/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.functests.simulation.helpers;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.caudexorigo.concurrent.CustomExecutors;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.nio.BrokerClient;
import pt.com.broker.client.nio.server.HostInfo;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetProtocolType;
import pt.com.gcs.messaging.GcsExecutor;

public class SyncConsumers {
    private static final Logger log = LoggerFactory.getLogger(SyncConsumers.class);
    private final int consumersCount;
    private final String queueName;
    private final long ackDelay;
    private final long pollInterval;
    private final HostInfo hostInfo;
    private final NetProtocolType protocolType;
    private List<ConsumerInfo> consumers;
    private ScheduledThreadPoolExecutor shed_exec_srv;
    private static List<SyncConsumers> activeConsumers = new ArrayList<SyncConsumers>();
    private final String appName;

    public SyncConsumers(int consumersCount, String queueName, long ackDelay, long pollInterval, HostInfo hostInfo, NetProtocolType protocolType, String appName) {
        this.consumersCount = consumersCount;
        this.queueName = queueName;
        this.ackDelay = ackDelay;
        this.pollInterval = pollInterval;
        this.hostInfo = hostInfo;
        this.protocolType = protocolType;
        this.appName = appName;
        this.shed_exec_srv = CustomExecutors.newScheduledThreadPool((int)consumersCount, (String)"SyncConsumers-Sched");
    }

    public void init() {
        this.consumers = new ArrayList<ConsumerInfo>(this.consumersCount);
        for (int i = 0; i != this.consumersCount; ++i) {
            ConsumerInfo ci = new ConsumerInfo();
            try {
                ci.consumerName = this.appName + i;
                ci.brokerClient = new BrokerClient(this.hostInfo.getHostname(), this.hostInfo.getPort(), this.protocolType);
                this.consumers.add(ci);
                continue;
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    public void start() {
        Iterator<ConsumerInfo> iterator = this.consumers.iterator();
        while (iterator.hasNext()) {
            ConsumerInfo ci;
            final ConsumerInfo consumerInfo = ci = iterator.next();
            this.shed_exec_srv.execute(new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        try {
                            while (true) {
                                NetNotification notification = consumerInfo.brokerClient.poll(SyncConsumers.this.queueName);
                                ++consumerInfo.messagesReceived;
                                Sleep.time((long)SyncConsumers.this.ackDelay);
                                consumerInfo.brokerClient.acknowledge(notification);
                                if (SyncConsumers.this.pollInterval == 0L) continue;
                                Sleep.time((long)SyncConsumers.this.pollInterval);
                            }
                        }
                        catch (Throwable e) {
                            log.error("Failed to poll", e);
                            continue;
                        }
                        break;
                    }
                }
            });
        }
        activeConsumers.add(this);
    }

    public void end() {
        for (ConsumerInfo ci : this.consumers) {
            try {
                ci.brokerClient.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        activeConsumers.remove(this);
    }

    static {
        long interval = 15L;
        Runnable command = new Runnable(){

            @Override
            public void run() {
                StringBuffer sb = new StringBuffer();
                for (SyncConsumers consumers : activeConsumers) {
                    sb.append("\nSync Consumers:");
                    sb.append(consumers.appName);
                    sb.append("\n");
                    for (ConsumerInfo ci : consumers.consumers) {
                        sb.append(String.format("Messages received in the last %s (s) by %s: %s\n", 15L, ci.consumerName, ci.messagesReceived));
                        ci.messagesReceived = 0L;
                    }
                }
                System.out.println(sb.toString());
            }
        };
        GcsExecutor.scheduleAtFixedRate((Runnable)command, (long)15L, (long)15L, (TimeUnit)TimeUnit.SECONDS);
    }

    private class ConsumerInfo {
        public BrokerClient brokerClient;
        public String consumerName;
        public volatile long messagesReceived = 0L;

        private ConsumerInfo() {
        }
    }
}

