/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.testing;

import com.google.api.services.dataflow.model.JobMessage;
import com.google.api.services.dataflow.model.JobMetrics;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Optional;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestDataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.MonitoringUtil;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PipelineRunner} for tests that delegates execution to a
 * {@link DataflowPipelineRunner} and then decides whether the run succeeded by
 * inspecting the job state and the {@code DataflowAssertSuccess} /
 * {@code DataflowAssertFailure} metrics reported for the job.
 *
 * <p>The number of expected assertions is counted as transforms are applied
 * through {@link #apply(PTransform, PInput)}.
 */
public class TestDataflowPipelineRunner
extends PipelineRunner<DataflowPipelineJob> {
    /** Context key a metric must carry to be counted by {@link #checkForSuccess}. */
    private static final String TENTATIVE_COUNTER = "tentative";
    /** Name of the metric that counts passed assertions. */
    private static final String SUCCESS_COUNTER = "DataflowAssertSuccess";
    /** Name of the metric that counts failed assertions. */
    private static final String FAILURE_COUNTER = "DataflowAssertFailure";
    /** Message importance value that triggers cancellation of a streaming job. */
    private static final String ERROR_IMPORTANCE = "JOB_MESSAGE_ERROR";
    /** Pause between two metric polls of a streaming job, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10000L;
    /** Maximum time to wait for a streaming job to finish, in minutes. */
    private static final long STREAMING_TIMEOUT_MINUTES = 10L;
    private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class);
    private final TestDataflowPipelineOptions options;
    private final DataflowPipelineRunner runner;
    /** Incremented by {@link #apply} for every assertion transform applied. */
    private int expectedNumberOfAssertions = 0;

    /**
     * Creates a runner that delegates to a {@link DataflowPipelineRunner} built
     * from the same options.
     *
     * @param options the options used for both this runner and its delegate
     */
    TestDataflowPipelineRunner(TestDataflowPipelineOptions options) {
        this.options = options;
        this.runner = DataflowPipelineRunner.fromOptions(options);
    }

    /**
     * Constructs a runner from the provided options.
     *
     * @param options pipeline options, viewed as {@link TestDataflowPipelineOptions}
     * @return a new {@code TestDataflowPipelineRunner}
     */
    public static TestDataflowPipelineRunner fromOptions(PipelineOptions options) {
        TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class);
        return new TestDataflowPipelineRunner(dataflowOptions);
    }

    /**
     * Runs the pipeline on the delegate runner created in the constructor.
     *
     * @param pipeline the pipeline to execute
     * @return the job, if it reported success
     */
    @Override
    public DataflowPipelineJob run(Pipeline pipeline) {
        return this.run(pipeline, this.runner);
    }

    /**
     * Runs the pipeline on the given runner and verifies its outcome.
     *
     * <p>In streaming mode the job metrics are polled on the options' executor
     * service while this thread waits for the job to finish; otherwise the job
     * is awaited and checked once afterwards.
     *
     * @param pipeline the pipeline to execute
     * @param runner the runner that actually launches the job
     * @return the job, if it reported success
     * @throws IllegalStateException if the job fails to launch, reports no
     *     success/failure result, or reports failure
     * @throws RuntimeException wrapping any checked exception raised while
     *     waiting for or inspecting the job
     */
    DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) {
        final MonitoringUtil.PrintHandler messageHandler = new MonitoringUtil.PrintHandler(this.options.getJobMessageOutput());
        final DataflowPipelineJob job = TestDataflowPipelineRunner.launch(pipeline, runner);
        LOG.info("Running Dataflow job {} with {} expected assertions.", job.getJobId(), this.expectedNumberOfAssertions);
        try {
            Optional<Boolean> result;
            if (this.options.isStreaming()) {
                result = this.awaitStreamingResult(job, messageHandler);
            } else {
                job.waitToFinish(-1L, TimeUnit.SECONDS, messageHandler);
                result = this.checkForSuccess(job);
            }
            if (!result.isPresent()) {
                throw new IllegalStateException("The dataflow did not output a success or failure metric.");
            }
            if (!result.get().booleanValue()) {
                throw new IllegalStateException("The dataflow failed.");
            }
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is about to be wrapped; keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
            }
            // propagate() rethrows unchecked throwables as-is and wraps checked ones.
            throw Throwables.propagate(e);
        }
        return job;
    }

    /**
     * Launches the pipeline, translating a launch-time execution failure into
     * an {@link IllegalStateException} that keeps the original exception as its cause.
     */
    private static DataflowPipelineJob launch(Pipeline pipeline, DataflowPipelineRunner runner) {
        try {
            return runner.run(pipeline);
        }
        catch (DataflowJobExecutionException ex) {
            throw new IllegalStateException("The dataflow failed.", ex);
        }
    }

    /**
     * Waits for a streaming job: polls {@link #checkForSuccess} in the background
     * until a result is present, while this thread waits for the job to finish
     * and cancels it on an error message or on timeout.
     */
    private Optional<Boolean> awaitStreamingResult(final DataflowPipelineJob job, final MonitoringUtil.PrintHandler messageHandler) throws Exception {
        Future<Optional<Boolean>> resultFuture = this.options.getExecutorService().submit(new Callable<Optional<Boolean>>(){

            @Override
            public Optional<Boolean> call() throws Exception {
                try {
                    while (true) {
                        Optional<Boolean> polled = TestDataflowPipelineRunner.this.checkForSuccess(job);
                        if (polled.isPresent()) {
                            return polled;
                        }
                        Thread.sleep(POLL_INTERVAL_MILLIS);
                    }
                }
                finally {
                    // Whether a result was found or polling failed, the job is cancelled.
                    LOG.info("Cancelling Dataflow job {}", job.getJobId());
                    job.cancel();
                }
            }
        });
        PipelineResult.State finalState = job.waitToFinish(STREAMING_TIMEOUT_MINUTES, TimeUnit.MINUTES, new MonitoringUtil.JobMessagesHandler(){

            @Override
            public void process(List<JobMessage> messages) {
                messageHandler.process(messages);
                for (JobMessage message : messages) {
                    // Constant-first equals also covers a null importance.
                    if (!ERROR_IMPORTANCE.equals(message.getMessageImportance())) {
                        continue;
                    }
                    LOG.info("Dataflow job {} threw exception, cancelling. Exception was: {}", job.getJobId(), message.getMessageText());
                    try {
                        job.cancel();
                    }
                    catch (Exception e) {
                        throw Throwables.propagate(e);
                    }
                }
            }
        });
        if (finalState == null || finalState == PipelineResult.State.RUNNING) {
            LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", job.getJobId());
            job.cancel();
        }
        return resultFuture.get();
    }

    /**
     * Applies the transform through the delegate runner, counting it as an
     * expected assertion when it is a {@code DataflowAssert} one- or two-side-input assert.
     */
    @Override
    public <OutputT extends POutput, InputT extends PInput> OutputT apply(PTransform<InputT, OutputT> transform, InputT input) {
        if (transform instanceof DataflowAssert.OneSideInputAssert || transform instanceof DataflowAssert.TwoSideInputAssert) {
            ++this.expectedNumberOfAssertions;
        }
        return this.runner.apply(transform, input);
    }

    /**
     * Determines the outcome of the job from its state and assertion metrics.
     *
     * <p>Only metrics whose context contains the {@code "tentative"} key are counted.
     *
     * @param job the job to inspect
     * @return {@code Optional.of(false)} if the job state is FAILED or CANCELLED
     *     or any assertion failure was counted; {@code Optional.of(true)} if the
     *     counted successes reach the expected number of assertions;
     *     {@code Optional.absent()} otherwise, including when no metrics are present
     * @throws IOException if fetching the job metrics fails
     */
    Optional<Boolean> checkForSuccess(DataflowPipelineJob job) throws IOException {
        PipelineResult.State state = job.getState();
        if (state == PipelineResult.State.FAILED || state == PipelineResult.State.CANCELLED) {
            LOG.info("The pipeline failed");
            return Optional.of(false);
        }
        JobMetrics metrics = (JobMetrics)job.getDataflowClient().projects().jobs().getMetrics(job.getProjectId(), job.getJobId()).execute();
        if (metrics == null || metrics.getMetrics() == null) {
            LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId());
            return Optional.absent();
        }
        int successes = 0;
        int failures = 0;
        for (MetricUpdate metric : metrics.getMetrics()) {
            if (metric.getName() == null || metric.getName().getContext() == null || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) {
                continue;
            }
            String counterName = metric.getName().getName();
            if (SUCCESS_COUNTER.equals(counterName)) {
                successes += ((BigDecimal)metric.getScalar()).intValue();
            } else if (FAILURE_COUNTER.equals(counterName)) {
                failures += ((BigDecimal)metric.getScalar()).intValue();
            }
        }
        Object[] logArgs = new Object[]{job.getJobId(), successes, failures, this.expectedNumberOfAssertions};
        if (failures > 0 || successes >= this.expectedNumberOfAssertions) {
            LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", logArgs);
            // Any failure decides the outcome, even if enough successes were also seen.
            return Optional.of(failures == 0);
        }
        LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected assertions.", logArgs);
        return Optional.absent();
    }

    /** Returns {@code "TestDataflowPipelineRunner#"} followed by the configured app name. */
    @Override
    public String toString() {
        return "TestDataflowPipelineRunner#" + this.options.getAppName();
    }
}

