/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import java.io.Serializable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * Maintains the hold kept in the state cell addressed by {@link #DATA_HOLD_TAG}.
 *
 * <p>Every method works on the state exposed by the supplied {@code ReduceFn} context: holds are
 * added as {@link Instant} values and removed by clearing the cell.
 *
 * <p>NOTE(review): how the runner interprets the stored instants (presumably as a lower bound on
 * the output watermark) is not visible in this file — confirm against
 * {@code WatermarkStateInternal}.
 *
 * @param <W> the window type of the windowing strategy this hold is used with
 */
public class WatermarkHold<W extends BoundedWindow> implements Serializable {
    /** Address of the system state cell that stores the hold. */
    @VisibleForTesting
    static final StateTag<WatermarkStateInternal> DATA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold"));

    /** Strategy consulted by {@link #addHold} for the allowed lateness and the window function. */
    private final WindowingStrategy<?, W> windowingStrategy;

    /**
     * Creates a hold manager bound to the given windowing strategy.
     *
     * @param windowingStrategy strategy used by {@link #addHold}; stored as-is, not validated
     */
    public WatermarkHold(WindowingStrategy<?, W> windowingStrategy) {
        this.windowingStrategy = windowingStrategy;
    }

    /**
     * Adds a hold for the value currently being processed in {@code c}.
     *
     * <p>For a late value the hold is the window's maximum timestamp plus the strategy's allowed
     * lateness. Otherwise it is whatever the strategy's window function returns from
     * {@code getOutputTime} for the value's timestamp and window.
     *
     * @param c per-value context supplying the window, the timestamp and the state
     * @param isLate whether the caller classified the value as late
     */
    public void addHold(ReduceFn.ProcessValueContext c, boolean isLate) {
        final Instant holdTo;
        if (isLate) {
            Instant endOfWindow = ((BoundedWindow) c.window()).maxTimestamp();
            ReadableDuration allowedLateness = this.windowingStrategy.getAllowedLateness();
            holdTo = endOfWindow.plus(allowedLateness);
        } else {
            holdTo = this.windowingStrategy.getWindowFn().getOutputTime(c.timestamp(), c.window());
        }
        c.state().access(DATA_HOLD_TAG).add(holdTo);
    }

    /**
     * Returns a deferred reader that yields the current hold and then clears it.
     *
     * <p>The state cell is looked up and its {@code get()} is invoked immediately; the value is
     * only read when {@link StateContents#read()} is called on the result. The value returned by
     * {@code read()} is never later than the window's maximum timestamp, and it is exactly that
     * maximum timestamp when no hold is stored. Every call to {@code read()} clears the cell.
     *
     * @param c context supplying the window and the state merged across windows
     * @return a reader producing the hold capped at the end of the window
     */
    public StateContents<Instant> extractAndRelease(final ReduceFn.Context c) {
        final WatermarkStateInternal dataHold = c.state().accessAcrossMergedWindows(DATA_HOLD_TAG);
        final StateContents<Instant> pendingHold = dataHold.get();
        return new StateContents<Instant>() {
            @Override
            public Instant read() {
                Instant stored = pendingHold.read();
                // A missing hold, or one past the end of the window, falls back to the window end.
                boolean usable = stored != null
                        && !stored.isAfter(((BoundedWindow) c.window()).maxTimestamp());
                Instant result = usable ? stored : ((BoundedWindow) c.window()).maxTimestamp();
                // NOTE(review): presumably cleared so the output watermark can advance — confirm.
                dataHold.clear();
                return result;
            }
        };
    }

    /**
     * Adds a hold at the maximum timestamp of the window in {@code c}.
     *
     * @param c context supplying the window and the state merged across windows
     */
    public void holdForOnTime(ReduceFn.Context c) {
        WatermarkStateInternal dataHold = c.state().accessAcrossMergedWindows(DATA_HOLD_TAG);
        Instant endOfWindow = ((BoundedWindow) c.window()).maxTimestamp();
        dataHold.add(endOfWindow);
    }

    /**
     * Adds a hold at the window's maximum timestamp plus the allowed lateness, but only when the
     * context's windowing strategy has closing behavior {@code FIRE_ALWAYS}; otherwise does
     * nothing.
     *
     * @param c context supplying the windowing strategy, the window and the state
     */
    public void holdForFinal(ReduceFn.Context c) {
        if (c.windowingStrategy().getClosingBehavior() != Window.ClosingBehavior.FIRE_ALWAYS) {
            return;
        }
        WatermarkStateInternal dataHold = c.state().accessAcrossMergedWindows(DATA_HOLD_TAG);
        Instant endOfWindow = ((BoundedWindow) c.window()).maxTimestamp();
        ReadableDuration allowedLateness = c.windowingStrategy().getAllowedLateness();
        dataHold.add(endOfWindow.plus(allowedLateness));
    }

    /**
     * Clears the stored hold and, when the closing behavior is {@code FIRE_ALWAYS} and the allowed
     * lateness is longer than zero, immediately installs the final hold via
     * {@link #holdForFinal}.
     *
     * @param c context supplying the windowing strategy, the window and the state
     */
    public void releaseOnTime(ReduceFn.Context c) {
        WatermarkStateInternal dataHold = c.state().accessAcrossMergedWindows(DATA_HOLD_TAG);
        dataHold.clear();

        if (c.windowingStrategy().getClosingBehavior() != Window.ClosingBehavior.FIRE_ALWAYS) {
            return;
        }
        if (c.windowingStrategy().getAllowedLateness().isLongerThan(Duration.ZERO)) {
            this.holdForFinal(c);
        }
    }

    /**
     * Clears the stored hold unconditionally.
     *
     * @param c context supplying the state merged across windows
     */
    public void releaseFinal(ReduceFn.Context c) {
        WatermarkStateInternal dataHold = c.state().accessAcrossMergedWindows(DATA_HOLD_TAG);
        dataHold.clear();
    }
}

