/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsDoFn;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * A {@link GroupAlsoByWindowsDoFn} that, for a single key, folds every value into one
 * accumulator per window using a {@link Combine.KeyedCombineFn}, merges windows (and their
 * accumulators) through the configured {@link WindowFn}, and emits one combined output per
 * window.
 *
 * <p>Each output is emitted with the pane {@link PaneInfo#ON_TIME_AND_ONLY_FIRING} and with the
 * timestamp {@code windowFn.getOutputTime(minTimestamp, window)}, where {@code minTimestamp} is
 * the earliest element timestamp seen for that window.
 *
 * @param <K> key type
 * @param <InputT> type of the values being combined
 * @param <AccumT> accumulator type of the combine function
 * @param <OutputT> output type of the combine function
 * @param <W> window type
 */
@SystemDoFnInternal
class GroupAlsoByWindowsAndCombineDoFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
        extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> {
    private final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn;
    private WindowFn<Object, W> windowFn;

    /**
     * Reports whether this DoFn can be used with the given windowing strategy.
     *
     * @param strategy the windowing strategy to inspect
     * @return {@code true} only if the strategy's trigger spec is a {@link DefaultTrigger} and
     *     its accumulation mode is {@code DISCARDING_FIRED_PANES} or
     *     {@code ACCUMULATING_FIRED_PANES}
     */
    public static boolean isSupported(WindowingStrategy<?, ?> strategy) {
        if (!(strategy.getTrigger().getSpec() instanceof DefaultTrigger)) {
            return false;
        }
        return strategy.getMode().equals(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES)
                || strategy.getMode().equals(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES);
    }

    /**
     * @param windowFn window function used to merge windows and to compute output timestamps
     * @param combineFn keyed combine function used to create, update, merge and extract
     *     accumulators
     */
    public GroupAlsoByWindowsAndCombineDoFn(WindowFn<Object, W> windowFn, Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
        this.windowFn = windowFn;
        this.combineFn = combineFn;
    }

    /**
     * Combines all values of one key, window by window, and outputs one result per window.
     *
     * <p>The element is read as a {@code KV} whose value is an {@code Iterable} of
     * {@link WindowedValue}s. After each value is added, windows are merged via the
     * {@link WindowFn}, and every live window whose {@code maxTimestamp()} lies strictly before
     * the timestamp of the value just processed is closed and emitted. Whatever is still live
     * once the input is exhausted is emitted at the end.
     *
     * <p>NOTE(review): closing windows eagerly is only correct if values are iterated in
     * non-decreasing timestamp order; that ordering is presumably guaranteed by the caller and
     * is not checked here.
     *
     * @param c the process context carrying the grouped element and the output sink
     * @throws Exception if the window function fails while merging windows
     */
    @Override
    public void processElement(DoFn.ProcessContext c) throws Exception {
        // The context is declared raw in this signature, so the element type has to be asserted.
        @SuppressWarnings("unchecked")
        KV<K, Iterable<WindowedValue<InputT>>> element =
                (KV<K, Iterable<WindowedValue<InputT>>>) c.element();
        final K key = element.getKey();
        Iterator<WindowedValue<InputT>> iterator = element.getValue().iterator();

        // Live windows ordered by the time they end, earliest first.
        final PriorityQueue<W> liveWindows = new PriorityQueue<W>(11, new Comparator<BoundedWindow>() {
            @Override
            public int compare(BoundedWindow w1, BoundedWindow w2) {
                // Long.compare avoids the overflow that subtracting the two millis values
                // can produce for widely separated timestamps.
                return Long.compare(w1.maxTimestamp().getMillis(), w2.maxTimestamp().getMillis());
            }
        });
        final Map<W, AccumT> accumulators = Maps.newHashMap();
        final Map<W, Instant> minTimestamps = Maps.newHashMap();

        CombiningMergeContext mergeContext = new CombiningMergeContext() {
            @Override
            public Collection<W> windows() {
                return liveWindows;
            }

            @Override
            public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
                Collection<AccumT> accumsToBeMerged = new ArrayList<AccumT>(toBeMerged.size());
                Instant minTimestamp = null;
                for (W window : toBeMerged) {
                    accumsToBeMerged.add(accumulators.remove(window));
                    Instant timestampToBeMerged = minTimestamps.remove(window);
                    // Track the earliest timestamp among the merged windows; the first
                    // window's value is taken unconditionally.
                    if (minTimestamp == null
                            || (timestampToBeMerged != null && timestampToBeMerged.isBefore(minTimestamp))) {
                        minTimestamp = timestampToBeMerged;
                    }
                }
                // Remove before adding: mergeResult may itself be one of the merged windows.
                liveWindows.removeAll(toBeMerged);
                minTimestamps.put(mergeResult, minTimestamp);
                liveWindows.add(mergeResult);
                accumulators.put(mergeResult, combineFn.mergeAccumulators(key, accumsToBeMerged));
            }
        };

        while (iterator.hasNext()) {
            WindowedValue<InputT> e = iterator.next();
            @SuppressWarnings("unchecked")
            Collection<W> windows = (Collection<W>) e.getWindows();
            for (W w : windows) {
                Instant timestamp = minTimestamps.get(w);
                if (timestamp == null || timestamp.isAfter(e.getTimestamp())) {
                    minTimestamps.put(w, e.getTimestamp());
                }
                AccumT accum = accumulators.get(w);
                // A window is either unknown to both maps or known to both.
                Preconditions.checkState((timestamp == null) == (accum == null));
                if (accum == null) {
                    accum = combineFn.createAccumulator(key);
                    liveWindows.add(w);
                }
                accum = combineFn.addInput(key, accum, e.getValue());
                accumulators.put(w, accum);
            }

            windowFn.mergeWindows(mergeContext);

            // Emit every window that ended strictly before the current value's timestamp.
            while (!liveWindows.isEmpty()
                    && liveWindows.peek().maxTimestamp().isBefore(e.getTimestamp())) {
                closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
            }
        }

        // Input exhausted: flush whatever is still open.
        while (!liveWindows.isEmpty()) {
            closeWindow(key, liveWindows.poll(), accumulators, minTimestamps, c);
        }
    }

    /**
     * Removes the bookkeeping for window {@code w} and outputs its combined result.
     *
     * @param key the key being processed
     * @param w the window to close; must be present in both maps
     * @param accumulators per-window accumulators; the entry for {@code w} is removed
     * @param minTimestamps per-window earliest element timestamps; the entry for {@code w} is
     *     removed
     * @param c the process context used to output the result
     * @throws IllegalStateException if either map has no entry for {@code w}
     */
    private void closeWindow(K key, W w, Map<W, AccumT> accumulators, Map<W, Instant> minTimestamps, DoFn.ProcessContext c) {
        AccumT accum = accumulators.remove(w);
        Instant timestamp = minTimestamps.remove(w);
        Preconditions.checkState(accum != null && timestamp != null);
        c.windowingInternals().outputWindowedValue(
                KV.of(key, combineFn.extractOutput(key, accum)),
                windowFn.getOutputTime(timestamp, w),
                Arrays.asList(w),
                PaneInfo.ON_TIME_AND_ONLY_FIRING);
    }

    /**
     * Base class for the merge context handed to {@link WindowFn#mergeWindows}.
     *
     * <p>NOTE(review): assumes {@code WindowFn.MergeContext} is an inner (non-static) class of
     * {@code WindowFn}, so it needs an enclosing {@code WindowFn} instance; the constructor
     * supplies this DoFn's {@code windowFn} for that purpose.
     */
    private abstract class CombiningMergeContext extends WindowFn<Object, W>.MergeContext {
        CombiningMergeContext() {
            windowFn.super();
        }
    }
}

