package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.FluentIterable;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.AbstractWindowSet;
import com.google.cloud.dataflow.sdk.util.TimerManager;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.CodedTupleTag;
import com.google.cloud.dataflow.sdk.values.CodedTupleTagMap;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/TriggerTester.class */
public class TriggerTester<InputT, OutputT, W extends BoundedWindow> {
    private static final Logger LOGGER = Logger.getLogger(TriggerTester.class.getName());
    private final TriggerExecutor<String, InputT, OutputT, W> triggerExecutor;
    private final WindowFn<Object, W> windowFn;
    private final AbstractWindowSet<String, InputT, OutputT, W> windowSet;
    private final Coder<OutputT> outputCoder;
    private static final String KEY = "TEST_KEY";
    private ExecutableTrigger<W> executableTrigger;
    private Instant watermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
    private final BatchTimerManager timerManager = new LoggingBatchTimerManager(this.processingTime);
    private boolean logInteractions = false;
    private final TriggerTester<InputT, OutputT, W>.StubContexts stubContexts = new StubContexts();

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/TriggerTester$LoggingBatchTimerManager.class */
    private class LoggingBatchTimerManager extends BatchTimerManager {
        public LoggingBatchTimerManager(Instant instant) {
            super(instant);
        }

        @Override // com.google.cloud.dataflow.sdk.util.BatchTimerManager, com.google.cloud.dataflow.sdk.util.TimerManager
        public void setTimer(String str, Instant instant, TimerManager.TimeDomain timeDomain) {
            TriggerTester.this.logInteraction("Setting timer '%s' for time %d in domain %s", str, Long.valueOf(instant.getMillis()), timeDomain);
            super.setTimer(str, instant, timeDomain);
        }

