package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.Proto2Coder;
import com.google.cloud.dataflow.sdk.coders.SetCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.StateFetcher;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingSideInputDoFnRunner.class */
public class StreamingSideInputDoFnRunner<InputT, OutputT, ReceiverT, W extends BoundedWindow> extends DoFnRunner<InputT, OutputT, ReceiverT> {
    private ExecutionContext.StepContext stepContext;
    private StreamingModeExecutionContext execContext;
    private WindowingStrategy<?, W> windowingStrategy;
    private Map<String, PCollectionView<?>> sideInputViews;
    private CodedTupleTag<Map<W, Set<Windmill.GlobalDataRequest>>> blockedMapTag;
    private Map<W, Set<Windmill.GlobalDataRequest>> blockedMap;
    private Coder<InputT> elemCoder;

    public StreamingSideInputDoFnRunner(PipelineOptions pipelineOptions, DoFnInfo<InputT, OutputT> doFnInfo, PTuple pTuple, DoFnRunner.OutputManager<ReceiverT> outputManager, TupleTag<OutputT> tupleTag, List<TupleTag<?>> list, ExecutionContext.StepContext stepContext, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        super(pipelineOptions, doFnInfo.getDoFn(), pTuple, outputManager, tupleTag, list, stepContext, addCounterMutator, doFnInfo.getWindowingStrategy());
        this.stepContext = stepContext;
        this.windowingStrategy = (WindowingStrategy<?, W>) doFnInfo.getWindowingStrategy();
        this.elemCoder = doFnInfo.getInputCoder();
        this.sideInputViews = new HashMap();
        for (PCollectionView<?> pCollectionView : doFnInfo.getSideInputViews()) {
            this.sideInputViews.put(pCollectionView.getTagInternal().getId(), pCollectionView);
        }
        this.execContext = (StreamingModeExecutionContext) stepContext.getExecutionContext();
        this.blockedMapTag = CodedTupleTag.of("blockedMap:", MapCoder.of(this.windowingStrategy.getWindowFn().windowCoder(), SetCoder.of(Proto2Coder.of(Windmill.GlobalDataRequest.class))));
        this.blockedMap = (Map) stepContext.lookup(this.blockedMapTag);
        if (this.blockedMap == null) {
            this.blockedMap = new HashMap();
        }
    }

