package pt.com.broker.messaging;

import io.netty.channel.ChannelHandlerContext;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.caudexorigo.time.ISO8601;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.core.BrokerExecutor;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetSubscribe;
import pt.com.broker.types.NetUnsubscribe;
import pt.com.broker.types.channels.ListenerChannel;
import pt.com.broker.types.channels.ListenerChannelFactory;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.messaging.Gcs;
import pt.com.gcs.messaging.InternalPublisher;
import pt.com.gcs.messaging.QueueProcessor;
import pt.com.gcs.messaging.QueueProcessorList;
import pt.com.gcs.messaging.TopicProcessor;
import pt.com.gcs.messaging.TopicProcessorList;

/* loaded from: input_file:pt/com/broker/messaging/BrokerConsumer.class */
/**
 * Singleton used to attach and detach broker listeners for client channels.
 *
 * <p>On construction it also schedules two recurring tasks that publish, per agent, the number
 * of local topic subscriptions and local queue listeners as small {@code <mqinfo>} XML documents
 * to the {@code /system/stats/topics/#agent#} and {@code /system/stats/queues/#agent#}
 * destinations.
 */
public class BrokerConsumer {
    private static final Logger log = LoggerFactory.getLogger(BrokerConsumer.class);

    /** Initial delay and delay between runs, in seconds, of the statistics tasks. */
    private static final long STATS_INTERVAL_SECONDS = 120L;

    // Declared after the logger so the logger is initialised before the constructor runs.
    private static final BrokerConsumer instance = new BrokerConsumer();

    /**
     * Returns the single shared instance.
     *
     * @return the {@code BrokerConsumer} singleton
     */
    public static BrokerConsumer getInstance() {
        return instance;
    }

