/*
 * Decompiled with CFR 0.152.
 */
package org.rapidoid.net.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.rapidoid.activity.RapidoidThread;
import org.rapidoid.commons.Rnd;
import org.rapidoid.config.ConfigUtil;
import org.rapidoid.log.Log;
import org.rapidoid.net.Protocol;
import org.rapidoid.net.Server;
import org.rapidoid.net.TCPServerInfo;
import org.rapidoid.net.impl.AbstractLoop;
import org.rapidoid.net.impl.DefaultExchange;
import org.rapidoid.net.impl.LoopStatus;
import org.rapidoid.net.impl.RapidoidConnection;
import org.rapidoid.net.impl.RapidoidHelper;
import org.rapidoid.net.impl.RapidoidWorker;
import org.rapidoid.net.impl.RapidoidWorkerThread;
import org.rapidoid.u.U;

public class RapidoidServerLoop
extends AbstractLoop<Server>
implements Server,
TCPServerInfo {
    private static final int MAX_PENDING_CONNECTIONS = 16384;
    private volatile RapidoidWorker[] ioWorkers;
    private RapidoidWorker currentWorker;
    private final String address;
    private final int port;
    private int workers = ConfigUtil.cpus();
    private boolean blockingAccept = false;
    protected final Protocol protocol;
    private final Class<? extends RapidoidHelper> helperClass;
    private final Class<? extends DefaultExchange<?>> exchangeClass;
    private ServerSocketChannel serverSocketChannel;
    private final Selector selector;
    private final int bufSizeKB;
    private final boolean noDelay;
    private final boolean syncBufs;

    public RapidoidServerLoop(Protocol protocol, Class<? extends DefaultExchange<?>> exchangeClass, Class<? extends RapidoidHelper> helperClass, String address, int port, int workers, int bufSizeKB, boolean noDelay, boolean syncBufs) {
        super("server");
        this.protocol = protocol;
        this.exchangeClass = exchangeClass;
        this.address = address;
        this.port = port;
        this.workers = workers;
        this.bufSizeKB = bufSizeKB;
        this.noDelay = noDelay;
        this.syncBufs = syncBufs;
        this.helperClass = (Class)U.or(helperClass, RapidoidHelper.class);
        try {
            this.selector = Selector.open();
        }
        catch (IOException e) {
            Log.error((String)"Cannot open selector!", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    @Override
    protected final void beforeLoop() {
        this.validate();
        try {
            this.openSocket();
        }
        catch (IOException e) {
            throw U.rte((String)"Cannot open socket!", (Throwable)e);
        }
    }

    private void validate() {
        U.must((this.workers <= RapidoidWorker.MAX_IO_WORKERS ? 1 : 0) != 0, (String)"Too many workers! Maximum = %s", (long)RapidoidWorker.MAX_IO_WORKERS);
    }

    private void openSocket() throws IOException {
        U.notNull((Object)this.protocol, (String)"protocol", (Object[])new Object[0]);
        U.notNull(this.helperClass, (String)"helperClass", (Object[])new Object[0]);
        String blockingInfo = this.blockingAccept ? "blocking" : "non-blocking";
        Log.debug((String)"Initializing server", (String)"address", (Object)this.address, (String)"port", (Object)this.port, (String)"sync", (Object)this.syncBufs, (String)"accept", (Object)blockingInfo);
        this.serverSocketChannel = ServerSocketChannel.open();
        if (this.serverSocketChannel.isOpen() && this.selector.isOpen()) {
            this.serverSocketChannel.configureBlocking(this.blockingAccept);
            ServerSocket socket = this.serverSocketChannel.socket();
            Log.info((String)"!Starting server", (String)"!address", (Object)this.address, (String)"!port", (Object)this.port, (String)"I/O workers", (Object)this.workers, (String)"sync", (Object)this.syncBufs, (String)"accept", (Object)blockingInfo);
            InetSocketAddress addr = new InetSocketAddress(this.address, this.port);
            socket.setReceiveBufferSize(16384);
            socket.setReuseAddress(true);
            socket.bind(addr, 16384);
            Log.debug((String)"Opened server socket", (String)"address", (Object)addr);
            if (!this.blockingAccept) {
                Log.debug((String)"Registering accept selector");
                this.serverSocketChannel.register(this.selector, 16);
            }
        } else {
            throw U.rte((String)"Cannot open socket!");
        }
        this.initWorkers();
    }

    private void initWorkers() {
        this.ioWorkers = new RapidoidWorker[this.workers];
        for (int i = 0; i < this.ioWorkers.length; ++i) {
            RapidoidWorkerThread workerThread = new RapidoidWorkerThread(i, this.protocol, this.exchangeClass, this.helperClass, this.bufSizeKB, this.noDelay, this.syncBufs);
            workerThread.start();
            this.ioWorkers[i] = workerThread.getWorker();
            if (i <= 0) continue;
            this.ioWorkers[i - 1].next = this.ioWorkers[i];
        }
        this.ioWorkers[this.ioWorkers.length - 1].next = this.ioWorkers[0];
        this.currentWorker = this.ioWorkers[0];
        for (RapidoidWorker worker : this.ioWorkers) {
            worker.waitToStart();
        }
    }

    @Override
    public synchronized Server start() {
        new RapidoidThread((Runnable)this, "server").start();
        this.waitForStatusOtherThan(LoopStatus.INIT, LoopStatus.BEFORE_LOOP);
        if (this.status == LoopStatus.FAILED) {
            throw U.rte((String)"Server start-up failed!");
        }
        return (Server)super.start();
    }

    @Override
    public synchronized Server shutdown() {
        Log.info((String)"Shutting down the server...");
        this.stopLoop();
        if (this.ioWorkers != null) {
            for (RapidoidWorker worker : this.ioWorkers) {
                worker.shutdown();
            }
        }
        if (this.serverSocketChannel != null && this.selector != null && this.serverSocketChannel.isOpen() && this.selector.isOpen()) {
            try {
                this.selector.close();
                this.serverSocketChannel.close();
            }
            catch (IOException e) {
                Log.warn((String)"Cannot close socket or selector!", (Throwable)e);
            }
        }
        super.shutdown();
        Log.info((String)"!The server is down.");
        return this;
    }

    public synchronized RapidoidConnection newConnection() {
        int rndWorker = Rnd.rnd((int)this.ioWorkers.length);
        return this.ioWorkers[rndWorker].newConnection();
    }

    public synchronized void process(RapidoidConnection conn) {
        conn.worker.process(conn);
    }

    @Override
    public synchronized String process(String input) {
        if (this.ioWorkers == null) {
            this.initWorkers();
        }
        RapidoidConnection conn = this.newConnection();
        conn.setInitial(false);
        conn.input.append(input);
        conn.setProtocol(this.protocol);
        this.process(conn);
        return conn.output.asText();
    }

    public Protocol getProtocol() {
        return this.protocol;
    }

    @Override
    public TCPServerInfo info() {
        return this;
    }

    @Override
    public long messagesProcessed() {
        long total = 0L;
        for (int i = 0; i < this.ioWorkers.length; ++i) {
            total += this.ioWorkers[i].getMessagesProcessed();
        }
        return total;
    }

    @Override
    protected void insideLoop() {
        if (this.blockingAccept) {
            this.processBlocking();
        } else {
            this.processNonBlocking();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processNonBlocking() {
        try {
            this.selector.select(50L);
        }
        catch (IOException e) {
            Log.error((String)"Select failed!", (Throwable)e);
        }
        try {
            Set<SelectionKey> selectedKeys;
            Set<SelectionKey> set = selectedKeys = this.selector.selectedKeys();
            synchronized (set) {
                Iterator<SelectionKey> iter = selectedKeys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    this.acceptChannel((ServerSocketChannel)key.channel());
                }
            }
        }
        catch (ClosedSelectorException closedSelectorException) {
            // empty catch block
        }
    }

    private void processBlocking() {
        this.acceptChannel(this.serverSocketChannel);
    }

    private void acceptChannel(ServerSocketChannel serverChannel) {
        try {
            SocketChannel channel = serverChannel.accept();
            this.currentWorker.accept(channel);
            this.currentWorker = this.currentWorker.next;
        }
        catch (IOException e) {
            Log.error((String)"Acceptor error!", (Throwable)e);
        }
    }
}

