/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.io.BlockBasedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;

/**
 * A {@link BlockBasedSource} that reads records from Avro container files.
 *
 * <p>A source is created with {@link #from(String)} (yielding {@link GenericRecord}s) or
 * {@link #readFromFileWithClass(String, Class)} and may be refined with the {@code with*}
 * methods, each of which returns a new source rather than modifying this one.
 *
 * <p>The codec and sync marker of a file are not known until the file header has been read;
 * {@link #createForSubrangeOfFile(String, long, long)} reads them lazily (together with the
 * schema, when none was supplied) the first time a single-file source is created.
 *
 * @param <T> the type of the records produced by this source
 */
@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class AvroSource<T>
extends BlockBasedSource<T> {
    /** Minimum bundle size, in bytes, used by the static factory methods. */
    static final long DEFAULT_MIN_BUNDLE_SIZE = 128000L;

    /** JSON text of the schema used to read records; may be {@code null} until read from a file. */
    private final String schema;
    /** Class of the records produced. */
    private final Class<T> type;
    /** Name of the block compression codec; {@code null} until read from a file header. */
    private final String codec;
    /** Sync marker separating blocks; {@code null} until read from a file header. */
    private final byte[] syncMarker;
    /** Lazily created coder; rebuilt on demand from {@link #type} and {@link #schema}. */
    private transient AvroCoder<T> coder = null;

    /**
     * Creates a bounded read of the files matching {@code filePattern}, using the schema that
     * Avro reflection derives from {@code clazz}.
     *
     * @param filePattern file name or pattern to read
     * @param clazz class of the records to produce
     * @return a {@link Read.Bounded} transform wrapping the new source
     */
    public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, Class<T> clazz) {
        String reflectedSchema = ReflectData.get().getSchema(clazz).toString();
        return Read.from(new AvroSource<T>(filePattern, DEFAULT_MIN_BUNDLE_SIZE, reflectedSchema, clazz, null, null));
    }

    /**
     * Creates a source of {@link GenericRecord}s for the given file name or pattern. No schema
     * is set; it is taken from the file header when a single-file source is created.
     *
     * @param fileNameOrPattern file name or pattern to read
     * @return the new source
     */
    public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
        return new AvroSource<GenericRecord>(fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
    }

    /**
     * Returns a copy of this source that produces {@link GenericRecord}s read with the given
     * schema.
     *
     * @param schema JSON text of the schema
     * @return the new source
     */
    public AvroSource<GenericRecord> withSchema(String schema) {
        return new AvroSource<GenericRecord>(getFileOrPatternSpec(), getMinBundleSize(), schema, GenericRecord.class, codec, syncMarker);
    }

    /**
     * Returns a copy of this source that produces {@link GenericRecord}s read with the given
     * schema.
     *
     * @param schema the schema; its {@code toString()} form is what is stored
     * @return the new source
     */
    public AvroSource<GenericRecord> withSchema(Schema schema) {
        return new AvroSource<GenericRecord>(getFileOrPatternSpec(), getMinBundleSize(), schema.toString(), GenericRecord.class, codec, syncMarker);
    }

    /**
     * Returns a copy of this source that produces instances of {@code clazz}, using the schema
     * that Avro reflection derives from it.
     *
     * @param clazz class of the records to produce
     * @return the new source
     */
    public <X> AvroSource<X> withSchema(Class<X> clazz) {
        String reflectedSchema = ReflectData.get().getSchema(clazz).toString();
        return new AvroSource<X>(getFileOrPatternSpec(), getMinBundleSize(), reflectedSchema, clazz, codec, syncMarker);
    }

    /**
     * Returns a copy of this source with a different minimum bundle size.
     *
     * @param minBundleSize the new minimum bundle size
     * @return the new source
     */
    public AvroSource<T> withMinBundleSize(long minBundleSize) {
        return new AvroSource<T>(getFileOrPatternSpec(), minBundleSize, schema, type, codec, syncMarker);
    }

    /** Creates a source over a file name or pattern (no explicit offsets). */
    private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type, String codec, byte[] syncMarker) {
        super(fileNameOrPattern, minBundleSize);
        this.schema = schema;
        this.type = type;
        this.codec = codec;
        this.syncMarker = syncMarker;
    }

    /** Creates a source over the byte range {@code startOffset}..{@code endOffset} of one file. */
    private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset, String schema, Class<T> type, String codec, byte[] syncMarker) {
        super(fileName, minBundleSize, startOffset, endOffset);
        this.schema = schema;
        this.type = type;
        this.codec = codec;
        this.syncMarker = syncMarker;
    }

    /** Performs only the validation done by the superclass. */
    @Override
    public void validate() {
        super.validate();
    }

    /**
     * Reads the header of an Avro container file and extracts its sync marker, codec and schema.
     *
     * <p>The header is parsed as: the magic bytes, a map of string keys to byte values, then the
     * sync marker. When the map has no codec entry, the codec defaults to
     * {@link DataFileConstants#NULL_CODEC}. The returned schema is {@code null} when the map has
     * no schema entry.
     *
     * @param fileName the file to open
     * @return the metadata found in the header
     * @throws IOException if the file cannot be read or does not start with the Avro magic bytes
     */
    static Metadata readMetadataFromFile(String fileName) throws IOException {
        String codec = null;
        String schema = null;
        byte[] syncMarker;
        try (InputStream stream = Channels.newInputStream(IOChannelUtils.getFactory(fileName).open(fileName))) {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);

            byte[] magic = new byte[DataFileConstants.MAGIC.length];
            decoder.readFixed(magic);
            if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {
                throw new IOException("Missing Avro file signature: " + fileName);
            }

            // The metadata map is encoded as a sequence of chunks; mapNext() yields the size of
            // the next chunk and 0 once the map is exhausted.
            ByteBuffer valueBuffer = ByteBuffer.allocate(512);
            for (long entries = decoder.readMapStart(); entries > 0L; entries = decoder.mapNext()) {
                for (long i = 0L; i < entries; ++i) {
                    String key = decoder.readString();
                    valueBuffer = decoder.readBytes(valueBuffer);
                    byte[] value = new byte[valueBuffer.remaining()];
                    valueBuffer.get(value);
                    if (DataFileConstants.CODEC.equals(key)) {
                        codec = new String(value, "UTF-8");
                    } else if (DataFileConstants.SCHEMA.equals(key)) {
                        schema = new String(value, "UTF-8");
                    }
                }
            }
            if (codec == null) {
                codec = DataFileConstants.NULL_CODEC;
            }

            syncMarker = new byte[DataFileConstants.SYNC_SIZE];
            decoder.readFixed(syncMarker);
        }
        return new Metadata(syncMarker, codec, schema);
    }

    /**
     * Creates a source for a byte range of a single file.
     *
     * <p>If this source does not yet know the codec or the sync marker, the file header is read
     * to obtain them; the header's schema is used only when this source has no schema of its own.
     *
     * @param fileName the file to read
     * @param start start offset of the range
     * @param end end offset of the range
     * @return the new single-file source
     * @throws RuntimeException wrapping the {@link IOException} if the header cannot be read
     */
    @Override
    public AvroSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
        byte[] resolvedSyncMarker = this.syncMarker;
        String resolvedCodec = this.codec;
        String resolvedSchema = this.schema;
        if (resolvedCodec == null || resolvedSyncMarker == null) {
            Metadata metadata;
            try {
                metadata = readMetadataFromFile(fileName);
            } catch (IOException e) {
                throw new RuntimeException("Error reading metadata from file " + fileName, e);
            }
            resolvedCodec = metadata.codec;
            resolvedSyncMarker = metadata.syncMarker;
            if (resolvedSchema == null) {
                resolvedSchema = metadata.schema;
            }
        }
        return new AvroSource<T>(fileName, getMinBundleSize(), start, end, resolvedSchema, this.type, resolvedCodec, resolvedSyncMarker);
    }

    /** Creates a reader over this source. */
    @Override
    public AvroReader<T> createSingleFileReader(PipelineOptions options) {
        return new AvroReader<T>(this);
    }

    /** Always returns {@code false}. */
    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return false;
    }

    /**
     * Returns the coder built from this source's record class and schema, creating and caching
     * it on first use.
     */
    @Override
    public AvroCoder<T> getDefaultOutputCoder() {
        if (this.coder == null) {
            Schema parsed = new Schema.Parser().parse(this.schema);
            this.coder = AvroCoder.of(this.type, parsed);
        }
        return this.coder;
    }

    /** Returns the JSON text of the schema, or {@code null} if none has been set or read yet. */
    public String getSchema() {
        return this.schema;
    }

    private byte[] getSyncMarker() {
        return this.syncMarker;
    }

    private String getCodec() {
        return this.codec;
    }

    /**
     * Reads the blocks of a single Avro file, locating each block by scanning for the sync
     * marker of the current source.
     *
     * @param <T> the type of the records read
     */
    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    public static class AvroReader<T>
    extends BlockBasedSource.BlockBasedReader<T> {
        /** The block most recently produced by {@link #readNextBlock()}. */
        private AvroBlock<T> currentBlock;
        /** Offset of the sync marker that precedes the current block. */
        private long currentBlockOffset = 0L;
        /** Size in bytes of the current block's data, as stated in its header. */
        private long currentBlockSizeBytes = 0L;
        /** Offset just past the last byte consumed for the current block. */
        private long currentOffset = 0L;
        /** Input stream; pushback is used to return bytes read past a marker or header. */
        private PushbackInputStream stream;
        /** Scratch buffer for a block header (two variable-length longs). */
        private final byte[] readBuffer = new byte[20];
        /** Decoder reused across block headers. */
        private BinaryDecoder decoder;

        /**
         * Creates a reader for the given source.
         *
         * @param source the source to read
         */
        public AvroReader(AvroSource<T> source) {
            super(source);
        }

        @Override
        public AvroSource<T> getCurrentSource() {
            return (AvroSource<T>) super.getCurrentSource();
        }

        /**
         * Advances to the block that follows the next sync marker in the stream.
         *
         * <p>After skipping past the marker, the block header (record count and data size, each
         * encoded as a long) is parsed from a small buffer, any bytes read beyond the header are
         * pushed back, and exactly the stated number of data bytes is read.
         *
         * @return {@code true} if a block was read, {@code false} if the stream ended after the
         *     sync marker search
         * @throws IOException if reading fails, the header states an unusable block size, or
         *     the stream ends before the whole block has been read
         */
        @Override
        public boolean readNextBlock() throws IOException {
            byte[] syncMarker = getCurrentSource().getSyncMarker();
            this.currentOffset += advancePastNextSyncMarker(this.stream, syncMarker);
            // The block's offset is taken to be the start of its preceding sync marker.
            this.currentBlockOffset = this.currentOffset - (long) syncMarker.length;

            // A single read() may return fewer bytes than are available, so fill the buffer
            // until it is full or the stream ends.
            int read = readFully(this.stream, this.readBuffer);
            if (read <= 0) {
                return false;
            }

            // Decode only the bytes actually read so that stale bytes left in the buffer by an
            // earlier header can never be parsed.
            this.decoder = DecoderFactory.get().binaryDecoder(this.readBuffer, 0, read, this.decoder);
            long numRecords = this.decoder.readLong();
            long blockSize = this.decoder.readLong();

            // The decoder was given exactly `read` bytes, so whatever it still has available is
            // the part of the buffer that lies beyond the header.
            int headerSize = read - this.decoder.inputStream().available();
            this.stream.unread(this.readBuffer, headerSize, read - headerSize);

            if (blockSize < 0L || blockSize > (long) Integer.MAX_VALUE) {
                throw new IOException("Invalid Avro block size: " + blockSize);
            }
            byte[] data = new byte[(int) blockSize];
            int dataRead = readFully(this.stream, data);
            if (dataRead != data.length) {
                throw new IOException("Unexpected end of stream: expected a block of " + blockSize + " bytes but read only " + dataRead);
            }

            this.currentBlock = new AvroBlock<T>(data, numRecords, getCurrentSource());
            this.currentBlockSizeBytes = blockSize;
            this.currentOffset += (long) headerSize + blockSize;
            return true;
        }

        @Override
        public AvroBlock<T> getCurrentBlock() {
            return this.currentBlock;
        }

        @Override
        public long getCurrentBlockOffset() {
            return this.currentBlockOffset;
        }

        @Override
        public long getCurrentBlockSize() {
            return this.currentBlockSizeBytes;
        }

        /**
         * Wraps the channel in a pushback stream whose pushback capacity is the sync marker
         * length plus the header buffer length.
         */
        private PushbackInputStream createStream(ReadableByteChannel channel) {
            int pushbackCapacity = getCurrentSource().getSyncMarker().length + this.readBuffer.length;
            return new PushbackInputStream(Channels.newInputStream(channel), pushbackCapacity);
        }

        /** Opens the stream over {@code channel} and records the source's start offset. */
        @Override
        protected void startReading(ReadableByteChannel channel) throws IOException {
            this.stream = createStream(channel);
            this.currentOffset = getCurrentSource().getStartOffset();
        }

        /**
         * Reads from {@code in} until {@code buffer} is full or the stream ends.
         *
         * @return the number of bytes stored in {@code buffer}; less than its length only when
         *     the end of the stream was reached
         */
        private static int readFully(InputStream in, byte[] buffer) throws IOException {
            int total = 0;
            while (total < buffer.length) {
                int n = in.read(buffer, total, buffer.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total;
        }

        /**
         * Consumes bytes from {@code stream} up to and including the next occurrence of
         * {@code syncMarker}. Bytes read beyond the marker are pushed back onto the stream.
         *
         * @param stream the stream to scan
         * @param syncMarker the marker to look for
         * @return the number of bytes consumed; when no marker is found this is every byte that
         *     remained in the stream
         * @throws IOException if reading from the stream fails
         */
        static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker) throws IOException {
            Seeker seeker = new Seeker(syncMarker);
            byte[] syncBuffer = new byte[syncMarker.length];
            long totalBytesConsumed = 0L;
            int mark = -1;
            int read;
            do {
                read = stream.read(syncBuffer);
                if (read >= 0) {
                    mark = seeker.find(syncBuffer, read);
                    totalBytesConsumed += (long) read;
                }
            } while (mark < 0 && read > 0);

            if (mark >= 0) {
                // `mark` indexes the marker's last byte within syncBuffer; everything after it
                // belongs to the following block and is returned to the stream.
                int surplus = read - (mark + 1);
                stream.unread(syncBuffer, mark + 1, surplus);
                totalBytesConsumed -= (long) surplus;
            }
            return totalBytesConsumed;
        }

        /**
         * Finds a marker in a sequence of bytes that is supplied in arbitrary chunks, keeping
         * the most recent bytes between calls so that a marker split across chunks is found.
         */
        static class Seeker {
            private final byte[] marker;
            /** Sliding window holding the most recent {@code marker.length} bytes seen. */
            private final byte[] searchBuffer;
            /** Number of valid bytes at the tail of {@link #searchBuffer}. */
            private int available = 0;

            /**
             * @param marker the byte sequence to search for
             */
            public Seeker(byte[] marker) {
                this.marker = marker;
                this.searchBuffer = new byte[marker.length];
            }

            /**
             * Feeds the first {@code length} bytes of {@code buffer} into the search.
             *
             * @param buffer the next chunk of bytes
             * @param length the number of bytes of {@code buffer} to examine
             * @return the index in {@code buffer} of the last byte of the marker, or -1 if the
             *     marker has not been completed within this chunk
             */
            public int find(byte[] buffer, int length) {
                int last = this.searchBuffer.length - 1;
                for (int i = 0; i < length; ++i) {
                    // Slide the window left by one byte and append the new byte.
                    System.arraycopy(this.searchBuffer, 1, this.searchBuffer, 0, last);
                    this.searchBuffer[last] = buffer[i];
                    this.available = Math.min(this.available + 1, this.searchBuffer.length);
                    // A match is only possible once the window holds a full marker's worth of
                    // bytes seen since the previous match.
                    if (this.available == this.searchBuffer.length && Arrays.equals(this.searchBuffer, this.marker)) {
                        this.available = 0;
                        return i;
                    }
                }
                return -1;
            }
        }
    }

    /**
     * One block of an Avro file: a fixed number of records decoded from the block's (possibly
     * compressed) data bytes.
     *
     * @param <T> the type of the records in the block
     */
    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    static class AvroBlock<T>
    extends BlockBasedSource.Block<T> {
        /** Number of records the block header says the block contains. */
        private final long numRecords;
        private T currentRecord;
        /** Number of records read so far. */
        private long currentRecordIndex = 0L;
        private final DatumReader<T> reader;
        private final BinaryDecoder decoder;

        /**
         * Wraps the block data in a stream that undoes the named codec.
         *
         * @param data the raw block bytes
         * @param codec the codec name from the file header
         * @return a stream yielding the uncompressed block bytes
         * @throws IOException if the decompressing stream cannot be created
         * @throws IllegalArgumentException if the codec is not one of the supported names
         */
        private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {
            ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
            switch (codec) {
                case "snappy":
                    return new SnappyCompressorInputStream(byteStream);
                case "deflate":
                    // `true` selects raw deflate data without the zlib header and checksum.
                    return new InflaterInputStream(byteStream, new Inflater(true));
                case "xz":
                    return new XZCompressorInputStream(byteStream);
                case "bzip2":
                    return new BZip2CompressorInputStream(byteStream);
                case "null":
                    return byteStream;
                default:
                    throw new IllegalArgumentException("Unsupported codec: " + codec);
            }
        }

        /**
         * @param data the raw block bytes
         * @param numRecords the number of records in the block
         * @param source the source supplying the coder and the codec name
         * @throws IOException if the decompressing stream cannot be created
         */
        AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws IOException {
            this.numRecords = numRecords;
            this.reader = source.getDefaultOutputCoder().createDatumReader();
            this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, source.getCodec()), null);
        }

        @Override
        public T getCurrentRecord() {
            return this.currentRecord;
        }

        /**
         * Decodes the next record of the block.
         *
         * @return {@code true} if a record was read, {@code false} once all records of the
         *     block have been read
         * @throws IOException if decoding fails
         */
        @Override
        public boolean readNextRecord() throws IOException {
            if (this.currentRecordIndex >= this.numRecords) {
                return false;
            }
            this.currentRecord = this.reader.read(null, this.decoder);
            ++this.currentRecordIndex;
            return true;
        }

        /**
         * Returns the fraction of this block's records that have been read. A block with no
         * records is reported as fully consumed rather than producing {@code NaN}.
         */
        @Override
        public double getFractionOfBlockConsumed() {
            if (this.numRecords == 0L) {
                return 1.0;
            }
            return (double) this.currentRecordIndex / (double) this.numRecords;
        }
    }

    /** Values read from the header of an Avro file. */
    static class Metadata {
        /** The file's sync marker. */
        byte[] syncMarker;
        /** The name of the file's block codec. */
        String codec;
        /** The JSON text of the file's schema; {@code null} if the header had none. */
        String schema;

        public Metadata(byte[] syncMarker, String codec, String schema) {
            this.syncMarker = syncMarker;
            this.codec = codec;
            this.schema = schema;
        }
    }
}

