/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LeaseWorkItemRequest;
import com.google.api.services.dataflow.model.LeaseWorkItemResponse;
import com.google.api.services.dataflow.model.ReportWorkItemStatusRequest;
import com.google.api.services.dataflow.model.ReportWorkItemStatusResponse;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingInitializer;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.TimeUtil;
import com.google.cloud.dataflow.sdk.util.Transport;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import javax.annotation.concurrent.ThreadSafe;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowWorkerHarness {
    private static final Logger LOG = LoggerFactory.getLogger(DataflowWorkerHarness.class);
    private static final String APPLICATION_NAME = "DataflowWorkerHarness";
    static final int BACKOFF_INITIAL_INTERVAL_MILLIS = 5000;
    static final int BACKOFF_MAX_INTERVAL_MILLIS = 300000;

    private static BackOff createBackOff() {
        return new IntervalBoundedExponentialBackOff(300000, 5000L);
    }

    public static void main(String[] args) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(WorkerUncaughtExceptionHandler.INSTANCE);
        DataflowWorkerLoggingInitializer.initialize();
        DataflowWorkerHarnessOptions pipelineOptions = PipelineOptionsFactory.createFromSystemPropertiesInternal();
        DataflowWorkerLoggingInitializer.configure(pipelineOptions);
        Sleeper sleeper = Sleeper.DEFAULT;
        DataflowWorker worker = DataflowWorkerHarness.create(pipelineOptions);
        int statusPort = 18081;
        if (System.getProperties().containsKey("status_port")) {
            statusPort = Integer.parseInt(System.getProperty("status_port"));
        }
        worker.runStatusServer(statusPort);
        DataflowWorkerHarness.processWork(pipelineOptions, worker, sleeper);
    }

    static void processWork(DataflowWorkerHarnessOptions pipelineOptions, DataflowWorker worker, Sleeper sleeper) throws InterruptedException {
        int numThreads = DataflowWorkerHarness.chooseNumberOfThreads(pipelineOptions);
        ExecutorService executor = pipelineOptions.getExecutorService();
        LinkedList<WorkerThread> tasks = new LinkedList<WorkerThread>();
        LOG.debug("Starting {} worker threads", (Object)numThreads);
        for (int i = 0; i < numThreads; ++i) {
            tasks.add(new WorkerThread(worker, sleeper));
        }
        LOG.debug("Waiting for {} worker threads", (Object)numThreads);
        executor.invokeAll(tasks);
        LOG.error("All threads died.");
    }

    static DataflowWorker create(DataflowWorkerHarnessOptions options) {
        DataflowWorkerLoggingMDC.setJobId(options.getJobId());
        DataflowWorkerLoggingMDC.setWorkerId(options.getWorkerId());
        options.setAppName(APPLICATION_NAME);
        IOChannelUtils.setIOFactory("gs", new GcsIOChannelFactory(options));
        DataflowWorkUnitClient client = DataflowWorkUnitClient.fromOptions(options);
        return new DataflowWorker(client, options);
    }

    private static int chooseNumberOfThreads(DataflowWorkerHarnessOptions pipelineOptions) {
        if (pipelineOptions.getNumberOfWorkerHarnessThreads() != 0) {
            return pipelineOptions.getNumberOfWorkerHarnessThreads();
        }
        return Math.max(Runtime.getRuntime().availableProcessors(), 1);
    }

    @ThreadSafe
    static class DataflowWorkUnitClient
    extends DataflowWorker.WorkUnitClient {
        private static final ThreadLocal<DateTime> stageStartTime = new ThreadLocal();
        private final Dataflow dataflow;
        private final DataflowWorkerHarnessOptions options;

        static DataflowWorkUnitClient fromOptions(DataflowWorkerHarnessOptions options) {
            return new DataflowWorkUnitClient(Transport.newDataflowClient(options).build(), options);
        }

        DataflowWorkUnitClient(Dataflow dataflow, DataflowWorkerHarnessOptions options) {
            this.dataflow = dataflow;
            this.options = options;
        }

        @Override
        public WorkItem getWorkItem() throws IOException {
            LeaseWorkItemRequest request = new LeaseWorkItemRequest();
            request.setFactory(Transport.getJsonFactory());
            request.setWorkItemTypes(ImmutableList.of("map_task", "seq_map_task", "remote_source_task"));
            request.setWorkerCapabilities(ImmutableList.of(this.options.getWorkerId(), "remote_source", "custom_source"));
            request.setWorkerId(this.options.getWorkerId());
            request.setCurrentWorkerTime(TimeUtil.toCloudTime((ReadableInstant)DateTime.now()));
            request.setRequestedLeaseDuration(TimeUtil.toCloudDuration((ReadableDuration)Duration.millis((long)180000L)));
            LOG.debug("Leasing work: {}", (Object)request);
            LeaseWorkItemResponse response = (LeaseWorkItemResponse)this.dataflow.projects().jobs().workItems().lease(this.options.getProject(), this.options.getJobId(), request).execute();
            LOG.debug("Lease work response: {}", (Object)response);
            List workItems = response.getWorkItems();
            if (workItems == null || workItems.isEmpty()) {
                return null;
            }
            if (workItems.size() > 1) {
                String string = String.valueOf(response);
                throw new IOException(new StringBuilder(77 + String.valueOf(string).length()).append("This version of the SDK expects no more than one work item from the service: ").append(string).toString());
            }
            WorkItem work = (WorkItem)response.getWorkItems().get(0);
            if (work == null || work.getId() == null) {
                return null;
            }
            if (work.getMapTask() != null) {
                String stage = work.getMapTask().getStageName();
                DataflowWorkerLoggingMDC.setStageName(stage);
                LOG.info("Starting MapTask stage {}", (Object)stage);
            } else if (work.getSeqMapTask() != null) {
                String stage = work.getSeqMapTask().getStageName();
                DataflowWorkerLoggingMDC.setStageName(stage);
                LOG.info("Starting SeqMapTask stage {}", (Object)stage);
            } else {
                DataflowWorkerLoggingMDC.setStageName(null);
            }
            stageStartTime.set(DateTime.now());
            DataflowWorkerLoggingMDC.setWorkId(Long.toString(work.getId()));
            return work;
        }

        @Override
        public WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) throws IOException {
            ReportWorkItemStatusResponse result;
            DateTime startTime;
            DateTime endTime = DateTime.now();
            workItemStatus.setFactory(Transport.getJsonFactory());
            LOG.debug("Reporting work status: {}", (Object)workItemStatus);
            if (workItemStatus.getCompleted().booleanValue() && DataflowWorkerLoggingMDC.getStageName() != null && (startTime = stageStartTime.get()) != null) {
                Interval elapsed = new Interval((ReadableInstant)startTime, (ReadableInstant)endTime);
                int numErrors = workItemStatus.getErrors() == null ? 0 : workItemStatus.getErrors().size();
                LOG.info("Finished processing stage {} with {} errors in {} seconds ", new Object[]{DataflowWorkerLoggingMDC.getStageName(), numErrors, (double)elapsed.toDurationMillis() / 1000.0});
            }
            if ((result = (ReportWorkItemStatusResponse)this.dataflow.projects().jobs().workItems().reportStatus(this.options.getProject(), this.options.getJobId(), new ReportWorkItemStatusRequest().setWorkerId(this.options.getWorkerId()).setWorkItemStatuses(Collections.singletonList(workItemStatus)).setCurrentWorkerTime(TimeUtil.toCloudTime((ReadableInstant)endTime))).execute()) == null || result.getWorkItemServiceStates() == null || result.getWorkItemServiceStates().size() != 1) {
                throw new IOException("This version of the SDK expects exactly one work item service state from the service");
            }
            WorkItemServiceState state = (WorkItemServiceState)result.getWorkItemServiceStates().get(0);
            LOG.debug("ReportWorkItemStatus result: {}", (Object)state);
            return state;
        }
    }

    private static class WorkerThread
    implements Callable<Boolean> {
        private final DataflowWorker worker;
        private final Sleeper sleeper;
        private final BackOff backOff;

        WorkerThread(DataflowWorker worker, Sleeper sleeper) {
            this.worker = worker;
            this.sleeper = sleeper;
            this.backOff = DataflowWorkerHarness.createBackOff();
        }

        @Override
        public Boolean call() {
            boolean success = true;
            try {
                do {
                    if (!(success = this.doWork())) continue;
                    this.backOff.reset();
                } while (success || BackOffUtils.next((Sleeper)this.sleeper, (BackOff)this.backOff));
            }
            catch (IOException e) {
                LOG.error("Already tried several attempts at working on tasks. Aborting.", (Throwable)e);
            }
            catch (InterruptedException e) {
                LOG.error("Interrupted during thread execution or sleep.", (Throwable)e);
            }
            catch (Throwable t) {
                LOG.error("Thread {} died.", (Object)Thread.currentThread().getId(), (Object)t);
            }
            return false;
        }

        private boolean doWork() {
            try {
                LOG.debug("Thread starting getAndPerformWork.");
                boolean success = this.worker.getAndPerformWork();
                LOG.debug("{} processing one WorkItem.", (Object)(success ? "Finished" : "Failed"));
                return success;
            }
            catch (IOException e) {
                LOG.debug("There was a problem getting work.", (Throwable)e);
                return false;
            }
            catch (Exception e) {
                LOG.error("There was an unhandled error caused by the Dataflow SDK.", (Throwable)e);
                return false;
            }
        }
    }

    static class WorkerUncaughtExceptionHandler
    implements Thread.UncaughtExceptionHandler {
        static final WorkerUncaughtExceptionHandler INSTANCE = new WorkerUncaughtExceptionHandler();

        WorkerUncaughtExceptionHandler() {
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            LOG.error("Uncaught exception in main thread. Exiting with status code 1.", e);
            System.err.println("Uncaught exception in main thread. Exiting with status code 1.");
            e.printStackTrace();
            System.exit(1);
        }
    }
}

