package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheLoader;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.StateFetcher;
import com.google.cloud.dataflow.sdk.util.TimerManager;
import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
import com.google.cloud.dataflow.sdk.values.CodedTupleTagMap;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StreamingModeExecutionContext.class */
/**
 * {@link ExecutionContext} used while processing a single {@link Windmill.WorkItem}.
 *
 * <p>{@link #start} binds the context to a work item, its input data watermark and the
 * {@link Windmill.WorkItemCommitRequest.Builder} that collects everything produced while the
 * work item is processed (timers, global data updates, side-input requests and flushed
 * per-step state). Per-step keyed state is read through the {@link StateFetcher} supplied at
 * construction and buffered in a {@code KeyedStateCache} until {@link #flushState()} is called.
 *
 * <p>NOTE(review): the class holds plain mutable fields with no synchronization, so it looks
 * like it is meant to be used by one thread at a time — confirm against callers.
 */
public class StreamingModeExecutionContext extends ExecutionContext {
    /** Computation identifier passed to every {@link StateFetcher} state lookup. */
    private final String computation;
    /** Watermark supplied to the most recent {@link #start} call. */
    private Instant inputDataWatermark;
    /** Work item supplied to the most recent {@link #start} call; {@code null} before that. */
    private Windmill.WorkItem work;
    /** Source of keyed state and side inputs that are not cached locally. */
    private final StateFetcher stateFetcher;
    /** Commit request under construction for the current work item. */
    private Windmill.WorkItemCommitRequest.Builder outputBuilder;
    /** Side-input values seen for the current work item: view tag, then side-input window. */
    private final Map<TupleTag<?>, Map<BoundedWindow, Object>> sideInputCache =
            new HashMap<TupleTag<?>, Map<BoundedWindow, Object>>();

    /**
     * Step context whose keyed state (tags and tag lists) is served by a
     * {@code KeyedStateCache} and written to the enclosing context's output builder on
     * {@link #flushState()}.
     */
    class StepContext extends ExecutionContext.StepContext {
        private final KeyedStateCache tagCache;

        /**
         * Creates the step context and its state cache.
         *
         * @param str value handed to the superclass constructor; it is also used, prefixed
         *     with its own length and a colon ({@code "<length>:<str>"}), as the prefix given
         *     to the cache and to both cache loaders
         */
        public StepContext(String str) {
            super(str);
            // Same text the original length + ":" + name concatenation produced.
            String mangledPrefix = str.length() + ":" + str;
            this.tagCache = new KeyedStateCache(
                    mangledPrefix,
                    CacheBuilder.newBuilder().build(new TagLoader(mangledPrefix)),
                    CacheBuilder.newBuilder().build(new TagListLoader(mangledPrefix)));
        }

        /** Stores {@code t} under {@code codedTupleTag} with the given timestamp in the cache. */
        @Override
        public <T> void store(CodedTupleTag<T> codedTupleTag, T t, Instant instant) {
            this.tagCache.store(codedTupleTag, t, instant);
        }

        /** Removes the value stored under {@code codedTupleTag} from the cache. */
        @Override
        public <T> void remove(CodedTupleTag<T> codedTupleTag) {
            this.tagCache.removeTags(codedTupleTag);
        }

        /**
         * Looks up the values of all given tags through the cache.
         *
         * @throws IOException if the cache lookup fails
         */
        @Override
        public CodedTupleTagMap lookup(Iterable<? extends CodedTupleTag<?>> iterable) throws IOException {
            return CodedTupleTagMap.of(this.tagCache.lookupTags(iterable));
        }

        /** Adds {@code t} with the given timestamp to the list stored under {@code codedTupleTag}. */
        @Override
        public <T> void writeToTagList(CodedTupleTag<T> codedTupleTag, T t, Instant instant) {
            this.tagCache.writeToTagList(codedTupleTag, t, instant);
        }

        /**
         * Reads the list stored under a single tag by delegating to {@link #readTagLists}.
         *
         * @throws IOException if the cache lookup fails
         */
        @Override
        public <T> Iterable<T> readTagList(CodedTupleTag<T> codedTupleTag) throws IOException {
            return readTagLists(Arrays.asList(codedTupleTag)).get(codedTupleTag);
        }

