/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.block.PagesSerde;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.DriverContext;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.operator.ExchangeOperator;
import com.facebook.presto.operator.Operator;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.operator.SequencePageBuilder;
import com.facebook.presto.operator.SourceOperator;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.facebook.presto.split.RemoteSplit;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.testing.TestingBlockEncodingManager;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.base.Supplier;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.SliceOutput;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestExchangeOperator {
    /** Column types of every page exchanged in these tests: a single VARCHAR column. */
    private static final List<Type> TYPES = ImmutableList.<Type>of(VarcharType.VARCHAR);

    /** Template page that the fake task buffers serve and that received pages are compared against. */
    private static final Page PAGE = SequencePageBuilder.createSequencePage(TYPES, 10, 100);

    // Identifiers of the fake upstream tasks; each one also forms the URI path of its remote split.
    private static final String TASK_1_ID = "task1";
    private static final String TASK_2_ID = "task2";
    private static final String TASK_3_ID = "task3";

    /**
     * Fake task output buffers keyed by task id. A buffer is created on first access, so a test
     * and the HTTP handler can look up the same buffer in either order.
     */
    private final LoadingCache<String, TaskBuffer> taskBuffers = CacheBuilder.newBuilder().build(new CacheLoader<String, TaskBuffer>() {
        @Override
        public TaskBuffer load(String key) throws Exception {
            return new TaskBuffer();
        }
    });

    // Shared per-class fixtures, created in setUp() and released in tearDown().
    private ScheduledExecutorService executor;
    private HttpClient httpClient;
    private Supplier<ExchangeClient> exchangeClientSupplier;

    /**
     * Creates the fixtures shared by all tests in this class: a daemon scheduled thread pool,
     * an in-memory HTTP client answered by {@link HttpClientHandler}, and a supplier that
     * builds a fresh {@link ExchangeClient} on top of both.
     */
    @BeforeClass
    public void setUp() throws Exception {
        executor = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
        httpClient = new TestingHttpClient(new HttpClientHandler(taskBuffers), executor);
        exchangeClientSupplier = new Supplier<ExchangeClient>() {
            @Override
            public ExchangeClient get() {
                // The fields are read at call time, so each client uses the fixtures that are
                // current when the operator is created.
                return new ExchangeClient(
                        TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                        new DataSize(32.0, DataSize.Unit.MEGABYTE),
                        new DataSize(10.0, DataSize.Unit.MEGABYTE),
                        3,
                        new Duration(1.0, TimeUnit.MINUTES),
                        TestExchangeOperator.this.httpClient,
                        TestExchangeOperator.this.executor);
            }
        };
    }

    /**
     * Releases the shared fixtures. The executor is shut down in a {@code finally} block so
     * that its threads are stopped even when closing the HTTP client throws.
     */
    @AfterClass
    public void tearDown() throws Exception {
        try {
            httpClient.close();
        } finally {
            httpClient = null;
            executor.shutdownNow();
            executor = null;
        }
    }

    /** Drops all fake task buffers so that every test method starts with empty ones. */
    @BeforeMethod
    public void setUpMethod() {
        taskBuffers.invalidateAll();
    }

    /**
     * Three remote splits, each of whose buffers serves ten pages and is then closed:
     * the operator must emit all thirty pages and finish.
     */
    @Test
    public void testSimple() throws Exception {
        SourceOperator operator = createExchangeOperator();
        String[] taskIds = {TASK_1_ID, TASK_2_ID, TASK_3_ID};

        for (String taskId : taskIds) {
            operator.addSplit(newRemoteSplit(taskId));
        }
        operator.noMoreSplits();

        // Fill and close every buffer only after the splits have been registered.
        for (String taskId : taskIds) {
            taskBuffers.getUnchecked(taskId).addPages(10, true);
        }

        waitForPages(operator, 30);
        waitForFinished(operator);
    }

    /**
     * Builds a split for the "remote" connector that points at {@code http://localhost/<taskId>};
     * the HTTP handler used by these tests reads the task id from that path.
     *
     * @param taskId id of the fake upstream task
     * @return a split wrapping a {@link RemoteSplit} for that task
     */
    private Split newRemoteSplit(String taskId) {
        URI taskLocation = URI.create("http://localhost/" + taskId);
        ConnectorSplit remoteSplit = new RemoteSplit(taskLocation);
        return new Split("remote", remoteSplit);
    }

    /**
     * The operator must not finish while the upstream buffers are still open, even when no
     * further pages are available; it finishes only after the buffers are closed and the
     * remaining pages have been read.
     */
    @Test
    public void testWaitForClose() throws Exception {
        SourceOperator operator = createExchangeOperator();
        String[] taskIds = {TASK_1_ID, TASK_2_ID, TASK_3_ID};

        for (String taskId : taskIds) {
            operator.addSplit(newRemoteSplit(taskId));
        }
        operator.noMoreSplits();

        // One page per task, buffers left open.
        for (String taskId : taskIds) {
            taskBuffers.getUnchecked(taskId).addPages(1, false);
        }
        waitForPages(operator, 3);

        // All available pages were consumed, but the buffers are still open.
        Assert.assertFalse(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());

        // Two more pages per task, then close the buffers.
        for (String taskId : taskIds) {
            taskBuffers.getUnchecked(taskId).addPages(2, true);
        }
        waitForPages(operator, 6);
        waitForFinished(operator);
    }

    /**
     * The operator must not finish before {@code noMoreSplits()} is called, even when the only
     * known upstream buffer has been closed and fully read.
     */
    @Test
    public void testWaitForNoMoreSplits() throws Exception {
        SourceOperator operator = createExchangeOperator();

        // First split: one page, buffer closed, but more splits may still arrive.
        operator.addSplit(newRemoteSplit(TASK_1_ID));
        taskBuffers.getUnchecked(TASK_1_ID).addPages(1, true);
        waitForPages(operator, 1);

        Assert.assertFalse(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());

        // Second split followed by noMoreSplits(): now the operator may complete.
        operator.addSplit(newRemoteSplit(TASK_2_ID));
        operator.noMoreSplits();
        taskBuffers.getUnchecked(TASK_2_ID).addPages(2, true);
        waitForPages(operator, 2);
        waitForFinished(operator);
    }

    /**
     * Calling {@code finish()} must complete the operator even though the upstream buffers
     * were never closed.
     */
    @Test
    public void testFinish() throws Exception {
        SourceOperator operator = createExchangeOperator();
        String[] taskIds = {TASK_1_ID, TASK_2_ID, TASK_3_ID};

        for (String taskId : taskIds) {
            operator.addSplit(newRemoteSplit(taskId));
        }
        operator.noMoreSplits();

        // One page per task, buffers left open.
        for (String taskId : taskIds) {
            taskBuffers.getUnchecked(taskId).addPages(1, false);
        }
        waitForPages(operator, 3);

        // Still running: the buffers are open and nothing more is available.
        Assert.assertFalse(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());

        operator.finish();
        waitForFinished(operator);
    }

    /**
     * Creates a new exchange operator that produces {@link #TYPES} pages, running inside a
     * freshly created task / pipeline / driver context.
     *
     * @return the operator under test
     */
    private SourceOperator createExchangeOperator() {
        PlanNodeId sourceId = new PlanNodeId("test");
        ExchangeOperator.ExchangeOperatorFactory factory =
                new ExchangeOperator.ExchangeOperatorFactory(0, sourceId, exchangeClientSupplier, TYPES);

        TaskId taskId = new TaskId("query", "stage", "task");
        TaskContext taskContext = new TaskContext(taskId, executor, SessionTestUtils.TEST_SESSION);
        DriverContext driverContext = taskContext.addPipelineContext(true, true).addDriverContext();

        return factory.createOperator(driverContext);
    }

    /**
     * Polls the operator until it has produced {@code expectedPageCount} pages, it reports
     * finished, or ten seconds have elapsed. Afterwards it verifies that no extra page is
     * pending, that exactly the expected number of pages arrived, and that each page equals
     * {@link #PAGE}.
     *
     * @param operator operator to drain
     * @param expectedPageCount exact number of pages that must be produced
     * @return the pages read from the operator, in arrival order
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private List<Page> waitForPages(Operator operator, int expectedPageCount) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);
        List<Page> received = new ArrayList<Page>();

        while (received.size() < expectedPageCount && System.nanoTime() < deadline) {
            Assert.assertFalse(operator.needsInput());
            if (operator.isFinished()) {
                break;
            }
            Page next = operator.getOutput();
            if (next == null) {
                // Nothing available yet; back off briefly before polling again.
                Thread.sleep(10L);
            } else {
                received.add(next);
            }
        }

        // Allow a moment for any unexpected extra page to show up before checking for it.
        Thread.sleep(10L);
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());

        Assert.assertEquals(received.size(), expectedPageCount);
        List<Type> types = operator.getTypes();
        for (Page actual : received) {
            PageAssertions.assertPageEquals(types, actual, PAGE);
        }
        return received;
    }

    /**
     * Polls the operator for up to ten seconds until it reports finished, asserting on every
     * poll that it neither wants input nor produces output. Fails if the operator is still
     * not finished once polling stops.
     *
     * @param operator operator expected to finish
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitForFinished(Operator operator) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10L);

        boolean finished = false;
        while (!finished && System.nanoTime() < deadline) {
            Assert.assertFalse(operator.needsInput());
            Assert.assertNull(operator.getOutput());
            finished = operator.isFinished();
            if (!finished) {
                Thread.sleep(10L);
            }
        }

        // Final state check; this also reports a timeout as a failed assertion.
        Assert.assertTrue(operator.isFinished());
        Assert.assertFalse(operator.needsInput());
        Assert.assertNull(operator.getOutput());
    }

    /**
     * In-memory stand-in for the output buffer of one upstream task. Pages are addressed by
     * their index in the buffer; all access is synchronized on the instance.
     */
    private static class TaskBuffer {
        private final List<Page> buffer = new ArrayList<Page>();
        // Highest page sequence id requested so far.
        private int acknowledgedPages;
        // Set once a caller has added pages with close == true.
        private boolean closed;

        private TaskBuffer() {
        }

        /**
         * Appends {@code pages} copies of {@link #PAGE} and, if {@code close} is set, marks
         * the buffer as closed. A buffer that has been closed is never reopened.
         */
        private synchronized void addPages(int pages, boolean close) {
            addPages(Collections.nCopies(pages, PAGE));
            closed |= close;
        }

        /** Appends the given pages to the end of the buffer. */
        public synchronized void addPages(Iterable<Page> pages) {
            for (Page page : pages) {
                buffer.add(page);
            }
        }

        /**
         * Records {@code pageSequenceId} as acknowledged (keeping the highest value seen) and
         * returns the page stored at that index.
         *
         * @return the page, or {@code null} when the index is at or past the end of the buffer
         */
        public synchronized Page getPage(int pageSequenceId) {
            if (pageSequenceId > acknowledgedPages) {
                acknowledgedPages = pageSequenceId;
            }
            return pageSequenceId < buffer.size() ? buffer.get(pageSequenceId) : null;
        }

        /** Returns true once the buffer is closed and the acknowledged id equals the buffer size. */
        private synchronized boolean isFinished() {
            if (!closed) {
                return false;
            }
            return acknowledgedPages == buffer.size();
        }
    }

    private static class HttpClientHandler
    implements Function<Request, Response> {
        private final LoadingCache<String, TaskBuffer> taskBuffers;

        public HttpClientHandler(LoadingCache<String, TaskBuffer> taskBuffers) {
            this.taskBuffers = taskBuffers;
        }

        public Response apply(Request request) {
            ImmutableList parts = ImmutableList.copyOf((Iterable)Splitter.on((String)"/").omitEmptyStrings().split((CharSequence)request.getUri().getPath()));
            Assert.assertEquals((int)parts.size(), (int)2);
            String taskId = (String)parts.get(0);
            int pageToken = Integer.parseInt((String)parts.get(1));
            ImmutableListMultimap.Builder headers = ImmutableListMultimap.builder();
            headers.put((Object)"X-Presto-Page-Sequence-Id", (Object)String.valueOf(pageToken));
            TaskBuffer taskBuffer = (TaskBuffer)this.taskBuffers.getUnchecked((Object)taskId);
            Page page = taskBuffer.getPage(pageToken);
            if (page != null) {
                headers.put((Object)"Content-Type", (Object)"application/X-presto-pages");
                headers.put((Object)"X-Presto-Page-End-Sequence-Id", (Object)String.valueOf(pageToken + 1));
                DynamicSliceOutput output = new DynamicSliceOutput(256);
                PagesSerde.writePages((BlockEncodingSerde)TestingBlockEncodingManager.createTestingBlockEncodingManager(), (SliceOutput)output, (Page[])new Page[]{page});
                return new TestingResponse(HttpStatus.OK, (ListMultimap)headers.build(), (InputStream)output.slice().getInput());
            }
            if (taskBuffer.isFinished()) {
                headers.put((Object)"X-Presto-Page-End-Sequence-Id", (Object)String.valueOf(pageToken));
                return new TestingResponse(HttpStatus.GONE, (ListMultimap)headers.build(), new byte[0]);
            }
            headers.put((Object)"X-Presto-Page-End-Sequence-Id", (Object)String.valueOf(pageToken));
            return new TestingResponse(HttpStatus.NO_CONTENT, (ListMultimap)headers.build(), new byte[0]);
        }
    }
}

