/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Sets;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.CounterAggregator;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.PeriodFormat;

public class DoFnRunner<InputT, OutputT> {
    public final DoFn<InputT, OutputT> fn;
    public final DoFnContext<InputT, OutputT> context;

    /**
     * Creates a runner around {@code fn} and builds the single {@code DoFnContext} that is later
     * passed to the function's {@code startBundle} and {@code finishBundle} calls.
     *
     * @param windowingStrategy source of the window function handed to the context; may be
     *     {@code null}, in which case the context receives a {@code null} window function
     */
    protected DoFnRunner(PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
        // No windowing strategy means there is no window function to give to the context.
        WindowFn<?, ?> windowFn = null;
        if (windowingStrategy != null) {
            windowFn = windowingStrategy.getWindowFn();
        }
        this.fn = fn;
        this.context = new DoFnContext<InputT, OutputT>(
            options,
            fn,
            sideInputReader,
            outputManager,
            mainOutputTag,
            sideOutputTags,
            stepContext,
            addCounterMutator,
            windowFn);
    }

    /**
     * Static factory that forwards every argument, unchanged and in order, to the protected
     * constructor.
     *
     * @return a new runner wrapping {@code fn}
     */
    public static <InputT, OutputT> DoFnRunner<InputT, OutputT> create(PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowingStrategy<?, ?> windowingStrategy) {
        DoFnRunner<InputT, OutputT> runner = new DoFnRunner<InputT, OutputT>(
            options,
            fn,
            sideInputReader,
            outputManager,
            mainOutputTag,
            sideOutputTags,
            stepContext,
            addCounterMutator,
            windowingStrategy);
        return runner;
    }

