/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.FlattenInstruction;
import com.google.api.services.dataflow.model.InstructionInput;
import com.google.api.services.dataflow.model.InstructionOutput;
import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.ParDoInstruction;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.api.services.dataflow.model.PartialGroupByKeyInstruction;
import com.google.api.services.dataflow.model.ReadInstruction;
import com.google.api.services.dataflow.model.WriteInstruction;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter;
import com.google.cloud.dataflow.sdk.runners.worker.ParDoFnFactory;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SinkFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.Serializer;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObservable;
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
import com.google.cloud.dataflow.sdk.util.common.worker.FlattenOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor;
import com.google.cloud.dataflow.sdk.util.common.worker.Operation;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver;
import com.google.cloud.dataflow.sdk.util.common.worker.ParDoFn;
import com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.PartialGroupByKeyOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ReceivingOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.joda.time.Instant;

public class MapTaskExecutorFactory {
    private static ParDoFnFactory parDoFnFactory = new ParDoFnFactory.DefaultFactory();

    public static MapTaskExecutor create(PipelineOptions options, MapTask mapTask, DataflowExecutionContext context, CounterSet counters, StateSampler stateSampler) throws Exception {
        return MapTaskExecutorFactory.create(options, mapTask, ReaderFactory.Registry.defaultRegistry(), context, counters, stateSampler);
    }

    public static MapTaskExecutor create(PipelineOptions options, MapTask mapTask, ReaderFactory.Registry registry, DataflowExecutionContext context, CounterSet counters, StateSampler stateSampler) throws Exception {
        ArrayList<Operation> operations = new ArrayList<Operation>();
        String counterPrefix = stateSampler.getPrefix();
        for (ParallelInstruction instruction : mapTask.getInstructions()) {
            operations.add(MapTaskExecutorFactory.createOperation(options, instruction, registry, context, operations, counterPrefix, counters.getAddCounterMutator(), stateSampler));
        }
        return new MapTaskExecutor(operations, counters, stateSampler);
    }

