package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.Structs;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Sink;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/PubsubSink.class */
/**
 * A {@link Sink} that collects encoded {@link WindowedValue}s into a
 * {@code Windmill.PubSubMessageBundle} for a single topic and, when the writer is
 * closed, attaches that bundle to the output builder of the streaming execution context.
 *
 * @param <T> the type of the values wrapped by the {@link WindowedValue}s written to this sink
 */
public class PubsubSink<T> extends Sink<WindowedValue<T>> {
    /** Topic set on every message bundle produced by this sink's writers. */
    private final String topic;

    /** Coder used to serialize each windowed value into the message payload. */
    private final Coder<WindowedValue<T>> coder;

    /** Execution context whose output builder receives the finished bundles. */
    private final StreamingModeExecutionContext context;

    /**
     * Writer that buffers messages in a bundle builder and hands the bundle to the
     * enclosing sink's execution context on {@link #close()}.
     */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/PubsubSink$PubsubWriter.class */
    class PubsubWriter implements Sink.SinkWriter<WindowedValue<T>> {
        private final Windmill.PubSubMessageBundle.Builder outputBuilder;

        private PubsubWriter(String str) {
            this.outputBuilder = Windmill.PubSubMessageBundle.newBuilder().setTopic(str);
        }

        /**
         * Encodes {@code value} with {@code valueCoder} in the {@code OUTER} coder context.
         *
         * <p>The type parameter is deliberately not named {@code T} so that it does not
         * shadow the type parameter of the enclosing class.
         *
         * @return the encoded bytes
         * @throws IOException if the coder fails to encode the value
         */
        private <E> ByteString encode(Coder<E> valueCoder, E value) throws IOException {
            ByteString.Output stream = ByteString.newOutput();
            valueCoder.encode(value, stream, Coder.Context.OUTER);
            return stream.toByteString();
        }

        /**
         * Encodes the given value and appends it, with its timestamp converted from
         * milliseconds to microseconds, to the pending message bundle.
         *
         * @param windowedValue the value to add
         * @return the size in bytes of the encoded value
         * @throws IOException if the value cannot be encoded
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter
        public long add(WindowedValue<T> windowedValue) throws IOException {
            // Keep the encoded payload in a local so its size can be reported to the caller.
            ByteString encoded = encode(PubsubSink.this.coder, windowedValue);
            long timestampMicros =
                    TimeUnit.MILLISECONDS.toMicros(windowedValue.getTimestamp().getMillis());
            this.outputBuilder.addMessages(
                    Windmill.Message.newBuilder()
                            .setData(encoded)
                            .setTimestamp(timestampMicros)
                            .build());
            return encoded.size();
        }

        /**
         * Builds the pending bundle, adds it to the execution context's output builder if
         * it contains at least one message, and then clears the bundle builder.
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink.SinkWriter, java.lang.AutoCloseable
        public void close() throws IOException {
            Windmill.PubSubMessageBundle build = this.outputBuilder.build();
            // Empty bundles are not forwarded.
            if (build.getMessagesCount() > 0) {
                PubsubSink.this.context.getOutputBuilder().addPubsubMessages(build);
            }
            this.outputBuilder.clear();
        }
    }

    /**
     * Creates a sink for the given topic.
     *
     * @param str the topic to set on produced message bundles
     * @param coder the coder used to encode each windowed value
     * @param streamingModeExecutionContext the context that receives finished bundles
     */
    PubsubSink(String str, Coder<WindowedValue<T>> coder, StreamingModeExecutionContext streamingModeExecutionContext) {
        this.topic = str;
        this.coder = coder;
        this.context = streamingModeExecutionContext;
    }

    /**
     * Factory that reads the topic from the {@link PropertyNames#PUBSUB_TOPIC} property of
     * {@code cloudObject} and builds a sink around it.
     *
     * <p>{@code executionContext} is cast to {@link StreamingModeExecutionContext}; passing
     * any other kind of context fails with a {@link ClassCastException}.
     * {@code pipelineOptions} is not used by this method.
     */
    /* JADX WARN: Multi-variable type inference failed */
    public static <T> PubsubSink<T> create(PipelineOptions pipelineOptions, CloudObject cloudObject, Coder<WindowedValue<T>> coder, ExecutionContext executionContext) throws Exception {
        return new PubsubSink<>(Structs.getString(cloudObject, PropertyNames.PUBSUB_TOPIC), coder, (StreamingModeExecutionContext) executionContext);
    }

    /** Returns a new writer bound to this sink's topic. */
    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink
    public Sink.SinkWriter<WindowedValue<T>> writer() {
        return new PubsubWriter(this.topic);
    }

    /** Always returns {@code true}. */
    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Sink
    public boolean supportsRestart() {
        return true;
    }
}
