/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleReader;
import com.google.cloud.dataflow.sdk.runners.worker.ByteArrayShufflePosition;
import com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleBatchReader;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntryReader;
import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nullable;

/**
 * A {@link Reader} over shuffle entries that yields one decoded element per entry.
 *
 * <p>Only the value bytes of each {@link ShuffleEntry} are decoded (with {@link #coder});
 * nothing else from the entry is returned to the caller.
 *
 * @param <T> the type of the elements produced by decoding entry values
 */
public class UngroupedShuffleReader<T> extends Reader<T> {
    /** Configuration handed to {@link ApplianceShuffleReader}; must be non-null when {@link #iterator()} is called. */
    final byte[] shuffleReaderConfig;
    /** Start of the range to read, as given to {@code ByteArrayShufflePosition.fromBase64}; may be null. */
    final String startShufflePosition;
    /** End of the range to read, as given to {@code ByteArrayShufflePosition.fromBase64}; may be null. */
    final String stopShufflePosition;
    /** Coder used to decode each entry's value bytes. */
    final Coder<T> coder;

    /**
     * Creates a reader for the given shuffle configuration and position range.
     *
     * @param options pipeline options; accepted for signature compatibility, not used by this constructor
     * @param shuffleReaderConfig configuration bytes for the underlying shuffle reader
     * @param startShufflePosition start position string, or null
     * @param stopShufflePosition stop position string, or null
     * @param coder coder used to decode entry values
     */
    public UngroupedShuffleReader(PipelineOptions options, byte[] shuffleReaderConfig, @Nullable String startShufflePosition, @Nullable String stopShufflePosition, Coder<T> coder) {
        this.shuffleReaderConfig = shuffleReaderConfig;
        this.startShufflePosition = startShufflePosition;
        this.stopShufflePosition = stopShufflePosition;
        this.coder = coder;
    }

    /**
     * Builds the reader stack (appliance reader, wrapped by a chunking batch reader, wrapped by a
     * batching entry reader) and returns an iterator over it.
     *
     * @throws IllegalArgumentException if {@link #shuffleReaderConfig} is null
     */
    @Override
    public Reader.ReaderIterator<T> iterator() throws IOException {
        Preconditions.checkArgument(shuffleReaderConfig != null);
        ApplianceShuffleReader applianceReader = new ApplianceShuffleReader(shuffleReaderConfig);
        ChunkingShuffleBatchReader batchReader = new ChunkingShuffleBatchReader(applianceReader);
        BatchingShuffleEntryReader entryReader = new BatchingShuffleEntryReader(batchReader);
        return iterator(entryReader);
    }

    /**
     * Returns an iterator that pulls entries from the supplied reader.
     *
     * @param reader the source of shuffle entries
     */
    Reader.ReaderIterator<T> iterator(ShuffleEntryReader reader) {
        return new UngroupedShuffleReaderIterator(reader);
    }

    /**
     * Iterator that walks the entries returned by a {@link ShuffleEntryReader} for the enclosing
     * reader's start/stop positions and decodes each entry's value.
     */
    class UngroupedShuffleReaderIterator extends AbstractBoundedReaderIterator<T> {
        /** Underlying entry iterator obtained from the {@link ShuffleEntryReader}. */
        Iterator<ShuffleEntry> iterator;

        /**
         * Opens the entry iterator over the enclosing reader's position range.
         *
         * @param reader the source of shuffle entries
         */
        UngroupedShuffleReaderIterator(ShuffleEntryReader reader) {
            iterator = reader.read(
                ByteArrayShufflePosition.fromBase64(UngroupedShuffleReader.this.startShufflePosition),
                ByteArrayShufflePosition.fromBase64(UngroupedShuffleReader.this.stopShufflePosition));
        }

        /** Reports whether the underlying entry iterator has another entry. */
        @Override
        protected boolean hasNextImpl() throws IOException {
            return iterator.hasNext();
        }

        /**
         * Takes the next entry, reports its length to the enclosing reader, and returns the
         * decoded value.
         */
        @Override
        protected T nextImpl() throws IOException {
            final ShuffleEntry entry = iterator.next();
            // Only the value bytes are used for decoding.
            final byte[] encodedValue = entry.getValue();
            // Report the read before decoding, so it is recorded even if decoding throws.
            UngroupedShuffleReader.this.notifyElementRead(entry.length());
            return CoderUtils.decodeFromByteArray(UngroupedShuffleReader.this.coder, encodedValue);
        }
    }
}

