package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.MetricUpdate;
import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.api.services.dataflow.model.Status;
import com.google.api.services.dataflow.model.WorkItem;
import com.google.api.services.dataflow.model.WorkItemServiceState;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Cache;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingHandler;
import com.google.cloud.dataflow.sdk.util.CloudCounterUtils;
import com.google.cloud.dataflow.sdk.util.CloudMetricUtils;
import com.google.cloud.dataflow.sdk.util.PCollectionViewWindow;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.Sized;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.Metric;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.SourceFormat;
import com.google.cloud.dataflow.sdk.util.common.worker.WorkExecutor;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker.class */
public class DataflowWorker {
    /** Logger used by every method of this worker. */
    private static final Logger LOG = LoggerFactory.getLogger(DataflowWorker.class);
    /** Client used to lease work items and to report their final status. */
    private final WorkUnitClient workUnitClient;
    /** Harness options; doWork overwrites the project id on this object for each work item. */
    private final DataflowWorkerHarnessOptions options;
    /** Side-input cache built once in the constructor and handed to every execution context. */
    private final Cache<PCollectionViewWindow<?>, Sized<Object>> sideInputCache;
    /** Jetty server created by runStatusServer; stays null until that method is called. */
    private Server statusServer;
    /** Per-entry overhead value (8) given to the SizedWeigher that weighs side-input cache entries. */
    private static final int OVERHEAD_WEIGHT = 8;
    /** 1024 * 1024; scales the configured worker cache size (getWorkerCacheMb) into the cache's maximum weight. */
    private static final int MEGABYTES = 1048576;
    // NOTE(review): presumably the port callers pass to runStatusServer by default; nothing in this file reads it.
    public static final int DEFAULT_STATUS_PORT = 18081;

    /**
     * Execution context used by the batch {@link DataflowWorker}.
     *
     * <p>Side inputs are resolved from {@link SideInputInfo} descriptors and every reader is
     * wrapped in a {@link CachingSideInputReader} backed by the cache supplied at construction.
     * Resolving side inputs directly from {@link PCollectionView}s is not supported.
     */
    public static class DataflowWorkerExecutionContext extends DataflowExecutionContext {
        private final Cache<PCollectionViewWindow<?>, Sized<Object>> cache;
        private final PipelineOptions options;

        /**
         * @param cache cache consulted by the readers this context hands out
         * @param pipelineOptions options forwarded to {@link DataflowSideInputReader}
         */
        public DataflowWorkerExecutionContext(Cache<PCollectionViewWindow<?>, Sized<Object>> cache, PipelineOptions pipelineOptions) {
            this.options = pipelineOptions;
            this.cache = cache;
        }

        /**
         * Builds a reader for the given side-input descriptors and layers the cache on top of it.
         *
         * @param iterable side-input descriptors to read
         * @return a caching reader over the described side inputs
         * @throws Exception if the underlying reader cannot be created
         */
        @Override
        public SideInputReader getSideInputReader(Iterable<? extends SideInputInfo> iterable) throws Exception {
            return CachingSideInputReader.of(
                    DataflowSideInputReader.of(iterable, options, this),
                    cache);
        }

