/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.runners.AggregatorPipelineExtractor;
import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
import com.google.cloud.dataflow.sdk.runners.dataflow.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectPipelineRunner
extends PipelineRunner<EvaluationResults> {
    /** Logger shared by the runner and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class);
    /** Random source created once in the constructor and passed to evaluators and test combine functions. */
    private final Random rand;
    /** Evaluators registered for all runner instances, keyed by transform class. */
    private static final Map<Class, TransformEvaluator> defaultTransformEvaluators = new HashMap<Class, TransformEvaluator>();
    /** Evaluators registered on this runner only; consulted before the defaults. */
    private final Map<Class, TransformEvaluator> localTransformEvaluators = new HashMap<Class, TransformEvaluator>();
    /** Options this runner was constructed with. */
    private final DirectPipelineOptions options;
    /** Whether serializability checks are performed; toggled by {@code withSerializabilityTesting}. */
    private boolean testSerializability = true;
    /** Whether PCollection elements are checked for encodability; toggled by {@code withEncodabilityTesting}. */
    private boolean testEncodability = true;
    /** Whether PCollection contents are shuffled when read; toggled by {@code withUnorderednessTesting}. */
    private boolean testUnorderedness = true;

    /**
     * Registers the evaluator used by default, across all runner instances, for a transform class.
     *
     * <p>The new evaluator is stored before the duplicate check, so when the exception is
     * thrown the map already holds the new evaluator.
     *
     * @param transformClass the transform class the evaluator handles
     * @param transformEvaluator the evaluator to associate with that class
     * @throws IllegalArgumentException if a non-null evaluator was already registered for the class
     */
    public static <TransformT extends PTransform<?, ?>> void registerDefaultTransformEvaluator(Class<TransformT> transformClass, TransformEvaluator<? super TransformT> transformEvaluator) {
        TransformEvaluator previous = defaultTransformEvaluators.put(transformClass, transformEvaluator);
        if (previous == null) {
            return;
        }
        throw new IllegalArgumentException("defining multiple evaluators for " + transformClass);
    }

    /**
     * Registers an evaluator for a transform class on this runner instance only.
     *
     * <p>The new evaluator is stored before the duplicate check, so when the exception is
     * thrown the map already holds the new evaluator.
     *
     * @param transformClass the transform class the evaluator handles
     * @param transformEvaluator the evaluator to associate with that class
     * @throws IllegalArgumentException if a non-null evaluator was already registered locally for the class
     */
    public <TransformT extends PTransform<?, ?>> void registerTransformEvaluator(Class<TransformT> transformClass, TransformEvaluator<TransformT> transformEvaluator) {
        TransformEvaluator previous = this.localTransformEvaluators.put(transformClass, transformEvaluator);
        if (previous == null) {
            return;
        }
        throw new IllegalArgumentException("defining multiple evaluators for " + transformClass);
    }

    /**
     * Looks up the evaluator for a transform class, preferring this runner's local
     * registrations over the static defaults.
     *
     * @param transformClass the transform class to look up
     * @return the registered evaluator, or {@code null} if neither map has an entry
     */
    public <TransformT extends PTransform<?, ?>> TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> transformClass) {
        TransformEvaluator local = this.localTransformEvaluators.get(transformClass);
        if (local != null) {
            return local;
        }
        TransformEvaluator fallback = defaultTransformEvaluators.get(transformClass);
        return fallback;
    }

    /**
     * Builds a runner from the given options after validating them as
     * {@link DirectPipelineOptions}.
     *
     * @param options the pipeline options to validate and use
     * @return a new runner configured with the validated options
     */
    public static DirectPipelineRunner fromOptions(PipelineOptions options) {
        // Validation runs before anything is logged or constructed.
        DirectPipelineOptions validated = PipelineOptionsValidator.validate(DirectPipelineOptions.class, options);
        LOG.debug("Creating DirectPipelineRunner");
        DirectPipelineRunner runner = new DirectPipelineRunner(validated);
        return runner;
    }

    /**
     * Builds a runner for tests: stable-unique-name checking is set to
     * {@code ERROR} and a {@link TestCredential} is installed as the GCP credential.
     *
     * @return a new runner backed by freshly created options
     */
    public static DirectPipelineRunner createForTest() {
        DirectPipelineOptions testOptions = PipelineOptionsFactory.as(DirectPipelineOptions.class);
        testOptions.setStableUniqueNames(PipelineOptions.CheckEnabled.ERROR);
        testOptions.setGcpCredential(new TestCredential());
        return new DirectPipelineRunner(testOptions);
    }

    /**
     * Turns serializability checks on or off. The flag gates the evaluator's
     * {@code ensureSerializable*} methods and is passed to the test combine function.
     *
     * @param enable whether serializability should be checked
     * @return this runner, for chaining
     */
    public DirectPipelineRunner withSerializabilityTesting(boolean enable) {
        testSerializability = enable;
        return this;
    }

    /**
     * Turns the per-element encodability check performed when a PCollection is set on or off.
     *
     * @param enable whether elements should be checked for encodability
     * @return this runner, for chaining
     */
    public DirectPipelineRunner withEncodabilityTesting(boolean enable) {
        testEncodability = enable;
        return this;
    }

    /**
     * Turns shuffling of PCollection contents on read on or off.
     *
     * @param enable whether element order should be randomized
     * @return this runner, for chaining
     */
    public DirectPipelineRunner withUnorderednessTesting(boolean enable) {
        testUnorderedness = enable;
        return this;
    }

    /**
     * Applies a transform, substituting the test combine implementation for
     * {@link Combine.GroupedValues}; every other transform is delegated to the superclass.
     *
     * @param transform the transform to apply
     * @param input the input to apply it to
     * @return the output of the applied transform
     */
    @Override
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
        if (!(transform instanceof Combine.GroupedValues)) {
            return super.apply(transform, input);
        }
        return (OutputT)this.applyTestCombine((Combine.GroupedValues)transform, (PCollection)input);
    }

    /**
     * Replaces a {@link Combine.GroupedValues} with a {@link ParDo} over a
     * {@link TestCombineDoFn} built from the same transform.
     *
     * <p>Setting the output coder is best-effort: if the transform cannot provide a default
     * output coder, the output is returned without one.
     *
     * @param transform the combine transform being applied
     * @param input the grouped input collection
     * @return the output collection of the substituted {@code ParDo}
     */
    private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombine(Combine.GroupedValues<K, InputT, OutputT> transform, PCollection<KV<K, Iterable<InputT>>> input) {
        PCollection output = (PCollection)input.apply(ParDo.of(TestCombineDoFn.create(transform, input, this.testSerializability, this.rand)));
        try {
            output.setCoder((Coder)transform.getDefaultOutputCoder(input));
        }
        catch (CannotProvideCoderException ignored) {
            // Deliberately not fatal: the output is returned without an explicit coder.
            // NOTE(review): presumably a coder is set or inferred later — confirm.
            LOG.debug("Could not provide a default output coder for the combine output; leaving the coder unset", ignored);
        }
        return output;
    }

    /**
     * Runs the pipeline in-process with a new {@link Evaluator}, logs every counter's final
     * value, and returns the evaluator as the results object.
     *
     * @param pipeline the pipeline to execute
     * @return the evaluator holding the computed values
     */
    @Override
    public EvaluationResults run(Pipeline pipeline) {
        LOG.info("Executing pipeline using the DirectPipelineRunner.");
        Evaluator results = new Evaluator(this.rand);
        results.run(pipeline);
        for (Counter<?> finalCounter : results.getCounters()) {
            LOG.info("Final aggregator value: {}", finalCounter);
        }
        LOG.info("Pipeline execution complete.");
        return results;
    }

    /**
     * Creates a runner: stores the options, registers the standard IO factories, and seeds
     * the random source from the configured seed, or from a fresh random value when none is set.
     *
     * @param options the options for this runner
     */
    private DirectPipelineRunner(DirectPipelineOptions options) {
        this.options = options;
        IOChannelUtils.registerStandardIOFactories(options);
        long randomSeed;
        if (options.getDirectPipelineRunnerRandomSeed() == null) {
            randomSeed = new Random().nextLong();
        } else {
            randomSeed = options.getDirectPipelineRunnerRandomSeed().longValue();
        }
        // The seed is logged so that a run can be reproduced by configuring the same value.
        LOG.debug("DirectPipelineRunner using random seed {}.", (Object)randomSeed);
        this.rand = new Random(randomSeed);
    }

    /**
     * Returns the options this runner was constructed with.
     *
     * @return the runner's options
     */
    public DirectPipelineOptions getPipelineOptions() {
        return options;
    }

    /**
     * Returns {@code "DirectPipelineRunner#"} followed by this object's hash code.
     *
     * @return a short identifying string
     */
    @Override
    public String toString() {
        return "DirectPipelineRunner#" + this.hashCode();
    }

    /**
     * Pipeline visitor that evaluates each visited transform with its registered
     * {@link TransformEvaluator} and keeps the computed values in memory.
     *
     * <p>It is also the {@link EvaluationContext} passed to transform evaluators and the
     * results object returned by {@code DirectPipelineRunner.run}.
     */
    class Evaluator
    implements Pipeline.PipelineVisitor,
    EvaluationContext {
        /** Step names handed out by {@link #getStepName}, keyed by transform. */
        private final Map<PTransform<?, ?>, String> stepNames = new HashMap<PTransform<?, ?>, String>();
        /** Contents of every value that has been set so far. */
        private final Map<PValue, Object> store = new HashMap<PValue, Object>();
        /** Counters created through {@link #getAddCounterMutator}. */
        private final CounterSet counters = new CounterSet(new Counter[0]);
        /** The transform currently being evaluated; {@code null} outside of {@link #visitTransform}. */
        private AppliedPTransform<?, ?, ?> currentTransform;
        /** Aggregator-to-steps mapping; {@code null} until {@link #run} has finished traversing. */
        private Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = null;
        /** Full node names recorded while visiting, keyed by transform. */
        private final Map<PTransform<?, ?>, String> fullNames = new HashMap<PTransform<?, ?>, String>();
        /** Random source used when shuffling PCollection contents. */
        private final Random rand;

        /** Creates an evaluator with an unseeded random source. */
        public Evaluator() {
            this(new Random());
        }

        /**
         * Creates an evaluator using the given random source.
         *
         * @param rand the random source used for shuffling
         */
        public Evaluator(Random rand) {
            this.rand = rand;
        }

        /**
         * Evaluates the pipeline by visiting it topologically, then extracts the
         * aggregator-to-steps mapping used by {@link #getAggregatorValues}.
         *
         * @param pipeline the pipeline to evaluate
         */
        public void run(Pipeline pipeline) {
            pipeline.traverseTopologically(this);
            this.aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
        }

        /** Returns the enclosing runner's options. */
        @Override
        public DirectPipelineOptions getPipelineOptions() {
            return DirectPipelineRunner.this.options;
        }

        /**
         * Returns the input of the transform currently being evaluated.
         *
         * @throws IllegalArgumentException if {@code transform} is not the current transform
         */
        @Override
        public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
            Preconditions.checkArgument(this.currentTransform != null && this.currentTransform.getTransform() == transform, "can only be called with current transform");
            return (InputT)this.currentTransform.getInput();
        }

        /**
         * Returns the output of the transform currently being evaluated.
         *
         * @throws IllegalArgumentException if {@code transform} is not the current transform
         */
        @Override
        public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
            Preconditions.checkArgument(this.currentTransform != null && this.currentTransform.getTransform() == transform, "can only be called with current transform");
            return (OutputT)this.currentTransform.getOutput();
        }

        /** No-op. */
        @Override
        public void enterCompositeTransform(TransformTreeNode node) {
        }

        /** No-op. */
        @Override
        public void leaveCompositeTransform(TransformTreeNode node) {
        }

        /**
         * Records the node's full name and runs the evaluator registered for the transform's class.
         *
         * @throws IllegalStateException if no evaluator is registered for the transform's class
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public void visitTransform(TransformTreeNode node) {
            PTransform<?, ?> transform = node.getTransform();
            this.fullNames.put(transform, node.getFullName());
            // Raw type on purpose: the evaluator is looked up by runtime class, so its
            // transform type parameter cannot be expressed statically here.
            TransformEvaluator evaluator = DirectPipelineRunner.this.getTransformEvaluator(transform.getClass());
            if (evaluator == null) {
                throw new IllegalStateException("no evaluator registered for " + transform);
            }
            LOG.debug("Evaluating {}", transform);
            this.currentTransform = AppliedPTransform.of(node.getFullName(), node.getInput(), node.getOutput(), transform);
            evaluator.evaluate(transform, this);
            this.currentTransform = null;
        }

        /**
         * Checks that a value produced by a non-composite node has been computed.
         *
         * @throws RuntimeException if the value has no producing transform
         * @throws IllegalStateException if the value of a non-composite producer is missing
         */
        @Override
        public void visitValue(PValue value, TransformTreeNode producer) {
            LOG.debug("Checking evaluation of {}", (Object)value);
            if (value.getProducingTransformInternal() == null) {
                throw new RuntimeException("internal error: expecting a PValue to have a producingTransform");
            }
            if (!producer.isCompositeNode()) {
                this.getPValue(value);
            }
        }

        /**
         * Stores the contents of a value.
         *
         * @throws IllegalStateException if the value was already set
         */
        void setPValue(PValue pvalue, Object contents) {
            if (this.store.containsKey(pvalue)) {
                throw new IllegalStateException("internal error: setting the value of " + pvalue + " more than once");
            }
            this.store.put(pvalue, contents);
        }

        /**
         * Returns the stored contents of a value.
         *
         * @throws IllegalStateException if the value has not been set
         */
        Object getPValue(PValue pvalue) {
            if (!this.store.containsKey(pvalue)) {
                throw new IllegalStateException("internal error: getting the value of " + pvalue + " before it has been computed");
            }
            return this.store.get(pvalue);
        }

        /** Wraps each plain value as a key-less {@link ValueWithMetadata} in the global window. */
        <T> List<ValueWithMetadata<T>> toValueWithMetadata(List<T> values) {
            ArrayList<ValueWithMetadata<T>> result = new ArrayList<ValueWithMetadata<T>>(values.size());
            for (T value : values) {
                result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value)));
            }
            return result;
        }

        /** Wraps each windowed value as a key-less {@link ValueWithMetadata}. */
        <T> List<ValueWithMetadata<T>> toValueWithMetadataFromWindowedValue(List<WindowedValue<T>> values) {
            ArrayList<ValueWithMetadata<T>> result = new ArrayList<ValueWithMetadata<T>>(values.size());
            for (WindowedValue<T> value : values) {
                result.add(ValueWithMetadata.of(value));
            }
            return result;
        }

        /** Sets a PCollection from plain elements, placing each in the global window. */
        @Override
        public <T> void setPCollection(PCollection<T> pc, List<T> elements) {
            this.setPCollectionValuesWithMetadata(pc, this.toValueWithMetadata(elements));
        }

        /** Sets a PCollection from windowed values. */
        @Override
        public <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements) {
            this.setPCollectionValuesWithMetadata(pc, this.toValueWithMetadataFromWindowedValue(elements));
        }

        /**
         * Sets a PCollection from values with metadata after running the encodability checks.
         *
         * @throws IllegalStateException if the collection was already set
         */
        @Override
        public <T> void setPCollectionValuesWithMetadata(PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
            LOG.debug("Setting {} = {}", pc, elements);
            // Only the check matters here: the list returned by the check is discarded
            // and the caller's original list is what gets stored.
            this.ensurePCollectionEncodable(pc, elements);
            this.setPValue(pc, elements);
        }

        /**
         * Stores the contents backing a PCollectionView.
         *
         * @throws IllegalStateException if the view was already set
         */
        @Override
        public <ElemT, T, WindowedT> void setPCollectionView(PCollectionView<T> view, Iterable<WindowedValue<ElemT>> value) {
            LOG.debug("Setting {} = {}", view, value);
            this.setPValue(view, value);
        }

        /** Returns the plain element values of a PCollection as a new list. */
        @Override
        public <T> List<T> getPCollection(PCollection<T> pc) {
            ArrayList<T> result = new ArrayList<T>();
            for (ValueWithMetadata<T> elem : this.getPCollectionValuesWithMetadata(pc)) {
                result.add(elem.getValue());
            }
            return result;
        }

        /** Returns the elements of a PCollection as a transformed view exposing their windowed values. */
        @Override
        public <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc) {
            return Lists.transform(this.getPCollectionValuesWithMetadata(pc), new Function<ValueWithMetadata<T>, WindowedValue<T>>(){

                @Override
                public WindowedValue<T> apply(ValueWithMetadata<T> input) {
                    return input.getWindowedValue();
                }
            });
        }

        /**
         * Returns the stored elements of a PCollection, shuffled when unorderedness testing is on.
         *
         * @throws IllegalStateException if the collection has not been set
         */
        @Override
        public <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc) {
            List<ValueWithMetadata<T>> elements = (List<ValueWithMetadata<T>>)this.getPValue(pc);
            elements = this.randomizeIfUnordered(elements, false);
            LOG.debug("Getting {} = {}", pc, elements);
            return elements;
        }

        /** Returns the element values of every PCollection in the list, in list order. */
        @Override
        public <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs) {
            ArrayList<List<T>> elementsList = new ArrayList<List<T>>();
            for (PCollection<T> pc : pcs.getAll()) {
                elementsList.add(this.getPCollection(pc));
            }
            return elementsList;
        }

        /**
         * Returns the contents stored for a PCollectionView.
         *
         * @throws IllegalStateException if the view has not been set
         */
        @Override
        public <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
            Iterable value = (Iterable)this.getPValue(view);
            LOG.debug("Getting {} = {}", view, (Object)value);
            return value;
        }

        /**
         * Checks the collection's coder and, when encodability testing is on, every element.
         *
         * @return {@code elements} itself when encodability testing is off, otherwise a new list
         *     holding the values returned by {@link #ensureElementEncodable}
         */
        <T> List<ValueWithMetadata<T>> ensurePCollectionEncodable(PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
            this.ensureCoderSerializable(pc.getCoder());
            if (!DirectPipelineRunner.this.testEncodability) {
                return elements;
            }
            ArrayList<ValueWithMetadata<T>> elementsCopy = new ArrayList<ValueWithMetadata<T>>(elements.size());
            for (ValueWithMetadata<T> element : elements) {
                elementsCopy.add(element.withValue(this.ensureElementEncodable(pc, element.getValue())));
            }
            return elementsCopy;
        }

        /** Checks one element against the value's coder, using {@code "Within <pvalue>"} as error context. */
        @Override
        public <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element) {
            String errorContext = "Within " + pvalue.toString();
            return this.ensureSerializableByCoder(pvalue.getCoder(), element, errorContext);
        }

        /**
         * Returns a shuffled copy of the list when unorderedness testing is on, otherwise the
         * list itself. The {@code inPlaceAllowed} flag is not consulted; the input is never mutated.
         */
        @Override
        public <T> List<T> randomizeIfUnordered(List<T> elements, boolean inPlaceAllowed) {
            if (!DirectPipelineRunner.this.testUnorderedness) {
                return elements;
            }
            ArrayList<T> elementsCopy = new ArrayList<T>(elements);
            Collections.shuffle(elementsCopy, this.rand);
            return elementsCopy;
        }

        /** Passes {@code fn} through {@code SerializableUtils.ensureSerializable} when serializability testing is on. */
        @Override
        public <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn) {
            if (!DirectPipelineRunner.this.testSerializability) {
                return fn;
            }
            return SerializableUtils.ensureSerializable(fn);
        }

        /** Checks the coder when serializability testing is on; always returns the coder passed in. */
        @Override
        public <T> Coder<T> ensureCoderSerializable(Coder<T> coder) {
            if (DirectPipelineRunner.this.testSerializability) {
                SerializableUtils.ensureSerializable(coder);
            }
            return coder;
        }

        /**
         * When serializability testing is on, returns the result of
         * {@code SerializableUtils.ensureSerializableByCoder}; otherwise returns {@code value}.
         */
        @Override
        public <T> T ensureSerializableByCoder(Coder<T> coder, T value, String errorContext) {
            if (DirectPipelineRunner.this.testSerializability) {
                return SerializableUtils.ensureSerializableByCoder(coder, value, errorContext);
            }
            return value;
        }

        /** Returns the mutator used to add counters to this evaluator's counter set. */
        @Override
        public CounterSet.AddCounterMutator getAddCounterMutator() {
            return this.counters.getAddCounterMutator();
        }

        /**
         * Returns the step name for a transform, assigning {@code "s" + (count + 1)} on first use,
         * where count is the number of names assigned so far.
         */
        @Override
        public String getStepName(PTransform<?, ?> transform) {
            String stepName = this.stepNames.get(transform);
            if (stepName == null) {
                stepName = "s" + (this.stepNames.size() + 1);
                this.stepNames.put(transform, stepName);
            }
            return stepName;
        }

        /** Returns the counter set accumulated during evaluation. */
        public CounterSet getCounters() {
            return this.counters;
        }

        /** Always reports {@code DONE}. */
        @Override
        public PipelineResult.State getState() {
            return PipelineResult.State.DONE;
        }

        /**
         * Collects the aggregator's value for every step that uses it, keyed by the step's full name.
         *
         * @param aggregator the aggregator to look up
         * @return the per-step values of the aggregator
         * @throws IllegalArgumentException if the aggregator has no recorded steps, or if no
         *     counter exists for one of its steps
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
            // An aggregator unknown to the pipeline has no entry (and the map itself is null
            // before run() completes); report that the same way as a missing counter instead
            // of failing with a NullPointerException in the loop below.
            Collection<PTransform<?, ?>> steps = this.aggregatorSteps == null ? null : this.aggregatorSteps.get(aggregator);
            if (steps == null) {
                throw new IllegalArgumentException("Aggregator " + aggregator + " is not used in this pipeline");
            }
            HashMap stepValues = new HashMap();
            for (PTransform<?, ?> step : steps) {
                String stepName = String.format("user-%s-%s", this.stepNames.get(step), aggregator.getName());
                String fullName = this.fullNames.get(step);
                Counter<?> counter = this.counters.getExistingCounter(stepName);
                if (counter == null) {
                    throw new IllegalArgumentException("Aggregator " + aggregator + " is not used in this pipeline");
                }
                stepValues.put(fullName, counter.getAggregate());
            }
            return new MapAggregatorValues(stepValues);
        }
    }

    /**
     * The context handed to a {@link TransformEvaluator}: gives access to the current
     * transform's input and output, lets the evaluator set and read PCollection contents,
     * and exposes the runner's serializability, encodability and ordering checks.
     */
    public static interface EvaluationContext
    extends EvaluationResults {
        /** Returns the options of the runner performing the evaluation. */
        DirectPipelineOptions getPipelineOptions();

        /** Returns the input of the given transform. */
        <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);

        /** Returns the output of the given transform. */
        <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);

        /** Sets the contents of a PCollection from values that carry metadata. */
        <T> void setPCollectionValuesWithMetadata(PCollection<T> pc, List<ValueWithMetadata<T>> elements);

        /** Sets the contents of a PCollection from windowed values. */
        <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements);

        /** Sets the contents of a PCollection from plain element values. */
        <T> void setPCollection(PCollection<T> pc, List<T> elements);

        /** Returns the contents of a PCollection together with their metadata. */
        <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc);

        /** Sets the contents backing a PCollectionView. */
        <ElemT, T, WindowedT> void setPCollectionView(PCollectionView<T> view, Iterable<WindowedValue<ElemT>> value);

        /** Checks that an element can be handled by the value's coder and returns the resulting element. */
        <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element);

        /** Returns the elements, possibly in a randomized order. */
        <T> List<T> randomizeIfUnordered(List<T> elements, boolean inPlaceAllowed);

        /** Checks that a function is serializable and returns the resulting function. */
        <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn);

        /** Checks that a coder is serializable and returns a coder to use. */
        <T> Coder<T> ensureCoderSerializable(Coder<T> coder);

        /** Checks that a value can be handled by the coder, reporting failures with the given context. */
        <T> T ensureSerializableByCoder(Coder<T> coder, T value, String errorContext);

        /** Returns the mutator used to register counters. */
        CounterSet.AddCounterMutator getAddCounterMutator();

        /** Returns the step name assigned to the given transform. */
        String getStepName(PTransform<?, ?> transform);
    }

    /**
     * An immutable pairing of a {@link WindowedValue} with an optional key.
     *
     * @param <V> the type of the wrapped value
     */
    public static class ValueWithMetadata<V> {
        /** The associated key; {@code null} for instances created by {@link #of}. */
        private final Object key;
        /** The wrapped value with its timestamp and windows. */
        private final WindowedValue<V> windowedValue;

        private ValueWithMetadata(WindowedValue<V> windowedValue, Object key) {
            this.windowedValue = windowedValue;
            this.key = key;
        }

        /** Wraps a windowed value with no key. */
        public static <V> ValueWithMetadata<V> of(WindowedValue<V> windowedValue) {
            return new ValueWithMetadata<V>(windowedValue, null);
        }

        /** Returns a copy with the same windowed value and the given key. */
        public ValueWithMetadata<V> withKey(Object key) {
            return new ValueWithMetadata<V>(windowedValue, key);
        }

        /** Returns a copy with the same key whose windowed value carries {@code value} instead. */
        public <T> ValueWithMetadata<T> withValue(T value) {
            WindowedValue<T> replaced = windowedValue.withValue(value);
            return new ValueWithMetadata<T>(replaced, key);
        }

        /** Returns the wrapped windowed value. */
        public WindowedValue<V> getWindowedValue() {
            return windowedValue;
        }

        /** Returns the value held by the windowed value. */
        public V getValue() {
            return windowedValue.getValue();
        }

        /** Returns the timestamp of the windowed value. */
        public Instant getTimestamp() {
            return windowedValue.getTimestamp();
        }

        /** Returns the windows of the windowed value. */
        public Collection<? extends BoundedWindow> getWindows() {
            return windowedValue.getWindows();
        }

        /** Returns the associated key, or {@code null} if none was set. */
        public Object getKey() {
            return key;
        }
    }

    /**
     * The result of running a pipeline with the direct runner: gives access to the computed
     * contents of PCollections and PCollectionViews.
     */
    public static interface EvaluationResults
    extends PipelineResult {
        /** Returns the element values of a PCollection. */
        <T> List<T> getPCollection(PCollection<T> pc);

        /** Returns the elements of a PCollection as windowed values. */
        <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc);

        /** Returns the element values of each PCollection in the list. */
        <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs);

        /** Returns the windowed values backing a PCollectionView. */
        <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view);
    }

    /**
     * Evaluates one kind of transform within an {@link EvaluationContext}.
     *
     * @param <TransformT> the transform type this evaluator handles
     */
    public static interface TransformEvaluator<TransformT extends PTransform> {
        /**
         * Evaluates the transform, reading inputs from and writing outputs to the context.
         *
         * @param transform the transform to evaluate
         * @param context the context giving access to inputs, outputs and checks
         */
        void evaluate(TransformT transform, EvaluationContext context);
    }

    /**
     * A {@link DoFn} that runs a keyed combine function over grouped values while spreading
     * the inputs across several randomly chosen accumulators, merging them in shuffled order,
     * and optionally checking accumulators against the accumulator coder.
     *
     * @param <K> the key type
     * @param <InputT> the type of the values being combined
     * @param <AccumT> the accumulator type
     * @param <OutputT> the combined output type
     */
    public static class TestCombineDoFn<K, InputT, AccumT, OutputT>
    extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> {
        private final Combine.KeyedCombineFn<? super K, ? super InputT, AccumT, OutputT> fn;
        private final Coder<AccumT> accumCoder;
        private final boolean testSerializability;
        private final Random rand;

        /**
         * Builds a test combine function from a grouped-values combine transform, using the
         * combine function and accumulator coder of the transform's applied function.
         *
         * @param transform the combine transform to mirror
         * @param input the input collection, used for its coder and pipeline coder registry
         * @param testSerializability whether accumulators are checked against the coder
         * @param rand the random source used to split and shuffle accumulators
         * @return the new test combine function
         */
        public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, OutputT> create(Combine.GroupedValues<K, InputT, OutputT> transform, PCollection<KV<K, Iterable<InputT>>> input, boolean testSerializability, Random rand) {
            AppliedCombineFn<K, InputT, ?, OutputT> applied = transform.getAppliedFn(input.getPipeline().getCoderRegistry(), input.getCoder());
            return new TestCombineDoFn(applied.getFn(), applied.getAccumulatorCoder(), testSerializability, rand);
        }

        /**
         * Creates a test combine function.
         *
         * @param fn the keyed combine function to exercise
         * @param accumCoder the coder for accumulators; must not be null
         * @param testSerializability whether accumulators are checked against the coder
         * @param rand the random source used to split and shuffle accumulators
         */
        public TestCombineDoFn(Combine.KeyedCombineFn<? super K, ? super InputT, AccumT, OutputT> fn, Coder<AccumT> accumCoder, boolean testSerializability, Random rand) {
            this.fn = fn;
            this.accumCoder = accumCoder;
            this.testSerializability = testSerializability;
            this.rand = rand;
            // Result is intentionally discarded; the call fails fast on a null coder.
            // NOTE(review): presumably also meant to surface coders whose getEncodingId() throws — confirm.
            this.accumCoder.getEncodingId();
        }

        /**
         * Combines the values of one key: adds the inputs into randomly split accumulators,
         * merges those accumulators, and outputs the key with the extracted result.
         */
        @Override
        @SuppressWarnings("unchecked")
        public void processElement(DoFn.ProcessContext c) throws Exception {
            K key = ((KV<K, Iterable<InputT>>)c.element()).getKey();
            Iterable<InputT> values = ((KV<K, Iterable<InputT>>)c.element()).getValue();

            String addInputsContext = "After addInputs of KeyedCombineFn " + this.fn.toString();
            List<AccumT> shuffledAccumulators = this.ensureSerializableByCoder(ListCoder.of(this.accumCoder), TestCombineDoFn.addInputsRandomly(this.fn, key, values, this.rand), addInputsContext);

            String mergeContext = "After mergeAccumulators of KeyedCombineFn " + this.fn.toString();
            AccumT merged = this.ensureSerializableByCoder(this.accumCoder, this.fn.mergeAccumulators(key, shuffledAccumulators), mergeContext);

            c.output(KV.of(key, this.fn.extractOutput(key, merged)));
        }

        /**
         * Adds the values into a sequence of accumulators, starting a fresh accumulator at
         * random points, and returns the accumulators in shuffled order.
         *
         * <p>The first value always closes its accumulator; after {@code i} values have been
         * seen, the current accumulator is closed with probability {@code 1 / 2^min(i, 30)}.
         *
         * @param fn the keyed combine function
         * @param key the key the values belong to
         * @param values the values to add
         * @param random the random source driving the splits and the final shuffle
         * @return the non-empty accumulators, shuffled; empty if {@code values} is empty
         */
        public static <K, AccumT, InputT> List<AccumT> addInputsRandomly(Combine.KeyedCombineFn<? super K, ? super InputT, AccumT, ?> fn, K key, Iterable<InputT> values, Random random) {
            List<AccumT> accumulators = new ArrayList<AccumT>();
            AccumT current = fn.createAccumulator(key);
            boolean pending = false;
            int seen = 0;
            for (InputT value : values) {
                current = fn.addInput(key, current, value);
                pending = true;
                // The short-circuit on the first value must stay: it avoids a Random call.
                boolean split = seen == 0 || random.nextInt(1 << Math.min(seen, 30)) == 0;
                if (split) {
                    accumulators.add(current);
                    current = fn.createAccumulator(key);
                    pending = false;
                }
                seen++;
            }
            if (pending) {
                accumulators.add(current);
            }
            Collections.shuffle(accumulators, random);
            return accumulators;
        }

        /**
         * When serializability testing is on, returns the result of
         * {@code SerializableUtils.ensureSerializableByCoder}; otherwise returns {@code value}.
         */
        public <T> T ensureSerializableByCoder(Coder<T> coder, T value, String errorContext) {
            if (!this.testSerializability) {
                return value;
            }
            return SerializableUtils.ensureSerializableByCoder(coder, value, errorContext);
        }
    }
}

