package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Ascii;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Objects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ForwardingFuture;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.SettableFuture;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateReader.class */
public class WindmillStateReader {
    private static final Logger LOG = LoggerFactory.getLogger(WindmillStateReader.class);
    // Computation id, key and work token are stamped onto every request built by createRequest;
    // consumeResponse also checks the response's computation id and key against them.
    private final String computation;
    private final ByteString key;
    private final long workToken;
    // Server stub through which startBatchAndBlock issues the state fetch.
    private final MetricTrackingWindmillServerStub metrics;
    // Running total of the serialized sizes of the keyed responses consumed so far.
    private long bytesRead = 0;

    // Tags enqueued by stateFuture and drained by startBatchAndBlock when a batch is sent.
    @VisibleForTesting
    ConcurrentLinkedQueue<StateTag> pendingLookups = new ConcurrentLinkedQueue<>();
    // Coder registered per tag by stateFuture (watermark tags register none); entries are
    // removed by consumeTagList / consumeTagValue when they decode the response.
    private ConcurrentHashMap<StateTag, Coder<?>> coders = new ConcurrentHashMap<>();
    // One future per tag requested through stateFuture; nothing in this class removes entries,
    // so a repeated request for the same tag gets the same underlying future back.
    private ConcurrentHashMap<StateTag, SettableFuture<?>> futures = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.dataflow.sdk.runners.worker.WindmillStateReader$2, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateReader$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        /**
         * Switch lookup table indexed by {@code StateTag.Kind.ordinal()}: LIST maps to 1,
         * WATERMARK to 2 and VALUE to 3. A slot whose constant could not be resolved keeps the
         * array default of 0, which matches none of those labels.
         */
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$WindmillStateReader$StateTag$Kind = new int[StateTag.Kind.values().length];

