/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.client.nio;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.concurrent.GenericFutureListener;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.netty.NettyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.nio.AcceptRequest;
import pt.com.broker.client.nio.BaseClient;
import pt.com.broker.client.nio.bootstrap.Bootstrap;
import pt.com.broker.client.nio.bootstrap.ChannelInitializer;
import pt.com.broker.client.nio.consumer.BrokerAsyncConsumer;
import pt.com.broker.client.nio.consumer.ConsumerManager;
import pt.com.broker.client.nio.consumer.PendingAcceptRequestsManager;
import pt.com.broker.client.nio.consumer.PongConsumerManager;
import pt.com.broker.client.nio.events.BrokerListener;
import pt.com.broker.client.nio.events.NotificationListenerAdapter;
import pt.com.broker.client.nio.events.connection.ConnectionEventListener;
import pt.com.broker.client.nio.exceptions.SubscriptionNotFound;
import pt.com.broker.client.nio.exceptions.UnavailableAgentException;
import pt.com.broker.client.nio.handlers.timeout.TimeoutException;
import pt.com.broker.client.nio.server.HostContainer;
import pt.com.broker.client.nio.server.HostInfo;
import pt.com.broker.client.nio.server.ReconnectEvent;
import pt.com.broker.client.nio.utils.ChannelWrapperFuture;
import pt.com.broker.client.nio.utils.HostInfoFuture;
import pt.com.broker.client.nio.utils.NetNotificationDecorator;
import pt.com.broker.types.NetAcknowledge;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetFault;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPing;
import pt.com.broker.types.NetPoll;
import pt.com.broker.types.NetProtocolType;
import pt.com.broker.types.NetPublish;
import pt.com.broker.types.NetSubscribe;
import pt.com.broker.types.NetSubscribeAction;
import pt.com.broker.types.NetUnsubscribe;

