package com.google.cloud.dataflow.sdk.runners.dataflow;

import com.google.api.client.util.Base64;
import com.google.api.services.dataflow.model.ApproximateProgress;
import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.DynamicSourceSplit;
import com.google.api.services.dataflow.model.SourceGetMetadataRequest;
import com.google.api.services.dataflow.model.SourceGetMetadataResponse;
import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.api.services.dataflow.model.SourceOperationRequest;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.SourceSplitOptions;
import com.google.api.services.dataflow.model.SourceSplitRequest;
import com.google.api.services.dataflow.model.SourceSplitResponse;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Ascii;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat.class */
public class BasicSerializableSourceFormat implements SourceFormat {
    /** Key in a cloud source spec under which the base64-encoded serialized {@link Source} is stored. */
    private static final String SERIALIZED_SOURCE = "serialized_source";

    /** Key in a cloud source spec under which the base64-encoded serialized splits of an {@link UnboundedSource} are stored. */
    @VisibleForTesting
    static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
    /** Bundle size (64 MiB) used by performSplit when the split request does not specify one. */
    private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 67108864;
    /** Format string (actual bundle count, allowed limit) for the error raised when splitting yields too many bundles. */
    public static final String TOO_MANY_SOURCE_SPLITS_ERROR = "Total number of Source objects generated by splitIntoBundles() operation, %d, is larger than the allowable limit, %d. For more information, please check the corresponding FAQ entry at:\nhttps://cloud.google.com/dataflow/faq";
    /** Upper bound on the number of bundles performSplit accepts from splitIntoBundles(). */
    private static final int MAX_NUMBER_OF_SPLITS = 16000;
    /** Pipeline options handed to the source when splitting it or querying its metadata. */
    private final PipelineOptions options;
    private static final Logger LOG = LoggerFactory.getLogger(BasicSerializableSourceFormat.class);
    /** Key that isFirstUnboundedSourceSplit compares against: a 16-character hexadecimal "1". */
    private static final ByteString firstSplitKey = ByteString.copyFromUtf8("0000000000000001");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat$2, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$2.class */
    /**
     * Switch lookup table as reconstructed by the decompiler: an int array indexed by
     * {@code NextState.ordinal()} whose entries are the dense case labels 1..4
     * (UNKNOWN_BEFORE_START=1, UNKNOWN_BEFORE_ADVANCE=2, AVAILABLE=3, UNAVAILABLE=4).
     * Entries for constants that cannot be resolved stay 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataflow$sdk$runners$dataflow$BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState = new int[ReaderToIteratorAdapter.NextState.values().length];

        static {
            // Each assignment is guarded separately so that a missing enum constant
            // (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$dataflow$BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState[ReaderToIteratorAdapter.NextState.UNKNOWN_BEFORE_START.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$dataflow$BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState[ReaderToIteratorAdapter.NextState.UNKNOWN_BEFORE_ADVANCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$dataflow$BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState[ReaderToIteratorAdapter.NextState.AVAILABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$google$cloud$dataflow$sdk$runners$dataflow$BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState[ReaderToIteratorAdapter.NextState.UNAVAILABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$BoundedReaderIterator.class */
    /**
     * Exposes a {@link BoundedSource.BoundedReader} through the worker's
     * {@link Reader.ReaderIterator} interface, including progress reporting and
     * dynamic splitting.
     */
    private static class BoundedReaderIterator<T> implements Reader.ReaderIterator<WindowedValue<T>> {
        private final BoundedSource.BoundedReader<T> reader;
        private final ReaderToIteratorAdapter<T> iteratorAdapter;

