/*
 * Decompiled with CFR 0.152.
 */
package pt.com.gcs.messaging;

import io.netty.channel.Channel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.ErrorAnalyser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.DeliverableMessage;
import pt.com.broker.types.ForwardResult;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.MessageListenerEventChangeHandler;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.conf.GlobalConfig;
import pt.com.gcs.messaging.BDBMessage;
import pt.com.gcs.messaging.BDBStorage;
import pt.com.gcs.messaging.Gcs;
import pt.com.gcs.messaging.GcsExecutor;
import pt.com.gcs.messaging.MessageId;
import pt.com.gcs.messaging.QueueStatistics;
import pt.com.gcs.messaging.SubscriptionProcessor;
import pt.com.gcs.messaging.SystemMessagesPublisher;
import pt.com.gcs.messaging.TopicProcessor;
import pt.com.gcs.messaging.TopicProcessorList;
import pt.com.gcs.messaging.TopicToQueueDispatcher;
import pt.com.gcs.messaging.VirtualQueueStorage;

public class QueueProcessor
implements SubscriptionProcessor {
    private static Logger log = LoggerFactory.getLogger(QueueProcessor.class);
    private static final ForwardResult failed = new ForwardResult(ForwardResult.Result.FAILED);
    private static final Charset UTF8 = Charset.forName("UTF-8");
    protected final AtomicBoolean emptyQueueInfoDisplay = new AtomicBoolean(false);
    protected final AtomicLong counter = new AtomicLong(0L);
    private final AtomicBoolean isWorking = new AtomicBoolean(false);
    private final CopyOnWriteArraySet<MessageListener> localQueueListeners = new CopyOnWriteArraySet();
    private final String queueName;
    private final Set<MessageListener> remoteQueueListeners = new CopyOnWriteArraySet<MessageListener>();
    private final AtomicLong sequence;
    private final BDBStorage storage;
    private TopicToQueueDispatcher topicFwd;
    private final QueueStatistics queueStatistics = new QueueStatistics();
    private final AtomicBoolean deliveringMessages = new AtomicBoolean(false);
    private final AtomicLong nextRun = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong lastCycle = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong currentIdx = new AtomicLong(0L);
    private final AtomicLong lastMessageDelivered = new AtomicLong(System.currentTimeMillis());
    private long maxStaleAge;
    private AtomicBoolean deliveryRequest = new AtomicBoolean(false);
    private MessageListenerEventChangeHandler msgListenterEventHandler = new MessageListenerEventChangeHandler(){

        public void stateChanged(MessageListener messageListener, MessageListener.MessageListenerState state) {
            if (messageListener.isReady() && messageListener.isActive()) {
                QueueProcessor.this.deliverMessages();
            }
        }
    };

    protected QueueProcessor(String queueName, long maxStaleAge) {
        this.maxStaleAge = maxStaleAge;
        if (StringUtils.isBlank((CharSequence)queueName)) {
            throw new IllegalArgumentException("Queue names can not be blank");
        }
        this.queueName = queueName;
        this.storage = new BDBStorage(this);
        long cnt = this.storage.count();
        if (cnt == 0L) {
            this.sequence = new AtomicLong(0L);
            this.counter.set(0L);
        } else {
            this.sequence = new AtomicLong(this.storage.getLastSequenceValue());
            this.counter.set(cnt);
        }
        if (GlobalConfig.supportVirtualQueues()) {
            this.createDispatcher();
        }
        log.info("Create Queue Processor for '{}'.", (Object)queueName);
        log.info("Queue '{}' has {} message(s).", (Object)queueName, (Object)this.getQueuedMessagesCount());
    }

    protected long getMaxStaleAge() {
        return this.maxStaleAge;
    }

    protected void ack(String msgId) {
        if (log.isDebugEnabled()) {
            log.debug("Ack message . MsgId: '{}'.", (Object)msgId);
        }
        if (this.storage.deleteMessage(msgId)) {
            this.counter.decrementAndGet();
        }
    }

    public void add(MessageListener listener) {
        if (listener != null) {
            boolean success = false;
            if (listener.getType() == MessageListener.Type.LOCAL) {
                success = this.addLocal(listener);
            } else if (listener.getType() == MessageListener.Type.REMOTE) {
                success = this.addRemote(listener);
            }
            if (success) {
                listener.addStateChangeListener(this.getMessageListenerEventChangeHandler());
                this.deliverMessages();
            }
        } else {
            throw new IllegalArgumentException(String.format("Cannot add null listener to queue '%s'", this.queueName));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean addLocal(MessageListener listener) {
        CopyOnWriteArraySet<MessageListener> copyOnWriteArraySet = this.localQueueListeners;
        synchronized (copyOnWriteArraySet) {
            if (this.localQueueListeners.add(listener)) {
                if (this.localQueueListeners.size() == 1) {
                    this.broadCastNewQueueConsumer(listener);
                }
                log.info("Add listener -> '{}'", (Object)listener.toString());
                return true;
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean addRemote(MessageListener listener) {
        Set<MessageListener> set = this.remoteQueueListeners;
        synchronized (set) {
            if (this.remoteQueueListeners.add(listener)) {
                log.info("Add listener -> '{}'", (Object)listener.toString());
                return true;
            }
            return false;
        }
    }

    private void broadCastActionQueueConsumer(String action) {
        Set<Channel> sessions = Gcs.getManagedConnectorSessions();
        for (Channel channel : sessions) {
            try {
                this.broadCastQueueInfo(action, channel);
            }
            catch (Throwable t) {
                log.error(t.getMessage(), t);
                try {
                    channel.close();
                }
                catch (Throwable ct) {
                    log.error(ct.getMessage(), ct);
                }
            }
        }
    }

    private void broadCastNewQueueConsumer(MessageListener listener) {
        log.info("Tell all peers about new queue consumer for: '{}' in channel '{}'", (Object)this.queueName, (Object)listener.getChannel().getRemoteAddressAsString());
        this.broadCastActionQueueConsumer("CREATE");
    }

    protected void broadCastQueueInfo(String action, Channel channel) {
        String ptemplate = "<sysmessage><action>%s</action><source-name>%s</source-name><source-ip>%s</source-ip><destination>%s</destination></sysmessage>";
        String payload = String.format(ptemplate, action, GcsInfo.getAgentName(), channel.localAddress().toString(), this.queueName);
        NetBrokerMessage brkMsg = new NetBrokerMessage(payload.getBytes(UTF8));
        brkMsg.setMessageId(MessageId.getMessageId());
        NetNotification notification = new NetNotification("/system/peer", NetAction.DestinationType.TOPIC, brkMsg, "/system/peer");
        NetAction naction = new NetAction(NetAction.ActionType.NOTIFICATION);
        naction.setNotificationMessage(notification);
        NetMessage nmsg = new NetMessage(naction);
        nmsg.getHeaders().put("TYPE", "SYSTEM_QUEUE");
        SystemMessagesPublisher.sendMessage(nmsg, channel);
    }

    private void broadCastRemovedQueueConsumer(MessageListener listener) {
        log.info("Tell all peers about deleted queue consumer for: '{}' in channel '{}'", (Object)this.queueName, (Object)listener.getChannel().getRemoteAddressAsString());
        this.broadCastActionQueueConsumer("DELETE");
    }

    public synchronized void clearStorage() {
        this.removeDispatcher();
        this.storage.deleteQueue();
    }

    private void createDispatcher() {
        try {
            if (StringUtils.contains((CharSequence)this.queueName, (CharSequence)"@")) {
                log.info("Get Dispatcher for: {}", (Object)this.queueName);
                String topicName = StringUtils.substringAfter((String)this.queueName, (String)"@");
                if (StringUtils.isBlank((CharSequence)topicName)) {
                    String errorMessage = String.format("Can't create a topic dispatcher whose name is empty. VirtualQueue name: '%s'", this.queueName);
                    log.error(errorMessage);
                    throw new RuntimeException(errorMessage);
                }
                this.topicFwd = new TopicToQueueDispatcher(null, topicName, this.queueName);
                VirtualQueueStorage.saveVirtualQueue(this.queueName);
                TopicProcessor topicProcessor = TopicProcessorList.get(topicName);
                if (topicProcessor != null) {
                    topicProcessor.add(this.topicFwd, false);
                }
            }
        }
        catch (Throwable e) {
            this.topicFwd = null;
            throw new RuntimeException(e);
        }
    }

    public long decrementQueuedMessagesCount() {
        return this.counter.decrementAndGet();
    }

    public void deleteExpiredMessages() {
        if (!this.hasRecipient()) {
            this.storage.deleteExpiredMessages();
        }
    }

    protected ForwardResult forward(NetMessage nmsg, boolean preferLocalConsumer) {
        boolean isLocalDelivery = true;
        ForwardResult result = this.notify(this.localQueueListeners, nmsg);
        if (!(result.result != ForwardResult.Result.FAILED || preferLocalConsumer && this.localQueueListeners.size() > 0)) {
            result = this.notify(this.remoteQueueListeners, nmsg);
            isLocalDelivery = false;
        }
        if (result.result != ForwardResult.Result.FAILED && isLocalDelivery) {
            this.queueStatistics.newQueueMessageDelivered();
        }
        return result;
    }

    public long getQueuedMessagesCount() {
        return this.counter.get();
    }

    public String getQueueName() {
        return this.queueName;
    }

    @Override
    public String getSubscriptionName() {
        return this.getQueueName();
    }

    private boolean hasActiveListeners(Set<MessageListener> listeners) {
        for (MessageListener ml : listeners) {
            if (!ml.isActive()) continue;
            return true;
        }
        return false;
    }

    private boolean hasReadyListeners(Set<MessageListener> listeners) {
        for (MessageListener ml : listeners) {
            if (!ml.isReady()) continue;
            return true;
        }
        return false;
    }

    protected boolean hasRecipient() {
        if (this.hasReadyListeners(this.localQueueListeners)) {
            return true;
        }
        if (this.hasActiveListeners(this.localQueueListeners)) {
            return false;
        }
        return this.hasReadyListeners(this.remoteQueueListeners);
    }

    @Override
    public Set<MessageListener> localListeners() {
        return this.localQueueListeners;
    }

    private ForwardResult notify(Set<MessageListener> listeners, NetMessage nmsg) {
        int s = listeners.size();
        if (s == 0) {
            return failed;
        }
        int n = (int)this.currentIdx.get();
        try {
            int idx = 0;
            for (MessageListener ml : listeners) {
                if (idx++ < n || !ml.isReady()) continue;
                this.currentIdx.set(idx);
                return ml.onMessage((DeliverableMessage)nmsg);
            }
            idx = 0;
            for (MessageListener ml : listeners) {
                if (idx++ >= n) break;
                if (!ml.isReady()) continue;
                this.currentIdx.set(idx);
                return ml.onMessage((DeliverableMessage)nmsg);
            }
            return failed;
        }
        catch (Throwable t) {
            return failed;
        }
    }

    @Override
    public Set<MessageListener> remoteListeners() {
        return this.remoteQueueListeners;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void remove(MessageListener listener) {
        if (listener != null) {
            Set<MessageListener> set;
            boolean removed = false;
            if (listener.getType() == MessageListener.Type.LOCAL) {
                set = this.localQueueListeners;
                synchronized (set) {
                    if (this.localQueueListeners.remove(listener)) {
                        log.info("Removed local listener -> '{}'", (Object)listener.toString());
                        if (this.localQueueListeners.size() == 0) {
                            this.broadCastRemovedQueueConsumer(listener);
                        }
                        removed = true;
                    }
                }
            }
            if (listener.getType() == MessageListener.Type.REMOTE) {
                set = this.remoteQueueListeners;
                synchronized (set) {
                    if (this.remoteQueueListeners.remove(listener)) {
                        log.info("Removed remote listener -> '{}'", (Object)listener.toString());
                        removed = true;
                    }
                }
            }
            if (removed) {
                listener.removeStateChangeListener(this.getMessageListenerEventChangeHandler());
            }
        } else {
            log.error(String.format("Cannot remove null listener to queue '%s'", this.queueName));
        }
    }

    private void removeDispatcher() {
        try {
            if (this.topicFwd != null) {
                Gcs.removeAsyncConsumer(this.topicFwd);
                VirtualQueueStorage.deleteVirtualQueue(this.getQueueName());
            }
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    public void setSequenceNumber(long seqNumber) {
        this.sequence.set(seqNumber);
    }

    public int size() {
        return this.localQueueListeners.size() + this.remoteQueueListeners.size();
    }

    public long getLastCycle() {
        return this.lastCycle.get();
    }

    public void store(NetMessage nmsg, boolean preferLocalConsumer) {
        long seq_nr = this.sequence.incrementAndGet();
        String msg_id = MessageId.getBaseMessageId() + seq_nr;
        nmsg.getAction().getNotificationMessage().getMessage().setMessageId(msg_id);
        BDBMessage bdbm = new BDBMessage(nmsg, seq_nr, preferLocalConsumer);
        this.doStore(msg_id, bdbm);
        this.deliverMessages();
    }

    private void doStore(String msg_id, BDBMessage bdbm) {
        this.storage.insert(bdbm);
        this.counter.incrementAndGet();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Stored message with id '%s'.", msg_id));
        }
    }

    public long lastMessageDelivered() {
        return this.lastMessageDelivered.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void wakeup() {
        if (this.isWorking.getAndSet(true)) {
            if (log.isDebugEnabled()) {
                log.debug("Queue '{}' is running, skip wakeup", (Object)this.queueName);
            }
            return;
        }
        long cnt = this.getQueuedMessagesCount();
        if (cnt > 0L) {
            this.emptyQueueInfoDisplay.set(false);
            if (this.hasReadyListeners(this.localQueueListeners) || this.hasReadyListeners(this.remoteQueueListeners)) {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Wakeup queue '{}'", (Object)this.queueName);
                    }
                    long nextCycleDelay = this.storage.recoverMessages();
                    long now = System.currentTimeMillis();
                    long nextCycleTime = now + nextCycleDelay;
                    if (nextCycleDelay != 0L && nextCycleTime < this.nextRun.get() || nextCycleDelay != 0L && this.nextRun.get() < now) {
                        this.nextRun.set(nextCycleTime);
                        GcsExecutor.schedule(new Runnable(){

                            @Override
                            public void run() {
                                QueueProcessor.this.deliverMessages();
                            }
                        }, nextCycleDelay, TimeUnit.MILLISECONDS);
                    }
                    this.lastMessageDelivered.set(now);
                }
                catch (Throwable t) {
                    Throwable r = ErrorAnalyser.findRootCause((Throwable)t);
                    log.error(r.getMessage(), r);
                }
                finally {
                    this.isWorking.set(false);
                }
            }
        }
        this.isWorking.set(false);
    }

    public QueueStatistics getQueueStatistics() {
        return this.queueStatistics;
    }

    protected boolean deliverMessages() {
        boolean set = this.deliveringMessages.compareAndSet(false, true);
        this.deliveryRequest.set(true);
        if (set) {
            GcsExecutor.execute(new Runnable(){

                @Override
                public void run() {
                    while (QueueProcessor.this.deliveryRequest.compareAndSet(true, false)) {
                        QueueProcessor.this.wakeup();
                    }
                    QueueProcessor.this.deliveringMessages.set(false);
                    QueueProcessor.this.lastCycle.set(System.currentTimeMillis());
                }
            });
        }
        return set;
    }

    private MessageListenerEventChangeHandler getMessageListenerEventChangeHandler() {
        return this.msgListenterEventHandler;
    }

    @Override
    public boolean hasLocalListeners() {
        return this.localListeners().size() > 0;
    }

    @Override
    public boolean hasRemoteListeners() {
        return this.remoteListeners().size() > 0;
    }

    public List<NetMessage> getMessages() {
        List<BDBMessage> msgs = this.storage.getMessages();
        ArrayList<NetMessage> lista = new ArrayList<NetMessage>(msgs.size());
        for (BDBMessage message : msgs) {
            lista.add(message.getMessage());
        }
        return lista;
    }
}