public class BrokerClient
extends BaseClient
implements Observer {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(BrokerClient.class);
    /** Subscription registry; assigned through {@link #setConsumerManager(ConsumerManager)} in {@link #init()}. */
    private ConsumerManager consumerManager;
    /** Registry of listeners waiting for ping replies, keyed by action id (see {@link #checkStatus(BrokerListener)}). */
    private PongConsumerManager pongConsumerManager;
    /** Registry of pending accept requests (see {@link #addAcceptMessageHandler(AcceptRequest)}). */
    private PendingAcceptRequestsManager acceptRequestsManager;
    /** Channel initializer created in {@link #init()} and handed to the bootstrap. */
    private ChannelInitializer channelInitializer;
    /** Connection event listeners; the same list instance is passed to the channel initializer in {@link #init()}. */
    private List<ConnectionEventListener> connectionEventListeners;
    // Package-private fixed pool of 10 threads backing both completion services below.
    // NOTE(review): nothing in this class shuts the pool down — confirm who owns its lifecycle.
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    /** Completion service used by the subscribe path; shares {@link #executorService}. */
    private final CompletionService<HostInfo> service = new ExecutorCompletionService<HostInfo>(this.executorService);
    /** Completion service used by the unsubscribe path; shares {@link #executorService}. */
    private final CompletionService<HostInfo> unsubcribeService = new ExecutorCompletionService<HostInfo>(this.executorService);

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a protocol type.
     *
     * @param ptype wire protocol passed on to the base class
     */
    public BrokerClient(NetProtocolType ptype) {
        super(ptype);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a protocol type
     * and a Netty context.
     *
     * @param ptype    wire protocol passed on to the base class
     * @param nettyCtx Netty context passed on to the base class
     */
    public BrokerClient(NetProtocolType ptype, NettyContext nettyCtx) {
        super(ptype, nettyCtx);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a host and port.
     *
     * @param host agent host name passed on to the base class
     * @param port agent port passed on to the base class
     */
    public BrokerClient(String host, int port) {
        super(host, port);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a host, a port
     * and a Netty context.
     *
     * @param host     agent host name passed on to the base class
     * @param port     agent port passed on to the base class
     * @param nettyCtx Netty context passed on to the base class
     */
    public BrokerClient(String host, int port, NettyContext nettyCtx) {
        super(host, port, nettyCtx);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a host, a port
     * and a protocol type.
     *
     * @param host  agent host name passed on to the base class
     * @param port  agent port passed on to the base class
     * @param ptype wire protocol passed on to the base class
     */
    public BrokerClient(String host, int port, NetProtocolType ptype) {
        super(host, port, ptype);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a host, a port,
     * a protocol type and a Netty context.
     *
     * @param host     agent host name passed on to the base class
     * @param port     agent port passed on to the base class
     * @param ptype    wire protocol passed on to the base class
     * @param nettyCtx Netty context passed on to the base class
     */
    public BrokerClient(String host, int port, NetProtocolType ptype, NettyContext nettyCtx) {
        super(host, port, ptype, nettyCtx);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a
     * {@link HostInfo} and a protocol type.
     *
     * @param host  agent description passed on to the base class
     * @param ptype wire protocol passed on to the base class
     */
    public BrokerClient(HostInfo host, NetProtocolType ptype) {
        super(host, ptype);
    }

    /**
     * Creates a client by delegating to the {@link BaseClient} constructor that takes a
     * {@link HostInfo}, a protocol type and a Netty context.
     *
     * @param host     agent description passed on to the base class
     * @param ptype    wire protocol passed on to the base class
     * @param nettyCtx Netty context passed on to the base class
     */
    public BrokerClient(HostInfo host, NetProtocolType ptype, NettyContext nettyCtx) {
        super(host, ptype, nettyCtx);
    }

    /**
     * Wires up the collaborators of this client: the consumer, pong and accept-request managers,
     * the channel initializer, the bootstrap and the host container.
     *
     * <p>NOTE(review): presumably invoked by {@link BaseClient} during construction — confirm; the
     * statement order below matters because later objects receive the earlier ones.
     */
    @Override
    protected void init() {
        // The managers must exist before the channel initializer, which receives them.
        this.setPongConsumerManager(new PongConsumerManager());
        this.setConsumerManager(new ConsumerManager());
        // The initializer keeps a reference to this list, so listeners added later via
        // addConnectionEventListener go into the same instance.
        this.connectionEventListeners = new ArrayList<ConnectionEventListener>();
        this.channelInitializer = new ChannelInitializer(this.getSerializer(), this.getConsumerManager(), this.getPongConsumerManager(), this.connectionEventListeners);
        // Old framing is enabled only for the SOAP_v0 protocol.
        this.channelInitializer.setOldFraming(this.getProtocolType() == NetProtocolType.SOAP_v0);
        this.setBootstrap(new Bootstrap(this.channelInitializer, this.getNettyContext()));
        // The accept-request manager is created after the bootstrap and then shared with the initializer.
        this.setAcceptRequestsManager(new PendingAcceptRequestsManager());
        this.channelInitializer.setAcceptRequestsManager(this.getAcceptRequestsManager());
        // Observe the host container so that update(...) receives its events.
        HostContainer hostContainer = new HostContainer(this.getBootstrap());
        hostContainer.addObserver(this);
        this.setHosts(hostContainer);
    }

    /**
     * Publishes a text payload, encoded as UTF-8 bytes, to the given destination.
     *
     * @param brokerMessage   text payload
     * @param destinationName destination to publish to
     * @param dtype           destination type
     * @return the future produced by the byte-array overload
     * @throws UnavailableAgentException propagated from the overload this method delegates to
     */
    @Override
    public Future<HostInfo> publish(String brokerMessage, String destinationName, NetAction.DestinationType dtype) throws UnavailableAgentException {
        final Charset utf8 = Charset.forName("UTF-8");
        final byte[] payload = brokerMessage.getBytes(utf8);
        return publish(payload, destinationName, dtype);
    }

    /**
     * Publishes a raw byte payload by wrapping it in a {@link NetBrokerMessage}.
     *
     * @param brokerMessage   payload bytes
     * @param destinationName destination to publish to
     * @param dtype           destination type
     * @return the future produced by the {@link NetBrokerMessage} overload
     * @throws UnavailableAgentException propagated from the overload this method delegates to
     */
    @Override
    public Future<HostInfo> publish(byte[] brokerMessage, String destinationName, NetAction.DestinationType dtype) throws UnavailableAgentException {
        return publish(new NetBrokerMessage(brokerMessage), destinationName, dtype);
    }

    /**
     * Publishes a broker message without an accept request.
     *
     * @param brokerMessage message to publish
     * @param destination   destination to publish to
     * @param dtype         destination type
     * @return the future produced by the four-argument overload
     * @throws UnavailableAgentException propagated from the four-argument overload
     */
    @Override
    public Future<HostInfo> publish(NetBrokerMessage brokerMessage, String destination, NetAction.DestinationType dtype) throws UnavailableAgentException {
        final AcceptRequest noAcceptRequest = null;
        return publish(brokerMessage, destination, dtype, noAcceptRequest);
    }

    /**
     * Publishes a broker message, optionally registering an accept request for it.
     *
     * @param brokerMessage message to publish; must not be null
     * @param destination   destination to publish to; must not be blank
     * @param dtype         destination type
     * @param request       optional accept request; when non-null its action id is copied onto the
     *                      publish action and its handler is registered before sending
     * @return the future returned by {@code sendNetMessage}
     * @throws UnavailableAgentException when the host container reports no available host
     */
    public Future<HostInfo> publish(NetBrokerMessage brokerMessage, String destination, NetAction.DestinationType dtype, AcceptRequest request) throws UnavailableAgentException {
        Preconditions.checkNotNull(brokerMessage, "brokerMessage cannot be null.");
        final boolean hasDestination = StringUtils.isNotBlank(destination);
        Preconditions.checkArgument(hasDestination, "destination cannot be null or empty");

        final boolean noAgent = getHosts().getAvailableHost() == null;
        if (noAgent) {
            throw new UnavailableAgentException();
        }

        final NetPublish netPublish = new NetPublish(destination, dtype, brokerMessage);
        if (request != null) {
            netPublish.setActionId(request.getActionId());
            addAcceptMessageHandler(request);
        }
        return sendNetMessage(new NetMessage(new NetAction(netPublish), brokerMessage.getHeaders()));
    }

    /**
     * Subscribes to a destination by name, without an accept request.
     *
     * @param destination     destination name
     * @param destinationType destination type
     * @param listener        listener to register for the subscription
     * @return the future produced by the three-argument {@link NetSubscribeAction} overload
     * @throws InterruptedException propagated from that overload
     */
    public Future<HostInfo> subscribe(String destination, NetAction.DestinationType destinationType, BrokerListener listener) throws InterruptedException {
        final NetSubscribeAction action = new NetSubscribe(destination, destinationType);
        return subscribe(action, listener, null);
    }

    /**
     * Subscribes with a prepared subscribe action, without an accept request.
     *
     * @param subscribe subscribe action to send
     * @param listener  listener to register for the subscription
     * @return the future produced by the three-argument overload
     * @throws InterruptedException propagated from that overload
     */
    public Future<HostInfo> subscribe(NetSubscribeAction subscribe, BrokerListener listener) throws InterruptedException {
        final AcceptRequest noAcceptRequest = null;
        return subscribe(subscribe, listener, noAcceptRequest);
    }

    /**
     * Subscribes on the target hosts and returns the future of the first host whose send completed.
     *
     * <p>For {@code TOPIC} destinations the single available host is used; for every other
     * destination type all connected hosts are used. One task per host is submitted to the shared
     * executor; each task sends the subscription and waits for the channel write to complete.
     *
     * <p>A completion service private to this call is used so that the future handed back always
     * belongs to this call. With a shared completion service the futures of the remaining hosts
     * stay queued and are returned, stale, to later or concurrent callers.
     *
     * <p>Note that a task returns its host once the write completes, whether or not it succeeded;
     * failed sends are handled by the listener attached in {@code subscribeToHost}.
     *
     * @param subscribe subscribe action to send
     * @param listener  listener to register for the subscription
     * @param request   optional accept request; when non-null its action id is copied onto the
     *                  action and its handler is registered before sending
     * @return a future for the first completed host, or a host-not-available future when there is
     *         no host to subscribe on
     * @throws InterruptedException if interrupted while waiting for the first task to complete
     */
    public Future<HostInfo> subscribe(final NetSubscribeAction subscribe, final BrokerListener listener, AcceptRequest request) throws InterruptedException {
        final Collection<HostInfo> servers;
        if (subscribe.getDestinationType() == NetAction.DestinationType.TOPIC) {
            servers = new ArrayList<HostInfo>(1);
            HostInfo available = this.getAvailableHost();
            // Never enqueue a null host: treat it as "no host available" below.
            if (available != null) {
                servers.add(available);
            }
        } else {
            servers = this.getHosts().getConnectedHosts();
        }
        if (servers == null || servers.isEmpty()) {
            return new BaseClient.HostNotAvailableFuture<HostInfo>(this);
        }
        if (request != null) {
            subscribe.setActionId(request.getActionId());
            this.addAcceptMessageHandler(request);
        }
        // Per-call completion queue over the shared thread pool.
        final CompletionService<HostInfo> completion = new ExecutorCompletionService<HostInfo>(this.executorService);
        for (final HostInfo hostInfo : servers) {
            completion.submit(new Callable<HostInfo>(){

                @Override
                public HostInfo call() throws Exception {
                    ChannelWrapperFuture future = BrokerClient.this.subscribeToHost(subscribe, listener, hostInfo);
                    final CountDownLatch latch = new CountDownLatch(1);
                    future.getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            latch.countDown();
                        }
                    });
                    // Block this worker until the channel write has finished.
                    latch.await();
                    return hostInfo;
                }
            });
        }
        return completion.take();
    }

    /**
     * Sends a subscription to the host returned by {@code getAvailableHost()}.
     *
     * @param subscribe subscribe action to send
     * @param listener  listener to register for the subscription
     * @return the future produced by the host-specific overload
     */
    private HostInfoFuture<HostInfo> subscribeToHost(NetSubscribeAction subscribe, BrokerListener listener) {
        final HostInfo target = getAvailableHost();
        return subscribeToHost(subscribe, listener, target);
    }

    /**
     * Registers a subscription for {@code host} and sends the matching subscribe or poll message to it.
     *
     * <p>The subscription is added to the consumer manager before the message is sent and is
     * removed again if the channel write fails.
     *
     * @param subscribe subscribe action; must be a {@link NetSubscribe} or a {@link NetPoll}
     * @param listener  listener to register; must not be null
     * @param host      host to send the message to
     * @return the future of the channel write
     * @throws IllegalArgumentException if the listener is null, if a virtual queue name does not
     *                                  contain {@code '@'}, or if the action type is unsupported
     */
    private ChannelWrapperFuture subscribeToHost(final NetSubscribeAction subscribe, BrokerListener listener, final HostInfo host) {
        if (listener == null) {
            throw new IllegalArgumentException("Invalid Listener");
        }
        if (subscribe.getDestinationType() == NetAction.DestinationType.VIRTUAL_QUEUE && !subscribe.getDestination().contains("@")) {
            throw new IllegalArgumentException("Invalid name format for virtual queue");
        }
        // NetSubscribe is tested first so that it still wins if an action matches both types,
        // as it did when the two checks were independent ifs.
        final NetAction netAction;
        if (subscribe instanceof NetSubscribe) {
            netAction = new NetAction((NetSubscribe)subscribe);
        } else if (subscribe instanceof NetPoll) {
            netAction = new NetAction((NetPoll)subscribe);
        } else {
            // Fail fast instead of building and sending a message around a null action.
            throw new IllegalArgumentException("Unsupported subscribe action: " + subscribe.getClass().getName());
        }
        NetMessage netMessage = this.buildMessage(netAction, subscribe.getHeaders());
        this.getConsumerManager().addSubscription(subscribe, listener, host);
        if (listener instanceof NotificationListenerAdapter) {
            ((NotificationListenerAdapter)listener).setBrokerClient(this);
        }
        ChannelWrapperFuture f = this.sendNetMessage(netMessage, host);
        f.getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // Roll back the registration made above when the write did not go through.
                    BrokerClient.this.getConsumerManager().removeSubscription(subscribe, host);
                    log.debug("Error creating async consumer", future.cause());
                }
            }
        });
        return f;
    }

    /**
     * Unsubscribes {@code dstName} on every host that holds a matching subscription and returns
     * the future of the first host whose unsubscribe message was written.
     *
     * <p>A completion service private to this call is used so that the returned future always
     * belongs to this call. With a shared completion service the futures of the remaining hosts
     * stay queued and are returned, stale, to later or concurrent callers.
     *
     * @param destinationType destination type of the subscriptions to remove
     * @param dstName         destination name of the subscriptions to remove
     * @return a future for the first completed host; an exception future carrying
     *         {@link SubscriptionNotFound} when nothing matched, or carrying the
     *         {@link InterruptedException} when the wait was interrupted
     */
    public Future<HostInfo> unsubscribe(final NetAction.DestinationType destinationType, final String dstName) {
        Map<String, BrokerAsyncConsumer> consumers = this.getConsumerManager().getSubscriptions(destinationType);
        // Per-call completion queue over the shared thread pool.
        final CompletionService<HostInfo> completion = new ExecutorCompletionService<HostInfo>(this.executorService);
        int submitted = 0;
        for (BrokerAsyncConsumer consumer : consumers.values()) {
            if (!consumer.getDestinationName().equals(dstName)) {
                continue;
            }
            final HostInfo host = consumer.getHost();
            if (host == null) {
                continue;
            }
            ++submitted;
            completion.submit(new Callable<HostInfo>(){

                @Override
                public HostInfo call() throws Exception {
                    ChannelWrapperFuture f = BrokerClient.this.unsubscribeHost(destinationType, dstName, host);
                    final CountDownLatch latch = new CountDownLatch(1);
                    f.getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

                        public void operationComplete(ChannelFuture future) throws Exception {
                            latch.countDown();
                        }
                    });
                    // Block this worker until the channel write has finished.
                    latch.await();
                    return host;
                }
            });
        }
        if (submitted == 0) {
            return new BaseClient.ExceptionFuture<HostInfo>(this, new SubscriptionNotFound("No subscriptions found"));
        }
        try {
            return completion.take();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            return new BaseClient.ExceptionFuture<HostInfo>(this, e);
        }
    }

    /**
     * Sends an unsubscribe message to one host and drops the local subscription once the write succeeds.
     *
     * @param destinationType destination type of the subscription
     * @param dstName         destination name of the subscription
     * @param host            host to send the message to
     * @return the future of the channel write
     */
    private ChannelWrapperFuture unsubscribeHost(final NetAction.DestinationType destinationType, final String dstName, final HostInfo host) {
        final NetAction unsubscribeAction = new NetAction(new NetUnsubscribe(dstName, destinationType));
        final ChannelWrapperFuture writeFuture = sendNetMessage(new NetMessage(unsubscribeAction), host);

        writeFuture.getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    log.error("was not possible to unsubscribe", future.cause());
                    return;
                }
                // Only forget the subscription once the broker was actually told about it.
                getConsumerManager().removeSubscription(destinationType, dstName, host);
            }
        });
        return writeFuture;
    }

    /**
     * Sends an acknowledge for a received notification to the given host.
     *
     * @param notification notification to acknowledge; must be a {@link NetNotificationDecorator}
     * @param host         host to send the acknowledge to
     * @return the future returned by {@code sendNetMessage}, or {@code null} for {@code TOPIC}
     *         notifications, for which no acknowledge is sent
     * @throws Exception                if the notification is not a {@link NetNotificationDecorator}
     * @throws IllegalArgumentException if the notification has no message or a blank message id
     * @throws Throwable                declared by the signature
     */
    public Future<HostInfo> acknowledge(NetNotification notification, HostInfo host) throws Throwable {
        final boolean decorated = notification instanceof NetNotificationDecorator;
        if (!decorated) {
            throw new Exception("Invalid NetNotification");
        }
        if (notification.getDestinationType() == NetAction.DestinationType.TOPIC) {
            return null;
        }

        final NetBrokerMessage received = notification.getMessage();
        if (received == null || StringUtils.isBlank(received.getMessageId())) {
            throw new IllegalArgumentException("Can't acknowledge invalid message.");
        }

        final NetAcknowledge ack = new NetAcknowledge(notification.getSubscription(), received.getMessageId());
        return sendNetMessage(buildMessage(new NetAction(ack)), host);
    }

    /**
     * Acknowledges a notification on the host it was received from.
     *
     * <p>The type is checked before the cast so that an undecorated or null notification fails
     * with the same "Invalid NetNotification" exception as the two-argument overload, instead of
     * a {@link ClassCastException} or {@link NullPointerException}.
     *
     * @param notification notification to acknowledge; must be a {@link NetNotificationDecorator}
     * @return the result of the two-argument overload
     * @throws Exception if the notification is not a {@link NetNotificationDecorator}
     * @throws Throwable anything propagated from the two-argument overload
     */
    public Future<HostInfo> acknowledge(NetNotification notification) throws Throwable {
        if (!(notification instanceof NetNotificationDecorator)) {
            throw new Exception("Invalid NetNotification");
        }
        HostInfo host = ((NetNotificationDecorator)notification).getHost();
        return this.acknowledge(notification, host);
    }

    /**
     * Sends a ping with a freshly generated action id and registers {@code listener} for the reply.
     *
     * <p>The listener is registered before the ping is sent; if the channel write fails the
     * registration is removed again.
     *
     * <p>NOTE(review): when the returned future is not a {@link ChannelWrapperFuture} no failure
     * listener is attached, so this method never removes the registration on that path — confirm
     * whether that path can occur.
     *
     * @param listener listener to register for the ping reply
     * @return the future returned by {@code sendNetMessage}
     * @throws Throwable declared by the signature
     */
    public Future<HostInfo> checkStatus(BrokerListener listener) throws Throwable {
        final String actionId = UUID.randomUUID().toString();
        NetMessage pingMessage = buildMessage(new NetAction(new NetPing(actionId)));
        getPongConsumerManager().addSubscription(actionId, listener);

        HostInfoFuture f = sendNetMessage(pingMessage);
        if (!(f instanceof ChannelWrapperFuture)) {
            return f;
        }
        ChannelWrapperFuture wrapped = (ChannelWrapperFuture)f;
        wrapped.getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                getPongConsumerManager().removeSubscription(actionId);
                log.error("Was not possible to check Status", future.cause());
            }
        });
        return f;
    }

    /**
     * Polls the named destination with a timeout argument of {@code 0}.
     *
     * @param name destination name
     * @return the result of {@link #poll(String, int)}
     * @throws RuntimeException wrapping a {@link TimeoutException} raised by that overload
     */
    public NetNotification poll(String name) {
        final NetNotification polled;
        try {
            polled = poll(name, 0);
        }
        catch (TimeoutException timedOut) {
            throw new RuntimeException(timedOut);
        }
        return polled;
    }

    /**
     * Polls the named destination, passing {@code timeout} to the {@link NetPoll} request.
     *
     * @param name    destination name
     * @param timeout timeout value handed to {@link NetPoll} (widened to {@code long})
     * @return the result of {@link #poll(NetPoll, AcceptRequest)} called without an accept request
     * @throws TimeoutException propagated from that overload
     */
    public NetNotification poll(String name, int timeout) throws TimeoutException {
        final long pollTimeout = timeout;
        return poll(new NetPoll(name, pollTimeout), null);
    }

    /**
     * Sends a poll request to the available host and blocks until a reply arrives.
     *
     * <p>The one-shot listener removes its own subscription and releases the caller whatever the
     * reply contains. A fault with code {@code "2005"} is reported as a timeout; any other fault
     * yields {@code null}.
     *
     * <p>If the channel write fails the caller is released as well and a {@link RuntimeException}
     * is thrown; without this the method would wait indefinitely, since no reply can arrive.
     *
     * @param netPoll poll request to send
     * @param request optional accept request; when non-null its handler is registered and its
     *                action id is copied onto the poll request
     * @return the received notification, or {@code null} when the reply was a non-timeout fault
     * @throws TimeoutException if the broker replied with fault code {@code "2005"}
     * @throws RuntimeException if the request could not be sent, the wait was interrupted (the
     *                          interrupt flag is restored), or another unexpected error occurred
     */
    public NetNotification poll(final NetPoll netPoll, AcceptRequest request) throws TimeoutException {
        if (request != null) {
            this.addAcceptMessageHandler(request);
            netPoll.setActionId(request.getActionId());
        }
        final AtomicBoolean timeout = new AtomicBoolean(false);
        final AtomicBoolean sendFailed = new AtomicBoolean(false);
        // Written before latch.countDown() and read after latch.await(), which orders the accesses.
        final Throwable[] sendFailure = new Throwable[1];
        final CountDownLatch latch = new CountDownLatch(1);
        final List<NetNotification> notifications = new ArrayList<NetNotification>(1);
        try {
            HostInfoFuture<HostInfo> sendFuture = this.subscribeToHost((NetSubscribeAction)netPoll, new BrokerListener(){

                @Override
                public void deliverMessage(NetMessage message, HostInfo host) throws Throwable {
                    try {
                        NetFault netFault = message.getAction().getFaultMessage();
                        if (netFault != null) {
                            // Constant-first comparison tolerates a fault without a code.
                            if ("2005".equals(netFault.getCode())) {
                                timeout.set(true);
                            }
                            return;
                        }
                        notifications.add(message.getAction().getNotificationMessage());
                    }
                    finally {
                        BrokerClient.this.getConsumerManager().removeSubscription((NetSubscribeAction)netPoll, host);
                        latch.countDown();
                    }
                }
            });
            if (sendFuture instanceof ChannelWrapperFuture) {
                ((ChannelWrapperFuture)sendFuture).getInstance().addListener((GenericFutureListener)new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // No reply will ever come for a request that was not written.
                            sendFailure[0] = future.cause();
                            sendFailed.set(true);
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers even though it is wrapped.
            Thread.currentThread().interrupt();
            throw new RuntimeException("There was an unexpected error", e);
        }
        catch (Throwable e) {
            throw new RuntimeException("There was an unexpected error", e);
        }
        if (sendFailed.get()) {
            throw new RuntimeException("There was an unexpected error", sendFailure[0]);
        }
        if (timeout.get()) {
            throw new TimeoutException("Poll timeout");
        }
        if (notifications.isEmpty()) {
            return null;
        }
        return notifications.get(0);
    }

    /**
     * @return the consumer manager currently held by this client
     */
    public ConsumerManager getConsumerManager() {
        return consumerManager;
    }

    /**
     * Replaces the consumer manager held by this client.
     *
     * @param consumerManager manager to store
     */
    public void setConsumerManager(ConsumerManager consumerManager) {
        this.consumerManager = consumerManager;
    }

    /**
     * @return the pong consumer manager currently held by this client
     */
    public PongConsumerManager getPongConsumerManager() {
        return pongConsumerManager;
    }

    /**
     * Replaces the pong consumer manager held by this client.
     *
     * @param pongConsumerManager manager to store
     */
    public void setPongConsumerManager(PongConsumerManager pongConsumerManager) {
        this.pongConsumerManager = pongConsumerManager;
    }

    /**
     * @return the inherited {@code hosts} field
     */
    @Override
    public HostContainer getHosts() {
        return hosts;
    }

    /**
     * @return the pending accept requests manager currently held by this client
     */
    public PendingAcceptRequestsManager getAcceptRequestsManager() {
        return acceptRequestsManager;
    }

    /**
     * Replaces the pending accept requests manager held by this client.
     *
     * @param acceptRequestsManager manager to store
     */
    public void setAcceptRequestsManager(PendingAcceptRequestsManager acceptRequestsManager) {
        this.acceptRequestsManager = acceptRequestsManager;
    }

    /**
     * Registers the listener of an accept request with the pending accept requests manager.
     *
     * <p>Registration is best-effort: a failure is logged through the class logger (instead of
     * being dumped to standard error) and does not propagate to the caller.
     *
     * @param request accept request supplying the action id, timeout and listener; must not be null
     */
    protected void addAcceptMessageHandler(AcceptRequest request) {
        String actionID = request.getActionId();
        long timeout = request.getTimeout();
        BrokerListener listener = request.getListener();
        try {
            this.getAcceptRequestsManager().addAcceptRequest(actionID, timeout, listener);
        }
        catch (Throwable throwable) {
            log.error("Unable to register accept request handler for action id " + actionID, throwable);
        }
    }

    /**
     * Installs {@code adapter} as the fault handler on the channel initializer.
     *
     * @param adapter listener handed to {@code ChannelInitializer.setFaultHandler}
     */
    public void setFaultListener(BrokerListener adapter) {
        final ChannelInitializer initializer = this.channelInitializer;
        initializer.setFaultHandler(adapter);
    }

    /**
     * Observer callback: on a {@link ReconnectEvent} coming from a {@link HostContainer}, the
     * subscriptions of the reconnected host are re-created. Any other notification is ignored.
     *
     * @param observable source of the notification
     * @param o          event payload
     */
    @Override
    public void update(Observable observable, Object o) {
        final boolean fromHostContainer = observable instanceof HostContainer;
        if (!fromHostContainer || !(o instanceof ReconnectEvent)) {
            return;
        }
        final HostInfo reconnected = ((ReconnectEvent)o).getHost();
        log.debug("Reconnect Event: " + reconnected);
        resubscribe(reconnected);
    }

    /**
     * Re-creates every queue, topic and virtual-queue subscription registered for {@code host}.
     *
     * <p>The subscriptions are removed from the consumer manager in bulk and then re-added one by
     * one through {@code subscribeToHost}. Each re-subscription is isolated: if one fails, the
     * failure is logged and the remaining ones are still processed, because they have already
     * been removed and would otherwise be lost.
     *
     * @param host host whose subscriptions are re-created
     */
    private void resubscribe(HostInfo host) {
        log.debug("Resubscribing : {}", host);
        final NetAction.DestinationType[] resubscribeTypes = new NetAction.DestinationType[]{NetAction.DestinationType.QUEUE, NetAction.DestinationType.TOPIC, NetAction.DestinationType.VIRTUAL_QUEUE};
        for (NetAction.DestinationType dType : resubscribeTypes) {
            Map<String, BrokerAsyncConsumer> removed = this.consumerManager.removeSubscriptions(dType, host);
            for (Map.Entry<String, BrokerAsyncConsumer> entry : removed.entrySet()) {
                BrokerAsyncConsumer consumer = entry.getValue();
                log.debug("Destination: {}", entry.getKey());
                try {
                    NetSubscribe subscribe = new NetSubscribe(consumer.getDestinationName(), consumer.getDestinationType());
                    // Reuse the original action id for the new subscription.
                    subscribe.setActionId(consumer.getActionId());
                    this.subscribeToHost((NetSubscribeAction)subscribe, consumer.getListener(), host);
                }
                catch (RuntimeException e) {
                    // Keep going: the other subscriptions were already removed above.
                    log.error("Unable to resubscribe destination " + entry.getKey() + " on " + host, e);
                }
            }
        }
    }

    /**
     * Appends a listener to the connection event listener list.
     *
     * @param connectionEventListener listener to add
     */
    public void addConnectionEventListener(ConnectionEventListener connectionEventListener) {
        final List<ConnectionEventListener> registered = this.connectionEventListeners;
        registered.add(connectionEventListener);
    }
}

