package pt.com.broker.client.nio;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.nio.BaseClient;
import pt.com.broker.client.nio.bootstrap.Bootstrap;
import pt.com.broker.client.nio.bootstrap.ChannelInitializer;
import pt.com.broker.client.nio.consumer.BrokerAsyncConsumer;
import pt.com.broker.client.nio.consumer.ConsumerManager;
import pt.com.broker.client.nio.consumer.PendingAcceptRequestsManager;
import pt.com.broker.client.nio.consumer.PongConsumerManager;
import pt.com.broker.client.nio.events.BrokerListener;
import pt.com.broker.client.nio.events.NotificationListenerAdapter;
import pt.com.broker.client.nio.exceptions.SubscriptionNotFound;
import pt.com.broker.client.nio.handlers.timeout.TimeoutException;
import pt.com.broker.client.nio.server.HostContainer;
import pt.com.broker.client.nio.server.HostInfo;
import pt.com.broker.client.nio.server.ReconnectEvent;
import pt.com.broker.client.nio.utils.ChannelWrapperFuture;
import pt.com.broker.client.nio.utils.HostInfoFuture;
import pt.com.broker.client.nio.utils.NetNotificationDecorator;
import pt.com.broker.types.NetAcknowledge;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetFault;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPing;
import pt.com.broker.types.NetPoll;
import pt.com.broker.types.NetProtocolType;
import pt.com.broker.types.NetPublish;
import pt.com.broker.types.NetSubscribe;
import pt.com.broker.types.NetSubscribeAction;
import pt.com.broker.types.NetUnsubscribe;

/* loaded from: input_file:pt/com/broker/client/nio/BrokerClient.class */
public class BrokerClient extends BaseClient implements Observer {
    private static final Logger log = LoggerFactory.getLogger(BrokerClient.class);
    private ConsumerManager consumerManager;
    private PongConsumerManager pongConsumerManager;
    private PendingAcceptRequestsManager acceptRequestsManager;
    private ChannelInitializer channelInitializer;
    ExecutorService executorService;
    private final CompletionService<HostInfo> service;
    private final CompletionService<HostInfo> unsubcribeService;

    public BrokerClient(NetProtocolType netProtocolType) {
        super(netProtocolType);
        this.executorService = Executors.newFixedThreadPool(10);
        this.service = new ExecutorCompletionService(this.executorService);
        this.unsubcribeService = new ExecutorCompletionService(this.executorService);
    }

    public BrokerClient(String str, int i) {
        super(str, i);
        this.executorService = Executors.newFixedThreadPool(10);
        this.service = new ExecutorCompletionService(this.executorService);
        this.unsubcribeService = new ExecutorCompletionService(this.executorService);
    }

    public BrokerClient(String str, int i, NetProtocolType netProtocolType) {
        super(str, i, netProtocolType);
        this.executorService = Executors.newFixedThreadPool(10);
        this.service = new ExecutorCompletionService(this.executorService);
        this.unsubcribeService = new ExecutorCompletionService(this.executorService);
    }

    public BrokerClient(HostInfo hostInfo, NetProtocolType netProtocolType) {
        super(hostInfo, netProtocolType);
        this.executorService = Executors.newFixedThreadPool(10);
        this.service = new ExecutorCompletionService(this.executorService);
        this.unsubcribeService = new ExecutorCompletionService(this.executorService);
    }

    @Override // pt.com.broker.client.nio.BaseClient
    protected void init() {
        setPongConsumerManager(new PongConsumerManager());
        setConsumerManager(new ConsumerManager());
        this.channelInitializer = new ChannelInitializer(getSerializer(), getConsumerManager(), getPongConsumerManager());
        this.channelInitializer.setOldFraming(getProtocolType() == NetProtocolType.SOAP_v0);
        setBootstrap(new Bootstrap(this.channelInitializer));
        setAcceptRequestsManager(new PendingAcceptRequestsManager());
        this.channelInitializer.setAcceptRequestsManager(getAcceptRequestsManager());
        HostContainer hostContainer = new HostContainer(getBootstrap());
        hostContainer.addObserver(this);
        setHosts(hostContainer);
    }

    @Override // pt.com.broker.client.nio.BaseClient
    public Future<HostInfo> publish(String str, String str2, NetAction.DestinationType destinationType) {
        return publish(str.getBytes(), str2, destinationType);
    }