        static {
            final int[] switchMap = $SwitchMap$com$google$cloud$dataflow$sdk$runners$worker$WindmillStateReader$StateTag$Kind;
            // Each constant is registered in its own try block so that one missing constant
            // does not prevent the remaining ones from being mapped.
            try {
                switchMap[StateTag.Kind.LIST.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant unavailable at runtime: leave the slot at 0.
            }
            try {
                switchMap[StateTag.Kind.WATERMARK.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant unavailable at runtime: leave the slot at 0.
            }
            try {
                switchMap[StateTag.Kind.VALUE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant unavailable at runtime: leave the slot at 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/WindmillStateReader$StateTag.class */
    /**
     * Identifies one piece of state to fetch: the kind of state, its tag bytes and its state
     * family. Instances are value objects; equality and hashing cover all three components so
     * they can be used as map keys and set members.
     */
    public static class StateTag {
        private final Kind kind;
        private final ByteString tag;
        private final String stateFamily;

        /** The kinds of state a tag can address. */
        public enum Kind {
            VALUE,
            LIST,
            WATERMARK
        }

        /**
         * @param kind the kind of state addressed
         * @param byteString the tag bytes
         * @param str the state family; must not be null
         */
        private StateTag(Kind kind, ByteString byteString, String str) {
            this.stateFamily = (String) Preconditions.checkNotNull(str);
            this.kind = kind;
            this.tag = byteString;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj instanceof StateTag) {
                final StateTag other = (StateTag) obj;
                return Objects.equal(kind, other.kind)
                        && Objects.equal(tag, other.tag)
                        && Objects.equal(stateFamily, other.stateFamily);
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(kind, tag, stateFamily);
        }

        /** Renders as {@code Tag(kind,tag-as-UTF-8,stateFamily)}. */
        @Override
        public String toString() {
            return "Tag(" + kind + "," + tag.toStringUtf8() + "," + stateFamily + ")";
        }
    }

    /**
     * Creates a reader bound to a single computation, key and work token.
     *
     * @param metricTrackingWindmillServerStub server stub used to fetch state
     * @param str the computation id sent with every request
     * @param byteString the key whose state is read
     * @param j the work token sent with every request
     */
    public WindmillStateReader(MetricTrackingWindmillServerStub metricTrackingWindmillServerStub, String str, ByteString byteString, long j) {
        // Request identity first, transport last.
        computation = str;
        key = byteString;
        workToken = j;
        metrics = metricTrackingWindmillServerStub;
    }

    /**
     * Returns the future for {@code stateTag}, creating it and queueing the tag for the next
     * batch fetch the first time the tag is requested.
     *
     * @param stateTag the state to look up
     * @param coder coder used to decode the fetched bytes, or null when none is needed
     *     (watermark lookups pass null)
     * @return a future that triggers a batch fetch when {@code get} is called while the
     *     underlying future is still pending
     */
    // The map stores SettableFuture<?>; the element type is fixed by whoever first requested
    // the tag, so the casts below cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    private <T> Future<T> stateFuture(StateTag stateTag, Coder<?> coder) {
        SettableFuture<T> future = (SettableFuture<T>) this.futures.get(stateTag);
        if (future == null) {
            SettableFuture<T> created = SettableFuture.create();
            SettableFuture<T> existing = (SettableFuture<T>) this.futures.putIfAbsent(stateTag, created);
            if (existing != null) {
                // Another caller registered the tag first; share its future.
                future = existing;
            } else {
                future = created;
                // Register the coder BEFORE publishing the tag on the queue: once the tag is
                // visible, a concurrent startBatchAndBlock may dequeue it and decode the
                // response, which needs the coder to be present.
                if (coder != null) {
                    this.coders.putIfAbsent(stateTag, coder);
                }
                this.pendingLookups.add(stateTag);
            }
        }
        return wrappedFuture(future);
    }

    /**
     * Requests the watermark hold stored under the given tag and state family.
     *
     * @param byteString the tag bytes
     * @param str the state family
     * @return a future for the hold; no coder is registered for watermark lookups
     */
    public Future<Instant> watermarkFuture(ByteString byteString, String str) {
        final StateTag watermarkTag = new StateTag(StateTag.Kind.WATERMARK, byteString, str);
        return this.<Instant>stateFuture(watermarkTag, null);
    }

    /**
     * Requests the single value stored under the given tag and state family.
     *
     * @param byteString the tag bytes
     * @param str the state family
     * @param coder coder used to decode the fetched value
     * @return a future for the decoded value
     */
    public <T> Future<T> valueFuture(ByteString byteString, String str, Coder<T> coder) {
        final StateTag valueTag = new StateTag(StateTag.Kind.VALUE, byteString, str);
        return this.<T>stateFuture(valueTag, coder);
    }

    /**
     * Requests the list of values stored under the given tag and state family.
     *
     * @param byteString the tag bytes
     * @param str the state family
     * @param coder coder used to decode each list element
     * @return a future for the decoded elements
     */
    public <T> Future<Iterable<T>> listFuture(ByteString byteString, String str, Coder<T> coder) {
        final StateTag listTag = new StateTag(StateTag.Kind.LIST, byteString, str);
        return this.<Iterable<T>>stateFuture(listTag, coder);
    }

    /**
     * Wraps a pending future so that asking for its result first triggers a batch fetch.
     * A future that is already done is returned unchanged.
     *
     * @param future the underlying future
     * @return {@code future} itself when done, otherwise a forwarding wrapper around it
     */
    private <T> Future<T> wrappedFuture(final Future<T> future) {
        if (future.isDone()) {
            return future;
        }
        return new ForwardingFuture<T>() {
            @Override
            protected Future<T> delegate() {
                return future;
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                fetchIfStillPending();
                return super.get();
            }

            @Override
            public T get(long j, TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                // The batch fetch itself is not bounded by the timeout; only the wait on the
                // delegate that follows it is.
                fetchIfStillPending();
                return super.get(j, timeUnit);
            }

            /** Starts a batch fetch unless the delegate has completed in the meantime. */
            private void fetchIfStillPending() {
                if (!future.isDone()) {
                    WindmillStateReader.this.startBatchAndBlock();
                }
            }
        };
    }

    /**
     * Drains every queued lookup, fetches them in one request and completes the matching
     * futures from the response. Returns immediately when nothing is queued.
     *
     * @throws IllegalStateException if the same tag was queued more than once
     * @throws RuntimeException if the server stub returns null for the request
     */
    public void startBatchAndBlock() {
        // Drain the queue into the set of tags this call is responsible for. poll() returns
        // null once the queue is empty, so no separate emptiness check is needed.
        Set<StateTag> toFetch = new HashSet<>();
        StateTag next;
        while ((next = this.pendingLookups.poll()) != null) {
            if (!toFetch.add(next)) {
                throw new IllegalStateException("Duplicate tags being fetched.");
            }
        }
        // Nothing drained: either nothing was queued or another caller already took it.
        if (toFetch.isEmpty()) {
            return;
        }
        Windmill.GetDataRequest request = createRequest(toFetch);
        Windmill.GetDataResponse response = this.metrics.getStateData(request);
        if (response == null) {
            throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
        }
        consumeResponse(request, response, toFetch);
    }

    /**
     * @return the accumulated serialized size of all keyed responses consumed by this reader
     */
    public long getBytesRead() {
        return bytesRead;
    }

    /**
     * Builds a single request covering every tag in {@code iterable}, addressed to this
     * reader's computation, key and work token.
     *
     * @param iterable the tags to fetch
     * @return the assembled request
     * @throws RuntimeException if a tag has a kind this method does not handle
     */
    private Windmill.GetDataRequest createRequest(Iterable<StateTag> iterable) {
        Windmill.GetDataRequest.Builder request = Windmill.GetDataRequest.newBuilder();
        // One computation entry containing one keyed entry; all fetches hang off the latter.
        Windmill.KeyedGetDataRequest.Builder keyedRequest = request.addRequestsBuilder()
                .setComputationId(this.computation)
                .addRequestsBuilder()
                .setKey(this.key)
                .setWorkToken(this.workToken);
        for (StateTag stateTag : iterable) {
            // Switch on the enum itself rather than on a hand-maintained ordinal table.
            switch (stateTag.kind) {
                case LIST:
                    // NOTE(review): OFFSET_INFINITY is used as the end-timestamp bound here;
                    // presumably it stands for "no upper bound" - confirm against the constant.
                    keyedRequest.addListsToFetchBuilder()
                            .setTag(stateTag.tag)
                            .setStateFamily(stateTag.stateFamily)
                            .setEndTimestamp(OffsetRangeTracker.OFFSET_INFINITY);
                    break;
                case WATERMARK:
                    keyedRequest.addWatermarkHoldsToFetchBuilder()
                            .setTag(stateTag.tag)
                            .setStateFamily(stateTag.stateFamily);
                    break;
                case VALUE:
                    keyedRequest.addValuesToFetchBuilder()
                            .setTag(stateTag.tag)
                            .setStateFamily(stateTag.stateFamily);
                    break;
                default:
                    throw new RuntimeException("Unknown kind of tag requested: " + stateTag.kind);
            }
        }
        return request.build();
    }

    /**
     * Validates the shape of the response (exactly one computation and one key, both matching
     * this reader) and completes the future of every tag it carries.
     *
     * <p>If the keyed response is marked failed, every future in {@code set} is completed with
     * a {@code KeyTokenInvalidException} and nothing else is checked.
     *
     * @param getDataRequest the request that was sent; used only in error messages
     * @param getDataResponse the response to consume
     * @param set the tags that were requested; entries are removed as their responses arrive
     * @throws RuntimeException if the response does not carry exactly one computation and one
     *     key, or they do not match this reader
     * @throws IllegalStateException if a tag arrives that was not requested, or a requested
     *     tag is missing from the response
     */
    private void consumeResponse(Windmill.GetDataRequest getDataRequest, Windmill.GetDataResponse getDataResponse, Set<StateTag> set) {
        final int computationCount = getDataResponse.getDataCount();
        if (computationCount == 0) {
            throw new RuntimeException("No computation in response to request: " + getDataRequest);
        }
        if (computationCount > 1) {
            throw new RuntimeException("Expected exactly one computation in response, but got: " + getDataResponse.getDataList());
        }

        final Windmill.ComputationGetDataResponse computationResponse = getDataResponse.getData(0);
        if (!this.computation.equals(computationResponse.getComputationId())) {
            throw new RuntimeException("Expected data for computation " + this.computation + " but was " + computationResponse.getComputationId());
        }
        final int keyCount = computationResponse.getDataCount();
        if (keyCount == 0) {
            throw new RuntimeException("No key in response to request: " + getDataRequest);
        }
        if (keyCount > 1) {
            throw new RuntimeException("Expected exactly one key in response, but was: " + computationResponse.getDataList());
        }

        final Windmill.KeyedGetDataResponse keyedResponse = computationResponse.getData(0);
        this.bytesRead += keyedResponse.getSerializedSize();

        if (keyedResponse.getFailed()) {
            // Fail every outstanding lookup of this batch with the same exception instance.
            final StreamingDataflowWorker.KeyTokenInvalidException invalidToken = new StreamingDataflowWorker.KeyTokenInvalidException(this.key.toStringUtf8());
            for (StateTag outstanding : set) {
                this.futures.get(outstanding).setException(invalidToken);
            }
            return;
        }

        if (!this.key.equals(keyedResponse.getKey())) {
            throw new RuntimeException("Expected data for key " + this.key + " but was " + keyedResponse.getKey());
        }

        for (Windmill.TagList tagList : keyedResponse.getListsList()) {
            final StateTag listTag = new StateTag(StateTag.Kind.LIST, tagList.getTag(), tagList.getStateFamily());
            removeRequestedTag(set, listTag);
            consumeTagList(tagList, listTag);
        }
        for (Windmill.WatermarkHold watermarkHold : keyedResponse.getWatermarkHoldsList()) {
            final StateTag holdTag = new StateTag(StateTag.Kind.WATERMARK, watermarkHold.getTag(), watermarkHold.getStateFamily());
            removeRequestedTag(set, holdTag);
            consumeWatermark(watermarkHold, holdTag);
        }
        for (Windmill.TagValue tagValue : keyedResponse.getValuesList()) {
            final StateTag valueTag = new StateTag(StateTag.Kind.VALUE, tagValue.getTag(), tagValue.getStateFamily());
            removeRequestedTag(set, valueTag);
            consumeTagValue(tagValue, valueTag);
        }

        // Anything still left in the set was requested but never answered.
        if (!set.isEmpty()) {
            throw new IllegalStateException("Didn't receive responses for all pending fetches. Missing: " + set);
        }
    }

    /** Removes {@code received} from the outstanding tags, failing if it was never requested. */
    private static void removeRequestedTag(Set<StateTag> outstanding, StateTag received) {
        if (!outstanding.remove(received)) {
            throw new IllegalStateException("Received response for unrequested tag " + received + ". Pending tags: " + outstanding);
        }
    }

    /**
     * Decodes a fetched list and completes the future registered for {@code stateTag}.
     *
     * <p>An empty list completes the future with an empty list without consulting the coder.
     * Otherwise every entry that carries non-empty data is decoded (entries without data are
     * skipped) and the future is completed with an unmodifiable list of the results.
     *
     * @param tagList the list returned by the server
     * @param stateTag the tag the list belongs to
     * @throws IllegalStateException if no future or (for a non-empty list) no coder is
     *     registered for the tag, or an element cannot be decoded
     */
    // The maps hold wildcard-typed futures and coders; their element type was fixed by the
    // caller that requested the tag, so these casts cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    private <T> void consumeTagList(Windmill.TagList tagList, StateTag stateTag) {
        SettableFuture<Iterable<T>> future = (SettableFuture<Iterable<T>>) this.futures.get(stateTag);
        if (future == null) {
            throw new IllegalStateException("Missing future for " + stateTag);
        }
        if (future.isDone()) {
            // Logged but not fatal: processing continues as before.
            LOG.error("Future for {} is already done", stateTag);
        }
        if (tagList.getValuesCount() == 0) {
            future.set(Collections.<T>emptyList());
            return;
        }
        Coder<T> elementCoder = (Coder<T>) this.coders.remove(stateTag);
        if (elementCoder == null) {
            throw new IllegalStateException("Missing element coder for " + stateTag);
        }
        ArrayList<T> decoded = new ArrayList<>(tagList.getValuesCount());
        for (Windmill.Value value : tagList.getValuesList()) {
            if (value.hasData() && !value.getData().isEmpty()) {
                try {
                    // The first byte is skipped before decoding.
                    // NOTE(review): presumably a marker byte prepended on write - confirm
                    // against the code that writes list entries.
                    decoded.add(elementCoder.decode(value.getData().substring(1).newInput(), Coder.Context.OUTER));
                } catch (IOException e) {
                    throw new IllegalStateException("Unable to decode tag list using " + elementCoder, e);
                }
            }
        }
        future.set(Collections.unmodifiableList(decoded));
    }

    /**
     * Completes the future registered for {@code stateTag} with the earliest timestamp in the
     * fetched watermark hold, or with null when the hold carries no timestamps.
     *
     * @param watermarkHold the hold returned by the server; timestamps are in microseconds
     *     and are converted to milliseconds
     * @param stateTag the tag the hold belongs to
     * @throws IllegalStateException if no future is registered for the tag
     */
    // The map holds wildcard-typed futures; watermark tags are only ever requested as
    // Future<Instant> (see watermarkFuture), which the compiler cannot verify.
    @SuppressWarnings("unchecked")
    private void consumeWatermark(Windmill.WatermarkHold watermarkHold, StateTag stateTag) {
        SettableFuture<Instant> future = (SettableFuture<Instant>) this.futures.get(stateTag);
        if (future == null) {
            throw new IllegalStateException("Missing future for " + stateTag);
        }
        if (future.isDone()) {
            // Logged but not fatal: processing continues as before.
            LOG.error("Future for {} is already done", stateTag);
        }
        Instant earliest = null;
        for (long micros : watermarkHold.getTimestampsList()) {
            Instant candidate = new Instant(TimeUnit.MICROSECONDS.toMillis(micros));
            if (earliest == null || candidate.isBefore(earliest)) {
                earliest = candidate;
            }
        }
        future.set(earliest);
    }

    /**
     * Decodes a fetched value and completes the future registered for {@code stateTag}.
     *
     * <p>The coder is removed from the registry whether or not there is data to decode. When
     * the response carries no value, no data or empty data, the future is completed with null.
     *
     * @param tagValue the value returned by the server
     * @param stateTag the tag the value belongs to
     * @throws IllegalStateException if no future or coder is registered for the tag, or the
     *     value cannot be decoded
     */
    // The maps hold wildcard-typed futures and coders; their element type was fixed by the
    // caller that requested the tag, so these casts cannot be checked by the compiler.
    @SuppressWarnings("unchecked")
    private <T> void consumeTagValue(Windmill.TagValue tagValue, StateTag stateTag) {
        SettableFuture<T> future = (SettableFuture<T>) this.futures.get(stateTag);
        if (future == null) {
            throw new IllegalStateException("Missing future for " + stateTag);
        }
        if (future.isDone()) {
            // Logged but not fatal: processing continues as before.
            LOG.error("Future for {} is already done", stateTag);
        }
        Coder<T> valueCoder = (Coder<T>) this.coders.remove(stateTag);
        if (valueCoder == null) {
            throw new IllegalStateException("Missing coder for " + stateTag);
        }
        boolean hasPayload = tagValue.hasValue()
                && tagValue.getValue().hasData()
                && !tagValue.getValue().getData().isEmpty();
        if (!hasPayload) {
            future.set(null);
            return;
        }
        try {
            future.set(valueCoder.decode(tagValue.getValue().getData().newInput(), Coder.Context.OUTER));
        } catch (IOException e) {
            throw new IllegalStateException("Unable to decode value using " + valueCoder, e);
        }
    }
}
