/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.MultiOutputInfo;
import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase;
import com.google.cloud.dataflow.sdk.runners.worker.ParDoFnFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
import com.google.cloud.dataflow.sdk.util.NullSideInputReader;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.ParDoFn;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

class CombineValuesFn
extends ParDoFnBase {
    private final String phase;
    private final Combine.KeyedCombineFn<?, ?, ?, ?> combineFn;

    static CombineValuesFn of(PipelineOptions options, Combine.KeyedCombineFn<?, ?, ?, ?> combineFn, String phase, String stepName, String transformName, DataflowExecutionContext executionContext, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
        return new CombineValuesFn(options, combineFn, phase, stepName, transformName, executionContext, addCounterMutator, stateSampler);
    }

    @Override
    protected DoFnInfo<?, ?> getDoFnInfo() {
        DoFn doFn = null;
        switch (this.phase) {
            case "all": {
                doFn = new CombineValuesDoFn(this.combineFn);
                break;
            }
            case "add": {
                doFn = new AddInputsDoFn(this.combineFn);
                break;
            }
            case "merge": {
                doFn = new MergeAccumulatorsDoFn(this.combineFn);
                break;
            }
            case "extract": {
                doFn = new ExtractOutputDoFn(this.combineFn);
                break;
            }
            default: {
                throw new IllegalArgumentException("phase must be one of 'all', 'add', 'merge', 'extract'");
            }
        }
        return new DoFnInfo(doFn, null);
    }

    private CombineValuesFn(PipelineOptions options, Combine.KeyedCombineFn<?, ?, ?, ?> combineFn, String phase, String stepName, String transformName, DataflowExecutionContext executionContext, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
        super(options, NullSideInputReader.empty(), Arrays.asList("output"), stepName, transformName, executionContext, addCounterMutator, stateSampler);
        this.phase = phase;
        this.combineFn = combineFn;
    }

    private static class ExtractOutputDoFn<K, AccumT, OutputT>
    extends DoFn<KV<K, AccumT>, KV<K, OutputT>> {
        private final Combine.KeyedCombineFn<K, ?, AccumT, OutputT> combineFn;

        private ExtractOutputDoFn(Combine.KeyedCombineFn<K, ?, AccumT, OutputT> combineFn) {
            this.combineFn = combineFn;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            KV kv = (KV)c.element();
            Object key = kv.getKey();
            OutputT output = this.combineFn.extractOutput(key, kv.getValue());
            c.output(KV.of(key, output));
        }
    }

    private static class MergeAccumulatorsDoFn<K, AccumT>
    extends DoFn<KV<K, Iterable<AccumT>>, KV<K, AccumT>> {
        private final Combine.KeyedCombineFn<K, ?, AccumT, ?> combineFn;

        private MergeAccumulatorsDoFn(Combine.KeyedCombineFn<K, ?, AccumT, ?> combineFn) {
            this.combineFn = combineFn;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            KV kv = (KV)c.element();
            Object key = kv.getKey();
            AccumT accum = this.combineFn.mergeAccumulators(key, (Iterable)kv.getValue());
            c.output(KV.of(key, accum));
        }
    }

    private static class AddInputsDoFn<K, InputT, AccumT>
    extends DoFn<KV<K, Iterable<InputT>>, KV<K, AccumT>> {
        private final Combine.KeyedCombineFn<K, InputT, AccumT, ?> combineFn;

        private AddInputsDoFn(Combine.KeyedCombineFn<K, InputT, AccumT, ?> combineFn) {
            this.combineFn = combineFn;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            KV kv = (KV)c.element();
            Object key = kv.getKey();
            AccumT accum = this.combineFn.createAccumulator(key);
            for (Object input : (Iterable)kv.getValue()) {
                accum = this.combineFn.addInput(key, accum, input);
            }
            c.output(KV.of(key, accum));
        }
    }

    private static class CombineValuesDoFn<K, InputT, OutputT>
    extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> {
        private final Combine.KeyedCombineFn<K, InputT, ?, OutputT> combineFn;

        private CombineValuesDoFn(Combine.KeyedCombineFn<K, InputT, ?, OutputT> combineFn) {
            this.combineFn = combineFn;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) {
            KV kv = (KV)c.element();
            Object key = kv.getKey();
            c.output(KV.of(key, this.combineFn.apply(key, (Iterable)kv.getValue())));
        }
    }

    static final class Factory
    implements ParDoFnFactory {
        Factory() {
        }

        @Override
        public ParDoFn create(PipelineOptions options, CloudObject cloudUserFn, String stepName, String transformName, @Nullable List<SideInputInfo> sideInputInfos, @Nullable List<MultiOutputInfo> multiOutputInfos, int numOutputs, DataflowExecutionContext executionContext, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) throws Exception {
            Preconditions.checkArgument(sideInputInfos == null || sideInputInfos.size() == 0, "unexpected side inputs for CombineValuesFn");
            Preconditions.checkArgument(numOutputs == 1, "expected exactly one output for CombineValuesFn");
            Object deserializedFn = SerializableUtils.deserializeFromByteArray(Structs.getBytes((Map<String, Object>)((Object)cloudUserFn), "serialized_fn"), "serialized user fn");
            Preconditions.checkArgument(deserializedFn instanceof AppliedCombineFn);
            AppliedCombineFn combineFn = (AppliedCombineFn)deserializedFn;
            String phase = Structs.getString((Map<String, Object>)((Object)cloudUserFn), "phase", "all");
            return CombineValuesFn.of(options, combineFn.getFn(), phase, stepName, transformName, executionContext, addCounterMutator, stateSampler);
        }
    }

    public static class CombinePhase {
        public static final String ALL = "all";
        public static final String ADD = "add";
        public static final String MERGE = "merge";
        public static final String EXTRACT = "extract";
    }
}

