/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Heartbeater;
import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;

/**
 * Polls a submitted Tez DAG until it reaches a terminal state, echoing state
 * changes and per-vertex task progress to the session console.
 *
 * <p>Every DAG client being monitored is also tracked in a static list so that
 * a JVM shutdown hook (registered when this class is loaded) can attempt to
 * kill DAGs that are still being monitored and close open Tez sessions.
 */
public class TezJobMonitor {
    private static final Log LOG = LogFactory.getLog(TezJobMonitor.class.getName());
    private static final String CLASS_NAME = TezJobMonitor.class.getName();

    /** Pause between two consecutive status polls, in milliseconds. */
    private static final int CHECK_INTERVAL_MS = 200;
    /** Time budget used to derive how many failed polls are tolerated, in milliseconds. */
    private static final int MAX_RETRY_INTERVAL_MS = 2500;
    /** An unchanged progress report is re-printed once this many milliseconds have passed. */
    private static final int PRINT_INTERVAL_MS = 3000;
    /** Every this-many-th failed poll aborts monitoring and kills the DAG. */
    private static final int MAX_FAILED_POLLS = MAX_RETRY_INTERVAL_MS / CHECK_INTERVAL_MS;

    /** DAG clients currently being monitored; read by the shutdown hook. */
    private static final List<DAGClient> shutdownList =
            Collections.synchronizedList(new LinkedList<DAGClient>());

    private final SessionState.LogHelper console;
    private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
    /** Wall-clock time (ms) at which the last progress report was printed. */
    private long lastPrintTime;
    /** Names of vertices whose perf-log entry has already been closed. */
    private Set<String> completed;

    public TezJobMonitor() {
        this.console = new SessionState.LogHelper(LOG);
    }

    /**
     * Monitors the given DAG until it finishes, is killed, fails, or monitoring
     * itself has to be abandoned.
     *
     * <p>The DAG status is polled every {@link #CHECK_INTERVAL_MS} milliseconds and a
     * heartbeat is sent on each poll. If a poll throws, it is retried immediately;
     * every {@link #MAX_FAILED_POLLS}-th failure (counted over the whole call), or
     * any {@link InterruptedException}, makes the monitor try to kill the DAG and
     * give up.
     *
     * <p>NOTE(review): an interruption is consumed here (it is turned into a kill
     * attempt plus a non-zero return code); the thread's interrupt flag is not
     * restored. Confirm that callers do not rely on seeing the interrupt.
     *
     * @param dagClient client of the DAG to monitor
     * @param txnMgr    transaction manager handed to the heartbeater
     * @param conf      configuration handed to the heartbeater
     * @return {@code 0} if the DAG succeeded, {@code 1} if it was killed or
     *         monitoring was abandoned, {@code 2} if it failed or errored
     * @throws InterruptedException declared for callers; interruptions raised while
     *         polling are handled inside this method as described above
     */
    public int monitorExecution(DAGClient dagClient, HiveTxnManager txnMgr, HiveConf conf) throws InterruptedException {
        DAGStatus status = null;
        this.completed = new HashSet<String>();
        boolean running = false;
        boolean done = false;
        int failedCounter = 0;
        int rc = 0;
        DAGStatus.State lastState = null;
        String lastReport = null;
        // NOTE(review): left raw because the element type expected by getDAGStatus
        // is not imported in this file; parameterize it once that import exists.
        HashSet opts = new HashSet();
        Heartbeater heartbeater = new Heartbeater(txnMgr, (Configuration) conf);
        shutdownList.add(dagClient);
        this.console.printInfo("\n");
        this.perfLogger.PerfLogBegin(CLASS_NAME, "TezRunDag");
        this.perfLogger.PerfLogBegin(CLASS_NAME, "TezSubmitToRunningDag");

        while (!done) {
            try {
                status = dagClient.getDAGStatus(opts);
                Map<String, Progress> progressMap = status.getVertexProgress();
                DAGStatus.State state = status.getState();
                heartbeater.heartbeat();
                // While RUNNING, progress is refreshed on every poll; any other state
                // is only reported when it is first entered.
                if (state != lastState || state == DAGStatus.State.RUNNING) {
                    lastState = state;
                    switch (state) {
                        case SUBMITTED:
                            this.console.printInfo("Status: Submitted");
                            break;
                        case INITING:
                            this.console.printInfo("Status: Initializing");
                            break;
                        case RUNNING:
                            if (!running) {
                                // First poll that sees the DAG running: close the
                                // submit-to-running timer and open one timer per vertex.
                                this.perfLogger.PerfLogEnd(CLASS_NAME, "TezSubmitToRunningDag");
                                this.console.printInfo("Status: Running (application id: " + dagClient.getApplicationId() + ")\n");
                                for (String s : progressMap.keySet()) {
                                    this.perfLogger.PerfLogBegin(CLASS_NAME, "TezRunVertex." + s);
                                }
                                running = true;
                            }
                            lastReport = this.printStatus(progressMap, lastReport, this.console);
                            break;
                        case SUCCEEDED:
                            lastReport = this.printStatus(progressMap, lastReport, this.console);
                            this.console.printInfo("Status: Finished successfully");
                            running = false;
                            done = true;
                            break;
                        case KILLED:
                            this.console.printInfo("Status: Killed");
                            running = false;
                            done = true;
                            rc = 1;
                            break;
                        case FAILED:
                        case ERROR:
                            this.console.printError("Status: Failed");
                            running = false;
                            done = true;
                            rc = 2;
                            break;
                        default:
                            // No message for any other state; keep polling.
                            break;
                    }
                }
                if (!done) {
                    Thread.sleep(CHECK_INTERVAL_MS);
                }
            } catch (Exception e) {
                this.console.printInfo("Exception: " + e.getMessage());
                // Parenthesisation matters: the counter is compared against the number
                // of tolerated polls, not divided after the modulo.
                boolean giveUp = ++failedCounter % MAX_FAILED_POLLS == 0
                        || e instanceof InterruptedException;
                if (giveUp) {
                    this.killDagQuietly(dagClient);
                    LOG.error("Monitoring of the Tez DAG failed", e);
                    this.console.printError("Execution has failed.");
                    rc = 1;
                    done = true;
                } else {
                    this.console.printInfo("Retrying...");
                }
            }
        }

        // Reached only once monitoring is over; status may be null if no poll ever succeeded.
        if (rc != 0 && status != null) {
            for (String diag : status.getDiagnostics()) {
                this.console.printError(diag);
            }
        }
        shutdownList.remove(dagClient);
        this.perfLogger.PerfLogEnd(CLASS_NAME, "TezRunDag");
        return rc;
    }

