package org.apache.hadoop.mapreduce.v2.app.rm.preemption;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.event.EventHandler;

/* loaded from: input_file:org/apache/hadoop/mapreduce/v2/app/rm/preemption/CheckpointAMPreemptionPolicy.class */
public class CheckpointAMPreemptionPolicy implements AMPreemptionPolicy {
    private final Set<TaskAttemptId> toBePreempted;
    private final Set<TaskAttemptId> countedPreemptions;
    private final Map<TaskId, TaskCheckpointID> checkpoints;
    private final Map<TaskAttemptId, Resource> pendingFlexiblePreemptions;
    private EventHandler eventHandler;
    static final Log LOG = LogFactory.getLog(CheckpointAMPreemptionPolicy.class);

    public CheckpointAMPreemptionPolicy() {
        this(Collections.synchronizedSet(new HashSet()), Collections.synchronizedSet(new HashSet()), Collections.synchronizedMap(new HashMap()), Collections.synchronizedMap(new HashMap()));
    }

    CheckpointAMPreemptionPolicy(Set<TaskAttemptId> set, Set<TaskAttemptId> set2, Map<TaskId, TaskCheckpointID> map, Map<TaskAttemptId, Resource> map2) {
        this.toBePreempted = set;
        this.countedPreemptions = set2;
        this.checkpoints = map;
        this.pendingFlexiblePreemptions = map2;
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void init(AppContext appContext) {
        this.eventHandler = appContext.getEventHandler();
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void preempt(AMPreemptionPolicy.Context context, PreemptionMessage preemptionMessage) {
        if (preemptionMessage != null) {
            StrictPreemptionContract strictContract = preemptionMessage.getStrictContract();
            if (strictContract != null && strictContract.getContainers() != null && strictContract.getContainers().size() > 0) {
                LOG.info("strict preemption :" + preemptionMessage.getStrictContract().getContainers().size() + " containers to kill");
                Iterator it = preemptionMessage.getStrictContract().getContainers().iterator();
                while (it.hasNext()) {
                    ContainerId id = ((PreemptionContainer) it.next()).getId();
                    TaskAttemptId taskAttempt = context.getTaskAttempt(id);
                    if (taskAttempt != null) {
                        if (TaskType.REDUCE.equals(taskAttempt.getTaskId().getTaskType())) {
                            this.toBePreempted.add(taskAttempt);
                            LOG.info("preempting " + id + " running task:" + taskAttempt);
                        } else {
                            LOG.info("NOT preempting " + id + " running task:" + taskAttempt);
                        }
                    }
                }
            }
            PreemptionContract contract = preemptionMessage.getContract();
            if (contract == null || contract.getResourceRequest() == null || contract.getResourceRequest().size() <= 0 || contract.getContainers() == null || contract.getContainers().size() <= 0) {
                return;
            }
            LOG.info("negotiable preemption :" + preemptionMessage.getContract().getResourceRequest().size() + " resourceReq, " + preemptionMessage.getContract().getContainers().size() + " containers");
            List resourceRequest = preemptionMessage.getContract().getResourceRequest();
            int i = 0;
            int i2 = 0;
            for (Resource resource : this.pendingFlexiblePreemptions.values()) {
                i += resource.getMemory();
                i2 += resource.getVirtualCores();
            }
            Iterator it2 = resourceRequest.iterator();
            while (it2.hasNext()) {
                ResourceRequest resourceRequest2 = ((PreemptionResourceRequest) it2.next()).getResourceRequest();
                if ("*".equals(resourceRequest2.getResourceName())) {
                    LOG.info("ResourceRequest:" + resourceRequest2);
                    int numContainers = resourceRequest2.getNumContainers();
                    int memory = numContainers * resourceRequest2.getCapability().getMemory();
                    int virtualCores = numContainers * resourceRequest2.getCapability().getVirtualCores();
                    if (i > 0) {
                        memory -= i;
                        i -= memory;
                    }
                    if (i2 > 0) {
                        virtualCores -= i2;
                        i2 -= virtualCores;
                    }
                    List<Container> containers = context.getContainers(TaskType.REDUCE);
                    Collections.sort(containers, new Comparator<Container>() { // from class: org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy.1
                        @Override // java.util.Comparator
                        public int compare(Container container, Container container2) {
                            return container2.getId().compareTo(container.getId());
                        }
                    });
                    for (Container container : containers) {
                        if (memory > 0 || virtualCores > 0) {
                            TaskAttemptId taskAttempt2 = context.getTaskAttempt(container.getId());
                            int memory2 = container.getResource().getMemory();
                            int virtualCores2 = container.getResource().getVirtualCores();
                            if (!this.toBePreempted.contains(taskAttempt2)) {
                                memory -= memory2;
                                virtualCores -= virtualCores2;
                                this.toBePreempted.add(taskAttempt2);
                                this.pendingFlexiblePreemptions.put(taskAttempt2, container.getResource());
                            }
                            LOG.info("ResourceRequest:" + resourceRequest2 + " satisfied preempting " + taskAttempt2);
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void handleFailedContainer(TaskAttemptId taskAttemptId) {
        this.toBePreempted.remove(taskAttemptId);
        this.checkpoints.remove(taskAttemptId.getTaskId());
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void handleCompletedContainer(TaskAttemptId taskAttemptId) {
        LOG.info(" task completed:" + taskAttemptId);
        this.toBePreempted.remove(taskAttemptId);
        this.pendingFlexiblePreemptions.remove(taskAttemptId);
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public boolean isPreempted(TaskAttemptId taskAttemptId) {
        if (!this.toBePreempted.contains(taskAttemptId)) {
            return false;
        }
        updatePreemptionCounters(taskAttemptId);
        return true;
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void reportSuccessfulPreemption(TaskAttemptId taskAttemptId) {
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public TaskCheckpointID getCheckpointID(TaskId taskId) {
        return this.checkpoints.get(taskId);
    }

    @Override // org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy
    public void setCheckpointID(TaskId taskId, TaskCheckpointID taskCheckpointID) {
        this.checkpoints.put(taskId, taskCheckpointID);
        if (taskCheckpointID != null) {
            updateCheckpointCounters(taskId, taskCheckpointID);
        }
    }

    private void updateCheckpointCounters(TaskId taskId, TaskCheckpointID taskCheckpointID) {
        JobCounterUpdateEvent jobCounterUpdateEvent = new JobCounterUpdateEvent(taskId.getJobId());
        jobCounterUpdateEvent.addCounterUpdate(JobCounter.CHECKPOINTS, 1L);
        this.eventHandler.handle(jobCounterUpdateEvent);
        JobCounterUpdateEvent jobCounterUpdateEvent2 = new JobCounterUpdateEvent(taskId.getJobId());
        jobCounterUpdateEvent2.addCounterUpdate(JobCounter.CHECKPOINT_BYTES, taskCheckpointID.getCheckpointBytes());
        this.eventHandler.handle(jobCounterUpdateEvent2);
        JobCounterUpdateEvent jobCounterUpdateEvent3 = new JobCounterUpdateEvent(taskId.getJobId());
        jobCounterUpdateEvent3.addCounterUpdate(JobCounter.CHECKPOINT_TIME, taskCheckpointID.getCheckpointTime());
        this.eventHandler.handle(jobCounterUpdateEvent3);
    }

    private void updatePreemptionCounters(TaskAttemptId taskAttemptId) {
        if (this.countedPreemptions.contains(taskAttemptId)) {
            return;
        }
        this.countedPreemptions.add(taskAttemptId);
        JobCounterUpdateEvent jobCounterUpdateEvent = new JobCounterUpdateEvent(taskAttemptId.getTaskId().getJobId());
        jobCounterUpdateEvent.addCounterUpdate(JobCounter.TASKS_REQ_PREEMPT, 1L);
        this.eventHandler.handle(jobCounterUpdateEvent);
    }
}
