/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.InstructionOutput;
import com.google.api.services.dataflow.model.MapTask;
import com.google.api.services.dataflow.model.ParallelInstruction;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowWorkerHarnessOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.dataflow.BasicSerializableSourceFormat;
import com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness;
import com.google.cloud.dataflow.sdk.runners.worker.MapTaskExecutorFactory;
import com.google.cloud.dataflow.sdk.runners.worker.MetricTrackingWindmillServerStub;
import com.google.cloud.dataflow.sdk.runners.worker.StateFetcher;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.UserCodeTimeTracker;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillStateReader;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingInitializer;
import com.google.cloud.dataflow.sdk.runners.worker.logging.DataflowWorkerLoggingMDC;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.WindmillServerStub;
import com.google.cloud.dataflow.sdk.util.BoundedQueueExecutor;
import com.google.cloud.dataflow.sdk.util.Serializer;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputObjectAndByteCounter;
import com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingDataflowWorker {
    /** Logger used throughout this class. */
    private static final Logger LOG = LoggerFactory.getLogger(StreamingDataflowWorker.class);
    /** Fallback size of the work-processing thread pool when the options do not specify one. */
    static final int MAX_PROCESSING_THREADS = 300;
    /** Thread expiration time, in seconds, for the work-processing pool (presumably idle keep-alive; confirm in BoundedQueueExecutor). */
    static final long THREAD_EXPIRATION_TIME_SEC = 60L;
    /** Capacity of the work-unit queue; the status page reports queue size against this bound. */
    static final int MAX_WORK_UNITS_QUEUED = 100;
    /** 0x2000000 bytes = 32 MiB. Not referenced in this part of the file; presumably caps the size of one commit request (confirm in the commit code). */
    static final long MAX_COMMIT_BYTES = 0x2000000L;
    /** Status-server port used by {@code main} when the {@code status_port} system property is absent. */
    static final int DEFAULT_STATUS_PORT = 8081;
    /** Fraction of the JVM's maximum memory above which {@code inPushback} reports pushback. */
    static final double PUSHBACK_THRESHOLD_RATIO = 0.9;
    /** Windmill server implementation instantiated by {@code main} unless {@code windmill.serverclassname} is set. */
    static final String DEFAULT_WINDMILL_SERVER_CLASS_NAME = "com.google.cloud.dataflow.sdk.runners.worker.windmill.WindmillServer";
    // The five maps below are keyed by computation id and receive one entry per computation
    // in addComputation.
    /** Computation id -> the MapTask describing that computation. */
    private final ConcurrentMap<String, MapTask> instructionMap;
    /** Computation id -> commit requests produced by {@code process} that have been queued for committing. */
    private final ConcurrentMap<String, ConcurrentLinkedQueue<Windmill.WorkItemCommitRequest>> outputMap;
    /** Computation id -> idle executor/context pairs that {@code process} reuses instead of rebuilding. */
    private final ConcurrentMap<String, ConcurrentLinkedQueue<WorkerAndContext>> mapTaskExecutors;
    /** Computation id -> tracker of the work currently active for that computation. */
    private final ConcurrentMap<String, ActiveWorkForComputation> activeWorkMap;
    /** Computation id -> reader cache handed to each StreamingModeExecutionContext created for it. */
    private final ConcurrentMap<String, ConcurrentMap<ByteString, ReaderCacheEntry>> readerCache;
    /** Callback id -> callback returned by {@code context.flushState()}; run once the context reports the id as ready. */
    private ConcurrentMap<Long, Runnable> commitCallbacks;
    /** State name mappings; filled by addStateNameMappings and from the name map in GetConfig responses. */
    private ConcurrentMap<String, String> stateNameMap;
    /** System name -> computation id, as reported by GetConfig responses. */
    private ConcurrentMap<String, String> systemNameToComputationIdMap;
    /** Creates daemon threads; used for the dispatch thread and the work-processing pool. */
    private ThreadFactory threadFactory;
    /** Pool that runs work items, retries and commit callbacks. */
    private BoundedQueueExecutor workUnitExecutor;
    /** Single-threaded executor that runs Commit tasks. */
    private ExecutorService commitExecutor;
    /** The Windmill server stub passed to the constructor; used directly for work, config, commit and stats calls. */
    private WindmillServerStub windmillServer;
    /** Thread running dispatchLoop; null until start() is called. */
    private Thread dispatchThread;
    /** True between start() and stop(); polled by the dispatch loop. */
    private AtomicBoolean running;
    /** Fetcher built on the metric-tracking stub; process() takes a byte-tracking view of it per work item. */
    private StateFetcher stateFetcher;
    /** Harness options supplied at construction. */
    private DataflowWorkerHarnessOptions options;
    /** Random identifier sent with every GetWork request and shown on the status page. */
    private long clientId;
    /** Jetty status server; null until runStatusServer() is called. */
    private Server statusServer;
    /** Most recent non-token-expiry failure recorded by process(). */
    private final AtomicReference<Throwable> lastException;
    /** Wrapper around windmillServer used for state reads and for the metrics section of the status page. */
    private final MetricTrackingWindmillServerStub metricTrackingWindmillServer;
    /** Timer that calls reportPeriodicStats once per second; null until start() is called. */
    private Timer globalCountersUpdatesTimer;
    /** Receives sampling callbacks from every StateSampler created in process(). */
    private final UserCodeTimeTracker userCodeTimeTracker = new UserCodeTimeTracker();
    /** Source of unique ids for the state samplers created in process(). */
    private final AtomicInteger nextStateSamplerId = new AtomicInteger();
    /** Wall-clock millis of the last pushback warning; used to log at most once per minute. */
    private static long lastPushbackLog = 0L;

    /**
     * Tells whether the given throwable, or anything in its cause chain, is a
     * {@code KeyTokenInvalidException}.
     *
     * @param t the throwable to inspect; {@code null} yields {@code false}
     * @return {@code true} if a {@code KeyTokenInvalidException} is found in the chain
     */
    public static boolean isKeyTokenInvalidException(Throwable t) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (current instanceof KeyTokenInvalidException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Deserializes a JSON-encoded {@code MapTask} using the shared transport JSON factory.
     *
     * @param input the JSON text of the map task
     * @return the parsed map task
     * @throws IOException if the JSON factory fails to parse the text
     */
    static MapTask parseMapTask(String input) throws IOException {
        MapTask parsed = (MapTask)Transport.getJsonFactory().fromString(input, MapTask.class);
        return parsed;
    }

    /**
     * Entry point of the streaming worker harness.
     *
     * <p>Reads its configuration from system properties: {@code windmill.hostport} (required),
     * {@code status_port} (optional) and {@code windmill.serverclassname} (optional). Every
     * command-line argument is parsed as a serialized {@code MapTask}. The worker is started and
     * then the status server is run on the calling thread.
     *
     * @param args serialized map tasks, one per argument
     * @throws Exception if {@code windmill.hostport} is not set, or if parsing or reflective
     *     construction of the Windmill server fails
     */
    public static void main(String[] args) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(DataflowWorkerHarness.WorkerUncaughtExceptionHandler.INSTANCE);
        DataflowWorkerLoggingInitializer.initialize();

        DataflowWorkerHarnessOptions options = PipelineOptionsFactory.createFromSystemPropertiesInternal();
        options.setAppName("StreamingWorkerHarness");
        options.setStreaming(true);
        DataflowWorkerLoggingInitializer.configure(options);

        String hostport = System.getProperty("windmill.hostport");
        if (hostport == null) {
            throw new Exception("-Dwindmill.hostport must be set to the location of the windmill server");
        }

        int statusPort = System.getProperties().containsKey("status_port")
            ? Integer.parseInt(System.getProperty("status_port"))
            : DEFAULT_STATUS_PORT;
        String windmillServerClassName = System.getProperties().containsKey("windmill.serverclassname")
            ? System.getProperty("windmill.serverclassname")
            : DEFAULT_WINDMILL_SERVER_CLASS_NAME;

        // Parse all map tasks before touching the server class, as the original did.
        ArrayList<MapTask> mapTasks = new ArrayList<MapTask>();
        for (int i = 0; i < args.length; ++i) {
            mapTasks.add(StreamingDataflowWorker.parseMapTask(args[i]));
        }

        // The server implementation must expose a constructor taking the host:port string.
        Class<?> serverClass = Class.forName(windmillServerClassName);
        WindmillServerStub windmillServer = (WindmillServerStub)serverClass.getDeclaredConstructor(String.class).newInstance(hostport);

        StreamingDataflowWorker worker = new StreamingDataflowWorker(mapTasks, windmillServer, options);
        worker.start();
        worker.runStatusServer(statusPort);
    }

    /**
     * Builds a worker for the given computations. Nothing runs until {@link #start()} is called.
     *
     * @param mapTasks computations known up front; each is registered via {@code addComputation}
     * @param server the Windmill server stub used for all RPCs
     * @param options harness options; also supply the job and worker ids set on the logging MDC
     */
    public StreamingDataflowWorker(List<MapTask> mapTasks, WindmillServerStub server, DataflowWorkerHarnessOptions options) {
        this.options = options;

        // Per-computation and bookkeeping state.
        this.instructionMap = new ConcurrentHashMap<String, MapTask>();
        this.outputMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Windmill.WorkItemCommitRequest>>();
        this.mapTaskExecutors = new ConcurrentHashMap<String, ConcurrentLinkedQueue<WorkerAndContext>>();
        this.activeWorkMap = new ConcurrentHashMap<String, ActiveWorkForComputation>();
        this.readerCache = new ConcurrentHashMap<String, ConcurrentMap<ByteString, ReaderCacheEntry>>();
        this.commitCallbacks = new ConcurrentHashMap<Long, Runnable>();
        this.stateNameMap = new ConcurrentHashMap<String, String>();
        this.systemNameToComputationIdMap = new ConcurrentHashMap<String, String>();

        // Plain daemon threads for the dispatcher and the work pool.
        ThreadFactory daemonThreads = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(task);
                thread.setDaemon(true);
                return thread;
            }
        };
        this.threadFactory = daemonThreads;
        this.workUnitExecutor = new BoundedQueueExecutor(StreamingDataflowWorker.chooseMaximumNumberOfThreads(options), THREAD_EXPIRATION_TIME_SEC, TimeUnit.SECONDS, MAX_WORK_UNITS_QUEUED, this.threadFactory);

        // A single high-priority daemon thread performs commits. The queue holds at most two
        // pending Commit tasks and further submissions are silently discarded.
        ThreadFactory commitThreads = new ThreadFactory(){

            @Override
            public Thread newThread(Runnable task) {
                Thread thread = new Thread(task);
                thread.setDaemon(true);
                thread.setPriority(Thread.MAX_PRIORITY);
                thread.setName("CommitThread");
                return thread;
            }
        };
        this.commitExecutor = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2), commitThreads, new ThreadPoolExecutor.DiscardPolicy());

        this.windmillServer = server;
        this.metricTrackingWindmillServer = new MetricTrackingWindmillServerStub(server);
        this.running = new AtomicBoolean();
        this.stateFetcher = new StateFetcher(this.metricTrackingWindmillServer);
        this.clientId = new Random().nextLong();
        this.lastException = new AtomicReference<Throwable>();

        for (MapTask task : mapTasks) {
            this.addComputation(task);
        }

        DataflowWorkerLoggingMDC.setJobId(options.getJobId());
        DataflowWorkerLoggingMDC.setWorkerId(options.getWorkerId());
    }

    /**
     * Picks the size of the work-processing thread pool.
     *
     * @param pipelineOptions harness options consulted for an explicit thread count
     * @return the configured number of harness threads when it is non-zero, otherwise
     *     {@link #MAX_PROCESSING_THREADS}
     */
    private static int chooseMaximumNumberOfThreads(DataflowWorkerHarnessOptions pipelineOptions) {
        int configured = pipelineOptions.getNumberOfWorkerHarnessThreads();
        return configured != 0 ? configured : MAX_PROCESSING_THREADS;
    }

    /**
     * Adds every entry of {@code nameMap} to the worker's state name map, overwriting entries
     * that share a key.
     *
     * @param nameMap the mappings to add
     */
    void addStateNameMappings(Map<String, String> nameMap) {
        for (Map.Entry<String, String> mapping : nameMap.entrySet()) {
            this.stateNameMap.put(mapping.getKey(), mapping.getValue());
        }
    }

    /**
     * Starts the worker: marks it running, launches the low-priority dispatch thread, schedules
     * the once-per-second stats report, and notifies Windmill that the harness has started.
     */
    public void start() {
        this.running.set(true);

        Runnable dispatcher = new Runnable(){

            @Override
            public void run() {
                StreamingDataflowWorker.this.dispatchLoop();
            }
        };
        this.dispatchThread = this.threadFactory.newThread(dispatcher);
        this.dispatchThread.setName("DispatchThread");
        this.dispatchThread.setPriority(Thread.MIN_PRIORITY);
        this.dispatchThread.start();

        TimerTask periodicReport = new TimerTask(){

            @Override
            public void run() {
                StreamingDataflowWorker.this.reportPeriodicStats();
            }
        };
        this.globalCountersUpdatesTimer = new Timer("GlobalCountersUpdates");
        this.globalCountersUpdatesTimer.schedule(periodicReport, 1000L, 1000L);

        this.reportHarnessStartup();
    }

    /**
     * Shuts the worker down: cancels the stats timer, stops the status server, asks the dispatch
     * loop to exit and waits for it, drains the work executor, closes all pooled map task
     * executors and finally drains the commit executor.
     *
     * <p>Failures are logged rather than propagated. If the calling thread is interrupted while
     * waiting, its interrupt status is restored so callers can still observe it.
     */
    public void stop() {
        try {
            if (this.globalCountersUpdatesTimer != null) {
                this.globalCountersUpdatesTimer.cancel();
            }
            if (this.statusServer != null) {
                this.statusServer.stop();
            }
            this.running.set(false);
            // start() may never have been called; an NPE here would skip the executor shutdowns below.
            if (this.dispatchThread != null) {
                this.dispatchThread.join();
            }
            this.workUnitExecutor.shutdown();
            if (!this.workUnitExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new RuntimeException("Work executor did not terminate within 5 minutes");
            }
            // No work is running any more, so every executor is idle in its pool and can be closed.
            for (ConcurrentLinkedQueue<WorkerAndContext> queue : this.mapTaskExecutors.values()) {
                WorkerAndContext workerAndContext;
                while ((workerAndContext = queue.poll()) != null) {
                    workerAndContext.getWorker().close();
                }
            }
            this.commitExecutor.shutdown();
            if (!this.commitExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new RuntimeException("Commit executor did not terminate within 5 minutes");
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            LOG.warn("Exception while shutting down: ", (Throwable)e);
        }
        catch (Exception e) {
            LOG.warn("Exception while shutting down: ", (Throwable)e);
        }
    }

    /**
     * Starts the Jetty status server on the given port and blocks the calling thread in
     * {@code join()} on the server. Any exception from starting or joining is logged as a
     * warning and not rethrown.
     *
     * @param statusPort TCP port for the status page
     */
    public void runStatusServer(int statusPort) {
        this.statusServer = new Server(statusPort);
        Handler statusHandler = (Handler)new StatusHandler();
        this.statusServer.setHandler(statusHandler);
        try {
            this.statusServer.start();
            LOG.info("Status server started on port {}", (Object)Integer.valueOf(statusPort));
            this.statusServer.join();
        }
        catch (Exception failure) {
            LOG.warn("Status server failed to start: ", (Throwable)failure);
        }
    }

    /**
     * Registers a computation if it is not already known.
     *
     * <p>The computation id is the id mapped to the task's system name, or the system name itself
     * when no mapping exists. For a new id this creates the empty commit queue, executor pool,
     * active-work tracker and reader cache, and stores the task.
     *
     * @param mapTask the map task describing the computation
     */
    private void addComputation(MapTask mapTask) {
        String systemName = mapTask.getSystemName();
        String mappedId = this.systemNameToComputationIdMap.get(systemName);
        String computationId = mappedId != null ? mappedId : systemName;
        if (this.instructionMap.containsKey(computationId)) {
            return;
        }
        LOG.info("Adding config for {}: {}", (Object)computationId, (Object)mapTask);
        this.outputMap.put(computationId, new ConcurrentLinkedQueue<Windmill.WorkItemCommitRequest>());
        this.instructionMap.put(computationId, mapTask);
        this.mapTaskExecutors.put(computationId, new ConcurrentLinkedQueue<WorkerAndContext>());
        this.activeWorkMap.put(computationId, new ActiveWorkForComputation(this.workUnitExecutor));
        this.readerCache.put(computationId, new ConcurrentHashMap<ByteString, ReaderCacheEntry>());
    }

    /**
     * Sleeps for roughly {@code millis} milliseconds without propagating
     * {@link InterruptedException}.
     *
     * <p>If the thread is interrupted, the sleep ends early and the interrupt status is restored,
     * so code further up the stack (or the owning executor) can still see the interruption.
     *
     * @param millis how long to sleep, in milliseconds
     */
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interrupted) {
            // Thread.sleep cleared the flag when it threw; put it back instead of losing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Decides whether the worker should stop accepting new work because of memory pressure.
     *
     * <p>Pushback applies when used memory (total minus free) exceeds
     * {@link #PUSHBACK_THRESHOLD_RATIO} of the JVM's maximum memory. While in pushback a warning
     * is logged at most once every 60 seconds.
     *
     * @param rt the runtime whose memory figures are inspected
     * @return {@code true} if the worker is in pushback
     */
    protected static boolean inPushback(Runtime rt) {
        long jvmBytes = rt.totalMemory();
        long usedBytes = jvmBytes - rt.freeMemory();
        long maxBytes = rt.maxMemory();
        boolean overThreshold = (double)usedBytes > (double)maxBytes * PUSHBACK_THRESHOLD_RATIO;
        if (!overThreshold) {
            return false;
        }
        // Rate-limit the warning to one per minute.
        if (lastPushbackLog < System.currentTimeMillis() - 60000L) {
            LOG.warn("In pushback, not accepting new work. Using {}MB / {}MB ({}MB currently used by JVM)", new Object[]{usedBytes >> 20, maxBytes >> 20, jvmBytes >> 20});
            lastPushbackLog = System.currentTimeMillis();
        }
        return true;
    }

    /**
     * Body of the dispatch thread: repeatedly fetches work from Windmill and hands each work
     * item to the work executor until {@code running} becomes false.
     *
     * <p>Each iteration first waits out memory pushback (after requesting a GC), then polls for
     * work with exponential backoff from 1 ms up to 1 s. Both waits re-check {@code running} so
     * that {@link #stop()} is not blocked indefinitely when memory stays high or no work arrives.
     * Work for a computation with no known config triggers a config fetch; if it is still
     * unknown afterwards the work is skipped with a warning.
     */
    private void dispatchLoop() {
        LOG.info("Dispatch starting");
        Runtime rt = Runtime.getRuntime();
        while (this.running.get()) {
            if (StreamingDataflowWorker.inPushback(rt)) {
                System.gc();
                while (this.running.get() && StreamingDataflowWorker.inPushback(rt)) {
                    StreamingDataflowWorker.sleep(10);
                }
                if (!this.running.get()) {
                    break;
                }
            }

            // Poll until work arrives or the worker is stopped. On stop, workResponse is the
            // last (empty) response, so the loop below dispatches nothing.
            int backoff = 1;
            Windmill.GetWorkResponse workResponse;
            do {
                workResponse = this.getWork();
                if (workResponse.getWorkCount() > 0) {
                    break;
                }
                StreamingDataflowWorker.sleep(backoff);
                backoff = Math.min(1000, backoff * 2);
            } while (this.running.get());

            for (Windmill.ComputationWorkItems computationWork : workResponse.getWorkList()) {
                final String computation = computationWork.getComputationId();
                if (!this.instructionMap.containsKey(computation)) {
                    this.getConfig(computation);
                }
                final MapTask mapTask = this.instructionMap.get(computation);
                if (mapTask == null) {
                    LOG.warn("Received work for unknown computation: {}. Known computations are {}", (Object)computation, this.instructionMap.keySet());
                    continue;
                }
                // The watermark is divided by 1000 before being turned into an Instant.
                long watermarkMicros = computationWork.getInputDataWatermark();
                final Instant inputDataWatermark = new Instant(watermarkMicros / 1000L);
                ActiveWorkForComputation activeWork = this.activeWorkMap.get(computation);
                for (final Windmill.WorkItem workItem : computationWork.getWorkList()) {
                    Work work = new Work(workItem.getWorkToken()){

                        @Override
                        public void run() {
                            StreamingDataflowWorker.this.process(computation, mapTask, inputDataWatermark, workItem);
                        }
                    };
                    // Only execute now if the tracker says so.
                    if (activeWork.activateWork(workItem.getKey(), work)) {
                        this.workUnitExecutor.execute(work);
                    }
                }
            }
        }
        LOG.info("Dispatch done");
    }

    /**
     * Executes one work item for a computation and queues its result for commit.
     *
     * <p>An executor/context pair is taken from the computation's pool, or built if the pool is
     * empty. After execution the pair is returned to the pool, the commit request is added to the
     * computation's output queue and a commit is scheduled. On failure the executor is closed
     * rather than reused. Token-expiry failures are dropped; any other failure is reported to
     * Windmill and, if the report is accepted, the same work item is retried after 10 seconds.
     *
     * <p>NOTE: this source comes from a decompiler, which warned here: "Removed try catching
     * itself - possible behaviour change." The exception handling below may therefore differ
     * from the original source.
     *
     * @param computation id of the computation the work belongs to
     * @param mapTask the map task describing the computation
     * @param inputDataWatermark input data watermark that accompanied the work
     * @param work the work item to process
     */
    private void process(final String computation, final MapTask mapTask, final Instant inputDataWatermark, final Windmill.WorkItem work) {
        LOG.debug("Starting processing for {}:\n{}", (Object)computation, (Object)work);
        // The commit request for this work item, identified by the item's key and work token.
        Windmill.WorkItemCommitRequest.Builder outputBuilder = Windmill.WorkItemCommitRequest.newBuilder().setKey(work.getKey()).setWorkToken(work.getWorkToken());
        StreamingModeExecutionContext context = null;
        MapTaskExecutor worker = null;
        try {
            // Tag log lines with "<key>-<workToken>" and the computation id.
            String string = String.valueOf(work.getKey().toStringUtf8());
            String string2 = String.valueOf(Long.toString(work.getWorkToken()));
            DataflowWorkerLoggingMDC.setWorkId(new StringBuilder(1 + String.valueOf(string).length() + String.valueOf(string2).length()).append(string).append("-").append(string2).toString());
            DataflowWorkerLoggingMDC.setStageName(computation);
            // Reuse an idle executor/context pair if one is pooled; otherwise build a new one.
            WorkerAndContext workerAndContext = (WorkerAndContext)((ConcurrentLinkedQueue)this.mapTaskExecutors.get(computation)).poll();
            if (workerAndContext == null) {
                CounterSet counters = new CounterSet(new Counter[0]);
                context = new StreamingModeExecutionContext(mapTask.getSystemName(), (ConcurrentMap)this.readerCache.get(computation), this.stateNameMap);
                StateSampler sampler = new StateSampler(String.valueOf(mapTask.getStageName()).concat("-"), counters.getAddCounterMutator());
                // Register the sampler with the user-code time tracker under a fresh id.
                int stateSamplerId = this.nextStateSamplerId.incrementAndGet();
                sampler.addSamplingCallback(new UserCodeTimeTracker.StateSamplerCallback(this.userCodeTimeTracker, stateSamplerId));
                this.userCodeTimeTracker.workStarted(sampler.getPrefix(), stateSamplerId, counters.getAddCounterMutator());
                worker = MapTaskExecutorFactory.create(this.options, mapTask, context, counters, sampler);
                ReadOperation readOperation = worker.getReadOperation();
                readOperation.setProgressUpdatePeriodMs(0L);
                // Pooled executors are run repeatedly, so every operation must be restartable.
                Preconditions.checkState(worker.supportsRestart(), "Streaming runner requires all operations support restart.");
                // When the first instruction reads from a BasicSerializableSourceFormat source,
                // attach a sampled byte counter named "dataflow_input_size-<systemName>" to the
                // read's first receiver.
                ParallelInstruction read = (ParallelInstruction)mapTask.getInstructions().get(0);
                if (BasicSerializableSourceFormat.class.getName().equals(read.getRead().getSource().getSpec().get("@type"))) {
                    String string3 = String.valueOf(mapTask.getSystemName());
                    readOperation.receivers[0].addOutputCounter(new OutputObjectAndByteCounter(new MapTaskExecutorFactory.ElementByteSizeObservableCoder(Serializer.deserialize(((InstructionOutput)read.getOutputs().get(0)).getCodec(), Coder.class)), worker.getOutputCounters().getAddCounterMutator()).setSamplingPeriod(100).countBytes(string3.length() != 0 ? "dataflow_input_size-".concat(string3) : new String("dataflow_input_size-")));
                }
            } else {
                worker = workerAndContext.getWorker();
                context = workerAndContext.getContext();
            }
            WindmillStateReader stateReader = new WindmillStateReader(this.metricTrackingWindmillServer, computation, work.getKey(), work.getWorkToken());
            StateFetcher localStateFetcher = this.stateFetcher.byteTrackingView();
            context.start(work, inputDataWatermark, stateReader, localStateFetcher, outputBuilder);
            // Run commit callbacks the context reports as ready. Each is removed from the map
            // first and then run on the work executor, with failures logged and not rethrown.
            for (Long callbackId : context.getReadyCommitCallbackIds()) {
                final Runnable callback = (Runnable)this.commitCallbacks.remove(callbackId);
                if (callback == null) continue;
                this.workUnitExecutor.forceExecute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            callback.run();
                        }
                        catch (Throwable t) {
                            LOG.error("Source checkpoint finalization failed:", t);
                        }
                    }
                });
            }
            worker.execute();
            // Flushing the context returns callbacks to store until they are reported ready.
            this.commitCallbacks.putAll(context.flushState());
            // Shuffle bytes read = total serialized size of every input message in the work item.
            long shuffleBytesRead = 0L;
            for (Windmill.InputMessageBundle bundle : work.getMessageBundlesList()) {
                for (Windmill.Message message : bundle.getMessagesList()) {
                    shuffleBytesRead += (long)message.getSerializedSize();
                }
            }
            long stateBytesRead = stateReader.getBytesRead() + localStateFetcher.getBytesRead();
            // State bytes written = serialized size of the commit request without its output messages.
            long stateBytesWritten = Windmill.WorkItemCommitRequest.newBuilder((Windmill.WorkItemCommitRequest)outputBuilder.build()).clearOutputMessages().build().getSerializedSize();
            CounterSet counters = worker.getOutputCounters();
            counters.getAddCounterMutator().addCounter(Counter.longs("WindmillShuffleBytesRead", Counter.AggregationKind.SUM)).addValue(shuffleBytesRead);
            counters.getAddCounterMutator().addCounter(Counter.longs("WindmillStateBytesRead", Counter.AggregationKind.SUM)).addValue(stateBytesRead);
            counters.getAddCounterMutator().addCounter(Counter.longs("WindmillStateBytesWritten", Counter.AggregationKind.SUM)).addValue(stateBytesWritten);
            this.buildCounters(counters, outputBuilder);
            // Return the pair to the pool, then null the locals so the catch block below
            // cannot close an executor that is now owned by the pool.
            ((ConcurrentLinkedQueue)this.mapTaskExecutors.get(computation)).offer(new WorkerAndContext(worker, context));
            worker = null;
            context = null;
            Windmill.WorkItemCommitRequest output = outputBuilder.build();
            ((ConcurrentLinkedQueue)this.outputMap.get(computation)).add(output);
            this.scheduleCommit();
            // NOTE(review): the key is not marked complete on this path; presumably the commit
            // code does that after committing. Confirm in Commit.
            LOG.debug("Processing done for work token: {}", (Object)work.getWorkToken());
        }
        catch (Throwable t) {
            // A failed executor is closed rather than returned to the pool.
            if (worker != null) {
                try {
                    worker.close();
                }
                catch (Exception e) {
                    LOG.warn("Failed to close worker: ", (Throwable)e);
                }
            }
            // Unwrap user-code failures so the underlying cause is inspected and reported.
            Throwable throwable = t = t instanceof UserCodeException ? t.getCause() : t;
            if (StreamingDataflowWorker.isKeyTokenInvalidException(t)) {
                LOG.debug("Execution of work for {} for key {} failed due to token expiration, will not retry locally.", (Object)computation, (Object)work.getKey().toStringUtf8());
                ((ActiveWorkForComputation)this.activeWorkMap.get(computation)).completeWork(work.getKey());
            } else {
                LOG.error("Execution of work for {} for key {} failed, retrying.", (Object)computation, (Object)work.getKey().toStringUtf8());
                LOG.error("\nError: ", t);
                this.lastException.set(t);
                LOG.debug("Failed work: {}", (Object)work);
                if (this.reportFailure(computation, work, t)) {
                    // The report was accepted: wait 10 seconds on this thread, then resubmit
                    // the same work item to the work executor.
                    StreamingDataflowWorker.sleep(10000);
                    this.workUnitExecutor.forceExecute(new Runnable(){

                        @Override
                        public void run() {
                            StreamingDataflowWorker.this.process(computation, mapTask, inputDataWatermark, work);
                        }
                    });
                } else {
                    // The report response was marked failed: give up on this work item locally.
                    LOG.debug("Aborting processing due to exception reporting failure");
                    ((ActiveWorkForComputation)this.activeWorkMap.get(computation)).completeWork(work.getKey());
                }
            }
        }
        finally {
            // Always clear the per-work logging context.
            DataflowWorkerLoggingMDC.setWorkId(null);
            DataflowWorkerLoggingMDC.setStageName(null);
        }
    }

    /** Submits a new {@code Commit} task to the commit executor. */
    private void scheduleCommit() {
        Commit commitTask = new Commit();
        this.commitExecutor.execute(commitTask);
    }

    /**
     * Asks Windmill for work on behalf of this worker's client id, requesting at most 100 items.
     *
     * @return the server's response
     */
    private Windmill.GetWorkResponse getWork() {
        Windmill.GetWorkRequest request = Windmill.GetWorkRequest.newBuilder().setClientId(this.clientId).setMaxItems(100L).build();
        return this.windmillServer.getWork(request);
    }

    /**
     * Forwards a commit request to the Windmill server.
     *
     * @param request the commit request to send
     */
    private void commitWork(Windmill.CommitWorkRequest request) {
        WindmillServerStub server = this.windmillServer;
        server.commitWork(request);
    }

    /**
     * Fetches the configuration of a computation from Windmill and merges it into this worker.
     *
     * <p>In order: records the system-name to computation-id mappings, registers every map task
     * in the response (tasks that fail to parse are logged and skipped), and records the name map
     * entries in the state name map.
     *
     * @param computation id of the computation whose config is requested
     */
    private void getConfig(String computation) {
        Windmill.GetConfigResponse response = this.windmillServer.getConfig(Windmill.GetConfigRequest.newBuilder().addComputations(computation).build());

        // Mappings go first: addComputation consults them to derive the computation id.
        for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry idMapping : response.getSystemNameToComputationIdMapList()) {
            this.systemNameToComputationIdMap.put(idMapping.getSystemName(), idMapping.getComputationId());
        }

        for (String serialized : response.getCloudWorksList()) {
            MapTask parsed;
            try {
                parsed = StreamingDataflowWorker.parseMapTask(serialized);
            }
            catch (IOException parseFailure) {
                LOG.warn("Parsing MapTask failed: {}", (Object)serialized);
                LOG.warn("Error: ", (Throwable)parseFailure);
                continue;
            }
            this.addComputation(parsed);
        }

        // NOTE(review): the element type below is what the decompiler emitted; it looks wrong for
        // a list whose entries expose getUserName(). Confirm against the Windmill proto.
        for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry nameEntry : response.getNameMapList()) {
            this.stateNameMap.put(nameEntry.getUserName(), nameEntry.getSystemName());
        }
    }

    /**
     * Converts the deltas of every counter in {@code counterSet} into Windmill counter updates
     * on the commit request builder, resetting each counter's delta as it is read.
     *
     * <p>SUM, MAX, MIN and MEAN counters are handled; other kinds are logged and skipped without
     * being reset. A MEAN counter whose count is not positive is skipped after its delta has been
     * reset. Counters whose aggregate type is not recognised are skipped as well.
     *
     * @param counterSet the counters to read and reset
     * @param builder the commit request receiving the counter updates
     */
    private void buildCounters(CounterSet counterSet, Windmill.WorkItemCommitRequest.Builder builder) {
        for (Counter<?> counter : counterSet) {
            Windmill.Counter.Builder update = Windmill.Counter.newBuilder();
            Windmill.Counter.Kind windmillKind;
            Object delta;
            switch (counter.getKind()) {
                case SUM: {
                    windmillKind = Windmill.Counter.Kind.SUM;
                    delta = counter.getAndResetDelta();
                    break;
                }
                case MAX: {
                    windmillKind = Windmill.Counter.Kind.MAX;
                    delta = counter.getAndResetDelta();
                    break;
                }
                case MIN: {
                    windmillKind = Windmill.Counter.Kind.MIN;
                    delta = counter.getAndResetDelta();
                    break;
                }
                case MEAN: {
                    windmillKind = Windmill.Counter.Kind.MEAN;
                    Counter.CounterMean<?> meanDelta = counter.getAndResetMeanDelta();
                    long samples = meanDelta.getCount();
                    delta = meanDelta.getAggregate();
                    if (samples <= 0L) {
                        // Nothing was recorded since the last reset.
                        continue;
                    }
                    update.setMeanCount(samples);
                    break;
                }
                default: {
                    LOG.debug("Unhandled counter type: {}", (Object)counter.getKind());
                    continue;
                }
            }
            if (this.addKnownTypeToCounterBuilder(delta, update)) {
                update.setName(counter.getName()).setKind(windmillKind);
                builder.addCounterUpdates(update);
            }
        }
    }

    /**
     * Copies a counter aggregate into the Windmill counter builder when its type is supported.
     *
     * <p>{@code Double} values go to the double scalar; {@code Long} and {@code Integer} values
     * go to the int scalar. A zero value leaves the scalar unset but still counts as handled.
     *
     * @param aggregateObj the aggregate value read from a counter
     * @param counterBuilder the builder to populate
     * @return {@code true} if the aggregate's type is supported, {@code false} otherwise
     */
    private boolean addKnownTypeToCounterBuilder(Object aggregateObj, Windmill.Counter.Builder counterBuilder) {
        if (aggregateObj instanceof Double) {
            double value = ((Double)aggregateObj).doubleValue();
            if (value != 0.0) {
                counterBuilder.setDoubleScalar(value);
            }
            return true;
        }
        if (aggregateObj instanceof Long || aggregateObj instanceof Integer) {
            long value = ((Number)aggregateObj).longValue();
            if (value != 0L) {
                counterBuilder.setIntScalar(value);
            }
            return true;
        }
        LOG.debug("Unhandled aggregate class: {}", aggregateObj.getClass());
        return false;
    }

    /**
     * Converts a throwable into a Windmill exception message.
     *
     * <p>The first stack frame entry is {@code t.toString()}, followed by one entry per stack
     * trace element. The cause chain is converted recursively into nested causes.
     *
     * @param t the throwable to convert; must not be {@code null}
     * @return the Windmill representation of {@code t}
     */
    private Windmill.Exception buildExceptionReport(Throwable t) {
        Windmill.Exception.Builder report = Windmill.Exception.newBuilder();
        report.addStackFrames(t.toString());
        StackTraceElement[] frames = t.getStackTrace();
        for (int i = 0; i < frames.length; ++i) {
            report.addStackFrames(frames[i].toString());
        }
        Throwable cause = t.getCause();
        if (cause != null) {
            report.setCause(this.buildExceptionReport(cause));
        }
        return report.build();
    }

    /**
     * Reports a work-item failure to Windmill.
     *
     * @param computation id of the computation the work belongs to
     * @param work the work item that failed
     * @param t the failure to report
     * @return {@code true} unless the server's response is marked as failed
     */
    private boolean reportFailure(String computation, Windmill.WorkItem work, Throwable t) {
        Windmill.ReportStatsRequest request = Windmill.ReportStatsRequest.newBuilder().setComputationId(computation).setKey(work.getKey()).setWorkToken(work.getWorkToken()).addExceptions(this.buildExceptionReport(t)).build();
        Windmill.ReportStatsResponse response = this.windmillServer.reportStats(request);
        boolean rejected = response.getFailed();
        return !rejected;
    }

    /**
     * Tells Windmill that the harness has started by adding 1 to the
     * {@code dataflow_java_harness_restarts} SUM counter. A failed response is only logged.
     */
    private void reportHarnessStartup() {
        Windmill.Counter.Builder restartCounter = Windmill.Counter.newBuilder().setName("dataflow_java_harness_restarts").setKind(Windmill.Counter.Kind.SUM).setIntScalar(1L);
        Windmill.ReportStatsRequest request = Windmill.ReportStatsRequest.newBuilder().addCounterUpdates(restartCounter).build();
        Windmill.ReportStatsResponse response = this.windmillServer.reportStats(request);
        if (!response.getFailed()) {
            return;
        }
        LOG.warn("Failed to notify windmill on harness startup. dataflow_java_harness_restarts will  not be incremented.");
    }

    /**
     * Sends the current JVM memory utilisation to Windmill as the cumulative MEAN counter
     * {@code dataflow_java_harness_memory_utilization}. The int scalar carries used memory and
     * the mean count carries maximum memory. A failed response is only logged.
     */
    private void reportPeriodicStats() {
        Runtime runtime = Runtime.getRuntime();
        long usedBytes = runtime.totalMemory() - runtime.freeMemory();
        long maxBytes = runtime.maxMemory();
        Windmill.Counter.Builder memoryCounter = Windmill.Counter.newBuilder().setName("dataflow_java_harness_memory_utilization").setKind(Windmill.Counter.Kind.MEAN).setCumulative(true).setIntScalar(usedBytes).setMeanCount(maxBytes);
        Windmill.ReportStatsRequest request = Windmill.ReportStatsRequest.newBuilder().addCounterUpdates(memoryCounter).build();
        Windmill.ReportStatsResponse response = this.windmillServer.reportStats(request);
        if (response.getFailed()) {
            LOG.warn("Failed to send periodic counters to windmill.");
        }
    }

    /**
     * Writes the status page header: the title, whether the worker is running, and its client id.
     *
     * @param response writer for the HTML status page
     */
    private void printHeader(PrintWriter response) {
        response.println("<h1>Streaming Worker Harness</h1>");
        response.println("Running: " + this.running.get() + "<br>");
        response.println("ID: " + this.clientId + "<br>");
    }

    /**
     * Writes the metrics section of the status page: thread pool usage, work queue size, the
     * length of each computation's commit queue, the active work of each computation, and
     * finally the metric-tracking Windmill stub's own HTML.
     *
     * @param response writer for the HTML status page
     */
    private void printMetrics(PrintWriter response) {
        response.println("<h2>Metrics</h2>");
        response.println("Worker Threads: " + this.workUnitExecutor.getPoolSize() + "/" + this.workUnitExecutor.getMaximumPoolSize() + "<br>");
        response.println("Active Threads: " + this.workUnitExecutor.getActiveCount() + "<br>");
        response.println("Work Queue Size: " + this.workUnitExecutor.getQueue().size() + "/" + MAX_WORK_UNITS_QUEUED + "<br>");

        response.println("Commit Queues: <ul>");
        for (Map.Entry<String, ConcurrentLinkedQueue<Windmill.WorkItemCommitRequest>> commitQueue : this.outputMap.entrySet()) {
            response.println("<li>" + commitQueue.getKey() + ": " + commitQueue.getValue().size() + "</li>");
        }
        response.println("</ul>");

        response.println("Active Keys: <ul>");
        for (Map.Entry<String, ActiveWorkForComputation> activeEntry : this.activeWorkMap.entrySet()) {
            response.print("<li>" + activeEntry.getKey() + ":");
            activeEntry.getValue().printActiveWork(response);
            response.println("</li>");
        }
        response.println("</ul>");

        this.metricTrackingWindmillServer.printHtml(response);
    }

    /**
     * Writes the "Resources" section: total, used (total minus free) and maximum JVM memory,
     * each converted from bytes to whole megabytes by shifting right 20 bits.
     *
     * @param response writer the HTML fragment is appended to
     */
    private void printResources(PrintWriter response) {
        Runtime runtime = Runtime.getRuntime();
        response.append("<h2>Resources</h2>\n");

        long totalMb = runtime.totalMemory() >> 20;
        response.append("Total Memory: " + totalMb + "MB<br>\n");

        long usedMb = (runtime.totalMemory() - runtime.freeMemory()) >> 20;
        response.append("Used Memory: " + usedMb + "MB<br>\n");

        long maxMb = runtime.maxMemory() >> 20;
        response.append("Max Memory: " + maxMb + "MB<br>\n");
    }

    /**
     * Writes the "Specs" section: one {@code <h3>} heading per entry of {@code instructionMap},
     * followed by an inline script that pretty-prints the entry's {@code MapTask} via
     * {@code JSON.stringify} in the browser.
     *
     * @param response writer the HTML fragment is printed to
     */
    private void printSpecs(PrintWriter response) {
        response.append("<h2>Specs</h2>\n");
        for (Map.Entry<?, ?> spec : this.instructionMap.entrySet()) {
            String computation = (String) spec.getKey();
            response.println("<h3>" + computation + "</h3>");

            // The MapTask's string form is emitted verbatim as the argument of JSON.stringify.
            String taskText = ((MapTask) spec.getValue()).toString();
            response.print("<script>document.write(JSON.stringify(");
            response.print(taskText);
            response.println(", null, \"&nbsp&nbsp\").replace(/\\n/g, \"<br>\"))</script>");
        }
    }

    /**
     * Writes the "Last Exception" section when an exception has been recorded in
     * {@code lastException}; writes nothing otherwise.
     *
     * <p>The stack trace is rendered as HTML by replacing tabs with two {@code &nbsp} entities
     * and newlines with {@code <br>}.
     *
     * @param response writer the HTML fragment is printed to
     */
    private void printLastException(PrintWriter response) {
        Throwable recorded = this.lastException.get();
        if (recorded == null) {
            return;
        }
        response.println("<h2>Last Exception</h2>");
        StringWriter traceBuffer = new StringWriter();
        recorded.printStackTrace(new PrintWriter(traceBuffer));
        String traceHtml = traceBuffer.toString()
                .replace("\t", "&nbsp&nbsp")
                .replace("\n", "<br>");
        response.println(traceHtml);
    }

    /**
     * Writes a dump of every live thread: one line with the thread's string form and state,
     * followed by one indented line per stack frame and a blank {@code <br>} separator.
     *
     * <p>The thread description is held as a {@code String}; the previous code declared it as
     * a {@code StackTraceElement[]} and cast it back, which is a type error.
     *
     * @param response writer the HTML fragment is printed to
     */
    private void printThreads(PrintWriter response) {
        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : stacks.entrySet()) {
            Thread thread = entry.getKey();
            String threadName = String.valueOf(thread);
            String threadState = String.valueOf(thread.getState());
            response.println("Thread: " + threadName + " State: " + threadState + "<br>");
            for (StackTraceElement frame : entry.getValue()) {
                response.println("&nbsp&nbsp" + frame + "<br>");
            }
            response.println("<br>");
        }
    }

    /**
     * HTTP handler for the worker's status pages.
     *
     * <p>Every request is answered with status 200 and an HTML body. {@code /healthz} yields
     * the text {@code ok}, {@code /threadz} yields a thread dump, and any other target yields
     * the full status page (header, metrics, resources, last exception, specs).
     */
    private class StatusHandler
    extends AbstractHandler {
        private StatusHandler() {
        }

        /**
         * Renders the page selected by {@code target} and marks the request as handled.
         *
         * @param target request path used to pick the page
         * @param baseRequest request object that is flagged as handled
         * @param request servlet request (not read here)
         * @param response servlet response the page is written to
         */
        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
            response.setContentType("text/html;charset=utf-8");
            response.setStatus(200);
            baseRequest.setHandled(true);

            PrintWriter page = response.getWriter();
            page.println("<html><body>");
            switch (target) {
                case "/healthz":
                    page.println("ok");
                    break;
                case "/threadz":
                    StreamingDataflowWorker.this.printThreads(page);
                    break;
                default:
                    StreamingDataflowWorker.this.printHeader(page);
                    StreamingDataflowWorker.this.printMetrics(page);
                    StreamingDataflowWorker.this.printResources(page);
                    StreamingDataflowWorker.this.printLastException(page);
                    StreamingDataflowWorker.this.printSpecs(page);
                    break;
            }
            page.println("</body></html>");
        }
    }

    /**
     * Simple pair of a {@link MapTaskExecutor} and the {@link StreamingModeExecutionContext}
     * that belongs with it.
     */
    private static class WorkerAndContext {
        /** Executor half of the pair. */
        public MapTaskExecutor worker;
        /** Execution-context half of the pair. */
        public StreamingModeExecutionContext context;

        /**
         * @param worker executor to hold
         * @param context execution context to hold
         */
        public WorkerAndContext(MapTaskExecutor worker, StreamingModeExecutionContext context) {
            this.context = context;
            this.worker = worker;
        }

        /** @return the executor passed at construction (or later assigned to the field) */
        public MapTaskExecutor getWorker() {
            return worker;
        }

        /** @return the execution context passed at construction (or later assigned to the field) */
        public StreamingModeExecutionContext getContext() {
            return context;
        }
    }

    /**
     * Tracks, per key, the work that is currently active plus any work queued behind it.
     *
     * <p>The head of each key's queue is the active item; a key is removed from the map as soon
     * as its queue drains. All public methods are {@code synchronized} on this instance.
     */
    static class ActiveWorkForComputation {
        private Map<ByteString, Queue<Work>> activeWork = new HashMap<>();
        private BoundedQueueExecutor executor;

        /**
         * @param executor executor used to run the next queued work once a key's active work
         *     completes
         */
        ActiveWorkForComputation(BoundedQueueExecutor executor) {
            this.executor = executor;
        }

        /**
         * Registers {@code work} for {@code key}.
         *
         * @param key key the work belongs to
         * @param work work item to register
         * @return {@code true} if the key had no active work, so {@code work} became the active
         *     item; {@code false} if the key is already active, in which case {@code work} is
         *     queued unless its token equals that of the current head
         */
        public synchronized boolean activateWork(ByteString key, Work work) {
            Queue<Work> pending = this.activeWork.get(key);
            if (pending != null) {
                // Key already busy: queue behind the head unless it carries the head's token.
                boolean sameTokenAsHead = pending.peek().getWorkToken() == work.getWorkToken();
                if (!sameTokenAsHead) {
                    pending.add(work);
                }
                return false;
            }
            Queue<Work> fresh = new LinkedList<>();
            fresh.add(work);
            this.activeWork.put(key, fresh);
            return true;
        }

        /**
         * Drops the active work for {@code key} and either hands the next queued item to the
         * executor via {@code forceExecute} or, if nothing is queued, forgets the key.
         *
         * @param key key whose active work has finished; it must currently be tracked
         */
        public synchronized void completeWork(ByteString key) {
            Queue<Work> pending = this.activeWork.get(key);
            pending.poll();
            Work next = pending.peek();
            if (next == null) {
                this.activeWork.remove(key);
                return;
            }
            this.executor.forceExecute(next);
        }

        /**
         * Writes an HTML list with one item per tracked key: the key as UTF-8 text, the token
         * of its active work and, when more items wait behind it, how many are queued.
         *
         * @param writer writer the HTML fragment is printed to
         */
        public synchronized void printActiveWork(PrintWriter writer) {
            writer.println("<ul>");
            for (Map.Entry<ByteString, Queue<Work>> tracked : this.activeWork.entrySet()) {
                Queue<Work> pending = tracked.getValue();
                writer.print("<li>Key: ");
                writer.print(tracked.getKey().toStringUtf8());
                writer.print(" Token: ");
                writer.print(pending.peek().getWorkToken());
                int waiting = pending.size() - 1;
                if (waiting > 0) {
                    writer.print("(");
                    writer.print(waiting);
                    writer.print(" queued)");
                }
                writer.println("</li>");
            }
            writer.println("</ul>");
        }
    }

    /**
     * Runnable that drains the per-computation commit queues in {@code outputMap} and sends
     * their contents to windmill in batches.
     *
     * <p>Each pass builds one {@code CommitWorkRequest} from whatever is queued, commits it,
     * and then marks every committed key as complete in the matching
     * {@link ActiveWorkForComputation}. The runnable returns as soon as a pass finds nothing
     * to commit.
     */
    private class Commit
    implements Runnable {
        private Commit() {
        }

        @Override
        public void run() {
            while (true) {
                Windmill.CommitWorkRequest.Builder commitRequestBuilder = Windmill.CommitWorkRequest.newBuilder();
                // Byte budget for one batch (0x2000000 = 32 MiB). It is checked before each
                // poll, so the request that crosses the limit is still included.
                long remainingCommitBytes = 0x2000000L;

                for (Map.Entry<?, ?> entry : StreamingDataflowWorker.this.outputMap.entrySet()) {
                    Windmill.ComputationCommitWorkRequest.Builder computationRequestBuilder = Windmill.ComputationCommitWorkRequest.newBuilder();
                    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) entry.getValue();
                    Windmill.WorkItemCommitRequest request;
                    while (remainingCommitBytes > 0L && (request = (Windmill.WorkItemCommitRequest) queue.poll()) != null) {
                        remainingCommitBytes -= (long) request.getSerializedSize();
                        computationRequestBuilder.addRequests(request);
                    }
                    // Only computations that contributed at least one request join the batch.
                    if (computationRequestBuilder.getRequestsCount() > 0) {
                        computationRequestBuilder.setComputationId((String) entry.getKey());
                        commitRequestBuilder.addRequests(computationRequestBuilder);
                    }
                }

                if (commitRequestBuilder.getRequestsCount() <= 0) {
                    // Nothing was queued anywhere: this runnable is done.
                    return;
                }

                Windmill.CommitWorkRequest commitRequest = commitRequestBuilder.build();
                LOG.trace("Commit: {}", (Object) commitRequest);
                StreamingDataflowWorker.this.commitWork(commitRequest);

                // After the commit call returns, release each committed key so that work
                // queued behind it can run.
                for (Windmill.ComputationCommitWorkRequest computationRequest : commitRequest.getRequestsList()) {
                    ActiveWorkForComputation activeWork = (ActiveWorkForComputation) StreamingDataflowWorker.this.activeWorkMap.get(computationRequest.getComputationId());
                    for (Windmill.WorkItemCommitRequest workRequest : computationRequest.getRequestsList()) {
                        activeWork.completeWork(workRequest.getKey());
                    }
                }
            }
        }
    }

    /**
     * Base class for a runnable unit of work that is identified by a work token.
     */
    static abstract class Work
    implements Runnable {
        /** Token fixed at construction time. */
        private final long workToken;

        /**
         * @param workToken token identifying this unit of work
         */
        public Work(long workToken) {
            this.workToken = workToken;
        }

        /**
         * @return the token supplied to the constructor
         */
        public long getWorkToken() {
            return workToken;
        }
    }

    /**
     * Holder pairing an unbounded reader with a token value.
     */
    public static class ReaderCacheEntry {
        /** Reader held by this entry. */
        UnboundedSource.UnboundedReader<?> reader;
        /** Token stored alongside the reader. */
        long token;

        /**
         * @param reader reader to hold
         * @param token token to store with the reader
         */
        public ReaderCacheEntry(UnboundedSource.UnboundedReader<?> reader, long token) {
            this.token = token;
            this.reader = reader;
        }
    }

    public static class KeyTokenInvalidException
    extends RuntimeException {
        public KeyTokenInvalidException(String key) {
            String string = String.valueOf(key);
            super(string.length() != 0 ? "Unable to fetch data due to token mismatch for key ".concat(string) : new String("Unable to fetch data due to token mismatch for key "));
        }
    }
}

