/*
 * Decompiled with CFR 0.152.
 */
package pt.com.gcs.messaging;

import com.google.common.base.Optional;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.Shutdown;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPublish;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.conf.GlobalConfig;
import pt.com.gcs.messaging.BDBEnviroment;
import pt.com.gcs.messaging.Connect;
import pt.com.gcs.messaging.ExpiredMessagesDeleter;
import pt.com.gcs.messaging.GcsExecutor;
import pt.com.gcs.messaging.GcsRemoteProtocolHandler;
import pt.com.gcs.messaging.GlobalConfigMonitor;
import pt.com.gcs.messaging.MessageId;
import pt.com.gcs.messaging.OutboundRemoteChannels;
import pt.com.gcs.messaging.PingPeers;
import pt.com.gcs.messaging.QueueCounter;
import pt.com.gcs.messaging.QueueLister;
import pt.com.gcs.messaging.QueueProcessor;
import pt.com.gcs.messaging.QueueProcessorList;
import pt.com.gcs.messaging.QueueWatchDog;
import pt.com.gcs.messaging.StaleQueueCleaner;
import pt.com.gcs.messaging.TopicProcessor;
import pt.com.gcs.messaging.TopicProcessorList;
import pt.com.gcs.messaging.VirtualQueueStorage;
import pt.com.gcs.messaging.statistics.KpiQueueConsumerCounter;
import pt.com.gcs.messaging.statistics.KpiQueuesSize;
import pt.com.gcs.messaging.statistics.KpiTopicConsumerCounter;
import pt.com.gcs.messaging.statistics.StatisticsCollector;
import pt.com.gcs.net.Peer;
import pt.com.gcs.net.codec.GcsCodec;
import pt.com.gcs.net.codec.GcsHandler;
import pt.com.gcs.net.ssl.SslContextFactory;

public class Gcs {
    private static Logger log = LoggerFactory.getLogger(Gcs.class);
    private static final String SERVICE_NAME = "SAPO GCS";
    private static final Gcs instance = new Gcs();
    public static final int RECOVER_INTERVAL = 50;
    public static final int RECONNECT_INTERVAL = 5000;
    private Set<Channel> agentsConnection = new HashSet<Channel>();
    private Bootstrap connector;
    private static final long EXPIRATION_TIME = GcsInfo.getMessageStorageTime();

    public static void ackMessage(String queueName, String msgId) {
        if (StringUtils.isBlank((CharSequence)queueName)) {
            throw new IllegalArgumentException("Can not acknowledge a message with a blank queue name");
        }
        if (StringUtils.isBlank((CharSequence)msgId)) {
            throw new IllegalArgumentException("Can not acknowledge a message with a blank message-id");
        }
        instance.iackMessage(queueName, msgId);
    }

