/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.masterslave;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnectionException;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.codec.StringCodec;
import com.lambdaworks.redis.internal.LettuceLists;
import com.lambdaworks.redis.masterslave.Timeout;
import com.lambdaworks.redis.protocol.LettuceCharsets;
import com.lambdaworks.redis.pubsub.RedisPubSubAdapter;
import com.lambdaworks.redis.pubsub.StatefulRedisPubSubConnection;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SentinelTopologyRefresh
implements Closeable {
    /** Logger used for the debug output emitted when a Sentinel signal is received. */
    private static final InternalLogger LOG = InternalLoggerFactory.getInstance(SentinelTopologyRefresh.class);

    /** Codec handed to {@code connectPubSub} when opening the Sentinel Pub/Sub connections. */
    private static final StringCodec CODEC = new StringCodec(LettuceCharsets.ASCII);

    /** Channels that {@link TopologyRefreshMessagePredicate} accepts without looking at the message payload. */
    private static final Set<String> PROCESSING_CHANNELS = new HashSet<>(
            Arrays.asList("failover-end", "failover-end-for-timeout"));

    /** Pub/Sub connections opened so far, keyed by the Sentinel URI they were opened for. */
    private final Map<RedisURI, StatefulRedisPubSubConnection<String, String>> pubSubConnections = new ConcurrentHashMap<>();

    /** Client used to open the Pub/Sub connections and to obtain the event executor group. */
    private final RedisClient redisClient;

    /** Sentinel endpoints that {@code initializeSentinels()} tries to connect to. */
    private final List<RedisURI> sentinels;

    /** Callbacks registered through {@code bind(Runnable)}; all of them run when a topology refresh fires. */
    private final List<Runnable> refreshRunnables = new CopyOnWriteArrayList<>();

    /** Listener attached to every Pub/Sub connection; forwards pattern messages to {@code processMessage}. */
    private final RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<String, String>() {
        @Override
        public void message(String pattern, String channel, String message) {
            SentinelTopologyRefresh.this.processMessage(pattern, channel, message);
        }
    };

    /** Schedules the registered refresh callbacks for messages accepted by {@link TopologyRefreshMessagePredicate}. */
    private final PubSubMessageActionScheduler topologyRefresh;

    /** Schedules {@code initializeSentinels()} for messages accepted by {@link SentinelReconnectMessagePredicate}. */
    private final PubSubMessageActionScheduler sentinelReconnect;

    /** Set to {@code true} by {@code close()}; checked before and while connecting to Sentinels. */
    private volatile boolean closed = false;

    SentinelTopologyRefresh(RedisClient redisClient, String masterId, List<RedisURI> sentinels) {
        this.redisClient = redisClient;
        this.sentinels = LettuceLists.newList(sentinels);
        this.topologyRefresh = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(), new TopologyRefreshMessagePredicate(masterId));
        this.sentinelReconnect = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(), new SentinelReconnectMessagePredicate());
    }

    /**
     * Marks this instance as closed, then detaches the listener from, closes, and forgets
     * every Pub/Sub connection that is currently tracked.
     */
    @Override
    public void close() {
        closed = true;

        // Iterate over a snapshot so entries can be removed from the live map inside the loop.
        Map<RedisURI, StatefulRedisPubSubConnection<String, String>> snapshot = new HashMap<>(pubSubConnections);
        for (Map.Entry<RedisURI, StatefulRedisPubSubConnection<String, String>> entry : snapshot.entrySet()) {
            StatefulRedisPubSubConnection<String, String> connection = entry.getValue();
            connection.removeListener(adapter);
            connection.close();
            pubSubConnections.remove(entry.getKey());
        }
    }

    /**
     * Registers a callback to run when a topology refresh is triggered and then
     * attempts to connect to the configured Sentinels.
     *
     * @param runnable callback added to the list of refresh actions
     */
    void bind(Runnable runnable) {
        refreshRunnables.add(runnable);
        initializeSentinels();
    }

    /**
     * Opens a Pub/Sub connection to every configured Sentinel that does not have one yet,
     * attaches the shared listener and pattern-subscribes to all channels ({@code "*"}).
     *
     * <p>Does nothing when this instance is already closed. A {@link RedisConnectionException}
     * raised for one Sentinel does not stop the loop: the first failure is remembered and
     * later failures are attached to it as suppressed exceptions.
     */
    private void initializeSentinels() {
        if (this.closed) {
            return;
        }

        RedisConnectionException firstFailure = null;
        for (RedisURI redisURI : this.sentinels) {
            if (this.closed) {
                // closed never flips back to false, so the remaining Sentinels can be skipped.
                break;
            }
            if (this.pubSubConnections.containsKey(redisURI)) {
                continue;
            }
            try {
                StatefulRedisPubSubConnection<String, String> pubSubConnection = this.redisClient.connectPubSub(CODEC, redisURI);
                this.pubSubConnections.put(redisURI, pubSubConnection);
                pubSubConnection.addListener(this.adapter);
                pubSubConnection.async().psubscribe("*");
            } catch (RedisConnectionException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    // Only attach *subsequent* failures: Throwable.addSuppressed rejects
                    // self-suppression with an IllegalArgumentException.
                    firstFailure.addSuppressed(e);
                }
            }
        }

        // NOTE(review): with an empty sentinel list the loop never runs, so firstFailure is
        // always null here and this throw cannot trigger. The intended guard may have been
        // "no connection could be established" - confirm before changing, since callers
        // currently never see a RedisConnectionException from this method.
        if (this.sentinels.isEmpty() && firstFailure != null) {
            throw firstFailure;
        }

        // close() may have run while a connection was being opened above; run it again so a
        // connection registered after that first close() is released as well.
        if (this.closed) {
            this.close();
        }
    }

    /**
     * Offers a received Pub/Sub message to both schedulers: one that triggers the registered
     * refresh callbacks and one that triggers another Sentinel connection attempt.
     *
     * @param pattern subscription pattern the message arrived on (not used)
     * @param channel channel the message was published to
     * @param message message payload
     */
    private void processMessage(String pattern, String channel, String message) {
        Supplier<Runnable> refreshAction = () -> {
            LOG.debug("Received topology changed signal from Redis Sentinel, scheduling topology update");
            return () -> refreshRunnables.forEach(Runnable::run);
        };
        topologyRefresh.processMessage(channel, message, refreshAction);

        Supplier<Runnable> reconnectAction = () -> {
            LOG.debug("Received sentinel state changed signal from Redis Sentinel, scheduling sentinel reconnect attempts");
            return this::initializeSentinels;
        };
        sentinelReconnect.processMessage(channel, message, reconnectAction);
    }

    /**
     * Accepts Sentinel messages that should trigger a new round of Sentinel connection attempts:
     * anything on {@code +sentinel}, and {@code -odown}/{@code -sdown} messages whose payload
     * starts with {@code "sentinel "}.
     */
    private static class SentinelReconnectMessagePredicate
    implements MessagePredicate {
        private SentinelReconnectMessagePredicate() {
        }

        /**
         * @param channel channel the message was published to
         * @param message message payload
         * @return {@code true} if the message should trigger Sentinel reconnect attempts
         */
        @Override
        public boolean test(String channel, String message) {
            // The scheduler invokes filter.test(channel, message); the parameters must be
            // declared in that order or the channel names below get compared to the payload.
            if (channel.equals("+sentinel")) {
                return true;
            }
            return (channel.equals("-odown") || channel.equals("-sdown")) && message.startsWith("sentinel ");
        }
    }

    /**
     * Accepts Sentinel messages that should trigger a topology refresh for one master:
     * selected channels whose payload refers to the master id, plus every message on one of
     * the {@code PROCESSING_CHANNELS}.
     */
    private static class TopologyRefreshMessagePredicate implements MessagePredicate {
        private final String masterId;

        /**
         * @param masterId master name the message payloads are matched against
         */
        TopologyRefreshMessagePredicate(String masterId) {
            this.masterId = masterId;
        }

        /**
         * @param channel channel the message was published to
         * @param message message payload
         * @return {@code true} if the message should trigger a topology refresh
         */
        @Override
        public boolean test(String channel, String message) {
            boolean concernsMaster;
            switch (channel) {
                case "+elected-leader":
                    concernsMaster = message.startsWith(String.format("master %s ", masterId));
                    break;
                case "+switch-master":
                    concernsMaster = message.startsWith(String.format("%s ", masterId));
                    break;
                case "fix-slave-config":
                case "+convert-to-slave":
                case "+role-change":
                    concernsMaster = message.contains(String.format("@ %s ", masterId));
                    break;
                default:
                    concernsMaster = false;
                    break;
            }
            // A payload mismatch above still falls through to the payload-independent channel check.
            return concernsMaster || PROCESSING_CHANNELS.contains(channel);
        }
    }

    /**
     * Filter over Sentinel Pub/Sub messages. The scheduler in this file invokes it with the
     * channel as first and the message payload as second argument.
     */
    interface MessagePredicate extends BiPredicate<String, String> {
        /**
         * @param channel channel the message was published to
         * @param message message payload
         * @return {@code true} if the message is accepted by this filter
         */
        @Override
        boolean test(String channel, String message);
    }

    /**
     * Gate that lets an event through only when no earlier, still unexpired {@link Timeout}
     * is registered. Each event that passes installs a fresh timeout of
     * {@code timeout} {@code timeUnit} (5 seconds).
     */
    private static class TimedSemaphore {
        private final AtomicReference<Timeout> timeoutRef = new AtomicReference<>();
        private final int timeout = 5;
        private final TimeUnit timeUnit = TimeUnit.SECONDS;

        private TimedSemaphore() {
        }

        /**
         * Invokes the consumer with a newly installed timeout unless a previous timeout is
         * still running, or another caller installed its timeout first.
         *
         * @param timeoutConsumer receives the new timeout when this event passes the gate
         */
        protected void onEvent(Consumer<Timeout> timeoutConsumer) {
            Timeout existingTimeout = timeoutRef.get();
            if (existingTimeout != null && !existingTimeout.isExpired()) {
                return;
            }

            Timeout newTimeout = new Timeout(timeout, timeUnit);
            // compareAndSet fails if timeoutRef changed since it was read above; in that
            // case this event is dropped.
            if (timeoutRef.compareAndSet(existingTimeout, newTimeout)) {
                timeoutConsumer.accept(newTimeout);
            }
        }
    }

    /**
     * Runs an action on the event executor group in response to Pub/Sub messages that pass a
     * filter, using a {@link TimedSemaphore} to limit how often the action is scheduled.
     */
    private static class PubSubMessageActionScheduler {
        private final TimedSemaphore timedSemaphore = new TimedSemaphore();
        private final EventExecutorGroup eventExecutors;
        private final MessagePredicate filter;

        /**
         * @param eventExecutors executor group the actions are submitted to
         * @param filter decides which messages trigger an action
         */
        PubSubMessageActionScheduler(EventExecutorGroup eventExecutors, MessagePredicate filter) {
            this.eventExecutors = eventExecutors;
            this.filter = filter;
        }

        /**
         * Schedules the supplied action if the message is accepted and the semaphore lets the
         * event through. The supplier is only asked for the action in that case.
         *
         * @param channel channel the message was published to
         * @param message message payload
         * @param runnableSupplier produces the action to run
         */
        void processMessage(String channel, String message, Supplier<Runnable> runnableSupplier) {
            if (processingAllowed(channel, message)) {
                timedSemaphore.onEvent(timeout -> dispatch(runnableSupplier.get(), timeout));
            }
        }

        /** Submits the action immediately without a timeout, otherwise delays it by the timeout's remaining milliseconds. */
        private void dispatch(Runnable action, Timeout timeout) {
            if (timeout != null) {
                eventExecutors.schedule(action, timeout.remaining(), TimeUnit.MILLISECONDS);
                return;
            }
            eventExecutors.submit(action);
        }

        /** Rejects everything once the executor group is shutting down; otherwise defers to the filter. */
        private boolean processingAllowed(String channel, String message) {
            return !eventExecutors.isShuttingDown() && filter.test(channel, message);
        }
    }
}

