/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.StateFetcher;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillStateInternals;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillStateReader;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BaseExecutionContext;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

public class StreamingModeExecutionContext
extends DataflowExecutionContext {
    private final String stageName;
    private final Map<TupleTag<?>, Map<BoundedWindow, Object>> sideInputCache;
    private final ConcurrentMap<ByteString, StreamingDataflowWorker.ReaderCacheEntry> readerCache;
    private final ConcurrentMap<String, String> stateNameMap;
    private Windmill.WorkItem work;
    private Instant inputDataWatermark;
    private WindmillStateReader stateReader;
    private StateFetcher stateFetcher;
    private Windmill.WorkItemCommitRequest.Builder outputBuilder;
    private UnboundedSource.UnboundedReader<?> activeReader;

    public StreamingModeExecutionContext(String stageName, ConcurrentMap<ByteString, StreamingDataflowWorker.ReaderCacheEntry> readerCache, ConcurrentMap<String, String> stateNameMap) {
        this.stageName = stageName;
        this.sideInputCache = new HashMap();
        this.readerCache = readerCache;
        this.stateNameMap = stateNameMap;
    }

    public void start(Windmill.WorkItem work, Instant inputDataWatermark, WindmillStateReader stateReader, StateFetcher stateFetcher, Windmill.WorkItemCommitRequest.Builder outputBuilder) {
        this.work = work;
        this.inputDataWatermark = inputDataWatermark;
        this.stateReader = stateReader;
        this.stateFetcher = stateFetcher;
        this.outputBuilder = outputBuilder;
        this.sideInputCache.clear();
        for (ExecutionContext.StepContext stepContext : this.getAllStepContexts()) {
            ((StepContext)stepContext).start(stateReader, inputDataWatermark);
        }
    }

    @Override
    public ExecutionContext.StepContext createStepContext(String stepName, String transformName, StateSampler stateSampler) {
        StepContext context = new StepContext(stepName, transformName, stateSampler);
        context.start(this.stateReader, this.inputDataWatermark);
        return context;
    }

    @Override
    public SideInputReader getSideInputReader(Iterable<? extends SideInputInfo> sideInputInfos) {
        throw new UnsupportedOperationException("Cannot call getSideInputReader for StreamingDataflowWorker: the MapTask specification should not have had any SideInputInfo descriptors since the streaming runner does not yet support them.");
    }

    @Override
    public SideInputReader getSideInputReaderForViews(Iterable<? extends PCollectionView<?>> views) {
        return StreamingModeSideInputReader.of(views, this);
    }

    private <T> T fetchSideInput(PCollectionView<T> view, BoundedWindow sideInputWindow, String stateFamily, StateFetcher.SideInputState state, Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
        Object sideInput;
        Map<BoundedWindow, Object> tagCache = this.sideInputCache.get(view.getTagInternal());
        if (tagCache == null) {
            tagCache = new HashMap<BoundedWindow, Object>();
            this.sideInputCache.put(view.getTagInternal(), tagCache);
        }
        if ((sideInput = tagCache.get(sideInputWindow)) == null) {
            if (state == StateFetcher.SideInputState.CACHED_IN_WORKITEM) {
                String string = String.valueOf(view.getTagInternal().getId());
                throw new IllegalStateException(string.length() != 0 ? "Expected side input to be cached. Tag: ".concat(string) : new String("Expected side input to be cached. Tag: "));
            }
            T typed = this.stateFetcher.fetchSideInput(view, sideInputWindow, stateFamily, state, scopedReadStateSupplier);
            sideInput = typed;
            if (sideInput != null) {
                tagCache.put(sideInputWindow, sideInput);
                return (T)sideInput;
            }
            return null;
        }
        return (T)sideInput;
    }

    public Iterable<Windmill.GlobalDataId> getSideInputNotifications() {
        return this.work.getGlobalDataIdNotificationsList();
    }

    public ByteString getSerializedKey() {
        return this.work.getKey();
    }

    public long getWorkToken() {
        return this.work.getWorkToken();
    }

    public Windmill.WorkItem getWork() {
        return this.work;
    }

    public Windmill.WorkItemCommitRequest.Builder getOutputBuilder() {
        return this.outputBuilder;
    }

    public UnboundedSource.UnboundedReader<?> getCachedReader() {
        StreamingDataflowWorker.ReaderCacheEntry entry = (StreamingDataflowWorker.ReaderCacheEntry)this.readerCache.get(this.getSerializedKey());
        if (entry == null) {
            return null;
        }
        if (entry.token != this.getWork().getCacheToken()) {
            this.readerCache.remove(this.getSerializedKey());
            return null;
        }
        return entry.reader;
    }

    public void setActiveReader(UnboundedSource.UnboundedReader<?> reader) {
        this.activeReader = reader;
    }

    public UnboundedSource.CheckpointMark getReaderCheckpoint(Coder<? extends UnboundedSource.CheckpointMark> coder) {
        try {
            ByteString state = this.work.getSourceState().getState();
            if (state.isEmpty()) {
                return null;
            }
            return coder.decode(state.newInput(), Coder.Context.OUTER);
        }
        catch (IOException e) {
            throw new RuntimeException("Exception while decoding checkpoint", e);
        }
    }

    public Map<Long, Runnable> flushState() {
        HashMap<Long, Runnable> callbacks = new HashMap<Long, Runnable>();
        for (ExecutionContext.StepContext stepContext : this.getAllStepContexts()) {
            ((StepContext)stepContext).flushState();
        }
        if (this.activeReader != null) {
            Windmill.SourceState.Builder sourceStateBuilder = this.outputBuilder.getSourceStateUpdatesBuilder();
            final UnboundedSource.CheckpointMark checkpointMark = this.activeReader.getCheckpointMark();
            Instant watermark = this.activeReader.getWatermark();
            long id = ThreadLocalRandom.current().nextLong();
            sourceStateBuilder.addFinalizeIds(id);
            callbacks.put(id, new Runnable(){

                @Override
                public void run() {
                    try {
                        checkpointMark.finalizeCheckpoint();
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Exception while finalizing checkpoint", e);
                    }
                }
            });
            Coder checkpointCoder = ((UnboundedSource)this.activeReader.getCurrentSource()).getCheckpointMarkCoder();
            if (checkpointCoder != null) {
                ByteString.Output stream = ByteString.newOutput();
                try {
                    checkpointCoder.encode(checkpointMark, (OutputStream)stream, Coder.Context.OUTER);
                }
                catch (IOException e) {
                    throw new RuntimeException("Exception while encoding checkpoint", e);
                }
                sourceStateBuilder.setState(stream.toByteString());
            }
            this.outputBuilder.setSourceWatermark(TimeUnit.MILLISECONDS.toMicros(watermark.getMillis()));
            long backlogBytes = this.activeReader.getSplitBacklogBytes();
            if (backlogBytes == -1L && BasicSerializableSourceFormat.isFirstUnboundedSourceSplit(this.getSerializedKey())) {
                backlogBytes = this.activeReader.getTotalBacklogBytes();
            }
            if (backlogBytes != -1L) {
                String string = String.valueOf(this.stageName);
                this.outputBuilder.addCounterUpdates(Windmill.Counter.newBuilder().setName(string.length() != 0 ? "dataflow_backlog_size-".concat(string) : new String("dataflow_backlog_size-")).setKind(Windmill.Counter.Kind.SUM).setIntScalar(backlogBytes).setCumulative(true).build());
            }
            this.readerCache.put(this.getSerializedKey(), new StreamingDataflowWorker.ReaderCacheEntry(this.activeReader, this.getWork().getCacheToken()));
        }
        return callbacks;
    }

    public List<Long> getReadyCommitCallbackIds() {
        return this.work.getSourceState().getFinalizeIdsList();
    }

    public static ByteString timerTag(TimerInternals.TimerData key) {
        String tagString = String.format("%s+%d:%d", key.getNamespace().stringKey(), key.getDomain().ordinal(), key.getTimestamp().getMillis());
        return ByteString.copyFromUtf8((String)tagString);
    }

    public static class StreamingModeSideInputReader
    implements SideInputReader {
        private StreamingModeExecutionContext context;
        private Set<PCollectionView<?>> viewSet;

        private StreamingModeSideInputReader(Iterable<? extends PCollectionView<?>> views, StreamingModeExecutionContext context) {
            this.context = context;
            this.viewSet = ImmutableSet.copyOf(views);
        }

        public static StreamingModeSideInputReader of(Iterable<? extends PCollectionView<?>> views, StreamingModeExecutionContext context) {
            return new StreamingModeSideInputReader(views, context);
        }

        @Override
        public <T> T get(PCollectionView<T> view, BoundedWindow window) {
            if (!this.contains(view)) {
                throw new RuntimeException("get() called with unknown view");
            }
            return (T)this.context.fetchSideInput(view, window, null, StateFetcher.SideInputState.CACHED_IN_WORKITEM, null);
        }

        @Override
        public <T> boolean contains(PCollectionView<T> view) {
            return this.viewSet.contains(view);
        }

        @Override
        public boolean isEmpty() {
            return this.viewSet.isEmpty();
        }
    }

    class StepContext
    extends BaseExecutionContext.StepContext {
        private WindmillStateInternals stateInternals;
        private WindmillTimerInternals timerInternals;
        private final String prefix;
        private final String stateFamily;
        private final Supplier<StateSampler.ScopedState> scopedReadStateSupplier;

        public StepContext(final String stepName, String transformName, final StateSampler stateSampler) {
            super(StreamingModeExecutionContext.this, stepName, transformName);
            if (StreamingModeExecutionContext.this.stateNameMap.isEmpty()) {
                this.prefix = transformName;
                this.stateFamily = "";
            } else {
                String mappedName = (String)StreamingModeExecutionContext.this.stateNameMap.get(transformName);
                this.stateFamily = this.prefix = mappedName == null ? "" : mappedName;
            }
            this.scopedReadStateSupplier = new Supplier<StateSampler.ScopedState>(){
                private int readState = -1;

                @Override
                public StateSampler.ScopedState get() {
                    if (stateSampler == null) {
                        return null;
                    }
                    if (this.readState == -1) {
                        this.readState = stateSampler.stateForName(String.valueOf(stepName).concat("-windmill-read"), StateSampler.StateKind.FRAMEWORK);
                    }
                    return stateSampler.scopedState(this.readState);
                }
            };
        }

        public void start(WindmillStateReader stateReader, Instant inputDataWatermark) {
            boolean useStateFamilies = !StreamingModeExecutionContext.this.stateNameMap.isEmpty();
            this.stateInternals = new WindmillStateInternals(this.prefix, useStateFamilies, stateReader, this.scopedReadStateSupplier);
            this.timerInternals = new WindmillTimerInternals(this.stateFamily, Preconditions.checkNotNull(inputDataWatermark));
        }

        public void flushState() {
            this.stateInternals.persist(StreamingModeExecutionContext.this.outputBuilder);
            this.timerInternals.persistTo(StreamingModeExecutionContext.this.outputBuilder);
        }

        @Override
        public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws IOException {
            if (StreamingModeExecutionContext.this.getSerializedKey().size() != 0) {
                throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
            }
            ByteString.Output dataStream = ByteString.newOutput();
            dataCoder.encode(data, (OutputStream)dataStream, Coder.Context.OUTER);
            ByteString.Output windowStream = ByteString.newOutput();
            windowCoder.encode(window, (OutputStream)windowStream, Coder.Context.OUTER);
            Windmill.GlobalData.Builder builder = Windmill.GlobalData.newBuilder().setDataId(Windmill.GlobalDataId.newBuilder().setTag(tag.getId()).setVersion(windowStream.toByteString()).build()).setData(dataStream.toByteString());
            if (this.stateFamily != null) {
                builder.setStateFamily(this.stateFamily);
            }
            StreamingModeExecutionContext.this.outputBuilder.addGlobalDataUpdates(builder.build());
        }

        public boolean issueSideInputFetch(PCollectionView<?> view, BoundedWindow mainInputWindow, StateFetcher.SideInputState state) {
            Object sideInputWindow = view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
            return StreamingModeExecutionContext.this.fetchSideInput(view, sideInputWindow, this.stateFamily, state, this.scopedReadStateSupplier) != null;
        }

        public void addBlockingSideInput(Windmill.GlobalDataRequest sideInput) {
            if (this.stateFamily != null) {
                sideInput = Windmill.GlobalDataRequest.newBuilder((Windmill.GlobalDataRequest)sideInput).setStateFamily(this.stateFamily).build();
            }
            StreamingModeExecutionContext.this.outputBuilder.addGlobalDataRequests(sideInput);
            StreamingModeExecutionContext.this.outputBuilder.addGlobalDataIdRequests(sideInput.getDataId());
        }

        public void addBlockingSideInputs(Iterable<Windmill.GlobalDataRequest> sideInputs) {
            for (Windmill.GlobalDataRequest sideInput : sideInputs) {
                this.addBlockingSideInput(sideInput);
            }
        }

        @Override
        public StateInternals stateInternals() {
            return Preconditions.checkNotNull(this.stateInternals);
        }

        @Override
        public TimerInternals timerInternals() {
            return Preconditions.checkNotNull(this.timerInternals);
        }
    }

    private static class WindmillTimerInternals
    implements TimerInternals {
        private Map<TimerInternals.TimerData, Boolean> timers = new HashMap<TimerInternals.TimerData, Boolean>();
        private Instant inputDataWatermark;
        private String stateFamily;

        public WindmillTimerInternals(String stateFamily, Instant inputDataWatermark) {
            this.inputDataWatermark = inputDataWatermark;
            this.stateFamily = stateFamily;
        }

        @Override
        public void setTimer(TimerInternals.TimerData timerKey) {
            this.timers.put(timerKey, true);
        }

        @Override
        public void deleteTimer(TimerInternals.TimerData timerKey) {
            this.timers.put(timerKey, false);
        }

        @Override
        public Instant currentProcessingTime() {
            return Instant.now();
        }

        @Override
        public Instant currentWatermarkTime() {
            return this.inputDataWatermark;
        }

        public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
            for (Map.Entry<TimerInternals.TimerData, Boolean> entry : this.timers.entrySet()) {
                Windmill.Timer.Builder timer = outputBuilder.addOutputTimersBuilder().setTag(StreamingModeExecutionContext.timerTag(entry.getKey())).setType(this.timerType(entry.getKey().getDomain()));
                if (this.stateFamily != null) {
                    timer.setStateFamily(this.stateFamily);
                }
                if (!entry.getValue().booleanValue()) continue;
                long timestampMicros = TimeUnit.MILLISECONDS.toMicros(entry.getKey().getTimestamp().getMillis());
                timer.setTimestamp(timestampMicros);
            }
            this.timers.clear();
        }

        private Windmill.Timer.Type timerType(TimeDomain domain) {
            switch (domain) {
                case EVENT_TIME: {
                    return Windmill.Timer.Type.WATERMARK;
                }
                case PROCESSING_TIME: {
                    return Windmill.Timer.Type.REALTIME;
                }
                case SYNCHRONIZED_PROCESSING_TIME: {
                    return Windmill.Timer.Type.DEPENDENT_REALTIME;
                }
            }
            String string = String.valueOf((Object)domain);
            throw new IllegalArgumentException(new StringBuilder(25 + String.valueOf(string).length()).append("Unrecgonized TimeDomain: ").append(string).toString());
        }
    }
}

