package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Cache;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Weigher;
import com.google.cloud.dataflow.sdk.runners.worker.MetricTrackingWindmillServerStub;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StateFetcher.class */
public class StateFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(StateFetcher.class);
    private Cache<SideInputId, SideInputCacheEntry> sideInputCache;
    private MetricTrackingWindmillServerStub server;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StateFetcher$SideInputCacheEntry.class */
    public static class SideInputCacheEntry {
        public final Object value;
        public final int encodedSize;

        public SideInputCacheEntry(Object obj, int i) {
            this.value = obj;
            this.encodedSize = i;
        }

        public static SideInputCacheEntry notReady() {
            return new SideInputCacheEntry(null, 0);
        }

        public boolean isReady() {
            return this.value != null;
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StateFetcher$SideInputId.class */
    static class SideInputId {
        private final TupleTag<?> tag;
        private final BoundedWindow window;

        public SideInputId(TupleTag<?> tupleTag, BoundedWindow boundedWindow) {
            this.tag = tupleTag;
            this.window = boundedWindow;
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof SideInputId)) {
                return false;
            }
            SideInputId sideInputId = (SideInputId) obj;
            return this.tag.equals(sideInputId.tag) && this.window.equals(sideInputId.window);
        }

        public int hashCode() {
            return Objects.hash(this.tag, this.window);
        }
    }

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/util/StateFetcher$SideInputState.class */
    public enum SideInputState {
        CACHED_IN_WORKITEM,
        KNOWN_READY,
        UNKNOWN
    }

    public StateFetcher(MetricTrackingWindmillServerStub metricTrackingWindmillServerStub) {
        this(metricTrackingWindmillServerStub, CacheBuilder.newBuilder().maximumWeight(100000000L).expireAfterWrite(1L, TimeUnit.MINUTES).weigher(new Weigher<SideInputId, SideInputCacheEntry>() { // from class: com.google.cloud.dataflow.sdk.util.StateFetcher.1
            @Override // com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Weigher
            public int weigh(SideInputId sideInputId, SideInputCacheEntry sideInputCacheEntry) {
                return sideInputCacheEntry.encodedSize;
            }
        }).build());
    }

    public StateFetcher(MetricTrackingWindmillServerStub metricTrackingWindmillServerStub, Cache<SideInputId, SideInputCacheEntry> cache) {
        this.server = metricTrackingWindmillServerStub;
        this.sideInputCache = cache;
    }

    public <T, SideWindowT extends BoundedWindow> T fetchSideInput(final PCollectionView<T> pCollectionView, final SideWindowT sidewindowt, final String str, SideInputState sideInputState) {
        SideInputCacheEntry ifPresent;
        SideInputId sideInputId = new SideInputId(pCollectionView.getTagInternal(), sidewindowt);
        Callable<SideInputCacheEntry> callable = new Callable<SideInputCacheEntry>() { // from class: com.google.cloud.dataflow.sdk.util.StateFetcher.2
            /* JADX WARN: Can't rename method to resolve collision */
            /* JADX WARN: Multi-variable type inference failed */
            /* JADX WARN: Type inference failed for: r0v31, types: [java.lang.Iterable] */
            @Override // java.util.concurrent.Callable
            public SideInputCacheEntry call() throws Exception {
                WindowingStrategy<?, ?> windowingStrategyInternal = pCollectionView.getWindowingStrategyInternal();
                Coder<?> windowCoder = windowingStrategyInternal.getWindowFn().windowCoder();
                ByteString.Output newOutput = ByteString.newOutput();
                windowCoder.encode(sidewindowt, newOutput, Coder.Context.OUTER);
                Windmill.GlobalDataRequest build = Windmill.GlobalDataRequest.newBuilder().setDataId(Windmill.GlobalDataId.newBuilder().setTag(pCollectionView.getTagInternal().getId()).setVersion(newOutput.toByteString()).build()).setStateFamily(str).setExistenceWatermarkDeadline(TimeUnit.MILLISECONDS.toMicros(windowingStrategyInternal.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(sidewindowt).getMillis())).build();
                Windmill.GlobalData globalData = StateFetcher.this.server.getSideInputData(Windmill.GetDataRequest.newBuilder().addGlobalDataFetchRequests(build).addGlobalDataToFetch(build.getDataId()).build()).getGlobalData(0);
                if (globalData.getIsReady()) {
                    return new SideInputCacheEntry(pCollectionView.fromIterableInternal(globalData.getData().size() > 0 ? (Iterable) pCollectionView.getCoderInternal().decode(globalData.getData().newInput(), Coder.Context.OUTER) : Collections.emptyList()), globalData.getData().size());
                }
                return SideInputCacheEntry.notReady();
            }
        };
        try {
            if (sideInputState == SideInputState.KNOWN_READY && (ifPresent = this.sideInputCache.getIfPresent(sideInputId)) != null) {
                if (ifPresent.isReady()) {
                    return (T) ifPresent.value;
                }
                synchronized (ifPresent) {
                    SideInputCacheEntry ifPresent2 = this.sideInputCache.getIfPresent(sideInputId);
                    if (ifPresent2 != null && !ifPresent2.isReady()) {
                        this.sideInputCache.invalidate(sideInputId);
                    }
                }
                return (T) this.sideInputCache.get(sideInputId, callable).value;
            }
            return (T) this.sideInputCache.get(sideInputId, callable).value;
        } catch (Exception e) {
            LOG.error("Fetch failed: ", e);
            throw new RuntimeException("Exception while fetching side input: ", e);
        }
    }
}