    /**
     * Calls {@code startBundle} on the wrapped function, passing the runner's shared context.
     *
     * @throws UserCodeException if the function throws; a {@code UserCodeException} is rethrown
     *     unchanged, any other throwable is wrapped in a new one
     */
    public void startBundle() {
        try {
            this.fn.startBundle(this.context);
        }
        catch (Throwable failure) {
            // Do not double-wrap something that is already a UserCodeException.
            if (failure instanceof UserCodeException) {
                throw (UserCodeException)failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Runs the wrapped function on {@code elem}.
     *
     * <p>While the element is processed the logging MDC step name is set to this step's name; the
     * previous value is restored afterwards, even if processing throws.
     *
     * <p>An element that sits in more than one window is processed once per window when the
     * function's class implements {@code DoFn.RequiresWindowAccess} or the side input reader is
     * not empty. In every other case the element is processed exactly once, as given.
     *
     * @param elem the windowed element to process
     */
    public void processElement(WindowedValue<InputT> elem) {
        String savedStepName = DataflowWorkerLoggingMDC.getStepName();
        DataflowWorkerLoggingMDC.setStepName(this.context.stepContext.getStepName());
        try {
            // Evaluated left to right with short-circuiting: the function class and the side
            // input reader are only consulted for elements in more than one window.
            boolean processPerWindow = elem.getWindows().size() > 1
                && (DoFn.RequiresWindowAccess.class.isAssignableFrom(this.fn.getClass())
                    || !this.context.sideInputReader.isEmpty());
            if (processPerWindow) {
                for (BoundedWindow window : elem.getWindows()) {
                    WindowedValue<InputT> singleWindowElem = WindowedValue.of(elem.getValue(), elem.getTimestamp(), window, elem.getPane());
                    this.invokeProcessElement(singleWindowElem);
                }
            } else {
                this.invokeProcessElement(elem);
            }
        }
        finally {
            DataflowWorkerLoggingMDC.setStepName(savedStepName);
        }
    }

    /**
     * Builds a process context for {@code elem} and calls the wrapped function's
     * {@code processElement} with it.
     *
     * <p>The context is created outside the guarded region, so a failure in
     * {@link #createProcessContext} is not wrapped.
     *
     * @throws UserCodeException if the function throws; a {@code UserCodeException} is rethrown
     *     unchanged, any other throwable is wrapped in a new one
     */
    protected void invokeProcessElement(WindowedValue<InputT> elem) {
        DoFn.ProcessContext elementContext = this.createProcessContext(elem);
        try {
            this.fn.processElement(elementContext);
        }
        catch (Throwable failure) {
            // Do not double-wrap something that is already a UserCodeException.
            if (failure instanceof UserCodeException) {
                throw (UserCodeException)failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Calls {@code finishBundle} on the wrapped function, passing the runner's shared context.
     *
     * @throws UserCodeException if the function throws; a {@code UserCodeException} is rethrown
     *     unchanged, any other throwable is wrapped in a new one
     */
    public void finishBundle() {
        try {
            this.fn.finishBundle(this.context);
        }
        catch (Throwable failure) {
            // Do not double-wrap something that is already a UserCodeException.
            if (failure instanceof UserCodeException) {
                throw (UserCodeException)failure;
            }
            throw new UserCodeException(failure);
        }
    }

    /**
     * Creates the per-element context handed to the wrapped function's {@code processElement}.
     * Protected so that subclasses can supply a different context.
     *
     * @param elem the element the context exposes
     * @return a new {@code DoFnProcessContext} over this runner's function and shared context
     */
    protected DoFn.ProcessContext createProcessContext(WindowedValue<InputT> elem) {
        DoFnProcessContext<InputT, OutputT> elementContext =
            new DoFnProcessContext<InputT, OutputT>(this.fn, this.context, elem);
        return elementContext;
    }

    static class DoFnProcessContext<InputT, OutputT>
    extends DoFn.ProcessContext {
        final DoFn<InputT, OutputT> fn;
        final DoFnContext<InputT, OutputT> context;
        final WindowedValue<InputT> windowedValue;

        public DoFnProcessContext(DoFn<InputT, OutputT> fn, DoFnContext<InputT, OutputT> context, WindowedValue<InputT> windowedValue) {
            this.fn = fn;
            this.context = context;
            this.windowedValue = windowedValue;
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return this.context.getPipelineOptions();
        }

        @Override
        public InputT element() {
            return this.windowedValue.getValue();
        }

        /*
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        @Override
        public <T> T sideInput(PCollectionView<T> view) {
            BoundedWindow window;
            Preconditions.checkNotNull(view, "View passed to sideInput cannot be null");
            Iterator<BoundedWindow> windowIter = this.windows().iterator();
            if (!windowIter.hasNext()) {
                if (!(this.context.windowFn instanceof GlobalWindows)) throw new IllegalStateException("sideInput called when main input element is not in any windows");
                window = GlobalWindow.INSTANCE;
                return this.context.sideInput(view, window);
            } else {
                window = windowIter.next();
                if (!windowIter.hasNext()) return this.context.sideInput(view, window);
                throw new IllegalStateException("sideInput called when main input element is in multiple windows");
            }
        }

        @Override
        public BoundedWindow window() {
            if (!(this.fn instanceof DoFn.RequiresWindowAccess)) {
                throw new UnsupportedOperationException("window() is only available in the context of a DoFn marked as RequiresWindow.");
            }
            return Iterables.getOnlyElement(this.windows());
        }

        @Override
        public PaneInfo pane() {
            return this.windowedValue.getPane();
        }

        @Override
        public void output(OutputT output) {
            this.context.outputWindowedValue(this.windowedValue.withValue(output));
        }

        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            this.checkTimestamp(timestamp);
            this.context.outputWindowedValue(output, timestamp, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.context.outputWindowedValue(output, timestamp, windows, pane);
        }

        @Override
        public <T> void sideOutput(TupleTag<T> tag, T output) {
            Preconditions.checkNotNull(tag, "Tag passed to sideOutput cannot be null");
            this.context.sideOutputWindowedValue(tag, this.windowedValue.withValue(output));
        }

        @Override
        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            Preconditions.checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null");
            this.checkTimestamp(timestamp);
            this.context.sideOutputWindowedValue(tag, output, timestamp, this.windowedValue.getWindows(), this.windowedValue.getPane());
        }

        @Override
        public Instant timestamp() {
            return this.windowedValue.getTimestamp();
        }

        public Collection<? extends BoundedWindow> windows() {
            return this.windowedValue.getWindows();
        }

        private void checkTimestamp(Instant timestamp) {
            if (timestamp.isBefore((ReadableInstant)this.windowedValue.getTimestamp().minus((ReadableDuration)this.fn.getAllowedTimestampSkew()))) {
                throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s). See the DoFn#getAllowedTimestmapSkew() Javadoc for details on changing the allowed skew.", timestamp, this.windowedValue.getTimestamp(), PeriodFormat.getDefault().print((ReadablePeriod)this.fn.getAllowedTimestampSkew().toPeriod())));
            }
        }

        @Override
        public WindowingInternals<InputT, OutputT> windowingInternals() {
            return new WindowingInternals<InputT, OutputT>(){

                @Override
                public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                    DoFnProcessContext.this.context.outputWindowedValue(output, timestamp, windows, pane);
                }

                @Override
                public Collection<? extends BoundedWindow> windows() {
                    return DoFnProcessContext.this.windowedValue.getWindows();
                }

                @Override
                public PaneInfo pane() {
                    return DoFnProcessContext.this.windowedValue.getPane();
                }

                @Override
                public TimerInternals timerInternals() {
                    return DoFnProcessContext.this.context.stepContext.timerInternals();
                }

                @Override
                public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
                    Coder windowCoder = DoFnProcessContext.this.context.windowFn.windowCoder();
                    DoFnProcessContext.this.context.stepContext.writePCollectionViewData(tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), DoFnProcessContext.this.window(), windowCoder);
                }

                @Override
                public StateInternals stateInternals() {
                    return DoFnProcessContext.this.context.stepContext.stateInternals();
                }
            };
        }

        protected <InputT, OutputT> Aggregator<InputT, OutputT> createAggregatorInternal(String name, Combine.CombineFn<InputT, ?, OutputT> combiner) {
            return this.context.createAggregatorInternal(name, combiner);
        }
    }

    /**
     * Runner-wide {@code DoFn.Context}: the context passed to the function's {@code startBundle}
     * and {@code finishBundle}, and the sink every per-element context forwards its outputs to.
     *
     * <p>Outputs made directly on this context have no input element, so missing windows are
     * obtained by asking the window function to assign them.
     */
    private static class DoFnContext<InputT, OutputT>
    extends DoFn.Context {
        /** Maximum size of {@link #outputTags}, which also holds the main output tag. */
        private static final int MAX_SIDE_OUTPUTS = 1000;
        final PipelineOptions options;
        final DoFn<InputT, OutputT> fn;
        final SideInputReader sideInputReader;
        final OutputManager outputManager;
        final TupleTag<OutputT> mainOutputTag;
        /** Null-checked before noting outputs, but dereferenced directly when naming aggregators. */
        final ExecutionContext.StepContext stepContext;
        final CounterSet.AddCounterMutator addCounterMutator;
        /** May be {@code null} when the runner was created without a windowing strategy. */
        final WindowFn windowFn;
        /**
         * Every output tag known so far: the main tag, the declared side output tags, and any
         * undeclared tag added the first time it is written to.
         */
        private final Set<TupleTag<?>> outputTags;

        /**
         * @param sideOutputTags declared side output tags; must not be {@code null}
         * @param windowFn window function used to assign windows to outputs that carry none
         */
        public DoFnContext(PipelineOptions options, DoFn<InputT, OutputT> fn, SideInputReader sideInputReader, OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator, WindowFn windowFn) {
            this.options = options;
            this.fn = fn;
            this.sideInputReader = sideInputReader;
            this.outputManager = outputManager;
            this.mainOutputTag = mainOutputTag;
            this.outputTags = Sets.newHashSet();
            this.outputTags.add(mainOutputTag);
            this.outputTags.addAll(sideOutputTags);
            this.stepContext = stepContext;
            this.addCounterMutator = addCounterMutator;
            this.windowFn = windowFn;
        }

        /** Returns the pipeline options this context was created with. */
        @Override
        public PipelineOptions getPipelineOptions() {
            return this.options;
        }

        /**
         * Builds a {@link WindowedValue}, filling in what the caller left out: a {@code null}
         * timestamp becomes {@code BoundedWindow.TIMESTAMP_MIN_VALUE}, and {@code null} windows
         * are assigned by the window function.
         *
         * @throws UserCodeException if window assignment fails
         */
        <T> WindowedValue<T> makeWindowedValue(T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            // The window function must see the caller's timestamp (possibly null), not the default.
            final Instant inputTimestamp = timestamp;
            if (timestamp == null) {
                timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
            }
            if (windows == null) {
                windows = this.assignWindows(inputTimestamp);
            }
            return WindowedValue.of(output, timestamp, windows, pane);
        }

        /**
         * Asks the window function to assign windows for an output that has no input element.
         * The assign context only offers the timestamp, and only when one was supplied; element
         * and windows access throw {@link UnsupportedOperationException}.
         *
         * <p>Any exception, including the {@link NullPointerException} raised when
         * {@link #windowFn} is {@code null}, is rethrown as a {@link UserCodeException}.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        private Collection<? extends BoundedWindow> assignWindows(final Instant inputTimestamp) {
            try {
                // windowFn is stored as a raw type; the element type is irrelevant because the
                // assign context never yields an element.
                WindowFn assigner = this.windowFn;
                return assigner.assignWindows(assigner.new AssignContext(){

                    @Override
                    public Object element() {
                        throw new UnsupportedOperationException("WindowFn attempted to access input element when none was available");
                    }

                    @Override
                    public Instant timestamp() {
                        if (inputTimestamp == null) {
                            throw new UnsupportedOperationException("WindowFn attempted to access input timestamp when none was available");
                        }
                        return inputTimestamp;
                    }

                    @Override
                    public Collection<? extends BoundedWindow> windows() {
                        throw new UnsupportedOperationException("WindowFn attempted to access input windows when none were available");
                    }
                });
            }
            catch (Exception e) {
                Throwables.propagateIfInstanceOf(e, UserCodeException.class);
                throw new UserCodeException(e);
            }
        }

        /**
         * Reads {@code view} for the side input window that the view's own window function maps
         * {@code mainInputWindow} to.
         *
         * @throws IllegalArgumentException if the side input reader does not contain {@code view}
         */
        public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) {
            if (!this.sideInputReader.contains(view)) {
                throw new IllegalArgumentException("calling sideInput() with unknown view");
            }
            Object sideInputWindow = view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
            return this.sideInputReader.get(view, (BoundedWindow)sideInputWindow);
        }

        /** Wraps {@code output} via {@link #makeWindowedValue} and emits it to the main output. */
        void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.outputWindowedValue(this.makeWindowedValue(output, timestamp, windows, pane));
        }

        /** Emits to the main output tag and, when a step context exists, notes the output on it. */
        void outputWindowedValue(WindowedValue<OutputT> windowedElem) {
            this.outputManager.output(this.mainOutputTag, windowedElem);
            if (this.stepContext != null) {
                this.stepContext.noteOutput(windowedElem);
            }
        }

        /** Wraps {@code output} via {@link #makeWindowedValue} and emits it to {@code tag}. */
        protected <T> void sideOutputWindowedValue(TupleTag<T> tag, T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.sideOutputWindowedValue(tag, this.makeWindowedValue(output, timestamp, windows, pane));
        }

        /**
         * Emits to {@code tag}, registering the tag first if it has not been seen before, and
         * notes the side output on the step context when one exists.
         *
         * @throws IllegalArgumentException if registering a new tag would exceed
         *     {@link #MAX_SIDE_OUTPUTS} known tags
         */
        protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) {
            if (!this.outputTags.contains(tag)) {
                if (this.outputTags.size() >= MAX_SIDE_OUTPUTS) {
                    throw new IllegalArgumentException("the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS);
                }
                this.outputTags.add(tag);
            }
            this.outputManager.output(tag, windowedElem);
            if (this.stepContext != null) {
                this.stepContext.noteSideOutput(tag, windowedElem);
            }
        }

        /** Emits to the main output with no timestamp or windows supplied. */
        @Override
        public void output(OutputT output) {
            this.outputWindowedValue(output, null, null, PaneInfo.NO_FIRING);
        }

        /** Emits to the main output with the given timestamp and no windows supplied. */
        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            this.outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING);
        }

        /**
         * Emits to the side output {@code tag} with no timestamp or windows supplied.
         *
         * @throws NullPointerException if {@code tag} is null
         */
        @Override
        public <T> void sideOutput(TupleTag<T> tag, T output) {
            Preconditions.checkNotNull(tag, "TupleTag passed to sideOutput cannot be null");
            this.sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING);
        }

        /**
         * Emits to the side output {@code tag} with the given timestamp and no windows supplied.
         *
         * @throws NullPointerException if {@code tag} is null
         */
        @Override
        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            Preconditions.checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null");
            this.sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING);
        }

        /**
         * Builds the counter name for an aggregator: {@code <prefix><stepName>-<userName>}, where
         * the prefix is empty for functions annotated with {@link SystemDoFnInternal} and
         * {@code "user-"} otherwise.
         */
        private String generateInternalAggregatorName(String userName) {
            boolean system = this.fn.getClass().isAnnotationPresent(SystemDoFnInternal.class);
            String prefix = system ? "" : "user-";
            return prefix + this.stepContext.getStepName() + "-" + userName;
        }

        /**
         * Creates a {@link CounterAggregator} registered through the add-counter mutator under
         * the name produced by {@link #generateInternalAggregatorName}.
         *
         * @throws NullPointerException if {@code combiner} is null
         */
        @Override
        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
            Preconditions.checkNotNull(combiner, "Combiner passed to createAggregator cannot be null");
            return new CounterAggregator(this.generateInternalAggregatorName(name), combiner, this.addCounterMutator);
        }
    }

    /**
     * {@link OutputManager} that collects every emitted value in memory, one list per tag, in
     * emission order.
     */
    public static class ListOutputManager
    implements OutputManager {
        /** Values are stored untyped; each list only ever holds values emitted under its own tag. */
        private final Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap();

        /** Appends {@code output} to the list for {@code tag}, creating the list on first use. */
        @Override
        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            List<WindowedValue<?>> outputList = this.outputLists.get(tag);
            if (outputList == null) {
                outputList = Lists.newArrayList();
                this.outputLists.put(tag, outputList);
            }
            outputList.add(output);
        }

        /**
         * Returns the values emitted so far under {@code tag}.
         *
         * @return the internal list itself (not a copy) when anything was emitted under
         *     {@code tag}, otherwise an immutable empty list
         */
        public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
            // Sound as long as values reach the map only through output(), which pairs a
            // TupleTag<T> with WindowedValue<T>. Java cannot express this without a raw type.
            @SuppressWarnings({"rawtypes", "unchecked"})
            List<WindowedValue<T>> outputList = (List)this.outputLists.get(tag);
            return outputList != null ? outputList : Collections.<WindowedValue<T>>emptyList();
        }
    }

    /** Destination for the values a {@code DoFnRunner} emits, keyed by output tag. */
    public interface OutputManager {
        /**
         * Receives one emitted value.
         *
         * @param tag the output the value was emitted to
         * @param output the emitted value
         */
        <T> void output(TupleTag<T> tag, WindowedValue<T> output);
    }
}

