/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Instant;

class PubsubReader<T>
extends Reader<WindowedValue<T>> {
    private final WindowedValue.ValueOnlyWindowedValueCoder<?> coder;
    private StreamingModeExecutionContext context;

    PubsubReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
        WindowedValue.ValueOnlyWindowedValueCoder typedCoder;
        this.coder = typedCoder = (WindowedValue.ValueOnlyWindowedValueCoder)coder;
        this.context = context;
    }

    @Override
    public Reader.ReaderIterator<WindowedValue<T>> iterator() throws IOException {
        return new PubsubReaderIterator();
    }

    @Override
    public boolean supportsRestart() {
        return true;
    }

    class PubsubReaderIterator
    extends Reader.AbstractReaderIterator<WindowedValue<T>> {
        private int bundleIndex = 0;
        private int messageIndex = 0;

        PubsubReaderIterator() {
        }

        @Override
        public boolean hasNext() throws IOException {
            Windmill.WorkItem work = PubsubReader.this.context.getWork();
            return this.bundleIndex < work.getMessageBundlesCount() && this.messageIndex < work.getMessageBundles(this.bundleIndex).getMessagesCount();
        }

        @Override
        public WindowedValue<T> next() throws IOException {
            Windmill.Message message = PubsubReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessages(this.messageIndex);
            if (this.messageIndex >= PubsubReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessagesCount() - 1) {
                this.messageIndex = 0;
                ++this.bundleIndex;
            } else {
                ++this.messageIndex;
            }
            long timestampMillis = TimeUnit.MICROSECONDS.toMillis(message.getTimestamp());
            InputStream data = message.getData().newInput();
            PubsubReader.this.notifyElementRead(data.available());
            Object value = PubsubReader.this.coder.getValueCoder().decode(data, Coder.Context.OUTER);
            return WindowedValue.timestampedValueInGlobalWindow(value, new Instant(timestampMillis));
        }
    }

    static class Factory
    implements ReaderFactory {
        Factory() {
        }

        @Override
        public Reader<?> create(CloudObject cloudSourceSpec, @Nullable Coder<?> coder, @Nullable PipelineOptions options, @Nullable ExecutionContext executionContext, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String operationName) throws Exception {
            Coder typedCoder = coder;
            return new PubsubReader(typedCoder, (StreamingModeExecutionContext)executionContext);
        }
    }
}

