package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Ascii;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.TimerOrElement;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindowingWindmillReader.class */
public class WindowingWindmillReader<T> extends Reader<WindowedValue<TimerOrElement<T>>> {
    private final Coder<T> valueCoder;
    private final Coder<? extends BoundedWindow> windowCoder;
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private StreamingModeExecutionContext context;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.dataflow.sdk.runners.worker.WindowingWindmillReader$1, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindowingWindmillReader$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$windmill$Windmill$Timer$Type = new int[Windmill.Timer.Type.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$windmill$Windmill$Timer$Type[Windmill.Timer.Type.REALTIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$windmill$Windmill$Timer$Type[Windmill.Timer.Type.DEPENDENT_REALTIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$windmill$Windmill$Timer$Type[Windmill.Timer.Type.WATERMARK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindowingWindmillReader$WindowingWindmillReaderIterator.class */
    class WindowingWindmillReaderIterator extends Reader.AbstractReaderIterator<WindowedValue<TimerOrElement<T>>> {
        private int bundleIndex = 0;
        private int messageIndex = 0;
        private int timerIndex = 0;

        WindowingWindmillReaderIterator() {
        }

        private boolean hasMoreMessages() {
            Windmill.WorkItem work = WindowingWindmillReader.this.context.getWork();
            return this.bundleIndex < work.getMessageBundlesCount() && this.messageIndex < work.getMessageBundles(this.bundleIndex).getMessagesCount();
        }

        private boolean hasMoreTimers() {
            Windmill.WorkItem work = WindowingWindmillReader.this.context.getWork();
            return work.hasTimers() && this.timerIndex < work.getTimers().getTimersCount();
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            return hasMoreMessages() || hasMoreTimers();
        }

        private TimeDomain getTimeDomain(Windmill.Timer.Type type) {
            switch (AnonymousClass1.$SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$windmill$Windmill$Timer$Type[type.ordinal()]) {
                case 1:
                    return TimeDomain.PROCESSING_TIME;
                case 2:
                    return TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
                case Ascii.ETX /* 3 */:
                    return TimeDomain.EVENT_TIME;
                default:
                    String valueOf = String.valueOf(type);
                    throw new IllegalArgumentException(new StringBuilder(23 + String.valueOf(valueOf).length()).append("Unsupported timer type ").append(valueOf).toString());
            }
        }

        private <W extends BoundedWindow> WindowedValue<TimerOrElement<T>> createTimer(Object obj, Windmill.Timer timer) {
            String stringUtf8 = timer.getTag().toStringUtf8();
            StateNamespace fromString = StateNamespaces.fromString(stringUtf8.substring(0, stringUtf8.indexOf(43)), WindowingWindmillReader.this.windowCoder);
            Instant instant = new Instant(TimeUnit.MICROSECONDS.toMillis(timer.getTimestamp()));
            return WindowedValue.of(TimerOrElement.timer(obj, TimerInternals.TimerData.of(fromString, instant, getTimeDomain(timer.getType()))), instant, new ArrayList(), PaneInfo.DEFAULT);
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<TimerOrElement<T>> next() throws IOException {
            if (hasMoreTimers()) {
                if (!(WindowingWindmillReader.this.valueCoder instanceof KvCoder)) {
                    throw new RuntimeException("Timer set on non-keyed DoFn");
                }
                Windmill.TimerBundle timers = WindowingWindmillReader.this.context.getWork().getTimers();
                int i = this.timerIndex;
                this.timerIndex = i + 1;
                return createTimer(((KvCoder) WindowingWindmillReader.this.valueCoder).getKeyCoder().decode(WindowingWindmillReader.this.context.getSerializedKey().newInput(), Coder.Context.OUTER), timers.getTimers(i));
            }
            Windmill.Message messages = WindowingWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessages(this.messageIndex);
            if (this.messageIndex >= WindowingWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessagesCount() - 1) {
                this.messageIndex = 0;
                this.bundleIndex++;
            } else {
                this.messageIndex++;
            }
            Instant instant = new Instant(TimeUnit.MICROSECONDS.toMillis(messages.getTimestamp()));
            InputStream newInput = messages.getData().newInput();
            InputStream newInput2 = messages.getMetadata().newInput();
            Collection<? extends BoundedWindow> decodeMetadataWindows = WindmillSink.decodeMetadataWindows(WindowingWindmillReader.this.windowsCoder, messages.getMetadata());
            PaneInfo decodeMetadataPane = WindmillSink.decodeMetadataPane(messages.getMetadata());
            if (!(WindowingWindmillReader.this.valueCoder instanceof KvCoder)) {
                WindowingWindmillReader.this.notifyElementRead(newInput.available() + newInput2.available());
                return WindowedValue.of(TimerOrElement.element(decode(WindowingWindmillReader.this.valueCoder, newInput)), instant, decodeMetadataWindows, decodeMetadataPane);
            }
            KvCoder kvCoder = (KvCoder) WindowingWindmillReader.this.valueCoder;
            InputStream newInput3 = WindowingWindmillReader.this.context.getSerializedKey().newInput();
            WindowingWindmillReader.this.notifyElementRead(newInput3.available() + newInput.available() + newInput2.available());
            return WindowedValue.of(TimerOrElement.element(KV.of(decode(kvCoder.getKeyCoder(), newInput3), decode(kvCoder.getValueCoder(), newInput))), instant, decodeMetadataWindows, decodeMetadataPane);
        }

        private <T> T decode(Coder<T> coder, InputStream inputStream) throws IOException {
            return coder.decode(inputStream, Coder.Context.OUTER);
        }
    }

    WindowingWindmillReader(Coder<WindowedValue<TimerOrElement<T>>> coder, StreamingModeExecutionContext streamingModeExecutionContext) {
        WindowedValue.FullWindowedValueCoder fullWindowedValueCoder = (WindowedValue.FullWindowedValueCoder) coder;
        this.windowsCoder = fullWindowedValueCoder.getWindowsCoder();
        this.windowCoder = fullWindowedValueCoder.getWindowCoder();
        this.valueCoder = ((TimerOrElement.TimerOrElementCoder) fullWindowedValueCoder.getValueCoder()).getElementCoder();
        this.context = streamingModeExecutionContext;
    }

    public static <T> WindowingWindmillReader<T> create(PipelineOptions pipelineOptions, CloudObject cloudObject, Coder coder, ExecutionContext executionContext) {
        return new WindowingWindmillReader<>(coder, (StreamingModeExecutionContext) executionContext);
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public Reader.ReaderIterator<WindowedValue<TimerOrElement<T>>> iterator() throws IOException {
        return new WindowingWindmillReaderIterator();
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public boolean supportsRestart() {
        return true;
    }
}
