/*
 * Decompiled with CFR 0.152.
 */
package pt.com.gcs.messaging;

import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.DeliverableMessage;
import pt.com.broker.types.ForwardResult;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPublish;
import pt.com.broker.types.channels.ListenerChannel;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.messaging.DestinationMatcher;
import pt.com.gcs.messaging.Gcs;
import pt.com.gcs.messaging.MessageId;
import pt.com.gcs.messaging.SubscriptionProcessor;
import pt.com.gcs.messaging.SystemMessagesPublisher;
import pt.com.gcs.messaging.TopicStatistics;

public class TopicProcessor
implements SubscriptionProcessor {
    private static Logger log = LoggerFactory.getLogger(TopicProcessor.class);
    private static final Charset UTF8 = Charset.forName("UTF-8");
    private final TopicStatistics topicStatistics = new TopicStatistics();
    private final String subscriptionName;
    private final Set<MessageListener> topicListeners = new CopyOnWriteArraySet<MessageListener>();
    private volatile boolean broadcastable;

    protected TopicProcessor(String subscriptionName) {
        if (StringUtils.isBlank((CharSequence)subscriptionName)) {
            throw new IllegalArgumentException("Subscription names can not be blank");
        }
        this.subscriptionName = subscriptionName;
        try {
            Pattern.compile(subscriptionName);
        }
        catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void add(MessageListener listener, boolean broadcast) {
        if (listener != null) {
            Set<MessageListener> set = this.topicListeners;
            synchronized (set) {
                if (listener.getType() == MessageListener.Type.REMOTE) {
                    this.addRemote(listener);
                } else {
                    this.addLocal(listener);
                }
            }
        } else {
            throw new IllegalArgumentException(String.format("Cannot add null listener to subscription '%s'", this.subscriptionName));
        }
    }

    private void addLocal(MessageListener listener) {
        boolean has_local = false;
        if (listener.getType() == MessageListener.Type.LOCAL) {
            has_local = this.hasLocalConsumers();
        }
        if (this.topicListeners.add(listener)) {
            log.info("Add listener -> '{}'", (Object)listener.toString());
            if (listener.getType() == MessageListener.Type.LOCAL) {
                if (!has_local) {
                    this.broadCastNewTopicConsumer();
                }
                this.broadcastable = true;
            }
        }
    }

    private void addRemote(MessageListener listener) {
        if (this.topicListeners.add(listener)) {
            log.info("Add listener -> '{}'", (Object)listener.toString());
        }
    }

    private void broadCastActionTopicConsumer(String action) {
        Set<Channel> sessions = Gcs.getManagedConnectorSessions();
        for (Channel channel : sessions) {
            try {
                this.broadCastTopicInfo(action, channel);
            }
            catch (Throwable t) {
                log.error(t.getMessage(), t);
                try {
                    channel.close();
                }
                catch (Throwable ct) {
                    log.error(ct.getMessage(), ct);
                }
            }
        }
    }

    private void broadCastNewTopicConsumer() {
        log.info("Tell all peers about new topic consumer for: '{}'", (Object)this.subscriptionName);
        this.broadCastActionTopicConsumer("CREATE");
    }

    private void broadCastRemovedTopicConsumer() {
        log.info("Tell all peers about deleted topic consumer of: '{}'", (Object)this.subscriptionName);
        this.broadCastActionTopicConsumer("DELETE");
    }

    protected void broadCastTopicInfo(String action, Channel channel) {
        String ptemplate = "<sysmessage><action>%s</action><source-name>%s</source-name><source-ip>%s</source-ip><destination>%s</destination></sysmessage>";
        String payload = String.format(ptemplate, action, GcsInfo.getAgentName(), ((InetSocketAddress)channel.remoteAddress()).getHostName(), this.subscriptionName);
        NetBrokerMessage brkMsg = new NetBrokerMessage(payload.getBytes(UTF8));
        brkMsg.setMessageId(MessageId.getMessageId());
        NetNotification notification = new NetNotification("/system/peer", NetAction.DestinationType.TOPIC, brkMsg, "/system/peer");
        NetAction naction = new NetAction(NetAction.ActionType.NOTIFICATION);
        naction.setNotificationMessage(notification);
        NetMessage nmsg = new NetMessage(naction);
        nmsg.getHeaders().put("TYPE", "SYSTEM_TOPIC");
        SystemMessagesPublisher.sendMessage(nmsg, channel);
    }

    @Override
    public String getSubscriptionName() {
        return this.subscriptionName;
    }

    protected boolean hasLocalConsumers() {
        for (MessageListener l : this.topicListeners) {
            if (l.getType() != MessageListener.Type.LOCAL) continue;
            return true;
        }
        return false;
    }

    public boolean isBroadcastable() {
        return this.broadcastable;
    }

    public Set<MessageListener> listeners() {
        return this.topicListeners;
    }

    protected void notify(NetPublish np, boolean localOnly, Set<ListenerChannel> remoteListenersTouched) {
        String topicName;
        if (this.size() > 0 && DestinationMatcher.match(this.subscriptionName, topicName = np.getDestination())) {
            NetMessage nmsg = Gcs.buildNotification(np, this.subscriptionName);
            for (MessageListener ml : this.topicListeners) {
                if (ml == null || localOnly && ml.getType() == MessageListener.Type.REMOTE) continue;
                if (ml.getType() == MessageListener.Type.REMOTE) {
                    if (remoteListenersTouched.contains(ml.getChannel())) continue;
                    this.doNotify(nmsg, ml);
                    remoteListenersTouched.add(ml.getChannel());
                    continue;
                }
                this.doNotify(nmsg, ml);
            }
        }
    }

    private void doNotify(NetMessage nmsg, MessageListener ml) {
        if (ml.onMessage((DeliverableMessage)nmsg).result == ForwardResult.Result.SUCCESS) {
            if (ml.getTargetDestinationType() == NetAction.DestinationType.TOPIC && ml.getType().equals((Object)MessageListener.Type.LOCAL)) {
                log.debug("Incrementig toppic messages delivered for topic: {}", (Object)this.getSubscriptionName());
                this.topicStatistics.newTopicMessageDelivered();
            } else {
                this.topicStatistics.newTopicDispatchedToQueueMessage();
            }
        } else {
            this.topicStatistics.newTopicDiscardedMessage();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void remove(MessageListener listener) {
        if (listener != null) {
            Set<MessageListener> set = this.topicListeners;
            synchronized (set) {
                if (this.topicListeners.remove(listener)) {
                    log.info("Removed listener -> '{}'", (Object)listener.toString());
                    boolean has_local = this.hasLocalConsumers();
                    if (!has_local && listener.getType() == MessageListener.Type.LOCAL) {
                        this.broadCastRemovedTopicConsumer();
                    }
                }
            }
        }
    }

    protected int size() {
        return this.topicListeners.size();
    }

    public TopicStatistics getTopicStatistics() {
        return this.topicStatistics;
    }

    @Override
    public Set<MessageListener> localListeners() {
        HashSet<MessageListener> listeners = new HashSet<MessageListener>();
        for (MessageListener listener : this.listeners()) {
            if (listener.getType() != MessageListener.Type.LOCAL && listener.getType() != MessageListener.Type.INTERNAL) continue;
            listeners.add(listener);
        }
        return listeners;
    }

    @Override
    public Set<MessageListener> remoteListeners() {
        HashSet<MessageListener> listeners = new HashSet<MessageListener>();
        for (MessageListener listener : this.listeners()) {
            if (listener.getType() != MessageListener.Type.REMOTE) continue;
            listeners.add(listener);
        }
        return listeners;
    }

    @Override
    public boolean hasLocalListeners() {
        for (MessageListener listener : this.listeners()) {
            if (listener.getType() != MessageListener.Type.LOCAL && listener.getType() != MessageListener.Type.INTERNAL) continue;
            return true;
        }
        return false;
    }

    @Override
    public boolean hasRemoteListeners() {
        for (MessageListener listener : this.listeners()) {
            if (listener.getType() != MessageListener.Type.REMOTE) continue;
            return true;
        }
        return false;
    }
}

