/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * A {@link Sink} that collects windowed values as Windmill Pub/Sub messages for a single topic.
 *
 * <p>Each writer obtained from {@link #writer()} accumulates encoded elements in its own
 * {@code Windmill.PubSubMessageBundle} builder and, when closed, appends the resulting bundle to
 * the output builder exposed by the {@link StreamingModeExecutionContext}.
 *
 * @param <T> the element type carried inside each {@link WindowedValue}
 */
class PubsubSink<T> extends Sink<WindowedValue<T>> {
    /** Topic set on every message bundle produced by this sink. */
    private final String topic;
    /** Timestamp label copied into every message bundle. */
    private final String timestampLabel;
    /** Id label copied into every message bundle. */
    private final String idLabel;
    /** Coder used to serialize each windowed value into the message payload. */
    private final Coder<WindowedValue<T>> coder;
    /** Execution context whose output builder receives the finished bundles. */
    private final StreamingModeExecutionContext context;

    /**
     * Creates a sink from already-resolved settings.
     *
     * @param topic the topic to set on produced bundles
     * @param timestampLabel the timestamp label to set on produced bundles
     * @param idLabel the id label to set on produced bundles
     * @param coder the coder used to encode each element
     * @param context the streaming context that receives the bundles on close
     */
    PubsubSink(String topic, String timestampLabel, String idLabel, Coder<WindowedValue<T>> coder, StreamingModeExecutionContext context) {
        this.topic = topic;
        this.timestampLabel = timestampLabel;
        this.idLabel = idLabel;
        this.coder = coder;
        this.context = context;
    }

    /**
     * Builds a sink from a cloud specification object.
     *
     * <p>Reads {@code "pubsub_topic"}, {@code "pubsub_timestamp_label"} and
     * {@code "pubsub_id_label"} from {@code spec} via {@link Structs#getString}; the two label
     * lookups pass {@code ""} as the third argument (presumably the fallback when the key is
     * absent; confirm against {@code Structs}).
     *
     * @param options pipeline options; not read by this factory
     * @param spec the specification the settings are read from
     * @param coder the coder used to encode each element
     * @param context the execution context; it is cast to {@link StreamingModeExecutionContext}
     * @param addCounterMutator counter mutator; not read by this factory
     * @return a new sink configured from {@code spec}
     * @throws Exception declared by the factory signature
     */
    public static <T> PubsubSink<T> create(PipelineOptions options, CloudObject spec, Coder<WindowedValue<T>> coder, ExecutionContext context, CounterSet.AddCounterMutator addCounterMutator) throws Exception {
        // View the spec as a plain string-keyed map once instead of re-casting for every lookup.
        @SuppressWarnings("unchecked")
        Map<String, Object> specMap = (Map<String, Object>) ((Object) spec);

        String topic = Structs.getString(specMap, "pubsub_topic");
        String timestampLabel = Structs.getString(specMap, "pubsub_timestamp_label", "");
        String idLabel = Structs.getString(specMap, "pubsub_id_label", "");

        StreamingModeExecutionContext streamingContext = (StreamingModeExecutionContext) context;
        return new PubsubSink<T>(topic, timestampLabel, idLabel, coder, streamingContext);
    }

    /**
     * Returns a new writer bound to this sink's topic and labels.
     *
     * @return a fresh {@link PubsubWriter}
     */
    @Override
    public Sink.SinkWriter<WindowedValue<T>> writer() {
        return new PubsubWriter(topic);
    }

    /**
     * Reports whether this sink supports restart.
     *
     * @return always {@code true}
     */
    @Override
    public boolean supportsRestart() {
        return true;
    }

    /**
     * Writer that buffers encoded elements in a message bundle builder and publishes the bundle
     * to the enclosing sink's execution context when closed.
     */
    class PubsubWriter implements Sink.SinkWriter<WindowedValue<T>> {
        /** Accumulates the messages added since construction (or since the last close). */
        private final Windmill.PubSubMessageBundle.Builder outputBuilder;

        /**
         * Starts an empty bundle carrying the given topic plus the enclosing sink's labels.
         *
         * @param topic the topic to set on the bundle
         */
        private PubsubWriter(String topic) {
            this.outputBuilder = Windmill.PubSubMessageBundle.newBuilder()
                .setTopic(topic)
                .setTimestampLabel(PubsubSink.this.timestampLabel)
                .setIdLabel(PubsubSink.this.idLabel);
        }

        /**
         * Serializes {@code value} with {@code elementCoder} in the outer coder context.
         *
         * @param elementCoder the coder to apply
         * @param value the value to serialize
         * @return the encoded bytes
         * @throws IOException if the coder fails while encoding
         */
        private <E> ByteString encode(Coder<E> elementCoder, E value) throws IOException {
            ByteString.Output buffer = ByteString.newOutput();
            elementCoder.encode(value, buffer, Coder.Context.OUTER);
            return buffer.toByteString();
        }

        /**
         * Encodes one element and appends it to the pending bundle.
         *
         * <p>The element's timestamp is read in milliseconds and stored on the message in
         * microseconds.
         *
         * @param data the windowed value to append
         * @return the size of the encoded payload
         * @throws IOException if encoding fails; nothing is appended in that case
         */
        @Override
        public long add(WindowedValue<T> data) throws IOException {
            ByteString payload = encode(PubsubSink.this.coder, data);
            long eventTimeMicros = TimeUnit.MILLISECONDS.toMicros(data.getTimestamp().getMillis());

            Windmill.Message message = Windmill.Message.newBuilder()
                .setData(payload)
                .setTimestamp(eventTimeMicros)
                .build();
            outputBuilder.addMessages(message);

            return payload.size();
        }

        /**
         * Hands the pending bundle to the execution context's output builder, skipping the
         * hand-off when no messages were added, and then clears the local builder.
         *
         * @throws IOException declared by the writer contract
         */
        @Override
        public void close() throws IOException {
            Windmill.PubSubMessageBundle bundle = outputBuilder.build();
            boolean hasMessages = bundle.getMessagesCount() > 0;
            if (hasMessages) {
                PubsubSink.this.context.getOutputBuilder().addPubsubMessages(bundle);
            }
            outputBuilder.clear();
        }
    }
}

