/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.functests.simulation.helpers;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.caudexorigo.concurrent.CustomExecutors;
import org.caudexorigo.concurrent.Sleep;
import org.caudexorigo.text.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.nio.BrokerClient;
import pt.com.broker.client.nio.server.HostInfo;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetProtocolType;
import pt.com.gcs.messaging.GcsExecutor;

public class Producers {
    private static final Logger log = LoggerFactory.getLogger(Producers.class);
    private final int producersCount;
    private final String destinatination;
    private final NetAction.DestinationType destinationType;
    private final int messagesPerSecond;
    private final HostInfo hostInfo;
    private final NetProtocolType protocolType;
    private final String appName;
    private int messageSize = 512;
    private long messageExpiration = 0L;
    private List<ProducerInfo> producers;
    private ScheduledThreadPoolExecutor shed_exec_srv;
    private NetBrokerMessage brokerMessage;
    volatile boolean stop = false;
    private final int messageBurst;
    private static List<Producers> activeProducers = new ArrayList<Producers>();

    public Producers(int producersCount, String destinatination, NetAction.DestinationType destinationType, int messagesPerSecond, int messageBurst, HostInfo hostInfo, NetProtocolType protocolType, String appName) {
        this.producersCount = producersCount;
        this.destinatination = destinatination;
        this.destinationType = destinationType;
        this.messagesPerSecond = messagesPerSecond;
        this.messageBurst = messageBurst;
        this.hostInfo = hostInfo;
        this.protocolType = protocolType;
        this.appName = appName;
        if (messagesPerSecond >= 1000) {
            throw new IllegalArgumentException("messagesPerSecond >= 1000");
        }
        if (messagesPerSecond != 0 && messageBurst != 0) {
            throw new IllegalArgumentException("(messagesPerSecond != 0) && (messageBurst != 0)");
        }
        this.shed_exec_srv = CustomExecutors.newScheduledThreadPool((int)producersCount, (String)"Producers-Sched");
    }

    public void init() {
        this.producers = new ArrayList<ProducerInfo>(this.producersCount);
        for (int i = 0; i != this.producersCount; ++i) {
            ProducerInfo pi = new ProducerInfo();
            try {
                pi.brokerClient = new BrokerClient(this.hostInfo.getHostname(), this.hostInfo.getPort(), this.protocolType);
                pi.producerName = this.appName + i;
                this.producers.add(pi);
                continue;
            }
            catch (Throwable e) {
                e.printStackTrace();
            }
        }
    }

    public void start() {
        String randomStr = RandomStringUtils.random((int)this.messageSize);
        this.brokerMessage = new NetBrokerMessage(randomStr);
        if (this.messageExpiration != 0L) {
            this.brokerMessage.setExpiration(System.currentTimeMillis() + this.messageExpiration);
        }
        Iterator<ProducerInfo> iterator = this.producers.iterator();
        while (iterator.hasNext()) {
            ProducerInfo pInfo;
            final ProducerInfo producerInfo = pInfo = iterator.next();
            Runnable nMessagesPerSecond = new Runnable(){

                @Override
                public void run() {
                    while (true) {
                        try {
                            while (true) {
                                producerInfo.brokerClient.publish(Producers.this.brokerMessage, Producers.this.destinatination, Producers.this.destinationType);
                                ++producerInfo.messagesSend;
                                if (Producers.this.stop) {
                                    return;
                                }
                                Sleep.time((long)(1000 / Producers.this.messagesPerSecond));
                            }
                        }
                        catch (Throwable e) {
                            log.error("Failed to publish message", e);
                            continue;
                        }
                        break;
                    }
                }
            };
            Runnable messageBurstRunnable = new Runnable(){

                @Override
                public void run() {
                    int count = Producers.this.messageBurst;
                    while (count-- != 0) {
                        try {
                            producerInfo.brokerClient.publish(Producers.this.brokerMessage, Producers.this.destinatination, Producers.this.destinationType);
                            ++producerInfo.messagesSend;
                            if (!Producers.this.stop) continue;
                            return;
                        }
                        catch (Throwable e) {
                            log.error("Failed to publish message", e);
                        }
                    }
                }
            };
            if (this.messageBurst == 0) {
                this.shed_exec_srv.execute(nMessagesPerSecond);
                continue;
            }
            this.shed_exec_srv.scheduleAtFixedRate(messageBurstRunnable, 5L, 5L, TimeUnit.SECONDS);
        }
        activeProducers.add(this);
    }

    public void end() {
        this.stop = true;
        this.shed_exec_srv.shutdownNow();
        for (ProducerInfo pi : this.producers) {
            try {
                pi.brokerClient.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
        activeProducers.remove(this);
    }

    public void setMessageSize(int messageSize) {
        this.messageSize = messageSize;
    }

    public int getMessageSize() {
        return this.messageSize;
    }

    public void setMessageExpiration(long messageExpiration) {
        this.messageExpiration = messageExpiration;
    }

    public long getMessageExpiration() {
        return this.messageExpiration;
    }

    static {
        long interval = 15L;
        Runnable command = new Runnable(){

            @Override
            public void run() {
                StringBuffer sb = new StringBuffer();
                for (Producers producers : activeProducers) {
                    sb.append("\nProducers:");
                    sb.append(producers.appName);
                    sb.append("\n");
                    for (ProducerInfo pi : producers.producers) {
                        sb.append(String.format("Messages send in the last %s (s) by %s: %s\n", 15L, pi.producerName, pi.messagesSend));
                        pi.messagesSend = 0L;
                    }
                }
                System.out.println(sb.toString());
            }
        };
        GcsExecutor.scheduleAtFixedRate((Runnable)command, (long)15L, (long)15L, (TimeUnit)TimeUnit.SECONDS);
    }

    private class ProducerInfo {
        public BrokerClient brokerClient;
        public String producerName;
        public volatile long messagesSend = 0L;

        private ProducerInfo() {
        }
    }
}

