package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.Preconditions;
import com.google.api.services.dataflow.model.ApproximateProgress;
import com.google.api.services.dataflow.model.Position;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.Reiterable;
import com.google.cloud.dataflow.sdk.util.common.Reiterator;
import com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.KeyGroupedShuffleEntries;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader.class */
public class GroupingShuffleReader<K, V> extends Reader<WindowedValue<KV<K, Reiterable<V>>>> {
    private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleReader.class);
    final byte[] shuffleReaderConfig;

    @Nullable
    final String startShufflePosition;

    @Nullable
    final String stopShufflePosition;
    final BatchModeExecutionContext executionContext;
    Coder<K> keyCoder;
    Coder<V> valueCoder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader$GroupingShuffleReaderIterator.class */
    public final class GroupingShuffleReaderIterator extends AbstractBoundedReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
        private final Iterator<KeyGroupedShuffleEntries> groups;
        private final GroupingShuffleRangeTracker rangeTracker;
        private ByteArrayShufflePosition lastGroupStart;
        private KeyGroupedShuffleEntries currentGroup = null;
        protected StateSampler stateSampler;
        protected int readState;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader$GroupingShuffleReaderIterator$ValuesIterable.class */
        public final class ValuesIterable implements Reiterable<V> {
            private final Reiterable<ShuffleEntry> base;

            public ValuesIterable(Reiterable<ShuffleEntry> reiterable) {
                this.base = (Reiterable) Preconditions.checkNotNull(reiterable);
            }

            @Override // java.lang.Iterable
            public GroupingShuffleReader<K, V>.GroupingShuffleReaderIterator.ValuesIterator iterator() {
                return new ValuesIterator(this.base.iterator());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader$GroupingShuffleReaderIterator$ValuesIterator.class */
        public final class ValuesIterator implements Reiterator<V> {
            private final Reiterator<ShuffleEntry> base;

            public ValuesIterator(Reiterator<ShuffleEntry> reiterator) {
                this.base = (Reiterator) Preconditions.checkNotNull(reiterator);
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                StateSampler.ScopedState scopedState = GroupingShuffleReaderIterator.this.stateSampler.scopedState(GroupingShuffleReaderIterator.this.readState);
                Throwable th = null;
                try {
                    boolean hasNext = this.base.hasNext();
                    if (scopedState != null) {
                        if (0 != 0) {
                            try {
                                scopedState.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            scopedState.close();
                        }
                    }
                    return hasNext;
                } catch (Throwable th3) {
                    if (scopedState != null) {
                        if (0 != 0) {
                            try {
                                scopedState.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            scopedState.close();
                        }
                    }
                    throw th3;
                }
            }

            @Override // java.util.Iterator
            public V next() {
                StateSampler.ScopedState scopedState = GroupingShuffleReaderIterator.this.stateSampler.scopedState(GroupingShuffleReaderIterator.this.readState);
                Throwable th = null;
                try {
                    try {
                        V v = (V) CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.valueCoder, this.base.next().getValue());
                        if (scopedState != null) {
                            if (0 != 0) {
                                try {
                                    scopedState.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                scopedState.close();
                            }
                        }
                        return v;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                } catch (Throwable th3) {
                    if (scopedState != null) {
                        if (0 != 0) {
                            try {
                                scopedState.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            scopedState.close();
                        }
                    }
                    throw th3;
                }
            }

            @Override // java.util.Iterator
            public void remove() {
                this.base.remove();
            }

            @Override // com.google.cloud.dataflow.sdk.util.common.Reiterator
            /* renamed from: copy */
            public GroupingShuffleReader<K, V>.GroupingShuffleReaderIterator.ValuesIterator copy2() {
                return new ValuesIterator(this.base.copy2());
            }
        }

        public GroupingShuffleReaderIterator(ShuffleEntryReader shuffleEntryReader) {
            this.stateSampler = null;
            if (GroupingShuffleReader.this.stateSampler == null) {
                this.stateSampler = new StateSampler("local", new CounterSet(new Counter[0]).getAddCounterMutator());
                this.readState = this.stateSampler.stateForName("shuffle");
            } else {
                Preconditions.checkNotNull(GroupingShuffleReader.this.stateSamplerOperationName);
                this.stateSampler = GroupingShuffleReader.this.stateSampler;
                this.readState = this.stateSampler.stateForName(String.valueOf(GroupingShuffleReader.this.stateSamplerOperationName).concat("-process"));
            }
            this.rangeTracker = new GroupingShuffleRangeTracker(ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.startShufflePosition), ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.stopShufflePosition));
            StateSampler.ScopedState scopedState = this.stateSampler.scopedState(this.readState);
            Throwable th = null;
            try {
                try {
                    this.groups = new GroupingShuffleEntryIterator(shuffleEntryReader.read(this.rangeTracker.getStartPosition(), this.rangeTracker.getStopPosition())) { // from class: com.google.cloud.dataflow.sdk.runners.worker.GroupingShuffleReader.GroupingShuffleReaderIterator.1
                        @Override // com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator
                        protected void notifyElementRead(long j) {
                            GroupingShuffleReader.this.notifyElementRead(j);
                        }
                    };
                    if (scopedState != null) {
                        if (0 == 0) {
                            scopedState.close();
                            return;
                        }
                        try {
                            scopedState.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (scopedState != null) {
                    if (th != null) {
                        try {
                            scopedState.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        scopedState.close();
                    }
                }
                throw th4;
            }
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator
        protected boolean hasNextImpl() throws IOException {
            StateSampler.ScopedState scopedState = this.stateSampler.scopedState(this.readState);
            Throwable th = null;
            try {
                if (!this.groups.hasNext()) {
                    return false;
                }
                this.currentGroup = this.groups.next();
                if (scopedState != null) {
                    if (0 != 0) {
                        try {
                            scopedState.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        scopedState.close();
                    }
                }
                ByteArrayShufflePosition of = ByteArrayShufflePosition.of(this.currentGroup.position);
                boolean z = this.lastGroupStart == null || !of.equals(this.lastGroupStart);
                this.lastGroupStart = of;
                return this.rangeTracker.tryReturnRecordAt(z, of);
            } finally {
                if (scopedState != null) {
                    if (0 != 0) {
                        try {
                            scopedState.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        scopedState.close();
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.AbstractBoundedReaderIterator
        public WindowedValue<KV<K, Reiterable<V>>> nextImpl() throws IOException {
            Object decodeFromByteArray = CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.keyCoder, this.currentGroup.key);
            if (GroupingShuffleReader.this.executionContext != null) {
                GroupingShuffleReader.this.executionContext.setKey(decodeFromByteArray);
            }
            KeyGroupedShuffleEntries keyGroupedShuffleEntries = this.currentGroup;
            this.currentGroup = null;
            return WindowedValue.valueInEmptyWindows(KV.of(decodeFromByteArray, new ValuesIterable(keyGroupedShuffleEntries.values)));
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.AbstractReaderIterator, com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.Progress getProgress() {
            Position position = new Position();
            ApproximateProgress approximateProgress = new ApproximateProgress();
            ByteArrayShufflePosition lastGroupStart = this.rangeTracker.getLastGroupStart();
            if (lastGroupStart != null) {
                position.setShufflePosition(lastGroupStart.encodeBase64());
                approximateProgress.setPosition(position);
            }
            return SourceTranslationUtils.cloudProgressToReaderProgress(approximateProgress);
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.AbstractReaderIterator, com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest dynamicSplitRequest) {
            Preconditions.checkNotNull(dynamicSplitRequest);
            Position position = SourceTranslationUtils.splitRequestToApproximateProgress(dynamicSplitRequest).getPosition();
            if (position == null) {
                GroupingShuffleReader.LOG.warn("GroupingShuffleReader only supports split at a Position. Requested: {}", dynamicSplitRequest);
                return null;
            }
            String shufflePosition = position.getShufflePosition();
            if (shufflePosition == null) {
                GroupingShuffleReader.LOG.warn("GroupingShuffleReader only supports split at a shuffle position. Requested: {}", position);
                return null;
            }
            ByteArrayShufflePosition fromBase64 = ByteArrayShufflePosition.fromBase64(shufflePosition);
            if (this.rangeTracker.trySplitAtPosition(fromBase64)) {
                GroupingShuffleReader.LOG.info("Split GroupingShuffleReader at {}, now {}", fromBase64.encodeBase64(), this.rangeTracker);
                return new Reader.DynamicSplitResultWithPosition(SourceTranslationUtils.cloudPositionToReaderPosition(position));
            }
            GroupingShuffleReader.LOG.info("Refused to split GroupingShuffleReader {} at {}", this.rangeTracker, fromBase64.encodeBase64());
            return null;
        }
    }

    public GroupingShuffleReader(PipelineOptions pipelineOptions, byte[] bArr, @Nullable String str, @Nullable String str2, Coder<WindowedValue<KV<K, Iterable<V>>>> coder, BatchModeExecutionContext batchModeExecutionContext) throws Exception {
        this.shuffleReaderConfig = bArr;
        this.startShufflePosition = str;
        this.stopShufflePosition = str2;
        this.executionContext = batchModeExecutionContext;
        initCoder(coder);
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator() throws IOException {
        Preconditions.checkArgument(this.shuffleReaderConfig != null);
        return iterator(new BatchingShuffleEntryReader(new ChunkingShuffleBatchReader(new ApplianceShuffleReader(this.shuffleReaderConfig))));
    }

    private void initCoder(Coder<WindowedValue<KV<K, Iterable<V>>>> coder) throws Exception {
        if (!(coder instanceof WindowedValue.WindowedValueCoder)) {
            String valueOf = String.valueOf(coder);
            throw new Exception(new StringBuilder(44 + String.valueOf(valueOf).length()).append("unexpected kind of coder for WindowedValue: ").append(valueOf).toString());
        }
        Coder valueCoder = ((WindowedValue.WindowedValueCoder) coder).getValueCoder();
        if (!(valueCoder instanceof KvCoder)) {
            String valueOf2 = String.valueOf("unexpected kind of coder for elements read from a key-grouping shuffle: ");
            String valueOf3 = String.valueOf(valueCoder);
            throw new Exception(new StringBuilder(0 + String.valueOf(valueOf2).length() + String.valueOf(valueOf3).length()).append(valueOf2).append(valueOf3).toString());
        }
        KvCoder kvCoder = (KvCoder) valueCoder;
        this.keyCoder = kvCoder.getKeyCoder();
        Coder<V> valueCoder2 = kvCoder.getValueCoder();
        if (!(valueCoder2 instanceof IterableCoder)) {
            throw new Exception("unexpected kind of coder for values of KVs read from a key-grouping shuffle");
        }
        this.valueCoder = (Coder<V>) ((IterableCoder) valueCoder2).getElemCoder();
    }

    final Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator(ShuffleEntryReader shuffleEntryReader) {
        return new GroupingShuffleReaderIterator(shuffleEntryReader);
    }
}
