/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.StorageRequest;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.GcsOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileBasedSink<T>
extends Sink<T> {
    protected final String baseOutputFilename;
    protected final String extension;
    protected final String fileNamingTemplate;

    /**
     * Creates a sink that uses the default file naming template {@code "-SSSSS-of-NNNNN"}.
     *
     * @param baseOutputFilename base name shared by all output files
     * @param extension file extension; an empty string means no extension is appended
     */
    public FileBasedSink(String baseOutputFilename, String extension) {
        this(baseOutputFilename, extension, "-SSSSS-of-NNNNN");
    }

    /**
     * Creates a sink with an explicit file naming template.
     *
     * <p>The three values are stored as-is; no validation is performed here.
     *
     * @param baseOutputFilename base name shared by all output files
     * @param extension file extension; an empty string means no extension is appended
     * @param fileNamingTemplate template handed to {@code IOChannelUtils.constructName} when
     *     destination filenames are generated
     */
    public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
        this.fileNamingTemplate = fileNamingTemplate;
        this.extension = extension;
        this.baseOutputFilename = baseOutputFilename;
    }

    /**
     * Performs no validation; this implementation is intentionally a no-op.
     *
     * @param options pipeline options (unused)
     */
    @Override
    public void validate(PipelineOptions options) {
        // Nothing to validate at this level.
    }

    /**
     * Returns the {@link FileBasedWriteOperation} for this sink. Implemented by subclasses.
     *
     * @param var1 the pipeline options (parameter name is a decompiler placeholder)
     * @return the write operation supplied by the subclass
     */
    public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions var1);

    /**
     * Collects storage requests and sends them through one reusable {@link BatchRequest}, adding at
     * most {@code maxRequestsPerBatch} requests to the batch before each execution.
     *
     * <p>Requests are first held in a pending list and only moved into the batch when a flush
     * happens. Callers must invoke {@link #flush()} after their last {@link #queue} call to make
     * sure everything has been sent.
     */
    @NotThreadSafe
    private static class BatchHelper {
        /** Requests accepted by {@link #queue} that have not yet been added to {@link #batch}. */
        private final List<QueueRequestCallback> pendingBatchEntries = new LinkedList<>();
        private final BatchRequest batch;
        private final long maxRequestsPerBatch;
        /**
         * Set while a batch is being filled and executed, so that a nested call (for example from a
         * callback that queues further requests) does not start a second flush.
         */
        private boolean flushing = false;

        /**
         * @param requestInitializer initializer passed to {@code gcs.batch(...)}
         * @param gcs storage client used to create the underlying batch
         * @param maxRequestsPerBatch upper bound on requests added to the batch per execution
         */
        public BatchHelper(HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
            this.batch = gcs.batch(requestInitializer);
            this.maxRequestsPerBatch = maxRequestsPerBatch;
        }

        /**
         * Registers a request and its callback for later execution. A flush is triggered once the
         * number of pending requests exceeds {@code maxRequestsPerBatch}.
         *
         * @param req the request to send as part of a batch
         * @param callback invoked with the outcome of {@code req}
         * @throws IOException if a triggered flush fails
         */
        public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback) throws IOException {
            pendingBatchEntries.add(new QueueRequestCallback() {
                @Override
                public void enqueue() throws IOException {
                    req.queue(batch, callback);
                }
            });
            flushIfPossibleAndRequired();
        }

        /** Flushes one batch if more than {@code maxRequestsPerBatch} requests are pending. */
        private void flushIfPossibleAndRequired() throws IOException {
            if ((long) pendingBatchEntries.size() > maxRequestsPerBatch) {
                flushIfPossible();
            }
        }

        /**
         * Moves pending requests into the batch (up to its limit) and executes it once, unless a
         * flush is already in progress or nothing is pending.
         *
         * @return the number of pending requests moved into the batch by this call
         */
        private int flushIfPossible() throws IOException {
            if (flushing || pendingBatchEntries.isEmpty()) {
                return 0;
            }
            flushing = true;
            int moved = 0;
            try {
                while ((long) batch.size() < maxRequestsPerBatch && !pendingBatchEntries.isEmpty()) {
                    pendingBatchEntries.remove(0).enqueue();
                    moved++;
                }
                batch.execute();
            } finally {
                flushing = false;
            }
            return moved;
        }

        /**
         * Sends all pending requests, executing as many batches as needed.
         *
         * <p>A single batch execution only carries up to {@code maxRequestsPerBatch} requests, so
         * this keeps flushing while requests remain and the previous round made progress. The
         * progress check guarantees termination when no request could be moved into the batch.
         *
         * @throws IOException if executing a batch fails
         */
        public void flush() throws IOException {
            int moved;
            do {
                moved = flushIfPossible();
            } while (moved > 0 && !pendingBatchEntries.isEmpty());
        }

        /** Deferred action that adds one request to the batch. */
        protected static interface QueueRequestCallback {
            public void enqueue() throws IOException;
        }
    }

    /**
     * {@link FileOperations} implementation for the local file system, built on
     * {@link java.nio.file.Files}.
     *
     * <p>Missing files are tolerated: copying from or removing a file that does not exist is
     * logged at debug level and otherwise ignored.
     */
    private static class LocalFileOperations
    implements FileOperations {
        private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);

        private LocalFileOperations() {
        }

        /**
         * Copies each source file to the destination at the same index, replacing existing files.
         *
         * @param srcFilenames paths of the files to copy
         * @param destFilenames target paths, one per source
         * @throws IllegalArgumentException if the two lists differ in size
         * @throws IOException if a copy fails for a reason other than a missing source file
         */
        @Override
        public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
            // Preconditions templates use %s; the former "{}" placeholders were never substituted.
            Preconditions.checkArgument(srcFilenames.size() == destFilenames.size(),
                "Number of source files %s must equal number of destination files %s",
                srcFilenames.size(), destFilenames.size());
            int numFiles = srcFilenames.size();
            for (int i = 0; i < numFiles; ++i) {
                String src = srcFilenames.get(i);
                String dst = destFilenames.get(i);
                LOG.debug("Copying {} to {}", src, dst);
                copyOne(src, dst);
            }
        }

        /** Copies a single file; a missing source is logged and skipped. */
        private void copyOne(String source, String destination) throws IOException {
            try {
                Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING);
            } catch (NoSuchFileException e) {
                // Deliberately tolerated: the source file is simply not there.
                LOG.debug("{} does not exist.", source);
            }
        }

        /**
         * Deletes every named file that exists.
         *
         * @param filenames paths of the files to delete
         * @throws IOException if a deletion fails
         */
        @Override
        public void remove(Collection<String> filenames) throws IOException {
            for (String filename : filenames) {
                LOG.debug("Removing file {}", filename);
                removeOne(filename);
            }
        }

        /** Deletes a single file; a missing file is logged and skipped. */
        private void removeOne(String filename) throws IOException {
            boolean existed = Files.deleteIfExists(Paths.get(filename));
            if (!existed) {
                LOG.debug("{} does not exist.", filename);
            }
        }
    }

    /**
     * {@link FileOperations} implementation for Google Cloud Storage. Copy and delete requests
     * are sent in batches through a {@link BatchHelper}.
     *
     * <p>Errors that {@link ApiErrorExtractor#itemNotFound} classifies as "not found" are logged
     * at debug level and ignored; any other error is raised as an {@link IOException}.
     */
    private static class GcsOperations
    implements FileOperations {
        private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
        /** Maximum number of requests placed in one batch execution. */
        private static final int MAX_REQUESTS_PER_BATCH = 1000;
        private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
        private final GcsOptions gcsOptions;
        private final Storage gcs;
        private final BatchHelper batchHelper;

        /**
         * Builds a storage client from the given options and a batch helper bound to it.
         *
         * @param options pipeline options, viewed as {@link GcsOptions}
         */
        public GcsOperations(PipelineOptions options) {
            this.gcsOptions = options.as(GcsOptions.class);
            this.gcs = Transport.newStorageClient(this.gcsOptions).build();
            this.batchHelper = new BatchHelper(
                this.gcs.getRequestFactory().getInitializer(), this.gcs, MAX_REQUESTS_PER_BATCH);
        }

        /**
         * Queues one copy request per source/destination pair and flushes the batch.
         *
         * @param srcFilenames URIs of the objects to copy
         * @param destFilenames target URIs, one per source
         * @throws IllegalArgumentException if the two lists differ in size
         * @throws IOException if queuing or executing the batch fails, or a copy fails with an
         *     error other than "not found"
         */
        @Override
        public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
            // Preconditions templates use %s; the former "{}" placeholders were never substituted.
            Preconditions.checkArgument(srcFilenames.size() == destFilenames.size(),
                "Number of source files %s must equal number of destination files %s",
                srcFilenames.size(), destFilenames.size());
            for (int i = 0; i < srcFilenames.size(); ++i) {
                final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
                final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
                LOG.debug("Copying {} to {}", sourcePath, destPath);
                Storage.Objects.Copy copyObject = gcs.objects().copy(
                    sourcePath.getBucket(), sourcePath.getObject(),
                    destPath.getBucket(), destPath.getObject(), null);
                batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
                    @Override
                    public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
                        LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
                    }

                    @Override
                    public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
                        if (!errorExtractor.itemNotFound(e)) {
                            throw new IOException(e.toString());
                        }
                        LOG.debug("{} does not exist.", sourcePath);
                    }
                });
            }
            batchHelper.flush();
        }

        /**
         * Queues one delete request per file and flushes the batch.
         *
         * @param filenames URIs of the objects to delete
         * @throws IOException if queuing or executing the batch fails, or a delete fails with an
         *     error other than "not found"
         */
        @Override
        public void remove(Collection<String> filenames) throws IOException {
            for (String filename : filenames) {
                final GcsPath path = GcsPath.fromUri(filename);
                LOG.debug("Removing: {}", path);
                Storage.Objects.Delete deleteObject = gcs.objects().delete(path.getBucket(), path.getObject());
                batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
                    @Override
                    public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
                        LOG.debug("Successfully removed {}", path);
                    }

                    @Override
                    public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
                        if (!errorExtractor.itemNotFound(e)) {
                            throw new IOException(e.toString());
                        }
                        LOG.debug("{} does not exist.", path);
                    }
                });
            }
            batchHelper.flush();
        }
    }

    /**
     * Bulk file operations for one kind of file system. Implemented in this file by
     * {@code GcsOperations} and {@code LocalFileOperations}.
     */
    private static interface FileOperations {
        /**
         * Copies files pairwise: the i-th entry of {@code var1} is copied to the i-th entry of
         * {@code var2}. Both implementations in this file require the lists to have equal size.
         *
         * @param var1 source filenames (parameter name is a decompiler placeholder)
         * @param var2 destination filenames (parameter name is a decompiler placeholder)
         * @throws IOException if a copy fails
         */
        public void copy(List<String> var1, List<String> var2) throws IOException;

        /**
         * Removes the given files.
         *
         * @param var1 filenames to remove (parameter name is a decompiler placeholder)
         * @throws IOException if a removal fails
         */
        public void remove(Collection<String> var1) throws IOException;
    }

    /**
     * Chooses the {@link FileOperations} implementation for a file spec, based on the
     * {@link IOChannelFactory} that {@link IOChannelUtils#getFactory} resolves for it.
     */
    private static class FileOperationsFactory {
        private FileOperationsFactory() {
        }

        /**
         * Returns file operations suited to {@code spec}.
         *
         * @param spec filename or pattern used to look up the channel factory
         * @param options pipeline options, forwarded to the GCS implementation
         * @return GCS operations for a {@link GcsIOChannelFactory}, local operations for a
         *     {@link FileIOChannelFactory}
         * @throws IOException if the resolved factory is neither of those types
         */
        public static FileOperations getFileOperations(String spec, PipelineOptions options) throws IOException {
            final IOChannelFactory channelFactory = IOChannelUtils.getFactory(spec);
            final FileOperations operations;
            if (channelFactory instanceof GcsIOChannelFactory) {
                operations = new GcsOperations(options);
            } else if (channelFactory instanceof FileIOChannelFactory) {
                operations = new LocalFileOperations();
            } else {
                throw new IOException("Unrecognized file system.");
            }
            return operations;
        }
    }

    /**
     * Serializable holder for the name of a file produced by a writer.
     */
    public static final class FileResult
    implements Serializable {
        // Field name is part of the serialized form; do not rename.
        private final String filename;

        /**
         * @param filename name of the written file; stored without validation
         */
        public FileResult(String filename) {
            this.filename = filename;
        }

        /** Returns the filename given at construction. */
        public String getFilename() {
            return filename;
        }
    }

    /**
     * Base writer that writes one bundle to a single temporary file.
     *
     * <p>{@link #open} creates the file's channel, hands it to {@link #prepareWrite} and writes
     * the header; {@link #close} writes the footer, closes the channel and returns a
     * {@link FileResult} naming the temporary file.
     */
    public static abstract class FileBasedWriter<T>
    extends Sink.Writer<T, FileResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
        final FileBasedWriteOperation<T> writeOperation;
        /** Bundle id passed to {@link #open}. */
        private String id;
        /** Temporary file this writer writes to; set by {@link #open}. */
        private String filename;
        /** Channel for {@link #filename}; set by {@link #open} and closed by {@link #close}. */
        private WritableByteChannel channel;
        /** MIME type passed to {@code IOChannelUtils.create} when the channel is opened. */
        protected String mimeType = "text/plain";

        /**
         * @param writeOperation the operation this writer belongs to; must not be null
         * @throws NullPointerException if {@code writeOperation} is null
         */
        public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) {
            Preconditions.checkNotNull(writeOperation);
            this.writeOperation = writeOperation;
        }

        /**
         * Called from {@link #open} with the newly created channel, before the header is written.
         *
         * @param var1 the channel for this writer's temporary file (decompiler placeholder name)
         */
        protected abstract void prepareWrite(WritableByteChannel var1) throws Exception;

        /** Writes a header; called from {@link #open}. Does nothing by default. */
        protected void writeHeader() throws Exception {
        }

        /** Writes a footer; called from {@link #close}. Does nothing by default. */
        protected void writeFooter() throws Exception {
        }

        /**
         * Opens the temporary file for the bundle and writes the header.
         *
         * <p>If preparing the write or writing the header fails, the channel is closed and the
         * original exception is rethrown. A failure while closing is logged and attached to the
         * original exception as a suppressed exception so it is not lost.
         *
         * @param uId unique id of the bundle; used as the temporary filename suffix
         * @throws Exception if the channel cannot be created, or preparation or the header fails
         */
        @Override
        public final void open(String uId) throws Exception {
            id = uId;
            filename = FileBasedWriteOperation.buildTemporaryFilename(
                getWriteOperation().baseTemporaryFilename, uId);
            LOG.debug("Opening {}.", filename);
            channel = IOChannelUtils.create(filename, mimeType);
            try {
                prepareWrite(channel);
                LOG.debug("Writing header to {}.", filename);
                writeHeader();
            } catch (Exception e) {
                // The trailing throwable makes SLF4J log the stack trace of the root failure.
                LOG.error("Writing header to {} failed, closing channel.", filename, e);
                try {
                    channel.close();
                } catch (IOException closeException) {
                    LOG.error("Closing channel for {} failed: {}",
                        filename, closeException.getMessage(), closeException);
                    e.addSuppressed(closeException);
                }
                throw e;
            }
            LOG.debug("Starting write of bundle {} to {}.", id, filename);
        }

        /**
         * Writes the footer, closes the channel and reports the temporary file.
         *
         * @return a {@link FileResult} holding this writer's temporary filename
         * @throws Exception if writing the footer or closing the channel fails
         */
        @Override
        public final FileResult close() throws Exception {
            // try-with-resources closes the channel even when writeFooter() throws.
            try (WritableByteChannel theChannel = channel) {
                LOG.debug("Writing footer to {}.", filename);
                writeFooter();
            }
            FileResult result = new FileResult(filename);
            LOG.debug("Result for bundle {}: {}", id, filename);
            return result;
        }

        /** Returns the write operation this writer was created with. */
        public FileBasedWriteOperation<T> getWriteOperation() {
            return writeOperation;
        }
    }

    /**
     * Write operation for a {@link FileBasedSink}.
     *
     * <p>Writers write bundles to temporary files named
     * {@code baseTemporaryFilename + "-temp-" + id}. {@link #finalize} copies those files to the
     * final output names and, when retention is {@link TemporaryFileRetention#REMOVE}, deletes
     * every file matching the temporary-file pattern.
     */
    public static abstract class FileBasedWriteOperation<T>
    extends Sink.WriteOperation<T, FileResult> {
        private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);
        /** Separator placed between the temporary base name and the per-bundle suffix. */
        protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-";
        protected final FileBasedSink<T> sink;
        protected final TemporaryFileRetention temporaryFileRetention;
        protected final String baseTemporaryFilename;

        /**
         * Joins {@code prefix}, the temporary-file separator and {@code suffix}.
         *
         * @param prefix base temporary filename
         * @param suffix per-bundle id, or a glob such as {@code "*"}
         * @return the concatenated temporary filename
         */
        protected static final String buildTemporaryFilename(String prefix, String suffix) {
            return prefix + TEMPORARY_FILENAME_SEPARATOR + suffix;
        }

        /**
         * Uses the sink's base output filename as the temporary base name and removes temporary
         * files after finalization.
         */
        public FileBasedWriteOperation(FileBasedSink<T> sink) {
            this(sink, sink.baseOutputFilename);
        }

        /** Uses the given temporary base name and removes temporary files after finalization. */
        public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
            this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
        }

        /**
         * @param sink the sink being written to
         * @param baseTemporaryFilename prefix for temporary bundle files
         * @param temporaryFileRetention whether temporary files are kept or removed by
         *     {@link #finalize}
         */
        public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename, TemporaryFileRetention temporaryFileRetention) {
            this.sink = sink;
            this.baseTemporaryFilename = baseTemporaryFilename;
            this.temporaryFileRetention = temporaryFileRetention;
        }

        /** Creates a writer for one bundle; implemented by subclasses. */
        public abstract FileBasedWriter<T> createWriter(PipelineOptions var1) throws Exception;

        /** No initialization is needed; intentionally empty. */
        @Override
        public void initialize(PipelineOptions options) throws Exception {
        }

        /**
         * Copies every writer's temporary file to its final name and then, if retention is
         * {@code REMOVE}, deletes the temporary files.
         *
         * @param writerResults results returned by the writers
         * @param options pipeline options used to obtain file operations
         */
        @Override
        public void finalize(Iterable<FileResult> writerResults, PipelineOptions options) throws Exception {
            List<String> temporaryFiles = new ArrayList<>();
            for (FileResult writerResult : writerResults) {
                String temporaryFile = writerResult.getFilename();
                LOG.debug("Temporary bundle output file {} will be copied.", temporaryFile);
                temporaryFiles.add(temporaryFile);
            }
            copyToOutputFiles(temporaryFiles, options);
            if (TemporaryFileRetention.REMOVE == temporaryFileRetention) {
                removeTemporaryFiles(options);
            }
        }

        /**
         * Copies the given files to the generated destination filenames. Sources are sorted
         * first, so the i-th source in sorted order goes to the i-th destination.
         *
         * @param filenames temporary files to copy; the list itself is not modified
         * @param options pipeline options used to obtain file operations
         * @return the destination filenames (empty when there is nothing to copy)
         * @throws IOException if the file system is unrecognized or a copy fails
         */
        protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options) throws IOException {
            final int fileCount = filenames.size();
            final List<String> destinations = generateDestinationFilenames(fileCount);
            final List<String> sources = new ArrayList<>(filenames);
            Collections.sort(sources);
            if (fileCount <= 0) {
                LOG.info("No output files to write.");
                return destinations;
            }
            LOG.debug("Copying {} files.", fileCount);
            // The first destination name determines which file system implementation is used.
            FileOperationsFactory.getFileOperations(destinations.get(0), options)
                .copy(sources, destinations);
            return destinations;
        }

        /**
         * Builds {@code numFiles} destination names from the sink's base filename, naming
         * template and extension via {@code IOChannelUtils.constructName}.
         *
         * @param numFiles number of names to generate
         * @return the generated names, in shard order
         */
        protected final List<String> generateDestinationFilenames(int numFiles) {
            String extension = getSink().extension;
            String baseOutputFilename = getSink().baseOutputFilename;
            String fileNamingTemplate = getSink().fileNamingTemplate;
            // A non-empty extension is prefixed with a dot; an empty one adds nothing.
            String suffix = extension.isEmpty() ? "" : "." + extension;
            List<String> names = new ArrayList<>();
            for (int shard = 0; shard < numFiles; shard++) {
                names.add(IOChannelUtils.constructName(baseOutputFilename, fileNamingTemplate, suffix, shard, numFiles));
            }
            return names;
        }

        /**
         * Deletes every file matching {@code baseTemporaryFilename + "-temp-*"}.
         *
         * @param options pipeline options used to obtain file operations
         * @throws IOException if matching or removal fails
         */
        protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
            final String glob = buildTemporaryFilename(baseTemporaryFilename, "*");
            LOG.debug("Finding temporary bundle output files matching {}.", glob);
            final FileOperations operations = FileOperationsFactory.getFileOperations(glob, options);
            final Collection<String> matched = IOChannelUtils.getFactory(glob).match(glob);
            final int matchCount = matched.size();
            LOG.debug("{} temporary files matched {}", matchCount, glob);
            LOG.debug("Removing {} files.", matchCount);
            operations.remove(matched);
        }

        /** Returns a {@link SerializableCoder} for {@link FileResult}. */
        @Override
        public Coder<FileResult> getWriterResultCoder() {
            return SerializableCoder.of(FileResult.class);
        }

        /** Returns the sink this operation writes to. */
        @Override
        public FileBasedSink<T> getSink() {
            return sink;
        }

        /** Whether temporary bundle files are kept or deleted once output files are written. */
        public static enum TemporaryFileRetention {
            KEEP,
            REMOVE;
        }
    }
}

