package pt.com.broker.performance.distributed;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.caudexorigo.cli.CliFactory;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetSubscribe;

/* loaded from: input_file:pt/com/broker/performance/distributed/LocalManager.class */
public class LocalManager implements BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(LocalManager.class);
    private ExecutorService executer;
    private BrokerClient brokerClient;
    private String hostname;
    private int port;
    private String machineName;

    public static void main(String[] strArr) throws Throwable {
        DistTestCliArgs distTestCliArgs = (DistTestCliArgs) CliFactory.parseArguments(DistTestCliArgs.class, strArr);
        LocalManager localManager = new LocalManager();
        localManager.machineName = distTestCliArgs.getMachineName();
        localManager.hostname = distTestCliArgs.getHost();
        localManager.port = distTestCliArgs.getPort();
        localManager.brokerClient = new BrokerClient(localManager.hostname, localManager.port);
        if (distTestCliArgs.warmup()) {
            localManager.warmUp();
        }
        localManager.brokerClient.addAsyncConsumer(new NetSubscribe(String.format("%s%s", TestManager.TEST_MANAGEMENT_LOCAL_MANAGERS, localManager.machineName), NetAction.DestinationType.QUEUE), localManager);
        while (true) {
            Sleep.time(5000L);
        }
    }

    private void warmUp() {
        System.out.println("Warming up!");
        System.out.println(" - Publishing messages...");
        String str = "/topic/warmup/" + RandomStringUtils.randomAlphanumeric(10);
        NetBrokerMessage netBrokerMessage = new NetBrokerMessage(RandomStringUtils.random(8192));
        for (int i = 0; i != 20000; i++) {
            this.brokerClient.publishMessage(netBrokerMessage, str);
        }
        System.out.println(" - Enqueueing and consuming messages...");
        String str2 = "/queue/warmup/" + RandomStringUtils.randomAlphanumeric(10);
        NetSubscribe netSubscribe = new NetSubscribe(str2, NetAction.DestinationType.QUEUE);
        final AtomicLong atomicLong = new AtomicLong(20000L);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            this.brokerClient.addAsyncConsumer(netSubscribe, new BrokerListener() { // from class: pt.com.broker.performance.distributed.LocalManager.1
                @Override // pt.com.broker.client.messaging.BrokerListener
                public void onMessage(NetNotification netNotification) {
                    if (atomicLong.decrementAndGet() == 0) {
                        countDownLatch.countDown();
                    }
                }

                @Override // pt.com.broker.client.messaging.BrokerListener
                public boolean isAutoAck() {
                    return true;
                }
            });
        } catch (Throwable th) {
            countDownLatch.countDown();
        }
        for (int i2 = 0; i2 != 20000; i2++) {
            this.brokerClient.enqueueMessage(netBrokerMessage, str2);
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
        }
        System.out.println("Warming up ended");
    }

    private void startConsumer(final String str) {
        this.executer.submit(new Runnable() { // from class: pt.com.broker.performance.distributed.LocalManager.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    System.out.println("Consumer started: " + str);
                    new DistConsumerApp(LocalManager.this.hostname, LocalManager.this.port, str);
                } catch (Throwable th) {
                    LocalManager.log.error("Consumer initialization failed", th);
                }
            }
        });
    }

    private void startProducer(final String str) {
        this.executer.submit(new Runnable() { // from class: pt.com.broker.performance.distributed.LocalManager.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    System.out.println("Producer started: " + str);
                    new DistProducerApp(LocalManager.this.hostname, LocalManager.this.port, str);
                } catch (Throwable th) {
                    LocalManager.log.error("Consumer initialization failed", th);
                }
            }
        });
    }

    private void startTest(MachineConfiguration machineConfiguration) {
        System.out.println(String.format("Starting test, using %s consumers and %s producers.", Integer.valueOf(machineConfiguration.getConsumers().size()), Integer.valueOf(machineConfiguration.getProducers().size())));
        int i = 0;
        if (machineConfiguration.getConsumers() != null) {
            i = 0 + machineConfiguration.getConsumers().size();
        }
        if (machineConfiguration.getProducers() != null) {
            i += machineConfiguration.getProducers().size();
        }
        this.executer = Executors.newFixedThreadPool(i);
        Iterator<String> it = machineConfiguration.getConsumers().iterator();
        while (it.hasNext()) {
            startConsumer(it.next());
        }
        Iterator<String> it2 = machineConfiguration.getProducers().iterator();
        while (it2.hasNext()) {
            startProducer(it2.next());
        }
    }

    private void stopTest(MachineConfiguration machineConfiguration) {
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public boolean isAutoAck() {
        return true;
    }

    @Override // pt.com.broker.client.messaging.BrokerListener
    public void onMessage(NetNotification netNotification) {
        try {
            MachineConfiguration deserialize = MachineConfiguration.deserialize(netNotification.getMessage().getPayload());
            if (deserialize.isStop()) {
                stopTest(deserialize);
            } else {
                startTest(deserialize);
            }
        } catch (Throwable th) {
            log.error("Failed to process received message", th);
        }
    }
}
