/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.dataflow;

import com.google.api.client.util.Base64;
import com.google.api.services.dataflow.model.ApproximateProgress;
import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.DynamicSourceSplit;
import com.google.api.services.dataflow.model.Source;
import com.google.api.services.dataflow.model.SourceGetMetadataRequest;
import com.google.api.services.dataflow.model.SourceGetMetadataResponse;
import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.api.services.dataflow.model.SourceOperationRequest;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.SourceSplitOptions;
import com.google.api.services.dataflow.model.SourceSplitRequest;
import com.google.api.services.dataflow.model.SourceSplitResponse;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BasicSerializableSourceFormat
implements SourceFormat {
    /** Key in a cloud source spec under which the base64-encoded, Java-serialized source is stored. */
    private static final String SERIALIZED_SOURCE = "serialized_source";
    /**
     * Key in a cloud source spec under which the list of base64-encoded, Java-serialized
     * splits of an unbounded source is stored.
     */
    @VisibleForTesting
    static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
    /** Bundle size (0x4000000 = 67,108,864 bytes, i.e. 64 MiB) used when a split request does not specify one. */
    private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 0x4000000L;
    /**
     * Format string for the error raised when splitting yields too many bundles; takes the
     * actual number of bundles and the allowed limit, in that order.
     */
    public static final String TOO_MANY_SOURCE_SPLITS_ERROR = "Total number of Source objects generated by splitIntoBundles() operation, %d, is larger than the allowable limit, %d. For more information, please check the corresponding FAQ entry at:\nhttps://cloud.google.com/dataflow/faq";
    /** Upper limit on the number of bundles a single split operation may produce. */
    private static final int MAX_NUMBER_OF_SPLITS = 16000;
    private static final Logger LOG = LoggerFactory.getLogger(BasicSerializableSourceFormat.class);
    /** Pipeline options handed to sources when splitting them or querying their metadata. */
    private final PipelineOptions options;
    /** Key that {@code isFirstUnboundedSourceSplit} compares against: the 16-character string for split number one. */
    private static final ByteString firstSplitKey = ByteString.copyFromUtf8((String)"0000000000000001");

    /**
     * Creates a source format that uses the given options for source operations.
     *
     * @param options pipeline options passed to sources during split and metadata operations
     */
    public BasicSerializableSourceFormat(PipelineOptions options) {
        this.options = options;
    }

    /**
     * Converts the outcome of a dynamic split of a bounded source into its cloud representation.
     *
     * <p>Both the primary and the residual part are serialized to cloud sources and wrapped in
     * {@link DerivedSource} objects marked as independently derived.
     *
     * @param sourceSplitResult the primary/residual pair produced by the split
     * @param options pipeline options used while serializing the two sources
     * @return the cloud split description holding both parts
     * @throws RuntimeException if either part cannot be serialized (the cause is preserved)
     */
    public static DynamicSourceSplit toSourceSplit(BoundedSourceSplit<?> sourceSplitResult, PipelineOptions options) {
        Source cloudPrimary;
        Source cloudResidual;
        try {
            cloudPrimary = serializeToCloudSource(sourceSplitResult.primary, options);
            cloudResidual = serializeToCloudSource(sourceSplitResult.residual, options);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize one of the parts of the source split", e);
        }

        DerivedSource derivedPrimary = new DerivedSource();
        derivedPrimary.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
        derivedPrimary.setSource(cloudPrimary);

        DerivedSource derivedResidual = new DerivedSource();
        derivedResidual.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
        derivedResidual.setSource(cloudResidual);

        DynamicSourceSplit cloudSplit = new DynamicSourceSplit();
        cloudSplit.setPrimary(derivedPrimary);
        cloudSplit.setResidual(derivedResidual);
        return cloudSplit;
    }

    /**
     * Executes a source operation request, which must be either a metadata request or a
     * split request. A metadata request takes precedence when both are present.
     *
     * @param request the operation to perform
     * @return the response for whichever operation was performed
     * @throws UnsupportedOperationException if the request carries neither operation
     * @throws Exception if the underlying source operation fails
     */
    @Override
    public SourceFormat.OperationResponse performSourceOperation(SourceFormat.OperationRequest request) throws Exception {
        SourceOperationRequest cloudRequest = SourceTranslationUtils.sourceOperationRequestToCloudSourceOperationRequest(request);
        SourceOperationResponse cloudResponse = new SourceOperationResponse();

        SourceGetMetadataRequest metadataRequest = cloudRequest.getGetMetadata();
        if (metadataRequest != null) {
            cloudResponse.setGetMetadata(performGetMetadata(metadataRequest));
            return SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse(cloudResponse);
        }

        SourceSplitRequest splitRequest = cloudRequest.getSplit();
        if (splitRequest == null) {
            throw new UnsupportedOperationException("Unknown source operation request");
        }
        cloudResponse.setSplit(performSplit(splitRequest));
        return SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse(cloudResponse);
    }

    /**
     * Builds a worker {@link Reader} for the source serialized inside {@code spec}.
     *
     * <p>A bounded source gets a reader whose iterator wraps a freshly created bounded reader;
     * an unbounded source gets an {@code UnboundedReader}, which requires the execution
     * context to be a {@link StreamingModeExecutionContext}.
     *
     * @param spec cloud object holding the serialized source
     * @param options pipeline options handed to the source when creating readers
     * @param executionContext execution context; only used (and cast) for unbounded sources
     * @return a reader over the deserialized source
     * @throws IllegalArgumentException if the source is neither bounded nor unbounded
     * @throws Exception if the source cannot be deserialized or fails validation
     */
    public static Reader<WindowedValue<?>> create(CloudObject spec, final PipelineOptions options, ExecutionContext executionContext) throws Exception {
        final com.google.cloud.dataflow.sdk.io.Source<?> source = deserializeFromCloudSource(spec);

        if (source instanceof UnboundedSource) {
            StreamingModeExecutionContext streamingContext = (StreamingModeExecutionContext) executionContext;
            UnboundedReader unboundedReader = new UnboundedReader(options, spec, streamingContext);
            return unboundedReader;
        }

        if (!(source instanceof BoundedSource)) {
            throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
        }

        Reader<WindowedValue<Object>> boundedReader = new Reader<WindowedValue<Object>>() {
            @Override
            public Reader.ReaderIterator<WindowedValue<Object>> iterator() throws IOException {
                // A new underlying reader is created for every iteration request.
                return new BoundedReaderIterator<Object>(((BoundedSource) source).createReader(options));
            }
        };
        return boundedReader;
    }

    /**
     * Tells whether {@code splitKey} is the key of the first unbounded-source split.
     *
     * @param splitKey the key to test; must not be null
     * @return true if the key equals {@code firstSplitKey}
     */
    public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) {
        boolean isFirst = splitKey.equals(firstSplitKey);
        return isFirst;
    }

    /**
     * Splits the bounded source contained in {@code request} into bundles.
     *
     * <p>The desired bundle size comes from the request options and falls back to
     * {@link #DEFAULT_DESIRED_BUNDLE_SIZE_BYTES} when absent. Every produced bundle is
     * validated, serialized to a cloud source flagged as not needing further splitting, and
     * wrapped in an independently derived {@link DerivedSource}.
     *
     * @param request split request holding the serialized source and optional split options
     * @return a response listing all bundles, with outcome "splitting happened"
     * @throws UnsupportedOperationException if the source is not a {@link BoundedSource}
     * @throws IOException if more than {@link #MAX_NUMBER_OF_SPLITS} bundles are produced
     * @throws IllegalArgumentException if a produced bundle fails validation
     * @throws Exception if deserialization, splitting or serialization fails
     */
    private SourceSplitResponse performSplit(SourceSplitRequest request) throws Exception {
        com.google.cloud.dataflow.sdk.io.Source<?> anySource = BasicSerializableSourceFormat.deserializeFromCloudSource(request.getSource().getSpec());
        if (!(anySource instanceof BoundedSource)) {
            throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource);
        }
        BoundedSource<?> source = (BoundedSource<?>) anySource;
        LOG.debug("Splitting source: {}", source);

        // Use the requested bundle size when present, otherwise the class-wide default.
        SourceSplitOptions splitOptions = request.getOptions();
        Long desiredBundleSizeBytes = splitOptions == null ? null : splitOptions.getDesiredBundleSizeBytes();
        if (desiredBundleSizeBytes == null) {
            desiredBundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
        }

        List<? extends BoundedSource<?>> bundles = source.splitIntoBundles(desiredBundleSizeBytes, this.options);
        if (bundles.size() > MAX_NUMBER_OF_SPLITS) {
            throw new IOException(String.format(TOO_MANY_SOURCE_SPLITS_ERROR, bundles.size(), MAX_NUMBER_OF_SPLITS));
        }
        LOG.debug("Splitting produced {} bundles", bundles.size());

        List<DerivedSource> derivedBundles = new ArrayList<DerivedSource>(bundles.size());
        for (BoundedSource<?> split : bundles) {
            try {
                split.validate();
            } catch (Exception e) {
                throw new IllegalArgumentException("Splitting a valid source produced an invalid bundle. \nOriginal source: " + source + "\nInvalid bundle: " + split, e);
            }
            Source cloudSource = BasicSerializableSourceFormat.serializeToCloudSource(split, this.options);
            // Bundles are final: mark them so they are not split again.
            cloudSource.setDoesNotNeedSplitting(Boolean.TRUE);

            DerivedSource bundle = new DerivedSource();
            bundle.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
            bundle.setSource(cloudSource);
            derivedBundles.add(bundle);
        }

        SourceSplitResponse response = new SourceSplitResponse();
        response.setBundles(derivedBundles);
        response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
        return response;
    }

    /**
     * Answers a metadata request for the source contained in {@code request}.
     *
     * <p>For a bounded source the metadata reports whether it produces sorted keys and its
     * estimated size in bytes; for any other source the metadata is left empty.
     *
     * @param request metadata request holding the serialized source
     * @return a response wrapping the collected metadata
     * @throws Exception if the source cannot be deserialized or a metadata query fails
     */
    private SourceGetMetadataResponse performGetMetadata(SourceGetMetadataRequest request) throws Exception {
        com.google.cloud.dataflow.sdk.io.Source<?> source = deserializeFromCloudSource(request.getSource().getSpec());

        SourceMetadata metadata = new SourceMetadata();
        if (source instanceof BoundedSource) {
            BoundedSource<?> bounded = (BoundedSource<?>) source;
            boolean sortedKeys = bounded.producesSortedKeys(this.options);
            long estimatedSize = bounded.getEstimatedSizeBytes(this.options);
            metadata.setProducesSortedKeys(Boolean.valueOf(sortedKeys));
            metadata.setEstimatedSizeBytes(Long.valueOf(estimatedSize));
        }

        SourceGetMetadataResponse result = new SourceGetMetadataResponse();
        result.setMetadata(metadata);
        return result;
    }

    /**
     * Restores a source from its cloud spec and validates it.
     *
     * <p>The spec's {@code serialized_source} entry is base64-decoded and Java-deserialized.
     * If validation of the resulting source fails, the failure is logged and rethrown.
     *
     * @param spec cloud source spec containing the serialized source
     * @return the deserialized, validated source
     * @throws Exception if the spec cannot be read or deserialized, or the source is invalid
     */
    public static com.google.cloud.dataflow.sdk.io.Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception {
        String encodedSource = Structs.getString(spec, SERIALIZED_SOURCE);
        byte[] serializedSource = Base64.decodeBase64(encodedSource);
        com.google.cloud.dataflow.sdk.io.Source<?> source = (com.google.cloud.dataflow.sdk.io.Source<?>) SerializableUtils.deserializeFromByteArray(serializedSource, "Source");
        try {
            source.validate();
        } catch (Exception e) {
            LOG.error("Invalid source: " + source, e);
            throw e;
        }
        return source;
    }

    /**
     * Chooses how many initial splits to request from an unbounded source.
     *
     * <p>Preference order: the maximum number of workers if positive, else three times the
     * number of workers if positive, else 20.
     *
     * @param options Dataflow options supplying the worker counts
     * @return the desired number of splits
     */
    private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
        int maxWorkers = options.getMaxNumWorkers();
        if (maxWorkers > 0) {
            return maxWorkers;
        }
        int workers = options.getNumWorkers();
        return workers > 0 ? workers * 3 : 20;
    }

    /**
     * Serializes a source into its cloud representation.
     *
     * <p>The spec always carries the base64-encoded, Java-serialized source. For a bounded
     * source the metadata additionally records sortedness and estimated size on a
     * best-effort basis: a failure of either query is logged as a warning and that field is
     * left unset. For an unbounded source the metadata is marked infinite and the spec also
     * carries the serialized initial splits.
     *
     * @param source the source to serialize
     * @param options pipeline options handed to the source's query and split methods
     * @return the cloud source with spec and metadata populated
     * @throws IllegalArgumentException if the source is neither bounded nor unbounded, or an
     *     unbounded source yields no initial splits
     * @throws Exception if serialization or initial split generation fails
     */
    public static Source serializeToCloudSource(com.google.cloud.dataflow.sdk.io.Source<?> source, PipelineOptions options) throws Exception {
        Source cloudSource = new Source();
        cloudSource.setSpec((Map) ((Object) CloudObject.forClass(BasicSerializableSourceFormat.class)));
        byte[] serializedSource = SerializableUtils.serializeToByteArray(source);
        Structs.addString(cloudSource.getSpec(), SERIALIZED_SOURCE, Base64.encodeBase64String(serializedSource));

        SourceMetadata metadata = new SourceMetadata();
        if (source instanceof BoundedSource) {
            BoundedSource<?> bounded = (BoundedSource<?>) source;
            try {
                metadata.setProducesSortedKeys(Boolean.valueOf(bounded.producesSortedKeys(options)));
            } catch (Exception e) {
                LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
            }
            try {
                metadata.setEstimatedSizeBytes(Long.valueOf(bounded.getEstimatedSizeBytes(options)));
            } catch (Exception e) {
                LOG.warn("Size estimation of the source failed: " + source, e);
            }
        } else if (source instanceof UnboundedSource) {
            UnboundedSource<?, ?> unbounded = (UnboundedSource<?, ?>) source;
            metadata.setInfinite(Boolean.TRUE);

            int splitCount = getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
            ArrayList<String> serializedSplits = new ArrayList<String>();
            for (UnboundedSource<?, ?> initialSplit : unbounded.generateInitialSplits(splitCount, options)) {
                byte[] splitBytes = SerializableUtils.serializeToByteArray(initialSplit);
                serializedSplits.add(Base64.encodeBase64String(splitBytes));
            }
            Preconditions.checkArgument(!serializedSplits.isEmpty(), "UnboundedSources must have at least one split");
            Structs.addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, serializedSplits);
        } else {
            throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
        }

        cloudSource.setMetadata(metadata);
        return cloudSource;
    }

    /**
     * Evaluates a bounded read for the direct runner by reading the whole source in-process.
     *
     * <p>Every element is placed in the global window with the timestamp reported by the
     * reader, and the collected elements become the values of the transform's output.
     *
     * @param transform the bounded read being evaluated
     * @param context evaluation context supplying options and receiving the output values
     * @throws RuntimeException wrapping any exception raised while reading
     */
    public static <T> void evaluateReadHelper(Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
        try {
            List<DirectPipelineRunner.ValueWithMetadata<T>> elements = new ArrayList<DirectPipelineRunner.ValueWithMetadata<T>>();
            BoundedSource<T> source = transform.getSource();
            // try-with-resources guarantees the reader is closed even if reading fails.
            try (BoundedSource.BoundedReader<T> reader = source.createReader(context.getPipelineOptions())) {
                for (boolean hasElement = reader.start(); hasElement; hasElement = reader.advance()) {
                    WindowedValue<T> windowed = WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp());
                    elements.add(DirectPipelineRunner.ValueWithMetadata.of(windowed));
                }
            }
            context.setPCollectionValuesWithMetadata((PCollection<T>) context.getOutput(transform), elements);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Translates a read from a custom source into a "ParallelRead" step.
     *
     * <p>The step is given the {@code custom_source} format, the dictionary form of the
     * serialized cloud source as its input, and the transform's output as a value-only output.
     *
     * @param source the source being read
     * @param transform the read transform being translated
     * @param context translation context receiving the step
     * @throws RuntimeException wrapping any exception raised during translation
     */
    public static <T> void translateReadHelper(com.google.cloud.dataflow.sdk.io.Source<T> source, PTransform<?, ? extends PValue> transform, DataflowPipelineTranslator.TranslationContext context) {
        try {
            context.addStep(transform, "ParallelRead");
            context.addInput("format", "custom_source");
            Source cloudSource = serializeToCloudSource(source, context.getPipelineOptions());
            context.addInput("custom_source_step_input", SourceTranslationUtils.cloudSourceToDictionary(cloudSource));
            context.addValueOnlyOutput("output", context.getOutput(transform));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Iterator over one bundle of elements read from an unbounded reader.
     *
     * <p>A bundle ends after {@link #MAX_BUNDLE_SIZE} elements, once
     * {@link #MAX_BUNDLE_READ_TIME} has elapsed since the iterator was created, or when the
     * reader stays empty through a short series of back-off waits. Each element is paired
     * with the record id reported by the reader.
     */
    private static class UnboundedReaderIterator<T>
    implements Reader.ReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
        /** Maximum number of elements returned by one iterator. */
        private static final int MAX_BUNDLE_SIZE = 10000;
        /** Maximum wall-clock time, measured from construction, during which elements are returned. */
        private static final Duration MAX_BUNDLE_READ_TIME = Duration.standardSeconds(10L);
        private final ReaderToIteratorAdapter<T> iteratorAdapter;
        private final UnboundedSource.UnboundedReader<T> reader;
        private final Instant endTime;
        private int elemsRead;

        /**
         * @param reader the unbounded reader to pull elements from
         * @param started whether {@code reader} has already been started, in which case it
         *     is advanced rather than started on first use
         */
        private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) {
            this.iteratorAdapter = new ReaderToIteratorAdapter<T>(reader, started);
            this.reader = reader;
            this.endTime = Instant.now().plus(MAX_BUNDLE_READ_TIME);
            this.elemsRead = 0;
        }

        /**
         * Reports whether another element belongs to this bundle, waiting briefly with
         * exponential back-off if the reader currently has nothing available.
         *
         * <p>If the thread is interrupted while waiting, the interrupt status is restored and
         * the bundle is ended by returning {@code false}.
         */
        @Override
        public boolean hasNext() throws IOException {
            if (this.elemsRead >= MAX_BUNDLE_SIZE || Instant.now().isAfter(this.endTime)) {
                return false;
            }
            AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(5, 100L);
            while (!this.iteratorAdapter.hasNext()) {
                long nextBackoff = backoff.nextBackOffMillis();
                // NOTE(review): -1 is presumably the back-off's "stop" sentinel, i.e. attempts are exhausted.
                if (nextBackoff == -1L) {
                    return false;
                }
                try {
                    Thread.sleep(nextBackoff);
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag so callers can observe
                    // it, and stop waiting for more data.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }

        /** Returns the next element together with the record id the reader reports for it. */
        @Override
        public WindowedValue<ValueWithRecordId<T>> next() throws IOException {
            WindowedValue<T> result = this.iteratorAdapter.next();
            ++this.elemsRead;
            return result.withValue(new ValueWithRecordId<T>(result.getValue(), this.reader.getCurrentRecordId()));
        }

        /** Copying is not supported. */
        @Override
        public UnboundedReaderIterator<T> copy() throws IOException {
            throw new UnsupportedOperationException();
        }

        /** Does nothing; in particular the underlying reader is not closed here. */
        @Override
        public void close() {
        }

        /** Progress is not reported for unbounded reads; always returns {@code null}. */
        @Override
        public Reader.Progress getProgress() {
            return null;
        }

        /** Dynamic splitting is not supported for unbounded reads; always returns {@code null}. */
        @Override
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest request) {
            return null;
        }
    }

    /**
     * Iterator over the elements of a bounded reader that also supports progress reporting
     * and dynamic splitting by delegating to the reader.
     */
    private static class BoundedReaderIterator<T>
    implements Reader.ReaderIterator<WindowedValue<T>> {
        private final BoundedSource.BoundedReader<T> reader;
        private final ReaderToIteratorAdapter<T> iteratorAdapter;

        /**
         * @param reader a bounded reader that has not been started yet
         */
        private BoundedReaderIterator(BoundedSource.BoundedReader<T> reader) {
            this.reader = reader;
            this.iteratorAdapter = new ReaderToIteratorAdapter<T>(reader, false);
        }

        @Override
        public boolean hasNext() throws IOException {
            return this.iteratorAdapter.hasNext();
        }

        @Override
        public WindowedValue<T> next() throws IOException {
            return this.iteratorAdapter.next();
        }

        /** Copying is not supported. */
        @Override
        public BoundedReaderIterator<T> copy() throws IOException {
            throw new UnsupportedOperationException();
        }

        /** Closes the underlying reader. */
        @Override
        public void close() throws IOException {
            this.reader.close();
        }

        /**
         * Reports the fraction consumed by the reader as a percent-complete progress value.
         * The percentage is left unset when the reader does not know its fraction consumed.
         *
         * @return the progress, or {@code null} if there is no reader
         */
        @Override
        public Reader.Progress getProgress() {
            if (this.reader == null) {
                return null;
            }
            ApproximateProgress progress = new ApproximateProgress();
            Double fractionConsumed = this.reader.getFractionConsumed();
            if (fractionConsumed != null) {
                progress.setPercentComplete(Float.valueOf(fractionConsumed.floatValue()));
            }
            return SourceTranslationUtils.cloudProgressToReaderProgress(progress);
        }

        /**
         * Asks the reader to split at the fraction named in {@code request}.
         *
         * <p>After a successful split the reader's current source is the primary part. Both
         * parts must be new objects distinct from the original source and must pass validation.
         *
         * @param request the split request; only its percent-complete position is used
         * @return the primary/residual pair, or {@code null} if the request carries no
         *     fraction or the reader declined to split
         * @throws IllegalStateException if the split yields the original object as primary or
         *     residual, or if either part fails validation (the validation failure is the cause)
         */
        @Override
        public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest request) {
            ApproximateProgress stopPosition = SourceTranslationUtils.splitRequestToApproximateProgress(request);
            Float fractionConsumed = stopPosition.getPercentComplete();
            if (fractionConsumed == null) {
                return null;
            }

            BoundedSource<T> original = this.reader.getCurrentSource();
            BoundedSource<T> residual = this.reader.splitAtFraction(fractionConsumed.doubleValue());
            if (residual == null) {
                return null;
            }
            BoundedSource<T> primary = this.reader.getCurrentSource();

            // Identity (not equality) checks: a split must hand out fresh source objects.
            if (original == primary) {
                throw new IllegalStateException("Successful split did not change the current source: primary is identical to original (Source objects MUST be immutable): " + primary);
            }
            if (original == residual) {
                throw new IllegalStateException("Successful split did not change the current source: residual is identical to original (Source objects MUST be immutable): " + residual);
            }

            try {
                primary.validate();
            } catch (Exception e) {
                // Keep the validation failure as the cause so the root problem is not lost.
                throw new IllegalStateException("Successful split produced an illegal primary source. " + describeSplit(original, primary, residual), e);
            }
            try {
                residual.validate();
            } catch (Exception e) {
                throw new IllegalStateException("Successful split produced an illegal residual source. " + describeSplit(original, primary, residual), e);
            }
            return new BoundedSourceSplit<T>(primary, residual);
        }

        /** Renders the three sources involved in a split for use in error messages. */
        private static String describeSplit(Object original, Object primary, Object residual) {
            return "\nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual;
        }
    }

    private static class ReaderToIteratorAdapter<T> {
        private Source.Reader<T> reader;
        private NextState state;

        private ReaderToIteratorAdapter(Source.Reader<T> reader, boolean started) {
            this.reader = reader;
            this.state = started ? NextState.UNKNOWN_BEFORE_ADVANCE : NextState.UNKNOWN_BEFORE_START;
        }

        public boolean hasNext() throws IOException {
            switch (this.state) {
                case UNKNOWN_BEFORE_START: {
                    try {
                        if (this.reader.start()) {
                            this.state = NextState.AVAILABLE;
                            return true;
                        }
                        this.state = NextState.UNAVAILABLE;
                        return false;
                    }
                    catch (Exception e) {
                        String string = String.valueOf(this.reader.getCurrentSource());
                        throw new IOException(new StringBuilder(37 + String.valueOf(string).length()).append("Failed to start reading from source: ").append(string).toString(), e);
                    }
                }
                case UNKNOWN_BEFORE_ADVANCE: {
                    if (this.reader.advance()) {
                        this.state = NextState.AVAILABLE;
                        return true;
                    }
                    this.state = NextState.UNAVAILABLE;
                    return false;
                }
                case AVAILABLE: {
                    return true;
                }
                case UNAVAILABLE: {
                    return false;
                }
            }
            throw new AssertionError();
        }

        public WindowedValue<T> next() throws IOException {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            this.state = NextState.UNKNOWN_BEFORE_ADVANCE;
            return WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
        }

        private static enum NextState {
            UNKNOWN_BEFORE_START,
            UNKNOWN_BEFORE_ADVANCE,
            AVAILABLE,
            UNAVAILABLE;

        }
    }

    /**
     * Reader over one split of an unbounded source in streaming mode.
     *
     * <p>The split to read is selected by the serialized key of the execution context, and a
     * reader cached in that context is reused when available.
     */
    private static class UnboundedReader<T>
    extends Reader<WindowedValue<ValueWithRecordId<T>>> {
        private final PipelineOptions options;
        private final CloudObject spec;
        private final StreamingModeExecutionContext context;

        /**
         * @param options pipeline options handed to the source when creating a reader
         * @param spec cloud object holding the serialized source splits
         * @param context streaming context providing the key, cached reader and checkpoint
         */
        UnboundedReader(PipelineOptions options, CloudObject spec, StreamingModeExecutionContext context) {
            this.options = options;
            this.spec = spec;
            this.context = context;
        }

        /**
         * Returns an iterator for the next bundle, reusing the context's cached reader if
         * there is one and otherwise creating a reader for the split named by the key.
         *
         * <p>In both cases the reader is registered with the context as the active reader.
         */
        @Override
        @SuppressWarnings("unchecked")
        public Reader.ReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() {
            // The cached reader's element type is not tracked by the context.
            UnboundedSource.UnboundedReader<T> reader = (UnboundedSource.UnboundedReader<T>) this.context.getCachedReader();
            // A cached reader has already been started and only needs to be advanced.
            final boolean started = reader != null;
            if (reader == null) {
                String key = this.context.getSerializedKey().toStringUtf8();
                // The first 16 characters of the key are a hexadecimal split number that
                // starts at one (see firstSplitKey); convert it to a zero-based index.
                int splitIndex = Integer.parseInt(key.substring(0, 16), 16) - 1;
                UnboundedSource<T, UnboundedSource.CheckpointMark> splitSource = this.parseSource(splitIndex);
                UnboundedSource.CheckpointMark checkpoint = null;
                if (splitSource.getCheckpointMarkCoder() != null) {
                    checkpoint = this.context.getReaderCheckpoint(splitSource.getCheckpointMarkCoder());
                }
                reader = splitSource.createReader(this.options, checkpoint);
            }
            this.context.setActiveReader(reader);
            return new UnboundedReaderIterator<T>(reader, started);
        }

        @Override
        public boolean supportsRestart() {
            return true;
        }

        /**
         * Deserializes the split at {@code index} from the spec's list of serialized splits.
         *
         * @param index zero-based index of the split
         * @return the deserialized split
         * @throws RuntimeException if the list of splits cannot be read from the spec
         * @throws IllegalArgumentException if the spec has no splits, {@code index} is
         *     negative or too large, or the deserialized object is not an
         *     {@link UnboundedSource}
         */
        @SuppressWarnings("unchecked")
        private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index) {
            List<String> serializedSplits = null;
            try {
                serializedSplits = Structs.getStrings(this.spec, BasicSerializableSourceFormat.SERIALIZED_SOURCE_SPLITS, null);
            } catch (Exception e) {
                throw new RuntimeException("Parsing serialized source splits failed: ", e);
            }
            Preconditions.checkArgument(serializedSplits != null, "UnboundedSource object did not contain splits");
            // A key that decodes to zero would yield index -1; reject it with a clear
            // message instead of letting List.get fail.
            Preconditions.checkArgument(index >= 0, "UnboundedSource split index must be non-negative.  Requested index was %s", index);
            Preconditions.checkArgument(index < serializedSplits.size(), "UnboundedSource splits contained too few splits.  Requested index was %s, size was %s", index, serializedSplits.size());
            Object rawSource = SerializableUtils.deserializeFromByteArray(Base64.decodeBase64(serializedSplits.get(index)), "UnboundedSource split");
            if (!(rawSource instanceof UnboundedSource)) {
                throw new IllegalArgumentException("Expected UnboundedSource, got " + rawSource.getClass());
            }
            // Type parameters are erased at runtime; only the raw type was checked above.
            return (UnboundedSource<T, UnboundedSource.CheckpointMark>) rawSource;
        }
    }

    /**
     * {@link ReaderFactory} that delegates to
     * {@link BasicSerializableSourceFormat#create(CloudObject, PipelineOptions, ExecutionContext)}.
     */
    public static class Factory
    implements ReaderFactory {
        /**
         * Creates a reader for the source serialized in {@code spec}. The coder, counter
         * mutator and operation name are not used.
         */
        @Override
        public Reader<?> create(CloudObject spec, @Nullable Coder<?> coder, @Nullable PipelineOptions options, @Nullable ExecutionContext executionContext, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String operationName) throws Exception {
            Reader<?> sourceReader = BasicSerializableSourceFormat.create(spec, options, executionContext);
            return sourceReader;
        }
    }

    /**
     * Result of dynamically splitting a bounded source: the primary part and the residual
     * part, held as a pair.
     */
    public static final class BoundedSourceSplit<T>
    implements Reader.DynamicSplitResult {
        /** The primary part of the split. */
        public final BoundedSource<T> primary;
        /** The residual part of the split. */
        public final BoundedSource<T> residual;

        /**
         * @param primary the primary part of the split
         * @param residual the residual part of the split
         */
        public BoundedSourceSplit(BoundedSource<T> primary, BoundedSource<T> residual) {
            this.primary = primary;
            this.residual = residual;
        }

        /** Renders both parts as {@code <primary: ...; residual: ...>}. */
        @Override
        public String toString() {
            final String template = "<primary: %s; residual: %s>";
            return String.format(template, this.primary, this.residual);
        }
    }
}

