/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Sink} that groups written {@link WindowedValue}s by encoded key and, when its writer
 * is closed, appends them as a single output message bundle for one destination stream to the
 * output builder of the {@link StreamingModeExecutionContext}.
 *
 * @param <T> the type of the values written to this sink
 */
class WindmillSink<T> extends Sink<WindowedValue<T>> {
    private final WindmillStreamWriter writer;
    private final Coder<T> valueCoder;
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private final StreamingModeExecutionContext context;

    /**
     * Creates a sink writing to the given destination stream.
     *
     * @param destinationName id of the destination stream set on every emitted bundle
     * @param coder coder of the written elements; must be a
     *     {@link WindowedValue.FullWindowedValueCoder}, otherwise a {@link ClassCastException}
     *     is thrown
     * @param context execution context that supplies the serialized key for non-KV values and
     *     receives the output bundles
     */
    WindmillSink(String destinationName, Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
        WindowedValue.FullWindowedValueCoder<T> inputCoder = (WindowedValue.FullWindowedValueCoder<T>) coder;
        this.valueCoder = inputCoder.getValueCoder();
        this.windowsCoder = inputCoder.getWindowsCoder();
        this.context = context;
        this.writer = new WindmillStreamWriter(destinationName);
    }

    /**
     * Encodes the pane followed by the windows into a single metadata byte string.
     *
     * <p>The pane is written first using the nested coder context; the windows are written last
     * using the outer context.
     *
     * @param windowsCoder coder used for the window collection
     * @param windows windows to encode
     * @param pane pane info to encode
     * @return the concatenated encoding of {@code pane} and {@code windows}
     * @throws IOException if either coder fails to encode
     */
    public static ByteString encodeMetadata(Coder<Collection<? extends BoundedWindow>> windowsCoder, Collection<? extends BoundedWindow> windows, PaneInfo pane) throws IOException {
        ByteString.Output stream = ByteString.newOutput();
        PaneInfo.PaneInfoCoder.INSTANCE.encode(pane, (OutputStream) stream, Coder.Context.NESTED);
        windowsCoder.encode(windows, (OutputStream) stream, Coder.Context.OUTER);
        return stream.toByteString();
    }