    @Override // pt.com.broker.client.nio.BaseClient
    public Future<HostInfo> publish(byte[] bArr, String str, NetAction.DestinationType destinationType) {
        return publish(new NetBrokerMessage(bArr), str, destinationType);
    }

    @Override // pt.com.broker.client.nio.BaseClient
    public Future<HostInfo> publish(NetBrokerMessage netBrokerMessage, String str, NetAction.DestinationType destinationType) {
        return publish(netBrokerMessage, str, destinationType, null);
    }

    public Future<HostInfo> publish(NetBrokerMessage netBrokerMessage, String str, NetAction.DestinationType destinationType, AcceptRequest acceptRequest) {
        if (netBrokerMessage == null || StringUtils.isBlank(str)) {
            throw new IllegalArgumentException("Mal-formed Enqueue request");
        }
        NetPublish netPublish = new NetPublish(str, destinationType, netBrokerMessage);
        if (acceptRequest != null) {
            netPublish.setActionId(acceptRequest.getActionId());
            addAcceptMessageHandler(acceptRequest);
        }
        return sendNetMessage(new NetMessage(new NetAction(netPublish), netBrokerMessage.getHeaders()));
    }

    public Future<HostInfo> subscribe(String str, NetAction.DestinationType destinationType, BrokerListener brokerListener) throws InterruptedException {
        return subscribe(new NetSubscribe(str, destinationType), brokerListener, (AcceptRequest) null);
    }

    public Future<HostInfo> subscribe(NetSubscribeAction netSubscribeAction, BrokerListener brokerListener) throws InterruptedException {
        return subscribe(netSubscribeAction, brokerListener, (AcceptRequest) null);
    }

