/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.InstantCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter;
import com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter;
import com.google.cloud.dataflow.sdk.runners.worker.ShuffleEntryWriter;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.Locale;

/**
 * A {@link Sink} that encodes {@link WindowedValue} elements into
 * {@link ShuffleEntry} records and hands them to a {@link ShuffleEntryWriter}.
 *
 * <p>How each element is split into key / secondary key / value bytes is
 * determined by the {@link ShuffleKind} supplied at construction time; see
 * {@link ShuffleSinkWriter#add} for the exact layout per kind.
 *
 * @param <T> the type of the values carried by the windowed elements
 */
public class ShuffleSink<T> extends Sink<WindowedValue<T>> {
    /**
     * Buffer size passed to {@link ApplianceShuffleWriter} by {@link #writer()}
     * (0x8000000 = 134,217,728; presumably a byte count -- confirm against
     * ApplianceShuffleWriter).
     */
    static final long SHUFFLE_WRITER_BUFFER_SIZE = 0x8000000L;

    /** Opaque configuration forwarded to {@link ApplianceShuffleWriter}; must be non-null for {@link #writer()}. */
    final byte[] shuffleWriterConfig;
    /** The kind of shuffle this sink writes to; drives the three flags below. */
    final ShuffleKind shuffleKind;
    final PipelineOptions options;
    final CounterSet.AddCounterMutator addCounterMutator;

    // Flags derived from shuffleKind in initCoder().
    boolean shardByKey;
    boolean groupValues;
    boolean sortValues;

    // Coders derived from the element coder in initCoder(). Those that do not
    // apply to the configured shuffle kind are left null.
    WindowedValue.WindowedValueCoder<T> windowedElemCoder;
    WindowedValue.WindowedValueCoder windowedValueCoder;
    Coder<T> elemCoder;
    Coder keyCoder;
    Coder valueCoder;
    Coder sortKeyCoder;
    Coder sortValueCoder;

    /**
     * Parses the textual name of a shuffle kind.
     *
     * <p>Surrounding whitespace is ignored and matching is case-insensitive.
     * The upper-casing uses {@link Locale#ROOT} so that the result does not
     * depend on the JVM's default locale (for example, a Turkish default
     * locale would otherwise map {@code 'i'} to a dotted capital and make
     * {@code "partition_keys"} unparseable).
     *
     * @param shuffleKind the name of one of the {@link ShuffleKind} constants
     * @return the matching {@link ShuffleKind}
     * @throws Exception if the name does not match any {@link ShuffleKind}
     * @throws NullPointerException if {@code shuffleKind} is null
     */
    public static ShuffleKind parseShuffleKind(String shuffleKind) throws Exception {
        String normalized = shuffleKind.trim().toUpperCase(Locale.ROOT);
        try {
            return ShuffleKind.valueOf(normalized);
        } catch (IllegalArgumentException e) {
            throw new Exception("unexpected shuffle_kind", e);
        }
    }

    /**
     * Creates a sink for the given shuffle kind and element coder.
     *
     * @param options pipeline options; later viewed as {@link DataflowWorkerHarnessOptions} to obtain the worker id
     * @param shuffleWriterConfig configuration for the underlying {@link ApplianceShuffleWriter}
     * @param shuffleKind the kind of shuffle to write to
     * @param coder coder for the windowed elements; cast to {@link WindowedValue.WindowedValueCoder}
     * @param addCounterMutator used to register the byte counters of each writer
     * @throws Exception if the coder's structure does not fit the shuffle kind
     */
    public ShuffleSink(PipelineOptions options, byte[] shuffleWriterConfig, ShuffleKind shuffleKind, Coder<WindowedValue<T>> coder, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        this.shuffleWriterConfig = shuffleWriterConfig;
        this.shuffleKind = shuffleKind;
        this.options = options;
        this.addCounterMutator = addCounterMutator;
        this.initCoder(coder);
    }

