/*
 * Decompiled with CFR 0.152.
 */
package pt.com.broker.client;

import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
import org.caudexorigo.concurrent.Sleep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.AcceptRequest;
import pt.com.broker.client.BrokerAsyncConsumer;
import pt.com.broker.client.BrokerClient;
import pt.com.broker.client.BrokerProtocolHandler;
import pt.com.broker.client.HostInfo;
import pt.com.broker.client.messaging.BrokerErrorListenter;
import pt.com.broker.client.messaging.BrokerListener;
import pt.com.broker.client.messaging.PendingAcceptRequestsManager;
import pt.com.broker.client.utils.CircularContainer;
import pt.com.broker.types.NetAcknowledge;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetBrokerMessage;
import pt.com.broker.types.NetFault;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetNotification;
import pt.com.broker.types.NetPing;
import pt.com.broker.types.NetPoll;
import pt.com.broker.types.NetPong;
import pt.com.broker.types.NetProtocolType;
import pt.com.broker.types.NetPublish;
import pt.com.broker.types.NetSubscribe;
import pt.com.broker.types.NetUnsubscribe;

public abstract class BaseBrokerClient {
    private static final Logger log = LoggerFactory.getLogger(BaseBrokerClient.class);
    protected String _appName;
    protected final BlockingQueue<NetPong> _bstatus = new LinkedBlockingQueue<NetPong>();
    protected final EnumMap<NetAction.DestinationType, Map<String, BrokerAsyncConsumer>> _consumerList = new EnumMap(NetAction.DestinationType.class);
    protected final Map<String, NetMessage> _syncSubscriptions = new HashMap<String, NetMessage>();
    protected final Map<String, SynchronousQueue<NetMessage>> pendingPolls = new HashMap<String, SynchronousQueue<NetMessage>>();
    private NetProtocolType protocolType;
    private boolean oldFramming;
    protected BrokerClientState state = BrokerClientState.UNSTARTED;
    protected BrokerProtocolHandler _netHandler;
    protected CircularContainer<HostInfo> hosts;
    private static final int DEFAULT_MAX_NUMBER_OF_TRIES = Integer.MAX_VALUE;
    private volatile int numberOfTries = Integer.MAX_VALUE;
    protected static final BrokerErrorListenter defaultErrorListener = new BrokerErrorListenter(){

        @Override
        public void onFault(NetFault fault) {
            try {
                log.error("Fault message received");
                log.error("\tFault code: '{}'", (Object)fault.getCode());
                log.error("\tFault message: '{}'", (Object)fault.getMessage());
                log.error("\tFault action identifier: '{}'", (Object)fault.getActionId());
                log.error("\tFault detail: '{}'", (Object)fault.getDetail());
            }
            catch (Throwable t) {
                log.error("Fault message format is unsuported.");
            }
        }

        @Override
        public void onError(Throwable throwable) {
            log.error("An error occurred", throwable);
        }
    };
    protected BrokerErrorListenter errorListener;
    protected ScheduledExecutorService statusScheduler = Executors.newSingleThreadScheduledExecutor();

    protected void init() throws Throwable {
        this.state = BrokerClientState.CONNECT;
        this.setErrorListener(BaseBrokerClient.getDefaultErrorListener());
        this._netHandler = this.getBrokerProtocolHandler();
        this.getNetHandler().start();
        this.state = BrokerClientState.OK;
        this.statusScheduler.scheduleWithFixedDelay(new CheckStatusTask(), 0L, 1L, TimeUnit.MINUTES);
    }

    public BaseBrokerClient(String host, int portNumber, int connectTimeout, int readTimeout) throws Throwable {
        this(host, portNumber, connectTimeout, readTimeout, "BrokerClient", NetProtocolType.PROTOCOL_BUFFER);
    }

    public BaseBrokerClient(String host, int portNumber, int connectTimeout, int readTimeout, String appName) throws Throwable {
        this(host, portNumber, connectTimeout, readTimeout, appName, NetProtocolType.PROTOCOL_BUFFER);
    }

