/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.dataflow.model.MetricStructuredName;
import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Cache;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
import com.google.cloud.dataflow.sdk.runners.worker.CachingSideInputReader;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowSideInputReader;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkProgressUpdater;
import com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SizedWeigher;
import com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutor;
import com.google.cloud.dataflow.sdk.runners.worker.SourceOperationExecutorFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
import com.google.cloud.dataflow.sdk.runners.worker.UserCodeTimeTracker;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingHandler;
import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.CloudCounterUtils;
import com.google.cloud.dataflow.sdk.util.CloudMetricUtils;
import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.Sized;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.Metric;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.common.worker.WorkExecutor;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataflowWorker {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(DataflowWorker.class);
    /** Client used to fetch work items and to report their final status. */
    private final WorkUnitClient workUnitClient;
    /** Harness options; the project is overwritten from each work item before it is executed. */
    private final DataflowWorkerHarnessOptions options;
    /** Weight-bounded, soft-valued cache built in the constructor and handed to every execution context. */
    private final Cache<PCollectionViewWindow<?>, Sized<Object>> sideInputCache;
    /** Status server assigned by {@code runStatusServer(Server)}; stays null until that method is called. */
    private Server statusServer;
    /** Receives state-sampler callbacks and scopes the work performed for each work item. */
    private final UserCodeTimeTracker userCodeTimeTracker = new UserCodeTimeTracker();
    // NOTE(review): presumably the base weight handed to SizedWeigher (the constructor shows the literal 8) - confirm.
    private static final int OVERHEAD_WEIGHT = 8;
    /** Number of bytes in one megabyte (0x100000 == 1,048,576). */
    private static final int MEGABYTES = 0x100000;
    // NOTE(review): presumably the port callers pass to runStatusServer(int); it is not referenced in this file - confirm.
    public static final int DEFAULT_STATUS_PORT = 18081;

    public DataflowWorker(WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
        this.workUnitClient = workUnitClient;
        this.options = options;
        this.sideInputCache = CacheBuilder.newBuilder().maximumWeight(options.getWorkerCacheMb() * 0x100000).weigher(new SizedWeigher(8)).softValues().build();
    }

    /**
     * Fetches one work item from the client and executes it.
     *
     * @return {@code false} when the client returned no work item; otherwise the result of
     *     executing the item ({@code true} on success, {@code false} on failure)
     * @throws IOException if fetching the item or reporting its status fails
     */
    public boolean getAndPerformWork() throws IOException {
        WorkItem nextItem = this.workUnitClient.getWorkItem();
        // Short-circuit: nothing is executed when no work item is available.
        return nextItem != null && this.doWork(nextItem);
    }

    /**
     * Executes a single work item and reports its outcome through the work unit client.
     *
     * <p>A map task or a source operation task is turned into a {@link WorkExecutor}, run with
     * progress reporting, and its counters and metrics are reported with a "Success" status. Any
     * throwable raised along the way is routed to {@code handleWorkError}, which reports a failure.
     *
     * @param workItem the item to execute
     * @return {@code true} if the item ran and its success status was reported, {@code false} if a
     *     failure was handled
     * @throws IOException if reporting the failure status fails
     */
    private boolean doWork(WorkItem workItem) throws IOException {
        LOG.debug("Executing: {}", (Object)workItem);
        WorkExecutor worker = null;
        SourceFormat.OperationResponse operationResponse = null;
        long nextReportIndex = workItem.getInitialReportIndex();
        try {
            this.options.setProject(workItem.getProjectId());
            DataflowWorkerExecutionContext executionContext = new DataflowWorkerExecutionContext(this.sideInputCache, this.options);
            CounterSet counters = new CounterSet(new Counter[0]);

            // Pick the executor (and the matching sampler prefix) from the kind of task carried by the item.
            final StateSampler sampler;
            if (workItem.getMapTask() != null) {
                sampler = new StateSampler(workItem.getMapTask().getStageName() + "-", counters.getAddCounterMutator());
                worker = MapTaskExecutorFactory.create(this.options, workItem.getMapTask(), executionContext, counters, sampler);
            } else if (workItem.getSourceOperationTask() != null) {
                sampler = new StateSampler("source-operation-", counters.getAddCounterMutator());
                worker = SourceOperationExecutorFactory.create(this.options, workItem.getSourceOperationTask());
            } else {
                throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
            }
            sampler.addSamplingCallback(new UserCodeTimeTracker.StateSamplerCallback(this.userCodeTimeTracker, workItem.getId()));

            DataflowWorkProgressUpdater progressUpdater = new DataflowWorkProgressUpdater(workItem, worker, this.workUnitClient, this.options);
            try (AutoCloseable scope = this.userCodeTimeTracker.scopedWork(sampler.getPrefix(), workItem.getId(), counters.getAddCounterMutator())) {
                try {
                    this.executeWork(worker, progressUpdater);
                } finally {
                    // The worker is closed before its metrics are read below; the report index is
                    // captured afterwards so the error path can reuse it.
                    worker.close();
                    nextReportIndex = progressUpdater.getNextReportIndex();
                }
            }

            for (Object counter : counters) {
                LOG.trace("COUNTER {}.", counter);
            }
            Collection<Metric<?>> metrics = worker.getOutputMetrics();
            for (Metric<?> metric : metrics) {
                LOG.trace("METRIC {}: {}", (Object)metric.getName(), metric.getValue());
            }

            // Only source operations produce a response; it stays null for map tasks.
            if (worker instanceof SourceOperationExecutor) {
                operationResponse = SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse(((SourceOperationExecutor)worker).getResponse());
            }

            try {
                this.reportStatus(this.options, "Success", workItem, counters, metrics, operationResponse, null, nextReportIndex);
            } catch (GoogleJsonResponseException e) {
                boolean splitTooLarge = operationResponse != null && worker instanceof SourceOperationExecutor && SourceOperationExecutor.isSplitResponseTooLarge(operationResponse);
                if (splitTooLarge) {
                    throw new RuntimeException("Total size of the BoundedSource objects generated by splitIntoBundles() operation is larger than the allowable limit. For more information, please check the corresponding FAQ entry at :\nhttps://cloud.google.com/dataflow/faq", e);
                }
                throw e;
            }
            return true;
        } catch (Throwable e) {
            this.handleWorkError(workItem, worker, nextReportIndex, e);
            return false;
        } finally {
            // Safety net for failures raised before the inner close; note that on the normal path
            // this closes the worker a second time. A failure here is only logged.
            if (worker != null) {
                try {
                    worker.close();
                } catch (Exception closeFailure) {
                    LOG.warn("Uncaught exception occurred during work unit shutdown:", (Throwable)closeFailure);
                }
            }
        }
    }

    /**
     * Runs {@code worker} while {@code progressUpdater} is reporting progress.
     *
     * <p>Progress reporting is started before execution and is always stopped afterwards, even if
     * execution throws.
     *
     * @param worker the executor to run
     * @param progressUpdater the updater started before and stopped after execution
     * @throws Exception whatever the worker or the progress updater throws
     */
    void executeWork(WorkExecutor worker, DataflowWorkProgressUpdater progressUpdater) throws Exception {
        progressUpdater.startReportingProgress();
        try {
            worker.execute();
        }
        finally {
            // Stop reporting regardless of how execution ended.
            progressUpdater.stopReportingProgress();
        }
    }

    /**
     * Logs a failed work item and reports a "Failure" status carrying the formatted exception.
     *
     * <p>A {@link UserCodeException} is unwrapped so that the reported message describes its cause;
     * if it has no cause, the exception itself is reported. Counters and metrics are included only
     * when an executor was created before the failure.
     *
     * @param workItem the item that failed
     * @param worker the executor that ran the item, or {@code null} if none was created
     * @param nextReportIndex report index to attach to the status
     * @param e the failure
     * @throws IOException if reporting the status fails
     */
    private void handleWorkError(WorkItem workItem, WorkExecutor worker, long nextReportIndex, Throwable e) throws IOException {
        LOG.warn("Uncaught exception occurred during work unit execution:", e);
        // Fall back to the wrapper itself so a cause-less UserCodeException never formats null.
        Throwable t = e;
        if (e instanceof UserCodeException && e.getCause() != null) {
            t = e.getCause();
        }
        Status error = new Status();
        // NOTE(review): 2 is presumably the canonical UNKNOWN status code - confirm against the service API.
        error.setCode(Integer.valueOf(2));
        error.setMessage(DataflowWorkerLoggingHandler.formatException(t));
        CounterSet counters = worker == null ? null : worker.getOutputCounters();
        Collection<Metric<?>> metrics = worker == null ? null : worker.getOutputMetrics();
        // The error was just constructed, so it is always reported.
        this.reportStatus(this.options, "Failure", workItem, counters, metrics, null, Collections.singletonList(error), nextReportIndex);
    }

    /**
     * Logs the outcome of a work item and sends its completed status to the work unit client.
     *
     * <p>The outcome is logged at WARN when {@code errors} is non-empty and at DEBUG otherwise.
     *
     * @param options harness options forwarded to {@code buildStatus}
     * @param status label used in the log line
     * @param workItem the item being reported
     * @param counters counters to report, or {@code null}
     * @param metrics metrics to report, or {@code null}
     * @param operationResponse source operation response, or {@code null}
     * @param errors errors to attach, or {@code null}
     * @param reportIndex report index to attach to the status
     * @throws IOException if the client fails to report the status
     */
    private void reportStatus(DataflowWorkerHarnessOptions options, String status, WorkItem workItem, @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics, @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors, long reportIndex) throws IOException {
        boolean hasErrors = errors != null && !errors.isEmpty();
        String itemId = DataflowWorker.uniqueId(workItem);
        if (hasErrors) {
            LOG.warn("{} processing work item {}", (Object)status, (Object)itemId);
        } else {
            LOG.debug("{} processing work item {}", (Object)status, (Object)itemId);
        }
        // This path always reports the item as completed, with no progress or dynamic split.
        WorkItemStatus finalStatus = DataflowWorker.buildStatus(workItem, true, counters, metrics, options, null, null, operationResponse, errors, reportIndex);
        this.workUnitClient.reportWorkItemStatus(finalStatus);
    }

    /**
     * Builds a {@link WorkItemStatus} without state-sampler information.
     *
     * <p>Delegates to the overload that accepts a {@code StateSampler.StateSamplerInfo}, passing
     * {@code null} for it.
     *
     * @return the populated status
     */
    static WorkItemStatus buildStatus(WorkItem workItem, boolean completed, @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics, DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress, @Nullable Reader.DynamicSplitResult dynamicSplitResult, @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors, long reportIndex) {
        StateSampler.StateSamplerInfo noSamplerInfo = null;
        return DataflowWorker.buildStatus(workItem, completed, counters, metrics, options, progress, dynamicSplitResult, operationResponse, errors, reportIndex, noSamplerInfo);
    }

    /**
     * Builds the {@link WorkItemStatus} describing a work item.
     *
     * <p>The status always carries the work item id, the completed flag, the report index and a
     * (possibly empty) list of metric updates. Counter updates come first, then metric updates, then
     * an optional "state-sampler" internal update. Errors, progress, the dynamic split result and
     * the source operation response are attached only when applicable.
     *
     * @param workItem the item the status refers to
     * @param completed whether the item has finished
     * @param counters counters to convert into updates, or {@code null}
     * @param metrics metrics to convert into updates, or {@code null}
     * @param options harness options; supplies the worker id and is used for source split conversion
     * @param progress reader progress, or {@code null}
     * @param dynamicSplitResult accepted dynamic split, or {@code null}
     * @param operationResponse source operation response, or {@code null}
     * @param errors errors to attach, or {@code null}
     * @param reportIndex report index of this status
     * @param stateSamplerInfo sampler snapshot to report as an internal metric, or {@code null}
     * @return the populated status
     * @throws IllegalArgumentException if {@code dynamicSplitResult} is of an unsupported type
     */
    static WorkItemStatus buildStatus(WorkItem workItem, boolean completed, @Nullable CounterSet counters, @Nullable Collection<Metric<?>> metrics, DataflowWorkerHarnessOptions options, @Nullable Reader.Progress progress, @Nullable Reader.DynamicSplitResult dynamicSplitResult, @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> errors, long reportIndex, @Nullable StateSampler.StateSamplerInfo stateSamplerInfo) {
        WorkItemStatus status = new WorkItemStatus();
        status.setWorkItemId(Long.toString(workItem.getId()));
        status.setCompleted(Boolean.valueOf(completed));
        status.setReportIndex(Long.valueOf(reportIndex));

        // Counter updates first, then metric updates, then the optional sampler snapshot.
        List<MetricUpdate> updates = new ArrayList<MetricUpdate>();
        if (counters != null) {
            List<MetricUpdate> fromCounters = CloudCounterUtils.extractCounters(counters, false);
            if (fromCounters != null) {
                updates.addAll(fromCounters);
            }
        }
        if (metrics != null) {
            List<MetricUpdate> fromMetrics = CloudMetricUtils.extractCloudMetrics(metrics, options.getWorkerId());
            if (fromMetrics != null) {
                updates.addAll(fromMetrics);
            }
        }
        if (stateSamplerInfo != null) {
            MetricStructuredName samplerName = new MetricStructuredName();
            samplerName.setName("state-sampler");

            // Only the fields the sampler actually populated are reported.
            HashMap<String, Object> samplerFields = new HashMap<String, Object>();
            if (stateSamplerInfo.state != null) {
                samplerFields.put("last-state-name", stateSamplerInfo.state);
            }
            if (stateSamplerInfo.transitionCount != null) {
                samplerFields.put("num-transitions", stateSamplerInfo.transitionCount);
            }
            if (stateSamplerInfo.stateDurationMillis != null) {
                samplerFields.put("last-state-duration-ms", stateSamplerInfo.stateDurationMillis);
            }

            MetricUpdate samplerUpdate = new MetricUpdate();
            samplerUpdate.setKind("internal");
            samplerUpdate.setName(samplerName);
            samplerUpdate.setInternal(samplerFields);
            updates.add(samplerUpdate);
        }
        status.setMetricUpdates(updates);

        if (errors != null) {
            status.setErrors(errors);
        }
        if (progress != null) {
            status.setProgress(SourceTranslationUtils.readerProgressToCloudProgress(progress));
        }

        if (dynamicSplitResult instanceof Reader.DynamicSplitResultWithPosition) {
            Reader.DynamicSplitResultWithPosition positionSplit = (Reader.DynamicSplitResultWithPosition)dynamicSplitResult;
            status.setStopPosition(SourceTranslationUtils.toCloudPosition(positionSplit.getAcceptedPosition()));
        } else if (dynamicSplitResult instanceof BasicSerializableSourceFormat.BoundedSourceSplit) {
            BasicSerializableSourceFormat.BoundedSourceSplit sourceSplit = (BasicSerializableSourceFormat.BoundedSourceSplit)dynamicSplitResult;
            status.setDynamicSourceSplit(BasicSerializableSourceFormat.toSourceSplit(sourceSplit, options));
        } else if (dynamicSplitResult != null) {
            throw new IllegalArgumentException("Unexpected type of dynamic split result: " + dynamicSplitResult);
        }

        // Source operation items always get a response field, even when the response is null.
        if (workItem.getSourceOperationTask() != null) {
            status.setSourceOperationResponse(SourceTranslationUtils.sourceOperationResponseToCloudSourceOperationResponse(operationResponse));
        }
        return status;
    }

    /**
     * Builds a log-friendly identifier of the form {@code projectId;jobId;workItemId}.
     *
     * <p>A missing part is rendered as the text {@code null}.
     *
     * @param work the work item to describe
     * @return the three identifiers joined with semicolons
     */
    static String uniqueId(WorkItem work) {
        return work.getProjectId() + ";" + work.getJobId() + ";" + work.getId();
    }

    /**
     * Creates a status server on {@code statusPort} and starts it.
     *
     * <p>The "started" message is logged only once the server reports that it is running; a
     * startup failure is logged as a warning by {@code runStatusServer(Server)}.
     *
     * @param statusPort TCP port the status server listens on
     */
    public void runStatusServer(int statusPort) {
        Server server = new Server(statusPort);
        this.runStatusServer(server);
        // Previously this was logged before the server was even created, also when startup then failed.
        if (server.isStarted()) {
            LOG.info("Status server started on port {}", (Object)statusPort);
        }
    }

    /**
     * Installs the status handler on {@code server}, remembers the server and starts it.
     *
     * <p>A failure to start is logged as a warning and not rethrown.
     *
     * @param server the server to configure and start
     */
    void runStatusServer(Server server) {
        this.statusServer = server;
        Handler statusHandler = new StatusHandler();
        server.setHandler(statusHandler);
        try {
            server.start();
        } catch (Exception startFailure) {
            LOG.warn("Status server failed to start: ", (Throwable)startFailure);
        }
    }

    private class StatusHandler
    extends AbstractHandler {
        private StatusHandler() {
        }

        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
            response.setContentType("text/html;charset=utf-8");
            baseRequest.setHandled(true);
            PrintWriter responseWriter = response.getWriter();
            if (target.equals("/healthz")) {
                response.setStatus(200);
                responseWriter.println("ok");
            } else if (target.equals("/threadz")) {
                response.setStatus(200);
                this.printThreads(responseWriter);
            } else {
                response.setStatus(404);
                responseWriter.println("not found");
            }
        }

        private void printThreads(PrintWriter response) {
            Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
            for (Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
                Thread thread = entry.getKey();
                StackTraceElement[] stackTraceElementArray = String.valueOf(thread);
                String string = String.valueOf((Object)thread.getState());
                response.println(new StringBuilder(31 + String.valueOf(stackTraceElementArray).length() + String.valueOf(string).length()).append("--- Thread: ").append((String)stackTraceElementArray).append(" State: ").append(string).append(" stack: ---").toString());
                for (StackTraceElement element : entry.getValue()) {
                    String string2 = String.valueOf(element);
                    response.println(new StringBuilder(2 + String.valueOf(string2).length()).append("  ").append(string2).toString());
                }
            }
        }
    }

    /**
     * Batch execution context whose side-input readers are backed by a shared cache.
     *
     * <p>Side inputs must be requested through {@code SideInputInfo} descriptors; requesting them
     * by view is rejected.
     */
    private static class DataflowWorkerExecutionContext
    extends BatchModeExecutionContext {
        private final Cache<PCollectionViewWindow<?>, Sized<Object>> sharedCache;
        private final PipelineOptions pipelineOptions;

        /**
         * @param cache cache consulted by the side-input readers created by this context
         * @param options pipeline options passed to the superclass and to the side-input readers
         */
        public DataflowWorkerExecutionContext(Cache<PCollectionViewWindow<?>, Sized<Object>> cache, PipelineOptions options) {
            super(options);
            this.sharedCache = cache;
            this.pipelineOptions = options;
        }

        /**
         * Returns a caching reader wrapped around a reader built from the given descriptors.
         */
        @Override
        public SideInputReader getSideInputReader(Iterable<? extends SideInputInfo> sideInputInfos) throws Exception {
            SideInputReader uncachedReader = DataflowSideInputReader.of(sideInputInfos, this.pipelineOptions, this);
            return CachingSideInputReader.of(uncachedReader, this.sharedCache);
        }

        /**
         * Always throws: this context only builds readers from {@code SideInputInfo} descriptors.
         */
        @Override
        public SideInputReader getSideInputReaderForViews(Iterable<? extends PCollectionView<?>> sideInputViews) {
            throw new UnsupportedOperationException("Cannot call getSideInputReaderForViews for batch DataflowWorker: the MapTask specification should have had SideInputInfo descriptors for each side input, and a SideInputReader provided via getSideInputReader");
        }
    }

    /**
     * Abstraction of the service the worker talks to: it hands out work items and accepts their
     * status reports.
     */
    public static abstract class WorkUnitClient {
        /**
         * Fetches the next work item.
         *
         * @return the work item; the worker treats {@code null} as "no work available"
         * @throws IOException if the request fails
         */
        public abstract WorkItem getWorkItem() throws IOException;

        /**
         * Reports the status of a work item.
         *
         * @param workItemStatus the status to report
         * @return the service state returned for the work item
         * @throws IOException if the request fails
         */
        public abstract WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) throws IOException;
    }
}