    private Map<W, CodedTupleTag<WindowedValue<InputT>>> getReadyWindowTags() {
        HashMap hashMap = new HashMap();
        for (Windmill.GlobalDataId globalDataId : this.execContext.getSideInputNotifications()) {
            if (this.sideInputViews.get(globalDataId.getTag()) != null) {
                for (Map.Entry<W, Set<Windmill.GlobalDataRequest>> entry : this.blockedMap.entrySet()) {
                    Set<Windmill.GlobalDataRequest> value = entry.getValue();
                    HashSet hashSet = new HashSet();
                    for (Windmill.GlobalDataRequest globalDataRequest : value) {
                        if (globalDataId.equals(globalDataRequest.getDataId())) {
                            hashSet.add(globalDataRequest);
                        }
                    }
                    value.removeAll(hashSet);
                    if (value.isEmpty()) {
                        try {
                            W key = entry.getKey();
                            boolean z = true;
                            for (PCollectionView<?> pCollectionView : this.sideInputViews.values()) {
                                if (!this.execContext.issueSideInputFetch(pCollectionView, key, StateFetcher.SideInputState.KNOWN_READY)) {
                                    Windmill.GlobalDataRequest buildGlobalDataRequest = buildGlobalDataRequest(pCollectionView, key);
                                    this.execContext.addBlockingSideInput(buildGlobalDataRequest);
                                    value.add(buildGlobalDataRequest);
                                    z = false;
                                }
                            }
                            if (z) {
                                hashMap.put(key, getElemListTag(key));
                            }
                        } catch (IOException e) {
                            throw Throwables.propagate(e);
                        }
                    }
                }
            }
        }
        return hashMap;
    }

    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void startBundle() {
        super.startBundle();
        Map<W, CodedTupleTag<WindowedValue<InputT>>> readyWindowTags = getReadyWindowTags();
        try {
            Map readTagLists = this.stepContext.readTagLists(readyWindowTags.values());
            for (Map.Entry<W, CodedTupleTag<WindowedValue<InputT>>> entry : readyWindowTags.entrySet()) {
                this.blockedMap.remove(entry.getKey());
                try {
                    Iterator it = ((Iterable) readTagLists.get(entry.getValue())).iterator();
                    while (it.hasNext()) {
                        this.fn.processElement(createProcessContext((WindowedValue) it.next()));
                    }
                    this.stepContext.deleteTagList(entry.getValue());
                } catch (Throwable th) {
                    Throwables.propagateIfInstanceOf(th, UserCodeException.class);
                    throw new UserCodeException(th);
                }
            }
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    private Set<Windmill.GlobalDataRequest> computeBlockedSideInputs(W w) throws IOException {
        Set<Windmill.GlobalDataRequest> set = this.blockedMap.get(w);
        if (set == null) {
            for (PCollectionView<?> pCollectionView : this.sideInputViews.values()) {
                if (!this.execContext.issueSideInputFetch(pCollectionView, w, StateFetcher.SideInputState.UNKNOWN)) {
                    if (set == null) {
                        set = new HashSet();
                        this.blockedMap.put(w, set);
                    }
                    set.add(buildGlobalDataRequest(pCollectionView, w));
                }
            }
        }
        return set;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void invokeProcessElement(WindowedValue<InputT> windowedValue) {
        try {
            BoundedWindow next = windowedValue.getWindows().iterator().next();
            Set<Windmill.GlobalDataRequest> computeBlockedSideInputs = computeBlockedSideInputs(next);
            if (computeBlockedSideInputs == null) {
                this.fn.processElement(createProcessContext(windowedValue));
            } else {
                this.stepContext.writeToTagList(getElemListTag(next), windowedValue, windowedValue.getTimestamp());
                this.execContext.addBlockingSideInputs(computeBlockedSideInputs);
            }
        } catch (Throwable th) {
            Throwables.propagateIfInstanceOf(th, UserCodeException.class);
            throw new UserCodeException(th);
        }
    }

    @Override // com.google.cloud.dataflow.sdk.util.DoFnRunner
    public void finishBundle() {
        super.finishBundle();
        try {
            this.stepContext.store(this.blockedMapTag, this.blockedMap);
        } catch (IOException e) {
            throw new RuntimeException("Exception while storing streaming side input info: ", e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Windmill.GlobalDataRequest buildGlobalDataRequest(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow) throws IOException {
        Coder windowCoder = pCollectionView.getWindowingStrategyInternal().getWindowFn().windowCoder();
        BoundedWindow sideInputWindow = pCollectionView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(boundedWindow);
        ByteString.Output newOutput = ByteString.newOutput();
        windowCoder.encode(sideInputWindow, newOutput, Coder.Context.OUTER);
        return Windmill.GlobalDataRequest.newBuilder().setDataId(Windmill.GlobalDataId.newBuilder().setTag(pCollectionView.getTagInternal().getId()).setVersion(newOutput.toByteString()).build()).setExistenceWatermarkDeadline(TimeUnit.MILLISECONDS.toMicros(pCollectionView.getWindowingStrategyInternal().getTrigger().getSpec().getWatermarkThatGuaranteesFiring(sideInputWindow).getMillis())).build();
    }

    private CodedTupleTag<WindowedValue<InputT>> getElemListTag(W w) throws IOException {
        String str;
        String valueOf = String.valueOf(CoderUtils.encodeToBase64(this.windowingStrategy.getWindowFn().windowCoder(), w));
        if (valueOf.length() != 0) {
            str = "e:".concat(valueOf);
        } else {
            str = r1;
            String str2 = new String("e:");
        }
        return CodedTupleTag.of(str, WindowedValue.getFullCoder(this.elemCoder, this.windowingStrategy.getWindowFn().windowCoder()));
    }
}
