/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.operator;

import com.facebook.presto.operator.HttpPageBufferClient;
import com.facebook.presto.operator.MockExchangeRequestProcessor;
import com.facebook.presto.operator.PageBufferClientStatus;
import com.facebook.presto.operator.PageTooLargeException;
import com.facebook.presto.operator.PageTransportErrorException;
import com.facebook.presto.operator.PageTransportTimeoutException;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.testing.TestingBlockEncodingManager;
import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.airlift.concurrent.Threads;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.HttpStatus;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.testing.TestingHttpClient;
import io.airlift.http.client.testing.TestingResponse;
import io.airlift.testing.Assertions;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestHttpPageBufferClient {
    private ScheduledExecutorService executor;

    /**
     * Creates the scheduled executor shared by every test in this class. Threads come from
     * the factory returned by {@code Threads.daemonThreadsNamed("test-%s")}.
     */
    @BeforeClass
    public void setUp() {
        int threadCount = 4;
        this.executor = Executors.newScheduledThreadPool(threadCount, Threads.daemonThreadsNamed("test-%s"));
    }

    /**
     * Shuts down the shared executor, if one exists, and clears the field so a repeated
     * call is a no-op.
     */
    @AfterClass
    public void tearDown() {
        ScheduledExecutorService current = this.executor;
        if (current == null) {
            return;
        }
        current.shutdownNow();
        this.executor = null;
    }

    /**
     * Drives a client through four successful round trips against a
     * {@link MockExchangeRequestProcessor}: one page, no pages, two pages, and finally the
     * "complete" signal. After each round the callback counters and the client status
     * are checked.
     *
     * <p>The test thread and the callback meet on the two-party {@code requestComplete}
     * barrier, so every assertion runs only after the callback has been notified.
     */
    @Test
    public void testHappyPath() throws Exception {
        Page expectedPage = new Page(100, new Block[0]);
        DataSize expectedMaxSize = new DataSize(11.0, DataSize.Unit.MEGABYTE);
        MockExchangeRequestProcessor processor = new MockExchangeRequestProcessor(expectedMaxSize);
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpClient httpClient = new TestingHttpClient(processor, this.executor);
        HttpPageBufferClient client = new HttpPageBufferClient(
                httpClient,
                expectedMaxSize,
                new Duration(1.0, TimeUnit.MINUTES),
                location,
                callback,
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                this.executor,
                Stopwatch.createUnstarted());
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Round 1: a single buffered page is delivered.
        processor.addPage(location, expectedPage);
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 1);
        assertPageEquals(expectedPage, callback.getPages().get(0));
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "queued", 1, 1, 1, 0, "not scheduled");

        // Round 2: nothing is buffered, so the request completes without pages.
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "queued", 1, 2, 2, 0, "not scheduled");

        // Round 3: two buffered pages arrive in a single request.
        processor.addPage(location, expectedPage);
        processor.addPage(location, expectedPage);
        callback.resetStats();
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 2);
        assertPageEquals(expectedPage, callback.getPages().get(0));
        assertPageEquals(expectedPage, callback.getPages().get(1));
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "queued", 3, 3, 3, 0, "not scheduled");

        // Round 4: the source is marked complete; the callback sees clientFinished
        // instead of requestComplete and the client reports "closed".
        callback.resetStats();
        processor.setComplete(location);
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 0);
        Assert.assertEquals(callback.getFinishedBuffers(), 1);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "closed", 3, 4, 4, 0, "not scheduled");
    }

    /**
     * Follows a single request through the client's reported states: "queued" before
     * scheduling, "running" while the processor is held between its two barriers,
     * "queued" again once the callback has been notified, and "closed" after
     * {@code close()}.
     *
     * <p>The processor answers with an empty {@code NO_CONTENT} response; the status
     * assertions expect that request to be counted as failed.
     */
    @Test
    public void testLifecycle() throws Exception {
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        Response noContent = new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.<String, String>of(), new byte[0]);
        processor.setResponse(noContent);

        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpClient httpClient = new TestingHttpClient(processor, this.executor);
        HttpPageBufferClient client = new HttpPageBufferClient(
                httpClient,
                new DataSize(10.0, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                location,
                callback,
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                this.executor,
                Stopwatch.createUnstarted());
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Meet the processor at its entry barrier: the request is now in flight.
        client.scheduleRequest();
        beforeRequest.await(1L, TimeUnit.SECONDS);
        assertStatus(client, location, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assert.assertTrue(client.isRunning());

        // Release the processor, then wait for the callback to report completion.
        afterRequest.await(1L, TimeUnit.SECONDS);
        requestComplete.await(1L, TimeUnit.SECONDS);
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        client.close();
        assertStatus(client, location, "closed", 0, 1, 1, 1, "not scheduled");
    }

    /**
     * Feeds the client three unusable responses in turn (a 404, then two 200 responses
     * with the wrong content type) and checks that each one is reported to the callback
     * as a {@link PageTransportErrorException} with the expected message, while the
     * client stays "queued" until it is explicitly closed.
     */
    @Test
    public void testInvalidResponses() throws Exception {
        // Single-party barriers trip immediately, so the processor never waits for the test thread.
        CyclicBarrier beforeRequest = new CyclicBarrier(1);
        CyclicBarrier afterRequest = new CyclicBarrier(1);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpClient httpClient = new TestingHttpClient(processor, this.executor);
        HttpPageBufferClient client = new HttpPageBufferClient(
                httpClient,
                new DataSize(10.0, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                location,
                callback,
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                this.executor,
                Stopwatch.createUnstarted());
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Case 1: 404 status.
        processor.setResponse(new TestingResponse(HttpStatus.NOT_FOUND, ImmutableListMultimap.of("Content-Type", "application/X-presto-pages"), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 1);
        Throwable notFoundFailure = callback.getFailure();
        Assertions.assertInstanceOf(notFoundFailure, PageTransportErrorException.class);
        Assertions.assertContains(notFoundFailure.getMessage(), "Expected response code to be 200, but was 404 Not Found");
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        // Case 2: 200 status with an unrecognised content type.
        callback.resetStats();
        processor.setResponse(new TestingResponse(HttpStatus.OK, ImmutableListMultimap.of("Content-Type", "INVALID_TYPE"), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 1);
        Throwable invalidTypeFailure = callback.getFailure();
        Assertions.assertInstanceOf(invalidTypeFailure, PageTransportErrorException.class);
        Assertions.assertContains(invalidTypeFailure.getMessage(), "Expected application/x-presto-pages response from server but got INVALID_TYPE");
        assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");

        // Case 3: 200 status with a well-formed but wrong content type.
        callback.resetStats();
        processor.setResponse(new TestingResponse(HttpStatus.OK, ImmutableListMultimap.of("Content-Type", "text/plain"), new byte[0]));
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 1);
        Throwable textPlainFailure = callback.getFailure();
        Assertions.assertInstanceOf(textPlainFailure, PageTransportErrorException.class);
        Assertions.assertContains(textPlainFailure.getMessage(), "Expected application/x-presto-pages response from server but got text/plain");
        assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");

        client.close();
        assertStatus(client, location, "closed", 0, 3, 3, 3, "not scheduled");
    }

    /**
     * Closes the client while a request is held inside the processor and verifies the
     * client ends up "closed" with that one request counted as both completed and failed.
     */
    @Test
    public void testCloseDuringPendingRequest() throws Exception {
        CyclicBarrier beforeRequest = new CyclicBarrier(2);
        CyclicBarrier afterRequest = new CyclicBarrier(2);
        StaticRequestProcessor processor = new StaticRequestProcessor(beforeRequest, afterRequest);
        Response noContent = new TestingResponse(HttpStatus.NO_CONTENT, ImmutableListMultimap.<String, String>of(), new byte[0]);
        processor.setResponse(noContent);

        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpClient httpClient = new TestingHttpClient(processor, this.executor);
        HttpPageBufferClient client = new HttpPageBufferClient(
                httpClient,
                new DataSize(10.0, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                location,
                callback,
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                this.executor,
                Stopwatch.createUnstarted());
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Hold the request inside the processor (afterRequest is never released by this test).
        client.scheduleRequest();
        beforeRequest.await(1L, TimeUnit.SECONDS);
        assertStatus(client, location, "running", 0, 1, 0, 0, "PROCESSING_REQUEST");
        Assert.assertTrue(client.isRunning());

        client.close();
        try {
            requestComplete.await(1L, TimeUnit.SECONDS);
        }
        catch (BrokenBarrierException ignored) {
            // Deliberately tolerated: a broken barrier is accepted in place of a normal trip.
            // NOTE(review): presumably the callback-side await is interrupted or fails when the
            // pending request is cancelled by close() - confirm against HttpPageBufferClient.
        }
        assertStatus(client, location, "closed", 0, 1, 1, 1, "not scheduled");
    }

    /**
     * Verifies how repeated processor exceptions are reported. The processor advances a
     * {@link TestingTicker} by a configurable amount and then always throws, so each
     * request fails and the client's stopwatch (built on that ticker) sees simulated time
     * pass.
     *
     * <p>With increments of 0s, 30s and 31s the first two failures are not reported to
     * the callback; the third is reported as a {@link PageTransportTimeoutException}
     * whose message mentions 61000.00ms of failures.
     * NOTE(review): the one-minute {@code Duration} passed to the client is presumably the
     * tolerated error window that the 61s total exceeds - confirm against HttpPageBufferClient.
     *
     * <p>The callback stats are never reset here, so {@code getCompletedRequests()} is
     * cumulative (1, 2, 3).
     */
    @Test
    public void testExceptionFromResponseHandler() throws Exception {
        TestingTicker ticker = new TestingTicker();
        AtomicReference<Duration> tickerIncrement = new AtomicReference<>(new Duration(0.0, TimeUnit.SECONDS));
        // Every request: advance simulated time by the current increment, then fail.
        Function<Request, Response> processor = request -> {
            Duration delta = tickerIncrement.get();
            ticker.increment(delta.toMillis(), TimeUnit.MILLISECONDS);
            throw new RuntimeException("Foo");
        };
        CyclicBarrier requestComplete = new CyclicBarrier(2);
        TestingClientCallback callback = new TestingClientCallback(requestComplete);
        URI location = URI.create("http://localhost:8080");
        HttpClient httpClient = new TestingHttpClient(processor, this.executor);
        HttpPageBufferClient client = new HttpPageBufferClient(
                httpClient,
                new DataSize(10.0, DataSize.Unit.MEGABYTE),
                new Duration(1.0, TimeUnit.MINUTES),
                location,
                callback,
                TestingBlockEncodingManager.createTestingBlockEncodingManager(),
                this.executor,
                Stopwatch.createUnstarted(ticker));
        assertStatus(client, location, "queued", 0, 0, 0, 0, "not scheduled");

        // Failure 1: no simulated time elapses; the failure is counted but not reported.
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 1);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "queued", 0, 1, 1, 1, "not scheduled");

        // Failure 2: 30 simulated seconds elapse; still not reported.
        tickerIncrement.set(new Duration(30.0, TimeUnit.SECONDS));
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 2);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 0);
        assertStatus(client, location, "queued", 0, 2, 2, 2, "not scheduled");

        // Failure 3: another 31 simulated seconds (61 in total); now the callback is told.
        tickerIncrement.set(new Duration(31.0, TimeUnit.SECONDS));
        client.scheduleRequest();
        requestComplete.await(1L, TimeUnit.SECONDS);
        Assert.assertEquals(callback.getPages().size(), 0);
        Assert.assertEquals(callback.getCompletedRequests(), 3);
        Assert.assertEquals(callback.getFinishedBuffers(), 0);
        Assert.assertEquals(callback.getFailedBuffers(), 1);
        Throwable failure = callback.getFailure();
        Assertions.assertInstanceOf(failure, PageTransportTimeoutException.class);
        Assertions.assertContains(failure.getMessage(), "Requests to http://localhost:8080/0 failed for 61000.00ms");
        assertStatus(client, location, "queued", 0, 3, 3, 3, "not scheduled");
    }

    /**
     * Checks that each page-transport exception type reports the matching
     * {@link StandardErrorCode}.
     */
    @Test
    public void testErrorCodes() throws Exception {
        PageTooLargeException tooLarge = new PageTooLargeException();
        Assert.assertEquals(tooLarge.getErrorCode(), StandardErrorCode.PAGE_TOO_LARGE.toErrorCode());

        PageTransportErrorException transportError = new PageTransportErrorException("");
        Assert.assertEquals(transportError.getErrorCode(), StandardErrorCode.PAGE_TRANSPORT_ERROR.toErrorCode());

        PageTransportTimeoutException transportTimeout = new PageTransportTimeoutException("", null);
        Assert.assertEquals(transportTimeout.getErrorCode(), StandardErrorCode.PAGE_TRANSPORT_TIMEOUT.toErrorCode());
    }

    /**
     * Asserts that the status reported by {@code client} matches the expected values.
     * Each counter assertion is labelled with the field name so a failure identifies
     * which field diverged.
     *
     * @param client            client whose {@code getStatus()} is inspected
     * @param location          expected URI
     * @param status            expected state string
     * @param pagesReceived     expected pages-received counter
     * @param requestsScheduled expected requests-scheduled counter
     * @param requestsCompleted expected requests-completed counter
     * @param requestsFailed    expected requests-failed counter
     * @param httpRequestState  expected HTTP request state string
     */
    private static void assertStatus(HttpPageBufferClient client, URI location, String status, int pagesReceived, int requestsScheduled, int requestsCompleted, int requestsFailed, String httpRequestState) {
        PageBufferClientStatus snapshot = client.getStatus();
        Assert.assertEquals(snapshot.getUri(), location);
        Assert.assertEquals(snapshot.getState(), status, "status");
        Assert.assertEquals(snapshot.getPagesReceived(), pagesReceived, "pagesReceived");
        Assert.assertEquals(snapshot.getRequestsScheduled(), requestsScheduled, "requestsScheduled");
        Assert.assertEquals(snapshot.getRequestsCompleted(), requestsCompleted, "requestsCompleted");
        Assert.assertEquals(snapshot.getRequestsFailed(), requestsFailed, "requestsFailed");
        Assert.assertEquals(snapshot.getHttpRequestState(), httpRequestState, "httpRequestState");
    }

    /**
     * Asserts that two pages have the same shape. Only the position count and channel
     * count are compared; block contents are not inspected.
     *
     * @param expectedPage page supplying the expected counts
     * @param actualPage   page under test
     */
    private static void assertPageEquals(Page expectedPage, Page actualPage) {
        int expectedPositions = expectedPage.getPositionCount();
        int expectedChannels = expectedPage.getChannelCount();
        Assert.assertEquals(actualPage.getPositionCount(), expectedPositions);
        Assert.assertEquals(actualPage.getChannelCount(), expectedChannels);
    }

    private static class StaticRequestProcessor
    implements Function<Request, Response> {
        private final AtomicReference<Response> response = new AtomicReference();
        private final CyclicBarrier beforeRequest;
        private final CyclicBarrier afterRequest;

        private StaticRequestProcessor(CyclicBarrier beforeRequest, CyclicBarrier afterRequest) {
            this.beforeRequest = beforeRequest;
            this.afterRequest = afterRequest;
        }

        private void setResponse(Response response) {
            this.response.set(response);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Response apply(@Nullable Request request) {
            try {
                this.beforeRequest.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate((Throwable)e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate((Throwable)e);
            }
            try {
                Response response = this.response.get();
                return response;
            }
            finally {
                try {
                    this.afterRequest.await(1L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw Throwables.propagate((Throwable)e);
                }
                catch (BrokenBarrierException | TimeoutException e) {
                    throw Throwables.propagate((Throwable)e);
                }
            }
        }
    }

    private static class TestingClientCallback
    implements HttpPageBufferClient.ClientCallback {
        private final CyclicBarrier done;
        private final List<Page> pages = Collections.synchronizedList(new ArrayList());
        private final AtomicInteger completedRequests = new AtomicInteger();
        private final AtomicInteger finishedBuffers = new AtomicInteger();
        private final AtomicInteger failedBuffers = new AtomicInteger();
        private final AtomicReference<Throwable> failure = new AtomicReference();

        public TestingClientCallback(CyclicBarrier done) {
            this.done = done;
        }

        public List<Page> getPages() {
            return this.pages;
        }

        private int getCompletedRequests() {
            return this.completedRequests.get();
        }

        private int getFinishedBuffers() {
            return this.finishedBuffers.get();
        }

        public int getFailedBuffers() {
            return this.failedBuffers.get();
        }

        public Throwable getFailure() {
            return this.failure.get();
        }

        public void addPage(HttpPageBufferClient client, Page page) {
            this.pages.add(page);
        }

        public void requestComplete(HttpPageBufferClient client) {
            this.completedRequests.getAndIncrement();
            try {
                this.done.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate((Throwable)e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        public void clientFinished(HttpPageBufferClient client) {
            this.finishedBuffers.getAndIncrement();
            try {
                this.done.await(1L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw Throwables.propagate((Throwable)e);
            }
            catch (BrokenBarrierException | TimeoutException e) {
                throw Throwables.propagate((Throwable)e);
            }
        }

        public void clientFailed(HttpPageBufferClient client, Throwable cause) {
            this.failedBuffers.getAndIncrement();
            this.failure.compareAndSet(null, cause);
        }

        public void resetStats() {
            this.pages.clear();
            this.completedRequests.set(0);
            this.finishedBuffers.set(0);
            this.failedBuffers.set(0);
            this.failure.set(null);
        }
    }
}

