/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.worker.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingGroupAlsoByWindowsReshuffleDoFn;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.ReduceFnRunner;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;

/**
 * Abstract {@link DoFn} that consumes {@link KeyedWorkItem}s of {@code InputT} and emits
 * {@code KV<K, OutputT>} values.
 *
 * <p>Instances are obtained through the static factories {@link #create} and
 * {@link #createForIterable}; the concrete implementation is a private nested class.
 *
 * @param <K> key type of the emitted {@link KV}s
 * @param <InputT> element type carried by the incoming work items
 * @param <OutputT> value type of the emitted {@link KV}s
 * @param <W> window type of the windowing strategy in use
 */
@SystemDoFnInternal
public abstract class StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
extends DoFn<KeyedWorkItem<InputT>, KV<K, OutputT>> {
    /**
     * Creates a DoFn whose reduce function is built by {@code SystemReduceFn.combining}.
     *
     * @param windowingStrategy windowing strategy handed to the runner for every work item
     * @param combineFn combine function passed to {@code SystemReduceFn.combining}; must not be null
     * @param keyCoder key coder passed to {@code SystemReduceFn.combining}
     * @return a new DoFn backed by the combining reduce function
     * @throws NullPointerException if {@code combineFn} is null
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W> create(WindowingStrategy<?, W> windowingStrategy, AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn, Coder<K> keyCoder) {
        Preconditions.checkNotNull(combineFn);
        return new StreamingGABWViaWindowSetDoFn<K, InputT, OutputT, W>(windowingStrategy, SystemReduceFn.combining(keyCoder, combineFn));
    }

    /**
     * Creates a DoFn that emits an {@code Iterable<V>} per key.
     *
     * <p>When {@code StreamingGroupAlsoByWindowsReshuffleDoFn.isReshuffle} accepts the strategy, a
     * {@link StreamingGroupAlsoByWindowsReshuffleDoFn} is returned; otherwise the DoFn is backed by
     * {@code SystemReduceFn.buffering}.
     *
     * @param windowingStrategy windowing strategy used to pick and configure the implementation
     * @param inputCoder coder for input values, passed to {@code SystemReduceFn.buffering}
     * @return a DoFn producing {@code KV<K, Iterable<V>>}
     */
    public static <K, V, W extends BoundedWindow> DoFn<KeyedWorkItem<V>, KV<K, Iterable<V>>> createForIterable(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
        if (StreamingGroupAlsoByWindowsReshuffleDoFn.isReshuffle(windowingStrategy)) {
            // NOTE(review): left as a raw construction because the type parameters of
            // StreamingGroupAlsoByWindowsReshuffleDoFn are not visible from this file - confirm and parameterize.
            return new StreamingGroupAlsoByWindowsReshuffleDoFn();
        }
        // Explicit type arguments, in the same form as create(), instead of a raw construction.
        return new StreamingGABWViaWindowSetDoFn<K, V, Iterable<V>, W>(windowingStrategy, SystemReduceFn.buffering(inputCoder));
    }

    /**
     * Implementation that hands every work item to a freshly created {@link ReduceFnRunner}.
     */
    private static class StreamingGABWViaWindowSetDoFn<K, InputT, OutputT, W extends BoundedWindow>
    extends StreamingGroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
        // Both aggregators are handed to every ReduceFnRunner built in processElement.
        private final Aggregator<Long, Long> droppedDueToClosedWindow = this.createAggregator("DroppedDueToClosedWindow", new Sum.SumLongFn());
        private final Aggregator<Long, Long> droppedDueToLateness = this.createAggregator("DroppedDueToLateness", new Sum.SumLongFn());
        private final WindowingStrategy<Object, W> windowingStrategy;
        // Assigned once in the constructor and never reassigned.
        private final SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory;

        /**
         * @param windowingStrategy strategy passed to each {@link ReduceFnRunner}
         * @param reduceFnFactory factory asked for a reduce function once per processed work item
         */
        public StreamingGABWViaWindowSetDoFn(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory) {
            // A WindowingStrategy<?, W> is not assignable to WindowingStrategy<Object, W> without a
            // cast; the cast is unchecked, so the suppression is confined to this one local.
            @SuppressWarnings("unchecked")
            WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>)windowingStrategy;
            this.windowingStrategy = noWildcard;
            this.reduceFnFactory = reduceFnFactory;
        }

        /**
         * Processes one work item: builds a runner for the item's key, delivers the item's timers,
         * then its elements, and finally calls {@code merge()} and {@code persist()} on the runner.
         *
         * @param c context supplying the work item and the windowing internals
         * @throws Exception propagated from the runner
         */
        @Override
        public void processElement(ProcessContext c) throws Exception {
            KeyedWorkItem<InputT> element = c.element();
            // KeyedWorkItem carries no key type parameter, so the key is narrowed to K here.
            @SuppressWarnings("unchecked")
            K key = (K)element.key();
            TimerInternals timerInternals = c.windowingInternals().timerInternals();
            ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<K, InputT, OutputT, W>(key, this.windowingStrategy, timerInternals, c.windowingInternals(), this.droppedDueToClosedWindow, this.droppedDueToLateness, this.reduceFnFactory.create(key));
            // Order matters and is kept from the original: timers first, then elements, then merge/persist.
            for (TimerInternals.TimerData timer : element.timersIterable()) {
                runner.onTimer(timer);
            }
            runner.processElements(element.elementsIterable());
            runner.merge();
            runner.persist();
        }
    }
}

