package pt.com.gcs.messaging;

import io.netty.channel.Channel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.ErrorAnalyser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.types.ForwardResult;
import pt.com.broker.types.MessageListener;
import pt.com.broker.types.MessageListenerEventChangeHandler;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.gcs.conf.GcsInfo;
import pt.com.gcs.conf.GlobalConfig;

/* loaded from: input_file:pt/com/gcs/messaging/QueueProcessor.class */
public class QueueProcessor implements SubscriptionProcessor {
    /** Class-wide logger. */
    private static final Logger log = LoggerFactory.getLogger(QueueProcessor.class);
    /** Shared result returned whenever no listener accepted a message. */
    private static final ForwardResult failed = new ForwardResult(ForwardResult.Result.FAILED);
    /** Charset used to encode the system messages sent to peers. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    /** Name of the queue handled by this processor; the constructor rejects blank names. */
    private final String queueName;
    /** Sequence number of the most recently stored message. */
    private final AtomicLong sequence;
    /** Storage backing this queue. */
    private final BDBStorage storage;
    /** Topic-to-queue dispatcher; only assigned when the queue name contains '@'. */
    private TopicToQueueDispatcher topicFwd;
    /** Value supplied at construction time and exposed through getMaxStaleAge(). */
    private final long maxStaleAge;
    /** Cleared by wakeup() whenever the queue holds at least one message. */
    protected final AtomicBoolean emptyQueueInfoDisplay = new AtomicBoolean(false);
    /** Number of queued messages as tracked by this processor. */
    protected final AtomicLong counter = new AtomicLong(0);
    /** Guard that keeps wakeup() from running concurrently with itself. */
    private final AtomicBoolean isWorking = new AtomicBoolean(false);
    /** Listeners of type LOCAL. */
    private final CopyOnWriteArraySet<MessageListener> localQueueListeners = new CopyOnWriteArraySet<>();
    /** Listeners of type REMOTE. */
    private final Set<MessageListener> remoteQueueListeners = new CopyOnWriteArraySet<>();
    /** Delivery statistics for this queue. */
    private final QueueStatistics queueStatistics = new QueueStatistics();
    /** True while a delivery task submitted by deliverMessages() is pending or running. */
    private final AtomicBoolean deliveringMessages = new AtomicBoolean(false);
    /** Timestamp (ms) of the follow-up delivery most recently scheduled by wakeup(). */
    private final AtomicLong nextRun = new AtomicLong(Long.MAX_VALUE);
    /** Timestamp (ms) at which the last delivery task finished. */
    private final AtomicLong lastCycle = new AtomicLong(Long.MAX_VALUE);
    /** Round-robin cursor used by notify() to pick the next listener. */
    private final AtomicLong currentIdx = new AtomicLong(0);
    /** Timestamp (ms) recorded by wakeup() after it asked the storage to recover messages. */
    private final AtomicLong lastMessageDelivered = new AtomicLong(System.currentTimeMillis());
    /** Set by deliverMessages() to ask the delivery task for (another) wakeup. */
    private final AtomicBoolean deliveryRequest = new AtomicBoolean(false);
    /** Triggers a delivery whenever a registered listener reports itself ready and active. */
    private final MessageListenerEventChangeHandler msgListenterEventHandler = new MessageListenerEventChangeHandler() {
        public void stateChanged(MessageListener messageListener, MessageListener.MessageListenerState messageListenerState) {
            if (messageListener.isReady() && messageListener.isActive()) {
                QueueProcessor.this.deliverMessages();
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the processor for the given queue, opens its storage and restores
     * the sequence number and message counter from what the storage reports.
     *
     * @param str queue name; must not be blank
     * @param j   value later returned by {@link #getMaxStaleAge()}
     * @throws IllegalArgumentException if the queue name is blank
     */
    public QueueProcessor(String str, long j) {
        this.maxStaleAge = j;
        if (StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("Queue names can not be blank");
        }
        this.queueName = str;
        this.storage = new BDBStorage(this);

        // An empty storage starts the sequence at zero; otherwise resume from the stored value.
        final long storedMessages = this.storage.count();
        final long lastSequence = (storedMessages == 0) ? 0L : this.storage.getLastSequenceValue();
        this.sequence = new AtomicLong(lastSequence);
        this.counter.set(storedMessages);

        if (GlobalConfig.supportVirtualQueues()) {
            createDispatcher();
        }

        log.info("Create Queue Processor for '{}'.", str);
        log.info("Queue '{}' has {} message(s).", str, getQueuedMessagesCount());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the max stale age value this processor was constructed with
     */
    public long getMaxStaleAge() {
        return maxStaleAge;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Acknowledges a message: deletes it from the storage and, when the storage
     * reports that the deletion happened, decrements the queued message counter.
     *
     * @param str id of the acknowledged message
     */
    public void ack(String str) {
        // Parameterised logging already checks the level, no explicit guard needed.
        log.debug("Ack message . MsgId: '{}'.", str);

        final boolean deleted = this.storage.deleteMessage(str);
        if (!deleted) {
            return;
        }
        this.counter.decrementAndGet();
    }

    /**
     * Registers a listener on this queue. A listener that was actually added is
     * hooked to the state-change handler and a delivery is requested right away.
     * Listeners whose type is neither LOCAL nor REMOTE are ignored.
     *
     * @param messageListener listener to register
     * @throws IllegalArgumentException if the listener is null
     */
    public void add(MessageListener messageListener) {
        if (messageListener == null) {
            throw new IllegalArgumentException(String.format("Cannot add null listener to queue '%s'", this.queueName));
        }

        final boolean registered;
        if (messageListener.getType() == MessageListener.Type.LOCAL) {
            registered = addLocal(messageListener);
        } else {
            registered = messageListener.getType() == MessageListener.Type.REMOTE && addRemote(messageListener);
        }

        if (!registered) {
            return;
        }
        messageListener.addStateChangeListener(getMessageListenerEventChangeHandler());
        deliverMessages();
    }

    /**
     * Adds a LOCAL listener. When it becomes the only local listener, the peers
     * are told about the new queue consumer.
     *
     * @param messageListener listener to add
     * @return true if the listener was not registered before
     */
    private boolean addLocal(MessageListener messageListener) {
        // The lock keeps the add and the size check together.
        synchronized (this.localQueueListeners) {
            final boolean added = this.localQueueListeners.add(messageListener);
            if (added) {
                if (this.localQueueListeners.size() == 1) {
                    broadCastNewQueueConsumer(messageListener);
                }
                log.info("Add listener -> '{}'", messageListener.toString());
            }
            return added;
        }
    }

    /**
     * Adds a REMOTE listener.
     *
     * @param messageListener listener to add
     * @return true if the listener was not registered before
     */
    private boolean addRemote(MessageListener messageListener) {
        synchronized (this.remoteQueueListeners) {
            final boolean added = this.remoteQueueListeners.add(messageListener);
            if (added) {
                log.info("Add listener -> '{}'", messageListener.toString());
            }
            return added;
        }
    }

    /**
     * Sends the given queue action to every managed connector session. A channel
     * on which sending fails is logged and closed; the remaining channels are
     * still processed.
     *
     * @param str action name placed in the system message (e.g. "CREATE", "DELETE")
     */
    private void broadCastActionQueueConsumer(String str) {
        for (Channel peer : Gcs.getManagedConnectorSessions()) {
            try {
                broadCastQueueInfo(str, peer);
                continue;
            } catch (Throwable sendError) {
                log.error(sendError.getMessage(), sendError);
            }

            // Only reached when sending failed: drop the broken channel.
            try {
                peer.close();
            } catch (Throwable closeError) {
                log.error(closeError.getMessage(), closeError);
            }
        }
    }

    /**
     * Logs the new consumer and broadcasts a "CREATE" action to all peers.
     *
     * @param messageListener the listener that triggered the broadcast
     */
    private void broadCastNewQueueConsumer(MessageListener messageListener) {
        final String consumerAddress = messageListener.getChannel().getRemoteAddressAsString();
        log.info("Tell all peers about new queue consumer for: '{}' in channel '{}'", this.queueName, consumerAddress);
        broadCastActionQueueConsumer("CREATE");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a "/system/peer" topic notification describing an action on this
     * queue and sends it through the given channel.
     *
     * @param str     action name written into the {@code <action>} element
     * @param channel channel the system message is sent on; its local address is
     *                written into the {@code <source-ip>} element
     */
    public void broadCastQueueInfo(String str, Channel channel) {
        final String peerDestination = "/system/peer";

        final String payload = String.format(
                "<sysmessage><action>%s</action><source-name>%s</source-name><source-ip>%s</source-ip><destination>%s</destination></sysmessage>",
                str,
                GcsInfo.getAgentName(),
                channel.localAddress().toString(),
                this.queueName);

        final NetBrokerMessage brokerMessage = new NetBrokerMessage(payload.getBytes(UTF8));
        brokerMessage.setMessageId(MessageId.getMessageId());

        final NetNotification notification =
                new NetNotification(peerDestination, NetAction.DestinationType.TOPIC, brokerMessage, peerDestination);
        final NetAction action = new NetAction(NetAction.ActionType.NOTIFICATION);
        action.setNotificationMessage(notification);

        final NetMessage systemMessage = new NetMessage(action);
        systemMessage.getHeaders().put("TYPE", "SYSTEM_QUEUE");
        SystemMessagesPublisher.sendMessage(systemMessage, channel);
    }

    /**
     * Logs the removed consumer and broadcasts a "DELETE" action to all peers.
     *
     * @param messageListener the listener that triggered the broadcast
     */
    private void broadCastRemovedQueueConsumer(MessageListener messageListener) {
        final String consumerAddress = messageListener.getChannel().getRemoteAddressAsString();
        log.info("Tell all peers about deleted queue consumer for: '{}' in channel '{}'", this.queueName, consumerAddress);
        broadCastActionQueueConsumer("DELETE");
    }

    /**
     * Removes the topic dispatcher (if any) and then deletes the queue from the
     * storage.
     */
    public synchronized void clearStorage() {
        removeDispatcher();
        storage.deleteQueue();
    }

    /**
     * Creates the topic-to-queue dispatcher when the queue name contains '@'.
     * The text after the first '@' is the topic name. The queue is saved as a
     * virtual queue and the dispatcher is attached to the topic processor when
     * one already exists. Queue names without '@' are left untouched.
     *
     * @throws RuntimeException wrapping any failure, including the one raised
     *                          for an empty topic name
     */
    private void createDispatcher() {
        try {
            if (!StringUtils.contains(this.queueName, "@")) {
                return;
            }
            log.info("Get Dispatcher for: {}", this.queueName);

            final String topicName = StringUtils.substringAfter(this.queueName, "@");
            if (StringUtils.isBlank(topicName)) {
                final String problem = String.format("Can't create a topic dispatcher whose name is empty. VirtualQueue name: '%s'", this.queueName);
                log.error(problem);
                // Note: this exception is caught and wrapped again by the handler below.
                throw new RuntimeException(problem);
            }

            this.topicFwd = new TopicToQueueDispatcher(null, topicName, this.queueName);
            VirtualQueueStorage.saveVirtualQueue(this.queueName);

            final TopicProcessor topic = TopicProcessorList.get(topicName);
            if (topic != null) {
                topic.add(this.topicFwd, false);
            }
        } catch (Throwable failure) {
            this.topicFwd = null;
            throw new RuntimeException(failure);
        }
    }

    /**
     * Decrements the queued message counter by one.
     *
     * @return the counter value after the decrement
     */
    public long decrementQueuedMessagesCount() {
        return counter.decrementAndGet();
    }

    /**
     * Asks the storage to delete expired messages, but only while
     * {@link #hasRecipient()} reports no recipient.
     */
    public void deleteExpiredMessages() {
        if (!hasRecipient()) {
            this.storage.deleteExpiredMessages();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Offers a message to the local listeners first and falls back to the remote
     * listeners when no local listener took it. Only deliveries accepted by a
     * local listener are counted in the queue statistics.
     *
     * @param netMessage message to deliver
     * @param z          when true and at least one local listener is registered,
     *                   the remote listeners are not tried after a local failure
     * @return the result of the last delivery attempt
     */
    public ForwardResult forward(NetMessage netMessage, boolean z) {
        final ForwardResult localOutcome = notify(this.localQueueListeners, netMessage);

        if (localOutcome.result != ForwardResult.Result.FAILED) {
            this.queueStatistics.newQueueMessageDelivered();
            return localOutcome;
        }

        final boolean keepLocal = z && this.localQueueListeners.size() > 0;
        if (keepLocal) {
            return localOutcome;
        }
        return notify(this.remoteQueueListeners, netMessage);
    }

    /**
     * @return the current value of the queued message counter
     */
    public long getQueuedMessagesCount() {
        return counter.get();
    }

    /**
     * @return the name of the queue handled by this processor
     */
    public String getQueueName() {
        return queueName;
    }

    /**
     * @return the subscription name, which for a queue is {@link #getQueueName()}
     */
    @Override
    public String getSubscriptionName() {
        final String name = getQueueName();
        return name;
    }

    /**
     * @param set listeners to inspect
     * @return true as soon as one listener reports {@code isActive()}
     */
    private boolean hasActiveListeners(Set<MessageListener> set) {
        for (MessageListener candidate : set) {
            if (candidate.isActive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @param set listeners to inspect
     * @return true as soon as one listener reports {@code isReady()}
     */
    private boolean hasReadyListeners(Set<MessageListener> set) {
        for (MessageListener candidate : set) {
            if (candidate.isReady()) {
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether some listener can currently take a message. A ready local
     * listener always counts. Remote listeners are only considered when no local
     * listener is active.
     *
     * @return true if a recipient is available
     */
    public boolean hasRecipient() {
        if (hasReadyListeners(this.localQueueListeners)) {
            return true;
        }
        return !hasActiveListeners(this.localQueueListeners) && hasReadyListeners(this.remoteQueueListeners);
    }

    /**
     * @return the live set of LOCAL listeners (not a copy)
     */
    @Override
    public Set<MessageListener> localListeners() {
        return localQueueListeners;
    }

    /**
     * Hands the message to one ready listener of the given set, chosen
     * round-robin. The search starts at the position stored in
     * {@code currentIdx} and wraps around to the positions before it. On
     * success the cursor is moved just past the chosen listener.
     *
     * @param set        listeners to choose from
     * @param netMessage message to deliver
     * @return the listener's result, or the shared FAILED result when the set is
     *         empty, no listener is ready, or the chosen listener threw
     */
    private ForwardResult notify(Set<MessageListener> set, NetMessage netMessage) {
        if (set.isEmpty()) {
            return failed;
        }
        final int start = (int) this.currentIdx.get();
        try {
            // First pass: positions at or after the cursor.
            int position = 0;
            for (MessageListener listener : set) {
                final int index = position++;
                if (index >= start && listener.isReady()) {
                    this.currentIdx.set(position);
                    return listener.onMessage(netMessage);
                }
            }
            // Second pass: wrap around to the positions before the cursor.
            position = 0;
            for (MessageListener listener : set) {
                final int index = position++;
                if (index >= start) {
                    break;
                }
                if (listener.isReady()) {
                    this.currentIdx.set(position);
                    return listener.onMessage(netMessage);
                }
            }
            return failed;
        } catch (Throwable th) {
            // Delivery stays best-effort, but the failure must not vanish silently.
            log.error(String.format("Failed to notify a listener of queue '%s'", this.queueName), th);
            return failed;
        }
    }

    /**
     * @return the live set of REMOTE listeners (not a copy)
     */
    @Override
    public Set<MessageListener> remoteListeners() {
        return remoteQueueListeners;
    }

    /**
     * Unregisters a listener from this queue. A null listener is logged and
     * ignored. When the listener was actually removed, the state-change handler
     * is detached from it.
     *
     * @param messageListener listener to remove
     */
    public void remove(MessageListener messageListener) {
        if (messageListener == null) {
            log.error(String.format("Cannot remove null listener to queue '%s'", this.queueName));
            return;
        }

        boolean removed = false;
        if (messageListener.getType() == MessageListener.Type.LOCAL) {
            removed = removeLocal(messageListener);
        }
        if (messageListener.getType() == MessageListener.Type.REMOTE) {
            removed |= removeRemote(messageListener);
        }

        if (!removed) {
            return;
        }
        messageListener.removeStateChangeListener(getMessageListenerEventChangeHandler());
    }

    /** Removes a LOCAL listener; tells the peers when it was the last one. */
    private boolean removeLocal(MessageListener messageListener) {
        synchronized (this.localQueueListeners) {
            if (!this.localQueueListeners.remove(messageListener)) {
                return false;
            }
            log.info("Removed local listener -> '{}'", messageListener.toString());
            if (this.localQueueListeners.size() == 0) {
                broadCastRemovedQueueConsumer(messageListener);
            }
            return true;
        }
    }

    /** Removes a REMOTE listener. */
    private boolean removeRemote(MessageListener messageListener) {
        synchronized (this.remoteQueueListeners) {
            if (!this.remoteQueueListeners.remove(messageListener)) {
                return false;
            }
            log.info("Removed remote listener -> '{}'", messageListener.toString());
            return true;
        }
    }

    /**
     * Unregisters the topic dispatcher and deletes the virtual queue record.
     * Does nothing when no dispatcher was created.
     *
     * @throws RuntimeException wrapping any failure
     */
    private void removeDispatcher() {
        try {
            if (this.topicFwd == null) {
                return;
            }
            Gcs.removeAsyncConsumer(this.topicFwd);
            VirtualQueueStorage.deleteVirtualQueue(getQueueName());
        } catch (Throwable failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Overwrites the message sequence number.
     *
     * @param j new sequence value; the next stored message gets {@code j + 1}
     */
    public void setSequenceNumber(long j) {
        sequence.set(j);
    }

    /**
     * @return the number of registered listeners, local plus remote
     */
    public int size() {
        final int localCount = this.localQueueListeners.size();
        final int remoteCount = this.remoteQueueListeners.size();
        return localCount + remoteCount;
    }

    /**
     * @return the timestamp (ms) recorded when the last delivery task finished,
     *         or {@code Long.MAX_VALUE} if none has finished yet
     */
    public long getLastCycle() {
        return lastCycle.get();
    }

    /**
     * Assigns the next sequence number and a message id to the message, stores
     * it and requests a delivery.
     *
     * @param netMessage message to enqueue; its notification message gets the new id
     * @param z          flag passed unchanged to the {@code BDBMessage} constructor
     */
    public void store(NetMessage netMessage, boolean z) {
        final long seq = this.sequence.incrementAndGet();
        final String messageId = MessageId.getBaseMessageId() + seq;

        netMessage.getAction().getNotificationMessage().getMessage().setMessageId(messageId);

        final BDBMessage persisted = new BDBMessage(netMessage, seq, z);
        doStore(messageId, persisted);
        deliverMessages();
    }

    /**
     * Inserts the message into the storage and increments the queued message
     * counter.
     *
     * @param str        message id, used for debug logging only
     * @param bDBMessage message to persist
     */
    private void doStore(String str, BDBMessage bDBMessage) {
        this.storage.insert(bDBMessage);
        this.counter.incrementAndGet();

        if (!log.isDebugEnabled()) {
            return;
        }
        log.debug(String.format("Stored message with id '%s'.", str));
    }

    /**
     * @return the timestamp (ms) last recorded by {@code wakeup()}, or the
     *         construction time if no wakeup has recorded one yet
     */
    public long lastMessageDelivered() {
        return lastMessageDelivered.get();
    }

    /**
     * Runs one delivery pass. The pass is skipped when another pass is already
     * running. Otherwise, when the queue holds messages and at least one
     * listener (local or remote) is ready, the storage is asked to recover
     * messages and a follow-up pass is scheduled if needed.
     */
    protected final void wakeup() {
        if (this.isWorking.getAndSet(true)) {
            log.debug("Queue '{}' is running, skip wakeup", this.queueName);
            return;
        }
        try {
            if (getQueuedMessagesCount() <= 0) {
                return;
            }
            this.emptyQueueInfoDisplay.set(false);
            if (!hasReadyListeners(this.localQueueListeners) && !hasReadyListeners(this.remoteQueueListeners)) {
                return;
            }
            recoverAndReschedule();
        } finally {
            // Always release the guard. If a check above throws and the flag stays
            // set, every later wakeup is skipped and the queue stalls.
            this.isWorking.set(false);
        }
    }

    /**
     * Asks the storage to recover messages and, when it returns a non-zero delay
     * (ms), schedules another delivery unless an earlier one is still pending.
     * Failures are logged with their root cause and not propagated.
     */
    private void recoverAndReschedule() {
        try {
            log.debug("Wakeup queue '{}'", this.queueName);
            final long delay = this.storage.recoverMessages();
            final long now = System.currentTimeMillis();
            if (delay != 0) {
                final long candidate = now + delay;
                final long scheduled = this.nextRun.get();
                // Schedule when the new run is earlier than the pending one, or the pending one is in the past.
                if (candidate < scheduled || scheduled < now) {
                    this.nextRun.set(candidate);
                    GcsExecutor.schedule(new Runnable() {
                        @Override
                        public void run() {
                            QueueProcessor.this.deliverMessages();
                        }
                    }, delay, TimeUnit.MILLISECONDS);
                }
            }
            this.lastMessageDelivered.set(now);
        } catch (Throwable th) {
            final Throwable rootCause = ErrorAnalyser.findRootCause(th);
            log.error(rootCause.getMessage(), rootCause);
        }
    }

    /**
     * @return the statistics object of this queue
     */
    public QueueStatistics getQueueStatistics() {
        return queueStatistics;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Requests a delivery pass. The request is always recorded. A new delivery
     * task is submitted to the executor only when none is pending or running;
     * otherwise the running task picks the request up.
     *
     * @return true if this call submitted a new delivery task
     */
    public boolean deliverMessages() {
        final boolean started = this.deliveringMessages.compareAndSet(false, true);
        this.deliveryRequest.set(true);
        if (started) {
            try {
                GcsExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        QueueProcessor.this.drainDeliveryRequests();
                    }
                });
            } catch (Throwable th) {
                // The task never ran, so nobody else would release the flag.
                this.deliveringMessages.set(false);
                throw th;
            }
        }
        return started;
    }

    /**
     * Body of the delivery task: runs wakeup() for as long as requests keep
     * arriving, then releases the delivering flag and records the cycle time.
     */
    private void drainDeliveryRequests() {
        boolean owner = true;
        try {
            while (owner) {
                while (this.deliveryRequest.compareAndSet(true, false)) {
                    wakeup();
                }
                this.deliveringMessages.set(false);
                owner = false;
                this.lastCycle.set(System.currentTimeMillis());
                // A request may have arrived after the drain loop ended but before the
                // flag was released. Its caller submitted no task, so take ownership
                // back and serve it here.
                owner = this.deliveryRequest.get() && this.deliveringMessages.compareAndSet(false, true);
            }
        } finally {
            // Release the flag if wakeup() threw, otherwise no delivery could start again.
            if (owner) {
                this.deliveringMessages.set(false);
            }
        }
    }

    /**
     * @return the handler attached to every listener registered on this queue
     */
    private MessageListenerEventChangeHandler getMessageListenerEventChangeHandler() {
        return msgListenterEventHandler;
    }

    /**
     * @return true if at least one LOCAL listener is registered
     */
    @Override
    public boolean hasLocalListeners() {
        return !localListeners().isEmpty();
    }

    /**
     * @return true if at least one REMOTE listener is registered
     */
    @Override
    public boolean hasRemoteListeners() {
        return !remoteListeners().isEmpty();
    }

    /**
     * Reads the messages held by the storage and unwraps each one.
     *
     * @return a new list with the unwrapped messages, in the order the storage
     *         returned them
     */
    public List<NetMessage> getMessages() {
        final List<BDBMessage> stored = this.storage.getMessages();
        final List<NetMessage> unwrapped = new ArrayList<>(stored.size());
        for (BDBMessage entry : stored) {
            unwrapped.add(entry.getMessage());
        }
        return unwrapped;
    }
}
