/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Objects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ForwardingFuture;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.SettableFuture;
import com.google.cloud.dataflow.sdk.runners.worker.MetricTrackingWindmillServerStub;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reads state for one key (identified by computation, key and work token) through a
 * {@link MetricTrackingWindmillServerStub}.
 *
 * <p>Each {@code *Future} method registers a lookup and returns a {@link Future} without
 * contacting the server. The first blocking {@code get()} on an unfinished future calls
 * {@link #startBatchAndBlock()}, which sends every lookup queued so far in a single
 * {@code GetDataRequest} and completes the corresponding futures from the response.
 */
class WindmillStateReader {
    private static final Logger LOG = LoggerFactory.getLogger(WindmillStateReader.class);
    private final String computation;
    private final ByteString key;
    private final long workToken;
    private final MetricTrackingWindmillServerStub metrics;
    /** Running total of the serialized sizes of all keyed responses consumed so far. */
    private long bytesRead = 0L;
    /** Tags that have been requested but not yet sent to the server. */
    @VisibleForTesting
    ConcurrentLinkedQueue<StateTag> pendingLookups = new ConcurrentLinkedQueue<StateTag>();
    /** Coder to decode the response for a tag; entries are removed once the tag's response is consumed. */
    private final ConcurrentHashMap<StateTag, Coder<?>> coders = new ConcurrentHashMap<StateTag, Coder<?>>();
    /** One future per tag ever requested; later requests for the same tag share it. */
    private final ConcurrentHashMap<StateTag, SettableFuture<?>> futures = new ConcurrentHashMap<StateTag, SettableFuture<?>>();

    /**
     * @param metrics server stub used to issue the state fetches
     * @param computation computation id sent with, and expected back in, every request
     * @param key key whose state is read
     * @param workToken work token sent with every request
     */
    public WindmillStateReader(MetricTrackingWindmillServerStub metrics, String computation, ByteString key, long workToken) {
        this.metrics = metrics;
        this.computation = computation;
        this.key = key;
        this.workToken = workToken;
    }

    /**
     * Returns the (wrapped) future for {@code tag}, creating it and queueing the lookup the
     * first time the tag is seen.
     *
     * @param tag identity of the state cell being read
     * @param coder coder used to decode the response, or {@code null} when none is needed
     */
    private <T> Future<T> stateFuture(StateTag tag, Coder<?> coder) {
        SettableFuture<?> registered = this.futures.get(tag);
        if (registered == null) {
            SettableFuture<?> created = SettableFuture.<Object>create();
            SettableFuture<?> raced = this.futures.putIfAbsent(tag, created);
            if (raced != null) {
                // Somebody else registered this tag first; share their future.
                registered = raced;
            } else {
                registered = created;
                // The coder must be in place BEFORE the tag becomes visible in pendingLookups:
                // once queued, any thread may fetch and consume the tag, and consumption
                // fails with "Missing coder" if the coder has not been stored yet.
                if (coder != null) {
                    this.coders.putIfAbsent(tag, coder);
                }
                this.pendingLookups.add(tag);
            }
        }
        // The map stores futures with a wildcard type; callers are trusted to ask for the
        // same T that the tag's coder produces.
        @SuppressWarnings("unchecked")
        Future<T> typed = (Future<T>) registered;
        return this.wrappedFuture(typed);
    }

    /** Returns a future for the watermark hold stored under the given tag and state family. */
    public Future<Instant> watermarkFuture(ByteString encodedTag, String stateFamily) {
        return this.stateFuture(new StateTag(StateTag.Kind.WATERMARK, encodedTag, stateFamily), null);
    }

    /** Returns a future for the single value stored under the given tag, decoded with {@code coder}. */
    public <T> Future<T> valueFuture(ByteString encodedTag, String stateFamily, Coder<T> coder) {
        return this.stateFuture(new StateTag(StateTag.Kind.VALUE, encodedTag, stateFamily), coder);
    }

    /** Returns a future for the list stored under the given tag; elements are decoded with {@code elemCoder}. */
    public <T> Future<Iterable<T>> listFuture(ByteString encodedTag, String stateFamily, Coder<T> elemCoder) {
        return this.stateFuture(new StateTag(StateTag.Kind.LIST, encodedTag, stateFamily), elemCoder);
    }

    /**
     * Wraps {@code future} so that blocking on it first triggers a batched fetch of all
     * pending lookups. Futures that are already done are returned unwrapped.
     *
     * <p>Note that the timed {@code get} also calls {@link #startBatchAndBlock()}, which takes
     * no timeout; only the final wait on the delegate is bounded by the timeout.
     */
    private <T> Future<T> wrappedFuture(final Future<T> future) {
        if (future.isDone()) {
            return future;
        }
        return new ForwardingFuture<T>() {

            @Override
            protected Future<T> delegate() {
                return future;
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                fetchIfUnresolved();
                return super.get();
            }

            @Override
            public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                fetchIfUnresolved();
                return super.get(timeout, unit);
            }

            private void fetchIfUnresolved() {
                if (!future.isDone()) {
                    WindmillStateReader.this.startBatchAndBlock();
                }
            }
        };
    }

    /**
     * Drains every queued lookup, fetches them in one request and completes their futures.
     * Returns immediately when nothing is queued.
     *
     * <p>If building the request, fetching, or consuming the response throws, every future
     * of this batch that is still unresolved is failed with that exception before it is
     * rethrown. The tags have already been taken off the queue at that point, so otherwise
     * nothing would ever complete them and a later {@code get()} would block forever.
     *
     * @throws IllegalStateException if the same tag is queued twice, or the response does
     *     not line up with the request
     * @throws RuntimeException if the server returns {@code null} or a malformed response
     */
    public void startBatchAndBlock() {
        HashSet<StateTag> toFetch = new HashSet<StateTag>();
        for (StateTag tag = this.pendingLookups.poll(); tag != null; tag = this.pendingLookups.poll()) {
            if (!toFetch.add(tag)) {
                throw new IllegalStateException("Duplicate tags being fetched.");
            }
        }
        if (toFetch.isEmpty()) {
            return;
        }
        // consumeResponse removes tags from toFetch as it handles them, so keep a separate
        // record of the whole batch for failure handling.
        ArrayList<StateTag> batch = new ArrayList<StateTag>(toFetch);
        try {
            Windmill.GetDataRequest request = this.createRequest(toFetch);
            Windmill.GetDataResponse response = this.metrics.getStateData(request);
            if (response == null) {
                throw new RuntimeException("Windmill unexpectedly returned null for request " + request);
            }
            this.consumeResponse(request, response, toFetch);
        } catch (RuntimeException e) {
            this.failUnresolved(batch, e);
            throw e;
        }
    }

    /** Fails every still-unresolved future among {@code tags} with {@code cause}. */
    private void failUnresolved(Iterable<StateTag> tags, Throwable cause) {
        for (StateTag tag : tags) {
            SettableFuture<?> future = this.futures.get(tag);
            if (future != null) {
                // No effect on futures that have already been completed.
                future.setException(cause);
            }
        }
    }

    /** Returns the accumulated serialized size of all keyed responses consumed by this reader. */
    public long getBytesRead() {
        return this.bytesRead;
    }

    /**
     * Builds a single-computation, single-key request that asks for every tag in
     * {@code toFetch}, routed by tag kind.
     */
    private Windmill.GetDataRequest createRequest(Iterable<StateTag> toFetch) {
        Windmill.GetDataRequest.Builder request = Windmill.GetDataRequest.newBuilder();
        Windmill.KeyedGetDataRequest.Builder keyedDataBuilder = request.addRequestsBuilder()
                .setComputationId(this.computation)
                .addRequestsBuilder()
                .setKey(this.key)
                .setWorkToken(this.workToken);
        for (StateTag tag : toFetch) {
            switch (tag.kind) {
                case LIST:
                    // Long.MAX_VALUE as end timestamp: no upper bound on the list entries fetched.
                    keyedDataBuilder.addListsToFetchBuilder().setTag(tag.tag).setStateFamily(tag.stateFamily).setEndTimestamp(Long.MAX_VALUE);
                    break;
                case WATERMARK:
                    keyedDataBuilder.addWatermarkHoldsToFetchBuilder().setTag(tag.tag).setStateFamily(tag.stateFamily);
                    break;
                case VALUE:
                    keyedDataBuilder.addValuesToFetchBuilder().setTag(tag.tag).setStateFamily(tag.stateFamily);
                    break;
                default:
                    throw new RuntimeException("Unknown kind of tag requested: " + tag.kind);
            }
        }
        return request.build();
    }

    /**
     * Validates that the response holds exactly one computation and one key matching this
     * reader, then completes the future of every returned tag.
     *
     * <p>When the keyed response is flagged as failed, all futures in {@code toFetch} are
     * failed with a {@code KeyTokenInvalidException} and the method returns normally.
     *
     * @param request the request that produced the response; used only in error messages
     * @param toFetch tags that were requested; each tag is removed as its response is seen
     * @throws IllegalStateException if a tag comes back that was not requested, or a
     *     requested tag is missing from the response
     */
    private void consumeResponse(Windmill.GetDataRequest request, Windmill.GetDataResponse getDataResponse, Set<StateTag> toFetch) {
        int computationCount = getDataResponse.getDataCount();
        if (computationCount == 0) {
            throw new RuntimeException("No computation in response to request: " + request);
        }
        if (computationCount > 1) {
            throw new RuntimeException("Expected exactly one computation in response, but got: " + getDataResponse.getDataList());
        }
        Windmill.ComputationGetDataResponse computationResponse = getDataResponse.getData(0);
        if (!this.computation.equals(computationResponse.getComputationId())) {
            throw new RuntimeException("Expected data for computation " + this.computation + " but was " + computationResponse.getComputationId());
        }

        int keyCount = computationResponse.getDataCount();
        if (keyCount == 0) {
            throw new RuntimeException("No key in response to request: " + request);
        }
        if (keyCount > 1) {
            throw new RuntimeException("Expected exactly one key in response, but was: " + computationResponse.getDataList());
        }
        Windmill.KeyedGetDataResponse response = computationResponse.getData(0);
        this.bytesRead += (long) response.getSerializedSize();

        if (response.getFailed()) {
            StreamingDataflowWorker.KeyTokenInvalidException keyTokenInvalid = new StreamingDataflowWorker.KeyTokenInvalidException(this.key.toStringUtf8());
            for (StateTag tag : toFetch) {
                this.futures.get(tag).setException(keyTokenInvalid);
            }
            return;
        }
        if (!this.key.equals(response.getKey())) {
            throw new RuntimeException("Expected data for key " + this.key + " but was " + response.getKey());
        }

        for (Windmill.TagList list : response.getListsList()) {
            StateTag stateTag = new StateTag(StateTag.Kind.LIST, list.getTag(), list.getStateFamily());
            WindmillStateReader.markReceived(toFetch, stateTag);
            this.consumeTagList(list, stateTag);
        }
        for (Windmill.WatermarkHold hold : response.getWatermarkHoldsList()) {
            StateTag stateTag = new StateTag(StateTag.Kind.WATERMARK, hold.getTag(), hold.getStateFamily());
            WindmillStateReader.markReceived(toFetch, stateTag);
            this.consumeWatermark(hold, stateTag);
        }
        for (Windmill.TagValue value : response.getValuesList()) {
            StateTag stateTag = new StateTag(StateTag.Kind.VALUE, value.getTag(), value.getStateFamily());
            WindmillStateReader.markReceived(toFetch, stateTag);
            this.consumeTagValue(value, stateTag);
        }
        if (!toFetch.isEmpty()) {
            throw new IllegalStateException("Didn't receive responses for all pending fetches. Missing: " + toFetch);
        }
    }

    /** Removes {@code stateTag} from {@code toFetch}, failing if it was not requested. */
    private static void markReceived(Set<StateTag> toFetch, StateTag stateTag) {
        if (!toFetch.remove(stateTag)) {
            throw new IllegalStateException("Received response for unrequested tag " + stateTag + ". Pending tags: " + toFetch);
        }
    }

    /**
     * Looks up the future registered for {@code stateTag} so a result can be stored in it.
     * Logs an error, but still returns the future, when it is already done.
     *
     * @throws IllegalStateException if no future was ever registered for the tag
     */
    private SettableFuture<Object> writableFuture(StateTag stateTag) {
        SettableFuture<?> future = this.futures.get(stateTag);
        if (future == null) {
            throw new IllegalStateException("Missing future for " + stateTag);
        }
        if (future.isDone()) {
            LOG.error("Future for {} is already done", (Object) stateTag);
        }
        // Stored with a wildcard type; the decoded value's type is governed by the tag's coder.
        @SuppressWarnings("unchecked")
        SettableFuture<Object> writable = (SettableFuture<Object>) future;
        return writable;
    }

    /**
     * Decodes the values of a list response and completes the tag's future with an
     * unmodifiable list. Values without data, or with empty data, are skipped.
     *
     * @throws IllegalStateException if the future is missing, the element coder is missing
     *     for a non-empty list, or an element cannot be decoded
     */
    private void consumeTagList(Windmill.TagList list, StateTag stateTag) {
        SettableFuture<Object> future = this.writableFuture(stateTag);
        // Always drop the coder once the tag's response arrives, even for an empty list,
        // so that it does not linger in the map.
        Coder<?> elemCoder = this.coders.remove(stateTag);
        if (list.getValuesCount() == 0) {
            future.set(Collections.emptyList());
            return;
        }
        if (elemCoder == null) {
            throw new IllegalStateException("Missing element coder for " + stateTag);
        }
        ArrayList<Object> decoded = new ArrayList<Object>(list.getValuesCount());
        for (Windmill.Value value : list.getValuesList()) {
            if (!value.hasData() || value.getData().isEmpty()) {
                continue;
            }
            // The first byte of every list element is skipped before decoding.
            // NOTE(review): presumably a marker byte added on the write path; confirm against the writer.
            InputStream inputStream = value.getData().substring(1).newInput();
            try {
                decoded.add(elemCoder.decode(inputStream, Coder.Context.OUTER));
            } catch (IOException e) {
                throw new IllegalStateException("Unable to decode tag list using " + elemCoder, e);
            }
        }
        future.set(Collections.unmodifiableList(decoded));
    }

    /**
     * Completes the tag's future with the earliest timestamp of the hold (converted from
     * microseconds to a millisecond {@link Instant}), or {@code null} when the hold carries
     * no timestamps.
     *
     * @throws IllegalStateException if no future is registered for the tag
     */
    private void consumeWatermark(Windmill.WatermarkHold watermarkHold, StateTag stateTag) {
        SettableFuture<Object> future = this.writableFuture(stateTag);
        Instant earliest = null;
        for (long timestampMicros : watermarkHold.getTimestampsList()) {
            Instant candidate = new Instant(TimeUnit.MICROSECONDS.toMillis(timestampMicros));
            if (earliest == null || candidate.isBefore(earliest)) {
                earliest = candidate;
            }
        }
        future.set(earliest);
    }

    /**
     * Decodes a single-value response and completes the tag's future with it, or with
     * {@code null} when the response carries no (or empty) data.
     *
     * @throws IllegalStateException if the future or coder is missing, or decoding fails
     */
    private void consumeTagValue(Windmill.TagValue tagValue, StateTag stateTag) {
        SettableFuture<Object> future = this.writableFuture(stateTag);
        Coder<?> coder = this.coders.remove(stateTag);
        if (coder == null) {
            throw new IllegalStateException("Missing coder for " + stateTag);
        }
        boolean hasPayload = tagValue.hasValue()
                && tagValue.getValue().hasData()
                && !tagValue.getValue().getData().isEmpty();
        if (!hasPayload) {
            future.set(null);
            return;
        }
        InputStream inputStream = tagValue.getValue().getData().newInput();
        try {
            future.set(coder.decode(inputStream, Coder.Context.OUTER));
        } catch (IOException e) {
            throw new IllegalStateException("Unable to decode value using " + coder, e);
        }
    }

    /**
     * Identity of one state cell: its kind, encoded tag and state family. Used as the key
     * for the pending-lookup queue and the coder/future maps, so equality and hashing cover
     * all three fields.
     */
    private static class StateTag {
        private final Kind kind;
        private final ByteString tag;
        private final String stateFamily;

        /**
         * @throws NullPointerException if {@code stateFamily} is {@code null}
         */
        private StateTag(Kind kind, ByteString tag, String stateFamily) {
            this.kind = kind;
            this.tag = tag;
            this.stateFamily = Preconditions.checkNotNull(stateFamily);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof StateTag)) {
                return false;
            }
            StateTag that = (StateTag) obj;
            return Objects.equal(this.kind, that.kind)
                    && Objects.equal(this.tag, that.tag)
                    && Objects.equal(this.stateFamily, that.stateFamily);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.kind, this.tag, this.stateFamily);
        }

        @Override
        public String toString() {
            return "Tag(" + this.kind + "," + this.tag.toStringUtf8() + "," + this.stateFamily + ")";
        }

        /** The kind of state a tag refers to; selects the request field and the response handler. */
        private static enum Kind {
            VALUE,
            LIST,
            WATERMARK;

        }
    }
}

