/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.TypedPValue;
import java.util.UUID;
import org.joda.time.Instant;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class Write {
    public static <T> Bound<T> to(Sink<T> sink) {
        return new Bound(sink);
    }

    public static class Bound<T>
    extends PTransform<PCollection<T>, PDone> {
        private final Sink<T> sink;

        private Bound(Sink<T> sink) {
            this.sink = sink;
        }

        @Override
        public PDone apply(PCollection<T> input) {
            PipelineOptions options = input.getPipeline().getOptions();
            this.sink.validate(options);
            return this.createWrite(input, this.sink.createWriteOperation(options));
        }

        public Sink<T> getSink() {
            return this.sink;
        }

        private <WriteT> PDone createWrite(PCollection<T> input, Sink.WriteOperation<T, WriteT> writeOperation) {
            Pipeline p = input.getPipeline();
            SerializableCoder<?> operationCoder = SerializableCoder.of(writeOperation.getClass());
            TypedPValue operationCollection = (PCollection)p.apply(Create.of(writeOperation).withCoder(operationCoder));
            operationCollection = ((PCollection)((PCollection)operationCollection).apply("Initialize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Sink.WriteOperation<T, WriteT>>(){

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    Sink.WriteOperation writeOperation = (Sink.WriteOperation)c.element();
                    writeOperation.initialize(c.getPipelineOptions());
                    c.output(writeOperation);
                }
            }))).setCoder((Coder)operationCoder);
            final PCollectionView writeOperationView = (PCollectionView)((PCollection)operationCollection).apply(View.asSingleton());
            PCollection results = (PCollection)((PCollection)((PCollection)input.apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>(){
                private Sink.Writer<T, WriteT> writer = null;

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    if (this.writer == null) {
                        Sink.WriteOperation writeOperation = (Sink.WriteOperation)c.sideInput(writeOperationView);
                        this.writer = writeOperation.createWriter(c.getPipelineOptions());
                        this.writer.open(UUID.randomUUID().toString());
                    }
                    try {
                        this.writer.write(c.element());
                    }
                    catch (Exception e) {
                        try {
                            this.writer.close();
                        }
                        catch (Exception exception) {
                            // empty catch block
                        }
                        throw e;
                    }
                }

                @Override
                public void finishBundle(DoFn.Context c) throws Exception {
                    if (this.writer != null) {
                        Object result = this.writer.close();
                        c.outputWithTimestamp(result, Instant.now());
                    }
                }
            }).withSideInputs(writeOperationView))).setCoder((Coder)writeOperation.getWriterResultCoder())).apply(Window.into(new GlobalWindows()));
            final PCollectionView resultsView = (PCollectionView)results.apply(View.asIterable());
            PCollection done = (PCollection)((PCollection)operationCollection).apply("Finalize", ParDo.of(new DoFn<Sink.WriteOperation<T, WriteT>, Integer>(){

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    Iterable results = (Iterable)c.sideInput(resultsView);
                    Sink.WriteOperation writeOperation = (Sink.WriteOperation)c.element();
                    writeOperation.finalize(results, c.getPipelineOptions());
                }
            }).withSideInputs(resultsView));
            return PDone.in(input.getPipeline());
        }
    }
}

