package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.values.KV;

@SystemDoFnInternal
/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingGroupAlsoByWindowsDoFn.class */
public abstract class StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> extends DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>> {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingGroupAlsoByWindowsDoFn$StreamingGABWViaWindowSetDoFn.class */
    public static class StreamingGABWViaWindowSetDoFn<K, InputT, OutputT, W extends BoundedWindow> extends StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
        private final Aggregator<Long, Long> droppedDueToClosedWindow = createAggregator(ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn());
        private final Aggregator<Long, Long> droppedDueToLateness = createAggregator(ReduceFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn());
        private final WindowingStrategy<Object, W> windowingStrategy;
        private SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory;
        private transient ReduceFnRunner<K, InputT, OutputT, W> runner;

        public StreamingGABWViaWindowSetDoFn(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn.Factory<K, InputT, OutputT, W> factory) {
            this.windowingStrategy = windowingStrategy;
            this.reduceFnFactory = factory;
        }

        private void initForKey(DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>>.ProcessContext processContext, K k) throws Exception {
            if (this.runner == null) {
                this.runner = new ReduceFnRunner<>(k, this.windowingStrategy, processContext.windowingInternals().timerInternals(), processContext.windowingInternals(), this.droppedDueToClosedWindow, this.droppedDueToLateness, this.reduceFnFactory.create(k));
            }
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void processElement(DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
            if (processContext.element().isTimer()) {
                processTimer(processContext);
            } else {
                processValue(processContext);
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private void processTimer(DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
            initForKey(processContext, processContext.element().key());
            this.runner.onTimer(processContext.element().getTimer());
        }

        private void processValue(DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>>.ProcessContext processContext) throws Exception {
            initForKey(processContext, processContext.element().element().getKey());
            this.runner.processElement(WindowedValue.of(processContext.element().element().getValue(), processContext.timestamp(), processContext.windowingInternals().windows(), processContext.pane()));
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.DoFn
        public void finishBundle(DoFn<TimerOrElement<KV<K, InputT>>, KV<K, OutputT>>.Context context) throws Exception {
            if (this.runner != null) {
                this.runner.merge();
                this.runner.persist();
            }
            this.runner = null;
        }
    }

    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W> create(WindowingStrategy<?, W> windowingStrategy, AppliedCombineFn<K, InputT, AccumT, OutputT> appliedCombineFn, Coder<K> coder) {
        Preconditions.checkNotNull(appliedCombineFn);
        return new StreamingGABWViaWindowSetDoFn(windowingStrategy, SystemReduceFn.combining(coder, appliedCombineFn));
    }

    public static <K, V, W extends BoundedWindow> StreamingGroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createForIterable(WindowingStrategy<?, W> windowingStrategy, Coder<V> coder) {
        return new StreamingGABWViaWindowSetDoFn(windowingStrategy, SystemReduceFn.buffering(coder));
    }
}
