/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.BatchTimerInternals;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsAndCombineDoFn;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaIteratorsDoFn;
import com.google.cloud.dataflow.sdk.util.ReduceFnRunner;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Instant;

/**
 * Base {@link DoFn} that takes, for one key, an iterable of {@link WindowedValue}s and emits
 * {@code KV<K, OutputT>} results.
 *
 * <p>Instances are obtained through the static factories {@link #createForIterable} and
 * {@link #create}, which pick a concrete implementation based on what the given
 * {@link WindowingStrategy} supports.
 *
 * @param <K> key type
 * @param <InputT> type of the values grouped under each key
 * @param <OutputT> type of the value emitted for each key
 * @param <W> window type
 */
@SystemDoFnInternal
public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow>
extends DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> {
    /**
     * Creates a {@code GroupAlsoByWindowsDoFn} whose output for each key is an
     * {@code Iterable} of the input values.
     *
     * <p>Uses {@link GroupAlsoByWindowsViaIteratorsDoFn} when it reports the strategy as
     * supported; otherwise falls back to the output-buffer implementation driven by
     * {@code SystemReduceFn.buffering(inputCoder)}.
     *
     * @param windowingStrategy the windowing strategy to group by
     * @param inputCoder coder for the input values, handed to the buffering reduce function
     * @return a DoFn producing {@code KV<K, Iterable<V>>}
     */
    public static <K, V, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createForIterable(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
        if (GroupAlsoByWindowsViaIteratorsDoFn.isSupported(windowingStrategy)) {
            return new GroupAlsoByWindowsViaIteratorsDoFn<K, V, W>(windowingStrategy);
        }
        // The buffer-based implementation is declared against WindowingStrategy<Object, W>;
        // only the wildcard is being replaced, so the cast is a no-op at runtime.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
        SystemReduceFn.Factory<K, V, Iterable<V>, W> reduceFnFactory = SystemReduceFn.buffering(inputCoder);
        return new GABWViaOutputBufferDoFn<K, V, Iterable<V>, W>(noWildcard, reduceFnFactory);
    }

    /**
     * Creates a {@code GroupAlsoByWindowsDoFn} that combines the values of each key with the
     * given combine function.
     *
     * <p>Uses {@link GroupAlsoByWindowsAndCombineDoFn} when it reports the strategy as
     * supported; otherwise falls back to the output-buffer implementation driven by
     * {@code SystemReduceFn.combining(keyCoder, combineFn)}.
     *
     * @param windowingStrategy the windowing strategy to group by
     * @param combineFn the combine function to apply; must not be {@code null}
     * @param keyCoder coder for the keys, handed to the combining reduce function
     * @return a DoFn producing {@code KV<K, OutputT>}
     * @throws NullPointerException if {@code combineFn} is {@code null}
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> create(WindowingStrategy<?, W> windowingStrategy, AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn, Coder<K> keyCoder) {
        Preconditions.checkNotNull(combineFn);
        // Both implementations below are declared against <Object, W>; a WindowingStrategy<?, W>
        // cannot be passed to them without this (runtime no-op) unchecked cast.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
        if (GroupAlsoByWindowsAndCombineDoFn.isSupported(windowingStrategy)) {
            return new GroupAlsoByWindowsAndCombineDoFn<K, InputT, AccumT, OutputT, W>(noWildcard.getWindowFn(), combineFn.getFn());
        }
        SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory = SystemReduceFn.combining(keyCoder, combineFn);
        return new GABWViaOutputBufferDoFn<K, InputT, OutputT, W>(noWildcard, reduceFnFactory);
    }

    /**
     * Fallback implementation that feeds all values of a key through a {@link ReduceFnRunner},
     * using a locally created {@link BatchTimerInternals} to drive watermark and
     * processing-time timers.
     */
    @SystemDoFnInternal
    private static class GABWViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow>
    extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
        /** Number of input elements handed to the runner between timer advances. */
        private static final int CHUNK_SIZE = 1000;

        private final Aggregator<Long, Long> droppedDueToClosedWindow = this.createAggregator("DroppedDueToClosedWindow", new Sum.SumLongFn());
        private final Aggregator<Long, Long> droppedDueToLateness = this.createAggregator("DroppedDueToLateness", new Sum.SumLongFn());
        private final WindowingStrategy<Object, W> strategy;
        // NOTE(review): could be final; left as-is because changing a field's modifiers alters
        // the default serialVersionUID if instances of this class are Java-serialized - confirm.
        private SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory;

        /**
         * @param windowingStrategy strategy passed to every {@link ReduceFnRunner} created here
         * @param reduceFnFactory creates the reduce function for each key
         */
        public GABWViaOutputBufferDoFn(WindowingStrategy<Object, W> windowingStrategy, SystemReduceFn.Factory<K, InputT, OutputT, W> reduceFnFactory) {
            this.strategy = windowingStrategy;
            this.reduceFnFactory = reduceFnFactory;
        }

        /**
         * Processes all values of one key: runs them through a fresh {@link ReduceFnRunner} in
         * chunks of {@link #CHUNK_SIZE}, advancing the local timers after each chunk, then
         * merges, advances both clocks to their maximum and persists the runner state.
         *
         * @param c context whose element is the key together with its windowed values
         * @throws Exception if the runner or the timers fail
         */
        @Override
        public void processElement(DoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c) throws Exception {
            K key = c.element().getKey();
            // Timers are kept in a local BatchTimerInternals created per call rather than taken
            // from the context.
            BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now());
            ReduceFnRunner<K, InputT, OutputT, W> runner = new ReduceFnRunner<K, InputT, OutputT, W>(key, this.strategy, timerInternals, c.windowingInternals(), this.droppedDueToClosedWindow, this.droppedDueToLateness, this.reduceFnFactory.create(key));

            for (Iterable<WindowedValue<InputT>> chunk : Iterables.partition(c.element().getValue(), CHUNK_SIZE)) {
                runner.processElements(chunk);
                // Advance the watermark to the timestamp of the chunk's first element.
                // NOTE(review): presumably relies on the values arriving in timestamp order -
                // verify against the caller. Iterables.partition never yields an empty chunk,
                // so next() is safe.
                timerInternals.advanceWatermark(runner, chunk.iterator().next().getTimestamp());
                timerInternals.advanceProcessingTime(runner, Instant.now());
            }

            runner.merge();
            // All input has been consumed: push both clocks to the maximum instant so that any
            // timers still pending get a chance to fire before the state is persisted.
            timerInternals.advanceWatermark(runner, new Instant(Long.MAX_VALUE));
            timerInternals.advanceProcessingTime(runner, new Instant(Long.MAX_VALUE));
            runner.persist();
        }
    }
}

