/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.operator.MockExchangeRequestProcessor;
import com.facebook.presto.operator.PageBufferClientStatus;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.testing.TestingBlockEncodingManager;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link ExchangeClient} that run against an in-memory
 * {@link MockExchangeRequestProcessor} wrapped in a {@link TestingHttpClient}, so no real HTTP
 * server is involved.
 *
 * <p>The executors used by the tests are created once in {@link #setUp()} and released in
 * {@link #tearDown()}; individual tests must not create executors of their own.
 */
@Test(singleThreaded=true)
public class TestExchangeClient {
    /** Executor handed to every {@link ExchangeClient}; also backs the HTTP client in {@link #testHappyPath()}. */
    private ScheduledExecutorService executor;

    /** Cached pool backing the {@link TestingHttpClient} in the tests that do not reuse {@link #executor}. */
    private ExecutorService httpClientExecutor;

    /** Creates the executors shared by all test methods of this class. */
    @BeforeClass
    public void setUp() {
        this.executor = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
        this.httpClientExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"));
    }

    /**
     * Shuts down the shared executors. {@code alwaysRun} makes sure the threads are released even
     * when an earlier configuration or test method failed.
     */
    @AfterClass(alwaysRun = true)
    public void tearDown() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        if (this.httpClientExecutor != null) {
            this.httpClientExecutor.shutdownNow();
            this.httpClientExecutor = null;
        }
    }

    /**
     * A single completed location with three pages: the pages come back in order, the following
     * read returns {@code null}, and the client then reports itself closed with an empty buffer.
     */
    @Test
    public void testHappyPath() throws Exception {
        DataSize maxResponseSize = new DataSize(10, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, new DataSize(32, DataSize.Unit.MEGABYTE), maxResponseSize, this.executor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(1));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(2));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(3));
        Assert.assertNull(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)));
        Assert.assertTrue(exchangeClient.isClosed());

        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);

        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 2, 2, "not scheduled");
    }

    /**
     * Locations are added one after another. After each location is drained a short read returns
     * {@code null} while the client stays open; it only reports closed once
     * {@code noMoreLocations()} has been called.
     */
    @Test
    public void testAddLocation() throws Exception {
        DataSize maxResponseSize = new DataSize(10, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        ExchangeClient exchangeClient = createExchangeClient(processor, new DataSize(32, DataSize.Unit.MEGABYTE), maxResponseSize, this.httpClientExecutor);

        URI location1 = URI.create("http://localhost:8081/foo");
        processor.addPage(location1, createPage(1));
        processor.addPage(location1, createPage(2));
        processor.addPage(location1, createPage(3));
        processor.setComplete(location1);
        exchangeClient.addLocation(location1);

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(1));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(2));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(3));

        // Nothing more to read from the first location, but more locations may still be added.
        Assert.assertNull(exchangeClient.getNextPage(new Duration(10, TimeUnit.MILLISECONDS)));
        Assert.assertFalse(exchangeClient.isClosed());

        URI location2 = URI.create("http://localhost:8082/bar");
        processor.addPage(location2, createPage(4));
        processor.addPage(location2, createPage(5));
        processor.addPage(location2, createPage(6));
        processor.setComplete(location2);
        exchangeClient.addLocation(location2);

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(4));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(5));
        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(6));

        Assert.assertNull(exchangeClient.getNextPage(new Duration(10, TimeUnit.MILLISECONDS)));
        Assert.assertFalse(exchangeClient.isClosed());

        exchangeClient.noMoreLocations();
        Assert.assertTrue(exchangeClient.isClosed());

        // Index by URI so the assertions do not depend on the order of the status list.
        ImmutableMap<URI, PageBufferClientStatus> statuses = Maps.uniqueIndex(exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus::getUri);
        assertStatus(statuses.get(location1), location1, "closed", 3, 2, 2, "not scheduled");
        assertStatus(statuses.get(location2), location2, "closed", 3, 2, 2, "not scheduled");
    }

    /**
     * Uses a one-byte response size and a one-byte client buffer and checks that exactly one page
     * is buffered at a time, with the request counters advancing by one per page consumed.
     */
    @Test
    public void testBufferLimit() throws Exception {
        DataSize maxResponseSize = new DataSize(1, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        processor.setComplete(location);

        ExchangeClient exchangeClient = createExchangeClient(processor, new DataSize(1, DataSize.Unit.BYTE), maxResponseSize, this.httpClientExecutor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();
        Assert.assertFalse(exchangeClient.isClosed());

        // The three waits below share one time budget measured from this instant.
        long start = System.nanoTime();

        exchangeClient.scheduleRequestIfNecessary();
        awaitBufferedPage(exchangeClient, start);

        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L, "bufferedBytes should be positive");
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 1, 1, 1, "not scheduled");

        // Zero timeout: the page is expected to be buffered already.
        assertPageEquals(exchangeClient.getNextPage(new Duration(0, TimeUnit.SECONDS)), createPage(1));
        awaitBufferedPage(exchangeClient, start);

        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L, "bufferedBytes should be positive");

        assertPageEquals(exchangeClient.getNextPage(new Duration(0, TimeUnit.SECONDS)), createPage(2));
        awaitBufferedPage(exchangeClient, start);

        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0L, "bufferedBytes should be positive");

        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(3));

        Assert.assertNull(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);
        Assert.assertTrue(exchangeClient.isClosed());
        assertStatus(exchangeClient.getStatus().getPageBufferClientStatuses().get(0), location, "closed", 3, 4, 4, "not scheduled");
    }

    /**
     * Closes the client after reading one of three pages from a location that is never marked
     * complete, then checks that no further page is returned and the buffer is empty.
     */
    @Test
    public void testClose() throws Exception {
        DataSize maxResponseSize = new DataSize(1, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(maxResponseSize);

        URI location = URI.create("http://localhost:8080");
        processor.addPage(location, createPage(1));
        processor.addPage(location, createPage(2));
        processor.addPage(location, createPage(3));
        // setComplete(location) is intentionally not called in this test.

        ExchangeClient exchangeClient = createExchangeClient(processor, new DataSize(1, DataSize.Unit.BYTE), maxResponseSize, this.httpClientExecutor);
        exchangeClient.addLocation(location);
        exchangeClient.noMoreLocations();

        Assert.assertFalse(exchangeClient.isClosed());
        assertPageEquals(exchangeClient.getNextPage(new Duration(1, TimeUnit.SECONDS)), createPage(1));

        exchangeClient.close();
        Assert.assertTrue(exchangeClient.isClosed());
        Assert.assertNull(exchangeClient.getNextPage(new Duration(0, TimeUnit.SECONDS)));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);

        // Only URI, state and HTTP request state are checked here; the page and request counters are not.
        PageBufferClientStatus clientStatus = exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), "closed", "status");
        Assert.assertEquals(clientStatus.getHttpRequestState(), "not scheduled", "httpRequestState");
    }

    /**
     * Builds an {@link ExchangeClient} whose HTTP traffic is answered by {@code processor}.
     *
     * @param processor in-memory request handler wrapped in a {@link TestingHttpClient}
     * @param bufferSize second constructor argument (presumably the client's buffer limit; confirm against ExchangeClient)
     * @param maxResponseSize third constructor argument, the same value given to the processor
     * @param httpExecutor executor on which the {@link TestingHttpClient} runs
     * @return a new client that also receives the shared scheduled executor
     */
    private ExchangeClient createExchangeClient(MockExchangeRequestProcessor processor, DataSize bufferSize, DataSize maxResponseSize, ExecutorService httpExecutor) {
        return new ExchangeClient(
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                bufferSize,
                maxResponseSize,
                1,
                new Duration(1, TimeUnit.MINUTES),
                new TestingHttpClient(processor, httpExecutor),
                this.executor);
    }

    /**
     * Polls every 100 ms until the client reports at least one buffered page, failing once more
     * than five seconds have passed since {@code startNanos}. Always sleeps once before the first
     * check; sleeping is used because the test has no other way to synchronize with the fetch.
     *
     * @param exchangeClient client whose status is polled
     * @param startNanos {@link System#nanoTime()} reading that the five second budget is measured from
     */
    private static void awaitBufferedPage(ExchangeClient exchangeClient, long startNanos) {
        do {
            Assertions.assertLessThan(Duration.nanosSince(startNanos), new Duration(5, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
    }

    /**
     * Creates a single-channel page from a long sequence block built with arguments {@code (0, size)}.
     *
     * @param size second argument passed to {@code createLongSequenceBlock}
     */
    private static Page createPage(int size) {
        return new Page(BlockAssertions.createLongSequenceBlock(0, size));
    }

    /**
     * Asserts that {@code actualPage} is non-null and has the same position and channel counts as
     * {@code expectedPage}. Block contents are not compared.
     */
    private static void assertPageEquals(Page actualPage, Page expectedPage) {
        Assert.assertNotNull(actualPage);
        Assert.assertEquals(actualPage.getPositionCount(), expectedPage.getPositionCount());
        Assert.assertEquals(actualPage.getChannelCount(), expectedPage.getChannelCount());
    }

    /**
     * Asserts every field of a {@link PageBufferClientStatus} that these tests care about.
     *
     * @param clientStatus status under test
     * @param location expected URI
     * @param status expected state string
     * @param pagesReceived expected pages-received counter
     * @param requestsScheduled expected requests-scheduled counter
     * @param requestsCompleted expected requests-completed counter
     * @param httpRequestState expected HTTP request state string
     */
    private static void assertStatus(PageBufferClientStatus clientStatus, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, String httpRequestState) {
        Assert.assertEquals(clientStatus.getUri(), location);
        Assert.assertEquals(clientStatus.getState(), status, "status");
        Assert.assertEquals(clientStatus.getPagesReceived(), pagesReceived, "pagesReceived");
        Assert.assertEquals(clientStatus.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        Assert.assertEquals(clientStatus.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        Assert.assertEquals(clientStatus.getHttpRequestState(), httpRequestState, "httpRequestState");
    }
}

