/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.scheduler.AllocatedBlocks;
import org.apache.spark.streaming.scheduler.BatchAllocationEvent;
import org.apache.spark.streaming.scheduler.BatchCleanupEvent;
import org.apache.spark.streaming.scheduler.BlockAdditionEvent;
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo;
import org.apache.spark.streaming.scheduler.ReceivedBlockTracker$;
import org.apache.spark.streaming.scheduler.ReceivedBlockTrackerLogEvent;
import org.apache.spark.streaming.util.WriteAheadLogFileSegment;
import org.apache.spark.streaming.util.WriteAheadLogManager;
import org.apache.spark.streaming.util.WriteAheadLogManager$;
import org.apache.spark.util.Clock;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.Queue;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;

@ScalaSignature(bytes="\u0006\u0001\u0005ef!B\u0001\u0003\u0001\u0011a!\u0001\u0006*fG\u0016Lg/\u001a3CY>\u001c7\u000e\u0016:bG.,'O\u0003\u0002\u0004\t\u0005I1o\u00195fIVdWM\u001d\u0006\u0003\u000b\u0019\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005\u001dA\u0011!B:qCJ\\'BA\u0005\u000b\u0003\u0019\t\u0007/Y2iK*\t1\"A\u0002pe\u001e\u001c2\u0001A\u0007\u0014!\tq\u0011#D\u0001\u0010\u0015\u0005\u0001\u0012!B:dC2\f\u0017B\u0001\n\u0010\u0005\u0019\te.\u001f*fMB\u0011A#F\u0007\u0002\r%\u0011aC\u0002\u0002\b\u0019><w-\u001b8h\u0011!A\u0002A!A!\u0002\u0013Q\u0012\u0001B2p]\u001a\u001c\u0001\u0001\u0005\u0002\u00157%\u0011AD\u0002\u0002\n'B\f'o[\"p]\u001aD\u0001B\b\u0001\u0003\u0002\u0003\u0006IaH\u0001\u000bQ\u0006$wn\u001c9D_:4\u0007C\u0001\u0011%\u001b\u0005\t#B\u0001\r#\u0015\t\u0019\u0003\"\u0001\u0004iC\u0012|w\u000e]\u0005\u0003K\u0005\u0012QbQ8oM&<WO]1uS>t\u0007\u0002C\u0014\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u0015\u0002\u0013M$(/Z1n\u0013\u0012\u001c\bcA\u00152i9\u0011!f\f\b\u0003W9j\u0011\u0001\f\u0006\u0003[e\ta\u0001\u0010:p_Rt\u0014\"\u0001\t\n\u0005Az\u0011a\u00029bG.\fw-Z\u0005\u0003eM\u00121aU3r\u0015\t\u0001t\u0002\u0005\u0002\u000fk%\u0011ag\u0004\u0002\u0004\u0013:$\b\u0002\u0003\u001d\u0001\u0005\u0003\u0005\u000b\u0011B\u001d\u0002\u000b\rdwnY6\u0011\u0005ijT\"A\u001e\u000b\u0005q2\u0011\u0001B;uS2L!AP\u001e\u0003\u000b\rcwnY6\t\u0011\u0001\u0003!\u0011!Q\u0001\n\u0005\u000b1c\u00195fG.\u0004x.\u001b8u\t&\u0014x\n\u001d;j_:\u00042A\u0004\"E\u0013\t\u0019uB\u0001\u0004PaRLwN\u001c\t\u0003\u000b\"s!A\u0004$\n\u0005\u001d{\u0011A\u0002)sK\u0012,g-\u0003\u0002J\u0015\n11\u000b\u001e:j]\u001eT!aR\b\t\u000b1\u0003A\u0011A'\u0002\rqJg.\u001b;?)\u0019q\u0005+\u0015*T)B\u0011q\nA\u0007\u0002\u0005!)\u0001d\u0013a\u00015!)ad\u0013a\u0001?!)qe\u0013a\u0001Q!)\u0001h\u0013a\u0001s!)\u0001i\u0013a\u0001\u0003\u0016!a\u000b\u0001\u0003X\u0005I\u0011VmY3jm\u0016$'\t\\8dWF+X-^3\u0011\u0007akv,D\u0001Z\u0015\tQ6,A\u0004nkR\f'\r\\3\u000b\u0005q{\u0011AC2pY2,7\r^5p
]&\u0011a,\u0017\u0002\u0006#V,W/\u001a\t\u0003\u001f\u0002L!!\u0019\u0002\u0003#I+7-Z5wK\u0012\u0014En\\2l\u0013:4w\u000eC\u0004d\u0001\t\u0007I\u0011\u00023\u0002AM$(/Z1n\u0013\u0012$v.\u00168bY2|7-\u0019;fI\ncwnY6Rk\u0016,Xm]\u000b\u0002KB!\u0001L\u001a\u001bi\u0013\t9\u0017LA\u0004ICNDW*\u00199\u0011\u0005%,V\"\u0001\u0001\t\r-\u0004\u0001\u0015!\u0003f\u0003\u0005\u001aHO]3b[&#Gk\\+oC2dwnY1uK\u0012\u0014En\\2l#V,W/Z:!\u0011\u001di\u0007A1A\u0005\n9\fQ\u0003^5nKR{\u0017\t\u001c7pG\u0006$X\r\u001a\"m_\u000e\\7/F\u0001p!\u0011Af\r\u001d;\u0011\u0005E\u0014X\"\u0001\u0003\n\u0005M$!\u0001\u0002+j[\u0016\u0004\"aT;\n\u0005Y\u0014!aD!mY>\u001c\u0017\r^3e\u00052|7m[:\t\ra\u0004\u0001\u0015!\u0003p\u0003Y!\u0018.\\3U_\u0006cGn\\2bi\u0016$'\t\\8dWN\u0004\u0003b\u0002>\u0001\u0005\u0004%Ia_\u0001\u0011Y><W*\u00198bO\u0016\u0014x\n\u001d;j_:,\u0012\u0001 \t\u0004\u001d\tk\bc\u0001@\u0002\u00025\tqP\u0003\u0002=\t%\u0019\u00111A@\u0003)]\u0013\u0018\u000e^3BQ\u0016\fG\rT8h\u001b\u0006t\u0017mZ3s\u0011\u001d\t9\u0001\u0001Q\u0001\nq\f\u0011\u0003\\8h\u001b\u0006t\u0017mZ3s\u001fB$\u0018n\u001c8!\u0011%\tY\u0001\u0001a\u0001\n\u0013\ti!\u0001\fmCN$\u0018\t\u001c7pG\u0006$X\r\u001a\"bi\u000eDG+[7f+\u0005\u0001\b\"CA\t\u0001\u0001\u0007I\u0011BA\n\u0003ia\u0017m\u001d;BY2|7-\u0019;fI\n\u000bGo\u00195US6,w\fJ3r)\u0011\t)\"a\u0007\u0011\u00079\t9\"C\u0002\u0002\u001a=\u0011A!\u00168ji\"I\u0011QDA\b\u0003\u0003\u0005\r\u0001]\u0001\u0004q\u0012\n\u0004bBA\u0011\u0001\u0001\u0006K\u0001]\u0001\u0018Y\u0006\u001cH/\u00117m_\u000e\fG/\u001a3CCR\u001c\u0007\u000eV5nK\u0002Bq!!\n\u0001\t\u0003\t9#\u0001\u0005bI\u0012\u0014En\\2l)\u0011\tI#a\f\u0011\u00079\tY#C\u0002\u0002.=\u0011qAQ8pY\u0016\fg\u000eC\u0004\u00022\u0005\r\u0002\u0019A0\u0002#I,7-Z5wK\u0012\u0014En\\2l\u0013:4w\u000eC\u0004\u00026\u0001!\t!a\u000e\u0002+\u0005dGn\\2bi\u0016\u0014En\\2lgR{')\u0019;dQR!\u0011QCA\u001d\u0011\u001d\tY$a\rA\u0002A\f\u0011BY1uG\"$\u0016.\\3\t\u000f\u0005}\u0002\u0001\"\u0001\u0002B\u0005\u00
01r-\u001a;CY>\u001c7n](g\u0005\u0006$8\r\u001b\u000b\u0005\u0003\u0007\nY\u0005\u0005\u0004F\u0003\u000b\"\u0014\u0011J\u0005\u0004\u0003\u000fR%aA'baB\u0019\u0011&M0\t\u000f\u0005m\u0012Q\ba\u0001a\"9\u0011q\n\u0001\u0005\u0002\u0005E\u0013!G4fi\ncwnY6t\u001f\u001a\u0014\u0015\r^2i\u0003:$7\u000b\u001e:fC6$b!!\u0013\u0002T\u0005U\u0003bBA\u001e\u0003\u001b\u0002\r\u0001\u001d\u0005\b\u0003/\ni\u00051\u00015\u0003!\u0019HO]3b[&#\u0007bBA.\u0001\u0011\u0005\u0011QL\u0001\u001dQ\u0006\u001cXK\\1mY>\u001c\u0017\r^3e%\u0016\u001cW-\u001b<fI\ncwnY6t+\t\tI\u0003C\u0004\u0002b\u0001!\t!a\u0019\u0002)\u001d,G/\u00168bY2|7-\u0019;fI\ncwnY6t)\u0011\tI%!\u001a\t\u000f\u0005]\u0013q\fa\u0001i!9\u0011\u0011\u000e\u0001\u0005\u0002\u0005-\u0014!E2mK\u0006tW\u000f](mI\n\u000bGo\u00195fgR1\u0011QCA7\u0003cBq!a\u001c\u0002h\u0001\u0007\u0001/A\tdY\u0016\fg.\u001e9UQJ,7\u000f\u001b+j[\u0016D\u0001\"a\u001d\u0002h\u0001\u0007\u0011\u0011F\u0001\u0012o\u0006LGOR8s\u0007>l\u0007\u000f\\3uS>t\u0007bBA<\u0001\u0011\u0005\u0011\u0011P\u0001\u0005gR|\u0007\u000f\u0006\u0002\u0002\u0016!9\u0011Q\u0010\u0001\u0005\n\u0005e\u0014!\u0007:fG>4XM\u001d$s_6<&/\u001b;f\u0003\",\u0017\r\u001a'pONDq!!!\u0001\t\u0013\t\u0019)\u0001\u0006xe&$X\rV8M_\u001e$B!!\u0006\u0002\u0006\"A\u0011qQA@\u0001\u0004\tI)\u0001\u0004sK\u000e|'\u000f\u001a\t\u0004\u001f\u0006-\u0015bAAG\u0005\ta\"+Z2fSZ,GM\u00117pG.$&/Y2lKJdunZ#wK:$\bbBAI\u0001\u0011%\u00111S\u0001\u0016O\u0016$(+Z2fSZ,GM\u00117pG.\fV/Z;f)\rA\u0017Q\u0013\u0005\b\u0003/\ny\t1\u00015\u0011\u001d\tI\n\u0001C\u0005\u00037\u000b\u0001c\u0019:fCR,Gj\\4NC:\fw-\u001a:\u0015\u0003qD\u0001\"a(\u0001\t\u0003!\u0011QL\u0001\u0014SNdunZ'b]\u0006<WM]#oC\ndW\rZ\u0004\t\u0003G\u0013\u0001\u0012\u0001\u0003\u0002&\u0006!\"+Z2fSZ,GM\u00117pG.$&/Y2lKJ\u00042aTAT\r\u001d\t!\u0001#\u0001\u0005\u0003S\u001b2!a*\u000e\u0011\u001da\u0015q\u0015C\u0001\u0003[#\"!!*\t\u0011\u0005E\u0016q\u0015C\u0001\u0003g\u000bQc\u00195fG.\u0004x.\u001b8u\t&\u0014Hk\u001c'pO\u0012K'\u000fF\u
0002E\u0003kCq!a.\u00020\u0002\u0007A)A\u0007dQ\u0016\u001c7\u000e]8j]R$\u0015N\u001d")
public class ReceivedBlockTracker
implements Logging {
    // Spark configuration; createLogManager() reads the write-ahead-log settings from it.
    private final SparkConf conf;
    // Hadoop configuration handed to the WriteAheadLogManager constructor.
    private final Configuration hadoopConf;
    // Stream ids whose unallocated queues are drained by allocateBlocksToBatch().
    private final Seq<Object> streamIds;
    // Clock used for the threshold assertion in cleanupOldBatches() and passed to the log manager.
    private final Clock clock;
    // Optional checkpoint directory; createLogManager() requires it when the write-ahead log is enabled.
    // It is public with a mangled name; a nested closure in recoverFromWriteAheadLogs() reads it.
    public final Option<String> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption;
    // Per-stream queues of received blocks that have not been allocated to a batch yet.
    private final HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues;
    // Blocks allocated to each batch, keyed by batch time.
    private final HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks;
    // Result of createLogManager(): defined only when the write-ahead log is enabled.
    private final Option<WriteAheadLogManager> logManagerOption;
    // Time of the most recently allocated batch; null until a batch is allocated or recovered.
    private Time lastAllocatedBatchTime;
    // Logger slot backing the Logging trait accessors below.
    private transient Logger org$apache$spark$Logging$$log_;

    /**
     * Static forwarder to {@code ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir}.
     *
     * @param string the checkpoint directory (this is how {@code createLogManager()} calls it)
     * @return the value computed by the companion object; {@code createLogManager()} uses it
     *         as the write-ahead-log directory
     */
    public static String checkpointDirToLogDir(String string) {
        return ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir(string);
    }

    /** Returns the logger slot that backs the {@code Logging} trait for this instance. */
    public Logger org$apache$spark$Logging$$log_() {
        return this.org$apache$spark$Logging$$log_;
    }

    /** Stores {@code x$1} into the logger slot that backs the {@code Logging} trait. */
    public void org$apache$spark$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$Logging$$log_ = x$1;
    }

    // ---------------------------------------------------------------------
    // Logging trait forwarders. Each method passes this instance (and its
    // arguments) straight to the same-named member reached via Logging.class,
    // which is how the decompiler renders the trait's implementation holder.
    // Messages are Function0<String> thunks rather than plain strings.
    // ---------------------------------------------------------------------

    /** Forwards to the {@code Logging} trait's {@code logName}. */
    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    /** Forwards to the {@code Logging} trait's {@code log}. */
    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    /** Forwards {@code msg} to the {@code Logging} trait's {@code logInfo}. */
    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    /** Forwards {@code msg} to the {@code Logging} trait's {@code logDebug}. */
    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    /** Forwards {@code msg} to the {@code Logging} trait's {@code logTrace}. */
    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    /** Forwards {@code msg} to the {@code Logging} trait's {@code logWarning}. */
    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    /** Forwards {@code msg} to the {@code Logging} trait's {@code logError}. */
    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    /** Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's {@code logInfo}. */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's {@code logDebug}. */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's {@code logTrace}. */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's {@code logWarning}. */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards {@code msg} and {@code throwable} to the {@code Logging} trait's {@code logError}. */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    /** Forwards to the {@code Logging} trait's {@code isTraceEnabled}. */
    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    /** Returns the map from stream id to that stream's queue of not-yet-allocated blocks. */
    private HashMap<Object, Queue<ReceivedBlockInfo>> streamIdToUnallocatedBlockQueues() {
        return this.streamIdToUnallocatedBlockQueues;
    }

    /** Returns the map from batch time to the blocks allocated to that batch. */
    private HashMap<Time, AllocatedBlocks> timeToAllocatedBlocks() {
        return this.timeToAllocatedBlocks;
    }

    /** Returns the optional write-ahead-log manager created by {@code createLogManager()}. */
    private Option<WriteAheadLogManager> logManagerOption() {
        return this.logManagerOption;
    }

    /** Returns the time of the last allocated batch, or {@code null} if none has been recorded. */
    private Time lastAllocatedBatchTime() {
        return this.lastAllocatedBatchTime;
    }

    /** Sets the time of the last allocated batch. */
    private void lastAllocatedBatchTime_$eq(Time x$1) {
        this.lastAllocatedBatchTime = x$1;
    }

    /**
     * Tracks a newly received block: a {@link BlockAdditionEvent} is passed to
     * {@code writeToLog} and the block is then appended to the unallocated queue of its stream.
     *
     * @param receivedBlockInfo the block to track
     * @return {@code true} when both steps completed; {@code false} when any {@link Exception}
     *         was thrown, in which case it is logged at error level and not propagated
     */
    public synchronized boolean addBlock(ReceivedBlockInfo receivedBlockInfo) {
        try {
            // The log write comes first, so a block is only queued after its event was written.
            this.writeToLog(new BlockAdditionEvent(receivedBlockInfo));
            Queue<ReceivedBlockInfo> pendingBlocks = this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId());
            pendingBlocks.$plus$eq((Object)receivedBlockInfo);
            this.logDebug((Function0<String>)new Serializable(this, receivedBlockInfo){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockInfo receivedBlockInfo$1;

                public final String apply() {
                    return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Stream ", " received "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.receivedBlockInfo$1.streamId())}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1.blockStoreResult().blockId()}))).toString();
                }
                {
                    this.receivedBlockInfo$1 = receivedBlockInfo$1;
                }
            });
            return true;
        }
        catch (Exception addFailure) {
            this.logError((Function0<String>)new Serializable(this, receivedBlockInfo){
                public static final long serialVersionUID = 0L;
                private final ReceivedBlockInfo receivedBlockInfo$1;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Error adding block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$1}));
                }
                {
                    this.receivedBlockInfo$1 = receivedBlockInfo$1;
                }
            }, addFailure);
            return false;
        }
    }

    /**
     * Allocates every currently unallocated block of the configured stream ids to the batch
     * identified by {@code batchTime}.
     *
     * <p>The allocation only happens when no batch has been allocated yet or when
     * {@code batchTime} is later than the last allocated batch time; otherwise an info message
     * is logged and nothing changes.
     *
     * <p>The queues are drained before the {@link BatchAllocationEvent} is written. If that
     * write throws, the drained blocks are put back into their queues before the exception
     * propagates, so they remain available for a later batch instead of being lost.
     *
     * @param batchTime time of the batch that should receive the blocks
     */
    public void allocateBlocksToBatch(Time batchTime) {
        synchronized (this) {
            if (this.lastAllocatedBatchTime() != null && !batchTime.$greater(this.lastAllocatedBatchTime())) {
                this.logInfo((Function0<String>)new Serializable(this, batchTime){
                    public static final long serialVersionUID = 0L;
                    private final Time batchTime$1;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Possibly processed batch ", " need to be processed again in WAL recovery"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$1}));
                    }
                    {
                        this.batchTime$1 = batchTime$1;
                    }
                });
                return;
            }
            // Drain each stream's queue completely (the predicate accepts every block).
            Map streamIdToBlocks = ((TraversableOnce)this.streamIds.map((Function1)new Serializable(this){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ ReceivedBlockTracker $outer;

                public final Tuple2<Object, scala.collection.mutable.Seq<ReceivedBlockInfo>> apply(int streamId) {
                    return new Tuple2((Object)BoxesRunTime.boxToInteger((int)streamId), (Object)this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId).dequeueAll((Function1)new Serializable(this){
                        public static final long serialVersionUID = 0L;

                        public final boolean apply(ReceivedBlockInfo x) {
                            return true;
                        }
                    }));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                }
            }, Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.conforms());
            AllocatedBlocks allocatedBlocks = new AllocatedBlocks((Map<Object, Seq<ReceivedBlockInfo>>)streamIdToBlocks);
            boolean allocationLogged = false;
            try {
                this.writeToLog(new BatchAllocationEvent(batchTime, allocatedBlocks));
                allocationLogged = true;
            }
            finally {
                if (!allocationLogged) {
                    // The write failed: return the drained blocks to their (now empty) queues in
                    // their original order. The monitor is still held, so nothing was added meanwhile.
                    scala.collection.Iterator drained = streamIdToBlocks.iterator();
                    while (drained.hasNext()) {
                        Tuple2 entry = (Tuple2)drained.next();
                        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(BoxesRunTime.unboxToInt((Object)entry._1())).$plus$plus$eq((TraversableOnce)entry._2());
                    }
                }
            }
            this.timeToAllocatedBlocks().update((Object)batchTime, (Object)allocatedBlocks);
            this.lastAllocatedBatchTime_$eq(batchTime);
        }
    }

    /**
     * Looks up the blocks allocated to a batch, grouped by stream id.
     *
     * @param batchTime time of the batch to look up
     * @return the batch's {@code streamIdToAllocatedBlocks} map, or an empty map when no
     *         allocation is recorded for {@code batchTime}
     */
    public synchronized Map<Object, Seq<ReceivedBlockInfo>> getBlocksOfBatch(Time batchTime) {
        Option allocation = this.timeToAllocatedBlocks().get((Object)batchTime);
        Option blocksByStream = allocation.map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Seq<ReceivedBlockInfo>> apply(AllocatedBlocks x$1) {
                return x$1.streamIdToAllocatedBlocks();
            }
        });
        return (Map)blocksByStream.getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Map<Object, Nothing$> apply() {
                return Predef$.MODULE$.Map().empty();
            }
        });
    }

    /**
     * Looks up the blocks of one stream that were allocated to a batch.
     *
     * @param batchTime time of the batch to look up
     * @param streamId  id of the stream whose blocks are wanted
     * @return the result of {@code getBlocksOfStream(streamId)} on the batch's allocation, or
     *         an empty sequence when no allocation is recorded for {@code batchTime}
     */
    public synchronized Seq<ReceivedBlockInfo> getBlocksOfBatchAndStream(Time batchTime, int streamId) {
        Option allocation = this.timeToAllocatedBlocks().get((Object)batchTime);
        Option blocksOfStream = allocation.map((Function1)new Serializable(this, streamId){
            public static final long serialVersionUID = 0L;
            private final int streamId$1;

            public final Seq<ReceivedBlockInfo> apply(AllocatedBlocks x$2) {
                return x$2.getBlocksOfStream(this.streamId$1);
            }
            {
                this.streamId$1 = streamId$1;
            }
        });
        return (Seq)blocksOfStream.getOrElse((Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Seq<Nothing$> apply() {
                return (Seq)Seq$.MODULE$.empty();
            }
        });
    }

    /**
     * Reports whether any stream still has blocks waiting to be allocated.
     *
     * @return {@code true} when at least one unallocated-block queue is non-empty
     */
    public synchronized boolean hasUnallocatedReceivedBlocks() {
        boolean everyQueueEmpty = this.streamIdToUnallocatedBlockQueues().values().forall((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Queue<ReceivedBlockInfo> x$3) {
                return x$3.isEmpty();
            }
        });
        return !everyQueueEmpty;
    }

    /**
     * Returns the blocks of a stream that have not been allocated to a batch yet.
     * The stream's queue is created on demand if it does not exist.
     *
     * <p>NOTE(review): depending on the Scala collection version, {@code toSeq} on a mutable
     * queue may hand back the queue itself rather than a copy; confirm before relying on the
     * result being a snapshot.
     *
     * @param streamId id of the stream to inspect
     * @return the stream's unallocated blocks as a {@code Seq}
     */
    public synchronized Seq<ReceivedBlockInfo> getUnallocatedBlocks(int streamId) {
        Queue<ReceivedBlockInfo> pendingBlocks = this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(streamId);
        return pendingBlocks.toSeq();
    }

    /**
     * Forgets all batches older than a threshold and asks the log manager, if any, to clean
     * up its old logs.
     *
     * <p>Steps, in order: assert that the threshold lies before the clock's current time,
     * collect the batch times earlier than the threshold, pass a {@link BatchCleanupEvent} to
     * {@code writeToLog}, drop those batches from the in-memory map, then call
     * {@code cleanupOldLogs} on the log manager.
     *
     * @param cleanupThreshTime batches strictly earlier than this time are removed
     * @param waitForCompletion passed through unchanged to {@code cleanupOldLogs}
     */
    public synchronized void cleanupOldBatches(Time cleanupThreshTime, boolean waitForCompletion) {
        boolean thresholdIsInPast = cleanupThreshTime.milliseconds() < this.clock.getTimeMillis();
        Predef$.MODULE$.assert(thresholdIsInPast);

        Function1 olderThanThreshold = (Function1)new Serializable(this, cleanupThreshTime){
            public static final long serialVersionUID = 0L;
            private final Time cleanupThreshTime$1;

            public final boolean apply(Time x$4) {
                return x$4.$less(this.cleanupThreshTime$1);
            }
            {
                this.cleanupThreshTime$1 = cleanupThreshTime$1;
            }
        };
        Seq timesToCleanup = ((TraversableOnce)this.timeToAllocatedBlocks().keys().filter(olderThanThreshold)).toSeq();

        this.logInfo((Function0<String>)new Serializable(this, timesToCleanup){
            public static final long serialVersionUID = 0L;
            private final Seq timesToCleanup$1;

            public final String apply() {
                return new StringBuilder().append((Object)"Deleting batches ").append((Object)this.timesToCleanup$1).toString();
            }
            {
                this.timesToCleanup$1 = timesToCleanup$1;
            }
        });

        // Log the cleanup before mutating the in-memory state.
        this.writeToLog(new BatchCleanupEvent((Seq<Time>)timesToCleanup));
        this.timeToAllocatedBlocks().$minus$minus$eq((TraversableOnce)timesToCleanup);

        Function1 cleanOldLogs = (Function1)new Serializable(this, cleanupThreshTime, waitForCompletion){
            public static final long serialVersionUID = 0L;
            private final Time cleanupThreshTime$1;
            private final boolean waitForCompletion$1;

            public final void apply(WriteAheadLogManager x$5) {
                x$5.cleanupOldLogs(this.cleanupThreshTime$1.milliseconds(), this.waitForCompletion$1);
            }
            {
                this.cleanupThreshTime$1 = cleanupThreshTime$1;
                this.waitForCompletion$1 = waitForCompletion$1;
            }
        };
        this.logManagerOption().foreach(cleanOldLogs);
    }

    /**
     * Stops the write-ahead-log manager when one exists; does nothing otherwise.
     * Unlike most public methods of this class, this one is not {@code synchronized}.
     */
    public void stop() {
        Option<WriteAheadLogManager> maybeLogManager = this.logManagerOption();
        maybeLogManager.foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(WriteAheadLogManager x$6) {
                x$6.stop();
            }
        });
    }

    /**
     * Rebuilds the tracker's in-memory state by replaying the write-ahead log, when a log
     * manager exists; does nothing otherwise.
     *
     * <p>Every record returned by {@code readFromLog()} is deserialized into a
     * {@link ReceivedBlockTrackerLogEvent} and dispatched by its concrete type:
     * <ul>
     *   <li>{@link BlockAdditionEvent}: the block is re-inserted into its stream's queue;</li>
     *   <li>{@link BatchAllocationEvent}: the batch allocation is re-inserted;</li>
     *   <li>{@link BatchCleanupEvent}: the listed batches are removed.</li>
     * </ul>
     * Any other event type raises a {@link MatchError}.
     *
     * <p>Only the readable window of each record buffer is deserialized (see
     * {@link #readableBytes(ByteBuffer)}), so buffers that are slices, have a non-zero
     * position, or are not array-backed are handled correctly.
     */
    private synchronized void recoverFromWriteAheadLogs() {
        this.logManagerOption().foreach((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ ReceivedBlockTracker $outer;

            public final void apply(WriteAheadLogManager logManager) {
                this.$outer.logInfo((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverFromWriteAheadLogs$1 $outer;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovering from write ahead logs in ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer().org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get()}));
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                });
                logManager.readFromLog().foreach((Function1)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anonfun$recoverFromWriteAheadLogs$1 $outer;

                    public final void apply(ByteBuffer byteBuffer) {
                        ReceivedBlockTracker tracker = this.$outer.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer();
                        tracker.logTrace((Function0<String>)new Serializable(this, byteBuffer){
                            public static final long serialVersionUID = 0L;
                            private final ByteBuffer byteBuffer$1;

                            public final String apply() {
                                return new StringBuilder().append((Object)"Recovering record ").append((Object)this.byteBuffer$1).toString();
                            }
                            {
                                this.byteBuffer$1 = byteBuffer$1;
                            }
                        });
                        ReceivedBlockTrackerLogEvent event = (ReceivedBlockTrackerLogEvent)Utils$.MODULE$.deserialize(ReceivedBlockTracker.readableBytes(byteBuffer));
                        if (event instanceof BlockAdditionEvent) {
                            BlockAdditionEvent addition = (BlockAdditionEvent)event;
                            tracker.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(addition.receivedBlockInfo());
                        } else if (event instanceof BatchAllocationEvent) {
                            BatchAllocationEvent allocation = (BatchAllocationEvent)event;
                            Time time = allocation.time();
                            AllocatedBlocks allocatedBlocks = allocation.allocatedBlocks();
                            tracker.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(time, allocatedBlocks);
                        } else if (event instanceof BatchCleanupEvent) {
                            BatchCleanupEvent cleanup = (BatchCleanupEvent)event;
                            tracker.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(cleanup.times());
                        } else {
                            // Unknown event type (including a null event): same failure as before.
                            throw new MatchError((Object)event);
                        }
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                });
            }

            public /* synthetic */ ReceivedBlockTracker org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$anonfun$$$outer() {
                return this.$outer;
            }
            {
                if ($outer == null) {
                    throw new NullPointerException();
                }
                this.$outer = $outer;
            }
        });
    }

    /**
     * Returns exactly the readable bytes (position to limit) of {@code buffer} without
     * changing the buffer's position.
     *
     * <p>The backing array is returned directly, with no copy, when the buffer is
     * array-backed and its readable window spans the whole array; otherwise the window is
     * copied into a fresh array.
     *
     * @param buffer the record buffer to read
     * @return the bytes between the buffer's position and limit
     */
    private static byte[] readableBytes(ByteBuffer buffer) {
        if (buffer.hasArray() && buffer.arrayOffset() == 0 && buffer.position() == 0 && buffer.array().length == buffer.remaining()) {
            return buffer.array();
        }
        byte[] copy = new byte[buffer.remaining()];
        // duplicate() shares content but has its own position, so the caller's buffer is untouched.
        buffer.duplicate().get(copy);
        return copy;
    }

    /**
     * Serializes {@code record} and writes it through the log manager.
     * Returns immediately, without logging anything, when no log manager is enabled.
     *
     * @param record the tracker event to persist
     */
    private void writeToLog(ReceivedBlockTrackerLogEvent record) {
        if (!this.isLogManagerEnabled()) {
            return;
        }
        this.logDebug((Function0<String>)new Serializable(this, record){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockTrackerLogEvent record$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Writing to log ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.record$1}));
            }
            {
                this.record$1 = record$1;
            }
        });
        Function1 writeRecord = (Function1)new Serializable(this, record){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockTrackerLogEvent record$1;

            public final WriteAheadLogFileSegment apply(WriteAheadLogManager logManager) {
                return logManager.writeToLog(ByteBuffer.wrap(Utils$.MODULE$.serialize((Object)this.record$1)));
            }
            {
                this.record$1 = record$1;
            }
        };
        this.logManagerOption().foreach(writeRecord);
    }

    /**
     * Returns the unallocated-block queue of a stream, creating and registering an empty
     * queue first when the stream has none yet.
     *
     * @param streamId id of the stream
     * @return the stream's queue of unallocated blocks
     */
    public Queue<ReceivedBlockInfo> org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(int streamId) {
        Object queueKey = (Object)BoxesRunTime.boxToInteger((int)streamId);
        Function0 newEmptyQueue = (Function0)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Queue<ReceivedBlockInfo> apply() {
                return new Queue();
            }
        };
        return (Queue)this.streamIdToUnallocatedBlockQueues().getOrElseUpdate(queueKey, newEmptyQueue);
    }

    /**
     * Creates the write-ahead-log manager when the feature is enabled in the configuration.
     *
     * <p>The feature is controlled by {@code spark.streaming.receiver.writeAheadLog.enable}
     * (default {@code false}). The log rotation interval is read from
     * {@code spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs}
     * (default 60).
     *
     * @return {@code Some(manager)} when the write-ahead log is enabled, {@code None} otherwise
     * @throws SparkException if the write-ahead log is enabled but no checkpoint directory is set
     */
    private Option<WriteAheadLogManager> createLogManager() {
        if (!this.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) {
            return (Option)None$.MODULE$;
        }
        if (this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.isEmpty()) {
            throw new SparkException("Cannot enable receiver write-ahead log without checkpoint directory set. Please use streamingContext.checkpoint() to set the checkpoint directory. See documentation for more details.");
        }
        String logDir = ReceivedBlockTracker$.MODULE$.checkpointDirToLogDir((String)this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption.get());
        int rollingIntervalSecs = this.conf.getInt("spark.streaming.receivedBlockTracker.writeAheadLog.rotationIntervalSecs", 60);
        // The fourth constructor argument is left at the default declared by WriteAheadLogManager.
        WriteAheadLogManager logManager = new WriteAheadLogManager(logDir, this.hadoopConf, rollingIntervalSecs, WriteAheadLogManager$.MODULE$.$lessinit$greater$default$4(), "ReceivedBlockHandlerMaster", this.clock);
        return new Some((Object)logManager);
    }

    /**
     * Reports whether a write-ahead-log manager was created for this tracker.
     *
     * @return {@code true} when the log manager option is defined
     */
    public boolean isLogManagerEnabled() {
        Option<WriteAheadLogManager> maybeLogManager = this.logManagerOption();
        return !maybeLogManager.isEmpty();
    }

    /**
     * Recovery helper: appends a block read from the write-ahead log to the unallocated queue
     * of its stream. Nothing is written back to the log.
     *
     * @param receivedBlockInfo the recovered block
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAddedBlock$1(ReceivedBlockInfo receivedBlockInfo) {
        this.logTrace((Function0<String>)new Serializable(this, receivedBlockInfo){
            public static final long serialVersionUID = 0L;
            private final ReceivedBlockInfo receivedBlockInfo$2;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting added block ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.receivedBlockInfo$2}));
            }
            {
                this.receivedBlockInfo$2 = receivedBlockInfo$2;
            }
        });
        Queue<ReceivedBlockInfo> pendingBlocks = this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$getReceivedBlockQueue(receivedBlockInfo.streamId());
        pendingBlocks.$plus$eq((Object)receivedBlockInfo);
    }

    /**
     * Recovery helper: re-applies a batch allocation read from the write-ahead log.
     * Every unallocated-block queue is cleared, {@code batchTime} becomes the last allocated
     * batch time, and the allocation is stored under {@code batchTime}.
     *
     * @param batchTime       time of the recovered batch
     * @param allocatedBlocks blocks that were allocated to that batch
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$insertAllocatedBatch$1(Time batchTime, AllocatedBlocks allocatedBlocks) {
        this.logTrace((Function0<String>)new Serializable(this, batchTime, allocatedBlocks){
            public static final long serialVersionUID = 0L;
            private final Time batchTime$2;
            private final AllocatedBlocks allocatedBlocks$1;

            public final String apply() {
                return new StringBuilder().append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Inserting allocated batch for time ", " to "})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTime$2}))).append((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.allocatedBlocks$1.streamIdToAllocatedBlocks()}))).toString();
            }
            {
                this.batchTime$2 = batchTime$2;
                this.allocatedBlocks$1 = allocatedBlocks$1;
            }
        });
        // Empty every stream's queue before recording the allocation.
        Function1 clearQueue = (Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final void apply(Queue<ReceivedBlockInfo> x$7) {
                x$7.clear();
            }
        };
        this.streamIdToUnallocatedBlockQueues().values().foreach(clearQueue);
        this.lastAllocatedBatchTime_$eq(batchTime);
        this.timeToAllocatedBlocks().put((Object)batchTime, (Object)allocatedBlocks);
    }

    /**
     * Recovery helper: removes the given batch times from the map of allocated batches.
     *
     * @param batchTimes batch times recorded in a recovered cleanup event
     */
    public final void org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$cleanupBatches$1(Seq batchTimes) {
        this.logTrace((Function0<String>)new Serializable(this, batchTimes){
            public static final long serialVersionUID = 0L;
            private final Seq batchTimes$1;

            public final String apply() {
                return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Recovery: Cleaning up batches ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.batchTimes$1}));
            }
            {
                this.batchTimes$1 = batchTimes$1;
            }
        });
        HashMap<Time, AllocatedBlocks> allocatedBatches = this.timeToAllocatedBlocks();
        allocatedBatches.$minus$minus$eq((TraversableOnce)batchTimes);
    }

    /**
     * Creates the tracker, sets up the optional write-ahead-log manager and then replays any
     * existing log through {@code recoverFromWriteAheadLogs()}.
     *
     * @param conf                Spark configuration (read by {@code createLogManager()})
     * @param hadoopConf          Hadoop configuration passed to the log manager
     * @param streamIds           ids of the streams whose blocks are allocated to batches
     * @param clock               clock used for cleanup checks and passed to the log manager
     * @param checkpointDirOption optional checkpoint directory; required when the
     *                            write-ahead log is enabled
     */
    public ReceivedBlockTracker(SparkConf conf, Configuration hadoopConf, Seq<Object> streamIds, Clock clock, Option<String> checkpointDirOption) {
        this.conf = conf;
        this.hadoopConf = hadoopConf;
        this.streamIds = streamIds;
        this.clock = clock;
        this.org$apache$spark$streaming$scheduler$ReceivedBlockTracker$$checkpointDirOption = checkpointDirOption;
        // Initialize the Logging trait before anything below tries to log.
        Logging.class.$init$((Logging)this);
        this.streamIdToUnallocatedBlockQueues = new HashMap();
        this.timeToAllocatedBlocks = new HashMap();
        // createLogManager() reads conf, hadoopConf, clock and the checkpoint option set above.
        this.logManagerOption = this.createLogManager();
        this.lastAllocatedBatchTime = null;
        // Recovery runs last: it writes into the maps and lastAllocatedBatchTime initialized above.
        this.recoverFromWriteAheadLogs();
    }
}