        /**
         * Always fails: this context only serves readers built from {@link SideInputInfo}.
         *
         * @throws UnsupportedOperationException on every call
         */
        @Override
        public SideInputReader getSideInputReaderForViews(Iterable<? extends PCollectionView<?>> iterable) {
            final String message =
                    "Cannot call getSideInputReaderForViews for batch DataflowWorker: "
                            + "the MapTask specification should have had SideInputInfo descriptors for each side input, "
                            + "and a SideInputReader provided via getSideInputReader";
            throw new UnsupportedOperationException(message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/DataflowWorker$StatusHandler.class */
    public class StatusHandler extends AbstractHandler {
        private StatusHandler() {
        }

        public void handle(String str, Request request, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws IOException, ServletException {
            httpServletResponse.setContentType("text/html;charset=utf-8");
            request.setHandled(true);
            PrintWriter writer = httpServletResponse.getWriter();
            if (str.equals("/healthz")) {
                httpServletResponse.setStatus(200);
                writer.println("ok");
            } else {
                httpServletResponse.setStatus(404);
                writer.println("not found");
            }
        }
    }

    /**
     * Abstraction over the service that hands out work items and accepts status reports.
     * The worker only talks to the service through this type.
     */
    public static abstract class WorkUnitClient {
        /**
         * Fetches the next work item to execute.
         *
         * @return the work item, or {@code null} when none is available (getAndPerformWork treats
         *     {@code null} as "nothing to do")
         * @throws IOException if the request fails
         */
        public abstract WorkItem getWorkItem() throws IOException;

        /**
         * Sends a status report for a work item.
         *
         * @param workItemStatus the status to report
         * @return the service's reply; callers inside this file ignore it
         * @throws IOException if the report cannot be delivered
         */
        public abstract WorkItemServiceState reportWorkItemStatus(WorkItemStatus workItemStatus) throws IOException;
    }

    public DataflowWorker(WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions) {
        this.workUnitClient = workUnitClient;
        this.options = dataflowWorkerHarnessOptions;
        this.sideInputCache = CacheBuilder.newBuilder().maximumWeight(dataflowWorkerHarnessOptions.getWorkerCacheMb().intValue() * MEGABYTES).weigher(new SizedWeigher(8)).softValues().build();
    }

    /**
     * Leases one work item and executes it.
     *
     * @return {@code false} when no work item was available or the item failed; {@code true}
     *     when the item completed and its success status was reported
     * @throws IOException if leasing the item or reporting its status fails
     */
    public boolean getAndPerformWork() throws IOException {
        WorkItem leased = this.workUnitClient.getWorkItem();
        // Short-circuit: doWork only runs when an item was actually leased.
        return leased != null && doWork(leased);
    }

    /**
     * Executes a single work item and reports the outcome to the service.
     *
     * <p>A map task or a source operation task is turned into a {@link WorkExecutor}, run with
     * progress reporting, and then reported as "Success" together with its counters and metrics.
     * Any {@link Throwable} raised along the way is reported through handleWorkError instead.
     * The executor is closed on every path once it has been created.
     *
     * @param workItem the item to execute
     * @return {@code true} if the item ran and the success report was sent, {@code false} if a
     *     failure was reported
     * @throws IOException if reporting the failure itself fails
     */
    private boolean doWork(WorkItem workItem) throws IOException {
        LOG.debug("Executing: {}", workItem);

        WorkExecutor worker = null;
        // Starts at the item's initial index and is refreshed from the progress updater, so both
        // the success and the failure report carry the index the service expects next.
        long nextReportIndex = workItem.getInitialReportIndex().longValue();
        try {
            this.options.setProject(workItem.getProjectId());
            DataflowWorkerExecutionContext executionContext = new DataflowWorkerExecutionContext(this.sideInputCache, this.options);

            if (workItem.getMapTask() != null) {
                worker = MapTaskExecutorFactory.create(this.options, workItem.getMapTask(), executionContext);
            } else if (workItem.getSourceOperationTask() != null) {
                worker = SourceOperationExecutorFactory.create(this.options, workItem.getSourceOperationTask());
            } else {
                throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
            }

            DataflowWorkProgressUpdater progressUpdater = new DataflowWorkProgressUpdater(workItem, worker, this.workUnitClient, this.options);
            try {
                executeWork(worker, progressUpdater);
            } finally {
                // Capture the index even when execution throws; the catch block below needs it.
                nextReportIndex = progressUpdater.getNextReportIndex();
            }

            CounterSet outputCounters = worker.getOutputCounters();
            Iterator<Counter<?>> counterIterator = outputCounters.iterator();
            while (counterIterator.hasNext()) {
                LOG.trace("COUNTER {}.", counterIterator.next());
            }
            Collection<Metric<?>> outputMetrics = worker.getOutputMetrics();
            for (Metric<?> metric : outputMetrics) {
                LOG.trace("METRIC {}: {}", metric.getName(), metric.getValue());
            }

            // Only source operations produce a response to send back.
            SourceFormat.OperationResponse operationResponse = null;
            if (worker instanceof SourceOperationExecutor) {
                operationResponse = SourceTranslationUtils.cloudSourceOperationResponseToSourceOperationResponse(((SourceOperationExecutor) worker).getResponse());
            }

            reportStatus(this.options, "Success", workItem, outputCounters, outputMetrics, operationResponse, null, nextReportIndex);
            return true;
        } catch (Throwable failure) {
            // Pass the executor along (it may still be null) so its counters and metrics are
            // included in the failure report.
            handleWorkError(workItem, worker, nextReportIndex, failure);
            return false;
        } finally {
            if (worker != null) {
                try {
                    worker.close();
                } catch (Exception closeFailure) {
                    LOG.warn("Uncaught exception occurred during work unit shutdown:", closeFailure);
                }
            }
        }
    }

    /**
     * Runs the executor while progress is being reported.
     *
     * @param workExecutor executor to run
     * @param dataflowWorkProgressUpdater updater started before and stopped after execution
     * @throws Exception whatever {@code workExecutor.execute()} or the updater throws
     */
    void executeWork(WorkExecutor workExecutor, DataflowWorkProgressUpdater dataflowWorkProgressUpdater) throws Exception {
        // Started outside the try: if starting fails there is nothing to stop.
        dataflowWorkProgressUpdater.startReportingProgress();
        try {
            workExecutor.execute();
        } finally {
            // Reporting is stopped whether execution succeeded or threw.
            dataflowWorkProgressUpdater.stopReportingProgress();
        }
    }

    /**
     * Logs a work-item failure and reports it to the service as a "Failure" status.
     *
     * <p>For a {@link UserCodeException} the reported message is built from its cause rather than
     * from the wrapper itself. Counters and metrics are attached only when an executor is given.
     *
     * @param workItem the item that failed
     * @param workExecutor executor whose counters/metrics should be reported, or {@code null}
     * @param j report index to send with the status
     * @param th the failure
     * @throws IOException if the status report cannot be delivered
     */
    private void handleWorkError(WorkItem workItem, WorkExecutor workExecutor, long j, Throwable th) throws IOException {
        LOG.warn("Uncaught exception occurred during work unit execution:", th);

        Throwable reported = th;
        if (th instanceof UserCodeException) {
            reported = th.getCause();
        }

        Status failure = new Status();
        // NOTE(review): 2 is presumably the canonical UNKNOWN status code — confirm.
        failure.setCode(2);
        failure.setMessage(DataflowWorkerLoggingHandler.formatException(reported));

        CounterSet counters = null;
        Collection<Metric<?>> metrics = null;
        if (workExecutor != null) {
            counters = workExecutor.getOutputCounters();
            metrics = workExecutor.getOutputMetrics();
        }

        reportStatus(this.options, "Failure", workItem, counters, metrics, null, Collections.singletonList(failure), j);
    }

    /**
     * Logs the outcome of a work item and sends its final (completed) status to the service.
     *
     * <p>The log line is a warning when at least one error is attached and a debug message
     * otherwise.
     *
     * @param dataflowWorkerHarnessOptions options forwarded to buildStatus
     * @param str outcome label used in the log line
     * @param workItem the item being reported
     * @param counterSet counters to attach, or {@code null}
     * @param collection metrics to attach, or {@code null}
     * @param operationResponse source-operation response to attach, or {@code null}
     * @param list errors to attach, or {@code null}
     * @param j report index to send with the status
     * @throws IOException if the status report cannot be delivered
     */
    private void reportStatus(DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions, String str, WorkItem workItem, @Nullable CounterSet counterSet, @Nullable Collection<Metric<?>> collection, @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> list, long j) throws IOException {
        boolean hasErrors = list != null && list.size() > 0;
        String itemId = uniqueId(workItem);
        if (hasErrors) {
            LOG.warn("{} processing work item {}", str, itemId);
        } else {
            LOG.debug("{} processing work item {}", str, itemId);
        }

        // No progress and no dynamic split result are sent with a final report.
        WorkItemStatus finalStatus = buildStatus(workItem, true, counterSet, collection, dataflowWorkerHarnessOptions, null, null, operationResponse, list, j);
        this.workUnitClient.reportWorkItemStatus(finalStatus);
    }

    /**
     * Assembles the {@link WorkItemStatus} message for a work item.
     *
     * <p>Counter updates and metric updates are extracted separately and merged into a single
     * list; when only one kind is present that list is used as is, and when neither is present
     * the metric updates are set to {@code null}.
     *
     * @param workItem item the status refers to; its id is sent as a decimal string
     * @param z whether the item is completed
     * @param counterSet counters to report, or {@code null}
     * @param collection metrics to report, or {@code null}
     * @param dataflowWorkerHarnessOptions supplies the worker id for metric extraction and the
     *     options used when converting a bounded source split
     * @param progress reader progress to report, or {@code null}
     * @param dynamicSplitResult accepted dynamic split to report, or {@code null}
     * @param operationResponse source-operation response; only sent when the work item carries a
     *     source operation task
     * @param list errors to attach, or {@code null}
     * @param j report index of this status
     * @return the populated status message
     * @throws IllegalArgumentException if {@code dynamicSplitResult} is non-null but of an
     *     unsupported type
     */
    public static WorkItemStatus buildStatus(WorkItem workItem, boolean z, @Nullable CounterSet counterSet, @Nullable Collection<Metric<?>> collection, DataflowWorkerHarnessOptions dataflowWorkerHarnessOptions, @Nullable Reader.Progress progress, @Nullable Reader.DynamicSplitResult dynamicSplitResult, @Nullable SourceFormat.OperationResponse operationResponse, @Nullable List<Status> list, long j) {
        WorkItemStatus status = new WorkItemStatus();
        status.setWorkItemId(Long.toString(workItem.getId().longValue()));
        status.setCompleted(Boolean.valueOf(z));
        status.setReportIndex(Long.valueOf(j));

        List<MetricUpdate> counterUpdates = null;
        if (counterSet != null) {
            // Second argument false, as before.
            // NOTE(review): presumably selects cumulative rather than delta values — confirm.
            counterUpdates = CloudCounterUtils.extractCounters(counterSet, false);
        }
        List<MetricUpdate> metricUpdates = null;
        if (collection != null) {
            metricUpdates = CloudMetricUtils.extractCloudMetrics(collection, dataflowWorkerHarnessOptions.getWorkerId());
        }

        // Merge on the extracted lists themselves so a null from either extractor cannot reach
        // addAll.
        List<MetricUpdate> updates;
        if (counterUpdates == null) {
            updates = metricUpdates;
        } else if (metricUpdates == null) {
            updates = counterUpdates;
        } else {
            updates = new ArrayList<MetricUpdate>(counterUpdates.size() + metricUpdates.size());
            updates.addAll(counterUpdates);
            updates.addAll(metricUpdates);
        }
        status.setMetricUpdates(updates);

        if (list != null) {
            status.setErrors(list);
        }
        if (progress != null) {
            status.setProgress(SourceTranslationUtils.readerProgressToCloudProgress(progress));
        }

        if (dynamicSplitResult instanceof Reader.DynamicSplitResultWithPosition) {
            Reader.DynamicSplitResultWithPosition positioned = (Reader.DynamicSplitResultWithPosition) dynamicSplitResult;
            status.setStopPosition(SourceTranslationUtils.toCloudPosition(positioned.getAcceptedPosition()));
        } else if (dynamicSplitResult instanceof BasicSerializableSourceFormat.BoundedSourceSplit) {
            BasicSerializableSourceFormat.BoundedSourceSplit split = (BasicSerializableSourceFormat.BoundedSourceSplit) dynamicSplitResult;
            status.setDynamicSourceSplit(BasicSerializableSourceFormat.toSourceSplit(split, dataflowWorkerHarnessOptions));
        } else if (dynamicSplitResult != null) {
            throw new IllegalArgumentException("Unexpected type of dynamic split result: " + dynamicSplitResult);
        }

        if (workItem.getSourceOperationTask() != null) {
            status.setSourceOperationResponse(SourceTranslationUtils.sourceOperationResponseToCloudSourceOperationResponse(operationResponse));
        }
        return status;
    }

    /**
     * Builds an identifier for a work item for use in log messages.
     *
     * @param workItem the item to identify
     * @return project id, job id and work item id joined by {@code ';'}; a missing component is
     *     rendered as {@code "null"}
     */
    public static String uniqueId(WorkItem workItem) {
        return workItem.getProjectId() + ";" + workItem.getJobId() + ";" + workItem.getId();
    }

    /**
     * Starts the HTTP status server on the given port.
     *
     * <p>A failure to start is logged as a warning and not rethrown.
     *
     * @param i port to listen on
     */
    public void runStatusServer(int i) {
        Server server = new Server(i);
        this.statusServer = server;
        server.setHandler(new StatusHandler());
        try {
            server.start();
            LOG.info("Status server started on port {}", i);
        } catch (Exception startFailure) {
            LOG.warn("Status server failed to start: ", startFailure);
        }
    }
}
