/*
 * Decompiled with CFR 0.152.
 */
package pt.com.gcs.messaging;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.ErrorAnalyser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.CriticalErrors;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.channels.ChannelAttributes;
import pt.com.broker.types.channels.ListenerChannelFactory;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.conf.GlobalConfig;
import pt.com.gcs.messaging.Gcs;
import pt.com.gcs.messaging.GlobalConfigMonitor;
import pt.com.gcs.messaging.InboundRemoteChannels;
import pt.com.gcs.messaging.QueueProcessor;
import pt.com.gcs.messaging.QueueProcessorList;
import pt.com.gcs.messaging.RemoteListener;
import pt.com.gcs.messaging.TopicProcessor;
import pt.com.gcs.messaging.TopicProcessorList;
import pt.com.gcs.net.Peer;

@ChannelHandler.Sharable
public class GcsAcceptorProtocolHandler
extends SimpleChannelInboundHandler<NetMessage> {
    private static Logger log = LoggerFactory.getLogger(GcsAcceptorProtocolHandler.class);
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private static List<InetSocketAddress> peersAddressList;

    private static void createPeersList() {
        List<Peer> peerList = GlobalConfig.getPeerList();
        peersAddressList = new ArrayList<InetSocketAddress>(peerList.size());
        for (Peer peer : peerList) {
            InetSocketAddress addr = new InetSocketAddress(peer.getHost(), peer.getPort());
            peersAddressList.add(addr);
        }
    }

    public static void globalConfigReloaded() {
        GcsAcceptorProtocolHandler.createPeersList();
    }

    protected void channelRead0(ChannelHandlerContext ctx, NetMessage msg) throws Exception {
        NetMessage m = msg;
        String mtype = (String)m.getHeaders().get("TYPE");
        NetNotification nnot = m.getAction().getNotificationMessage();
        NetBrokerMessage brkMsg = nnot.getMessage();
        String msgContent = new String(brkMsg.getPayload(), "UTF-8");
        if (log.isDebugEnabled()) {
            log.debug(String.format("Message Received from: '%s', Destination: '%s', Type: '%s', MsgId: '%s'", ctx.channel().remoteAddress(), nnot.getDestination(), mtype, brkMsg.getMessageId()));
        }
        if (mtype.equals("ACK")) {
            Gcs.ackMessage(nnot.getDestination(), brkMsg.getMessageId());
            return;
        }
        if (mtype.equals("PING")) {
            this.acknowledgeSystemMessage(brkMsg.getMessageId(), ctx);
            return;
        }
        if (mtype.equals("HELLO")) {
            Peer peer = Peer.createPeerFromHelloMessage(msgContent);
            if (peer == null) {
                log.error("Invalid 'HELLO' message: ", (Object)msgContent);
                return;
            }
            this.validatePeer(ctx, peer, msgContent);
            boolean isValid = (Boolean)ChannelAttributes.get((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.ISVALID");
            if (isValid) {
                log.debug("Peer is valid!");
                ChannelHandlerContext previousChannel = InboundRemoteChannels.add(peer.getAddress(), ctx);
                if (previousChannel != null) {
                    log.info(String.format("Peer '%s' connected through channel '%s' was connected through channel '%s'", peer.getAddress(), ctx.channel().toString(), previousChannel.channel().toString()));
                    this.handleChannelClosed(previousChannel);
                }
                return;
            }
            String paddr = String.valueOf(ChannelAttributes.get((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.PEER_ADDRESS"));
            log.error("A peer from \"{}\" tried to connect but it does not appear in the world map.", (Object)paddr);
            ctx.channel().close();
            return;
        }
        if (mtype.equals("SYSTEM_TOPIC") || mtype.equals("SYSTEM_QUEUE")) {
            RemoteListener remoteListener;
            String action = this.extract(msgContent, "<action>", "</action>");
            String src_name = this.extract(msgContent, "<source-name>", "</source-name>");
            ChannelHandlerContext channelHandlerContext = InboundRemoteChannels.get(src_name);
            if (!ctx.equals(channelHandlerContext)) {
                log.error(String.format("RemoteChannel for agent '%s' is '%s' but received a system message from '%s'. Closing both.", src_name, channelHandlerContext, ctx));
                channelHandlerContext.channel().close();
                ctx.channel().close();
                return;
            }
            String subscriptionKey = this.extract(msgContent, "<destination>", "</destination>");
            if (StringUtils.isBlank((CharSequence)subscriptionKey)) {
                String errorMessage = String.format("Sytem Queue or Topic message has a blank destination field. Message content: %s", msgContent);
                log.error(errorMessage);
                throw new RuntimeException(errorMessage);
            }
            if (StringUtils.isBlank((CharSequence)action)) {
                String errorMessage = String.format("Sytem Queue or Topic message has a blank action field. Message content: %s", msgContent);
                log.error(errorMessage);
                throw new RuntimeException(errorMessage);
            }
            if (log.isInfoEnabled()) {
                String lmsg = String.format("Action: '%s' Consumer; Subscription: '%s'; Source: '%s'", action, subscriptionKey, src_name);
                log.info(lmsg);
            }
            this.acknowledgeSystemMessage(brkMsg.getMessageId(), ctx);
            if (mtype.equals("SYSTEM_TOPIC")) {
                remoteListener = new RemoteListener(ListenerChannelFactory.getListenerChannel((ChannelHandlerContext)ctx), subscriptionKey, NetAction.DestinationType.TOPIC, NetAction.DestinationType.TOPIC);
                TopicProcessor tp = TopicProcessorList.get(subscriptionKey);
                if (tp == null) {
                    log.error("Failed to obtain a TopicProcessor instance for topic '{}'.", (Object)subscriptionKey);
                    return;
                }
                if (action.equals("CREATE")) {
                    tp.add((MessageListener)remoteListener, false);
                } else if (action.equals("DELETE")) {
                    tp.remove((MessageListener)remoteListener);
                }
            } else if (mtype.equals("SYSTEM_QUEUE")) {
                remoteListener = new RemoteListener(ListenerChannelFactory.getListenerChannel((ChannelHandlerContext)ctx), subscriptionKey, NetAction.DestinationType.QUEUE, NetAction.DestinationType.QUEUE);
                QueueProcessor qp = QueueProcessorList.get(subscriptionKey);
                if (qp == null) {
                    log.error("Failed to obtain a QueueProcessor instance for queue '{}'.", (Object)subscriptionKey);
                    return;
                }
                if (action.equals("CREATE")) {
                    qp.add((MessageListener)remoteListener);
                } else if (action.equals("DELETE")) {
                    qp.remove((MessageListener)remoteListener);
                }
            }
        } else {
            log.warn("Unkwown message type. Don't know how to handle message");
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Throwable rootCause = ErrorAnalyser.findRootCause((Throwable)cause);
        CriticalErrors.exitIfCritical((Throwable)rootCause);
        log.error("Exception Caught:'{}', '{}'", (Object)ctx.channel().remoteAddress().toString(), (Object)rootCause.getMessage());
        log.error(String.format("STACKTRACE:\n%s", rootCause));
        if (rootCause instanceof ClosedChannelException) {
            this.handleChannelClosed(ctx);
        }
    }

    private void acknowledgeSystemMessage(String messageId, ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String ptemplate = "<sysmessage><action>%s</action><source-name>%s</source-name><source-ip>%s</source-ip><message-id>%s</message-id></sysmessage>";
        String payload = String.format("<sysmessage><action>%s</action><source-name>%s</source-name><source-ip>%s</source-ip><message-id>%s</message-id></sysmessage>", "SYSTEM_ACKNOWLEDGE", GcsInfo.getAgentName(), channel.localAddress().toString(), messageId);
        NetBrokerMessage brkMsg = new NetBrokerMessage(payload.getBytes(UTF8));
        NetNotification notification = new NetNotification("/system/peer", NetAction.DestinationType.TOPIC, brkMsg, "/system/peer");
        NetAction naction = new NetAction(NetAction.ActionType.NOTIFICATION);
        naction.setNotificationMessage(notification);
        NetMessage nmsg = new NetMessage(naction);
        nmsg.getHeaders().put("TYPE", "SYSTEM_ACK");
        if (log.isDebugEnabled()) {
            log.debug(String.format("Acknowledging System Message. Payload: %s", payload));
        }
        if (channel.isWritable()) {
            channel.writeAndFlush((Object)nmsg);
        } else {
            log.warn(String.format("Can ack system message because the channel is not writable. Message id '%s' could not be sent to '%s'. Closing connection.", messageId, channel.remoteAddress().toString()));
            channel.close();
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.handleChannelClosed(ctx);
    }

    private void handleChannelClosed(ChannelHandlerContext ctx) {
        log.info("Session Closed: '{}'", (Object)ctx.channel().remoteAddress());
        TopicProcessorList.removeSession(ctx);
        QueueProcessorList.removeSession(ctx);
        if (!InboundRemoteChannels.remove(ctx)) {
            log.warn("Failed to remove '{}' from RemoteChannels. It should be there.", (Object)ctx.channel());
        }
        ChannelAttributes.remove((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx));
        ListenerChannelFactory.channelClosed((Channel)ctx.channel());
    }

    private boolean validPeerAddress(ChannelHandlerContext ctx) {
        InetSocketAddress remotePeer = (InetSocketAddress)ctx.channel().remoteAddress();
        InetAddress address = remotePeer.getAddress();
        for (InetSocketAddress addr : peersAddressList) {
            if (!address.equals(addr.getAddress())) continue;
            return true;
        }
        return false;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        log.info("Session Opened: '{}'", (Object)ctx.channel().remoteAddress());
        if (!this.validPeerAddress(ctx)) {
            ctx.channel().close();
            log.warn("GCS: connection refused");
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Session Created: '{}'", (Object)ctx.channel().remoteAddress());
        }
    }

    private void validatePeer(ChannelHandlerContext ctx, Peer peer, String helloMessage) {
        log.debug("\"Hello\" message received: '{}'", (Object)helloMessage);
        try {
            ChannelAttributes.set((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.PEER_ADDRESS", (Object)peer.getAddress());
            if (Gcs.getPeerList().contains(peer)) {
                log.debug("Peer '{}' exists in the world map'", (Object)peer.toString());
                ChannelAttributes.set((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.ISVALID", (Object)true);
                return;
            }
        }
        catch (Throwable t) {
            ChannelAttributes.set((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.PEER_ADDRESS", (Object)"Unknown address");
            log.error(t.getMessage(), t);
        }
        ChannelAttributes.set((String)ChannelAttributes.getChannelId((ChannelHandlerContext)ctx), (String)"GcsAcceptorProtocolHandler.ISVALID", (Object)false);
    }

    private String extract(String ins, String prefix, String sufix) {
        if (StringUtils.isBlank((CharSequence)ins)) {
            return "";
        }
        int s = ins.indexOf(prefix) + prefix.length();
        int e = ins.indexOf(sufix);
        return ins.substring(s, e);
    }

    static {
        GcsAcceptorProtocolHandler.createPeersList();
        GlobalConfigMonitor.addGlobalConfigModifiedListener(new GlobalConfigMonitor.GlobalConfigModifiedListener(){

            @Override
            public void globalConfigModified() {
                GcsAcceptorProtocolHandler.globalConfigReloaded();
            }
        });
    }
}

