/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.PagePartitionFunction;
import com.facebook.presto.UnpartitionedPagePartitionFunction;
import com.facebook.presto.block.BlockAssertions;
import com.facebook.presto.execution.BufferInfo;
import com.facebook.presto.execution.BufferResult;
import com.facebook.presto.execution.SharedBuffer;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.operator.PageAssertions;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class TestSharedBuffer {
    private static final Duration NO_WAIT = new Duration(0.0, TimeUnit.MILLISECONDS);
    private static final Duration MAX_WAIT = new Duration(1.0, TimeUnit.SECONDS);
    private static final DataSize PAGE_SIZE = new DataSize((double)TestSharedBuffer.createPage(42).getSizeInBytes(), DataSize.Unit.BYTE);
    private static final TaskId TASK_ID = new TaskId("query", "stage", "task");
    private static final ImmutableList<BigintType> TYPES = ImmutableList.of((Object)BigintType.BIGINT);
    public static final TaskId FIRST = new TaskId("query", "stage", "first_task");
    public static final TaskId SECOND = new TaskId("query", "stage", "second_task");
    public static final TaskId QUEUE = new TaskId("query", "stage", "queue");
    public static final TaskId FOO = new TaskId("foo", "bar", "baz");
    private ScheduledExecutorService stateNotificationExecutor;

    private static Page createPage(int i) {
        return new Page(new Block[]{BlockAssertions.createLongsBlock(i)});
    }

    public static DataSize sizeOfPages(int count) {
        return new DataSize((double)(PAGE_SIZE.toBytes() * (long)count), DataSize.Unit.BYTE);
    }

    @BeforeClass
    public void setUp() throws Exception {
        this.stateNotificationExecutor = Executors.newScheduledThreadPool(5, Threads.daemonThreadsNamed((String)"test-%s"));
    }

    @AfterClass
    public void tearDown() throws Exception {
        if (this.stateNotificationExecutor != null) {
            this.stateNotificationExecutor.shutdownNow();
            this.stateNotificationExecutor = null;
        }
    }

    @Test
    public void testInvalidConstructorArg() throws Exception {
        try {
            new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, new DataSize(0.0, DataSize.Unit.BYTE));
            Assert.fail((String)"Expected IllegalStateException");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testSimple() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        for (int i = 0; i < 3; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 3, 0);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), TestSharedBuffer.createPage(1), TestSharedBuffer.createPage(2)));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 3, 0);
        sharedBuffer.get(FIRST, 3L, TestSharedBuffer.sizeOfPages(10)).cancel(true);
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 0, 3);
        for (int i = 3; i < 10; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 7, 3);
        ListenableFuture<?> future = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(10));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 3L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), TestSharedBuffer.bufferResult(3L, TestSharedBuffer.createPage(3), new Page[0]));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 8, 3);
        Assert.assertFalse((boolean)future.isDone());
        outputBuffers = outputBuffers.withBuffer(SECOND, (PagePartitionFunction)new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 11, 0);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), TestSharedBuffer.createPage(1), TestSharedBuffer.createPage(2), TestSharedBuffer.createPage(3), TestSharedBuffer.createPage(4), TestSharedBuffer.createPage(5), TestSharedBuffer.createPage(6), TestSharedBuffer.createPage(7), TestSharedBuffer.createPage(8), TestSharedBuffer.createPage(9)));
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 11, 0);
        sharedBuffer.get(SECOND, 10L, TestSharedBuffer.sizeOfPages(10)).cancel(true);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 1, 10);
        outputBuffers = outputBuffers.withNoMoreBufferIds();
        sharedBuffer.setOutputBuffers(outputBuffers);
        future.get(1L, TimeUnit.SECONDS);
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(11));
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(12));
        future = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(13));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 11, 3);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 4L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), TestSharedBuffer.bufferResult(4L, TestSharedBuffer.createPage(4), new Page[0]));
        future.get(1L, TimeUnit.SECONDS);
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 10, 4);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        sharedBuffer.setNoMorePages();
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 10, 4);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 5L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), TestSharedBuffer.bufferResult(5L, TestSharedBuffer.createPage(5), new Page[0]));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 9, 5);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        BufferResult x = TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 6L, TestSharedBuffer.sizeOfPages(10), NO_WAIT);
        TestSharedBuffer.assertBufferResultEquals(TYPES, x, TestSharedBuffer.bufferResult(6L, TestSharedBuffer.createPage(6), TestSharedBuffer.createPage(7), TestSharedBuffer.createPage(8), TestSharedBuffer.createPage(9), TestSharedBuffer.createPage(10), TestSharedBuffer.createPage(11), TestSharedBuffer.createPage(12), TestSharedBuffer.createPage(13)));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 8, 6);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 14L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)14L, (boolean)true));
        TestSharedBuffer.assertQueueClosed(sharedBuffer, FIRST, 14);
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 10L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), TestSharedBuffer.bufferResult(10L, TestSharedBuffer.createPage(10), TestSharedBuffer.createPage(11), TestSharedBuffer.createPage(12), TestSharedBuffer.createPage(13)));
        TestSharedBuffer.assertQueueState(sharedBuffer, SECOND, 4, 10);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 14L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)14L, (boolean)true));
        TestSharedBuffer.assertQueueClosed(sharedBuffer, FIRST, 14);
        TestSharedBuffer.assertQueueClosed(sharedBuffer, SECOND, 14);
        TestSharedBuffer.assertFinished(sharedBuffer);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 14L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)14L, (boolean)true));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 14L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)14L, (boolean)true));
    }

    public static BufferResult getBufferResult(SharedBuffer sharedBuffer, TaskId outputId, long sequenceId, DataSize maxSize, Duration maxWait) {
        ListenableFuture future = sharedBuffer.get(outputId, sequenceId, maxSize);
        return TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, maxWait);
    }

    public static BufferResult getFuture(ListenableFuture<BufferResult> future, Duration maxWait) {
        try {
            return (BufferResult)future.get(maxWait.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            throw Throwables.propagate((Throwable)e);
        }
        catch (ExecutionException e) {
            throw Throwables.propagate((Throwable)e.getCause());
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw Throwables.propagate((Throwable)e);
        }
    }

    @Test
    public void testDuplicateRequests() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        for (int i = 0; i < 3; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        outputBuffers = outputBuffers.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 3, 0);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), TestSharedBuffer.createPage(1), TestSharedBuffer.createPage(2)));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 3, 0);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), TestSharedBuffer.createPage(1), TestSharedBuffer.createPage(2)));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 3, 0);
        sharedBuffer.get(FIRST, 3L, TestSharedBuffer.sizeOfPages(10)).cancel(true);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)0L, (boolean)false));
        TestSharedBuffer.assertQueueState(sharedBuffer, FIRST, 0, 3);
    }

    @Test
    public void testAddQueueAfterNoMoreQueues() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        try {
            OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FOO, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds();
            sharedBuffer.setOutputBuffers(outputBuffers);
            Assert.fail((String)"Expected IllegalStateException from addQueue after noMoreQueues has been called");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testAddQueueAfterDestroy() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        sharedBuffer.destroy();
        TestSharedBuffer.assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        TestSharedBuffer.assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withNoMoreBufferIds());
        TestSharedBuffer.assertFinished(sharedBuffer);
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FOO, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
    }

    @Test
    public void testGetBeforeCreate() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        ListenableFuture future = sharedBuffer.get(FIRST, 0L, TestSharedBuffer.sizeOfPages(1));
        Assert.assertFalse((boolean)future.isDone());
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(33));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(33), new Page[0]));
    }

    @Test
    public void testAbortBeforeCreate() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        ListenableFuture future = sharedBuffer.get(FIRST, 0L, TestSharedBuffer.sizeOfPages(1));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.abort(FIRST);
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(33));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), BufferResult.emptyResults((long)0L, (boolean)true));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(10), NO_WAIT), BufferResult.emptyResults((long)0L, (boolean)true));
    }

    @Test
    public void testAddStateMachine() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        sharedBuffer.setNoMorePages();
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        Assert.assertEquals((long)sharedBuffer.getInfo().getPagesAdded(), (long)0L);
        sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        sharedBuffer.destroy();
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        Assert.assertEquals((long)sharedBuffer.getInfo().getPagesAdded(), (long)0L);
    }

    @Test
    public void testAbort() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        for (int i = 0; i < 10; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        sharedBuffer.setNoMorePages();
        OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS;
        outputBuffers = outputBuffers.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction());
        sharedBuffer.setOutputBuffers(outputBuffers);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 0L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        sharedBuffer.abort(FIRST);
        TestSharedBuffer.assertQueueClosed(sharedBuffer, FIRST, 0);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, FIRST, 1L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), BufferResult.emptyResults((long)1L, (boolean)true));
        outputBuffers = outputBuffers.withBuffer(SECOND, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds();
        sharedBuffer.setOutputBuffers(outputBuffers);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 0L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        sharedBuffer.abort(SECOND);
        TestSharedBuffer.assertQueueClosed(sharedBuffer, SECOND, 0);
        TestSharedBuffer.assertFinished(sharedBuffer);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, SECOND, 1L, TestSharedBuffer.sizeOfPages(1), NO_WAIT), BufferResult.emptyResults((long)0L, (boolean)true));
    }

    @Test
    public void testFinishClosesEmptyQueues() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(10));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(FIRST, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withBuffer(SECOND, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()));
        sharedBuffer.setNoMorePages();
        TestSharedBuffer.assertQueueClosed(sharedBuffer, FIRST, 0);
        TestSharedBuffer.assertQueueClosed(sharedBuffer, SECOND, 0);
    }

    @Test
    public void testAbortFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        ListenableFuture future = sharedBuffer.get(QUEUE, 0L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        future = sharedBuffer.get(QUEUE, 1L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.abort(QUEUE);
        TestSharedBuffer.assertQueueClosed(sharedBuffer, QUEUE, 1);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), BufferResult.emptyResults((long)1L, (boolean)true));
    }

    @Test
    public void testFinishFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()));
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        ListenableFuture future = sharedBuffer.get(QUEUE, 0L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        future = sharedBuffer.get(QUEUE, 1L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.setNoMorePages();
        TestSharedBuffer.assertQueueClosed(sharedBuffer, QUEUE, 1);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), BufferResult.emptyResults((long)1L, (boolean)true));
    }

    @Test
    public void testFinishFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        for (int i = 0; i < 5; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        ListenableFuture<?> firstEnqueuePage = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(5));
        ListenableFuture<?> secondEnqueuePage = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(6));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, QUEUE, 0L, TestSharedBuffer.sizeOfPages(1), MAX_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        sharedBuffer.get(QUEUE, 1L, TestSharedBuffer.sizeOfPages(1)).cancel(true);
        Assert.assertTrue((boolean)firstEnqueuePage.isDone());
        Assert.assertFalse((boolean)secondEnqueuePage.isDone());
        sharedBuffer.setNoMorePages();
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        Assert.assertTrue((boolean)secondEnqueuePage.isDone());
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, QUEUE, 1L, TestSharedBuffer.sizeOfPages(100), NO_WAIT), TestSharedBuffer.bufferResult(1L, TestSharedBuffer.createPage(1), TestSharedBuffer.createPage(2), TestSharedBuffer.createPage(3), TestSharedBuffer.createPage(4), TestSharedBuffer.createPage(5)));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, QUEUE, 6L, TestSharedBuffer.sizeOfPages(100), NO_WAIT), BufferResult.emptyResults((long)6L, (boolean)true));
        TestSharedBuffer.assertFinished(sharedBuffer);
    }

    @Test
    public void testDestroyFreesReader() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        ListenableFuture future = sharedBuffer.get(QUEUE, 0L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(0));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        future = sharedBuffer.get(QUEUE, 1L, TestSharedBuffer.sizeOfPages(10));
        Assert.assertFalse((boolean)future.isDone());
        sharedBuffer.destroy();
        TestSharedBuffer.assertQueueClosed(sharedBuffer, QUEUE, 1);
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getFuture((ListenableFuture<BufferResult>)future, NO_WAIT), BufferResult.emptyResults((long)1L, (boolean)true));
    }

    @Test
    public void testDestroyFreesWriter() throws Exception {
        SharedBuffer sharedBuffer = new SharedBuffer(TASK_ID, (Executor)this.stateNotificationExecutor, TestSharedBuffer.sizeOfPages(5));
        sharedBuffer.setOutputBuffers(OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(QUEUE, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds());
        Assert.assertFalse((boolean)sharedBuffer.isFinished());
        for (int i = 0; i < 5; ++i) {
            TestSharedBuffer.addPage(sharedBuffer, TestSharedBuffer.createPage(i));
        }
        ListenableFuture<?> firstEnqueuePage = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(5));
        ListenableFuture<?> secondEnqueuePage = TestSharedBuffer.enqueuePage(sharedBuffer, TestSharedBuffer.createPage(6));
        TestSharedBuffer.assertBufferResultEquals(TYPES, TestSharedBuffer.getBufferResult(sharedBuffer, QUEUE, 0L, TestSharedBuffer.sizeOfPages(1), MAX_WAIT), TestSharedBuffer.bufferResult(0L, TestSharedBuffer.createPage(0), new Page[0]));
        sharedBuffer.get(QUEUE, 1L, TestSharedBuffer.sizeOfPages(1)).cancel(true);
        Assert.assertTrue((boolean)firstEnqueuePage.isDone());
        Assert.assertFalse((boolean)secondEnqueuePage.isDone());
        sharedBuffer.destroy();
        TestSharedBuffer.assertFinished(sharedBuffer);
        Assert.assertTrue((boolean)secondEnqueuePage.isDone());
    }

    private static ListenableFuture<?> enqueuePage(SharedBuffer sharedBuffer, Page page) {
        ListenableFuture future = sharedBuffer.enqueue(page);
        Assert.assertFalse((boolean)future.isDone());
        return future;
    }

    private static void addPage(SharedBuffer sharedBuffer, Page page) {
        Assert.assertTrue((boolean)sharedBuffer.enqueue(page).isDone());
    }

    private static void assertQueueState(SharedBuffer sharedBuffer, TaskId queueId, int size, int pagesSent) {
        Assert.assertEquals((Object)TestSharedBuffer.getBufferInfo(sharedBuffer, queueId), (Object)new BufferInfo(queueId, false, size, (long)pagesSent));
    }

    private static void assertQueueClosed(SharedBuffer sharedBuffer, TaskId queueId, int pagesSent) {
        Assert.assertEquals((Object)TestSharedBuffer.getBufferInfo(sharedBuffer, queueId), (Object)new BufferInfo(queueId, true, 0, (long)pagesSent));
    }

    private static BufferInfo getBufferInfo(SharedBuffer sharedBuffer, TaskId queueId) {
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            if (!bufferInfo.getBufferId().equals((Object)queueId)) continue;
            return bufferInfo;
        }
        return null;
    }

    private static void assertFinished(SharedBuffer sharedBuffer) throws Exception {
        Assert.assertTrue((boolean)sharedBuffer.isFinished());
        for (BufferInfo bufferInfo : sharedBuffer.getInfo().getBuffers()) {
            Assert.assertTrue((boolean)bufferInfo.isFinished());
            Assert.assertEquals((int)bufferInfo.getBufferedPages(), (int)0);
        }
    }

    private static void assertBufferResultEquals(List<? extends Type> types, BufferResult actual, BufferResult expected) {
        Assert.assertEquals((int)actual.getPages().size(), (int)expected.getPages().size());
        Assert.assertEquals((long)actual.getToken(), (long)expected.getToken());
        for (int i = 0; i < actual.getPages().size(); ++i) {
            Page actualPage = (Page)actual.getPages().get(i);
            Page expectedPage = (Page)expected.getPages().get(i);
            Assert.assertEquals((int)actualPage.getChannelCount(), (int)expectedPage.getChannelCount());
            PageAssertions.assertPageEquals(types, actualPage, expectedPage);
        }
        Assert.assertEquals((boolean)actual.isBufferClosed(), (boolean)expected.isBufferClosed());
    }

    public static BufferResult bufferResult(long token, Page firstPage, Page ... otherPages) {
        ImmutableList pages = ImmutableList.builder().add((Object)firstPage).add((Object[])otherPages).build();
        return new BufferResult(token, token + (long)pages.size(), false, (List)pages);
    }
}

