/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Random;
import javax.annotation.Nullable;

public class TextSink<T>
extends Sink<T> {
    static final byte[] NEWLINE = TextSink.getNewline();
    final String namePrefix;
    final String shardFormat;
    final String nameSuffix;
    final int shardCount;
    final boolean appendTrailingNewlines;
    final String header;
    final String footer;
    final Coder<T> coder;

    private static byte[] getNewline() {
        String newline = "\n";
        try {
            return newline.getBytes("UTF-8");
        }
        catch (UnsupportedEncodingException e) {
            throw new RuntimeException("UTF-8 not supported", e);
        }
    }

    public static <V> TextSink<WindowedValue<V>> createForTest(String filename, boolean appendTrailingNewlines, @Nullable String header, @Nullable String footer, Coder<V> coder) {
        return TextSink.create(filename, "", "", 1, appendTrailingNewlines, header, footer, WindowedValue.getValueOnlyCoder(coder));
    }

    public static <V> TextSink<WindowedValue<V>> createForDirectPipelineRunner(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, boolean appendTrailingNewlines, @Nullable String header, @Nullable String footer, Coder<V> coder) {
        return TextSink.create(filenamePrefix, shardFormat, filenameSuffix, shardCount, appendTrailingNewlines, header, footer, WindowedValue.getValueOnlyCoder(coder));
    }

    public static <V> TextSink<V> create(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, boolean appendTrailingNewlines, @Nullable String header, @Nullable String footer, Coder<V> coder) {
        return new TextSink<V>(filenamePrefix, shardFormat, filenameSuffix, shardCount, appendTrailingNewlines, header, footer, coder);
    }

    private TextSink(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, boolean appendTrailingNewlines, @Nullable String header, @Nullable String footer, Coder<T> coder) {
        this.namePrefix = filenamePrefix;
        this.shardFormat = shardFormat;
        this.nameSuffix = filenameSuffix;
        this.shardCount = shardCount;
        this.appendTrailingNewlines = appendTrailingNewlines;
        this.header = header;
        this.footer = footer;
        this.coder = coder;
    }

    @Override
    public Sink.SinkWriter<T> writer() throws IOException {
        if (!(this.coder instanceof WindowedValue.WindowedValueCoder)) {
            String string = String.valueOf(this.coder.getClass().getName());
            throw new IOException(string.length() != 0 ? "Expected WindowedValueCoder for inputCoder, got: ".concat(string) : new String("Expected WindowedValueCoder for inputCoder, got: "));
        }
        Coder valueCoder = ((WindowedValue.WindowedValueCoder)this.coder).getValueCoder();
        String mimeType = valueCoder.equals(StringUtf8Coder.of()) ? "text/plain" : "application/octet-stream";
        WritableByteChannel writer = IOChannelUtils.create(this.namePrefix, this.shardFormat, this.nameSuffix, this.shardCount, mimeType);
        if (writer instanceof ShardingWritableByteChannel) {
            return new ShardingTextFileWriter((ShardingWritableByteChannel)writer);
        }
        return new TextFileWriter(writer);
    }

    class ShardingTextFileWriter
    extends AbstractTextFileWriter {
        private final Random rng;
        private final int numShards;
        private final ShardingWritableByteChannel outputChannel;

        ShardingTextFileWriter(ShardingWritableByteChannel outputChannel) throws IOException {
            this.rng = new Random();
            this.outputChannel = outputChannel;
            this.numShards = outputChannel.getNumShards();
            this.init();
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            }
            finally {
                this.outputChannel.close();
            }
        }

        @Override
        protected void write(int shardNum, ByteBuffer buf) throws IOException {
            this.outputChannel.writeToShard(shardNum, buf);
        }

        @Override
        protected int getShardNum(T value) {
            return this.rng.nextInt(this.numShards);
        }
    }

    class TextFileWriter
    extends AbstractTextFileWriter {
        private final WritableByteChannel outputChannel;

        TextFileWriter(WritableByteChannel outputChannel) throws IOException {
            this.outputChannel = outputChannel;
            this.init();
        }

        @Override
        public void close() throws IOException {
            try {
                super.close();
            }
            finally {
                this.outputChannel.close();
            }
        }

        @Override
        protected void write(int shardNum, ByteBuffer buf) throws IOException {
            this.outputChannel.write(buf);
        }

        @Override
        protected int getShardNum(T value) {
            return 0;
        }
    }

    abstract class AbstractTextFileWriter
    implements Sink.SinkWriter<T> {
        AbstractTextFileWriter() {
        }

        protected void init() throws IOException {
            if (TextSink.this.header != null) {
                this.printLine(-2, CoderUtils.encodeToByteArray(StringUtf8Coder.of(), TextSink.this.header));
            }
        }

        @Override
        public long add(T value) throws IOException {
            return this.printLine(this.getShardNum(value), CoderUtils.encodeToByteArray(TextSink.this.coder, value));
        }

        @Override
        public void close() throws IOException {
            if (TextSink.this.footer != null) {
                this.printLine(-2, CoderUtils.encodeToByteArray(StringUtf8Coder.of(), TextSink.this.footer));
            }
        }

        protected long printLine(int shardNum, byte[] line) throws IOException {
            long length = line.length;
            this.write(shardNum, ByteBuffer.wrap(line));
            if (TextSink.this.appendTrailingNewlines) {
                this.write(shardNum, ByteBuffer.wrap(NEWLINE));
                length += (long)NEWLINE.length;
            }
            return length;
        }

        protected abstract void write(int var1, ByteBuffer var2) throws IOException;

        protected abstract int getShardNum(T var1);
    }
}

