/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleReader;
import com.google.cloud.dataflow.sdk.runners.worker.ByteArrayShufflePosition;
import com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleBatchReader;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntryReader;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.Iterator;

public class PartitioningShuffleReader<K, V>
extends Reader<WindowedValue<KV<K, V>>> {
    final byte[] shuffleReaderConfig;
    final String startShufflePosition;
    final String stopShufflePosition;
    Coder<K> keyCoder;
    WindowedValue.WindowedValueCoder<V> windowedValueCoder;

    public PartitioningShuffleReader(PipelineOptions options, byte[] shuffleReaderConfig, String startShufflePosition, String stopShufflePosition, Coder<WindowedValue<KV<K, V>>> coder) throws Exception {
        this.shuffleReaderConfig = shuffleReaderConfig;
        this.startShufflePosition = startShufflePosition;
        this.stopShufflePosition = stopShufflePosition;
        this.initCoder(coder);
    }

    private void initCoder(Coder<WindowedValue<KV<K, V>>> coder) throws Exception {
        if (!(coder instanceof WindowedValue.WindowedValueCoder)) {
            String string = String.valueOf(coder);
            throw new Exception(new StringBuilder(44 + String.valueOf(string).length()).append("unexpected kind of coder for WindowedValue: ").append(string).toString());
        }
        WindowedValue.WindowedValueCoder windowedElemCoder = (WindowedValue.WindowedValueCoder)coder;
        Coder elemCoder = windowedElemCoder.getValueCoder();
        if (!(elemCoder instanceof KvCoder)) {
            String string = String.valueOf("unexpected kind of coder for elements read from a key-partitioning shuffle: ");
            String string2 = String.valueOf(elemCoder);
            throw new Exception(new StringBuilder(0 + String.valueOf(string).length() + String.valueOf(string2).length()).append(string).append(string2).toString());
        }
        KvCoder kvCoder = (KvCoder)elemCoder;
        this.keyCoder = kvCoder.getKeyCoder();
        this.windowedValueCoder = windowedElemCoder.withValueCoder(kvCoder.getValueCoder());
    }

    @Override
    public Reader.ReaderIterator<WindowedValue<KV<K, V>>> iterator() throws IOException {
        Preconditions.checkArgument(this.shuffleReaderConfig != null);
        return this.iterator(new BatchingShuffleEntryReader(new ChunkingShuffleBatchReader(new ApplianceShuffleReader(this.shuffleReaderConfig))));
    }

    Reader.ReaderIterator<WindowedValue<KV<K, V>>> iterator(ShuffleEntryReader reader) {
        return new PartitioningShuffleReaderIterator(reader);
    }

    class PartitioningShuffleReaderIterator
    extends AbstractBoundedReaderIterator<WindowedValue<KV<K, V>>> {
        Iterator<ShuffleEntry> iterator;

        PartitioningShuffleReaderIterator(ShuffleEntryReader reader) {
            this.iterator = reader.read(ByteArrayShufflePosition.fromBase64(PartitioningShuffleReader.this.startShufflePosition), ByteArrayShufflePosition.fromBase64(PartitioningShuffleReader.this.stopShufflePosition));
        }

        @Override
        protected boolean hasNextImpl() throws IOException {
            return this.iterator.hasNext();
        }

        @Override
        protected WindowedValue<KV<K, V>> nextImpl() throws IOException {
            ShuffleEntry record = this.iterator.next();
            Object key = CoderUtils.decodeFromByteArray(PartitioningShuffleReader.this.keyCoder, record.getKey());
            WindowedValue windowedValue = (WindowedValue)CoderUtils.decodeFromByteArray(PartitioningShuffleReader.this.windowedValueCoder, record.getValue());
            PartitioningShuffleReader.this.notifyElementRead(record.length());
            return windowedValue.withValue(KV.of(key, windowedValue.getValue()));
        }
    }
}