    /**
     * Derives the sharding/grouping/sorting flags from {@link #shuffleKind}
     * and unpacks the component coders that {@link ShuffleSinkWriter#add}
     * needs.
     *
     * @throws Exception if a key-sharding shuffle is not given a {@link KvCoder}
     *     element coder, or a value-sorting shuffle is not given a
     *     {@link KvCoder} value coder
     */
    private void initCoder(Coder<WindowedValue<T>> coder) throws Exception {
        switch (this.shuffleKind) {
            case UNGROUPED:
                this.shardByKey = false;
                this.groupValues = false;
                this.sortValues = false;
                break;
            case PARTITION_KEYS:
                this.shardByKey = true;
                this.groupValues = false;
                this.sortValues = false;
                break;
            case GROUP_KEYS:
                this.shardByKey = true;
                this.groupValues = true;
                this.sortValues = false;
                break;
            case GROUP_KEYS_AND_SORT_VALUES:
                this.shardByKey = true;
                this.groupValues = true;
                this.sortValues = true;
                break;
            default:
                throw new AssertionError("unexpected shuffle kind");
        }

        this.windowedElemCoder = (WindowedValue.WindowedValueCoder)coder;
        this.elemCoder = this.windowedElemCoder.getValueCoder();

        if (!this.shardByKey) {
            // Ungrouped shuffle: elements are written whole, no component coders needed.
            this.keyCoder = null;
            this.valueCoder = null;
            this.sortKeyCoder = null;
            this.sortValueCoder = null;
            this.windowedValueCoder = null;
            return;
        }

        if (!(this.elemCoder instanceof KvCoder)) {
            throw new Exception("unexpected kind of coder for elements written to a key-grouping shuffle");
        }
        KvCoder kvCoder = (KvCoder)this.elemCoder;
        this.keyCoder = kvCoder.getKeyCoder();
        this.valueCoder = kvCoder.getValueCoder();

        if (this.sortValues) {
            if (!(this.valueCoder instanceof KvCoder)) {
                throw new Exception("unexpected kind of coder for values written to a value-sorting shuffle");
            }
            KvCoder kvValueCoder = (KvCoder)this.valueCoder;
            this.sortKeyCoder = kvValueCoder.getKeyCoder();
            this.sortValueCoder = kvValueCoder.getValueCoder();
        } else {
            this.sortKeyCoder = null;
            this.sortValueCoder = null;
        }

        // Only the partition-only kind writes the value together with its
        // windowing information; grouping kinds encode the bare value.
        if (this.groupValues) {
            this.windowedValueCoder = null;
        } else {
            this.windowedValueCoder = this.windowedElemCoder.withValueCoder(this.valueCoder);
        }
    }

    /**
     * Returns a sink writer that emits entries through the given shuffle
     * entry writer.
     *
     * @param writer destination for the encoded shuffle entries
     * @param datasetId dataset identifier embedded in the byte counter names
     */
    public Sink.SinkWriter<WindowedValue<T>> writer(ShuffleEntryWriter writer, String datasetId) {
        return new ShuffleSinkWriter(writer, this.options, this.addCounterMutator, datasetId);
    }

    /**
     * Returns a sink writer backed by a new {@link ApplianceShuffleWriter}
     * wrapped in a {@link ChunkingShuffleEntryWriter}.
     *
     * @throws IllegalArgumentException if no shuffle writer configuration was supplied
     */
    @Override
    public Sink.SinkWriter<WindowedValue<T>> writer() throws IOException {
        Preconditions.checkArgument(this.shuffleWriterConfig != null);
        ApplianceShuffleWriter applianceWriter = new ApplianceShuffleWriter(this.shuffleWriterConfig, SHUFFLE_WRITER_BUFFER_SIZE);
        String datasetId = applianceWriter.getDatasetId();
        return this.writer(new ChunkingShuffleEntryWriter(applianceWriter), datasetId);
    }

    /** Reports this sink's work under the {@code FRAMEWORK} state kind. */
    @Override
    protected StateSampler.StateKind getStateSamplerStateKind() {
        return StateSampler.StateKind.FRAMEWORK;
    }

    /**
     * Encodes windowed elements into {@link ShuffleEntry} records according
     * to the enclosing sink's shuffle kind, writes them to the wrapped
     * {@link ShuffleEntryWriter}, and adds each entry's length to two SUM
     * counters.
     */
    class ShuffleSinkWriter implements Sink.SinkWriter<WindowedValue<T>> {
        private static final String COUNTER_WORKER_PREFIX = "worker-";
        private static final String COUNTER_DATASET_PREFIX = "-dataset-";
        private static final String COUNTER_SUFFIX = "-shuffle-bytes";

        private final ShuffleEntryWriter writer;
        /** Sequence number used as the key of entries written to an ungrouped shuffle. */
        private long seqNum = 0L;
        /** Counter named {@code worker-<workerId>-dataset-<datasetId>-shuffle-bytes}. */
        private final Counter<Long> perWorkerPerDatasetBytesCounter;
        /** Counter named {@code dax-shuffle-<datasetId>-written-bytes}. */
        private final Counter<Long> perDatasetBytesCounter;

