/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.MergeableState;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import java.io.Serializable;

/**
 * A {@link ReduceFn} that keeps all of its data in a single piece of mergeable state, registered
 * under the system tag name {@code "buf"}.
 *
 * <p>Each incoming value is added to that state; when the trigger fires, whatever the state reads
 * as (across merged windows) is emitted as the output.
 *
 * <p>Instances are normally obtained through a {@link Factory} returned by
 * {@link #buffering(Coder)} or {@link #combining(Coder, AppliedCombineFn)}.
 *
 * @param <K> type of the key handed to {@link Factory#create(Object)}
 * @param <InputT> type of the values added to the buffer
 * @param <OutputT> type read back from the buffer and emitted
 * @param <W> type of window this function operates on
 */
public class SystemReduceFn<K, InputT, OutputT, W extends BoundedWindow>
extends ReduceFn<K, InputT, OutputT, W> {
    /** Name under which the buffer state tag is registered. */
    private static final String BUFFER_NAME = "buf";

    /** Tag identifying the state that accumulates inputs and produces the output. */
    private StateTag<? extends MergeableState<InputT, OutputT>> bufferTag;

    /**
     * Returns a factory whose functions collect every input in a bag state and emit the bag's
     * contents as an {@code Iterable<T>}.
     *
     * <p>The state tag is built once here and shared by every function the factory creates; the
     * key passed to {@code create} is not used.
     *
     * @param inputCoder coder used for the elements stored in the bag
     * @return a factory producing buffering reduce functions
     */
    public static <K, T, W extends BoundedWindow> Factory<K, T, Iterable<T>, W> buffering(Coder<T> inputCoder) {
        final StateTag<BagState<T>> bufferTag = StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
        return new Factory<K, T, Iterable<T>, W>(){

            @Override
            public ReduceFn<K, T, Iterable<T>, W> create(K key) {
                // Explicit type arguments instead of a raw constructor call, so the compiler
                // checks the tag against the declared input/output types.
                return new SystemReduceFn<K, T, Iterable<T>, W>(bufferTag);
            }
        };
    }

    /**
     * Returns a factory whose functions fold inputs into a combining-value state.
     *
     * <p>Unlike {@link #buffering(Coder)}, the state tag is built on every {@code create} call,
     * because the combine function is specialised for the given key via
     * {@code combineFn.getFn().forKey(key, keyCoder)}.
     *
     * @param keyCoder coder for the key, passed to {@code forKey}
     * @param combineFn supplies the accumulator coder and the keyed combine function
     * @return a factory producing combining reduce functions
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> Factory<K, InputT, OutputT, W> combining(final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
        return new Factory<K, InputT, OutputT, W>(){

            @Override
            public ReduceFn<K, InputT, OutputT, W> create(K key) {
                // Typed as the constructor's own parameter type rather than a raw StateTag, so
                // no unchecked conversion is needed when the tag is handed over below.
                StateTag<? extends MergeableState<InputT, OutputT>> keyedTag = StateTags.makeSystemTagInternal(StateTags.combiningValue(BUFFER_NAME, combineFn.getAccumulatorCoder(), combineFn.getFn().forKey(key, keyCoder)));
                return new SystemReduceFn<K, InputT, OutputT, W>(keyedTag);
            }
        };
    }

    /**
     * Creates a reduce function backed by the given state tag.
     *
     * @param bufferTag tag of the state that receives inputs and yields the output
     */
    public SystemReduceFn(StateTag<? extends MergeableState<InputT, OutputT>> bufferTag) {
        this.bufferTag = bufferTag;
    }

    // NOTE: the context types below are referenced by their inherited simple names on purpose.
    // Writing ReduceFn.ProcessValueContext (etc.) can denote a raw type, which erases the
    // InputT/OutputT information that the method bodies rely on.

    /** Adds the value carried by {@code c} to the buffer state. */
    @Override
    public void processValue(ProcessValueContext c) throws Exception {
        c.state().access(this.bufferTag).add(c.value());
    }

    /**
     * Does nothing on merge.
     *
     * <p>Presumably no work is needed here because every read in this class goes through
     * {@code accessAcrossMergedWindows} -- confirm against the {@code ReduceFn} contract.
     */
    @Override
    public void onMerge(OnMergeContext c) throws Exception {
    }

    /**
     * Requests the buffer contents across merged windows and discards the returned handle.
     *
     * <p>Presumably this lets the backing store start fetching before {@code onTrigger} reads
     * the value -- confirm against the state API.
     */
    @Override
    public void prefetchOnTrigger(StateContext c) {
        c.accessAcrossMergedWindows(this.bufferTag).get();
    }

    /** Reads the buffer across merged windows and emits the result as the output. */
    @Override
    public void onTrigger(OnTriggerContext c) throws Exception {
        c.output(c.state().accessAcrossMergedWindows(this.bufferTag).get().read());
    }

    /** Clears the buffer state across merged windows. */
    @Override
    public void clearState(Context c) throws Exception {
        c.state().accessAcrossMergedWindows(this.bufferTag).clear();
    }

    /**
     * Returns the buffer's emptiness check, evaluated across merged windows.
     *
     * @param state context used to look up the buffer state
     * @return the {@code isEmpty()} result of the buffer state
     */
    @Override
    public StateContents<Boolean> isEmpty(StateContext state) {
        return state.accessAcrossMergedWindows(this.bufferTag).isEmpty();
    }

    /**
     * Serializable factory that creates the {@link ReduceFn} to use for a given key.
     *
     * @param <K> key type
     * @param <InputT> input value type of the created function
     * @param <OutputT> output type of the created function
     * @param <W> window type of the created function
     */
    public interface Factory<K, InputT, OutputT, W extends BoundedWindow>
    extends Serializable {
        /**
         * Creates the reduce function for {@code key}.
         *
         * @param key the key the function will be used for
         * @return a reduce function for that key
         */
        ReduceFn<K, InputT, OutputT, W> create(K key);
    }
}

