package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.ActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.TriggerRunner;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/ReduceFnRunner.class */
/**
 * Drives a {@link ReduceFn} for a single key: buffers incoming values per window, runs the
 * trigger for each window, merges windows when the active window set asks for it, emits panes
 * when the trigger fires, and clears per-window state once a window's cleanup time is reached.
 *
 * <p>This class also acts as the {@link ActiveWindowSet.MergeCallback} for its own active window
 * set, so window merges are routed back through {@link #onMerge}.
 *
 * @param <K> type of the key being processed
 * @param <InputT> type of the values fed to the {@link ReduceFn}
 * @param <OutputT> type of the values produced by the {@link ReduceFn}
 * @param <W> type of the windows being managed
 */
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements ActiveWindowSet.MergeCallback<W> {
    private static final Logger LOG = LoggerFactory.getLogger(ReduceFnRunner.class);

    /** Name of the counter for values dropped because the trigger of their window was closed. */
    public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow";

    /** Name of the counter for values dropped because they arrived past the allowed lateness. */
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness";

    private final WindowingStrategy<Object, W> windowingStrategy;
    private final TimerInternals timerInternals;
    private final WindowingInternals<?, KV<K, OutputT>> windowingInternals;
    private final Aggregator<Long, Long> droppedDueToClosedWindow;
    private final Aggregator<Long, Long> droppedDueToLateness;
    private final TriggerRunner<W> triggerRunner;
    private final K key;
    // Assigned in the constructor: building it needs windowingStrategy and windowingInternals,
    // which are not yet set when field initializers run.
    private final ActiveWindowSet<W> activeWindows;
    private final WatermarkHold<W> watermarkHold;
    private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
    private final ReduceFn<K, InputT, OutputT, W> reduceFn;
    private final PaneInfoTracker paneInfo;

    /**
     * Classification of a value's timestamp relative to the current watermark and the allowed
     * lateness of the windowing strategy (see {@link #getLateness}).
     */
    public enum Lateness {
        /** The timestamp is not before the current watermark. */
        NOT_LATE(false, false),
        /** The timestamp is before the current watermark but within the allowed lateness. */
        LATE(true, false),
        /** The timestamp is before the current watermark minus the allowed lateness. */
        PAST_ALLOWED_LATENESS(true, true);

        private final boolean isLate;
        private final boolean isPastAllowedLateness;

        Lateness(boolean z, boolean z2) {
            this.isLate = z;
            this.isPastAllowedLateness = z2;
        }
    }

    /**
     * Creates a runner for one key.
     *
     * @param k the key whose values are being reduced
     * @param windowingStrategy the strategy supplying the window function, trigger, allowed
     *     lateness and accumulation mode
     * @param timerInternals source of the current watermark time; also handed to the pane tracker
     *     and the context factory
     * @param windowingInternals provides state internals and receives the windowed output
     * @param aggregator counter incremented for values dropped because their window is closed
     * @param aggregator2 counter incremented for values dropped due to lateness
     * @param reduceFn the reduce function to run
     */
    public ReduceFnRunner(K k, WindowingStrategy<?, W> windowingStrategy, TimerInternals timerInternals, WindowingInternals<?, KV<K, OutputT>> windowingInternals, Aggregator<Long, Long> aggregator, Aggregator<Long, Long> aggregator2, ReduceFn<K, InputT, OutputT, W> reduceFn) {
        this.key = k;
        this.timerInternals = timerInternals;
        this.paneInfo = new PaneInfoTracker(timerInternals);
        this.windowingInternals = windowingInternals;
        this.droppedDueToClosedWindow = aggregator;
        this.droppedDueToLateness = aggregator2;
        this.reduceFn = reduceFn;

        // The input type of the strategy is irrelevant here; only the window type matters.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> objectWindowingStrategy = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = objectWindowingStrategy;

        // Must come after windowingStrategy and windowingInternals are assigned, and before the
        // factories below, which capture the active window set.
        this.activeWindows = createActiveWindowSet();

        this.contextFactory = new ReduceFnContextFactory<>(k, reduceFn, this.windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows, timerInternals);
        this.watermarkHold = new WatermarkHold<>(windowingStrategy);
        this.triggerRunner = new TriggerRunner<>(windowingStrategy.getTrigger(), new TriggerContextFactory<>(windowingStrategy, this.windowingInternals.stateInternals(), this.activeWindows));
    }

    /**
     * Picks the active window set implementation based on whether the window function merges.
     *
     * <p>Reads {@code windowingStrategy} and {@code windowingInternals}, so both must already be
     * assigned when this is called.
     */
    private ActiveWindowSet<W> createActiveWindowSet() {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return new NonMergingActiveWindowSet<>();
        }
        return new MergingActiveWindowSet<>(this.windowingStrategy.getWindowFn(), this.windowingInternals.stateInternals());
    }

    /** Returns whether the trigger runner reports the given window as closed. */
    @VisibleForTesting
    boolean isFinished(W w) {
        return this.triggerRunner.isClosed(this.contextFactory.base(w).state());
    }

    /**
     * Processes one value in each of its windows.
     *
     * <p>A value past the allowed lateness is dropped entirely, counting one drop per window. For
     * every other value, each window whose trigger is closed counts one drop; in the remaining
     * windows a cleanup timer is scheduled if the window is newly active, a watermark hold is
     * added, the value is handed to the {@link ReduceFn}, and the trigger is run.
     *
     * @param windowedValue the value together with its timestamp and windows
     */
    public void processElement(WindowedValue<InputT> windowedValue) {
        Lateness lateness = getLateness(windowedValue);
        if (lateness.isPastAllowedLateness) {
            this.droppedDueToLateness.addValue(Long.valueOf(windowedValue.getWindows().size()));
            return;
        }

        // The windows of the value are expected to be of this runner's window type.
        @SuppressWarnings("unchecked")
        Collection<W> windows = (Collection<W>) windowedValue.getWindows();

        // First pass: issue all state prefetches before any state is actually read.
        for (W window : windows) {
            this.triggerRunner.prefetchForValue(this.contextFactory.forValue(window, windowedValue.getValue(), windowedValue.getTimestamp()).state());
        }

        for (W window : windows) {
            ReduceFn<K, InputT, OutputT, W>.ProcessValueContext context = this.contextFactory.forValue(window, windowedValue.getValue(), windowedValue.getTimestamp());
            if (this.triggerRunner.isClosed(context.state())) {
                this.droppedDueToClosedWindow.addValue(1L);
                continue;
            }

            // A newly activated window needs its cleanup timer.
            if (this.activeWindows.add(window)) {
                scheduleCleanup(context);
            }
            this.watermarkHold.addHold(context, lateness.isLate);

            // NOTE(review): the trigger invocation sits inside the reduceFn try block, so any
            // exception it raises also passes through wrapMaybeUserException.
            try {
                this.reduceFn.processValue(context);
                try {
                    handleTriggerResult(context, this.triggerRunner.processValue(context));
                } catch (Exception e) {
                    Throwables.propagateIfPossible(e);
                    throw new RuntimeException("Failed to run trigger", e);
                }
            } catch (Exception e2) {
                throw wrapMaybeUserException(e2);
            }
        }
    }

    /**
     * Asks the active window set to merge if appropriate, with no specific window to check and
     * this runner as the merge callback.
     */
    @VisibleForTesting
    public void merge() throws Exception {
        this.activeWindows.mergeIfAppropriate(null, this);
    }

    /** Persists the active window set. */
    public void persist() {
        this.activeWindows.persist();
    }

    /**
     * Merge callback: merges reduce-function and trigger state into the result window, then
     * cancels the cleanup timer and clears trigger and pane state of every merged window other
     * than the result.
     *
     * @param collection the windows being merged
     * @param w the window they are merged into
     * @param z when {@code true}, a cleanup timer is scheduled for the result window
     */
    @Override // com.google.cloud.dataflow.sdk.util.ActiveWindowSet.MergeCallback
    public void onMerge(Collection<W> collection, W w, boolean z) {
        ReduceFn<K, InputT, OutputT, W>.OnMergeContext mergeContext = this.contextFactory.forMerge(collection, w);
        this.triggerRunner.prefetchForMerge(mergeContext.state());
        try {
            this.reduceFn.onMerge(mergeContext);
            if (z) {
                scheduleCleanup(mergeContext);
            }
            try {
                handleTriggerResult(mergeContext, this.triggerRunner.onMerge(mergeContext));
                for (W mergedWindow : collection) {
                    // The result window keeps its timer and state.
                    if (mergedWindow.equals(w)) {
                        continue;
                    }
                    try {
                        ReduceFn<K, InputT, OutputT, W>.Context mergedContext = this.contextFactory.base(mergedWindow);
                        cancelCleanup(mergedContext);
                        this.triggerRunner.clearEverything(mergedContext);
                        this.paneInfo.clear(mergedContext.state());
                    } catch (Exception e) {
                        Throwables.propagateIfPossible(e);
                        throw new RuntimeException("Exception while clearing trigger state", e);
                    }
                }
            } catch (Exception e2) {
                Throwables.propagateIfPossible(e2);
                throw new RuntimeException("Failed to merge the triggers", e2);
            }
        } catch (Exception e3) {
            throw wrapMaybeUserException(e3);
        }
    }

    /**
     * Handles a timer firing.
     *
     * <p>An event-time timer at or after the window's cleanup time triggers cleanup of the
     * window; failures during that cleanup are logged and not rethrown. Any other timer is
     * ignored when the current watermark has already reached the cleanup time or when the trigger
     * is closed; otherwise it is delivered to the trigger.
     *
     * @param timerData the timer that fired
     * @throws IllegalArgumentException if the timer's namespace is not a window namespace
     */
    public void onTimer(TimerInternals.TimerData timerData) {
        if (!(timerData.getNamespace() instanceof StateNamespaces.WindowNamespace)) {
            throw new IllegalArgumentException("Expected WindowNamespace, but was " + timerData.getNamespace());
        }

        // Timers handled by this runner are expected to be set on windows of its window type.
        @SuppressWarnings("unchecked")
        StateNamespaces.WindowNamespace<W> windowNamespace = (StateNamespaces.WindowNamespace<W>) timerData.getNamespace();
        W window = windowNamespace.getWindow();

        if (TimeDomain.EVENT_TIME == timerData.getDomain() && isCleanupTime(window, timerData.getTimestamp())) {
            try {
                doCleanup(window);
            } catch (Exception e) {
                // Best effort: a failed garbage collection is reported but does not fail the caller.
                LOG.error("Exception while garbage collecting window {}", window, e);
            }
            return;
        }

        // The watermark already reached the cleanup time; nothing more to do for this timer.
        if (isCleanupTime(window, this.timerInternals.currentWatermarkTime())) {
            return;
        }

        ReduceFn<K, InputT, OutputT, W>.Context context = this.contextFactory.base(window);
        this.triggerRunner.prefetchForTimer(context.state());
        if (this.triggerRunner.isClosed(context.state())) {
            return;
        }
        try {
            handleTriggerResult(context, this.triggerRunner.onTimer(context, timerData));
        } catch (Exception e2) {
            Throwables.propagateIfPossible(e2);
            throw new RuntimeException("Exception in onTimer for trigger", e2);
        }
    }

    /**
     * Cleans up a window at its cleanup time.
     *
     * <p>If the trigger is still open, the window is merged if appropriate and a final pane is
     * emitted; when {@code mergeIfAppropriate} returns {@code false} the method returns without
     * clearing anything. Otherwise reduce-function, trigger and pane state are cleared.
     */
    private void doCleanup(W w) throws Exception {
        ReduceFn<K, InputT, OutputT, W>.Context context = this.contextFactory.base(w);
        if (!this.triggerRunner.isClosed(context.state())) {
            try {
                if (!this.activeWindows.mergeIfAppropriate(context.window(), this)) {
                    return;
                }
                onTrigger(context, true);
            } catch (Exception e) {
                Throwables.propagateIfPossible(e);
                throw new RuntimeException("Exception while merging windows", e);
            }
        }
        this.reduceFn.clearState(context);
        this.triggerRunner.clearEverything(context);
        this.paneInfo.clear(context.state());
    }

    /**
     * Acts on the result of running the trigger.
     *
     * <p>Without a fire, only the finished set is persisted. On a fire, the window is merged if
     * appropriate; if {@code mergeIfAppropriate} returns {@code true} the pane is emitted,
     * reduce-function state is discarded when {@link #shouldDiscardAfterFiring} says so, trigger
     * and pane state are cleared when the trigger finished, and the finished set is persisted.
     * If {@code mergeIfAppropriate} returns {@code false}, nothing further is done here.
     */
    private void handleTriggerResult(ReduceFn<K, InputT, OutputT, W>.Context context, TriggerRunner.Result result) {
        if (!result.isFire()) {
            result.persistFinishedSet(context.state());
            return;
        }
        try {
            if (!this.activeWindows.mergeIfAppropriate(context.window(), this)) {
                return;
            }
            onTrigger(context, result.isFinish());
            if (shouldDiscardAfterFiring(result)) {
                try {
                    this.reduceFn.clearState(context);
                    this.activeWindows.remove(context.window());
                } catch (Exception e) {
                    throw wrapMaybeUserException(e);
                }
            }
            if (result.isFinish()) {
                try {
                    this.triggerRunner.clearState(context);
                    this.paneInfo.clear(context.state());
                } catch (Exception e2) {
                    Throwables.propagateIfPossible(e2);
                    throw new RuntimeException("Exception while clearing trigger state", e2);
                }
            }
            result.persistFinishedSet(context.state());
        } catch (Exception e3) {
            // Runtime exceptions from the steps above are rethrown unchanged by propagateIfPossible.
            Throwables.propagateIfPossible(e3);
            throw new RuntimeException("Exception while merging windows", e3);
        }
    }

    /**
     * Emits a pane for the window of the given context.
     *
     * <p>The watermark hold is extracted and released, the next pane info is requested, and the
     * {@link ReduceFn} is asked to produce output. Outputs are collected first and only emitted
     * after the hold and pane info have been read, each paired with the key.
     *
     * @param context context of the window that fired
     * @param isFinal forwarded to the pane tracker when computing the next pane info
     */
    private void onTrigger(ReduceFn<K, InputT, OutputT, W>.Context context, boolean isFinal) {
        StateContents<Instant> holdContents = this.watermarkHold.extractAndRelease(context);
        final List<OutputT> outputs = new ArrayList<>();
        StateContents<PaneInfo> paneContents = this.paneInfo.getNextPaneInfo(context, isFinal);
        try {
            this.reduceFn.onTrigger(this.contextFactory.forTrigger(context.window(), paneContents, new ReduceFnContextFactory.OnTriggerCallbacks<OutputT>() {
                @Override // com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory.OnTriggerCallbacks
                public void output(OutputT toOutput) {
                    outputs.add(toOutput);
                }
            }));
            Instant outputTimestamp = holdContents.read();
            List<W> windows = Collections.singletonList(context.window());
            PaneInfo pane = paneContents.read();
            for (OutputT output : outputs) {
                this.windowingInternals.outputWindowedValue(KV.of(this.key, output), outputTimestamp, windows, pane);
            }
        } catch (Exception e) {
            throw wrapMaybeUserException(e);
        }
    }

    /** Returns the window's maximum timestamp plus the allowed lateness of the strategy. */
    private Instant cleanupTime(W w) {
        return w.maxTimestamp().plus(this.windowingStrategy.getAllowedLateness());
    }

    /** Returns whether {@code instant} is at or after the cleanup time of the window. */
    private boolean isCleanupTime(W w, Instant instant) {
        return !instant.isBefore(cleanupTime(w));
    }

    /** Sets an event-time timer at the cleanup time of the context's window. */
    private void scheduleCleanup(ReduceFn<?, ?, ?, W>.Context context) {
        context.timers().setTimer(cleanupTime(context.window()), TimeDomain.EVENT_TIME);
    }

    /** Deletes the event-time timer at the cleanup time of the context's window. */
    private void cancelCleanup(ReduceFn<?, ?, ?, W>.Context context) {
        context.timers().deleteTimer(cleanupTime(context.window()), TimeDomain.EVENT_TIME);
    }

    /**
     * Returns whether reduce-function state should be discarded after this trigger result: always
     * when the trigger finished, and on any fire when the strategy discards fired panes.
     */
    private boolean shouldDiscardAfterFiring(TriggerRunner.Result result) {
        if (result.isFinish()) {
            return true;
        }
        return result.isFire() && WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES == this.windowingStrategy.getMode();
    }

    /** Classifies the value's timestamp against the current watermark and allowed lateness. */
    private Lateness getLateness(WindowedValue<InputT> windowedValue) {
        Instant latestAllowed = this.timerInternals.currentWatermarkTime().minus(this.windowingStrategy.getAllowedLateness());
        if (windowedValue.getTimestamp().isBefore(latestAllowed)) {
            return Lateness.PAST_ALLOWED_LATENESS;
        }
        if (windowedValue.getTimestamp().isBefore(this.timerInternals.currentWatermarkTime())) {
            return Lateness.LATE;
        }
        return Lateness.NOT_LATE;
    }

    /**
     * Rethrows a failure raised while running the {@link ReduceFn}.
     *
     * <p>For a {@link SystemReduceFn} the throwable is propagated via {@link Throwables#propagate};
     * for any other reduce function it is wrapped in a {@link UserCodeException}. This method
     * never returns normally; the return type only lets callers write {@code throw wrap...(e)}.
     */
    private RuntimeException wrapMaybeUserException(Throwable th) {
        if (this.reduceFn instanceof SystemReduceFn) {
            throw Throwables.propagate(th);
        }
        throw new UserCodeException(th);
    }
}