    /**
     * Decodes the pane info stored at the beginning of a metadata byte string produced by
     * {@link #encodeMetadata}.
     *
     * @param metadata encoded metadata
     * @return the decoded pane info
     * @throws IOException if decoding fails
     */
    public static PaneInfo decodeMetadataPane(ByteString metadata) throws IOException {
        InputStream inStream = metadata.newInput();
        return PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, Coder.Context.NESTED);
    }

    /**
     * Decodes the windows stored in a metadata byte string produced by {@link #encodeMetadata}.
     *
     * @param windowsCoder coder used for the window collection
     * @param metadata encoded metadata
     * @return the decoded windows
     * @throws IOException if decoding fails
     */
    public static Collection<? extends BoundedWindow> decodeMetadataWindows(Coder<Collection<? extends BoundedWindow>> windowsCoder, ByteString metadata) throws IOException {
        InputStream inStream = metadata.newInput();
        // The pane precedes the windows in the stream; decode it only to advance past it.
        PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, Coder.Context.NESTED);
        return windowsCoder.decode(inStream, Coder.Context.OUTER);
    }

    /**
     * Factory that builds a sink from a cloud specification.
     *
     * @param options not used by this implementation
     * @param spec specification from which the {@code "stream_id"} string is read
     * @param coder coder of the written elements, see the constructor for requirements
     * @param context execution context; cast to {@link StreamingModeExecutionContext}
     * @param addCounterMutator not used by this implementation
     * @return a new sink for the stream named in {@code spec}
     * @throws Exception if the specification cannot be read
     */
    public static <T> WindmillSink<T> create(PipelineOptions options, CloudObject spec, Coder<WindowedValue<T>> coder, ExecutionContext context, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        String streamId = Structs.getString((Map<String, Object>) ((Object) spec), "stream_id");
        return new WindmillSink<T>(streamId, coder, (StreamingModeExecutionContext) context);
    }

    /** Returns the writer of this sink; the same instance is returned on every call. */
    @Override
    public Sink.SinkWriter<WindowedValue<T>> writer() {
        return this.writer;
    }

    /** Always returns {@code true}. */
    @Override
    public boolean supportsRestart() {
        return true;
    }

    /**
     * Writer that accumulates messages per encoded key and flushes them to the enclosing sink's
     * execution context on {@link #close()}.
     */
    class WindmillStreamWriter implements Sink.SinkWriter<WindowedValue<T>> {
        /** Pending bundle builders, indexed by encoded key. */
        private final Map<ByteString, Windmill.KeyedMessageBundle.Builder> productionMap;
        private final String destinationName;

        private WindmillStreamWriter(String destinationName) {
            this.destinationName = destinationName;
            this.productionMap = new HashMap<ByteString, Windmill.KeyedMessageBundle.Builder>();
        }

        /**
         * Encodes {@code object} with {@code coder} in the outer context.
         *
         * <p>The type parameter is deliberately not named {@code T} so that it does not shadow
         * the type parameter of the enclosing sink.
         */
        private <E> ByteString encode(Coder<E> coder, E object) throws IOException {
            ByteString.Output stream = ByteString.newOutput();
            coder.encode(object, (OutputStream) stream, Coder.Context.OUTER);
            return stream.toByteString();
        }

        /**
         * Buffers one element as a message under its key.
         *
         * <p>When the sink's value coder is a {@link KvCoder}, the key is the encoded KV key and
         * the payload is the encoded KV value; otherwise the key is taken from the execution
         * context and the payload is the encoded element value. When the KV value coder is a
         * {@link ValueWithRecordId.ValueWithRecordIdCoder}, the record's own id is attached as
         * the message id and only the wrapped value is encoded; otherwise the id is empty.
         *
         * @param data element to buffer
         * @return the total size in bytes of the key, payload, metadata and id
         * @throws IOException if encoding fails
         */
        // Raw types are required here: the key and value coders are discovered by runtime
        // inspection, so their type arguments are not statically known.
        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override
        public long add(WindowedValue<T> data) throws IOException {
            ByteString value;
            ByteString key;
            ByteString id = ByteString.EMPTY;
            ByteString metadata = WindmillSink.encodeMetadata(WindmillSink.this.windowsCoder, data.getWindows(), data.getPane());
            if (WindmillSink.this.valueCoder instanceof KvCoder) {
                KvCoder kvCoder = (KvCoder) WindmillSink.this.valueCoder;
                KV kv = (KV) data.getValue();
                key = this.encode(kvCoder.getKeyCoder(), kv.getKey());
                Coder nestedValueCoder = kvCoder.getValueCoder();
                if (nestedValueCoder instanceof ValueWithRecordId.ValueWithRecordIdCoder) {
                    ValueWithRecordId valueAndId = (ValueWithRecordId) kv.getValue();
                    value = this.encode(((ValueWithRecordId.ValueWithRecordIdCoder) nestedValueCoder).getValueCoder(), valueAndId.getValue());
                    id = ByteString.copyFrom((byte[]) valueAndId.getId());
                } else {
                    value = this.encode(nestedValueCoder, kv.getValue());
                }
            } else {
                key = WindmillSink.this.context.getSerializedKey();
                value = this.encode(WindmillSink.this.valueCoder, data.getValue());
            }

            Windmill.KeyedMessageBundle.Builder keyedOutput = this.productionMap.get(key);
            if (keyedOutput == null) {
                keyedOutput = Windmill.KeyedMessageBundle.newBuilder().setKey(key);
                this.productionMap.put(key, keyedOutput);
            }

            // Element timestamps are in milliseconds; the message timestamp is set in microseconds.
            long timestampMicros = TimeUnit.MILLISECONDS.toMicros(data.getTimestamp().getMillis());
            Windmill.Message.Builder builder = Windmill.Message.newBuilder().setTimestamp(timestampMicros).setData(value).setMetadata(metadata);
            keyedOutput.addMessages(builder.build());
            keyedOutput.addMessagesIds(id);

            // Sum in long arithmetic so that large payloads cannot overflow an int.
            return (long) key.size() + value.size() + metadata.size() + id.size();
        }

        /**
         * Flushes all buffered messages as one output bundle for the destination stream and
         * clears the buffer. Nothing is added to the context when no messages were buffered.
         *
         * @throws IOException declared by the writer interface
         */
        @Override
        public void close() throws IOException {
            Windmill.OutputMessageBundle.Builder outputBuilder = Windmill.OutputMessageBundle.newBuilder().setDestinationStreamId(this.destinationName);
            for (Windmill.KeyedMessageBundle.Builder keyedOutput : this.productionMap.values()) {
                outputBuilder.addBundles(keyedOutput.build());
            }
            if (outputBuilder.getBundlesCount() > 0) {
                WindmillSink.this.context.getOutputBuilder().addOutputMessages(outputBuilder.build());
            }
            this.productionMap.clear();
        }
    }
}