    public Future<HostInfo> subscribe(final NetSubscribeAction netSubscribeAction, final BrokerListener brokerListener, AcceptRequest acceptRequest) throws InterruptedException {
        Collection<HostInfo> connectedHosts;
        if (netSubscribeAction.getDestinationType() == NetAction.DestinationType.TOPIC) {
            connectedHosts = new ArrayList();
            connectedHosts.add(getAvailableHost());
        } else {
            connectedHosts = getHosts().getConnectedHosts();
        }
        if (connectedHosts.size() == 0) {
            return new BaseClient.HostNotAvailableFuture();
        }
        if (acceptRequest != null) {
            netSubscribeAction.setActionId(acceptRequest.getActionId());
            addAcceptMessageHandler(acceptRequest);
        }
        for (final HostInfo hostInfo : connectedHosts) {
            this.service.submit(new Callable<HostInfo>() { // from class: pt.com.broker.client.nio.BrokerClient.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public HostInfo call() throws Exception {
                    ChannelWrapperFuture subscribeToHost = BrokerClient.this.subscribeToHost(netSubscribeAction, brokerListener, hostInfo);
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    subscribeToHost.getInstance().addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) new ChannelFutureListener() { // from class: pt.com.broker.client.nio.BrokerClient.1.1
                        @Override // io.netty.util.concurrent.GenericFutureListener
                        public void operationComplete(ChannelFuture channelFuture) throws Exception {
                            countDownLatch.countDown();
                        }
                    });
                    countDownLatch.await();
                    return hostInfo;
                }
            });
        }
        return this.service.take();
    }

    private HostInfoFuture<HostInfo> subscribeToHost(NetSubscribeAction netSubscribeAction, BrokerListener brokerListener) {
        return subscribeToHost(netSubscribeAction, brokerListener, getAvailableHost());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelWrapperFuture subscribeToHost(final NetSubscribeAction netSubscribeAction, BrokerListener brokerListener, final HostInfo hostInfo) {
        if (brokerListener == null) {
            throw new IllegalArgumentException("Invalid Listener");
        }
        if (netSubscribeAction.getDestinationType() == NetAction.DestinationType.VIRTUAL_QUEUE && !netSubscribeAction.getDestination().contains("@")) {
            throw new IllegalArgumentException("Invalid name format for virtual queue");
        }
        NetAction netAction = null;
        if (netSubscribeAction instanceof NetPoll) {
            netAction = new NetAction((NetPoll) netSubscribeAction);
        }
        if (netSubscribeAction instanceof NetSubscribe) {
            netAction = new NetAction((NetSubscribe) netSubscribeAction);
        }
        NetMessage buildMessage = buildMessage(netAction, netSubscribeAction.getHeaders());
        getConsumerManager().addSubscription(netSubscribeAction, brokerListener, hostInfo);
        if (brokerListener instanceof NotificationListenerAdapter) {
            ((NotificationListenerAdapter) brokerListener).setBrokerClient(this);
        }
        ChannelWrapperFuture sendNetMessage = sendNetMessage(buildMessage, hostInfo);
        sendNetMessage.getInstance().addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) new ChannelFutureListener() { // from class: pt.com.broker.client.nio.BrokerClient.2
            @Override // io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    return;
                }
                BrokerClient.this.getConsumerManager().removeSubscription(netSubscribeAction, hostInfo);
                BrokerClient.log.debug("Error creating async consumer", channelFuture.cause());
            }
        });
        return sendNetMessage;
    }

    public Future<HostInfo> unsubscribe(final NetAction.DestinationType destinationType, final String str) {
        int i = 0;
        Iterator<Map.Entry<String, BrokerAsyncConsumer>> it = getConsumerManager().getSubscriptions(destinationType).entrySet().iterator();
        while (it.hasNext()) {
            BrokerAsyncConsumer value = it.next().getValue();
            if (value.getDestinationName().equals(str) && value.getHost() != null) {
                final HostInfo host = value.getHost();
                i++;
                this.unsubcribeService.submit(new Callable<HostInfo>() { // from class: pt.com.broker.client.nio.BrokerClient.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public HostInfo call() throws Exception {
                        ChannelWrapperFuture unsubscribeHost = BrokerClient.this.unsubscribeHost(destinationType, str, host);
                        final CountDownLatch countDownLatch = new CountDownLatch(1);
                        unsubscribeHost.getInstance().addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) new ChannelFutureListener() { // from class: pt.com.broker.client.nio.BrokerClient.3.1
                            @Override // io.netty.util.concurrent.GenericFutureListener
                            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                                countDownLatch.countDown();
                            }
                        });
                        countDownLatch.await();
                        return host;
                    }
                });
            }
        }
        if (i == 0) {
            return new BaseClient.ExceptionFuture(new SubscriptionNotFound("No subscriptions found"));
        }
        try {
            return this.unsubcribeService.take();
        } catch (InterruptedException e) {
            return new BaseClient.ExceptionFuture(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelWrapperFuture unsubscribeHost(final NetAction.DestinationType destinationType, final String str, final HostInfo hostInfo) {
        ChannelWrapperFuture sendNetMessage = sendNetMessage(new NetMessage(new NetAction(new NetUnsubscribe(str, destinationType))), hostInfo);
        sendNetMessage.getInstance().addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) new ChannelFutureListener() { // from class: pt.com.broker.client.nio.BrokerClient.4
            @Override // io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    BrokerClient.this.getConsumerManager().removeSubscription(destinationType, str, hostInfo);
                } else {
                    BrokerClient.log.error("was not possible to unsubscribe", channelFuture.cause());
                }
            }
        });
        return sendNetMessage;
    }

    public Future<HostInfo> acknowledge(NetNotification netNotification, HostInfo hostInfo) throws Throwable {
        if (!(netNotification instanceof NetNotificationDecorator)) {
            throw new Exception("Invalid NetNotification");
        }
        if (netNotification.getDestinationType() == NetAction.DestinationType.TOPIC) {
            return null;
        }
        if (netNotification.getMessage() == null || StringUtils.isBlank(netNotification.getMessage().getMessageId())) {
            throw new IllegalArgumentException("Can't acknowledge invalid message.");
        }
        return sendNetMessage(buildMessage(new NetAction(new NetAcknowledge(netNotification.getSubscription(), netNotification.getMessage().getMessageId()))), hostInfo);
    }

    public Future<HostInfo> acknowledge(NetNotification netNotification) throws Throwable {
        return acknowledge(netNotification, ((NetNotificationDecorator) netNotification).getHost());
    }

    public Future<HostInfo> checkStatus(BrokerListener brokerListener) throws Throwable {
        final String uuid = UUID.randomUUID().toString();
        NetMessage buildMessage = buildMessage(new NetAction(new NetPing(uuid)));
        getPongConsumerManager().addSubscription(uuid, brokerListener);
        HostInfoFuture sendNetMessage = sendNetMessage(buildMessage);
        if (sendNetMessage instanceof ChannelWrapperFuture) {
            ((ChannelWrapperFuture) sendNetMessage).getInstance().addListener2((GenericFutureListener<? extends io.netty.util.concurrent.Future<? super Void>>) new ChannelFutureListener() { // from class: pt.com.broker.client.nio.BrokerClient.5
                @Override // io.netty.util.concurrent.GenericFutureListener
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        return;
                    }
                    BrokerClient.this.getPongConsumerManager().removeSubscription(uuid);
                    BrokerClient.log.error("Was not possible to check Status", channelFuture.cause());
                }
            });
        }
        return sendNetMessage;
    }

    public NetNotification poll(String str) {
        try {
            return poll(str, 0);
        } catch (TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    public NetNotification poll(String str, int i) throws TimeoutException {
        return poll(new NetPoll(str, i), (AcceptRequest) null);
    }

    public NetNotification poll(final NetPoll netPoll, AcceptRequest acceptRequest) throws TimeoutException {
        if (acceptRequest != null) {
            addAcceptMessageHandler(acceptRequest);
            netPoll.setActionId(acceptRequest.getActionId());
        }
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final ArrayList arrayList = new ArrayList(1);
        try {
            subscribeToHost(netPoll, new BrokerListener() { // from class: pt.com.broker.client.nio.BrokerClient.6
                @Override // pt.com.broker.client.nio.events.BrokerListener
                public void deliverMessage(NetMessage netMessage, HostInfo hostInfo) throws Throwable {
                    try {
                        try {
                            NetFault faultMessage = netMessage.getAction().getFaultMessage();
                            if (faultMessage != null) {
                                if (faultMessage.getCode().equals(NetFault.PollTimeoutErrorCode)) {
                                    atomicBoolean.set(true);
                                }
                            } else {
                                arrayList.add(netMessage.getAction().getNotificationMessage());
                                BrokerClient.this.getConsumerManager().removeSubscription(netPoll, hostInfo);
                                countDownLatch.countDown();
                            }
                        } catch (Exception e) {
                            throw e;
                        }
                    } finally {
                        BrokerClient.this.getConsumerManager().removeSubscription(netPoll, hostInfo);
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            if (atomicBoolean.get()) {
                throw new TimeoutException("Poll timeout");
            }
            if (arrayList.isEmpty()) {
                return null;
            }
            return (NetNotification) arrayList.get(0);
        } catch (Throwable th) {
            throw new RuntimeException("There was an unexpected error", th);
        }
    }

    public ConsumerManager getConsumerManager() {
        return this.consumerManager;
    }

    public void setConsumerManager(ConsumerManager consumerManager) {
        this.consumerManager = consumerManager;
    }

    public PongConsumerManager getPongConsumerManager() {
        return this.pongConsumerManager;
    }

    public void setPongConsumerManager(PongConsumerManager pongConsumerManager) {
        this.pongConsumerManager = pongConsumerManager;
    }

    @Override // pt.com.broker.client.nio.BaseClient
    public HostContainer getHosts() {
        return this.hosts;
    }

    public PendingAcceptRequestsManager getAcceptRequestsManager() {
        return this.acceptRequestsManager;
    }

    public void setAcceptRequestsManager(PendingAcceptRequestsManager pendingAcceptRequestsManager) {
        this.acceptRequestsManager = pendingAcceptRequestsManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addAcceptMessageHandler(AcceptRequest acceptRequest) {
        try {
            getAcceptRequestsManager().addAcceptRequest(acceptRequest.getActionId(), acceptRequest.getTimeout(), acceptRequest.getListener());
        } catch (Throwable th) {
            th.printStackTrace();
        }
    }

    public void setFaultListener(BrokerListener brokerListener) {
        this.channelInitializer.setFaultHandler(brokerListener);
    }

    @Override // java.util.Observer
    public void update(Observable observable, Object obj) {
        if ((observable instanceof HostContainer) && (obj instanceof ReconnectEvent)) {
            HostInfo host = ((ReconnectEvent) obj).getHost();
            log.debug("Reconnect Event: " + host);
            resubscribe(host);
        }
    }

    private void resubscribe(HostInfo hostInfo) {
        log.debug("Resubscribing : " + hostInfo);
        for (Map.Entry<String, BrokerAsyncConsumer> entry : this.consumerManager.removeSubscriptions(NetAction.DestinationType.QUEUE, hostInfo).entrySet()) {
            BrokerAsyncConsumer value = entry.getValue();
            BrokerListener listener = entry.getValue().getListener();
            log.debug("Destination: " + entry.getKey());
            NetSubscribe netSubscribe = new NetSubscribe(value.getDestinationName(), value.getDestinationType());
            netSubscribe.setActionId(value.getActionId());
            subscribeToHost(netSubscribe, listener, hostInfo);
        }
    }
}
