package pt.com.broker.messaging;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.auth.AccessControl;
import pt.com.broker.types.ForwardResult;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.channels.ListenerChannel;

/* loaded from: input_file:pt/com/broker/messaging/BrokerTopicListener.class */
public class BrokerTopicListener extends BrokerListener {
    private static final Logger log = LoggerFactory.getLogger(BrokerTopicListener.class);
    private static final ForwardResult failed = new ForwardResult(ForwardResult.Result.FAILED);
    private static final ForwardResult success = new ForwardResult(ForwardResult.Result.SUCCESS);
    private static final long MAX_WRITE_TIME = 125000000;
    private volatile long droppedMessages;
    private volatile boolean showSuspendedDeliveryMessage;
    private volatile boolean showResumedDeliveryMessage;
    private AtomicLong startDeliverAfter;

    public BrokerTopicListener(ListenerChannel listenerChannel, String str) {
        super(listenerChannel, str);
        this.droppedMessages = 0L;
        this.showSuspendedDeliveryMessage = false;
        this.showResumedDeliveryMessage = false;
        this.startDeliverAfter = new AtomicLong(System.nanoTime());
    }

    public boolean isAckRequired() {
        return false;
    }

    public NetAction.DestinationType getSourceDestinationType() {
        return NetAction.DestinationType.TOPIC;
    }

    public NetAction.DestinationType getTargetDestinationType() {
        return NetAction.DestinationType.TOPIC;
    }

    @Override // pt.com.broker.messaging.BrokerListener
    protected ForwardResult doOnMessage(NetMessage netMessage) {
        ListenerChannel channel = getChannel();
        ForwardResult forwardResult = success;
        try {
            if (channel.isWritable()) {
                getChannel().resetDeliveryTries();
                if (this.showResumedDeliveryMessage) {
                    log.info(String.format("Stopped discarding messages for topic '%s' and session '%s'. Dropped messages: %s", getsubscriptionKey(), channel.getRemoteAddressAsString(), Long.valueOf(this.droppedMessages)));
                    this.droppedMessages = 0L;
                    this.showResumedDeliveryMessage = false;
                }
                if (!deliveryAllowed(netMessage)) {
                    return failed;
                }
                channel.writeAndFlush(netMessage);
                this.showSuspendedDeliveryMessage = true;
            } else if (!isReady()) {
                this.showResumedDeliveryMessage = true;
                if (this.showSuspendedDeliveryMessage) {
                    log.info("Started discarding messages for topic '{}' and session '{}'.", getsubscriptionKey(), channel.getRemoteAddressAsString());
                    this.showSuspendedDeliveryMessage = false;
                }
                this.droppedMessages++;
                forwardResult = failed;
            } else {
                if (!deliveryAllowed(netMessage)) {
                    return failed;
                }
                final ListenerChannel channel2 = getChannel();
                if (channel2.getDeliveryTries() >= 100) {
                    this.droppedMessages++;
                    return failed;
                }
                channel2.incrementAndGetDeliveryTries();
                ChannelFuture writeAndFlush = channel.writeAndFlush(netMessage);
                final long nanoTime = System.nanoTime();
                this.startDeliverAfter.set(nanoTime + 10000);
                writeAndFlush.addListener(new ChannelFutureListener() { // from class: pt.com.broker.messaging.BrokerTopicListener.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        channel2.decrementAndGetDeliveryTries();
                        long nanoTime2 = System.nanoTime() - nanoTime;
                        if (nanoTime2 >= BrokerTopicListener.MAX_WRITE_TIME) {
                            BrokerTopicListener.this.startDeliverAfter.set(System.nanoTime() + (nanoTime2 / 2));
                        }
                    }
                });
            }
        } catch (Throwable th) {
            log.error("Error on message listener for '{}': {}", th.getMessage(), getsubscriptionKey());
            try {
                channel.getPipeline().get("broker-handler").exceptionCaught(channel.getChannelContext(), th, null);
            } catch (Throwable th2) {
                log.error("Could not propagate error to the client session! Message: {}", th2.getMessage());
            }
        }
        return forwardResult;
    }

    private boolean deliveryAllowed(NetMessage netMessage) {
        NetNotification notificationMessage = netMessage.getAction().getNotificationMessage();
        return AccessControl.deliveryAllowed(netMessage, NetAction.DestinationType.TOPIC, getChannel().getChannel(), getsubscriptionKey(), notificationMessage.getDestination());
    }

    public boolean isReady() {
        return System.nanoTime() > this.startDeliverAfter.get();
    }

    public boolean isActive() {
        return true;
    }
}
