/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowOutputCounter;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputDoFnRunner;
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver;
import com.google.cloud.dataflow.sdk.util.common.worker.ParDoFn;
import com.google.cloud.dataflow.sdk.util.common.worker.Receiver;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;

public abstract class ParDoFnBase
implements ParDoFn {
    private final PipelineOptions options;
    private final SideInputReader sideInputReader;
    private final TupleTag<Object> mainOutputTag;
    private final List<TupleTag<?>> sideOutputTags;
    private final String stepName;
    private final String transformName;
    private final ExecutionContext executionContext;
    private final CounterSet.AddCounterMutator addCounterMutator;
    private final StateSampler stateSampler;
    private DoFnRunner<Object, Object> fnRunner;

    public ExecutionContext getExecutionContext() {
        return this.executionContext;
    }

    protected ParDoFnBase(PipelineOptions options, SideInputReader sideInputReader, List<String> outputTags, String stepName, String transformName, ExecutionContext executionContext, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
        this.options = options;
        this.sideInputReader = sideInputReader;
        Preconditions.checkArgument(outputTags.size() > 0, "expected at least one output");
        this.mainOutputTag = new TupleTag(outputTags.get(0));
        this.sideOutputTags = new ArrayList();
        if (outputTags.size() > 1) {
            for (String tag : outputTags.subList(1, outputTags.size())) {
                this.sideOutputTags.add(new TupleTag(tag));
            }
        }
        this.stepName = stepName;
        this.transformName = transformName;
        this.executionContext = executionContext;
        this.addCounterMutator = addCounterMutator;
        this.stateSampler = stateSampler;
    }

    protected abstract DoFnInfo<?, ?> getDoFnInfo();

    @Override
    public void startBundle(final Receiver ... receivers) throws Exception {
        if (receivers.length != this.sideOutputTags.size() + 1) {
            throw new AssertionError((Object)"unexpected number of receivers for DoFn");
        }
        ExecutionContext.StepContext stepContext = null;
        if (this.executionContext != null) {
            stepContext = this.executionContext.getOrCreateStepContext(this.stepName, this.transformName, this.stateSampler);
        }
        DoFnInfo<?, ?> doFnInfo = this.getDoFnInfo();
        DoFnRunner.OutputManager outputManager = new DoFnRunner.OutputManager(){
            final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new HashMap();

            @Nullable
            private Receiver getReceiverOrNull(TupleTag<?> tag) {
                if (tag.equals(ParDoFnBase.this.mainOutputTag)) {
                    return receivers[0];
                }
                if (ParDoFnBase.this.sideOutputTags.contains(tag)) {
                    return receivers[ParDoFnBase.this.sideOutputTags.indexOf(tag) + 1];
                }
                return this.undeclaredOutputs.get(tag);
            }

            @Override
            public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
                Receiver receiver = this.getReceiverOrNull(tag);
                if (receiver == null) {
                    String string = String.valueOf(tag.getId());
                    String outputName = string.length() != 0 ? "implicit-".concat(string) : new String("implicit-");
                    OutputReceiver undeclaredReceiver = new OutputReceiver();
                    DataflowOutputCounter outputCounter = new DataflowOutputCounter(outputName, ParDoFnBase.this.addCounterMutator);
                    undeclaredReceiver.addOutputCounter(outputCounter);
                    this.undeclaredOutputs.put(tag, undeclaredReceiver);
                    receiver = undeclaredReceiver;
                }
                try {
                    receiver.process(output);
                }
                catch (Throwable t) {
                    throw Throwables.propagate(t);
                }
            }
        };
        this.fnRunner = this.options.as(StreamingOptions.class).isStreaming() && !this.sideInputReader.isEmpty() ? new StreamingSideInputDoFnRunner(this.options, doFnInfo, this.sideInputReader, outputManager, this.mainOutputTag, this.sideOutputTags, stepContext, this.addCounterMutator) : DoFnRunner.create(this.options, doFnInfo.getDoFn(), this.sideInputReader, outputManager, this.mainOutputTag, this.sideOutputTags, stepContext, this.addCounterMutator, doFnInfo.getWindowingStrategy());
        this.fnRunner.startBundle();
    }

    @Override
    public void processElement(Object elem) throws Exception {
        this.fnRunner.processElement((WindowedValue)elem);
    }

    @Override
    public void finishBundle() throws Exception {
        this.fnRunner.finishBundle();
        this.fnRunner = null;
    }
}

