/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.Preconditions;
import com.google.api.services.dataflow.model.ApproximateProgress;
import com.google.api.services.dataflow.model.Position;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleReader;
import com.google.cloud.dataflow.sdk.runners.worker.ByteArrayShufflePosition;
import com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleBatchReader;
import com.google.cloud.dataflow.sdk.runners.worker.GroupingShuffleRangeTracker;
import com.google.cloud.dataflow.sdk.runners.worker.GroupingShuffleReader;
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.Reiterable;
import com.google.cloud.dataflow.sdk.util.common.Reiterator;
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.KeyGroupedShuffleEntries;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GroupingShuffleReader<K, V>
extends Reader<WindowedValue<KV<K, Reiterable<V>>>> {
    /** Logger used for dynamic-split diagnostics. */
    private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleReader.class);
    /** Name constant for this source; not referenced elsewhere in this file. */
    public static final String SOURCE_NAME = "GroupingShuffleSource";
    /** Opaque configuration handed to {@code ApplianceShuffleReader}; must be non-null when {@link #iterator()} is called. */
    final byte[] shuffleReaderConfig;
    /** Base64-encoded start of the shuffle range to read; decoded when an iterator is created. */
    @Nullable
    final String startShufflePosition;
    /** Base64-encoded stop of the shuffle range to read; decoded when an iterator is created. */
    @Nullable
    final String stopShufflePosition;
    /** If non-null, receives each decoded key via {@code setKey} before the group is returned. */
    final BatchModeExecutionContext executionContext;
    /** Used to register the read-bytes counter; no counter is created when this is null. */
    @Nullable
    final CounterSet.AddCounterMutator addCounterMutator;
    /** Becomes part of the read-bytes counter name; no counter is created when this is null. */
    @Nullable
    final String operationName;
    /** Read-bytes counter, created at most once by {@code initCounter}; stays null when it cannot be registered. */
    @Nullable
    Counter<Long> perOperationPerDatasetBytesCounter;
    /** Coder for group keys, extracted from the constructor's coder by {@code initCoder}. */
    Coder<K> keyCoder;
    /** Coder for individual values within a group, extracted by {@code initCoder}. */
    Coder<V> valueCoder;

    /**
     * Creates a reader over a key-grouped shuffle range.
     *
     * @param options pipeline options; not used by this constructor
     * @param shuffleReaderConfig configuration later passed to the appliance shuffle reader
     * @param startShufflePosition base64-encoded start position, or null
     * @param stopShufflePosition base64-encoded stop position, or null
     * @param coder must be a windowed-value coder wrapping a KV coder whose value coder is an
     *     iterable coder; the key and element coders are extracted from it
     * @param executionContext context that is told the current key, or null
     * @param addCounterMutator used to register the read-bytes counter, or null
     * @param operationName operation name embedded in the counter name, or null
     * @throws Exception if {@code coder} does not have the expected structure
     */
    public GroupingShuffleReader(
            PipelineOptions options,
            byte[] shuffleReaderConfig,
            @Nullable String startShufflePosition,
            @Nullable String stopShufflePosition,
            Coder<WindowedValue<KV<K, Iterable<V>>>> coder,
            BatchModeExecutionContext executionContext,
            CounterSet.AddCounterMutator addCounterMutator,
            String operationName) throws Exception {
        // Shuffle range and its configuration.
        this.shuffleReaderConfig = shuffleReaderConfig;
        this.startShufflePosition = startShufflePosition;
        this.stopShufflePosition = stopShufflePosition;

        // Counter registration inputs.
        this.addCounterMutator = addCounterMutator;
        this.operationName = operationName;

        this.executionContext = executionContext;

        // Validates the coder structure and populates keyCoder / valueCoder.
        this.initCoder(coder);
    }

    /**
     * Registers the per-operation, per-dataset read-bytes counter the first time it is called.
     * Does nothing if the counter already exists or if either the counter mutator or the
     * operation name is null.
     *
     * @param datasetId dataset identifier embedded in the counter name
     */
    private synchronized void initCounter(String datasetId) {
        boolean alreadyCreated = this.perOperationPerDatasetBytesCounter != null;
        if (alreadyCreated || this.addCounterMutator == null || this.operationName == null) {
            return;
        }
        String counterName = "dax-shuffle-" + datasetId + "-wf-" + this.operationName + "-read-bytes";
        this.perOperationPerDatasetBytesCounter =
                this.addCounterMutator.addCounter(Counter.longs(counterName, Counter.AggregationKind.SUM));
    }

    /**
     * Reports the state kind under which this reader's work is sampled.
     *
     * @return always {@code StateSampler.StateKind.FRAMEWORK}
     */
    @Override
    protected StateSampler.StateKind getStateSamplerStateKind() {
        return StateSampler.StateKind.FRAMEWORK;
    }

    /**
     * Opens the shuffle dataset described by {@code shuffleReaderConfig} and returns an iterator
     * over its key groups. Also registers the read-bytes counter for the dataset on first use.
     *
     * @return an iterator over the grouped shuffle entries
     * @throws IllegalArgumentException if {@code shuffleReaderConfig} is null
     * @throws IOException declared by the overridden method
     */
    @Override
    public Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator() throws IOException {
        com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(this.shuffleReaderConfig != null);
        ApplianceShuffleReader applianceReader = new ApplianceShuffleReader(this.shuffleReaderConfig);
        this.initCounter(applianceReader.getDatasetId());
        // Layer chunked batch reads underneath the entry-level reader.
        ChunkingShuffleBatchReader batchReader = new ChunkingShuffleBatchReader(applianceReader);
        ShuffleEntryReader entryReader = new BatchingShuffleEntryReader(batchReader);
        return this.iterator(entryReader);
    }

    /**
     * Unwraps {@code coder} layer by layer (windowed value, then KV, then iterable) and stores the
     * key coder and the per-element value coder.
     *
     * @param coder the coder supplied to the constructor
     * @throws Exception if any layer is not of the expected coder kind
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initCoder(Coder<WindowedValue<KV<K, Iterable<V>>>> coder) throws Exception {
        if (!(coder instanceof WindowedValue.WindowedValueCoder)) {
            throw new Exception("unexpected kind of coder for WindowedValue: " + coder);
        }

        Coder elementCoder = ((WindowedValue.WindowedValueCoder) coder).getValueCoder();
        if (!(elementCoder instanceof KvCoder)) {
            throw new Exception(
                    "unexpected kind of coder for elements read from a key-grouping shuffle: " + elementCoder);
        }

        KvCoder pairCoder = (KvCoder) elementCoder;
        this.keyCoder = pairCoder.getKeyCoder();

        Coder groupedValuesCoder = pairCoder.getValueCoder();
        if (!(groupedValuesCoder instanceof IterableCoder)) {
            throw new Exception("unexpected kind of coder for values of KVs read from a key-grouping shuffle");
        }
        this.valueCoder = ((IterableCoder) groupedValuesCoder).getElemCoder();
    }

    /**
     * Creates an iterator over key groups backed by the given shuffle entry reader.
     *
     * @param reader source of shuffle entries
     * @return a new {@code GroupingShuffleReaderIterator} wrapping {@code reader}
     */
    final Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator(ShuffleEntryReader reader) {
        return new GroupingShuffleReaderIterator(reader);
    }

    private final class GroupingShuffleReaderIterator
    extends AbstractBoundedReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
        /** Key-grouped shuffle entries for the tracked range; created in the constructor. */
        private final Iterator<KeyGroupedShuffleEntries> groups;
        /** Tracks the start/stop positions; consulted for record acceptance, progress and splits. */
        private final GroupingShuffleRangeTracker rangeTracker;
        /** Position of the group most recently fetched by {@code hasNextImpl}; null before the first fetch. */
        private ByteArrayShufflePosition lastGroupStart;
        /** Group fetched by {@code hasNextImpl} and consumed (then reset to null) by {@code nextImpl}. */
        private KeyGroupedShuffleEntries currentGroup = null;
        /** Bytes reported via {@code notifyElementRead} since the read-bytes counter was last updated. */
        private final AtomicLong currentGroupSize = new AtomicLong(0L);
        /** Sampler used to attribute time spent reading from shuffle; assigned in the constructor. */
        protected StateSampler stateSampler = null;
        /** State id obtained from {@code stateSampler.stateForName}; entered around shuffle reads. */
        protected int readState;

        /**
         * Sets up state sampling, the range tracker, and the grouped entry iterator.
         *
         * <p>If the enclosing reader has a state sampler, it is reused with a state named after the
         * reader's sampler operation name plus {@code "-process"}; otherwise a private sampler
         * named {@code "local"} with a {@code "shuffle"} state is created.
         *
         * @param reader source of shuffle entries for the configured start/stop range
         */
        public GroupingShuffleReaderIterator(ShuffleEntryReader reader) {
            StateSampler outerSampler = GroupingShuffleReader.this.stateSampler;
            if (outerSampler != null) {
                Preconditions.checkNotNull((Object)GroupingShuffleReader.this.stateSamplerOperationName);
                this.stateSampler = outerSampler;
                String stateName = String.valueOf(GroupingShuffleReader.this.stateSamplerOperationName) + "-process";
                this.readState = this.stateSampler.stateForName(stateName, StateSampler.StateKind.FRAMEWORK);
            } else {
                CounterSet localCounters = new CounterSet(new Counter[0]);
                this.stateSampler = new StateSampler("local", localCounters.getAddCounterMutator());
                this.readState = this.stateSampler.stateForName("shuffle", StateSampler.StateKind.FRAMEWORK);
            }

            ByteArrayShufflePosition start =
                    ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.startShufflePosition);
            ByteArrayShufflePosition stop =
                    ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.stopShufflePosition);
            this.rangeTracker = new GroupingShuffleRangeTracker(start, stop);

            // Opening the underlying read is attributed to the shuffle-read state.
            try (StateSampler.ScopedState ignored = this.stateSampler.scopedState(this.readState)) {
                this.groups = new GroupingShuffleEntryIterator(reader.read(this.rangeTracker.getStartPosition(), this.rangeTracker.getStopPosition())) {
                    @Override
                    protected void notifyElementRead(long byteSize) {
                        // Accumulate bytes for the per-dataset counter, then forward to the reader.
                        GroupingShuffleReaderIterator.this.currentGroupSize.addAndGet(byteSize);
                        GroupingShuffleReader.this.notifyElementRead(byteSize);
                    }
                };
            }
        }

        /**
         * Fetches the next key group, if any, and asks the range tracker whether it may be returned.
         *
         * @return false when the underlying iterator is exhausted; otherwise the range tracker's
         *     answer for the fetched group's position
         * @throws IOException declared by the overridden method
         */
        @Override
        protected boolean hasNextImpl() throws IOException {
            try (StateSampler.ScopedState ignored = this.stateSampler.scopedState(this.readState)) {
                if (!this.groups.hasNext()) {
                    return false;
                }
                this.currentGroup = this.groups.next();
            }

            ByteArrayShufflePosition thisStart = ByteArrayShufflePosition.of(this.currentGroup.position);
            ByteArrayShufflePosition previousStart = this.lastGroupStart;
            // A group is a split point unless it starts at the same position as the previous one.
            boolean startsAtNewPosition = !(previousStart != null && thisStart.equals(previousStart));
            this.lastGroupStart = thisStart;
            return this.rangeTracker.tryReturnRecordAt(startsAtNewPosition, thisStart);
        }

        /**
         * Decodes the key of the group fetched by {@code hasNextImpl} and returns it paired with a
         * re-iterable view over the group's values.
         *
         * <p>If the enclosing reader has an execution context, it is told the decoded key before
         * the group is returned. The current group is cleared once it has been handed out.
         *
         * @return the key and its values, wrapped via {@code WindowedValue.valueInEmptyWindows}
         * @throws IOException if the key bytes cannot be decoded
         */
        @Override
        protected WindowedValue<KV<K, Reiterable<V>>> nextImpl() throws IOException {
            // Typed as K (not Object) so the resulting KV matches the declared return type.
            K key = CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.keyCoder, this.currentGroup.key);
            if (GroupingShuffleReader.this.executionContext != null) {
                GroupingShuffleReader.this.executionContext.setKey(key);
            }
            KeyGroupedShuffleEntries group = this.currentGroup;
            this.currentGroup = null;
            Reiterable<V> values = new ValuesIterable(group.values);
            return WindowedValue.valueInEmptyWindows(KV.<K, Reiterable<V>>of(key, values));
        }

        /**
         * Reports progress as the shuffle position of the last group start known to the range
         * tracker. When the tracker has no such position yet, the progress carries no position.
         *
         * @return the progress converted with {@code SourceTranslationUtils.cloudProgressToReaderProgress}
         */
        @Override
        public Reader.Progress getProgress() {
            ApproximateProgress approximate = new ApproximateProgress();
            ByteArrayShufflePosition lastStart = this.rangeTracker.getLastGroupStart();
            if (lastStart != null) {
                Position shufflePosition = new Position();
                shufflePosition.setShufflePosition(lastStart.encodeBase64());
                approximate.setPosition(shufflePosition);
            }
            return SourceTranslationUtils.cloudProgressToReaderProgress(approximate);
        }

        /**
         * Attempts to move the stop position of this reader to the requested shuffle position.
         *
         * @param splitRequest the requested split; must not be null
         * @return the accepted split position, or null if the request carries no shuffle position
         *     or the range tracker refuses the split
         */
        @Override
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest splitRequest) {
            Preconditions.checkNotNull((Object)splitRequest);

            Position requested = SourceTranslationUtils.splitRequestToApproximateProgress(splitRequest).getPosition();
            if (requested == null) {
                LOG.warn("GroupingShuffleReader only supports split at a Position. Requested: {}", (Object)splitRequest);
                return null;
            }

            String encodedPosition = requested.getShufflePosition();
            if (encodedPosition == null) {
                LOG.warn("GroupingShuffleReader only supports split at a shuffle position. Requested: {}", (Object)requested);
                return null;
            }

            ByteArrayShufflePosition proposedStop = ByteArrayShufflePosition.fromBase64(encodedPosition);
            if (!this.rangeTracker.trySplitAtPosition(proposedStop)) {
                LOG.info("Refused to split GroupingShuffleReader {} at {}", (Object)this.rangeTracker, (Object)proposedStop.encodeBase64());
                return null;
            }

            LOG.info("Split GroupingShuffleReader at {}, now {}", (Object)proposedStop.encodeBase64(), (Object)this.rangeTracker);
            return new Reader.DynamicSplitResultWithPosition(SourceTranslationUtils.cloudPositionToReaderPosition(requested));
        }

        /**
         * Iterator over the values of one key group. Each shuffle entry's value bytes are decoded
         * with the enclosing reader's value coder, and every call into the underlying iterator is
         * attributed to the shuffle-read sampler state.
         */
        private final class ValuesIterator
        implements Reiterator<V> {
            /** Underlying iterator over raw shuffle entries; never null. */
            private final Reiterator<ShuffleEntry> base;

            /**
             * @param base iterator over the raw entries of a key group; must not be null
             */
            public ValuesIterator(Reiterator<ShuffleEntry> base) {
                this.base = Preconditions.checkNotNull(base);
            }

            @Override
            public boolean hasNext() {
                try (StateSampler.ScopedState read = GroupingShuffleReaderIterator.this.stateSampler.scopedState(GroupingShuffleReaderIterator.this.readState)) {
                    return this.base.hasNext();
                }
            }

            /**
             * Returns the next decoded value. Bytes accumulated since the previous update are
             * moved into the read-bytes counter, when one is registered.
             *
             * @throws RuntimeException wrapping the {@link IOException} if decoding fails
             */
            @Override
            public V next() {
                try (StateSampler.ScopedState read = GroupingShuffleReaderIterator.this.stateSampler.scopedState(GroupingShuffleReaderIterator.this.readState)) {
                    ShuffleEntry entry = this.base.next();
                    // Read the field once so the null check and the update see the same counter.
                    Counter<Long> bytesCounter = GroupingShuffleReader.this.perOperationPerDatasetBytesCounter;
                    if (bytesCounter != null) {
                        bytesCounter.addValue(GroupingShuffleReaderIterator.this.currentGroupSize.getAndSet(0L));
                    }
                    try {
                        return CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.valueCoder, entry.getValue());
                    } catch (IOException exn) {
                        // Iterator.next() cannot throw checked exceptions; keep the cause attached.
                        throw new RuntimeException(exn);
                    }
                }
            }

            @Override
            public void remove() {
                this.base.remove();
            }

            /** Returns an independent iterator built from a copy of the underlying iterator. */
            @Override
            public ValuesIterator copy() {
                return new ValuesIterator(this.base.copy());
            }
        }

        private final class ValuesIterable
        implements Reiterable<V> {
            private final Reiterable<ShuffleEntry> base;

            public ValuesIterable(Reiterable<ShuffleEntry> base) {
                this.base = (Reiterable)Preconditions.checkNotNull(base);
            }

            public com.google.cloud.dataflow.sdk.runners.worker.GroupingShuffleReader$GroupingShuffleReaderIterator.ValuesIterator iterator() {
                return new ValuesIterator(this.base.iterator());
            }
        }
    }
}

