/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.ActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.MergingActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.NonEmptyPanes;
import com.google.cloud.dataflow.sdk.util.NonMergingActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.PaneInfoTracker;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.TriggerContextFactory;
import com.google.cloud.dataflow.sdk.util.TriggerRunner;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WatermarkHold;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * Drives a {@link ReduceFn} for a single key.
 *
 * <p>Incoming values ({@link #processElements}), window merges ({@link #onMerge}) and timers
 * ({@link #onTimer}) are routed to the reduce function and to the {@link TriggerRunner}; when a
 * trigger fires, the resulting pane is emitted through the {@link WindowingInternals} supplied at
 * construction.
 *
 * @param <K> type of the key this runner is bound to
 * @param <InputT> type of the values fed to the reduce function
 * @param <OutputT> type of the values the reduce function emits
 * @param <W> type of window being processed
 */
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow>
        implements ActiveWindowSet.MergeCallback<W> {
    public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final TimerInternals timerInternals;
    private final WindowingInternals<?, KV<K, OutputT>> windowingInternals;
    private final Aggregator<Long, Long> droppedDueToClosedWindow;
    private final Aggregator<Long, Long> droppedDueToLateness;
    private final TriggerRunner<W> triggerRunner;
    private final K key;
    private final ActiveWindowSet<W> activeWindows;
    private final WatermarkHold<W> watermarkHold;
    private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
    private final ReduceFn<K, InputT, OutputT, W> reduceFn;
    private final PaneInfoTracker paneInfo;
    private final NonEmptyPanes<W> nonEmptyPanes;

    /**
     * Creates a runner bound to {@code key}.
     *
     * @param key the key whose values this runner processes
     * @param windowingStrategy supplies the window function, trigger, allowed lateness,
     *     accumulation mode and closing behavior
     * @param timerInternals source of the current watermark and backing store for timers
     * @param windowingInternals supplies state internals and receives emitted panes
     * @param droppedDueToClosedWindow incremented once per (value, window) pair dropped because
     *     the window's trigger is already closed
     * @param droppedDueToLateness incremented by the number of windows of each value dropped for
     *     being past the allowed lateness
     * @param reduceFn the reduce function to drive
     */
    public ReduceFnRunner(K key, WindowingStrategy<?, W> windowingStrategy, TimerInternals timerInternals, WindowingInternals<?, KV<K, OutputT>> windowingInternals, Aggregator<Long, Long> droppedDueToClosedWindow, Aggregator<Long, Long> droppedDueToLateness, ReduceFn<K, InputT, OutputT, W> reduceFn) {
        this.key = key;
        this.timerInternals = timerInternals;
        this.paneInfo = new PaneInfoTracker(timerInternals);
        this.windowingInternals = windowingInternals;
        this.droppedDueToClosedWindow = droppedDueToClosedWindow;
        this.droppedDueToLateness = droppedDueToLateness;
        this.reduceFn = reduceFn;
        // This class never uses the strategy's first type parameter, so widening it to Object
        // only changes the static type.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> objectWindowingStrategy = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = objectWindowingStrategy;
        this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
        // createActiveWindowSet() reads this.windowingStrategy and this.windowingInternals, so it
        // must run after both have been assigned.
        this.activeWindows = this.createActiveWindowSet();
        this.contextFactory = new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows, timerInternals);
        this.watermarkHold = new WatermarkHold<W>(windowingStrategy);
        this.triggerRunner = new TriggerRunner<W>(windowingStrategy.getTrigger(), new TriggerContextFactory<W>(windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows));
    }

    /**
     * Picks the active-window-set implementation matching the window function: a non-merging set
     * when the window function reports itself as non-merging, otherwise a merging set backed by
     * the state internals.
     */
    private ActiveWindowSet<W> createActiveWindowSet() {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return new NonMergingActiveWindowSet<W>();
        }
        return new MergingActiveWindowSet<W>(this.windowingStrategy.getWindowFn(), this.windowingInternals.stateInternals());
    }

    /** Returns whether the trigger runner reports {@code window} as closed. */
    @VisibleForTesting
    boolean isFinished(W window) {
        return this.triggerRunner.isClosed(this.contextFactory.base(window).state());
    }

    /**
     * Processes each value in iteration order.
     *
     * @param values the windowed values to incorporate
     */
    public void processElements(Iterable<WindowedValue<InputT>> values) {
        for (WindowedValue<InputT> value : values) {
            this.processElement(value);
        }
    }

    /**
     * Processes one value in every window it belongs to.
     *
     * <p>A value past the allowed lateness is dropped entirely. Otherwise, for each window whose
     * trigger is not closed, the window is added to the active set (scheduling cleanup if the add
     * reports {@code true}), the pane content and watermark hold are recorded, and the value is
     * handed to the reduce function and then to the trigger.
     */
    private void processElement(WindowedValue<InputT> value) {
        Lateness lateness = this.getLateness(value);
        if (lateness.isPastAllowedLateness) {
            this.droppedDueToLateness.addValue(Long.valueOf(value.getWindows().size()));
            return;
        }
        // The windows of a value handled by this runner are expected to be of type W.
        @SuppressWarnings("unchecked")
        Collection<W> windows = (Collection<W>) value.getWindows();
        // First pass: issue all trigger-state prefetches before anything is read.
        for (W window : windows) {
            ReduceFn<K, InputT, OutputT, W>.ProcessValueContext context = this.contextFactory.forValue(window, value.getValue(), value.getTimestamp());
            this.triggerRunner.prefetchForValue(context.state());
        }
        // Second pass: actually process the value in each window.
        for (W window : windows) {
            ReduceFn<K, InputT, OutputT, W>.ProcessValueContext context = this.contextFactory.forValue(window, value.getValue(), value.getTimestamp());
            if (this.triggerRunner.isClosed(context.state())) {
                this.droppedDueToClosedWindow.addValue(1L);
                continue;
            }
            if (this.activeWindows.add(window)) {
                this.scheduleCleanup(context);
            }
            this.nonEmptyPanes.recordContent(context);
            this.watermarkHold.addHold(context, lateness.isLate);
            try {
                this.reduceFn.processValue(context);
            }
            catch (Exception e) {
                throw this.wrapMaybeUserException(e);
            }
            try {
                this.handleTriggerResult(context, false, this.triggerRunner.processValue(context));
            }
            catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw new RuntimeException("Failed to run trigger", e);
            }
        }
    }

    /**
     * Places a watermark hold after a pane has been extracted: a "final" hold when the current
     * watermark is already after the window's max timestamp, otherwise an "on time" hold.
     */
    private void holdForEmptyPanes(ReduceFn<K, InputT, OutputT, W>.Context context) {
        if (this.timerInternals.currentWatermarkTime().isAfter(context.window().maxTimestamp())) {
            this.watermarkHold.holdForFinal(context);
        } else {
            this.watermarkHold.holdForOnTime(context);
        }
    }

    /**
     * Asks the active window set to merge if appropriate, passing {@code null} as the window and
     * this runner as the merge callback.
     *
     * @throws Exception if the merge fails
     */
    public void merge() throws Exception {
        this.activeWindows.mergeIfAppropriate(null, this);
    }

    /** Persists the active window set. */
    public void persist() {
        this.activeWindows.persist();
    }

    /**
     * Merge callback: merges reduce-function and trigger state into {@code resultWindow}, clears
     * the timers, trigger state and pane info of every merged window other than the result, and
     * then acts on the trigger result for the merged window.
     *
     * @param mergedWindows the windows being merged
     * @param resultWindow the window they are merged into
     * @param isResultWindowNew whether cleanup must be scheduled for {@code resultWindow}
     */
    @Override
    public void onMerge(Collection<W> mergedWindows, W resultWindow, boolean isResultWindowNew) {
        TriggerRunner.Result triggerResult;
        ReduceFn<K, InputT, OutputT, W>.OnMergeContext resultContext = this.contextFactory.forMerge(mergedWindows, resultWindow);
        this.triggerRunner.prefetchForMerge(resultContext.state());
        try {
            this.reduceFn.onMerge(resultContext);
        }
        catch (Exception e) {
            throw this.wrapMaybeUserException(e);
        }
        try {
            triggerResult = this.triggerRunner.onMerge(resultContext);
        }
        catch (Exception e) {
            Throwables.propagateIfPossible(e);
            throw new RuntimeException("Failed to merge the triggers", e);
        }
        for (W mergedWindow : mergedWindows) {
            // The result window keeps its state; only the windows merged away are cleared.
            if (mergedWindow.equals(resultWindow)) {
                continue;
            }
            try {
                ReduceFn<K, InputT, OutputT, W>.Context mergedContext = this.contextFactory.base(mergedWindow);
                this.cancelCleanup(mergedContext);
                this.triggerRunner.clearEverything(mergedContext);
                this.paneInfo.clear(mergedContext.state());
            }
            catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw new RuntimeException("Exception while clearing trigger state", e);
            }
        }
        if (isResultWindowNew) {
            this.scheduleCleanup(resultContext);
        }
        this.handleTriggerResult(resultContext, false, triggerResult);
    }

    /**
     * Handles a fired timer.
     *
     * <p>For an event-time timer where either the timer's timestamp or the current watermark has
     * reached the window's cleanup time, the window is cleaned up (running the triggers first when
     * the timer is at or after the window's max timestamp). Otherwise, unless the watermark has
     * already reached cleanup time, the triggers are run and their result handled; an event-time
     * timer exactly at the window's max timestamp additionally re-schedules cleanup.
     *
     * @param timer the timer that fired; its namespace must be a window namespace
     * @throws IllegalArgumentException if the timer's namespace is not a window namespace
     */
    public void onTimer(TimerInternals.TimerData timer) {
        if (!(timer.getNamespace() instanceof StateNamespaces.WindowNamespace)) {
            throw new IllegalArgumentException("Expected WindowNamespace, but was " + timer.getNamespace());
        }
        // Timers delivered to this runner are expected to belong to windows of type W.
        @SuppressWarnings("unchecked")
        StateNamespaces.WindowNamespace<W> windowNamespace = (StateNamespaces.WindowNamespace<W>) timer.getNamespace();
        W window = windowNamespace.getWindow();
        ReduceFn<K, InputT, OutputT, W>.Context context = this.contextFactory.base(window);
        boolean isEventTime = TimeDomain.EVENT_TIME == timer.getDomain();
        boolean isAtWatermark = isEventTime && !timer.getTimestamp().isBefore(window.maxTimestamp());
        if (isEventTime && (this.isCleanupTime(window, timer.getTimestamp()) || this.isCleanupTime(window, this.timerInternals.currentWatermarkTime()))) {
            if (isAtWatermark) {
                // Only report "at watermark" to cleanup if the trigger actually fired for this timer.
                TriggerRunner.Result timerResult = this.runTriggersForTimer(context, timer);
                isAtWatermark = timerResult != null && timerResult.isFire();
            }
            try {
                this.doCleanup(context, isAtWatermark);
            }
            catch (Exception e) {
                Throwables.propagateIfInstanceOf(e, UserCodeException.class);
                throw new RuntimeException("Exception while garbage collecting window " + windowNamespace.getWindow(), e);
            }
        } else {
            if (this.isCleanupTime(window, this.timerInternals.currentWatermarkTime())) {
                return;
            }
            TriggerRunner.Result timerResult = this.runTriggersForTimer(context, timer);
            if (timerResult != null) {
                this.handleTriggerResult(context, isAtWatermark, timerResult);
            }
            // The end-of-window timer has now fired; scheduleCleanup decides where the next one goes.
            if (isEventTime && timer.getTimestamp().isEqual(window.maxTimestamp())) {
                this.scheduleCleanup(context);
            }
        }
    }

    /**
     * Runs the triggers for {@code timer}.
     *
     * @return the trigger result, or {@code null} if the trigger for the window is already closed
     */
    @Nullable
    private TriggerRunner.Result runTriggersForTimer(ReduceFn<K, InputT, OutputT, W>.Context context, TimerInternals.TimerData timer) {
        this.triggerRunner.prefetchForTimer(context.state());
        if (this.triggerRunner.isClosed(context.state())) {
            return null;
        }
        try {
            return this.triggerRunner.onTimer(context, timer);
        }
        catch (Exception e) {
            Throwables.propagateIfPossible(e);
            throw new RuntimeException("Exception in onTimer for trigger", e);
        }
    }

    /**
     * Garbage collects a window.
     *
     * <p>If the trigger is not yet closed, the window is first merged if appropriate (returning
     * without clearing anything when {@code mergeIfAppropriate} reports {@code false}) and a final
     * pane is produced. Then pane tracking, reduce state, trigger state and pane info are cleared
     * and the on-time watermark hold is released.
     *
     * @param maybeAtWatermark forwarded to {@link #onTrigger} as its at-watermark flag
     * @throws Exception if clearing trigger state fails
     */
    private void doCleanup(ReduceFn<K, InputT, OutputT, W>.Context context, boolean maybeAtWatermark) throws Exception {
        if (!this.triggerRunner.isClosed(context.state())) {
            try {
                // NOTE(review): a false return presumably means the window was merged away or
                // is no longer active - confirm against ActiveWindowSet.
                if (!this.activeWindows.mergeIfAppropriate(context.window(), this)) {
                    return;
                }
            }
            catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw new RuntimeException("Exception while merging windows", e);
            }
            this.onTrigger(context, maybeAtWatermark, true);
        }
        this.nonEmptyPanes.clearPane(context);
        try {
            this.reduceFn.clearState(context);
        }
        catch (Exception e) {
            throw this.wrapMaybeUserException(e);
        }
        this.triggerRunner.clearEverything(context);
        this.paneInfo.clear(context.state());
        this.watermarkHold.releaseOnTime(context);
    }

    /**
     * Acts on a trigger result.
     *
     * <p>A result that does not fire only has its finished set persisted. A firing result merges
     * the window if appropriate (returning early when {@code mergeIfAppropriate} reports
     * {@code false}), emits a pane, clears the pane-content marker, discards reduce state and
     * deactivates the window when {@link #shouldDiscardAfterFiring} says so, clears trigger state,
     * pane info and the final hold when the result finishes, and finally persists the finished set.
     *
     * @param maybeAtWatermark forwarded to {@link #onTrigger} as its at-watermark flag
     */
    private void handleTriggerResult(ReduceFn<K, InputT, OutputT, W>.Context context, boolean maybeAtWatermark, TriggerRunner.Result result) {
        if (!result.isFire()) {
            result.persistFinishedSet(context.state());
            return;
        }
        try {
            if (!this.activeWindows.mergeIfAppropriate(context.window(), this)) {
                return;
            }
        }
        catch (Exception e) {
            Throwables.propagateIfPossible(e);
            throw new RuntimeException("Exception while merging windows", e);
        }
        this.onTrigger(context, maybeAtWatermark, result.isFinish());
        this.nonEmptyPanes.clearPane(context);
        if (this.shouldDiscardAfterFiring(result)) {
            try {
                this.reduceFn.clearState(context);
            }
            catch (Exception e) {
                throw this.wrapMaybeUserException(e);
            }
            this.activeWindows.remove(context.window());
        }
        if (result.isFinish()) {
            try {
                this.triggerRunner.clearState(context);
                this.paneInfo.clear(context.state());
                this.watermarkHold.releaseFinal(context);
            }
            catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw new RuntimeException("Exception while clearing trigger state", e);
            }
        }
        result.persistFinishedSet(context.state());
    }

    /**
     * Wraps an already-known value in a {@link StateContents}.
     *
     * @param value the value to expose
     * @return a {@code StateContents} whose {@code read()} returns {@code value}
     */
    public static <T> StateContents<T> stateContentsOf(final T value) {
        return new StateContents<T>(){

            @Override
            public T read() {
                return value;
            }
        };
    }

    /**
     * Produces a pane for the window of {@code context}.
     *
     * <p>The watermark hold is extracted and released, the next pane info is computed and a new
     * hold is placed via {@link #holdForEmptyPanes}. Output happens only if the pane is non-empty,
     * or this is the final pane and the closing behavior is {@code FIRE_ALWAYS}, or the firing is
     * at the watermark and the pane's timing is {@code ON_TIME}.
     *
     * @param isAtWatermark whether the firing is considered to be at the watermark
     * @param isFinal whether this is the last pane for the window
     */
    private void onTrigger(final ReduceFn<K, InputT, OutputT, W>.Context context, boolean isAtWatermark, boolean isFinal) {
        // Request every piece of state first, then read, so the reads can be batched.
        StateContents<Instant> outputTimestampFuture = this.watermarkHold.extractAndRelease(context);
        StateContents<PaneInfo> paneFuture = this.paneInfo.getNextPaneInfo(context, isAtWatermark, isFinal);
        StateContents<Boolean> isEmptyFuture = this.nonEmptyPanes.isEmpty(context);
        this.reduceFn.prefetchOnTrigger(context.state());
        final PaneInfo pane = paneFuture.read();
        final Instant outputTimestamp = outputTimestampFuture.read();
        boolean shouldOutput = !isEmptyFuture.read()
                || (isFinal && this.windowingStrategy.getClosingBehavior() == Window.ClosingBehavior.FIRE_ALWAYS)
                || (isAtWatermark && pane.getTiming() == PaneInfo.Timing.ON_TIME);
        this.holdForEmptyPanes(context);
        if (!shouldOutput) {
            return;
        }
        final List<W> windows = Collections.singletonList(context.window());
        ReduceFn<K, InputT, OutputT, W>.OnTriggerContext triggerContext = this.contextFactory.forTrigger(context.window(), paneFuture, new ReduceFnContextFactory.OnTriggerCallbacks<OutputT>(){

            @Override
            public void output(OutputT toOutput) {
                // The pane info is stored only when something is actually emitted.
                ReduceFnRunner.this.paneInfo.storeCurrentPaneInfo(context, pane);
                ReduceFnRunner.this.windowingInternals.outputWindowedValue(KV.of(ReduceFnRunner.this.key, toOutput), outputTimestamp, windows, pane);
            }
        });
        try {
            this.reduceFn.onTrigger(triggerContext);
        }
        catch (Exception e) {
            throw this.wrapMaybeUserException(e);
        }
    }

    /** Returns the window's max timestamp plus the strategy's allowed lateness. */
    private Instant cleanupTime(W window) {
        return window.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness());
    }

    /** Returns whether {@code timestamp} is at or after the cleanup time of {@code window}. */
    private boolean isCleanupTime(W window, Instant timestamp) {
        return !timestamp.isBefore(this.cleanupTime(window));
    }

    /**
     * Sets the next event-time timer for the window: at the cleanup time if the current watermark
     * is already after the window's max timestamp, otherwise at the window's max timestamp.
     */
    private void scheduleCleanup(ReduceFn<K, InputT, OutputT, W>.Context context) {
        if (this.timerInternals.currentWatermarkTime().isAfter(context.window().maxTimestamp())) {
            context.timers().setTimer(this.cleanupTime(context.window()), TimeDomain.EVENT_TIME);
        } else {
            context.timers().setTimer(context.window().maxTimestamp(), TimeDomain.EVENT_TIME);
        }
    }

    /**
     * Deletes the event-time timers that {@link #scheduleCleanup} may have set for the window.
     *
     * <p>{@code scheduleCleanup} sets a timer at either the window's max timestamp or its cleanup
     * time, so both are deleted here. Deleting only the cleanup-time timer would leave a stale
     * end-of-window timer behind whenever the allowed lateness is non-zero.
     */
    private void cancelCleanup(ReduceFn<K, InputT, OutputT, W>.Context context) {
        Instant endOfWindow = context.window().maxTimestamp();
        Instant cleanup = this.cleanupTime(context.window());
        context.timers().deleteTimer(cleanup, TimeDomain.EVENT_TIME);
        // With zero allowed lateness both instants coincide; avoid deleting the same timer twice.
        if (!endOfWindow.isEqual(cleanup)) {
            context.timers().deleteTimer(endOfWindow, TimeDomain.EVENT_TIME);
        }
    }

    /**
     * Returns whether reduce state should be discarded after acting on {@code result}: always when
     * the result finishes, and on every firing when the accumulation mode is
     * {@code DISCARDING_FIRED_PANES}.
     */
    private boolean shouldDiscardAfterFiring(TriggerRunner.Result result) {
        if (result.isFinish()) {
            return true;
        }
        return result.isFire() && WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES == this.windowingStrategy.getMode();
    }

    /**
     * Classifies a value's timestamp against the current watermark: past allowed lateness when it
     * is before the watermark minus the allowed lateness, late when it is before the watermark,
     * and not late otherwise.
     */
    private Lateness getLateness(WindowedValue<InputT> value) {
        Instant latestAllowed = this.timerInternals.currentWatermarkTime().minus(this.windowingStrategy.getAllowedLateness());
        if (value.getTimestamp().isBefore(latestAllowed)) {
            return Lateness.PAST_ALLOWED_LATENESS;
        }
        if (value.getTimestamp().isBefore(this.timerInternals.currentWatermarkTime())) {
            return Lateness.LATE;
        }
        return Lateness.NOT_LATE;
    }

    /**
     * Rethrows a failure raised by the reduce function. This method never returns normally; the
     * declared return type only lets callers write {@code throw wrapMaybeUserException(e)} so the
     * compiler sees the call site as terminating.
     *
     * <p>Failures from a {@link SystemReduceFn} are propagated via {@code Throwables.propagate};
     * failures from any other reduce function are wrapped in a {@link UserCodeException}.
     */
    private RuntimeException wrapMaybeUserException(Throwable t) {
        if (this.reduceFn instanceof SystemReduceFn) {
            throw Throwables.propagate(t);
        }
        throw new UserCodeException(t);
    }

    /** How a value's timestamp relates to the current watermark and the allowed lateness. */
    private static enum Lateness {
        NOT_LATE(false, false),
        LATE(true, false),
        PAST_ALLOWED_LATENESS(true, true);

        // True when the value's timestamp is before the current watermark.
        private final boolean isLate;
        // True when the value is before the watermark by more than the allowed lateness.
        private final boolean isPastAllowedLateness;

        private Lateness(boolean isLate, boolean isPastAllowedLateness) {
            this.isLate = isLate;
            this.isPastAllowedLateness = isPastAllowedLateness;
        }
    }
}