        /**
         * Reads the lists stored under all given tags through the cache.
         *
         * @throws IOException if the cache lookup fails
         */
        @Override
        @SuppressWarnings("unchecked") // The cache result is narrowed to the caller's element type T.
        public <T> Map<CodedTupleTag<T>, Iterable<T>> readTagLists(Iterable<CodedTupleTag<T>> iterable) throws IOException {
            return (Map<CodedTupleTag<T>, Iterable<T>>) this.tagCache.readTagLists(iterable);
        }

        /** Removes the list stored under {@code codedTupleTag} from the cache. */
        @Override
        public <T> void deleteTagList(CodedTupleTag<T> codedTupleTag) {
            this.tagCache.removeTagLists(codedTupleTag);
        }

        /**
         * Writes this step's buffered state into the enclosing context's output builder.
         *
         * @throws IOException if the cache fails to flush
         */
        public void flushState() throws IOException {
            this.tagCache.flushTo(StreamingModeExecutionContext.this.outputBuilder);
        }
    }

    /**
     * Cache loader that fetches tag lists from the {@link StateFetcher} using the enclosing
     * context's computation, current serialized key and current work token.
     */
    private class TagListLoader extends CacheLoader<CodedTupleTag<?>, List<?>> {
        private final String mangledPrefix;

        private TagListLoader(String str) {
            this.mangledPrefix = str;
        }

        /** Loads a single tag list by delegating to {@link #loadAll}. */
        @Override
        public List<?> load(CodedTupleTag<?> codedTupleTag) throws Exception {
            return loadAll(Arrays.asList(codedTupleTag)).get(codedTupleTag);
        }

        /** Fetches the lists for all given tags in one {@code fetchList} call. */
        @Override
        public Map<CodedTupleTag<?>, List<?>> loadAll(Iterable<? extends CodedTupleTag<?>> iterable) throws Exception {
            return StreamingModeExecutionContext.this.stateFetcher.fetchList(
                    StreamingModeExecutionContext.this.computation,
                    StreamingModeExecutionContext.this.getSerializedKey(),
                    StreamingModeExecutionContext.this.getWorkToken(),
                    this.mangledPrefix,
                    iterable);
        }
    }

    /**
     * Cache loader that fetches single tag values from the {@link StateFetcher} using the
     * enclosing context's computation, current serialized key and current work token.
     */
    private class TagLoader extends CacheLoader<CodedTupleTag<?>, Optional<?>> {
        private final String mangledPrefix;

        private TagLoader(String str) {
            this.mangledPrefix = str;
        }

        /** Loads a single tag value by delegating to {@link #loadAll}. */
        @Override
        public Optional<?> load(CodedTupleTag<?> codedTupleTag) throws Exception {
            return loadAll(Arrays.asList(codedTupleTag)).get(codedTupleTag);
        }

        /** Fetches the values for all given tags in one {@code fetch} call. */
        @Override
        public Map<CodedTupleTag<?>, Optional<?>> loadAll(Iterable<? extends CodedTupleTag<?>> iterable) throws Exception {
            return StreamingModeExecutionContext.this.stateFetcher.fetch(
                    StreamingModeExecutionContext.this.computation,
                    StreamingModeExecutionContext.this.getSerializedKey(),
                    StreamingModeExecutionContext.this.getWorkToken(),
                    this.mangledPrefix,
                    iterable);
        }
    }

    /**
     * Creates a context that is not yet bound to a work item; call {@link #start} before
     * using any accessor that reads the work item or the output builder.
     *
     * @param str computation identifier forwarded to the state fetcher on every state lookup
     * @param stateFetcher source of keyed state and side inputs
     */
    public StreamingModeExecutionContext(String str, StateFetcher stateFetcher) {
        this.computation = str;
        this.stateFetcher = stateFetcher;
    }

    /**
     * Binds this context to a work item and discards all cached side inputs.
     *
     * @param workItem work item about to be processed
     * @param instant input data watermark reported by {@code currentWatermarkTime()}
     * @param builder commit request builder that receives all output of this work item
     */
    public void start(Windmill.WorkItem workItem, Instant instant, Windmill.WorkItemCommitRequest.Builder builder) {
        this.work = workItem;
        this.outputBuilder = builder;
        this.sideInputCache.clear();
        this.inputDataWatermark = instant;
    }