        @Override // com.google.cloud.dataflow.sdk.util.BatchTimerManager, com.google.cloud.dataflow.sdk.util.TimerManager
        public void deleteTimer(String str, TimerManager.TimeDomain timeDomain) {
            TriggerTester.this.logInteraction("Delete timer '%s' in domain %s", str, timeDomain);
            super.deleteTimer(str, timeDomain);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.google.cloud.dataflow.sdk.util.BatchTimerManager
        public void fire(TriggerExecutor<?, ?, ?, ?> triggerExecutor, String str, TimerManager.TimeDomain timeDomain) throws Exception {
            TriggerTester.this.logInteraction("Firing timer '%s' in domain %s", str, timeDomain);
            super.fire(triggerExecutor, str, timeDomain);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/TriggerTester$StubAssignContext.class */
    private static class StubAssignContext<W extends BoundedWindow> extends WindowFn<Object, W>.AssignContext {
        private Object element;
        private Instant timestamp;
        private Collection<? extends BoundedWindow> windows;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public StubAssignContext(WindowFn<Object, W> windowFn, Object obj, Instant instant, Collection<? extends BoundedWindow> collection) {
            super();
            windowFn.getClass();
            this.element = obj;
            this.timestamp = instant;
            this.windows = collection;
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
        public Object element() {
            return this.element;
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
        public Instant timestamp() {
            return this.timestamp;
        }

        @Override // com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn.AssignContext
        public Collection<? extends BoundedWindow> windows() {
            return this.windows;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/TriggerTester$StubContexts.class */
    public class StubContexts implements WindowingInternals<InputT, KV<String, OutputT>>, WindowingInternals.KeyedState {
        private Map<CodedTupleTag<?>, List<?>> tagListValues;
        private Map<CodedTupleTag<?>, Object> tagValues;
        private List<WindowedValue<KV<String, OutputT>>> outputs;
        private Map<CodedTupleTag<?>, Instant> tagTimestamps;
        private PriorityQueue<Instant> minTagTimestamp;

        private StubContexts() {
            this.tagListValues = new HashMap();
            this.tagValues = new HashMap();
            this.outputs = new ArrayList();
            this.tagTimestamps = new HashMap();
            this.minTagTimestamp = new PriorityQueue<>();
        }

        public void outputWindowedValue(KV<String, OutputT> kv, Instant instant, Collection<? extends BoundedWindow> collection) {
            WindowedValue<KV<String, OutputT>> of = WindowedValue.of((KV) SerializableUtils.ensureSerializableByCoder(KvCoder.of(StringUtf8Coder.of(), TriggerTester.this.outputCoder), kv, "outputForWindow"), instant, collection);
            TriggerTester.this.logInteraction("Outputting: %s", of);
            this.outputs.add(of);
        }

        public Set<String> getKeyedStateInUse() {
            return FluentIterable.from(this.tagListValues.keySet()).append(this.tagValues.keySet()).transform(new Function<CodedTupleTag<?>, String>() { // from class: com.google.cloud.dataflow.sdk.util.TriggerTester.StubContexts.1
                @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                @Nullable
                public String apply(CodedTupleTag<?> codedTupleTag) {
                    return codedTupleTag.getId();
                }
            }).toSet();
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> void writeToTagList(CodedTupleTag<T> codedTupleTag, T t) throws IOException {
            List<?> list = this.tagListValues.get(codedTupleTag);
            if (list == null) {
                list = new ArrayList();
                this.tagListValues.put(codedTupleTag, list);
            }
            list.add(t);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> void deleteTagList(CodedTupleTag<T> codedTupleTag) {
            this.tagListValues.remove(codedTupleTag);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> Iterable<T> readTagList(CodedTupleTag<T> codedTupleTag) {
            List<?> list = this.tagListValues.get(codedTupleTag);
            return list == null ? Collections.emptyList() : list;
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> Map<CodedTupleTag<T>, Iterable<T>> readTagList(List<CodedTupleTag<T>> list) throws IOException {
            return FluentIterable.from(list).toMap(new Function<CodedTupleTag<T>, Iterable<T>>() { // from class: com.google.cloud.dataflow.sdk.util.TriggerTester.StubContexts.2
                @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
                @Nullable
                public Iterable<T> apply(@Nullable CodedTupleTag<T> codedTupleTag) {
                    return StubContexts.this.readTagList(codedTupleTag);
                }
            });
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public TimerManager getTimerManager() {
            throw new UnsupportedOperationException("getTimerManager() should not be called on StubContexts.");
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public Collection<? extends BoundedWindow> windows() {
            throw new UnsupportedOperationException("Testing triggers should not use windows from WindowingInternals.");
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals.KeyedState
        public <T> void store(CodedTupleTag<T> codedTupleTag, T t) throws IOException {
            store(codedTupleTag, t, BoundedWindow.TIMESTAMP_MAX_VALUE);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> void store(CodedTupleTag<T> codedTupleTag, T t, Instant instant) throws IOException {
            this.tagValues.put(codedTupleTag, t);
            Instant put = this.tagTimestamps.put(codedTupleTag, instant);
            if (put != null) {
                this.minTagTimestamp.remove(put);
            }
            this.minTagTimestamp.add(instant);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals.KeyedState
        public <T> void remove(CodedTupleTag<T> codedTupleTag) {
            this.tagValues.remove(codedTupleTag);
            this.minTagTimestamp.remove(this.tagTimestamps.remove(codedTupleTag));
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals.KeyedState
        public <T> T lookup(CodedTupleTag<T> codedTupleTag) throws IOException {
            return (T) this.tagValues.get(codedTupleTag);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals.KeyedState
        public CodedTupleTagMap lookup(Iterable<? extends CodedTupleTag<?>> iterable) throws IOException {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            for (CodedTupleTag<?> codedTupleTag : iterable) {
                linkedHashMap.put(codedTupleTag, this.tagValues.get(codedTupleTag));
            }
            return CodedTupleTagMap.of(linkedHashMap);
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public <T> void writePCollectionViewData(TupleTag<?> tupleTag, Iterable<WindowedValue<T>> iterable, Coder<T> coder) throws IOException {
            throw new UnsupportedOperationException("Testing triggers should not use writePCollectionViewData from WindowingInternals.");
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public WindowingInternals.KeyedState keyedState() {
            return this;
        }

        @Override // com.google.cloud.dataflow.sdk.util.WindowingInternals
        public /* bridge */ /* synthetic */ void outputWindowedValue(Object obj, Instant instant, Collection collection) {
            outputWindowedValue((KV) obj, instant, (Collection<? extends BoundedWindow>) collection);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void logInteraction(String str, Object... objArr) {
        String str2;
        if (this.logInteractions) {
            Logger logger = LOGGER;
            String valueOf = String.valueOf(String.format(str, objArr));
            if (valueOf.length() != 0) {
                str2 = "Trigger Interaction: ".concat(valueOf);
            } else {
                str2 = r2;
                String str3 = new String("Trigger Interaction: ");
            }
            logger.warning(str2);
        }
    }

    public static <W extends BoundedWindow> TriggerTester<Integer, Iterable<Integer>, W> nonCombining(WindowFn<?, W> windowFn, Trigger<W> trigger, WindowingStrategy.AccumulationMode accumulationMode, Duration duration) throws Exception {
        return new TriggerTester<>(windowFn, trigger, AbstractWindowSet.factoryFor(WindowingStrategy.of(windowFn).withTrigger(trigger).withMode(accumulationMode), VarIntCoder.of()), accumulationMode, IterableCoder.of(VarIntCoder.of()), duration);
    }

    public static <W extends BoundedWindow, AccumT, OutputT> TriggerTester<Integer, OutputT, W> combining(WindowFn<?, W> windowFn, Trigger<W> trigger, WindowingStrategy.AccumulationMode accumulationMode, Combine.KeyedCombineFn<String, Integer, AccumT, OutputT> keyedCombineFn, Coder<OutputT> coder, Duration duration) throws Exception {
        return new TriggerTester<>(windowFn, trigger, CombiningWindowSet.factory(keyedCombineFn, StringUtf8Coder.of(), VarIntCoder.of()), accumulationMode, coder, duration);
    }

    private TriggerTester(WindowFn<Object, W> windowFn, Trigger<W> trigger, AbstractWindowSet.Factory<String, InputT, OutputT, W> factory, WindowingStrategy.AccumulationMode accumulationMode, Coder<OutputT> coder, Duration duration) throws Exception {
        this.windowFn = windowFn;
        this.windowSet = factory.create(KEY, windowFn.windowCoder(), this.stubContexts, this.stubContexts);
        this.outputCoder = coder;
        this.executableTrigger = ExecutableTrigger.create(trigger);
        this.triggerExecutor = new TriggerExecutor<>(windowFn, this.timerManager, this.executableTrigger, this.stubContexts, this.stubContexts, this.windowSet, accumulationMode, duration);
    }

    public ExecutableTrigger<W> getTrigger() {
        return this.executableTrigger;
    }

    public void logInteractions(boolean z) {
        this.logInteractions = z;
    }

    public boolean isMarkedFinished(W w) throws IOException {
        return this.triggerExecutor.lookupFinishedSet(w).get(0);
    }

    public Iterable<String> getKeyedStateInUse() {
        return this.stubContexts.getKeyedStateInUse();
    }

    public String finishedSet(W w) throws CoderException {
        return this.triggerExecutor.finishedSetTag(w).getId();
    }

    public String bufferTag(W w) throws IOException {
        return WindowUtils.bufferTag(w, this.windowFn.windowCoder(), VoidCoder.of()).getId();
    }

    public String earliestElement(W w) throws CoderException {
        return this.triggerExecutor.earliestElementTag(w).getId();
    }

    public Instant getWatermarkHold() {
        return (Instant) ((StubContexts) this.stubContexts).minTagTimestamp.peek();
    }

    public boolean isWindowActive(W w) throws IOException {
        return this.stubContexts.getKeyedStateInUse().contains(WindowUtils.bufferTag(w, this.windowFn.windowCoder(), VarIntCoder.of()).getId());
    }

    public Iterable<WindowedValue<OutputT>> extractOutput() {
        ImmutableList list = FluentIterable.from(((StubContexts) this.stubContexts).outputs).transform(new Function<WindowedValue<KV<String, OutputT>>, WindowedValue<OutputT>>() { // from class: com.google.cloud.dataflow.sdk.util.TriggerTester.1
            @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function
            @Nullable
            public WindowedValue<OutputT> apply(@Nullable WindowedValue<KV<String, OutputT>> windowedValue) {
                return WindowedValue.of(windowedValue.getValue().getValue(), windowedValue.getTimestamp(), windowedValue.getWindows());
            }
        }).toList();
        ((StubContexts) this.stubContexts).outputs.clear();
        return list;
    }

    public void advanceWatermark(Instant instant) throws Exception {
        Preconditions.checkState(!instant.isBefore(this.watermark), "Cannot move watermark time backwards from %s to %s", Long.valueOf(this.watermark.getMillis()), Long.valueOf(instant.getMillis()));
        logInteraction("Advancing watermark to %d", Long.valueOf(instant.getMillis()));
        this.watermark = instant;
        this.timerManager.advanceWatermark(this.triggerExecutor, instant);
    }

    public void advanceProcessingTime(Instant instant) throws Exception {
        Preconditions.checkState(!instant.isBefore(this.processingTime), "Cannot move processing time backwards from %s to %s", Long.valueOf(this.processingTime.getMillis()), Long.valueOf(instant.getMillis()));
        logInteraction("Advancing processing time to %d", Long.valueOf(instant.getMillis()));
        this.processingTime = instant;
        this.timerManager.advanceProcessingTime(this.triggerExecutor, instant);
    }

    public void injectElement(InputT inputt, Instant instant) throws Exception {
        Collection<W> assignWindows = this.windowFn.assignWindows(new StubAssignContext(this.windowFn, inputt, instant, Arrays.asList(GlobalWindow.INSTANCE)));
        logInteraction("Element %s at time %d put in windows %s", inputt, Long.valueOf(instant.getMillis()), assignWindows);
        this.triggerExecutor.onElement(WindowedValue.of(inputt, instant, (Collection<? extends BoundedWindow>) assignWindows));
    }

    public void doMerge() throws Exception {
        this.triggerExecutor.merge();
    }

    public void setTimer(W w, Instant instant, TimerManager.TimeDomain timeDomain, ExecutableTrigger<W> executableTrigger) throws CoderException {
        this.triggerExecutor.setTimer(new Trigger.TriggerId<>(w, executableTrigger.getTriggerIndex()), instant, timeDomain);
    }
}