        /**
         * @param writer destination for the encoded entries
         * @param options pipeline options, viewed as {@link DataflowWorkerHarnessOptions} to read the worker id
         * @param addCounterMutator used to register the two byte counters
         * @param datasetId dataset identifier embedded in the counter names
         */
        ShuffleSinkWriter(ShuffleEntryWriter writer, PipelineOptions options, CounterSet.AddCounterMutator addCounterMutator, String datasetId) {
            this.writer = writer;
            DataflowWorkerHarnessOptions dataflowOptions = options.as(DataflowWorkerHarnessOptions.class);
            String workerId = dataflowOptions.getWorkerId();
            this.perWorkerPerDatasetBytesCounter = addCounterMutator.addCounter(Counter.longs(COUNTER_WORKER_PREFIX + workerId + COUNTER_DATASET_PREFIX + datasetId + COUNTER_SUFFIX, Counter.AggregationKind.SUM));
            this.perDatasetBytesCounter = addCounterMutator.addCounter(Counter.longs("dax-shuffle-" + datasetId + "-written-bytes", Counter.AggregationKind.SUM));
        }

        /**
         * Encodes one element as a shuffle entry and writes it.
         *
         * <p>Entry layout by shuffle configuration:
         * <ul>
         *   <li>not sharded by key: key = running sequence number, no
         *       secondary key, value = the whole windowed element;</li>
         *   <li>sharded only: key = encoded key, no secondary key, value =
         *       the KV's value re-wrapped with the element's windowing;</li>
         *   <li>grouped: key = encoded key, secondary key = encoded timestamp
         *       (omitted when it equals {@code TIMESTAMP_MIN_VALUE}), value =
         *       encoded bare value;</li>
         *   <li>grouped and sorted: key = encoded key, secondary key =
         *       encoded sort key, value = encoded sort value.</li>
         * </ul>
         *
         * @param windowedElem the element to write
         * @return the length reported by the written {@link ShuffleEntry}
         * @throws IOException if encoding or writing fails
         */
        @Override
        public long add(WindowedValue<T> windowedElem) throws IOException {
            byte[] keyBytes;
            byte[] secondaryKeyBytes;
            byte[] valueBytes;

            if (!ShuffleSink.this.shardByKey) {
                keyBytes = CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), this.seqNum++);
                secondaryKeyBytes = null;
                valueBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.windowedElemCoder, windowedElem);
            } else {
                Object elem = windowedElem.getValue();
                if (!(elem instanceof KV)) {
                    throw new AssertionError("expecting the values written to a key-grouping shuffle to be KVs");
                }
                KV kv = (KV)elem;
                Object key = kv.getKey();
                Object value = kv.getValue();
                keyBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.keyCoder, key);

                if (ShuffleSink.this.sortValues) {
                    if (!(value instanceof KV)) {
                        throw new AssertionError("expecting the value parts of the KVs written to a value-sorting shuffle to also be KVs");
                    }
                    KV kvValue = (KV)value;
                    Object sortKey = kvValue.getKey();
                    Object sortValue = kvValue.getValue();
                    secondaryKeyBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.sortKeyCoder, sortKey);
                    valueBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.sortValueCoder, sortValue);
                } else if (ShuffleSink.this.groupValues) {
                    // The minimum timestamp is represented by an absent secondary key.
                    if (windowedElem.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
                        secondaryKeyBytes = null;
                    } else {
                        secondaryKeyBytes = CoderUtils.encodeToByteArray(InstantCoder.of(), windowedElem.getTimestamp());
                    }
                    valueBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.valueCoder, value);
                } else {
                    secondaryKeyBytes = null;
                    valueBytes = CoderUtils.encodeToByteArray(ShuffleSink.this.windowedValueCoder, windowedElem.withValue(value));
                }
            }

            ShuffleEntry entry = new ShuffleEntry(keyBytes, secondaryKeyBytes, valueBytes);
            this.writer.put(entry);
            long bytes = entry.length();
            this.perWorkerPerDatasetBytesCounter.addValue(bytes);
            this.perDatasetBytesCounter.addValue(bytes);
            return bytes;
        }

        /** Closes the wrapped {@link ShuffleEntryWriter}. */
        @Override
        public void close() throws IOException {
            this.writer.close();
        }
    }

    /** The supported shuffle configurations. */
    enum ShuffleKind {
        /** No sharding by key, no grouping, no sorting. */
        UNGROUPED,
        /** Shard by key only. */
        PARTITION_KEYS,
        /** Shard by key and group values. */
        GROUP_KEYS,
        /** Shard by key, group values, and sort values. */
        GROUP_KEYS_AND_SORT_VALUES;
    }
}

