/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers;
import com.facebook.presto.PagePartitionFunction;
import com.facebook.presto.Session;
import com.facebook.presto.SessionTestUtils;
import com.facebook.presto.UnpartitionedPagePartitionFunction;
import com.facebook.presto.execution.LocationFactory;
import com.facebook.presto.execution.NodeScheduler;
import com.facebook.presto.execution.NodeSchedulerConfig;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.QueryId;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.RemoteTaskFactory;
import com.facebook.presto.execution.SharedBuffer;
import com.facebook.presto.execution.SqlStageExecution;
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.StageState;
import com.facebook.presto.execution.StateMachine;
import com.facebook.presto.execution.TaskId;
import com.facebook.presto.execution.TaskInfo;
import com.facebook.presto.execution.TaskState;
import com.facebook.presto.execution.TaskStateMachine;
import com.facebook.presto.execution.TestSqlTaskManager;
import com.facebook.presto.execution.TestingSplit;
import com.facebook.presto.metadata.ColumnHandle;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.NodeVersion;
import com.facebook.presto.metadata.PrestoNode;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.metadata.TableHandle;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.ConnectorColumnHandle;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.type.VarcharType;
import com.facebook.presto.split.ConnectorAwareSplitSource;
import com.facebook.presto.sql.planner.PlanFragment;
import com.facebook.presto.sql.planner.StageExecutionPlan;
import com.facebook.presto.sql.planner.Symbol;
import com.facebook.presto.sql.planner.TestingColumnHandle;
import com.facebook.presto.sql.planner.TestingTableHandle;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.TableScanNode;
import com.facebook.presto.util.Failures;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.Threads;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;
import org.joda.time.DateTime;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestSqlStageExecution {
    // Buffer id registered on the OutputBuffers handed to every stage execution built in this class.
    public static final TaskId OUT = new TaskId("query", "stage", "out");
    // The fields below are re-created by setUp() before each test method.
    // Passed both to the NodeScheduler and to each SqlStageExecution created by the tests.
    private NodeTaskMap nodeTaskMap;
    // In-memory node registry; individual tests add further nodes to it.
    private InMemoryNodeManager nodeManager;
    // Scheduler built on top of nodeManager and nodeTaskMap with the limits configured in setUp().
    private NodeScheduler nodeScheduler;
    // Mock location factory passed to stage executions built by createSqlStageExecution().
    private LocationFactory locationFactory;

    /**
     * Builds a fresh scheduling fixture before each test method.
     *
     * <p>Three nodes ({@code other1} .. {@code other3}) are registered with an in-memory node
     * manager under the name {@code "foo"}, and the scheduler is configured with at most 20 splits
     * per node, at most 10 pending splits per node per task, and the coordinator excluded.
     */
    @BeforeMethod
    public void setUp() throws Exception {
        ImmutableList<Node> workers = ImmutableList.<Node>of(
                new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN),
                new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN),
                new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN));
        InMemoryNodeManager manager = new InMemoryNodeManager();
        manager.addNode("foo", workers);
        this.nodeManager = manager;

        NodeSchedulerConfig schedulerConfig = new NodeSchedulerConfig()
                .setMaxSplitsPerNode(20)
                .setIncludeCoordinator(false)
                .setMaxPendingSplitsPerNodePerTask(10);
        NodeTaskMap taskMap = new NodeTaskMap();
        this.nodeTaskMap = taskMap;
        this.nodeScheduler = new NodeScheduler((NodeManager)manager, schedulerConfig, taskMap);
        this.locationFactory = new TestSqlTaskManager.MockLocationFactory();
    }

    /**
     * Starts a stage against a node manager that has had no nodes added, with the coordinator
     * excluded, and expects the stage future to fail with "No nodes available to run query".
     */
    @Test(expectedExceptions={ExecutionException.class}, expectedExceptionsMessageRegExp=".*No nodes available to run query")
    public void testExcludeCoordinator() throws Exception {
        // Deliberately separate from the fixture built in setUp(): this manager gets no nodes added.
        InMemoryNodeManager emptyNodeManager = new InMemoryNodeManager();
        NodeSchedulerConfig coordinatorExcluded = new NodeSchedulerConfig().setIncludeCoordinator(false);
        NodeScheduler isolatedScheduler = new NodeScheduler((NodeManager)emptyNodeManager, coordinatorExcluded, this.nodeTaskMap);

        SqlStageExecution stage = this.createSqlStageExecution(isolatedScheduler, 2, 20);
        stage.start().get(1L, TimeUnit.SECONDS);
    }

    /**
     * Checks split distribution in two steps: a 15-split stage on the three nodes from
     * {@link #setUp()} must leave every task holding 5 splits; after a fourth node is added, a
     * 5-split stage must place a task holding all 5 splits on that new node.
     */
    @Test
    public void testSplitAssignment() throws Exception {
        // First stage: 15 splits, scheduled in batches of 2.
        SqlStageExecution firstStage = this.createSqlStageExecution(this.nodeScheduler, 2, 15);
        firstStage.start().get(1L, TimeUnit.SECONDS);
        for (RemoteTask scheduled : firstStage.getAllTasks()) {
            Assert.assertEquals(scheduled.getPartitionedSplitCount(), 5);
        }

        // Second stage: register one more node, then schedule 5 splits in a single batch.
        PrestoNode additionalNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN);
        this.nodeManager.addNode("foo", new Node[]{additionalNode});
        SqlStageExecution secondStage = this.createSqlStageExecution(this.nodeScheduler, 5, 5);
        secondStage.start().get(1L, TimeUnit.SECONDS);

        RemoteTask taskOnNewNode = (RemoteTask)Iterables.getFirst((Iterable)secondStage.getTasks((Node)additionalNode), null);
        Assert.assertNotNull(taskOnNewNode);
        Assert.assertEquals(taskOnNewNode.getPartitionedSplitCount(), 5);
    }

    /**
     * Schedules 100 splits in a single batch of 100 and asserts that every task ends up holding
     * 20 splits, which matches the max-splits-per-node value configured in {@link #setUp()}.
     * The stage future is given one second; a timeout is tolerated and the assertions still run.
     */
    @Test
    public void testSplitAssignmentBatchSizeGreaterThanMaxPending() throws Exception {
        SqlStageExecution stage = this.createSqlStageExecution(this.nodeScheduler, 100, 100);
        Future<?> scheduling = stage.start();
        try {
            scheduling.get(1L, TimeUnit.SECONDS);
        }
        catch (TimeoutException ignored) {
            // Intentionally ignored: the per-task split counts are checked whether or not the
            // stage future completes within the wait.
        }
        for (RemoteTask scheduled : stage.getAllTasks()) {
            Assert.assertEquals(scheduled.getPartitionedSplitCount(), 20);
        }
    }

    /**
     * Creates a stage execution over a table-scan plan named {@code "test"}.
     *
     * <p>Each call creates two new cached thread pools (one for the mock remote tasks, one for
     * the stage itself). NOTE(review): neither pool is shut down anywhere in this class.
     *
     * @param nodeScheduler scheduler the stage uses to pick nodes
     * @param splitBatchSize split batch size forwarded to the {@link SqlStageExecution} constructor
     * @param splitCount number of {@link TestingSplit}s supplied by the plan's split source
     * @return a stage execution that has not been started
     */
    private SqlStageExecution createSqlStageExecution(NodeScheduler nodeScheduler, int splitBatchSize, int splitCount) {
        ExecutorService taskPool = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("remoteTaskExecutor"));
        ExecutorService stagePool = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("stageExecutor"));
        RemoteTaskFactory taskFactory = new MockRemoteTaskFactory(taskPool);

        // Single unpartitioned output buffer, with the buffer set declared complete.
        OutputBuffers buffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS
                .withBuffer(OUT, (PagePartitionFunction)new UnpartitionedPagePartitionFunction())
                .withNoMoreBufferIds();
        StageExecutionPlan plan = this.createTableScanPlan("test", splitCount);

        return new SqlStageExecution(
                new QueryId("query"),
                this.locationFactory,
                plan,
                nodeScheduler,
                taskFactory,
                SessionTestUtils.TEST_SESSION,
                splitBatchSize,
                8, // NOTE(review): meaning of this constructor argument is not visible here
                stagePool,
                this.nodeTaskMap,
                buffers);
    }

    /**
     * (Currently disabled.) Starts a join stage over a single node and polls its stage info until
     * the first sub-stage reaches {@code RUNNING}, then verifies the task counts and that the
     * sub-stage task's output buffers are in {@code NO_MORE_BUFFERS}.
     *
     * <p>The parent stage must stay in {@code SCHEDULING} for the whole wait, and the sub-stage
     * must not reach a terminal state. The test fails if this does not happen within one second
     * of wall-clock time; elapsed time is compared in nanoseconds so the limit is not rounded
     * down to whole seconds.
     */
    @Test(enabled=false)
    public void testYieldCausesFullSchedule() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("test"));
        SqlStageExecution stageExecution = null;
        try {
            StageExecutionPlan joinPlan = this.createJoinPlan("A");
            InMemoryNodeManager nodeManager = new InMemoryNodeManager();
            nodeManager.addNode("foo", new Node[]{new PrestoNode("other", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN)});
            OutputBuffers outputBuffers = OutputBuffers.INITIAL_EMPTY_OUTPUT_BUFFERS.withBuffer(OUT, (PagePartitionFunction)new UnpartitionedPagePartitionFunction()).withNoMoreBufferIds();
            stageExecution = new SqlStageExecution(new QueryId("query"), (LocationFactory)new TestSqlTaskManager.MockLocationFactory(), joinPlan, new NodeScheduler((NodeManager)nodeManager, new NodeSchedulerConfig(), this.nodeTaskMap), (RemoteTaskFactory)new MockRemoteTaskFactory(executor), SessionTestUtils.TEST_SESSION, 1000, 8, executor, this.nodeTaskMap, outputBuffers);
            Future<?> future = stageExecution.start();

            long timeoutNanos = TimeUnit.SECONDS.toNanos(1L);
            long start = System.nanoTime();
            while (true) {
                StageInfo stageInfo = stageExecution.getStageInfo();
                Assert.assertEquals((Object)stageInfo.getState(), (Object)StageState.SCHEDULING);
                StageInfo tableScanInfo = (StageInfo)stageInfo.getSubStages().get(0);
                StageState tableScanState = tableScanInfo.getState();
                switch (tableScanState) {
                    case PLANNED:
                    case SCHEDULING:
                    case SCHEDULED: {
                        // Not running yet; keep polling.
                        break;
                    }
                    case RUNNING: {
                        Assert.assertEquals(stageInfo.getTasks().size(), 2);
                        Assert.assertEquals(tableScanInfo.getTasks().size(), 1);
                        Assert.assertEquals((Object)((TaskInfo)tableScanInfo.getTasks().get(0)).getOutputBuffers().getState(), (Object)SharedBuffer.BufferState.NO_MORE_BUFFERS);
                        return;
                    }
                    case FINISHED:
                    case CANCELED:
                    case FAILED: {
                        // Assert.fail always throws, so control never leaves this case normally.
                        Assert.fail("Unexpected state for table scan stage " + tableScanState);
                    }
                }
                // Compare in nanoseconds: converting the elapsed time to whole seconds first
                // truncates, which would let the loop run for up to two seconds.
                if (System.nanoTime() - start > timeoutNanos) {
                    Assert.fail("Expected test to complete within 1 second");
                }
                try {
                    future.get(50L, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException ignored) {
                    // Stage future not complete yet; loop around and re-check the stage state.
                }
            }
        }
        finally {
            if (stageExecution != null) {
                stageExecution.cancel(false);
            }
            executor.shutdownNow();
        }
    }

    /**
     * Builds a stage plan whose fragment is an inner join of two exchanges: one fed by a 10-split
     * {@code "probe"} table scan and one fed by a 1-split {@code "build"} table scan.
     *
     * <p>The returned plan uses the probe plan's data source and lists the probe and build plans,
     * in that order, as its sub-stages.
     *
     * @param planId id used for the join fragment and join node, and as prefix of the exchange node ids
     * @return the join stage plan
     */
    private StageExecutionPlan createJoinPlan(String planId) {
        StageExecutionPlan build = this.createTableScanPlan("build", 1);
        PlanFragment buildFragment = build.getFragment();
        ExchangeNode buildExchange = new ExchangeNode(
                new PlanNodeId(planId + "-build"),
                buildFragment.getId(),
                (List)ImmutableList.copyOf(buildFragment.getSymbols().keySet()));

        StageExecutionPlan probe = this.createTableScanPlan("probe", 10);
        PlanFragment probeFragment = probe.getFragment();
        ExchangeNode probeExchange = new ExchangeNode(
                new PlanNodeId(planId + "-probe"),
                probeFragment.getId(),
                (List)ImmutableList.copyOf(probeFragment.getSymbols().keySet()));

        JoinNode join = new JoinNode(
                new PlanNodeId(planId),
                JoinNode.Type.INNER,
                (PlanNode)probeExchange,
                (PlanNode)buildExchange,
                (List)ImmutableList.of());
        PlanFragment joinFragment = new PlanFragment(
                new PlanFragmentId(planId),
                (PlanNode)join,
                probe.getFragment().getSymbols(),
                PlanFragment.PlanDistribution.SOURCE,
                new PlanNodeId(planId),
                PlanFragment.OutputPartitioning.NONE,
                (List)ImmutableList.of());

        return new StageExecutionPlan(joinFragment, probe.getDataSource(), (List)ImmutableList.of((Object)probe, (Object)build));
    }

    /**
     * Builds a leaf stage plan: a table scan over one varchar symbol named {@code "column"},
     * backed by a fixed split source containing {@code splitCount} {@link TestingSplit}s.
     *
     * @param planId id used for both the plan fragment and the table scan node
     * @param splitCount number of splits to provide; values of zero or less yield no splits
     * @return a stage plan with no sub-stages
     */
    private StageExecutionPlan createTableScanPlan(String planId, int splitCount) {
        Symbol column = new Symbol("column");
        PlanNodeId scanId = new PlanNodeId(planId);

        TableScanNode scan = new TableScanNode(
                scanId,
                new TableHandle("test", (ConnectorTableHandle)new TestingTableHandle()),
                (List)ImmutableList.of((Object)column),
                (Map)ImmutableMap.of((Object)column, (Object)new ColumnHandle("test", (ConnectorColumnHandle)new TestingColumnHandle("column"))),
                null,
                Optional.absent());
        PlanFragment fragment = new PlanFragment(
                new PlanFragmentId(planId),
                (PlanNode)scan,
                (Map)ImmutableMap.of((Object)column, (Object)VarcharType.VARCHAR),
                PlanFragment.PlanDistribution.SOURCE,
                scanId,
                PlanFragment.OutputPartitioning.NONE,
                (List)ImmutableList.of());

        ImmutableList.Builder testingSplits = ImmutableList.builder();
        for (int remaining = splitCount; remaining > 0; --remaining) {
            testingSplits.add((Object)new TestingSplit());
        }
        FixedSplitSource fixedSource = new FixedSplitSource(null, (Iterable)testingSplits.build());
        ConnectorAwareSplitSource splitSource = new ConnectorAwareSplitSource("test", (ConnectorSplitSource)fixedSource);

        return new StageExecutionPlan(fragment, Optional.of((Object)splitSource), (List)ImmutableList.of());
    }

    /**
     * {@link RemoteTaskFactory} used by these tests: every created task is a
     * {@link MockRemoteTask} that only records the splits it is given.
     */
    private static class MockRemoteTaskFactory
    implements RemoteTaskFactory {
        private final Executor executor;

        private MockRemoteTaskFactory(Executor executor) {
            this.executor = executor;
        }

        /**
         * Creates a mock task for {@code node}, seeded with {@code initialSplits}.
         * The {@code session} and {@code outputBuffers} arguments are not used.
         */
        public RemoteTask createRemoteTask(Session session, TaskId taskId, Node node, PlanFragment fragment, Multimap<PlanNodeId, Split> initialSplits, OutputBuffers outputBuffers) {
            return new MockRemoteTask(taskId, node.getNodeIdentifier(), fragment, this.executor, initialSplits);
        }

        /**
         * In-memory {@link RemoteTask}: it tracks assigned splits and a task state machine.
         *
         * <p>The split bookkeeping ({@code splits}, {@code noMoreSplits}) is guarded by this
         * object's monitor, so every access after construction holds the monitor.
         */
        private static class MockRemoteTask
        implements RemoteTask {
            private final AtomicLong nextTaskInfoVersion = new AtomicLong(1L);
            private final URI location;
            private final TaskStateMachine taskStateMachine;
            private final TaskContext taskContext;
            private final SharedBuffer sharedBuffer;
            private final String nodeId;
            private final PlanFragment fragment;
            @GuardedBy(value="this")
            private final Set<PlanNodeId> noMoreSplits = new HashSet<PlanNodeId>();
            @GuardedBy(value="this")
            private final Multimap<PlanNodeId, Split> splits = HashMultimap.create();

            /**
             * @throws NullPointerException if {@code taskId}, {@code executor} or {@code fragment} is null
             */
            public MockRemoteTask(TaskId taskId, String nodeId, PlanFragment fragment, Executor executor, Multimap<PlanNodeId, Split> initialSplits) {
                this.taskStateMachine = new TaskStateMachine(Preconditions.checkNotNull(taskId, "taskId is null"), Preconditions.checkNotNull(executor, "executor is null"));
                this.taskContext = new TaskContext(this.taskStateMachine, executor, SessionTestUtils.TEST_SESSION, new DataSize(256.0, DataSize.Unit.MEGABYTE), new DataSize(1.0, DataSize.Unit.MEGABYTE), true, true);
                this.location = URI.create("fake://task/" + taskId);
                // The buffer size is a freshly constructed value, so no null check is needed.
                this.sharedBuffer = new SharedBuffer(taskId, executor, new DataSize(1.0, DataSize.Unit.BYTE));
                this.fragment = Preconditions.checkNotNull(fragment, "fragment is null");
                this.nodeId = nodeId;
                // Hold the monitor here too, so every access to the guarded map is uniform.
                synchronized (this) {
                    this.splits.putAll(initialSplits);
                }
            }

            public String getNodeId() {
                return this.nodeId;
            }

            /**
             * Returns a snapshot of the task. Each call consumes a new version number, and
             * failure details are included only when the task is in the FAILED state.
             */
            public TaskInfo getTaskInfo() {
                TaskState state = this.taskStateMachine.getState();
                List failures = state == TaskState.FAILED
                        ? (List)Failures.toFailures((Iterable)this.taskStateMachine.getFailureCauses())
                        : ImmutableList.of();
                return new TaskInfo(this.taskStateMachine.getTaskId(), this.nextTaskInfoVersion.getAndIncrement(), state, this.location, DateTime.now(), this.sharedBuffer.getInfo(), (Set)ImmutableSet.of(), this.taskContext.getTaskStats(), failures);
            }

            /** Moves the task's state machine to finished. */
            public void finished() {
                this.taskStateMachine.finished();
            }

            /** No-op: the mock has nothing to start. */
            public void start() {
            }

            /**
             * Records {@code splits} against {@code sourceId}.
             *
             * @throws NullPointerException if {@code splits} is null
             */
            public synchronized void addSplits(PlanNodeId sourceId, Iterable<Split> splits) {
                Preconditions.checkNotNull(splits, (Object)"splits is null");
                for (Split split : splits) {
                    this.splits.put(sourceId, split);
                }
            }

            /** Records that {@code sourceId} will receive no further splits. */
            public synchronized void noMoreSplits(PlanNodeId sourceId) {
                this.noMoreSplits.add(sourceId);
            }

            public void setOutputBuffers(OutputBuffers outputBuffers) {
                this.sharedBuffer.setOutputBuffers(outputBuffers);
            }

            /**
             * Adapts a {@link TaskInfo} listener onto the task state machine: on every state
             * change the listener receives a fresh {@link #getTaskInfo()} snapshot.
             */
            public void addStateChangeListener(final StateMachine.StateChangeListener<TaskInfo> stateChangeListener) {
                this.taskStateMachine.addStateChangeListener(new StateMachine.StateChangeListener<TaskState>(){

                    public void stateChanged(TaskState newValue) {
                        stateChangeListener.stateChanged(MockRemoteTask.this.getTaskInfo());
                    }
                });
            }

            public void cancel() {
                this.taskStateMachine.cancel();
            }

            /**
             * Waits until the task reaches a done state or the remaining wait drops to one
             * millisecond or less, and returns the wait time that is left.
             */
            public Duration waitForTaskToFinish(Duration maxWait) throws InterruptedException {
                while (true) {
                    TaskState currentState = this.taskStateMachine.getState();
                    if (maxWait.toMillis() <= 1L || currentState.isDone()) {
                        return maxWait;
                    }
                    maxWait = this.taskStateMachine.waitForStateChange(currentState, maxWait);
                }
            }

            /** Returns the number of recorded splits, or 0 once the task is in a done state. */
            public int getPartitionedSplitCount() {
                // Read the state outside the monitor; only the guarded map needs the lock.
                if (this.taskStateMachine.getState().isDone()) {
                    return 0;
                }
                synchronized (this) {
                    return this.splits.size();
                }
            }

            /** The mock never queues splits, so this is always 0. */
            public int getQueuedPartitionedSplitCount() {
                return 0;
            }
        }
    }
}

