/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.ShardingWritableByteChannel;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;

/**
 * A {@link Sink} that writes the values of {@link WindowedValue}s to one or more Avro
 * container files.
 *
 * <p>The output channel is obtained from {@code IOChannelUtils.create} using the configured
 * filename prefix, shard format, filename suffix and shard count. When that channel is a
 * {@link ShardingWritableByteChannel}, each element is appended to a randomly chosen shard.
 *
 * @param <T> the type of the values written to the Avro file(s)
 */
public class AvroSink<T> extends Sink<WindowedValue<T>> {
    final String filenamePrefix;
    final String shardFormat;
    final String filenameSuffix;
    final int shardCount;
    final AvroCoder<T> avroCoder;
    final Schema schema;

    /**
     * Creates a sink with an empty shard format, an empty suffix and a shard count of 1.
     *
     * @param filename the filename prefix handed to {@code IOChannelUtils.create}
     * @param coder a value-only windowed coder whose value coder must be an {@link AvroCoder}
     * @throws IllegalArgumentException if the value coder is not an {@link AvroCoder}
     */
    public AvroSink(String filename, WindowedValue.ValueOnlyWindowedValueCoder<T> coder) {
        this(filename, "", "", 1, coder);
    }

    /**
     * Creates a sink for the given output naming parameters.
     *
     * @param filenamePrefix prefix handed to {@code IOChannelUtils.create}
     * @param shardFormat shard format handed to {@code IOChannelUtils.create}
     * @param filenameSuffix suffix handed to {@code IOChannelUtils.create}
     * @param shardCount shard count handed to {@code IOChannelUtils.create}
     * @param coder a value-only windowed coder whose value coder must be an {@link AvroCoder}
     * @throws IllegalArgumentException if the value coder is not an {@link AvroCoder}
     */
    public AvroSink(String filenamePrefix, String shardFormat, String filenameSuffix, int shardCount, WindowedValue.ValueOnlyWindowedValueCoder<T> coder) {
        if (!(coder.getValueCoder() instanceof AvroCoder)) {
            throw new IllegalArgumentException("AvroSink requires an AvroCoder");
        }
        this.filenamePrefix = filenamePrefix;
        this.shardFormat = shardFormat;
        this.filenameSuffix = filenameSuffix;
        this.shardCount = shardCount;
        // Guarded by the instanceof check above.
        this.avroCoder = (AvroCoder<T>) coder.getValueCoder();
        this.schema = this.avroCoder.getSchema();
    }

    /**
     * Opens the output channel and returns a writer that serializes values with the given
     * {@link DatumWriter}.
     *
     * <p>If constructing the writer fails, the freshly opened channel is closed before the
     * failure is rethrown so that it is not leaked.
     *
     * @param datumWriter the Avro datum writer used to serialize each value
     * @return a sharding writer when the channel is a {@link ShardingWritableByteChannel},
     *     otherwise a single-file writer
     * @throws IOException if the channel cannot be created or the Avro file cannot be started
     */
    public Sink.SinkWriter<WindowedValue<T>> writer(DatumWriter<T> datumWriter) throws IOException {
        WritableByteChannel channel = IOChannelUtils.create(this.filenamePrefix, this.shardFormat, this.filenameSuffix, this.shardCount, "application/octet-stream");
        try {
            if (channel instanceof ShardingWritableByteChannel) {
                return new AvroShardingFileWriter(datumWriter, (ShardingWritableByteChannel) channel);
            }
            return new AvroFileWriter(datumWriter, channel);
        } catch (IOException e) {
            closeAfterFailure(channel, e);
            throw e;
        } catch (RuntimeException e) {
            closeAfterFailure(channel, e);
            throw e;
        }
    }

    /**
     * Opens the output channel and returns a writer that uses the datum writer created by
     * this sink's {@link AvroCoder}.
     *
     * @throws IOException if the channel cannot be created or the Avro file cannot be started
     */
    @Override
    public Sink.SinkWriter<WindowedValue<T>> writer() throws IOException {
        return this.writer(this.avroCoder.createDatumWriter());
    }

    /**
     * Closes {@code channel} after writer construction failed, recording any secondary close
     * failure as suppressed on {@code failure} so the original cause is what propagates.
     */
    private static void closeAfterFailure(WritableByteChannel channel, Throwable failure) {
        try {
            channel.close();
        } catch (IOException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /** Writer that spreads elements across the shards of a {@link ShardingWritableByteChannel}. */
    class AvroShardingFileWriter implements Sink.SinkWriter<WindowedValue<T>> {
        private final ArrayList<AvroFileWriter> fileWriters = new ArrayList<AvroFileWriter>();
        private final Random random = new Random();

        /**
         * Creates one {@link AvroFileWriter} per shard of {@code outputChannel}.
         *
         * @param datumWriter the Avro datum writer passed to every shard writer
         * @param outputChannel the sharded channel whose per-shard channels are written to
         * @throws IOException if any shard's Avro file cannot be started
         */
        public AvroShardingFileWriter(DatumWriter<T> datumWriter, ShardingWritableByteChannel outputChannel) throws IOException {
            for (int i = 0; i < outputChannel.getNumShards(); ++i) {
                this.fileWriters.add(new AvroFileWriter(datumWriter, outputChannel.getChannel(i)));
            }
        }

        /**
         * Appends the value to a randomly selected shard.
         *
         * @return the byte count reported by the selected shard writer
         */
        @Override
        public long add(WindowedValue<T> value) throws IOException {
            int shard = this.random.nextInt(this.fileWriters.size());
            return this.fileWriters.get(shard).add(value);
        }

        /**
         * Closes every shard writer. A failure on one shard does not prevent the remaining
         * shards from being closed; the first {@link IOException} is rethrown afterwards with
         * any later ones attached as suppressed.
         */
        @Override
        public void close() throws IOException {
            IOException firstFailure = null;
            for (AvroFileWriter shardWriter : this.fileWriters) {
                try {
                    shardWriter.close();
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else {
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }
    }

    /** Writer that appends values to a single Avro container file. */
    class AvroFileWriter implements Sink.SinkWriter<WindowedValue<T>> {
        final DataFileWriter<T> fileWriter;

        /**
         * Starts an Avro file on {@code outputChannel} using the enclosing sink's schema.
         *
         * @param datumWriter the Avro datum writer used to serialize each value
         * @param outputChannel the channel the Avro file is written to
         * @throws IOException if the Avro file cannot be started
         */
        public AvroFileWriter(DatumWriter<T> datumWriter, WritableByteChannel outputChannel) throws IOException {
            this.fileWriter = new DataFileWriter<T>(datumWriter);
            this.fileWriter.create(AvroSink.this.schema, Channels.newOutputStream(outputChannel));
        }

        /**
         * Appends the value to the Avro file.
         *
         * @return the length of the value encoded with the sink's {@link AvroCoder}; the value
         *     is encoded a second time solely to compute this size
         */
        @Override
        public long add(WindowedValue<T> value) throws IOException {
            this.fileWriter.append(value.getValue());
            return CoderUtils.encodeToByteArray(AvroSink.this.avroCoder, value.getValue()).length;
        }

        /** Closes the underlying Avro file writer. */
        @Override
        public void close() throws IOException {
            this.fileWriter.close();
        }
    }
}