    /** Creates the streaming-specific {@link StepContext} for the given step. */
    @Override
    public ExecutionContext.StepContext createStepContext(String str) {
        return new StepContext(str);
    }

    /**
     * Returns a new {@link TimerManager} that records timer changes as output timers on the
     * current output builder.
     */
    @Override
    public TimerManager getTimerManager() {
        return new TimerManager() {
            /** Adds an output timer carrying the tag, the type and the timestamp in microseconds. */
            @Override
            public void setTimer(String str, Instant instant, TimerManager.TimeDomain timeDomain) {
                Windmill.Timer timer = Windmill.Timer.newBuilder()
                        .setTimestamp(TimeUnit.MILLISECONDS.toMicros(instant.getMillis()))
                        .setTag(ByteString.copyFromUtf8(str))
                        .setType(StreamingModeExecutionContext.this.timerType(timeDomain))
                        .build();
                StreamingModeExecutionContext.this.outputBuilder.addOutputTimers(timer);
            }

            /**
             * Adds an output timer carrying only the tag and the type, with no timestamp set.
             * NOTE(review): presumably a timer without a timestamp is how a deletion is
             * signalled to the service — confirm.
             */
            @Override
            public void deleteTimer(String str, TimerManager.TimeDomain timeDomain) {
                Windmill.Timer timer = Windmill.Timer.newBuilder()
                        .setTag(ByteString.copyFromUtf8(str))
                        .setType(StreamingModeExecutionContext.this.timerType(timeDomain))
                        .build();
                StreamingModeExecutionContext.this.outputBuilder.addOutputTimers(timer);
            }

            @Override
            public Instant currentProcessingTime() {
                return Instant.now();
            }

            /** Returns the watermark supplied to the most recent {@code start} call. */
            @Override
            public Instant currentWatermarkTime() {
                return StreamingModeExecutionContext.this.inputDataWatermark;
            }
        };
    }

    /**
     * Maps a timer time domain to the Windmill timer type: {@code EVENT_TIME} becomes
     * {@code WATERMARK}, every other domain becomes {@code REALTIME}.
     */
    public Windmill.Timer.Type timerType(TimerManager.TimeDomain timeDomain) {
        if (timeDomain == TimerManager.TimeDomain.EVENT_TIME) {
            return Windmill.Timer.Type.WATERMARK;
        }
        return Windmill.Timer.Type.REALTIME;
    }