        /**
         * @param boundedReader reader to iterate over; it has not been started yet, so the
         *     adapter is created in its "before start" state
         */
        private BoundedReaderIterator(BoundedSource.BoundedReader<T> boundedReader) {
            this.reader = boundedReader;
            this.iteratorAdapter = new ReaderToIteratorAdapter<>(boundedReader, false);
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            return this.iteratorAdapter.hasNext();
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<T> next() throws IOException {
            return this.iteratorAdapter.next();
        }

        /** Copying is not supported. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public BoundedReaderIterator<T> copy() throws IOException {
            throw new UnsupportedOperationException();
        }

        /** Closes the underlying reader. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator, java.lang.AutoCloseable
        public void close() throws IOException {
            this.reader.close();
        }

        /**
         * Reports the reader's consumed fraction as a percent-complete progress.
         *
         * @return the progress (with no percentage set when the reader cannot estimate its
         *     consumed fraction), or {@code null} when there is no reader
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.Progress getProgress() {
            // The original "instanceof BoundedReader" test could only fail for a null reader.
            if (this.reader == null) {
                return null;
            }
            ApproximateProgress approximateProgress = new ApproximateProgress();
            Double fractionConsumed = this.reader.getFractionConsumed();
            if (fractionConsumed != null) {
                approximateProgress.setPercentComplete(Float.valueOf(fractionConsumed.floatValue()));
            }
            return SourceTranslationUtils.cloudProgressToReaderProgress(approximateProgress);
        }

        /**
         * Asks the reader to split at the fraction named by the request and validates the
         * resulting primary and residual sources.
         *
         * @return the split, or {@code null} when the request carries no fraction or the
         *     reader declines to split
         * @throws IllegalStateException if the split reused the original source object or
         *     produced a source that fails validation
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest dynamicSplitRequest) {
            Float fraction = SourceTranslationUtils.splitRequestToApproximateProgress(dynamicSplitRequest).getPercentComplete();
            if (fraction == null) {
                return null;
            }
            BoundedSource<T> original = this.reader.getCurrentSource();
            BoundedSource<T> residual = this.reader.splitAtFraction2(fraction.doubleValue());
            if (residual == null) {
                return null;
            }
            BoundedSource<T> primary = this.reader.getCurrentSource();
            // Identity (not equals) comparison on purpose: a successful split must hand out new objects.
            if (original == primary) {
                throw new IllegalStateException("Successful split did not change the current source: primary is identical to original (Source objects MUST be immutable): " + primary);
            }
            if (original == residual) {
                throw new IllegalStateException("Successful split did not change the current source: residual is identical to original (Source objects MUST be immutable): " + residual);
            }
            // The two validations are deliberately sequential rather than nested so that a
            // residual failure is not caught and re-reported as a primary failure.
            try {
                primary.validate();
            } catch (Exception e) {
                throw new IllegalStateException("Successful split produced an illegal primary source. \nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual, e);
            }
            try {
                residual.validate();
            } catch (Exception e) {
                throw new IllegalStateException("Successful split produced an illegal residual source. \nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual, e);
            }
            return new BoundedSourceSplit<T>(primary, residual);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$BoundedSourceSplit.class */
    /**
     * Result of a dynamic split of a bounded source: the part the current reader keeps
     * ({@code primary}) and the part that was split off ({@code residual}).
     */
    public static final class BoundedSourceSplit<T> implements Reader.DynamicSplitResult {
        /** Layout of {@link #toString()}: primary first, residual second. */
        private static final String DESCRIPTION_FORMAT = "<primary: %s; residual: %s>";

        public final BoundedSource<T> primary;
        public final BoundedSource<T> residual;

        /**
         * @param boundedSource the primary source
         * @param boundedSource2 the residual source
         */
        public BoundedSourceSplit(BoundedSource<T> boundedSource, BoundedSource<T> boundedSource2) {
            primary = boundedSource;
            residual = boundedSource2;
        }

        @Override
        public String toString() {
            return String.format(DESCRIPTION_FORMAT, primary, residual);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$Factory.class */
    /**
     * {@link ReaderFactory} that builds readers via
     * {@link BasicSerializableSourceFormat#create(CloudObject, PipelineOptions, ExecutionContext)}.
     */
    public static class Factory implements ReaderFactory {
        /**
         * Creates a reader for the source described by {@code cloudObject}. The coder,
         * counter mutator and name arguments are not used.
         */
        @Override // com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory
        public Reader<?> create(CloudObject cloudObject, @Nullable Coder<?> coder, @Nullable PipelineOptions pipelineOptions, @Nullable ExecutionContext executionContext, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String str) throws Exception {
            Reader<WindowedValue<?>> sourceReader =
                    BasicSerializableSourceFormat.create(cloudObject, pipelineOptions, executionContext);
            return sourceReader;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$ReaderToIteratorAdapter.class */
    /**
     * Adapts the start()/advance()/getCurrent() protocol of a {@link Source.Reader} to a
     * hasNext()/next() style of iteration, remembering whether the reader still has to be
     * started or advanced before the next element can be returned.
     */
    public static class ReaderToIteratorAdapter<T> {
        private final Source.Reader<T> reader;
        private NextState state;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$ReaderToIteratorAdapter$NextState.class */
        /** What is currently known about the availability of the next element. */
        public enum NextState {
            /** The reader has not been started; start() decides availability. */
            UNKNOWN_BEFORE_START,
            /** The current element was consumed; advance() decides availability. */
            UNKNOWN_BEFORE_ADVANCE,
            /** The reader is positioned on an element that has not been returned yet. */
            AVAILABLE,
            /** The last start()/advance() call reported no element. */
            UNAVAILABLE
        }

        /**
         * @param reader the reader to adapt
         * @param z {@code true} if the reader was already started, so the next element is
         *     obtained with advance() rather than start()
         */
        private ReaderToIteratorAdapter(Source.Reader<T> reader, boolean z) {
            this.reader = reader;
            this.state = z ? NextState.UNKNOWN_BEFORE_ADVANCE : NextState.UNKNOWN_BEFORE_START;
        }

        /**
         * Reports whether an element is available, starting or advancing the reader if that
         * is not known yet. Repeated calls without an intervening {@link #next()} do not
         * touch the reader again once the answer is known.
         *
         * @throws IOException if starting the reader fails (wrapped with the source's
         *     description) or advancing it fails
         */
        public boolean hasNext() throws IOException {
            // Switch on the enum directly instead of going through an ordinal lookup table.
            switch (this.state) {
                case UNKNOWN_BEFORE_START:
                    boolean started;
                    try {
                        started = this.reader.start();
                    } catch (Exception e) {
                        throw new IOException("Failed to start reading from source: " + this.reader.getCurrentSource(), e);
                    }
                    this.state = started ? NextState.AVAILABLE : NextState.UNAVAILABLE;
                    return started;
                case UNKNOWN_BEFORE_ADVANCE:
                    boolean advanced = this.reader.advance();
                    this.state = advanced ? NextState.AVAILABLE : NextState.UNAVAILABLE;
                    return advanced;
                case AVAILABLE:
                    return true;
                case UNAVAILABLE:
                    return false;
                default:
                    throw new AssertionError();
            }
        }

        /**
         * Returns the current element, in the global window with the reader's timestamp, and
         * marks it consumed so the following hasNext() advances the reader.
         *
         * @throws NoSuchElementException if no element is available
         */
        public WindowedValue<T> next() throws IOException {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            this.state = NextState.UNKNOWN_BEFORE_ADVANCE;
            return WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$UnboundedReader.class */
    /**
     * Worker-side {@link Reader} over one split of an {@link UnboundedSource}. The split is
     * chosen from the serialized key of the streaming execution context, and a reader cached
     * in that context is reused when present.
     */
    public static class UnboundedReader<T> extends Reader<WindowedValue<ValueWithRecordId<T>>> {
        private final PipelineOptions options;
        private final CloudObject spec;
        private final StreamingModeExecutionContext context;

        UnboundedReader(PipelineOptions pipelineOptions, CloudObject cloudObject, StreamingModeExecutionContext streamingModeExecutionContext) {
            this.options = pipelineOptions;
            this.spec = cloudObject;
            this.context = streamingModeExecutionContext;
        }

        /**
         * Returns an iterator over the context's cached reader if there is one; otherwise
         * creates a reader for the split addressed by the context key, resuming from the
         * stored checkpoint when the source has a checkpoint coder. Either way the reader is
         * registered with the context as the active reader.
         */
        /* JADX WARN: Multi-variable type inference failed */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
        public Reader.ReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() {
            UnboundedSource.UnboundedReader activeReader = this.context.getCachedReader();
            final boolean reusingCachedReader = activeReader != null;
            if (!reusingCachedReader) {
                // The first 16 characters of the key are a 1-based split number in hexadecimal.
                String hexSplitNumber = this.context.getSerializedKey().toStringUtf8().substring(0, 16);
                int splitIndex = Integer.parseInt(hexSplitNumber, 16) - 1;
                UnboundedSource<T, UnboundedSource.CheckpointMark> split = parseSource(splitIndex);

                UnboundedSource.CheckpointMark restoredCheckpoint = null;
                if (split.getCheckpointMarkCoder() != null) {
                    restoredCheckpoint = this.context.getReaderCheckpoint(split.getCheckpointMarkCoder());
                }
                activeReader = split.createReader(this.options, restoredCheckpoint);
            }
            this.context.setActiveReader(activeReader);
            // A cached reader was already started, so the iterator must advance() rather than start().
            return new UnboundedReaderIterator(activeReader, reusingCachedReader);
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
        public boolean supportsRestart() {
            return true;
        }

        /**
         * Deserializes the split at position {@code i} of the spec's serialized split list.
         *
         * @throws RuntimeException wrapping any failure, including a missing split list, an
         *     index past the end of the list, or a payload that is not an UnboundedSource
         */
        private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int i) {
            try {
                List<String> encodedSplits = Structs.getStrings(this.spec, BasicSerializableSourceFormat.SERIALIZED_SOURCE_SPLITS, null);
                Preconditions.checkArgument(encodedSplits != null, "UnboundedSource object did not contain splits");
                Preconditions.checkArgument(i < encodedSplits.size(), "UnboundedSource splits contained too few splits.  Requested index was %s, size was %s", Integer.valueOf(i), Integer.valueOf(encodedSplits.size()));
                byte[] serializedSplit = Base64.decodeBase64(encodedSplits.get(i));
                Object decoded = SerializableUtils.deserializeFromByteArray(serializedSplit, "UnboundedSource split");
                if (!(decoded instanceof UnboundedSource)) {
                    throw new IllegalArgumentException("Expected UnboundedSource, got " + decoded.getClass());
                }
                return (UnboundedSource) decoded;
            } catch (Exception e) {
                throw new RuntimeException("Parsing serialized source splits failed: ", e);
            }
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/dataflow/BasicSerializableSourceFormat$UnboundedReaderIterator.class */
    /**
     * Iterator over an {@link UnboundedSource.UnboundedReader} that cuts a bundle off after
     * {@link #MAX_BUNDLE_SIZE} elements or {@link #MAX_BUNDLE_READ_TIME}, whichever comes
     * first, and attaches the reader's record id to every element.
     */
    private static class UnboundedReaderIterator<T> implements Reader.ReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
        /** Maximum number of elements returned by one iterator. */
        private static final int MAX_BUNDLE_SIZE = 10000;
        /** Maximum time, measured from construction, during which elements are returned. */
        private static final Duration MAX_BUNDLE_READ_TIME = Duration.standardSeconds(10);
        private final ReaderToIteratorAdapter<T> iteratorAdapter;
        private final UnboundedSource.UnboundedReader<T> reader;
        private final Instant endTime;
        private int elemsRead;

        /**
         * @param unboundedReader the reader to iterate over
         * @param z {@code true} if the reader was already started
         */
        private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> unboundedReader, boolean z) {
            this.iteratorAdapter = new ReaderToIteratorAdapter<>(unboundedReader, z);
            this.reader = unboundedReader;
            this.endTime = Instant.now().plus(MAX_BUNDLE_READ_TIME);
            this.elemsRead = 0;
        }

        /**
         * Reports whether another element can be returned in this bundle. When the reader has
         * nothing available it is polled again after backing off, until the back-off gives up.
         *
         * @return {@code false} once the bundle limits are reached, the back-off is exhausted,
         *     or the waiting thread is interrupted
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            if (this.elemsRead >= MAX_BUNDLE_SIZE || Instant.now().isAfter(this.endTime)) {
                return false;
            }
            // NOTE(review): arguments are presumably (max attempts, initial back-off millis) — confirm.
            AttemptBoundedExponentialBackOff backOff = new AttemptBoundedExponentialBackOff(5, 100L);
            while (!this.iteratorAdapter.hasNext()) {
                long waitMillis = backOff.nextBackOffMillis();
                // -1 is treated as "no more attempts" (presumably BackOff.STOP).
                if (waitMillis == -1) {
                    return false;
                }
                try {
                    Thread.sleep(waitMillis);
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag for the caller and
                    // stop waiting for more input in this bundle.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }

        /** Returns the next element paired with the reader's current record id. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<ValueWithRecordId<T>> next() throws IOException {
            WindowedValue<T> next = this.iteratorAdapter.next();
            this.elemsRead++;
            return (WindowedValue<ValueWithRecordId<T>>) next.withValue(new ValueWithRecordId(next.getValue(), this.reader.getCurrentRecordId()));
        }

        /** Copying is not supported. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public UnboundedReaderIterator<T> copy() throws IOException {
            throw new UnsupportedOperationException();
        }

        /**
         * Intentionally does not close the reader.
         * NOTE(review): presumably because the reader is owned by the execution context for
         * reuse across bundles — confirm.
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator, java.lang.AutoCloseable
        public void close() {
        }

        /** Progress is not reported for unbounded reads. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.Progress getProgress() {
            return null;
        }

        /** Dynamic splitting is not performed for unbounded reads. */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest dynamicSplitRequest) {
            return null;
        }
    }

    /**
     * Creates a source format whose source operations use the given options.
     *
     * @param pipelineOptions options passed to the source when splitting it or reading its metadata
     */
    public BasicSerializableSourceFormat(PipelineOptions pipelineOptions) {
        this.options = pipelineOptions;
    }

    /**
     * Converts a dynamic split result into its cloud representation, serializing the
     * primary and the residual as independently derived sources.
     *
     * @param boundedSourceSplit the split to convert
     * @param pipelineOptions options used while serializing the two sources
     * @return the cloud split with both parts set
     * @throws RuntimeException if either source cannot be serialized
     */
    public static DynamicSourceSplit toSourceSplit(BoundedSourceSplit<?> boundedSourceSplit, PipelineOptions pipelineOptions) {
        DynamicSourceSplit cloudSplit = new DynamicSourceSplit();
        try {
            // Serialize both parts before touching the result.
            com.google.api.services.dataflow.model.Source cloudPrimary = serializeToCloudSource(boundedSourceSplit.primary, pipelineOptions);
            com.google.api.services.dataflow.model.Source cloudResidual = serializeToCloudSource(boundedSourceSplit.residual, pipelineOptions);

            DerivedSource derivedPrimary = new DerivedSource()
                    .setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT")
                    .setSource(cloudPrimary);
            cloudSplit.setPrimary(derivedPrimary);

            DerivedSource derivedResidual = new DerivedSource()
                    .setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT")
                    .setSource(cloudResidual);
            cloudSplit.setResidual(derivedResidual);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize one of the parts of the source split", e);
        }
        return cloudSplit;
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat
    /**
     * Dispatches a source operation request: a metadata request takes precedence, otherwise a
     * split request is performed.
     *
     * @throws UnsupportedOperationException if the request is neither a metadata nor a split request
     */
    public SourceFormat.OperationResponse performSourceOperation(SourceFormat.OperationRequest operationRequest) throws Exception {
        SourceOperationRequest cloudRequest = SourceTranslationUtils.sourceOperationRequestToCloudSourceOperationRequest(operationRequest);
        SourceOperationResponse cloudResponse = new SourceOperationResponse();
        if (cloudRequest.getGetMetadata() != null) {
            cloudResponse.setGetMetadata(performGetMetadata(cloudRequest.getGetMetadata()));
        } else if (cloudRequest.getSplit() != null) {
            cloudResponse.setSplit(performSplit(cloudRequest.getSplit()));
        } else {
            throw new UnsupportedOperationException("Unknown source operation request");
        }
        return SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse(cloudResponse);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates a worker {@link Reader} for the serialized source stored in {@code cloudObject}.
     *
     * <p>A {@link BoundedSource} yields a reader whose every {@code iterator()} call creates a
     * fresh bounded reader; an {@link UnboundedSource} yields an {@link UnboundedReader} bound
     * to the given execution context.
     *
     * @param cloudObject spec holding the serialized source
     * @param pipelineOptions options handed to the source when readers are created
     * @param executionContext execution context; cast to {@link StreamingModeExecutionContext}
     *     for unbounded sources
     * @throws IllegalArgumentException if the source is neither bounded nor unbounded
     * @throws Exception if the source cannot be deserialized or fails validation
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Reader<WindowedValue<?>> create(CloudObject cloudObject, final PipelineOptions pipelineOptions, ExecutionContext executionContext) throws Exception {
        final Source<?> source = deserializeFromCloudSource(cloudObject);
        if (source instanceof BoundedSource) {
            // Capture the deserialized source in a final local for the anonymous reader.
            final BoundedSource<Object> boundedSource = (BoundedSource<Object>) source;
            Reader<WindowedValue<Object>> boundedReader = new Reader<WindowedValue<Object>>() { // from class: com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat.1
                @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
                public Reader.ReaderIterator<WindowedValue<Object>> iterator() throws IOException {
                    return new BoundedReaderIterator<Object>(boundedSource.createReader(pipelineOptions));
                }
            };
            // Reader<WindowedValue<Object>> is not a Reader<WindowedValue<?>>; go through the raw type.
            return (Reader) boundedReader;
        }
        if (source instanceof UnboundedSource) {
            return (Reader) new UnboundedReader<Object>(pipelineOptions, cloudObject, (StreamingModeExecutionContext) executionContext);
        }
        throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
    }

    /**
     * Reports whether the given key equals the key of the first unbounded source split
     * ("0000000000000001").
     *
     * @param byteString the key to test; must not be null, since equals is invoked on it
     */
    public static boolean isFirstUnboundedSourceSplit(ByteString byteString) {
        return byteString.equals(firstSplitKey);
    }

    /**
     * Splits the bounded source carried by the request into bundles and returns them as
     * independently derived cloud sources that need no further splitting.
     *
     * @throws UnsupportedOperationException if the source is not a {@link BoundedSource}
     * @throws IOException if splitting yields more than {@link #MAX_NUMBER_OF_SPLITS} bundles
     * @throws IllegalArgumentException if a bundle fails validation or cannot be converted
     */
    private SourceSplitResponse performSplit(SourceSplitRequest sourceSplitRequest) throws Exception {
        Source<?> requestedSource = deserializeFromCloudSource(sourceSplitRequest.getSource().getSpec());
        if (!(requestedSource instanceof BoundedSource)) {
            throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + requestedSource);
        }
        BoundedSource<?> original = (BoundedSource<?>) requestedSource;
        LOG.debug("Splitting source: {}", original);

        SourceSplitResponse response = new SourceSplitResponse();
        response.setBundles(new ArrayList<DerivedSource>());

        // Fall back to the default bundle size when the request does not name one.
        SourceSplitOptions splitOptions = sourceSplitRequest.getOptions();
        long bundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
        if (splitOptions != null && splitOptions.getDesiredBundleSizeBytes() != null) {
            bundleSizeBytes = splitOptions.getDesiredBundleSizeBytes().longValue();
        }

        List<? extends BoundedSource<?>> bundles = original.splitIntoBundles(bundleSizeBytes, this.options);
        int bundleCount = bundles.size();
        if (bundleCount > MAX_NUMBER_OF_SPLITS) {
            throw new IOException(String.format(TOO_MANY_SOURCE_SPLITS_ERROR, Integer.valueOf(bundleCount), Integer.valueOf(MAX_NUMBER_OF_SPLITS)));
        }
        LOG.debug("Splitting produced {} bundles", Integer.valueOf(bundleCount));

        for (BoundedSource<?> bundle : bundles) {
            try {
                bundle.validate();
                com.google.api.services.dataflow.model.Source cloudBundle = serializeToCloudSource(bundle, this.options);
                cloudBundle.setDoesNotNeedSplitting(true);

                DerivedSource derived = new DerivedSource();
                derived.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
                derived.setSource(cloudBundle);
                response.getBundles().add(derived);
            } catch (Exception e) {
                throw new IllegalArgumentException("Splitting a valid source produced an invalid bundle. \nOriginal source: " + original + "\nInvalid bundle: " + bundle, e);
            }
        }
        response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
        return response;
    }

    /**
     * Builds the metadata response for the source carried by the request. For a
     * {@link BoundedSource} the metadata contains the sorted-keys flag and the estimated
     * size; for any other source it is left empty.
     */
    private SourceGetMetadataResponse performGetMetadata(SourceGetMetadataRequest sourceGetMetadataRequest) throws Exception {
        Source<?> source = deserializeFromCloudSource(sourceGetMetadataRequest.getSource().getSpec());
        SourceMetadata metadata = new SourceMetadata();
        if (source instanceof BoundedSource) {
            BoundedSource<?> bounded = (BoundedSource<?>) source;
            boolean sortedKeys = bounded.producesSortedKeys(this.options);
            metadata.setProducesSortedKeys(Boolean.valueOf(sortedKeys));
            long estimatedSize = bounded.getEstimatedSizeBytes(this.options);
            metadata.setEstimatedSizeBytes(Long.valueOf(estimatedSize));
        }
        SourceGetMetadataResponse response = new SourceGetMetadataResponse();
        response.setMetadata(metadata);
        return response;
    }

    /**
     * Reads the base64-encoded serialized {@link Source} stored in {@code map} under the
     * serialized-source key, deserializes it and validates it.
     *
     * @param map cloud source spec holding the serialized source
     * @return the validated source
     * @throws Exception if the source cannot be read or deserialized, or if validation fails
     *     (the validation failure is logged and rethrown unchanged)
     */
    public static Source<?> deserializeFromCloudSource(Map<String, Object> map) throws Exception {
        String encodedSource = Structs.getString(map, SERIALIZED_SOURCE);
        byte[] serializedSource = Base64.decodeBase64(encodedSource);
        Source<?> source = (Source<?>) SerializableUtils.deserializeFromByteArray(serializedSource, "Source");
        try {
            source.validate();
        } catch (Exception e) {
            LOG.error("Invalid source: " + source, e);
            throw e;
        }
        return source;
    }

    /**
     * Picks how many initial splits to request from an unbounded source: the maximum number
     * of workers when set, else three times the number of workers when set, else 20.
     */
    private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions dataflowPipelineOptions) {
        int maxWorkers = dataflowPipelineOptions.getMaxNumWorkers();
        if (maxWorkers > 0) {
            return maxWorkers;
        }
        int workers = dataflowPipelineOptions.getNumWorkers();
        return workers > 0 ? workers * 3 : 20;
    }

    /**
     * Serializes a {@link Source} into its cloud representation.
     *
     * <p>The spec always carries the base64-encoded serialized source. For a
     * {@link BoundedSource} the metadata is filled with the sorted-keys flag and the
     * estimated size on a best-effort basis (failures are logged as warnings). For an
     * {@link UnboundedSource} the metadata is marked infinite and the spec additionally
     * carries the base64-encoded initial splits.
     *
     * @throws IllegalArgumentException if the source is neither bounded nor unbounded, or an
     *     unbounded source generates no splits
     * @throws Exception if serialization or split generation fails
     */
    public static com.google.api.services.dataflow.model.Source serializeToCloudSource(Source<?> source, PipelineOptions pipelineOptions) throws Exception {
        com.google.api.services.dataflow.model.Source cloudSource = new com.google.api.services.dataflow.model.Source();
        cloudSource.setSpec(CloudObject.forClass(BasicSerializableSourceFormat.class));
        String encodedSource = Base64.encodeBase64String(SerializableUtils.serializeToByteArray(source));
        Structs.addString(cloudSource.getSpec(), SERIALIZED_SOURCE, encodedSource);

        SourceMetadata metadata = new SourceMetadata();
        if (source instanceof BoundedSource) {
            BoundedSource<?> bounded = (BoundedSource<?>) source;
            // Both metadata lookups are optional: a failure is logged and the field left unset.
            try {
                metadata.setProducesSortedKeys(Boolean.valueOf(bounded.producesSortedKeys(pipelineOptions)));
            } catch (Exception e) {
                LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
            }
            try {
                metadata.setEstimatedSizeBytes(Long.valueOf(bounded.getEstimatedSizeBytes(pipelineOptions)));
            } catch (Exception e) {
                LOG.warn("Size estimation of the source failed: " + source, e);
            }
        } else if (source instanceof UnboundedSource) {
            metadata.setInfinite(true);
            UnboundedSource<?, ?> unbounded = (UnboundedSource<?, ?>) source;
            int desiredSplits = getDesiredNumUnboundedSourceSplits((DataflowPipelineOptions) pipelineOptions.as(DataflowPipelineOptions.class));
            List<String> encodedSplits = new ArrayList<String>();
            for (UnboundedSource<?, ?> split : unbounded.generateInitialSplits(desiredSplits, pipelineOptions)) {
                encodedSplits.add(Base64.encodeBase64String(SerializableUtils.serializeToByteArray(split)));
            }
            Preconditions.checkArgument(!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
            Structs.addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
        } else {
            throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
        }
        cloudSource.setMetadata(metadata);
        return cloudSource;
    }

    /**
     * Evaluates a bounded read: reads every element of the transform's source, each placed
     * in the global window with the reader's timestamp, and stores the result as the values
     * of the transform's output collection.
     *
     * <p>The reader is closed when reading finishes, whether normally or with an exception.
     *
     * @param bounded the read transform to evaluate
     * @param evaluationContext context supplying the pipeline options and receiving the output
     * @throws RuntimeException wrapping any failure while creating, reading or closing the reader
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <T> void evaluateReadHelper(Read.Bounded<T> bounded, DirectPipelineRunner.EvaluationContext evaluationContext) {
        try {
            List<DirectPipelineRunner.ValueWithMetadata<T>> elements = new ArrayList<>();
            // try-with-resources guarantees close() even when start()/advance() throws.
            try (BoundedSource.BoundedReader<T> reader = bounded.getSource().createReader(evaluationContext.getPipelineOptions())) {
                for (boolean available = reader.start(); available; available = reader.advance()) {
                    elements.add(DirectPipelineRunner.ValueWithMetadata.of(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp())));
                }
            }
            // Publish the output only after the reader has been closed successfully.
            evaluationContext.setPCollectionValuesWithMetadata((PCollection) evaluationContext.getOutput(bounded), elements);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Translates a read of {@code source} into a "ParallelRead" step: the step is given the
     * custom source format, the serialized source as its input, and the transform's output as
     * a value-only output.
     *
     * @throws RuntimeException wrapping any failure, e.g. while serializing the source
     */
    public static <T> void translateReadHelper(Source<T> source, PTransform<?, ? extends PValue> pTransform, DataflowPipelineTranslator.TranslationContext translationContext) {
        try {
            translationContext.addStep(pTransform, "ParallelRead");
            translationContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);

            com.google.api.services.dataflow.model.Source cloudSource =
                    serializeToCloudSource(source, translationContext.getPipelineOptions());
            translationContext.addInput(PropertyNames.SOURCE_STEP_INPUT, SourceTranslationUtils.cloudSourceToDictionary(cloudSource));

            translationContext.addValueOnlyOutput(PropertyNames.OUTPUT, (PValue) translationContext.getOutput(pTransform));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
