package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/PubsubReader.class */
public class PubsubReader<T> extends Reader<WindowedValue<T>> {
    private final WindowedValue.ValueOnlyWindowedValueCoder<?> coder;
    private StreamingModeExecutionContext context;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/PubsubReader$Factory.class */
    public static class Factory implements ReaderFactory {
        @Override // com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory
        public Reader<?> create(CloudObject cloudObject, @Nullable Coder<?> coder, @Nullable PipelineOptions pipelineOptions, @Nullable ExecutionContext executionContext, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String str) throws Exception {
            return new PubsubReader(coder, (StreamingModeExecutionContext) executionContext);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/PubsubReader$PubsubReaderIterator.class */
    class PubsubReaderIterator extends Reader.AbstractReaderIterator<WindowedValue<T>> {
        private int bundleIndex = 0;
        private int messageIndex = 0;

        PubsubReaderIterator() {
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            Windmill.WorkItem work = PubsubReader.this.context.getWork();
            return this.bundleIndex < work.getMessageBundlesCount() && this.messageIndex < work.getMessageBundles(this.bundleIndex).getMessagesCount();
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<T> next() throws IOException {
            Windmill.Message messages = PubsubReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessages(this.messageIndex);
            if (this.messageIndex >= PubsubReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessagesCount() - 1) {
                this.messageIndex = 0;
                this.bundleIndex++;
            } else {
                this.messageIndex++;
            }
            long millis = TimeUnit.MICROSECONDS.toMillis(messages.getTimestamp());
            InputStream newInput = messages.getData().newInput();
            PubsubReader.this.notifyElementRead(newInput.available());
            return WindowedValue.timestampedValueInGlobalWindow(PubsubReader.this.coder.getValueCoder().decode(newInput, Coder.Context.OUTER), new Instant(millis));
        }
    }

    PubsubReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext streamingModeExecutionContext) {
        this.coder = (WindowedValue.ValueOnlyWindowedValueCoder) coder;
        this.context = streamingModeExecutionContext;
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public Reader.ReaderIterator<WindowedValue<T>> iterator() throws IOException {
        return new PubsubReaderIterator();
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public boolean supportsRestart() {
        return true;
    }
}