    public BaseBrokerClient(String host, int portNumber, int connectTimeout, int readTimeout, String appName, NetProtocolType ptype) throws Throwable {
        this.hosts = new CircularContainer(1);
        this.hosts.add(new HostInfo(host, portNumber, -1, connectTimeout, readTimeout));
        this.init(appName, ptype);
    }

    private void init(String appName, NetProtocolType ptype) {
        this._appName = appName;
        this.protocolType = ptype;
        this.oldFramming = this.protocolType == NetProtocolType.SOAP_v0;
        this._consumerList.put(NetAction.DestinationType.TOPIC, new ConcurrentHashMap());
        this._consumerList.put(NetAction.DestinationType.QUEUE, new ConcurrentHashMap());
    }

    public BaseBrokerClient(Collection<HostInfo> hosts) throws Throwable {
        this(hosts, "BrokerClient");
    }

    public BaseBrokerClient(Collection<HostInfo> hosts, String appName) throws Throwable {
        this(hosts, appName, NetProtocolType.PROTOCOL_BUFFER);
    }

    public BaseBrokerClient(Collection<HostInfo> hosts, String appName, NetProtocolType ptype) throws Throwable {
        this.hosts = new CircularContainer<HostInfo>(hosts);
        this.init(appName, ptype);
    }

    protected abstract BrokerProtocolHandler getBrokerProtocolHandler() throws Throwable;

    public void acknowledge(NetNotification notification, AcceptRequest acceptRequest) throws Throwable {
        NetAcknowledge ackMsg;
        if (notification != null && notification.getMessage() != null && StringUtils.isNotBlank((CharSequence)notification.getMessage().getMessageId())) {
            NetBrokerMessage brkMsg = notification.getMessage();
            if (notification.getDestinationType() == NetAction.DestinationType.TOPIC) {
                return;
            }
            String ackDestination = notification.getSubscription();
            ackMsg = new NetAcknowledge(ackDestination, brkMsg.getMessageId());
            if (acceptRequest != null) {
                ackMsg.setActionId(acceptRequest.getActionId());
                PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
            }
        } else {
            throw new IllegalArgumentException("Can't acknowledge invalid message.");
        }
        NetAction action = new NetAction(NetAction.ActionType.ACKNOWLEDGE);
        action.setAcknowledgeMessage(ackMsg);
        NetMessage msg = this.buildMessage(action);
        this.getNetHandler().sendMessage(msg);
    }

    public void acknowledge(NetNotification notification) throws Throwable {
        this.acknowledge(notification, null);
    }

    public void addAsyncConsumer(NetSubscribe subscribe, BrokerListener listener, AcceptRequest acceptRequest) throws Throwable {
        if (subscribe != null && StringUtils.isNotBlank((CharSequence)subscribe.getDestination())) {
            Map<String, BrokerAsyncConsumer> subscriptions = this._consumerList.get(subscribe.getDestinationType().equals((Object)NetAction.DestinationType.TOPIC) ? NetAction.DestinationType.TOPIC : NetAction.DestinationType.QUEUE);
            BrokerAsyncConsumer previous = subscriptions.put(subscribe.getDestination(), new BrokerAsyncConsumer(subscribe, listener));
            if (previous != null) {
                subscriptions.put(subscribe.getDestination(), previous);
                throw new IllegalStateException("A listener for that Destination already exists");
            }
            if (acceptRequest != null) {
                subscribe.setActionId(acceptRequest.getActionId());
                PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
            }
        } else {
            throw new IllegalArgumentException("Mal-formed Notification request");
        }
        NetAction netAction = new NetAction(NetAction.ActionType.SUBSCRIBE);
        netAction.setSubscribeMessage(subscribe);
        NetMessage msg = this.buildMessage(netAction, subscribe.getHeaders());
        this.getNetHandler().sendMessage(msg);
        log.info("Created new async consumer for '{}'", (Object)subscribe.getDestination());
    }

