/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.joda.time.Instant;

public class IntraBundleParallelization {
    private static final int DEFAULT_MAX_PARALLELISM = 16;

    /**
     * Creates a transform that runs {@code doFn} with intra-bundle parallelism, using the
     * parallelism limit of a freshly constructed {@link Unbound}.
     *
     * @param doFn the function to run over each input element
     * @return a {@link Bound} transform wrapping {@code doFn}
     */
    public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
        Unbound defaults = new Unbound();
        return defaults.of(doFn);
    }

    /**
     * Starts building a transform with an explicit limit on concurrently processed elements.
     *
     * @param maxParallelism the limit; must be greater than zero
     * @return an {@link Unbound} builder carrying {@code maxParallelism}
     */
    public static Unbound withMaxParallelism(int maxParallelism) {
        Unbound defaults = new Unbound();
        return defaults.withMaxParallelism(maxParallelism);
    }

    /**
     * A {@link DoFn} that wraps another {@code DoFn} and runs its {@code processElement} calls
     * for a bundle concurrently on an {@link ExecutorService}.
     *
     * <p>A {@link Semaphore} holding {@code maxParallelism} "work tickets" bounds the number of
     * elements in flight: {@link #processElement} takes one ticket per element and the worker
     * task returns it when done. {@link #finishBundle} takes every ticket, which can only
     * succeed once all submitted tasks have completed.
     *
     * <p>The first {@link Throwable} thrown by any worker task is recorded and rethrown from the
     * next {@link #processElement} or {@link #finishBundle} call.
     *
     * <p>Outputs produced by worker tasks are forwarded to the real context while holding this
     * instance's monitor, so only one worker thread emits at a time.
     */
    public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>
    extends DoFn<InputT, OutputT> {
        private final DoFn<InputT, OutputT> doFn;
        private int maxParallelism;
        // Per-bundle state; transient and re-created in startBundle.
        private transient ExecutorService executor;
        private transient Semaphore workTickets;
        private transient AtomicReference<Throwable> failure;

        /**
         * @param doFn the function whose element processing is parallelized
         * @param maxParallelism maximum number of elements processed concurrently; must be
         *     greater than zero
         * @throws IllegalArgumentException if {@code maxParallelism} is not positive
         */
        public MultiThreadedIntraBundleProcessingDoFn(DoFn<InputT, OutputT> doFn, int maxParallelism) {
            Preconditions.checkArgument(maxParallelism > 0, "Expected parallelism factor greater than zero, received %s.", maxParallelism);
            this.doFn = doFn;
            this.maxParallelism = maxParallelism;
        }

        /**
         * Starts the wrapped function's bundle, then sets up the executor, the work tickets and
         * the failure slot for this bundle.
         */
        @Override
        public void startBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
            this.doFn.startBundle(c);
            this.executor = c.getPipelineOptions().as(GcsOptions.class).getExecutorService();
            this.workTickets = new Semaphore(this.maxParallelism);
            this.failure = new AtomicReference<Throwable>();
        }

        /**
         * Schedules the wrapped function's {@code processElement} for this element on the
         * executor, blocking while {@code maxParallelism} elements are already in flight.
         *
         * @throws RuntimeException if interrupted while waiting for a ticket (the thread's
         *     interrupt status is restored first), or if an earlier worker task failed
         */
        @Override
        public void processElement(final DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
            try {
                this.workTickets.acquire();
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while scheduling work", e);
            }
            // Once the task is accepted it owns the ticket and releases it in its own finally.
            // On every other exit path the ticket must be handed back here, otherwise a later
            // finishBundle could never collect all maxParallelism tickets.
            boolean submitted = false;
            try {
                Throwable earlierFailure = this.failure.get();
                if (earlierFailure != null) {
                    throw Throwables.propagate(earlierFailure);
                }
                this.executor.submit(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            MultiThreadedIntraBundleProcessingDoFn.this.doFn.processElement(new WrappedContext(c));
                        }
                        catch (Throwable t) {
                            // Only the first failure is kept; later ones lose the race.
                            MultiThreadedIntraBundleProcessingDoFn.this.failure.compareAndSet(null, t);
                            Throwables.propagateIfPossible(t);
                            throw new AssertionError("Unexpected checked exception: " + t);
                        }
                        finally {
                            MultiThreadedIntraBundleProcessingDoFn.this.workTickets.release();
                        }
                    }
                });
                submitted = true;
            }
            finally {
                if (!submitted) {
                    this.workTickets.release();
                }
            }
        }

        /**
         * Waits for all in-flight elements, rethrows any recorded worker failure, and otherwise
         * finishes the wrapped function's bundle.
         */
        @Override
        public void finishBundle(DoFn<InputT, OutputT>.Context c) throws Exception {
            // Holding every ticket means no submitted task is still running.
            this.workTickets.acquire(this.maxParallelism);
            Throwable recorded = this.failure.get();
            if (recorded != null) {
                throw Throwables.propagate(recorded);
            }
            this.doFn.finishBundle(c);
        }

        /** Reports the wrapped function's input type descriptor. */
        @Override
        protected TypeDescriptor<InputT> getInputTypeDescriptor() {
            return this.doFn.getInputTypeDescriptor();
        }

        /** Reports the wrapped function's output type descriptor. */
        @Override
        protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
            return this.doFn.getOutputTypeDescriptor();
        }

        /**
         * Process context handed to the wrapped function on a worker thread. Every call is
         * delegated to the real context; the four output methods do so while synchronized on
         * the enclosing {@code MultiThreadedIntraBundleProcessingDoFn}.
         */
        private class WrappedContext
        extends DoFn<InputT, OutputT>.ProcessContext {
            private final DoFn<InputT, OutputT>.ProcessContext context;

            WrappedContext(DoFn<InputT, OutputT>.ProcessContext context) {
                // The superclass is an inner class of DoFn; the implicit super() call binds it
                // to the enclosing MultiThreadedIntraBundleProcessingDoFn instance.
                this.context = context;
            }

            @Override
            public InputT element() {
                return this.context.element();
            }

            @Override
            public PipelineOptions getPipelineOptions() {
                return this.context.getPipelineOptions();
            }

            @Override
            public <T> T sideInput(PCollectionView<T> view) {
                return this.context.sideInput(view);
            }

            @Override
            public void output(OutputT output) {
                synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
                    this.context.output(output);
                }
            }

            @Override
            public void outputWithTimestamp(OutputT output, Instant timestamp) {
                synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
                    this.context.outputWithTimestamp(output, timestamp);
                }
            }

            @Override
            public <T> void sideOutput(TupleTag<T> tag, T output) {
                synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
                    this.context.sideOutput(tag, output);
                }
            }

            @Override
            public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
                synchronized (MultiThreadedIntraBundleProcessingDoFn.this) {
                    this.context.sideOutputWithTimestamp(tag, output, timestamp);
                }
            }

            @Override
            public Instant timestamp() {
                return this.context.timestamp();
            }

            @Override
            public BoundedWindow window() {
                return this.context.window();
            }

            @Override
            public PaneInfo pane() {
                return this.context.pane();
            }

            @Override
            public WindowingInternals<InputT, OutputT> windowingInternals() {
                return this.context.windowingInternals();
            }

            @Override
            protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
                return this.context.createAggregatorInternal(name, combiner);
            }
        }
    }

    /**
     * A {@link PTransform} that applies a {@link DoFn} to its input through
     * {@link MultiThreadedIntraBundleProcessingDoFn}, with a fixed limit on the number of
     * elements processed concurrently. Instances are immutable; the {@code with...}/{@code of}
     * methods return new instances.
     */
    public static class Bound<InputT, OutputT>
    extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
        private final DoFn<InputT, OutputT> doFn;
        private final int maxParallelism;

        /**
         * @param doFn the function to run over each element
         * @param maxParallelism the concurrency limit; must be greater than zero
         * @throws IllegalArgumentException if {@code maxParallelism} is not positive
         */
        Bound(DoFn<InputT, OutputT> doFn, int maxParallelism) {
            Preconditions.checkArgument(maxParallelism > 0, "Expected parallelism factor greater than zero, received %s.", maxParallelism);
            this.doFn = doFn;
            this.maxParallelism = maxParallelism;
        }

        /**
         * Returns a transform with the same function and the given concurrency limit.
         *
         * @param maxParallelism the new limit; must be greater than zero
         */
        public Bound<InputT, OutputT> withMaxParallelism(int maxParallelism) {
            return new Bound<InputT, OutputT>(this.doFn, maxParallelism);
        }

        /**
         * Returns a transform with the same concurrency limit and the given function. The new
         * function's element types are independent of this transform's, hence the distinct
         * type-parameter names.
         *
         * @param doFn the replacement function
         */
        public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT> of(DoFn<NewInputT, NewOutputT> doFn) {
            return new Bound<NewInputT, NewOutputT>(doFn, this.maxParallelism);
        }

        /**
         * Applies a {@link ParDo} of a {@link MultiThreadedIntraBundleProcessingDoFn} wrapping
         * this transform's function to {@code input}.
         */
        @Override
        public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
            return input.apply(ParDo.of(new MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>(this.doFn, this.maxParallelism)));
        }
    }

    /**
     * Builder stage that carries only a concurrency limit; supplying a {@link DoFn} through
     * {@link #of} yields a {@link Bound} transform. Instances are immutable.
     */
    public static class Unbound {
        private final int maxParallelism;

        /** Creates a builder with a limit of 16 concurrently processed elements. */
        Unbound() {
            // Same value as IntraBundleParallelization.DEFAULT_MAX_PARALLELISM.
            this(16);
        }

        /**
         * @param maxParallelism the concurrency limit; must be greater than zero
         * @throws IllegalArgumentException if {@code maxParallelism} is not positive
         */
        Unbound(int maxParallelism) {
            boolean positive = maxParallelism > 0;
            Preconditions.checkArgument(positive, "Expected parallelism factor greater than zero, received %s.", maxParallelism);
            this.maxParallelism = maxParallelism;
        }

        /**
         * Returns a new builder carrying the given concurrency limit.
         *
         * @param maxParallelism the new limit; must be greater than zero
         */
        public Unbound withMaxParallelism(int maxParallelism) {
            Unbound replacement = new Unbound(maxParallelism);
            return replacement;
        }

        /**
         * Binds a function to this builder's concurrency limit.
         *
         * @param doFn the function to run over each element
         * @return a {@link Bound} transform for {@code doFn}
         */
        public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
            int parallelism = this.maxParallelism;
            return new Bound<InputT, OutputT>(doFn, parallelism);
        }
    }
}

