package com.facebook.presto.operator;

import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.testing.TestingBlockEncodingManager;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.testing.Assertions;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:com/facebook/presto/operator/TestExchangeClient.class */
public class TestExchangeClient {
    private ScheduledExecutorService executor;

    @BeforeClass
    public void setUp() {
        this.executor = Executors.newScheduledThreadPool(4, Threads.daemonThreadsNamed("test-%s"));
    }

    @AfterClass
    public void tearDown() {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    @Test
    public void testHappyPath() throws Exception {
        DataSize dataSize = new DataSize(10.0d, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor mockExchangeRequestProcessor = new MockExchangeRequestProcessor(dataSize);
        URI create = URI.create("http://localhost:8080");
        mockExchangeRequestProcessor.addPage(create, createPage(1));
        mockExchangeRequestProcessor.addPage(create, createPage(2));
        mockExchangeRequestProcessor.addPage(create, createPage(3));
        mockExchangeRequestProcessor.setComplete(create);
        ExchangeClient exchangeClient = new ExchangeClient(TestingBlockEncodingManager.createTestingBlockEncodingManager(), new DataSize(32.0d, DataSize.Unit.MEGABYTE), dataSize, 1, new Duration(1.0d, TimeUnit.MINUTES), new TestingHttpClient(mockExchangeRequestProcessor, this.executor), this.executor);
        exchangeClient.addLocation(create);
        exchangeClient.noMoreLocations();
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(1));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(2));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(3));
        Assert.assertNull(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)));
        Assert.assertEquals(exchangeClient.isClosed(), true);
        if (exchangeClient.getStatus().getBufferedPages() != 0) {
            Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        }
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() == 0);
        assertStatus((PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0), create, "closed", 3, 2, 2, "not scheduled");
    }

    @Test
    public void testAddLocation() throws Exception {
        DataSize dataSize = new DataSize(10.0d, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor mockExchangeRequestProcessor = new MockExchangeRequestProcessor(dataSize);
        ExchangeClient exchangeClient = new ExchangeClient(TestingBlockEncodingManager.createTestingBlockEncodingManager(), new DataSize(32.0d, DataSize.Unit.MEGABYTE), dataSize, 1, new Duration(1.0d, TimeUnit.MINUTES), new TestingHttpClient(mockExchangeRequestProcessor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"))), this.executor);
        URI create = URI.create("http://localhost:8081/foo");
        mockExchangeRequestProcessor.addPage(create, createPage(1));
        mockExchangeRequestProcessor.addPage(create, createPage(2));
        mockExchangeRequestProcessor.addPage(create, createPage(3));
        mockExchangeRequestProcessor.setComplete(create);
        exchangeClient.addLocation(create);
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(1));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(2));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(3));
        Assert.assertNull(exchangeClient.getNextPage(new Duration(10.0d, TimeUnit.MILLISECONDS)));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        URI create2 = URI.create("http://localhost:8082/bar");
        mockExchangeRequestProcessor.addPage(create2, createPage(4));
        mockExchangeRequestProcessor.addPage(create2, createPage(5));
        mockExchangeRequestProcessor.addPage(create2, createPage(6));
        mockExchangeRequestProcessor.setComplete(create2);
        exchangeClient.addLocation(create2);
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(4));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(5));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(6));
        Assert.assertNull(exchangeClient.getNextPage(new Duration(10.0d, TimeUnit.MILLISECONDS)));
        Assert.assertEquals(exchangeClient.isClosed(), false);
        exchangeClient.noMoreLocations();
        Assert.assertEquals(exchangeClient.isClosed(), true);
        ImmutableMap uniqueIndex = Maps.uniqueIndex(exchangeClient.getStatus().getPageBufferClientStatuses(), PageBufferClientStatus.uriGetter());
        assertStatus((PageBufferClientStatus) uniqueIndex.get(create), create, "closed", 3, 2, 2, "not scheduled");
        assertStatus((PageBufferClientStatus) uniqueIndex.get(create2), create2, "closed", 3, 2, 2, "not scheduled");
    }

    @Test
    public void testBufferLimit() throws Exception {
        DataSize dataSize = new DataSize(1.0d, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor mockExchangeRequestProcessor = new MockExchangeRequestProcessor(dataSize);
        ExchangeClient exchangeClient = new ExchangeClient(TestingBlockEncodingManager.createTestingBlockEncodingManager(), new DataSize(1.0d, DataSize.Unit.BYTE), dataSize, 1, new Duration(1.0d, TimeUnit.MINUTES), new TestingHttpClient(mockExchangeRequestProcessor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"))), this.executor);
        URI create = URI.create("http://localhost:8080");
        exchangeClient.addLocation(create);
        exchangeClient.noMoreLocations();
        Assert.assertEquals(exchangeClient.isClosed(), false);
        long nanoTime = System.nanoTime();
        mockExchangeRequestProcessor.addPage(create, createPage(1));
        mockExchangeRequestProcessor.addPage(create, createPage(2));
        mockExchangeRequestProcessor.addPage(create, createPage(3));
        mockExchangeRequestProcessor.setComplete(create);
        exchangeClient.scheduleRequestIfNecessary();
        do {
            Assertions.assertLessThan(Duration.nanosSince(nanoTime), new Duration(5.0d, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertStatus((PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0), create, "queued", 1, 1, 1, "not scheduled");
        assertPageEquals(exchangeClient.getNextPage(new Duration(0.0d, TimeUnit.SECONDS)), createPage(1));
        do {
            Assertions.assertLessThan(Duration.nanosSince(nanoTime), new Duration(5.0d, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        assertStatus((PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0), create, "queued", 2, 2, 2, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertPageEquals(exchangeClient.getNextPage(new Duration(0.0d, TimeUnit.SECONDS)), createPage(2));
        do {
            Assertions.assertLessThan(Duration.nanosSince(nanoTime), new Duration(5.0d, TimeUnit.SECONDS));
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        } while (exchangeClient.getStatus().getBufferedPages() == 0);
        assertStatus((PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0), create, "queued", 3, 3, 3, "not scheduled");
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 1);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() > 0);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(3));
        Assert.assertNull(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertTrue(exchangeClient.getStatus().getBufferedBytes() == 0);
        Assert.assertEquals(exchangeClient.isClosed(), true);
        assertStatus((PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0), create, "closed", 3, 4, 4, "not scheduled");
    }

    @Test
    public void testClose() throws Exception {
        DataSize dataSize = new DataSize(1.0d, DataSize.Unit.BYTE);
        MockExchangeRequestProcessor mockExchangeRequestProcessor = new MockExchangeRequestProcessor(dataSize);
        URI create = URI.create("http://localhost:8080");
        mockExchangeRequestProcessor.addPage(create, createPage(1));
        mockExchangeRequestProcessor.addPage(create, createPage(2));
        mockExchangeRequestProcessor.addPage(create, createPage(3));
        ExchangeClient exchangeClient = new ExchangeClient(TestingBlockEncodingManager.createTestingBlockEncodingManager(), new DataSize(1.0d, DataSize.Unit.BYTE), dataSize, 1, new Duration(1.0d, TimeUnit.MINUTES), new TestingHttpClient(mockExchangeRequestProcessor, Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test-%s"))), this.executor);
        exchangeClient.addLocation(create);
        exchangeClient.noMoreLocations();
        Assert.assertEquals(exchangeClient.isClosed(), false);
        assertPageEquals(exchangeClient.getNextPage(new Duration(1.0d, TimeUnit.SECONDS)), createPage(1));
        exchangeClient.close();
        Assert.assertEquals(exchangeClient.isClosed(), true);
        Assert.assertNull(exchangeClient.getNextPage(new Duration(0.0d, TimeUnit.SECONDS)));
        Assert.assertEquals(exchangeClient.getStatus().getBufferedPages(), 0);
        Assert.assertEquals(exchangeClient.getStatus().getBufferedBytes(), 0L);
        PageBufferClientStatus pageBufferClientStatus = (PageBufferClientStatus) exchangeClient.getStatus().getPageBufferClientStatuses().get(0);
        Assert.assertEquals(pageBufferClientStatus.getUri(), create);
        Assert.assertEquals(pageBufferClientStatus.getState(), "closed", "status");
        Assert.assertEquals(pageBufferClientStatus.getHttpRequestState(), "not scheduled", "httpRequestState");
    }

    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongSequenceBlock(0, i)});
    }

    private static void assertPageEquals(Page page, Page page2) {
        Assert.assertNotNull(page);
        Assert.assertEquals(page.getPositionCount(), page2.getPositionCount());
        Assert.assertEquals(page.getChannelCount(), page2.getChannelCount());
    }

    private static void assertStatus(PageBufferClientStatus pageBufferClientStatus, URI uri, String str, int i, int i2, int i3, String str2) {
        Assert.assertEquals(pageBufferClientStatus.getUri(), uri);
        Assert.assertEquals(pageBufferClientStatus.getState(), str, "status");
        Assert.assertEquals(pageBufferClientStatus.getPagesReceived(), i, "pagesReceived");
        Assert.assertEquals(pageBufferClientStatus.getRequestsScheduled(), i2, "requestsScheduled");
        Assert.assertEquals(pageBufferClientStatus.getRequestsCompleted(), i3, "requestsCompleted");
        Assert.assertEquals(pageBufferClientStatus.getHttpRequestState(), str2, "httpRequestState");
    }
}
