/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
import com.google.cloud.dataflow.sdk.coders.SetCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.runners.worker.StateFetcher;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.DoFnInfo;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends BoundedWindow>
extends DoFnRunner<InputT, OutputT> {
    private StreamingModeExecutionContext.StepContext stepContext;
    private StreamingModeExecutionContext execContext;
    private Map<String, PCollectionView<?>> sideInputViews;
    private final StateTag<BagState<WindowedValue<InputT>>> elementsAddr;
    private final StateTag<WatermarkStateInternal> watermarkHoldingAddr;
    private final StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>> blockedMapAddr;
    private Map<W, Set<Windmill.GlobalDataRequest>> blockedMap;
    private WindowFn<?, W> windowFn;

    public StreamingSideInputDoFnRunner(PipelineOptions options, DoFnInfo<InputT, OutputT> doFnInfo, SideInputReader sideInputReader, DoFnRunner.OutputManager outputManager, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        super(options, doFnInfo.getDoFn(), sideInputReader, outputManager, mainOutputTag, sideOutputTags, stepContext, addCounterMutator, doFnInfo.getWindowingStrategy());
        WindowFn<?, ?> wildcardWindowFn;
        this.stepContext = (StreamingModeExecutionContext.StepContext)stepContext;
        WindowFn<?, ?> typedWindowFn = wildcardWindowFn = doFnInfo.getWindowingStrategy().getWindowFn();
        this.windowFn = typedWindowFn;
        this.sideInputViews = new HashMap();
        for (PCollectionView<?> view : doFnInfo.getSideInputViews()) {
            this.sideInputViews.put(view.getTagInternal().getId(), view);
        }
        this.execContext = (StreamingModeExecutionContext)stepContext.getExecutionContext();
        this.blockedMapAddr = StreamingSideInputDoFnRunner.blockedMapAddr(this.windowFn);
        this.elementsAddr = StateTags.makeSystemTagInternal(StateTags.bag("elem", WindowedValue.getFullCoder(doFnInfo.getInputCoder(), this.windowFn.windowCoder())));
        this.watermarkHoldingAddr = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold"));
        this.blockedMap = stepContext.stateInternals().state(StateNamespaces.global(), this.blockedMapAddr).get().read();
        if (this.blockedMap == null) {
            this.blockedMap = new HashMap<W, Set<Windmill.GlobalDataRequest>>();
        }
    }

    @VisibleForTesting
    static <W extends BoundedWindow> StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>> blockedMapAddr(WindowFn<?, W> windowFn) {
        return StateTags.value("blockedMap", MapCoder.of(windowFn.windowCoder(), SetCoder.of(Proto2Coder.of(Windmill.GlobalDataRequest.class))));
    }

    private Set<W> getReadyWindows() {
        HashSet<BoundedWindow> readyWindows = new HashSet<BoundedWindow>();
        for (Windmill.GlobalDataId id : this.execContext.getSideInputNotifications()) {
            if (this.sideInputViews.get(id.getTag()) == null) continue;
            for (Map.Entry<W, Set<Windmill.GlobalDataRequest>> entry : this.blockedMap.entrySet()) {
                Set<Windmill.GlobalDataRequest> windowBlockedSet = entry.getValue();
                HashSet<Windmill.GlobalDataRequest> found = new HashSet<Windmill.GlobalDataRequest>();
                for (Windmill.GlobalDataRequest request : windowBlockedSet) {
                    if (!id.equals((Object)request.getDataId())) continue;
                    found.add(request);
                }
                windowBlockedSet.removeAll(found);
                if (!windowBlockedSet.isEmpty()) continue;
                try {
                    BoundedWindow window = (BoundedWindow)entry.getKey();
                    boolean allSideInputsCached = true;
                    for (PCollectionView<?> view : this.sideInputViews.values()) {
                        if (this.stepContext.issueSideInputFetch(view, window, StateFetcher.SideInputState.KNOWN_READY)) continue;
                        Windmill.GlobalDataRequest request = this.buildGlobalDataRequest(view, window);
                        this.stepContext.addBlockingSideInput(request);
                        windowBlockedSet.add(request);
                        allSideInputsCached = false;
                    }
                    if (!allSideInputsCached) continue;
                    readyWindows.add(window);
                }
                catch (IOException e) {
                    throw Throwables.propagate(e);
                }
            }
        }
        return readyWindows;
    }

    @Override
    public void startBundle() {
        super.startBundle();
        Set<W> readyWindows = this.getReadyWindows();
        for (BoundedWindow window : readyWindows) {
            this.elementBag(window).get();
            WatermarkStateInternal watermarkHold = this.watermarkHold(window);
            watermarkHold.get();
            watermarkHold.clear();
        }
        for (BoundedWindow window : readyWindows) {
            this.blockedMap.remove(window);
            BagState<WindowedValue<InputT>> elementsBag = this.elementBag(window);
            Iterable elements = (Iterable)elementsBag.get().read();
            try {
                for (WindowedValue elem : elements) {
                    this.fn.processElement(this.createProcessContext(elem));
                }
            }
            catch (Throwable t) {
                Throwables.propagateIfInstanceOf(t, UserCodeException.class);
                throw new UserCodeException(t);
            }
            elementsBag.clear();
        }
    }

    private Set<Windmill.GlobalDataRequest> computeBlockedSideInputs(W window) throws IOException {
        Set<Windmill.GlobalDataRequest> blocked = this.blockedMap.get(window);
        if (blocked == null) {
            for (PCollectionView<?> view : this.sideInputViews.values()) {
                if (this.stepContext.issueSideInputFetch(view, (BoundedWindow)window, StateFetcher.SideInputState.UNKNOWN)) continue;
                if (blocked == null) {
                    blocked = new HashSet<Windmill.GlobalDataRequest>();
                    this.blockedMap.put(window, blocked);
                }
                blocked.add(this.buildGlobalDataRequest(view, (BoundedWindow)window));
            }
        }
        return blocked;
    }

    @VisibleForTesting
    BagState<WindowedValue<InputT>> elementBag(W window) {
        return this.stepContext.stateInternals().state(StateNamespaces.window(this.windowFn.windowCoder(), window), this.elementsAddr);
    }

    @VisibleForTesting
    WatermarkStateInternal watermarkHold(W window) {
        return this.stepContext.stateInternals().state(StateNamespaces.window(this.windowFn.windowCoder(), window), this.watermarkHoldingAddr);
    }

    @Override
    public void invokeProcessElement(WindowedValue<InputT> elem) {
        BoundedWindow window = Iterables.getOnlyElement(elem.getWindows());
        try {
            Set<Windmill.GlobalDataRequest> blocked = this.computeBlockedSideInputs(window);
            if (blocked == null) {
                this.fn.processElement(this.createProcessContext(elem));
            } else {
                this.elementBag(window).add(elem);
                this.watermarkHold(window).add(elem.getTimestamp());
                this.stepContext.addBlockingSideInputs(blocked);
            }
        }
        catch (Throwable t) {
            Throwables.propagateIfInstanceOf(t, UserCodeException.class);
            throw new UserCodeException(t);
        }
    }

    @Override
    public void finishBundle() {
        super.finishBundle();
        this.stepContext.stateInternals().state(StateNamespaces.global(), this.blockedMapAddr).set(this.blockedMap);
    }

    private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlobalDataRequest(PCollectionView<?> view, BoundedWindow mainWindow) throws IOException {
        WindowingStrategy<?, ?> sideWindowStrategy = view.getWindowingStrategyInternal();
        WindowFn<?, ?> sideWindowFn = sideWindowStrategy.getWindowFn();
        Coder<?> sideInputWindowCoder = sideWindowFn.windowCoder();
        Object sideInputWindow = sideWindowFn.getSideInputWindow(mainWindow);
        ByteString.Output windowStream = ByteString.newOutput();
        sideInputWindowCoder.encode(sideInputWindow, (OutputStream)windowStream, Coder.Context.OUTER);
        return Windmill.GlobalDataRequest.newBuilder().setDataId(Windmill.GlobalDataId.newBuilder().setTag(view.getTagInternal().getId()).setVersion(windowStream.toByteString()).build()).setExistenceWatermarkDeadline(TimeUnit.MILLISECONDS.toMicros(sideWindowStrategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(sideInputWindow).getMillis())).build();
    }
}

