/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.execution;

import com.facebook.presto.execution.MockRemoteTaskFactory;
import com.facebook.presto.execution.NodeScheduler;
import com.facebook.presto.execution.NodeSchedulerConfig;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.NodeVersion;
import com.facebook.presto.metadata.PrestoNode;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.NodeManager;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import io.airlift.concurrent.Threads;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(singleThreaded=true)
public class TestNodeScheduler {
    private NodeTaskMap nodeTaskMap;
    private InMemoryNodeManager nodeManager;
    private NodeScheduler.NodeSelector nodeSelector;
    private Map<Node, RemoteTask> taskMap;
    private ExecutorService remoteTaskExecutor;

    @BeforeMethod
    public void setUp() throws Exception {
        this.nodeTaskMap = new NodeTaskMap();
        this.nodeManager = new InMemoryNodeManager();
        ImmutableList.Builder nodeBuilder = ImmutableList.builder();
        nodeBuilder.add((Object)new PrestoNode("other1", URI.create("http://127.0.0.1:11"), NodeVersion.UNKNOWN));
        nodeBuilder.add((Object)new PrestoNode("other2", URI.create("http://127.0.0.1:12"), NodeVersion.UNKNOWN));
        nodeBuilder.add((Object)new PrestoNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN));
        ImmutableList nodes = nodeBuilder.build();
        this.nodeManager.addNode("foo", (Iterable)nodes);
        NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig().setMaxSplitsPerNode(20).setIncludeCoordinator(false).setMaxPendingSplitsPerNodePerTask(10);
        NodeScheduler nodeScheduler = new NodeScheduler((NodeManager)this.nodeManager, nodeSchedulerConfig, this.nodeTaskMap);
        this.taskMap = new HashMap<Node, RemoteTask>();
        this.nodeSelector = nodeScheduler.createNodeSelector("foo");
        this.remoteTaskExecutor = Executors.newCachedThreadPool(Threads.daemonThreadsNamed((String)"remoteTaskExecutor"));
    }

    @AfterMethod
    public void tearDown() throws Exception {
        this.remoteTaskExecutor.shutdown();
    }

    @Test
    public void testLocationAwareSchedulingDisabledScheduleLocal() throws Exception {
        NodeSchedulerConfig config = new NodeSchedulerConfig().setMaxSplitsPerNode(20).setIncludeCoordinator(false).setLocationAwareSchedulingEnabled(false).setMaxPendingSplitsPerNodePerTask(10);
        NodeScheduler scheduler = new NodeScheduler((NodeManager)this.nodeManager, config, this.nodeTaskMap);
        NodeScheduler.NodeSelector selector = scheduler.createNodeSelector("foo");
        Split split = new Split("foo", (ConnectorSplit)new TestSplitLocal());
        ImmutableSet splits = ImmutableSet.of((Object)split);
        Map.Entry assignment = (Map.Entry)Iterables.getOnlyElement((Iterable)selector.computeAssignments((Set)splits, this.taskMap.values()).entries());
        Assert.assertEquals((Object)((Node)assignment.getKey()).getHostAndPort(), split.getAddresses().get(0));
        Assert.assertEquals(assignment.getValue(), (Object)split);
    }

    @Test
    public void testScheduleLocal() throws Exception {
        Split split = new Split("foo", (ConnectorSplit)new TestSplitLocal());
        ImmutableSet splits = ImmutableSet.of((Object)split);
        Map.Entry assignment = (Map.Entry)Iterables.getOnlyElement((Iterable)this.nodeSelector.computeAssignments((Set)splits, this.taskMap.values()).entries());
        Assert.assertEquals((Object)((Node)assignment.getKey()).getHostAndPort(), split.getAddresses().get(0));
        Assert.assertEquals(assignment.getValue(), (Object)split);
    }

    @Test
    public void testMultipleTasksPerNode() {
        NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig().setMaxSplitsPerNode(20).setIncludeCoordinator(false).setMaxPendingSplitsPerNodePerTask(10);
        NodeScheduler nodeScheduler = new NodeScheduler((NodeManager)this.nodeManager, nodeSchedulerConfig, this.nodeTaskMap);
        NodeScheduler.NodeSelector nodeSelector = nodeScheduler.createNodeSelector("foo");
        List nodes = nodeSelector.selectRandomNodes(10);
        Assert.assertEquals((int)nodes.size(), (int)3);
        nodeSchedulerConfig.setMultipleTasksPerNodeEnabled(true);
        nodeScheduler = new NodeScheduler((NodeManager)this.nodeManager, nodeSchedulerConfig, this.nodeTaskMap);
        nodeSelector = nodeScheduler.createNodeSelector("foo");
        nodes = nodeSelector.selectRandomNodes(9);
        Assert.assertEquals((int)nodes.size(), (int)9);
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        for (Node node : nodes) {
            Integer value = (Integer)counts.get(node.getNodeIdentifier());
            counts.put(node.getNodeIdentifier(), (value == null ? 0 : value) + 1);
        }
        Assert.assertEquals((int)((Integer)counts.get("other1")), (int)3);
        Assert.assertEquals((int)((Integer)counts.get("other2")), (int)3);
        Assert.assertEquals((int)((Integer)counts.get("other3")), (int)3);
    }

    @Test
    public void testScheduleRemote() throws Exception {
        HashSet<Split> splits = new HashSet<Split>();
        splits.add(new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        Multimap assignments = this.nodeSelector.computeAssignments(splits, this.taskMap.values());
        Assert.assertEquals((int)assignments.size(), (int)1);
    }

    @Test
    public void testBasicAssignment() throws Exception {
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < 3; ++i) {
            splits.add(new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, this.taskMap.values());
        Assert.assertEquals((int)assignments.entries().size(), (int)3);
        for (Node node : this.nodeManager.getActiveDatasourceNodes("foo")) {
            Assert.assertTrue((boolean)assignments.keySet().contains(node));
        }
    }

    @Test
    public void testMaxSplitsPerNode() throws Exception {
        PrestoNode newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN);
        this.nodeManager.addNode("foo", new Node[]{newNode});
        ImmutableList.Builder initialSplits = ImmutableList.builder();
        for (int i = 0; i < 10; ++i) {
            initialSplits.add((Object)new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        }
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor);
        RemoteTask remoteTask1 = remoteTaskFactory.createTableScanTask((Node)newNode, (List<Split>)initialSplits.build());
        this.nodeTaskMap.addTask((Node)newNode, remoteTask1);
        RemoteTask remoteTask2 = remoteTaskFactory.createTableScanTask((Node)newNode, (List<Split>)initialSplits.build());
        this.nodeTaskMap.addTask((Node)newNode, remoteTask2);
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < 5; ++i) {
            splits.add(new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, this.taskMap.values());
        Assert.assertFalse((boolean)assignments.keySet().contains(newNode));
    }

    @Test
    public void testMaxSplitsPerNodePerTask() throws Exception {
        PrestoNode newNode = new PrestoNode("other4", URI.create("http://127.0.0.1:14"), NodeVersion.UNKNOWN);
        this.nodeManager.addNode("foo", new Node[]{newNode});
        ImmutableList.Builder initialSplits = ImmutableList.builder();
        for (int i = 0; i < 20; ++i) {
            initialSplits.add((Object)new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        }
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor);
        for (Node node : this.nodeManager.getActiveDatasourceNodes("foo")) {
            RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(node, (List<Split>)initialSplits.build());
            this.nodeTaskMap.addTask(node, remoteTask);
        }
        RemoteTask newRemoteTask = remoteTaskFactory.createTableScanTask((Node)newNode, (List<Split>)initialSplits.build());
        this.taskMap.put((Node)newNode, newRemoteTask);
        this.nodeTaskMap.addTask((Node)newNode, newRemoteTask);
        HashSet<Split> splits = new HashSet<Split>();
        for (int i = 0; i < 5; ++i) {
            splits.add(new Split("foo", (ConnectorSplit)new TestSplitRemote()));
        }
        Multimap assignments = this.nodeSelector.computeAssignments(splits, this.taskMap.values());
        Assert.assertEquals((int)assignments.keySet().size(), (int)3);
        Assert.assertFalse((boolean)assignments.keySet().contains(newNode));
    }

    @Test
    public void testTaskCompletion() throws Exception {
        MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(this.remoteTaskExecutor);
        Node chosenNode = (Node)Iterables.get((Iterable)this.nodeManager.getActiveDatasourceNodes("foo"), (int)0);
        RemoteTask remoteTask = remoteTaskFactory.createTableScanTask(chosenNode, (List<Split>)ImmutableList.of((Object)new Split("foo", (ConnectorSplit)new TestSplitRemote())));
        this.nodeTaskMap.addTask(chosenNode, remoteTask);
        Assert.assertEquals((int)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), (int)1);
        remoteTask.abort();
        TimeUnit.MILLISECONDS.sleep(100L);
        Assert.assertEquals((int)this.nodeTaskMap.getPartitionedSplitsOnNode(chosenNode), (int)0);
    }

    private class TestSplitRemote
    implements ConnectorSplit {
        private TestSplitRemote() {
        }

        public boolean isRemotelyAccessible() {
            return true;
        }

        public List<HostAddress> getAddresses() {
            int randomPort = ThreadLocalRandom.current().nextInt(5000);
            return ImmutableList.of((Object)HostAddress.fromString((String)("127.0.0.1:" + randomPort)));
        }

        public Object getInfo() {
            return this;
        }
    }

    private class TestSplitLocal
    implements ConnectorSplit {
        private TestSplitLocal() {
        }

        public boolean isRemotelyAccessible() {
            return false;
        }

        public List<HostAddress> getAddresses() {
            return ImmutableList.of((Object)HostAddress.fromString((String)"127.0.0.1:11"));
        }

        public Object getInfo() {
            return this;
        }
    }
}