    public static void addAsyncConsumer(String subscriptionKey, MessageListener listener) {
        if (StringUtils.isBlank((CharSequence)subscriptionKey)) {
            throw new IllegalArgumentException("Can not make a subscription with a blank subscription name");
        }
        if (listener == null) {
            throw new IllegalArgumentException("Can not make a subscription with a null listener");
        }
        if (listener.getSourceDestinationType() == NetAction.DestinationType.TOPIC) {
            instance.iaddTopicConsumer(subscriptionKey, listener);
        } else if (listener.getSourceDestinationType() == NetAction.DestinationType.QUEUE) {
            instance.iaddQueueConsumer(subscriptionKey, listener);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected static void connect(SocketAddress address) {
        if (GlobalConfig.contains((InetSocketAddress)address)) {
            String remoteAgentId = OutboundRemoteChannels.socketToAgentId(address);
            Channel channel = OutboundRemoteChannels.get(remoteAgentId);
            if (channel != null) {
                OutboundRemoteChannels.remove(remoteAgentId);
                ChannelFuture channelFuture = channel.close();
                channelFuture.awaitUninterruptibly(5L, TimeUnit.SECONDS);
            }
            log.info("Connecting to '{}'.", (Object)address.toString());
            ChannelFuture cf = Gcs.instance.connector.connect(address);
            cf.awaitUninterruptibly(5L, TimeUnit.SECONDS);
            boolean sucess = cf.isSuccess();
            if (!sucess) {
                log.info("Connection fail to '{}'.", (Object)address.toString());
                if (!cf.isDone()) {
                    cf.cancel(true);
                    if (cf.channel().isActive()) {
                        log.warn("Connection to '{}' established after beeing canceled.", (Object)address.toString());
                        cf.channel().close();
                    }
                }
                GcsExecutor.schedule(new Connect(address), 5000L, TimeUnit.MILLISECONDS);
            } else {
                OutboundRemoteChannels.add(remoteAgentId, cf.channel());
                log.info("Connection established to '{}'.", (Object)address.toString());
                Set<Channel> set = Gcs.instance.agentsConnection;
                synchronized (set) {
                    Gcs.instance.agentsConnection.add(cf.channel());
                }
            }
        } else {
            log.info("Peer '{}' does not appear in the world map, it will be ignored.", (Object)address.toString());
        }
    }

    public static boolean enqueue(NetMessage nmsg, String queueName) {
        nmsg.getHeaders().put("TYPE", "COM_QUEUE");
        return instance.ienqueue(nmsg, queueName);
    }

    protected static void reloadWorldMap() {
        log.info("Reloading the world map");
        Set<Channel> connectedSessions = Gcs.getManagedConnectorSessions();
        ArrayList<Channel> sessionsToClose = new ArrayList<Channel>(connectedSessions.size());
        for (Channel channel : connectedSessions) {
            InetSocketAddress inet = (InetSocketAddress)channel.remoteAddress();
            if (GlobalConfig.contains(inet)) continue;
            log.info("Remove peer '{}'", (Object)inet.toString());
            sessionsToClose.add(channel);
        }
        for (Channel channel : sessionsToClose) {
            channel.close();
        }
        ArrayList<InetSocketAddress> remoteSessions = new ArrayList<InetSocketAddress>(connectedSessions.size());
        for (Channel channel : connectedSessions) {
            remoteSessions.add((InetSocketAddress)channel.remoteAddress());
        }
        List<Peer> list = GlobalConfig.getPeerList();
        for (Peer peer : list) {
            InetSocketAddress addr = new InetSocketAddress(peer.getHost(), peer.getPort());
            if (remoteSessions.contains(addr)) continue;
            Gcs.connect(addr);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Set<Channel> getManagedConnectorSessions() {
        LinkedHashSet<Channel> connections = null;
        Set<Channel> set = Gcs.instance.agentsConnection;
        synchronized (set) {
            connections = new LinkedHashSet<Channel>(Gcs.instance.agentsConnection);
        }
        return connections;
    }

    protected static List<Peer> getPeerList() {
        return GlobalConfig.getPeerList();
    }

    public static void destroy() {
        instance.idestroy();
    }

    public static void init() {
        instance.iinit();
    }

    public static void publish(NetPublish np) {
        instance.ipublish(np);
    }

    public static void removeAsyncConsumer(MessageListener listener) {
        if (listener != null) {
            if (listener.getSourceDestinationType() == NetAction.DestinationType.TOPIC) {
                TopicProcessorList.removeListener(listener);
            } else if (listener.getSourceDestinationType() == NetAction.DestinationType.QUEUE) {
                QueueProcessorList.removeListener(listener);
            }
        } else {
            log.warn("Can not remove null listener");
        }
    }

    private Gcs() {
    }

    private void connectToAllPeers() {
        log.info("connectToAllPeers()");
        List<Peer> peerList = GlobalConfig.getPeerList();
        String peers = peerList.toString();
        log.info("Peers: {}", (Object)peers);
        for (Peer peer : peerList) {
            InetSocketAddress addr = new InetSocketAddress(peer.getHost(), peer.getPort());
            Gcs.connect(addr);
        }
    }

    private void iackMessage(String queueName, String msgId) {
        if (!QueueProcessorList.hasQueue(queueName)) {
            log.warn(String.format("Trying to acknowledge a message whose queue doesn't exists. Queue: '%s', MsgId: '%s' ", queueName, msgId));
            return;
        }
        QueueProcessor queueProcessor = QueueProcessorList.get(queueName);
        if (queueProcessor != null) {
            queueProcessor.ack(msgId);
        }
    }

    private void iaddQueueConsumer(String queueName, MessageListener listener) {
        QueueProcessor qp;
        if (listener != null && (qp = QueueProcessorList.get(queueName)) != null) {
            qp.add(listener);
        }
    }

    private void iaddTopicConsumer(String subscriptionName, MessageListener listener) {
        TopicProcessor topicProcessor;
        if (listener != null && (topicProcessor = TopicProcessorList.get(subscriptionName)) != null) {
            topicProcessor.add(listener, true);
        }
    }

    private boolean ienqueue(NetMessage nmsg, String queueName) {
        QueueProcessor qp = QueueProcessorList.get(queueName);
        if (qp != null) {
            qp.getQueueStatistics().newQueueMessageReceived();
            qp.store(nmsg, GlobalConfig.preferLocalConsumers());
            return true;
        }
        return false;
    }

    private void iinit() {
        if (GlobalConfig.supportVirtualQueues()) {
            String[] virtual_queues;
            for (String vqueue : virtual_queues = VirtualQueueStorage.getVirtualQueueNames()) {
                log.debug("Add VirtualQueue '{}' from storage", (Object)vqueue);
                this.iaddQueueConsumer(vqueue, null);
            }
        } else {
            log.info("Virtual Queues not supported.");
        }
        String[] queues = BDBEnviroment.getQueueNames();
        for (String queueName : queues) {
            QueueProcessorList.get(queueName);
        }
        log.info("{} starting.", (Object)SERVICE_NAME);
        try {
            this.startAcceptor(GcsInfo.getAgentPort());
            this.startConnector();
            GcsExecutor.scheduleWithFixedDelay(new GlobalConfigMonitor(), 30L, 30L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new StatisticsCollector(), 60L, 60L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new KpiQueueConsumerCounter(), 120L, 120L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new KpiTopicConsumerCounter(), 120L, 120L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new QueueLister(), 5L, 5L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new QueueCounter(), 20L, 20L, TimeUnit.SECONDS);
            GcsExecutor.scheduleWithFixedDelay(new KpiQueuesSize(), 5L, 5L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new ExpiredMessagesDeleter(), 1L, 1L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new QueueWatchDog(), 2L, 2L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new PingPeers(), 5L, 5L, TimeUnit.MINUTES);
            GcsExecutor.scheduleWithFixedDelay(new StaleQueueCleaner((Optional<String>)Optional.absent(), GlobalConfig.getQueueMaxStaleAge()), GlobalConfig.getQueueMaxStaleAge(), GlobalConfig.getQueueMaxStaleAge(), TimeUnit.MILLISECONDS);
            Map<String, Long> queuePrefixConfig = GlobalConfig.getQueuePrefixConfig();
            Set<String> queuePrefixes = queuePrefixConfig.keySet();
            for (String queuePrefix : queuePrefixes) {
                Long queueStaleTimer = queuePrefixConfig.get(queuePrefix);
                GcsExecutor.scheduleWithFixedDelay(new StaleQueueCleaner((Optional<String>)Optional.of((Object)queuePrefix), queueStaleTimer), queueStaleTimer, queueStaleTimer, TimeUnit.MILLISECONDS);
            }
        }
        catch (Throwable t) {
            Shutdown.now((Throwable)t);
        }
        this.connectToAllPeers();
        log.info("{} initialized.", (Object)SERVICE_NAME);
    }

    private void idestroy() {
        try {
            BDBEnviroment.sync();
        }
        catch (Throwable te) {
            log.error(te.getMessage(), te);
        }
    }

    private void ipublish(NetPublish np) {
        TopicProcessorList.notify(np, false);
    }

    private void startAcceptor(int portNumber) throws IOException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        bootstrap.group((EventLoopGroup)bossGroup, (EventLoopGroup)workerGroup);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, (Object)true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, (Object)true);
        bootstrap.childOption(ChannelOption.SO_RCVBUF, (Object)131072);
        bootstrap.childOption(ChannelOption.SO_SNDBUF, (Object)131072);
        bootstrap.option(ChannelOption.SO_REUSEADDR, (Object)true);
        bootstrap.option(ChannelOption.SO_BACKLOG, (Object)1024);
        ((ServerBootstrap)bootstrap.channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                SslContext serverContext = SslContextFactory.getServerSslContext(GcsInfo.getCertificateFile(), GcsInfo.getKeyFile(), GcsInfo.getKeyPassword());
                pipeline.addLast("broker-gcs-handler", (ChannelHandler)new GcsHandler(serverContext, GcsInfo.isForceAgentSsl()));
            }
        });
        InetSocketAddress inet = new InetSocketAddress("0.0.0.0", portNumber);
        bootstrap.bind((SocketAddress)inet);
        log.info("{} listening on: '{}'.", (Object)SERVICE_NAME, (Object)inet.toString());
    }

