package pt.com.broker.client.nio.consumer;

import java.net.InetSocketAddress;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pt.com.broker.client.nio.events.BrokerListener;
import pt.com.broker.client.nio.exceptions.ExistingSubscriptionException;
import pt.com.broker.client.nio.server.HostInfo;
import pt.com.broker.client.nio.types.DestinationDataDelegator;
import pt.com.broker.types.NetAction;
import pt.com.broker.types.NetMessage;
import pt.com.broker.types.NetSubscribeAction;

/* loaded from: input_file:pt/com/broker/client/nio/consumer/ConsumerManager.class */
/**
 * Registry of asynchronous consumers, grouped by destination type.
 *
 * <p>Topic consumers are keyed by the destination name alone. Consumers of any other type are
 * keyed by {@code hostname:port#destination}, so the same destination name may be registered
 * once per host. Operations that add or remove entries synchronize on the internal registry.
 */
public class ConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);

    /** Per-destination-type registries; populated for TOPIC and QUEUE by the constructor. */
    protected final EnumMap<NetAction.DestinationType, Map<String, BrokerAsyncConsumer>> _consumerList = new EnumMap<>(NetAction.DestinationType.class);

    /** Creates a manager with empty TOPIC and QUEUE registries. */
    public ConsumerManager() {
        this._consumerList.put(NetAction.DestinationType.TOPIC, new ConcurrentHashMap<String, BrokerAsyncConsumer>());
        this._consumerList.put(NetAction.DestinationType.QUEUE, new ConcurrentHashMap<String, BrokerAsyncConsumer>());
    }

    /**
     * Builds a consumer from a subscribe action and registers it.
     *
     * @param netSubscribeAction supplies the destination, destination type and action id
     * @param brokerListener listener handed to the new consumer
     * @param hostInfo host the consumer is bound to
     * @throws IllegalArgumentException if the destination name is empty or the type is null
     * @throws ExistingSubscriptionException if the computed key is already registered
     */
    public void addSubscription(NetSubscribeAction netSubscribeAction, BrokerListener brokerListener, HostInfo hostInfo) {
        BrokerAsyncConsumer brokerAsyncConsumer = new BrokerAsyncConsumer(netSubscribeAction.getDestination(), netSubscribeAction.getDestinationType(), brokerListener);
        brokerAsyncConsumer.setActionId(netSubscribeAction.getActionId());
        brokerAsyncConsumer.setHost(hostInfo);
        addSubscription(brokerAsyncConsumer);
    }

    /** Builds a socket address from the host's hostname and port. */
    private InetSocketAddress getSocket(HostInfo hostInfo) {
        return new InetSocketAddress(hostInfo.getHostname(), hostInfo.getPort());
    }

    /**
     * Computes the registry key for a destination.
     *
     * @return {@code str} unchanged for topics, otherwise {@code hostname:port#str}
     * @throws IllegalArgumentException if a non-topic destination name is empty
     */
    private String getDestinationKey(String str, NetAction.DestinationType destinationType, HostInfo hostInfo) {
        if (destinationType == NetAction.DestinationType.TOPIC) {
            // Topic keys do not include the host.
            return str;
        }
        // Validate before building the socket address so an invalid name fails first.
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("Invalid Destination");
        }
        InetSocketAddress socket = getSocket(hostInfo);
        return socket.getHostName() + ":" + socket.getPort() + "#" + str;
    }

    /**
     * Registers a consumer under the key derived from its destination name, type and host.
     *
     * @param brokerAsyncConsumer consumer to register
     * @throws IllegalArgumentException if the destination name is empty or the type is null
     * @throws ExistingSubscriptionException if the computed key is already registered
     */
    public void addSubscription(BrokerAsyncConsumer brokerAsyncConsumer) {
        NetAction.DestinationType destinationType = brokerAsyncConsumer.getDestinationType();
        if (StringUtils.isEmpty(brokerAsyncConsumer.getDestinationName())) {
            throw new IllegalArgumentException("Invalid Destination name");
        }
        if (destinationType == null) {
            throw new IllegalArgumentException("Invalid Destination Type");
        }
        String destinationKey = getDestinationKey(brokerAsyncConsumer.getDestinationName(), destinationType, brokerAsyncConsumer.getHost());
        synchronized (this._consumerList) {
            Map<String, BrokerAsyncConsumer> subscriptions = getSubscriptions(destinationType);
            if (subscriptions.containsKey(destinationKey)) {
                throw new ExistingSubscriptionException(String.format("A listener for the destination %s already exists", destinationKey));
            }
            subscriptions.put(destinationKey, brokerAsyncConsumer);
            log.info("Added Async Consumer for {} {} ", brokerAsyncConsumer.getHost(), brokerAsyncConsumer.getDestinationName());
        }
    }

    /**
     * Removes the subscription described by a subscribe action for the given host.
     *
     * @return the removed consumer, or {@code null} if none was registered under that key
     */
    public BrokerAsyncConsumer removeSubscription(NetSubscribeAction netSubscribeAction, HostInfo hostInfo) {
        return removeSubscription(netSubscribeAction.getDestinationType(), netSubscribeAction.getDestination(), hostInfo);
    }

    /**
     * Removes the subscription registered under the key for the given type, name and host.
     *
     * @return the removed consumer, or {@code null} if none was registered under that key
     */
    public BrokerAsyncConsumer removeSubscription(NetAction.DestinationType destinationType, String str, HostInfo hostInfo) {
        BrokerAsyncConsumer removed;
        synchronized (this._consumerList) {
            Map<String, BrokerAsyncConsumer> subscriptions = getSubscriptions(destinationType);
            String destinationKey = getDestinationKey(str, destinationType, hostInfo);
            removed = subscriptions.remove(destinationKey);
            if (removed != null) {
                log.debug("Removing key: {}", destinationKey);
            }
        }
        return removed;
    }

    /**
     * Looks up the consumer registered for the given type, name and host.
     *
     * @return the consumer, or {@code null} if none is registered under that key
     */
    public BrokerAsyncConsumer getConsumer(NetAction.DestinationType destinationType, String str, HostInfo hostInfo) {
        return getSubscriptions(destinationType).get(getDestinationKey(str, destinationType, hostInfo));
    }

    /**
     * Looks up the consumer for a message, using the destination type and subscription name
     * exposed by a {@link DestinationDataDelegator} wrapped around the message.
     */
    protected BrokerAsyncConsumer getConsumer(NetMessage netMessage, HostInfo hostInfo) {
        DestinationDataDelegator destinationDataDelegator = new DestinationDataDelegator(netMessage);
        return getConsumer(destinationDataDelegator.getDestinationType(), destinationDataDelegator.getSubscription(), hostInfo);
    }

    /**
     * Returns the live registry for a destination type.
     *
     * @return the TOPIC registry when {@code destinationType} is TOPIC; the QUEUE registry for
     *     every other value, including {@code null}
     */
    public Map<String, BrokerAsyncConsumer> getSubscriptions(NetAction.DestinationType destinationType) {
        return this._consumerList.get(NetAction.DestinationType.TOPIC.equals(destinationType) ? NetAction.DestinationType.TOPIC : NetAction.DestinationType.QUEUE);
    }

    /**
     * Hands a message to the consumer registered for it; logs a warning when there is none.
     *
     * @throws Throwable whatever the consumer's {@code deliver} call throws
     */
    public void deliverMessage(NetMessage netMessage, HostInfo hostInfo) throws Throwable {
        BrokerAsyncConsumer consumer = getConsumer(netMessage, hostInfo);
        if (consumer == null) {
            log.warn("No consumer found for message: {}", netMessage);
        } else {
            consumer.deliver(netMessage, hostInfo);
        }
    }

    /**
     * Returns a copy of the subscriptions of the given type whose consumer host equals
     * {@code hostInfo}.
     *
     * <p>The comparison is null-safe: a consumer without a host matches only a {@code null}
     * {@code hostInfo}.
     *
     * @return a new map, independent of the registry
     */
    public Map<String, BrokerAsyncConsumer> getSubscriptions(NetAction.DestinationType destinationType, HostInfo hostInfo) {
        Map<String, BrokerAsyncConsumer> matching = new HashMap<String, BrokerAsyncConsumer>();
        for (Map.Entry<String, BrokerAsyncConsumer> entry : getSubscriptions(destinationType).entrySet()) {
            BrokerAsyncConsumer consumer = entry.getValue();
            HostInfo consumerHost = consumer.getHost();
            // A consumer may have no host; avoid dereferencing null.
            boolean sameHost = consumerHost == null ? hostInfo == null : consumerHost.equals(hostInfo);
            if (sameHost) {
                matching.put(entry.getKey(), consumer);
            }
        }
        return matching;
    }

    /**
     * For every destination name currently registered under the given type, removes the
     * subscription registered under the key for that name and {@code hostInfo}.
     *
     * @return a new map from each removed key to the consumer that was removed under it
     */
    public Map<String, BrokerAsyncConsumer> removeSubscriptions(NetAction.DestinationType destinationType, HostInfo hostInfo) {
        Map<String, BrokerAsyncConsumer> removedByKey = new HashMap<String, BrokerAsyncConsumer>(2);
        synchronized (this._consumerList) {
            // Iterate a snapshot so removal never happens on the map being iterated.
            Map<String, BrokerAsyncConsumer> snapshot = new HashMap<String, BrokerAsyncConsumer>(getSubscriptions(destinationType));
            for (BrokerAsyncConsumer candidate : snapshot.values()) {
                String destinationName = candidate.getDestinationName();
                BrokerAsyncConsumer removed = removeSubscription(destinationType, destinationName, hostInfo);
                if (removed != null) {
                    // Report the entry that was actually removed: for non-topic destinations the
                    // iterated candidate may belong to a different host than hostInfo.
                    removedByKey.put(getDestinationKey(destinationName, destinationType, hostInfo), removed);
                }
            }
        }
        return removedByKey;
    }
}
