package pt.com.gcs.messaging;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.caudexorigo.Shutdown;
import org.caudexorigo.concurrent.CustomExecutors;
import org.caudexorigo.text.StringUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.Headers;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPublish;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.conf.GlobalConfig;
import pt.com.gcs.net.Peer;
import pt.com.gcs.net.codec.GcsDecoder;
import pt.com.gcs.net.codec.GcsEncoder;

/* loaded from: input_file:pt/com/gcs/messaging/Gcs.class */
public class Gcs {
    private static final String SERVICE_NAME = "SAPO GCS";
    public static final int RECOVER_INTERVAL = 50;
    public static final int RECONNECT_INTERVAL = 5000;
    private Set<Channel> agentsConnection = new HashSet();
    private ClientBootstrap connector;
    private static Logger log = LoggerFactory.getLogger(Gcs.class);
    private static final Gcs instance = new Gcs();
    private static final long EXPIRATION_TIME = GcsInfo.getMessageStorageTime();

    public static void ackMessage(String str, String str2) {
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("Can not acknowledge a message with a blank queue name");
        }
        if (StringUtils.isBlank(str2)) {
            throw new IllegalArgumentException("Can not acknowledge a message with a blank message-id");
        }
        instance.iackMessage(str, str2);
    }

    public static void addAsyncConsumer(String str, MessageListener messageListener) {
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("Can not make a subscription with a blank subscription name");
        }
        if (messageListener == null) {
            throw new IllegalArgumentException("Can not make a subscription with a null listener");
        }
        if (messageListener.getSourceDestinationType() == NetAction.DestinationType.TOPIC) {
            instance.iaddTopicConsumer(str, messageListener);
        } else if (messageListener.getSourceDestinationType() == NetAction.DestinationType.QUEUE) {
            instance.iaddQueueConsumer(str, messageListener);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void connect(SocketAddress socketAddress) {
        if (!GlobalConfig.contains((InetSocketAddress) socketAddress)) {
            log.info("Peer '{}' does not appear in the world map, it will be ignored.", socketAddress.toString());
            return;
        }
        String socketToAgentId = OutboundRemoteChannels.socketToAgentId(socketAddress);
        Channel channel = OutboundRemoteChannels.get(socketToAgentId);
        if (channel != null) {
            OutboundRemoteChannels.remove(socketToAgentId);
            channel.close().awaitUninterruptibly(5L, TimeUnit.SECONDS);
        }
        log.info("Connecting to '{}'.", socketAddress.toString());
        ChannelFuture connect = instance.connector.connect(socketAddress);
        connect.awaitUninterruptibly(5L, TimeUnit.SECONDS);
        if (connect.isSuccess()) {
            OutboundRemoteChannels.add(socketToAgentId, connect.getChannel());
            log.info("Connection established to '{}'.", socketAddress.toString());
            synchronized (instance.agentsConnection) {
                instance.agentsConnection.add(connect.getChannel());
            }
            return;
        }
        log.info("Connection fail to '{}'.", socketAddress.toString());
        if (!connect.isDone()) {
            connect.cancel();
            if (connect.getChannel().isConnected()) {
                log.warn("Connection to '{}' established after beeing canceled.", socketAddress.toString());
                connect.getChannel().close();
            }
        }
        GcsExecutor.schedule(new Connect(socketAddress), 5000L, TimeUnit.MILLISECONDS);
    }

    public static boolean enqueue(NetMessage netMessage, String str) {
        netMessage.getHeaders().put("TYPE", "COM_QUEUE");
        return instance.ienqueue(netMessage, str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void reloadWorldMap() {
        log.info("Reloading the world map");
        Set<Channel> managedConnectorSessions = getManagedConnectorSessions();
        ArrayList arrayList = new ArrayList(managedConnectorSessions.size());
        for (Channel channel : managedConnectorSessions) {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.getRemoteAddress();
            if (!GlobalConfig.contains(inetSocketAddress)) {
                log.info("Remove peer '{}'", inetSocketAddress.toString());
                arrayList.add(channel);
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Channel) it.next()).close();
        }
        ArrayList arrayList2 = new ArrayList(managedConnectorSessions.size());
        Iterator<Channel> it2 = managedConnectorSessions.iterator();
        while (it2.hasNext()) {
            arrayList2.add((InetSocketAddress) it2.next().getRemoteAddress());
        }
        for (Peer peer : GlobalConfig.getPeerList()) {
            InetSocketAddress inetSocketAddress2 = new InetSocketAddress(peer.getHost(), peer.getPort());
            if (!arrayList2.contains(inetSocketAddress2)) {
                connect(inetSocketAddress2);
            }
        }
    }

    public static Set<Channel> getManagedConnectorSessions() {
        LinkedHashSet linkedHashSet;
        synchronized (instance.agentsConnection) {
            linkedHashSet = new LinkedHashSet(instance.agentsConnection);
        }
        return linkedHashSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static List<Peer> getPeerList() {
        return GlobalConfig.getPeerList();
    }

    public static void destroy() {
        instance.idestroy();
    }

    public static void init() {
        instance.iinit();
    }

    public static void publish(NetPublish netPublish) {
        instance.ipublish(netPublish);
    }

    public static void removeAsyncConsumer(MessageListener messageListener) {
        if (messageListener == null) {
            log.warn("Can not remove null listener");
        } else if (messageListener.getSourceDestinationType() == NetAction.DestinationType.TOPIC) {
            TopicProcessorList.removeListener(messageListener);
        } else if (messageListener.getSourceDestinationType() == NetAction.DestinationType.QUEUE) {
            QueueProcessorList.removeListener(messageListener);
        }
    }

    private Gcs() {
    }

    private void connectToAllPeers() {
        log.info("connectToAllPeers()");
        List<Peer> peerList = GlobalConfig.getPeerList();
        log.info("Peers: {}", peerList.toString());
        for (Peer peer : peerList) {
            connect(new InetSocketAddress(peer.getHost(), peer.getPort()));
        }
    }

    private void iackMessage(String str, String str2) {
        if (!QueueProcessorList.hasQueue(str)) {
            log.warn(String.format("Trying to acknowledge a message whose queue doesn't exists. Queue: '%s', MsgId: '%s' ", str, str2));
            return;
        }
        QueueProcessor queueProcessor = QueueProcessorList.get(str);
        if (queueProcessor != null) {
            queueProcessor.ack(str2);
        }
    }

    private void iaddQueueConsumer(String str, MessageListener messageListener) {
        QueueProcessor queueProcessor;
        if (messageListener == null || (queueProcessor = QueueProcessorList.get(str)) == null) {
            return;
        }
        queueProcessor.add(messageListener);
    }

    private void iaddTopicConsumer(String str, MessageListener messageListener) {
        TopicProcessor topicProcessor;
        if (messageListener == null || (topicProcessor = TopicProcessorList.get(str)) == null) {
            return;
        }
        topicProcessor.add(messageListener, true);
    }

    private boolean ienqueue(NetMessage netMessage, String str) {
        QueueProcessor queueProcessor = QueueProcessorList.get(str);
        if (queueProcessor == null) {
            return false;
        }
        queueProcessor.getQueueStatistics().newQueueMessageReceived();
        queueProcessor.store(netMessage, GlobalConfig.preferLocalConsumers());
        return true;
    }

    private void iinit() {
        if (GlobalConfig.supportVirtualQueues()) {
            for (String str : VirtualQueueStorage.getVirtualQueueNames()) {
                log.debug("Add VirtualQueue '{}' from storage", str);
                iaddQueueConsumer(str, null);
            }
        } else {
            log.info("Virtual Queues not supported.");
        }
        for (String str2 : BDBEnviroment.getQueueNames()) {
            QueueProcessorList.get(str2);
        }
        log.info("{} starting.", SERVICE_NAME);
        try {
            startAcceptor(GcsInfo.getAgentPort());
            startConnector();
            GcsExecutor.scheduleWithFixedDelay(new QueueCounter(), 20L, 20L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new GlobalConfigMonitor(), 30L, 30L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new GlobalStatisticsPublisher(), 60L, 60L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new QueueLister(), 5L, 5L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new ExpiredMessagesDeleter(), 10L, 10L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new QueueWatchDog(), 2L, 2L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new PingPeers(), 5L, 5L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new StaleQueueCleaner(), GlobalConfig.getQueueMaxStaleAge(), GlobalConfig.getQueueMaxStaleAge(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            Shutdown.now(th);
        }
        connectToAllPeers();
        log.info("{} initialized.", SERVICE_NAME);
    }

    private void idestroy() {
        try {
            BDBEnviroment.sync();
        } catch (Throwable th) {
            log.error(th.getMessage(), th);
        }
    }

    private void ipublish(NetPublish netPublish) {
        TopicProcessorList.notify(netPublish, false);
    }

    private void startAcceptor(int i) throws IOException {
        ServerBootstrap serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(CustomExecutors.newCachedThreadPool("gcs-io-1"), CustomExecutors.newCachedThreadPool("gcs-worker-1")));
        serverBootstrap.setOption("child.tcpNoDelay", true);
        serverBootstrap.setOption("child.keepAlive", true);
        serverBootstrap.setOption("child.receiveBufferSize", 131072);
        serverBootstrap.setOption("child.sendBufferSize", 131072);
        serverBootstrap.setOption("reuseAddress", true);
        serverBootstrap.setOption("backlog", 1024);
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() { // from class: pt.com.gcs.messaging.Gcs.1
            @Override // org.jboss.netty.channel.ChannelPipelineFactory
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("broker-encoder", new GcsEncoder());
                pipeline.addLast("broker-decoder", new GcsDecoder());
                pipeline.addLast("broker-handler", new GcsAcceptorProtocolHandler());
                return pipeline;
            }
        });
        InetSocketAddress inetSocketAddress = new InetSocketAddress("0.0.0.0", i);
        serverBootstrap.bind(inetSocketAddress);
        log.info("SAPO-BROKER Listening on: '{}'.", inetSocketAddress.toString());
        log.info("{} listening on: '{}'.", SERVICE_NAME, inetSocketAddress.toString());
    }

    private void startConnector() {
        log.info("Starting Local Connector - step 0");
        ThreadPoolExecutor newCachedThreadPool = CustomExecutors.newCachedThreadPool("gcs-io-2");
        log.info("Starting Local Connector - step 1");
        ThreadPoolExecutor newCachedThreadPool2 = CustomExecutors.newCachedThreadPool("gcs-worker-2");
        log.info("Starting Local Connector - step 2");
        ClientBootstrap clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(newCachedThreadPool, newCachedThreadPool2));
        log.info("Starting Local Connector - step 3");
        ChannelPipelineFactory channelPipelineFactory = new ChannelPipelineFactory() { // from class: pt.com.gcs.messaging.Gcs.2
            @Override // org.jboss.netty.channel.ChannelPipelineFactory
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("broker-encoder", new GcsEncoder());
                pipeline.addLast("broker-decoder", new GcsDecoder());
                pipeline.addLast("broker-handler", new GcsRemoteProtocolHandler());
                return pipeline;
            }
        };
        log.info("Starting Local Connector - step 4");
        clientBootstrap.setPipelineFactory(channelPipelineFactory);
        log.info("Starting Local Connector - step 5");
        clientBootstrap.setOption("child.keepAlive", true);
        clientBootstrap.setOption("child.receiveBufferSize", 131072);
        clientBootstrap.setOption("child.sendBufferSize", 131072);
        clientBootstrap.setOption("connectTimeoutMillis", 5000);
        log.info("Starting Local Connector - step 6");
        this.connector = clientBootstrap;
    }

    public static synchronized void deleteQueue(String str, boolean z) {
        QueueProcessorList.remove(str, z);
    }

    public static synchronized void deleteQueue(String str) {
        QueueProcessorList.remove(str);
    }

    public static void remoteSessionClosed(Channel channel) {
        synchronized (instance.agentsConnection) {
            instance.agentsConnection.remove(channel);
        }
    }

    public static NetMessage buildNotification(NetPublish netPublish, String str) {
        String messageId = MessageId.getMessageId();
        if (StringUtils.isBlank(netPublish.getMessage().getMessageId())) {
            netPublish.getMessage().setMessageId(messageId);
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (netPublish.getMessage().getTimestamp() == -1) {
            netPublish.getMessage().setTimestamp(currentTimeMillis);
        }
        if (netPublish.getMessage().getExpiration() == -1) {
            String str2 = netPublish.getMessage().getHeaders().get(Headers.DEFERRED_DELIVERY);
            if (StringUtils.isBlank(str2)) {
                netPublish.getMessage().setExpiration(currentTimeMillis + EXPIRATION_TIME);
            } else {
                try {
                    netPublish.getMessage().setExpiration(Long.parseLong(str2) + EXPIRATION_TIME);
                } catch (NumberFormatException e) {
                    log.error(String.format("'EXPIRATION_TIME' is invalid '%s'", str2), (Throwable) e);
                    throw new RuntimeException(e);
                }
            }
        }
        NetNotification netNotification = new NetNotification(netPublish.getDestination(), netPublish.getDestinationType(), netPublish.getMessage(), str);
        NetAction netAction = new NetAction(NetAction.ActionType.NOTIFICATION);
        netAction.setNotificationMessage(netNotification);
        NetMessage netMessage = new NetMessage(netAction);
        if (netPublish.getMessage().getHeaders() != null) {
            netMessage.getHeaders().putAll(netPublish.getMessage().getHeaders());
        }
        return netMessage;
    }

    public static void broadcastMaxQueueSizeReached() {
        broadcastMaxSizeFault(String.format("The maximum number of queues (%s) has been reached.", Integer.valueOf(GcsInfo.getMaxQueues())));
    }

    public static void broadcastMaxDistinctSubscriptionsReached() {
        broadcastMaxSizeFault(String.format("The maximum number of distinct subscriptions (%s) has been reached.", Integer.valueOf(GcsInfo.getMaxDistinctSubscriptions())));
    }

    private static void broadcastMaxSizeFault(String str) {
        publish(new NetPublish(String.format("/system/faults/#%s#", GcsInfo.getAgentName()), NetAction.DestinationType.TOPIC, new NetBrokerMessage(String.format("<soap:Envelope xmlns:soap='http://www.w3.org/2003/05/soap-envelope' xmlns:wsa='http://www.w3.org/2005/08/addressing' xmlns:mq='http://services.sapo.pt/broker'><soap:Header><wsa:From><wsa:Address>%s</wsa:Address></wsa:From></soap:Header><soap:Body><soap:Fault><soap:Code><soap:Value>soap:Receiver</soap:Value></soap:Code><soap:Reason><soap:Text>%s</soap:Text></soap:Reason><soap:Detail>%s</soap:Detail></soap:Fault></soap:Body></soap:Envelope>", GcsInfo.getAgentName(), "Limit reached", str))));
    }
}