    private void startConnector() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group((EventLoopGroup)new NioEventLoopGroup());
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                SslContext clientContext = SslContextFactory.getClientSslContext();
                pipeline.addLast("broker-ssl-handler", (ChannelHandler)clientContext.newHandler(ch.alloc()));
                pipeline.addLast("broker-codec", (ChannelHandler)new GcsCodec());
                pipeline.addLast("broker-handler", (ChannelHandler)new GcsRemoteProtocolHandler());
            }
        });
        bootstrap.option(ChannelOption.SO_KEEPALIVE, (Object)true);
        bootstrap.option(ChannelOption.SO_RCVBUF, (Object)131072);
        bootstrap.option(ChannelOption.SO_SNDBUF, (Object)131072);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)5000);
        this.connector = bootstrap;
    }

    public static synchronized void deleteQueue(String queueName, boolean safe) {
        QueueProcessorList.remove(queueName, safe);
    }

    public static synchronized void deleteQueue(String queueName) {
        QueueProcessorList.remove(queueName);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void remoteSessionClosed(Channel channel) {
        Set<Channel> set = Gcs.instance.agentsConnection;
        synchronized (set) {
            Gcs.instance.agentsConnection.remove(channel);
        }
    }

    public static NetMessage buildNotification(NetPublish np, String subscriptionName) {
        String msg_id = MessageId.getMessageId();
        if (StringUtils.isBlank((CharSequence)np.getMessage().getMessageId())) {
            np.getMessage().setMessageId(msg_id);
        }
        long now = System.currentTimeMillis();
        if (np.getMessage().getTimestamp() == -1L) {
            np.getMessage().setTimestamp(now);
        }
        if (np.getMessage().getExpiration() == -1L) {
            String deliveryTime = (String)np.getMessage().getHeaders().get("DEFERRED_DELIVERY");
            if (StringUtils.isBlank((CharSequence)deliveryTime)) {
                np.getMessage().setExpiration(now + EXPIRATION_TIME);
            } else {
                try {
                    long value = Long.parseLong(deliveryTime);
                    np.getMessage().setExpiration(value + EXPIRATION_TIME);
                }
                catch (NumberFormatException nfe) {
                    log.error(String.format("'EXPIRATION_TIME' is invalid '%s'", deliveryTime), (Throwable)nfe);
                    throw new RuntimeException(nfe);
                }
            }
        }
        NetNotification notification = new NetNotification(np.getDestination(), np.getDestinationType(), np.getMessage(), subscriptionName);
        NetAction action = new NetAction(NetAction.ActionType.NOTIFICATION);
        action.setNotificationMessage(notification);
        NetMessage message = new NetMessage(action);
        if (np.getMessage().getHeaders() != null) {
            message.getHeaders().putAll(np.getMessage().getHeaders());
        }
        return message;
    }

    public static void broadcastMaxQueueSizeReached() {
        Gcs.broadcastMaxSizeFault(String.format("The maximum number of queues (%s) has been reached.", GcsInfo.getMaxQueues()));
    }

    public static void broadcastMaxDistinctSubscriptionsReached() {
        Gcs.broadcastMaxSizeFault(String.format("The maximum number of distinct subscriptions (%s) has been reached.", GcsInfo.getMaxDistinctSubscriptions()));
    }

    private static void broadcastMaxSizeFault(String message) {
        String topic = String.format("/system/faults/#%s#", GcsInfo.getAgentName());
        String soapMessageTemplate = "<soap:Envelope xmlns:soap='http://www.w3.org/2003/05/soap-envelope' xmlns:wsa='http://www.w3.org/2005/08/addressing' xmlns:mq='http://services.sapo.pt/broker'><soap:Header><wsa:From><wsa:Address>%s</wsa:Address></wsa:From></soap:Header><soap:Body><soap:Fault><soap:Code><soap:Value>soap:Receiver</soap:Value></soap:Code><soap:Reason><soap:Text>%s</soap:Text></soap:Reason><soap:Detail>%s</soap:Detail></soap:Fault></soap:Body></soap:Envelope>";
        String faultMessage = String.format("<soap:Envelope xmlns:soap='http://www.w3.org/2003/05/soap-envelope' xmlns:wsa='http://www.w3.org/2005/08/addressing' xmlns:mq='http://services.sapo.pt/broker'><soap:Header><wsa:From><wsa:Address>%s</wsa:Address></wsa:From></soap:Header><soap:Body><soap:Fault><soap:Code><soap:Value>soap:Receiver</soap:Value></soap:Code><soap:Reason><soap:Text>%s</soap:Text></soap:Reason><soap:Detail>%s</soap:Detail></soap:Fault></soap:Body></soap:Envelope>", GcsInfo.getAgentName(), "Limit reached", message);
        NetPublish np = new NetPublish(topic, NetAction.DestinationType.TOPIC, new NetBrokerMessage(faultMessage));
        Gcs.publish(np);
    }

    public static Gcs getInstance() {
        return instance;
    }
}