    /**
     * Schedules the topic and queue statistics publishers with {@link BrokerExecutor}, each with
     * an initial delay and a fixed delay of {@link #STATS_INTERVAL_SECONDS} seconds.
     */
    private BrokerConsumer() {
        Runnable topicStatsTask = new Runnable() {
            @Override
            public void run() {
                publishTopicStats();
            }
        };
        Runnable queueStatsTask = new Runnable() {
            @Override
            public void run() {
                publishQueueStats();
            }
        };
        BrokerExecutor.scheduleWithFixedDelay(topicStatsTask, STATS_INTERVAL_SECONDS, STATS_INTERVAL_SECONDS, TimeUnit.SECONDS);
        BrokerExecutor.scheduleWithFixedDelay(queueStatsTask, STATS_INTERVAL_SECONDS, STATS_INTERVAL_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Builds and publishes the topic subscription report: one {@code <item>} per topic processor
     * that has at least one listener of type {@code LOCAL}.
     *
     * <p>Every failure is logged and swallowed, presumably so that one failed run does not stop
     * later runs (depends on how {@code BrokerExecutor} treats task exceptions; confirm there).
     */
    private static void publishTopicStats() {
        try {
            StringBuilder report = openReport();
            for (TopicProcessor topicProcessor : TopicProcessorList.values()) {
                int localCount = countLocal(topicProcessor.listeners());
                if (localCount > 0) {
                    report.append(String.format("\n\t<item subject='topic://%s' predicate='subscriptions' value='%s' />", topicProcessor.getSubscriptionName(), localCount));
                }
            }
            report.append("\n</mqinfo>");
            InternalPublisher.send(String.format("/system/stats/topics/#%s#", GcsInfo.getAgentName()), report.toString());
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    /**
     * Builds and publishes the queue listener report: one {@code <item>} per queue processor
     * whose {@code localListeners()} collection is not empty.
     *
     * <p>Every failure is logged and swallowed, as in {@link #publishTopicStats()}.
     */
    private static void publishQueueStats() {
        try {
            StringBuilder report = openReport();
            for (QueueProcessor queueProcessor : QueueProcessorList.values()) {
                int localCount = queueProcessor.localListeners().size();
                if (localCount > 0) {
                    report.append(String.format("\n\t<item subject='queue://%s' predicate='subscriptions' value='%s' />", queueProcessor.getQueueName(), localCount));
                }
            }
            report.append("\n</mqinfo>");
            InternalPublisher.send(String.format("/system/stats/queues/#%s#", GcsInfo.getAgentName()), report.toString());
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    /**
     * Starts a report with the opening {@code <mqinfo>} tag, stamped with the current time and
     * the agent name.
     *
     * @return a builder holding the opening tag
     */
    private static StringBuilder openReport() {
        StringBuilder report = new StringBuilder();
        report.append(String.format("<mqinfo date='%s' agent-name='%s'>", ISO8601.format(new Date()), GcsInfo.getAgentName()));
        return report;
    }

    /**
     * Counts the listeners whose type is {@code MessageListener.Type.LOCAL}.
     *
     * @param collection the listeners to inspect
     * @return the number of local listeners in {@code collection}
     */
    private static int countLocal(Collection<MessageListener> collection) {
        int count = 0;
        for (MessageListener listener : collection) {
            if (listener.getType() == MessageListener.Type.LOCAL) {
                count++;
            }
        }
        return count;
    }

    /**
     * Re-asserts the current thread's interrupt status when the failure was an interruption, so
     * that wrapping it in a {@link RuntimeException} does not hide the interrupt from callers.
     *
     * @param th the failure being wrapped
     */
    private static void restoreInterruptIfNeeded(Throwable th) {
        if (th instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a {@code BrokerQueueListener} for the subscription's destination on the given
     * channel.
     *
     * @param netSubscribe the subscription request; its destination is used
     * @param channelHandlerContext the channel context the listener is bound to
     * @param z passed unchanged as the third argument of the {@code BrokerQueueListener}
     *     constructor; its meaning is not visible in this file
     * @throws RuntimeException wrapping any failure raised while registering the listener
     */
    public void listen(NetSubscribe netSubscribe, ChannelHandlerContext channelHandlerContext, boolean z) {
        try {
            Gcs.addAsyncConsumer(netSubscribe.getDestination(), new BrokerQueueListener(ListenerChannelFactory.getListenerChannel(channelHandlerContext), netSubscribe.getDestination(), z));
        } catch (Throwable th) {
            // Handle interruption the same way subscribe() does.
            restoreInterruptIfNeeded(th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Registers a {@code BrokerTopicListener} for the subscription's destination on the given
     * channel.
     *
     * @param netSubscribe the subscription request; its destination is used
     * @param channelHandlerContext the channel context the listener is bound to
     * @return {@code true} when the listener was registered; failures are reported by exception
     * @throws RuntimeException wrapping any failure raised while registering the listener
     */
    public synchronized boolean subscribe(NetSubscribe netSubscribe, ChannelHandlerContext channelHandlerContext) {
        try {
            Gcs.addAsyncConsumer(netSubscribe.getDestination(), new BrokerTopicListener(ListenerChannelFactory.getListenerChannel(channelHandlerContext), netSubscribe.getDestination()));
            return true;
        } catch (Throwable th) {
            restoreInterruptIfNeeded(th);
            throw new RuntimeException(th);
        }
    }

    /**
     * Removes the listener bound to the given channel for the request's destination.
     *
     * <p>{@code TOPIC} destinations are removed through {@code TopicProcessorList};
     * {@code QUEUE} and {@code VIRTUAL_QUEUE} destinations through {@code QueueProcessorList}.
     * Any other destination type is ignored. A fresh listener object is built as the removal
     * key, so removal presumably relies on the listeners' equality semantics (confirm in the
     * listener classes).
     *
     * @param netUnsubscribe the unsubscribe request; its destination and destination type are used
     * @param channelHandlerContext the channel context whose listener should be removed
     */
    public synchronized void unsubscribe(NetUnsubscribe netUnsubscribe, ChannelHandlerContext channelHandlerContext) {
        String destination = netUnsubscribe.getDestination();
        NetAction.DestinationType destinationType = netUnsubscribe.getDestinationType();
        ListenerChannel listenerChannel = ListenerChannelFactory.getListenerChannel(channelHandlerContext);
        if (destinationType == NetAction.DestinationType.TOPIC) {
            TopicProcessorList.removeListener(new BrokerTopicListener(listenerChannel, destination));
        } else if (destinationType == NetAction.DestinationType.QUEUE || destinationType == NetAction.DestinationType.VIRTUAL_QUEUE) {
            QueueProcessorList.removeListener(new BrokerQueueListener(listenerChannel, destination, true));
        }
    }
}