    static Operation createOperation(PipelineOptions options, ParallelInstruction instruction, DataflowExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        return MapTaskExecutorFactory.createOperation(options, instruction, ReaderFactory.Registry.defaultRegistry(), executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
    }

    static Operation createOperation(PipelineOptions options, ParallelInstruction instruction, ReaderFactory.Registry registry, DataflowExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        if (instruction.getRead() != null) {
            return MapTaskExecutorFactory.createReadOperation(options, instruction, registry, executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
        }
        if (instruction.getWrite() != null) {
            return MapTaskExecutorFactory.createWriteOperation(options, instruction, executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
        }
        if (instruction.getParDo() != null) {
            return MapTaskExecutorFactory.createParDoOperation(options, instruction, executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
        }
        if (instruction.getPartialGroupByKey() != null) {
            return MapTaskExecutorFactory.createPartialGroupByKeyOperation(options, instruction, executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
        }
        if (instruction.getFlatten() != null) {
            return MapTaskExecutorFactory.createFlattenOperation(options, instruction, executionContext, priorOperations, counterPrefix, addCounterMutator, stateSampler);
        }
        String string = String.valueOf(instruction);
        throw new Exception(new StringBuilder(24 + String.valueOf(string).length()).append("Unexpected instruction: ").append(string).toString());
    }

    static ReadOperation createReadOperation(PipelineOptions options, ParallelInstruction instruction, ReaderFactory.Registry registry, DataflowExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        ReadInstruction read = instruction.getRead();
        String operationName = instruction.getSystemName();
        Reader<?> reader = registry.create(read.getSource(), options, executionContext, addCounterMutator, operationName);
        OutputReceiver[] receivers = MapTaskExecutorFactory.createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, 1);
        return new ReadOperation(operationName, reader, receivers, counterPrefix, addCounterMutator, stateSampler);
    }

    static WriteOperation createWriteOperation(PipelineOptions options, ParallelInstruction instruction, ExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        WriteInstruction write = instruction.getWrite();
        Sink sink = SinkFactory.create(options, write.getSink(), executionContext, addCounterMutator);
        OutputReceiver[] receivers = MapTaskExecutorFactory.createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, 0);
        WriteOperation operation = new WriteOperation(instruction.getSystemName(), sink, receivers, counterPrefix, addCounterMutator, stateSampler);
        MapTaskExecutorFactory.attachInput(operation, write.getInput(), priorOperations);
        return operation;
    }

    static ParDoOperation createParDoOperation(PipelineOptions options, ParallelInstruction instruction, DataflowExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        ParDoInstruction parDo = instruction.getParDo();
        ParDoFn fn = parDoFnFactory.create(options, CloudObject.fromSpec(parDo.getUserFn()), instruction.getSystemName(), instruction.getName(), parDo.getSideInputs(), parDo.getMultiOutputInfos(), parDo.getNumOutputs(), executionContext, addCounterMutator, stateSampler);
        OutputReceiver[] receivers = MapTaskExecutorFactory.createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, parDo.getNumOutputs());
        ParDoOperation operation = new ParDoOperation(instruction.getSystemName(), fn, receivers, counterPrefix, addCounterMutator, stateSampler);
        MapTaskExecutorFactory.attachInput(operation, parDo.getInput(), priorOperations);
        return operation;
    }

    static PartialGroupByKeyOperation createPartialGroupByKeyOperation(PipelineOptions options, ParallelInstruction instruction, ExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        PartialGroupByKeyInstruction pgbk = instruction.getPartialGroupByKey();
        Coder windowedCoder = Serializer.deserialize(pgbk.getInputElementCodec(), Coder.class);
        if (!(windowedCoder instanceof WindowedValue.WindowedValueCoder)) {
            String string = String.valueOf(windowedCoder);
            throw new Exception(new StringBuilder(63 + String.valueOf(string).length()).append("unexpected kind of input coder for PartialGroupByKeyOperation: ").append(string).toString());
        }
        Coder elemCoder = ((WindowedValue.WindowedValueCoder)windowedCoder).getValueCoder();
        if (!(elemCoder instanceof KvCoder)) {
            String string = String.valueOf(elemCoder);
            throw new Exception(new StringBuilder(71 + String.valueOf(string).length()).append("unexpected kind of input element coder for PartialGroupByKeyOperation: ").append(string).toString());
        }
        KvCoder kvCoder = (KvCoder)elemCoder;
        Coder keyCoder = kvCoder.getKeyCoder();
        Coder valueCoder = kvCoder.getValueCoder();
        OutputReceiver[] receivers = MapTaskExecutorFactory.createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, 1);
        ValueCombiner valueCombiner = MapTaskExecutorFactory.createValueCombiner(pgbk);
        PartialGroupByKeyOperation operation = new PartialGroupByKeyOperation(instruction.getSystemName(), new WindowingCoderGroupingKeyCreator(keyCoder), new CoderSizeEstimator(WindowedValue.getValueOnlyCoder(keyCoder)), new CoderSizeEstimator(valueCoder), 0.001, valueCombiner, PairInfo.create(), receivers, counterPrefix, addCounterMutator, stateSampler);
        MapTaskExecutorFactory.attachInput(operation, pgbk.getInput(), priorOperations);
        return operation;
    }

    static ValueCombiner createValueCombiner(PartialGroupByKeyInstruction pgbk) throws Exception {
        if (pgbk.getValueCombiningFn() == null) {
            return null;
        }
        Object deserializedFn = SerializableUtils.deserializeFromByteArray(Structs.getBytes((Map<String, Object>)((Object)CloudObject.fromSpec(pgbk.getValueCombiningFn())), "serialized_fn"), "serialized combine fn");
        return new ValueCombiner(((AppliedCombineFn)deserializedFn).getFn());
    }

    static FlattenOperation createFlattenOperation(PipelineOptions options, ParallelInstruction instruction, ExecutionContext executionContext, List<Operation> priorOperations, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        FlattenInstruction flatten = instruction.getFlatten();
        OutputReceiver[] receivers = MapTaskExecutorFactory.createOutputReceivers(instruction, counterPrefix, addCounterMutator, stateSampler, 1);
        FlattenOperation operation = new FlattenOperation(instruction.getSystemName(), receivers, counterPrefix, addCounterMutator, stateSampler);
        for (InstructionInput input : flatten.getInputs()) {
            MapTaskExecutorFactory.attachInput(operation, input, priorOperations);
        }
        return operation;
    }

    static OutputReceiver[] createOutputReceivers(ParallelInstruction instruction, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler, int expectedNumOutputs) throws Exception {
        int numOutputs = 0;
        if (instruction.getOutputs() != null) {
            numOutputs = instruction.getOutputs().size();
        }
        if (numOutputs != expectedNumOutputs) {
            throw new AssertionError((Object)"ParallelInstruction.Outputs has an unexpected length");
        }
        OutputReceiver[] receivers = new OutputReceiver[numOutputs];
        for (int i = 0; i < numOutputs; ++i) {
            InstructionOutput cloudOutput = (InstructionOutput)instruction.getOutputs().get(i);
            receivers[i] = new OutputReceiver();
            DataflowOutputCounter outputCounter = new DataflowOutputCounter(cloudOutput.getName(), new ElementByteSizeObservableCoder(Serializer.deserialize(cloudOutput.getCodec(), Coder.class)), addCounterMutator);
            receivers[i].addOutputCounter(outputCounter);
        }
        return receivers;
    }

    static void attachInput(ReceivingOperation operation, @Nullable InstructionInput input, List<Operation> priorOperations) {
        Integer producerInstructionIndex = 0;
        Integer outputNum = 0;
        if (input != null) {
            if (input.getProducerInstructionIndex() != null) {
                producerInstructionIndex = input.getProducerInstructionIndex();
            }
            if (input.getOutputNum() != null) {
                outputNum = input.getOutputNum();
            }
        }
        Operation source = priorOperations.get(producerInstructionIndex);
        operation.attachInput(source, outputNum);
    }

    public static class ElementByteSizeObservableCoder<T>
    implements ElementByteSizeObservable<T> {
        final Coder<T> coder;

        public ElementByteSizeObservableCoder(Coder<T> coder) {
            this.coder = coder;
        }

        @Override
        public boolean isRegisterByteSizeObserverCheap(T value) {
            return this.coder.isRegisterByteSizeObserverCheap(value, Coder.Context.OUTER);
        }

        @Override
        public void registerByteSizeObserver(T value, ElementByteSizeObserver observer) throws Exception {
            this.coder.registerByteSizeObserver(value, observer, Coder.Context.OUTER);
        }
    }

    public static class CoderSizeEstimator<T>
    implements PartialGroupByKeyOperation.SizeEstimator<T> {
        final Coder<T> coder;

        public CoderSizeEstimator(Coder<T> coder) {
            this.coder = coder;
        }

        @Override
        public long estimateSize(T value) throws Exception {
            return CoderUtils.encodeToByteArray(this.coder, value).length;
        }
    }

    public static class WindowingCoderGroupingKeyCreator<K>
    implements PartialGroupByKeyOperation.GroupingKeyCreator<WindowedValue<K>> {
        private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private final Coder<K> coder;

        public WindowingCoderGroupingKeyCreator(Coder<K> coder) {
            this.coder = coder;
        }

        @Override
        public Object createGroupingKey(WindowedValue<K> key) throws Exception {
            return WindowedValue.of(this.coder.structuralValue(key.getValue()), ignored, key.getWindows(), key.getPane());
        }
    }

    public static class PairInfo
    implements PartialGroupByKeyOperation.PairInfo {
        private static PairInfo theInstance = new PairInfo();

        public static PairInfo create() {
            return theInstance;
        }

        private PairInfo() {
        }

        @Override
        public Object getKeyFromInputPair(Object pair) {
            WindowedValue windowedKv = (WindowedValue)pair;
            return windowedKv.withValue(((KV)windowedKv.getValue()).getKey());
        }

        @Override
        public Object getValueFromInputPair(Object pair) {
            WindowedValue windowedKv = (WindowedValue)pair;
            return ((KV)windowedKv.getValue()).getValue();
        }

        @Override
        public Object makeOutputPair(Object key, Object values) {
            WindowedValue windowedKey = (WindowedValue)key;
            return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
        }
    }

    public static class ValueCombiner<K, InputT, AccumT, OutputT>
    implements PartialGroupByKeyOperation.Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
        private final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn;

        private ValueCombiner(Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
            this.combineFn = combineFn;
        }

        @Override
        public AccumT createAccumulator(WindowedValue<K> windowedKey) {
            return this.combineFn.createAccumulator(windowedKey.getValue());
        }

        @Override
        public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT value) {
            return this.combineFn.addInput(windowedKey.getValue(), accumulator, value);
        }

        @Override
        public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT> accumulators) {
            return this.combineFn.mergeAccumulators(windowedKey.getValue(), accumulators);
        }

        @Override
        public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
            return this.combineFn.extractOutput(windowedKey.getValue(), accumulator);
        }
    }
}