    public void addAsyncConsumer(NetSubscribe subscribe, BrokerListener listener) throws Throwable {
        this.addAsyncConsumer(subscribe, listener, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendSubscriptions() throws Throwable {
        for (NetAction.DestinationType desType : this._consumerList.keySet()) {
            Map<String, BrokerAsyncConsumer> subscriptions = this._consumerList.get(desType);
            for (BrokerAsyncConsumer aconsumer : subscriptions.values()) {
                NetSubscribe subscription = aconsumer.getSubscription();
                NetAction netAction = new NetAction(NetAction.ActionType.SUBSCRIBE);
                netAction.setSubscribeMessage(subscription);
                NetMessage msg = this.buildMessage(netAction, subscription.getHeaders());
                this.getNetHandler().sendMessage(msg);
                log.info("Reconnected async consumer for '{}'", (Object)subscription.getDestination());
            }
        }
        Map<String, NetMessage> map = this._syncSubscriptions;
        synchronized (map) {
            for (String queueName : this._syncSubscriptions.keySet()) {
                this.getNetHandler().sendMessage(this._syncSubscriptions.get(queueName));
            }
        }
    }

    private NetMessage buildMessage(NetAction action) {
        return this.buildMessage(action, null);
    }

    private NetMessage buildMessage(NetAction action, Map<String, String> headers) {
        NetMessage message = new NetMessage(action, headers);
        return message;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public NetPong checkStatus() throws Throwable {
        String actionId = UUID.randomUUID().toString();
        NetPing ping = new NetPing(actionId);
        NetAction action = new NetAction(NetAction.ActionType.PING);
        action.setPingMessage(ping);
        NetMessage message = this.buildMessage(action);
        this.getNetHandler().sendMessage(message);
        long timeout = System.currentTimeMillis() + 2000L;
        NetPong pong = null;
        do {
            BlockingQueue<NetPong> blockingQueue = this._bstatus;
            synchronized (blockingQueue) {
                Sleep.time((long)500L);
                if (System.currentTimeMillis() > timeout) {
                    return null;
                }
                pong = (NetPong)this._bstatus.peek();
                if (pong == null) {
                    continue;
                }
                if (!pong.getActionId().equals(NetPong.getUniversalActionId()) && !pong.getActionId().equals(actionId)) {
                    pong = null;
                }
                this._bstatus.remove();
            }
        } while (pong == null);
        return pong;
    }

    public void enqueueMessage(NetBrokerMessage brokerMessage, String destinationName, AcceptRequest acceptRequest) {
        if (brokerMessage != null && StringUtils.isNotBlank((CharSequence)destinationName)) {
            NetPublish publish = new NetPublish(destinationName, NetAction.DestinationType.QUEUE, brokerMessage);
            if (acceptRequest != null) {
                publish.setActionId(acceptRequest.getActionId());
                PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
            }
            NetAction action = new NetAction(NetAction.ActionType.PUBLISH);
            action.setPublishMessage(publish);
            NetMessage msg = this.buildMessage(action, brokerMessage.getHeaders());
            try {
                this.getNetHandler().sendMessage(msg);
            }
            catch (Throwable t) {
                log.error("Failed to deliver message.", t);
            }
        } else {
            throw new IllegalArgumentException("Mal-formed Enqueue request");
        }
    }

    public void enqueueMessage(NetBrokerMessage brokerMessage, String destinationName) {
        this.enqueueMessage(brokerMessage, destinationName, null);
    }

    protected void feedStatusConsumer(NetPong pong) throws Throwable {
        this._bstatus.offer(pong);
    }

    protected HostInfo getHostInfo() {
        return this.hosts.get();
    }

    public void addHostInfo(HostInfo hostInfo) {
        this.hosts.add(hostInfo);
    }

    protected void notifyListener(NetNotification notification) {
        Map<String, BrokerAsyncConsumer> subscripions;
        BrokerAsyncConsumer brokerAsyncConsumer;
        String value;
        Map headers = notification.getHeaders();
        boolean ackRequired = true;
        if (headers != null && (value = (String)headers.get("ACK_REQUIRED")) != null && value.equalsIgnoreCase("false")) {
            ackRequired = false;
        }
        if ((brokerAsyncConsumer = (subscripions = this._consumerList.get(notification.getDestinationType().equals((Object)NetAction.DestinationType.TOPIC) ? NetAction.DestinationType.TOPIC : NetAction.DestinationType.QUEUE)).get(notification.getSubscription())) == null) {
            log.error(String.format("Unexpected notification. DestinationType: %s, Destination: %s, Subscription: %s.", notification.getDestinationType(), notification.getDestination(), notification.getSubscription()));
            return;
        }
        brokerAsyncConsumer.deliver(notification);
        BrokerListener listener = brokerAsyncConsumer.getListener();
        if (!ackRequired) {
            return;
        }
        if (notification.getDestinationType() != NetAction.DestinationType.TOPIC && listener.isAutoAck()) {
            try {
                this.acknowledge(notification);
            }
            catch (Throwable t) {
                log.error("Could not acknowledge message, messageId: '{}'", (Object)notification.getMessage().getMessageId());
                log.error(t.getMessage(), t);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public NetNotification poll(String queueName, long timeout, long reserveTime, AcceptRequest acceptRequest) throws Throwable {
        if (StringUtils.isBlank((CharSequence)queueName)) {
            throw new IllegalArgumentException("Mal-formed Poll request. queueName is blank.");
        }
        NetPoll poll = new NetPoll(queueName, timeout);
        NetAction action = new NetAction(NetAction.ActionType.POLL);
        action.setPollMessage(poll);
        NetMessage message = this.buildMessage(action);
        SynchronousQueue synQueue = new SynchronousQueue();
        Map<String, NetMessage> map = this._syncSubscriptions;
        synchronized (map) {
            if (this._syncSubscriptions.containsKey(queueName)) {
                throw new IllegalArgumentException("Queue " + queueName + " has already a poll runnig.");
            }
            this._syncSubscriptions.put(queueName, message);
            this.pendingPolls.put(queueName, synQueue);
        }
        if (reserveTime > 0L) {
            message.getHeaders().put("RESERVE_TIME", reserveTime + "");
        }
        if (acceptRequest != null) {
            poll.setActionId(acceptRequest.getActionId());
            PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
        }
        this.getNetHandler().sendMessage(message);
        NetMessage receivedMsg = (NetMessage)synQueue.take();
        Map<String, NetMessage> map2 = this._syncSubscriptions;
        synchronized (map2) {
            this._syncSubscriptions.remove(queueName);
            this.pendingPolls.remove(queueName);
        }
        if (receivedMsg == BrokerProtocolHandler.TimeoutUnblockNotification) {
            throw new TimeoutException();
        }
        if (receivedMsg == BrokerProtocolHandler.NoMessageUnblockNotification) {
            return null;
        }
        NetNotification m = null;
        if (!receivedMsg.getAction().getActionType().equals((Object)NetAction.ActionType.NOTIFICATION)) {
            log.error("Poll unbloqued by a message that wasn't of any of the expeceted error nor a notification.");
            return null;
        }
        m = receivedMsg.getAction().getNotificationMessage();
        return m;
    }

    public NetNotification poll(String queueName, long timeout, AcceptRequest acceptRequest) throws Throwable {
        return this.poll(queueName, timeout, -1L, acceptRequest);
    }

    public NetNotification poll(String queueName) throws Throwable {
        return this.poll(queueName, 0L, null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean offerPollResponse(String destination, NetMessage message) {
        Map<String, NetMessage> map = this._syncSubscriptions;
        synchronized (map) {
            SynchronousQueue<NetMessage> synchronousQueue = this.pendingPolls.get(destination);
            if (synchronousQueue != null) {
                return synchronousQueue.offer(message);
            }
            return false;
        }
    }

    public void publishMessage(NetBrokerMessage brokerMessage, String destination, AcceptRequest acceptRequest) {
        if (brokerMessage != null && StringUtils.isNotBlank((CharSequence)destination)) {
            NetPublish publish = new NetPublish(destination, NetAction.DestinationType.TOPIC, brokerMessage);
            if (acceptRequest != null) {
                publish.setActionId(acceptRequest.getActionId());
                PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
            }
            NetAction action = new NetAction(NetAction.ActionType.PUBLISH);
            action.setPublishMessage(publish);
            NetMessage msg = this.buildMessage(action, brokerMessage.getHeaders());
            try {
                this.getNetHandler().sendMessage(msg);
            }
            catch (Throwable e) {
                log.error("Could not publish message, messageId:");
                log.error(e.getMessage(), e);
            }
        } else {
            throw new IllegalArgumentException("Mal-formed Publish request");
        }
    }

    public void publishMessage(NetBrokerMessage brokerMessage, String destination) {
        this.publishMessage(brokerMessage, destination, null);
    }

    public void unsubscribe(NetAction.DestinationType destinationType, String destinationName, AcceptRequest acceptRequest) throws Throwable {
        NetUnsubscribe unsubs;
        if (StringUtils.isNotBlank((CharSequence)destinationName) && destinationType != null) {
            unsubs = new NetUnsubscribe(destinationName, destinationType);
            if (acceptRequest != null) {
                unsubs.setActionId(acceptRequest.getActionId());
                PendingAcceptRequestsManager.addAcceptRequest(acceptRequest);
            }
        } else {
            throw new IllegalArgumentException("Mal-formed Unsubscribe request");
        }
        NetAction action = new NetAction(NetAction.ActionType.UNSUBSCRIBE);
        action.setUnsbuscribeMessage(unsubs);
        NetMessage message = this.buildMessage(action);
        this.getNetHandler().sendMessage(message);
        Map<String, BrokerAsyncConsumer> subscriptions = this._consumerList.get(destinationType.equals((Object)NetAction.DestinationType.TOPIC) ? NetAction.DestinationType.TOPIC : NetAction.DestinationType.QUEUE);
        subscriptions.remove(destinationName);
    }

    public void unsubscribe(NetAction.DestinationType destinationType, String destinationName) throws Throwable {
        this.unsubscribe(destinationType, destinationName, null);
    }

    public void close() {
        this.getNetHandler().stop();
        this.statusScheduler.shutdown();
        this.state = BrokerClientState.CLOSE;
    }

    public BrokerProtocolHandler getNetHandler() {
        return this._netHandler;
    }

    public static BrokerErrorListenter getDefaultErrorListener() {
        return defaultErrorListener;
    }

    public void setErrorListener(BrokerErrorListenter errorListener) {
        this.errorListener = errorListener;
    }

    public BrokerErrorListenter getErrorListener() {
        return this.errorListener;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public BrokerClientState getState() {
        BaseBrokerClient baseBrokerClient = this;
        synchronized (baseBrokerClient) {
            return this.state;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void setState(BrokerClientState state) {
        BaseBrokerClient baseBrokerClient = this;
        synchronized (baseBrokerClient) {
            this.state = state;
        }
    }

    public void setPortocolType(NetProtocolType portocolType) {
        this.protocolType = portocolType;
    }

    public NetProtocolType getProtocolType() {
        return this.protocolType;
    }

    public void setNumberOfTries(int numberOfTries) {
        this.numberOfTries = numberOfTries;
    }

    public int getNumberOfTries() {
        return this.numberOfTries;
    }

    public boolean isOldFramming() {
        return this.oldFramming;
    }

    protected void setupHeartBeatPacket() {
    }

    private class CheckStatusTask
    implements Runnable {
        private CheckStatusTask() {
        }

        @Override
        public void run() {
            try {
                log.debug("Checking connection state and status!");
                if (BaseBrokerClient.this.state == BrokerClientState.OK && BaseBrokerClient.this.checkStatus() == null) {
                    log.debug("Restarting the connection.");
                    BaseBrokerClient.this.state = BrokerClientState.FAIL;
                    BaseBrokerClient.this.getNetHandler().stop();
                    BaseBrokerClient.this.getNetHandler().start();
                    BaseBrokerClient.this.state = BrokerClientState.CONNECT;
                }
            }
            catch (Throwable e) {
                log.error("Unexpected error checking connection status.", e);
            }
        }
    }

    public static interface BrokerClientStateOk {
        public void onOk(BrokerClient var1);
    }

    public static enum BrokerClientState {
        UNSTARTED,
        CONNECT,
        OK,
        AUTH,
        FAIL,
        CLOSE;

    }
}