    /** Asks the DAG to stop; a failure to do so is logged and otherwise ignored. */
    private void killDagQuietly(DAGClient dagClient) {
        try {
            this.console.printInfo("Killing DAG...");
            dagClient.tryKillDAG();
        } catch (IOException io) {
            LOG.warn("Could not kill DAG", io);
        } catch (TezException te) {
            LOG.warn("Could not kill DAG", te);
        }
    }

    /**
     * Builds a one-line progress report ({@code vertex: succeeded/total}, vertices in
     * sorted order, tab separated) and prints it when it differs from the previous
     * report or when {@link #PRINT_INTERVAL_MS} milliseconds have passed since the
     * last print. Vertices that have just completed get their perf-log entry closed.
     *
     * @param progressMap progress per vertex name
     * @param lastReport  report returned by the previous call, or {@code null}
     * @param console     console to print to
     * @return the report built by this call, whether or not it was printed
     */
    private String printStatus(Map<String, Progress> progressMap, String lastReport, SessionState.LogHelper console) {
        StringBuilder reportBuffer = new StringBuilder();
        for (String s : new TreeSet<String>(progressMap.keySet())) {
            Progress progress = progressMap.get(s);
            int complete = progress.getSucceededTaskCount();
            int total = progress.getTotalTaskCount();
            if (total <= 0) {
                // Task count not known (yet): show a placeholder instead of numbers.
                reportBuffer.append(String.format("%s: -/-\t", s));
                continue;
            }
            if (complete == total && !this.completed.contains(s)) {
                this.completed.add(s);
                this.perfLogger.PerfLogEnd(CLASS_NAME, "TezRunVertex." + s);
            }
            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
        }
        String report = reportBuffer.toString();
        if (!report.equals(lastReport) || System.currentTimeMillis() >= this.lastPrintTime + PRINT_INTERVAL_MS) {
            console.printInfo(report);
            this.lastPrintTime = System.currentTimeMillis();
        }
        return report;
    }

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                // Take a snapshot under the list's own lock: iterating a synchronizedList
                // directly is not safe while a monitor thread may still remove entries.
                DAGClient[] pending;
                synchronized (shutdownList) {
                    pending = shutdownList.toArray(new DAGClient[0]);
                }
                for (DAGClient c : pending) {
                    try {
                        System.err.println("Trying to shutdown DAG");
                        c.tryKillDAG();
                    } catch (Exception ignored) {
                        // Best effort: the JVM is exiting, move on to the next DAG.
                    }
                }
                try {
                    for (TezSessionState s : TezSessionState.getOpenSessions()) {
                        System.err.println("Shutting down tez session.");
                        s.close(false);
                    }
                } catch (Exception ignored) {
                    // Best effort: nothing more can be done during JVM shutdown.
                }
            }
        });
    }
}