    /**
     * Returns the value of a side input that has already been cached for this work item.
     *
     * @throws IllegalArgumentException if {@code pTuple} does not contain the view's tag
     * @throws IllegalStateException if no value is cached for the view and window
     */
    @Override
    public <T> T getSideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow, PTuple pTuple) {
        if (!pTuple.has(pCollectionView.getTagInternal())) {
            throw new IllegalArgumentException("calling sideInput() with unknown view; did you forget to pass the view in ParDo.withSideInputs()?");
        }
        return fetchSideInput(pCollectionView, boundedWindow, StateFetcher.SideInputState.CACHED_IN_WORKITEM);
    }

    /**
     * Tries to obtain the side input for the given view and window, caching it on success.
     *
     * @return {@code true} if a value is now available, {@code false} if the fetcher
     *     returned {@code null}
     * @throws IllegalStateException if {@code sideInputState} is {@code CACHED_IN_WORKITEM}
     *     and no value is cached
     */
    public boolean issueSideInputFetch(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow, StateFetcher.SideInputState sideInputState) {
        return fetchSideInput(pCollectionView, boundedWindow, sideInputState) != null;
    }

    /**
     * Resolves a side input, consulting the per-work-item cache before the state fetcher.
     *
     * <p>The main-input window is first mapped to the view's side-input window. A cached
     * value is returned directly. Otherwise, unless the state is {@code CACHED_IN_WORKITEM},
     * the value is requested from the state fetcher and cached when non-null.
     *
     * @return the side-input value, or {@code null} if the fetcher returned {@code null}
     * @throws IllegalStateException if the state is {@code CACHED_IN_WORKITEM} but nothing
     *     is cached
     */
    @SuppressWarnings("unchecked") // Cached values are stored as Object and narrowed to the view's type.
    private <T> T fetchSideInput(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow, StateFetcher.SideInputState sideInputState) {
        BoundedWindow sideInputWindow =
                pCollectionView.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(boundedWindow);

        Map<BoundedWindow, Object> valuesByWindow = this.sideInputCache.get(pCollectionView.getTagInternal());
        if (valuesByWindow == null) {
            valuesByWindow = new HashMap<BoundedWindow, Object>();
            this.sideInputCache.put(pCollectionView.getTagInternal(), valuesByWindow);
        }

        T cached = (T) valuesByWindow.get(sideInputWindow);
        if (cached != null) {
            return cached;
        }

        if (sideInputState == StateFetcher.SideInputState.CACHED_IN_WORKITEM) {
            // The caller asserted the value was already cached for this work item, but it is not.
            throw new IllegalStateException(
                    "Expected side input to be cached. Tag: " + pCollectionView.getTagInternal().getId());
        }

        T fetched = (T) this.stateFetcher.fetchSideInput(pCollectionView, sideInputWindow, sideInputState);
        if (fetched != null) {
            valuesByWindow.put(sideInputWindow, fetched);
        }
        return fetched;
    }

    /**
     * Records the contents of a {@link PCollectionView} as a global data update on the
     * output builder.
     *
     * <p>The data is encoded with {@code coder} and the window with {@code coder2}, both in
     * the {@code OUTER} context; the encoded window is used as the version of the global
     * data id and the tag id as its tag.
     *
     * @throws IllegalStateException if the current work item has a non-empty key
     * @throws IOException if encoding the data or the window fails
     */
    @Override
    public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<Iterable<WindowedValue<T>>> coder, W w, Coder<W> coder2) throws IOException {
        if (getSerializedKey().size() != 0) {
            throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
        }

        ByteString.Output dataStream = ByteString.newOutput();
        coder.encode(iterable, dataStream, Coder.Context.OUTER);

        ByteString.Output windowStream = ByteString.newOutput();
        coder2.encode(w, windowStream, Coder.Context.OUTER);

        Windmill.GlobalDataId dataId = Windmill.GlobalDataId.newBuilder()
                .setTag(tupleTag.getId())
                .setVersion(windowStream.toByteString())
                .build();
        this.outputBuilder.addGlobalDataUpdates(
                Windmill.GlobalData.newBuilder()
                        .setDataId(dataId)
                        .setData(dataStream.toByteString())
                        .build());
    }

    /** Returns the global data id notifications carried by the current work item. */
    public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
        return this.work.getGlobalDataIdNotificationsList();
    }

    /**
     * Registers a side input the current work item is waiting on: the request itself and
     * its data id are both added to the output builder.
     */
    public void addBlockingSideInput(Windmill.GlobalDataRequest globalDataRequest) {
        this.outputBuilder.addGlobalDataRequests(globalDataRequest);
        this.outputBuilder.addGlobalDataIdRequests(globalDataRequest.getDataId());
    }

    /** Registers every request in {@code iterable} via {@link #addBlockingSideInput}. */
    public void addBlockingSideInputs(Iterable<Windmill.GlobalDataRequest> iterable) {
        for (Windmill.GlobalDataRequest request : iterable) {
            addBlockingSideInput(request);
        }
    }

    /** Returns the key of the current work item. */
    public ByteString getSerializedKey() {
        return this.work.getKey();
    }

    /** Returns the work token of the current work item. */
    public long getWorkToken() {
        return this.work.getWorkToken();
    }

    /** Returns the work item supplied to the most recent {@link #start} call. */
    public Windmill.WorkItem getWork() {
        return this.work;
    }

    /** Returns the commit request builder supplied to the most recent {@link #start} call. */
    public Windmill.WorkItemCommitRequest.Builder getOutputBuilder() {
        return this.outputBuilder;
    }

    /**
     * Flushes the buffered state of every step context into the output builder.
     *
     * @throws RuntimeException wrapping the underlying {@link IOException} if any step
     *     fails to flush
     */
    public void flushState() {
        for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
            try {
                ((StepContext) stepContext).flushState();
            } catch (IOException e) {
                // Chain the cause so the underlying I/O failure is not lost.
                throw new RuntimeException("Failed to flush state", e);
            }
        }
    }
}
