/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillSink;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.joda.time.Instant;

class UngroupedWindmillReader<T>
extends Reader<WindowedValue<T>> {
    private final Coder<T> valueCoder;
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private StreamingModeExecutionContext context;

    UngroupedWindmillReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
        WindowedValue.FullWindowedValueCoder inputCoder = (WindowedValue.FullWindowedValueCoder)coder;
        this.valueCoder = inputCoder.getValueCoder();
        this.windowsCoder = inputCoder.getWindowsCoder();
        this.context = context;
    }

    @Override
    public Reader.ReaderIterator<WindowedValue<T>> iterator() throws IOException {
        return new UngroupedWindmillReaderIterator();
    }

    @Override
    public boolean supportsRestart() {
        return true;
    }

    class UngroupedWindmillReaderIterator
    extends Reader.AbstractReaderIterator<WindowedValue<T>> {
        private int bundleIndex = 0;
        private int messageIndex = 0;

        UngroupedWindmillReaderIterator() {
        }

        @Override
        public boolean hasNext() throws IOException {
            Windmill.WorkItem work = UngroupedWindmillReader.this.context.getWork();
            return this.bundleIndex < work.getMessageBundlesCount() && this.messageIndex < work.getMessageBundles(this.bundleIndex).getMessagesCount();
        }

        @Override
        public WindowedValue<T> next() throws IOException {
            Windmill.Message message = UngroupedWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessages(this.messageIndex);
            if (this.messageIndex >= UngroupedWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessagesCount() - 1) {
                this.messageIndex = 0;
                ++this.bundleIndex;
            } else {
                ++this.messageIndex;
            }
            Instant timestampMillis = new Instant(TimeUnit.MICROSECONDS.toMillis(message.getTimestamp()));
            InputStream data = message.getData().newInput();
            InputStream metadata = message.getMetadata().newInput();
            Collection<BoundedWindow> windows = WindmillSink.decodeMetadataWindows(UngroupedWindmillReader.this.windowsCoder, message.getMetadata());
            PaneInfo pane = WindmillSink.decodeMetadataPane(message.getMetadata());
            if (UngroupedWindmillReader.this.valueCoder instanceof KvCoder) {
                KvCoder kvCoder = (KvCoder)UngroupedWindmillReader.this.valueCoder;
                InputStream key = UngroupedWindmillReader.this.context.getSerializedKey().newInput();
                UngroupedWindmillReader.this.notifyElementRead(key.available() + data.available() + metadata.available());
                KV result = KV.of(this.decode(kvCoder.getKeyCoder(), key), this.decode(kvCoder.getValueCoder(), data));
                return WindowedValue.of(result, timestampMillis, windows, pane);
            }
            UngroupedWindmillReader.this.notifyElementRead(data.available() + metadata.available());
            return WindowedValue.of(this.decode(UngroupedWindmillReader.this.valueCoder, data), timestampMillis, windows, pane);
        }

        private <X> X decode(Coder<X> coder, InputStream input) throws IOException {
            return coder.decode(input, Coder.Context.OUTER);
        }
    }

    static class Factory
    implements ReaderFactory {
        Factory() {
        }

        @Override
        public Reader<?> create(CloudObject spec, @Nullable Coder<?> coder, @Nullable PipelineOptions options, @Nullable ExecutionContext executionContext, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String operationName) throws Exception {
            Coder typedCoder = coder;
            return new UngroupedWindmillReader(typedCoder, (StreamingModeExecutionContext)executionContext);
        }
    }
}

