/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Cache;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.CacheBuilder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.cache.Weigher;
import com.google.cloud.dataflow.sdk.runners.worker.MetricTrackingWindmillServerStub;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.protobuf.ByteString;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StateFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(StateFetcher.class);
    private Cache<SideInputId, SideInputCacheEntry> sideInputCache;
    private MetricTrackingWindmillServerStub server;
    private long bytesRead = 0L;

    public StateFetcher(MetricTrackingWindmillServerStub server) {
        this(server, CacheBuilder.newBuilder().maximumWeight(100000000L).expireAfterWrite(1L, TimeUnit.MINUTES).weigher(new Weigher<SideInputId, SideInputCacheEntry>(){

            @Override
            public int weigh(SideInputId id, SideInputCacheEntry entry) {
                return entry.encodedSize;
            }
        }).build());
    }

    public StateFetcher(MetricTrackingWindmillServerStub server, Cache<SideInputId, SideInputCacheEntry> sideInputCache) {
        this.server = server;
        this.sideInputCache = sideInputCache;
    }

    public StateFetcher byteTrackingView() {
        return new StateFetcher(this.server, this.sideInputCache);
    }

    public long getBytesRead() {
        return this.bytesRead;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T, SideWindowT extends BoundedWindow> T fetchSideInput(final PCollectionView<T> view, final SideWindowT sideWindow, final String stateFamily, SideInputState state, final Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
        SideInputId id = new SideInputId(view.getTagInternal(), sideWindow);
        Callable<SideInputCacheEntry> fetchCallable = new Callable<SideInputCacheEntry>(){

            @Override
            public SideInputCacheEntry call() throws Exception {
                Windmill.GetDataResponse response;
                WindowingStrategy<?, ?> sideWindowStrategy = view.getWindowingStrategyInternal();
                Coder<?> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();
                ByteString.Output windowStream = ByteString.newOutput();
                windowCoder.encode(sideWindow, (OutputStream)windowStream, Coder.Context.OUTER);
                Windmill.GlobalDataRequest request = Windmill.GlobalDataRequest.newBuilder().setDataId(Windmill.GlobalDataId.newBuilder().setTag(view.getTagInternal().getId()).setVersion(windowStream.toByteString()).build()).setStateFamily(stateFamily).setExistenceWatermarkDeadline(TimeUnit.MILLISECONDS.toMicros(sideWindowStrategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring(sideWindow).getMillis())).build();
                try (StateSampler.ScopedState scope = (StateSampler.ScopedState)scopedReadStateSupplier.get();){
                    response = StateFetcher.this.server.getSideInputData(Windmill.GetDataRequest.newBuilder().addGlobalDataFetchRequests(request).addGlobalDataToFetch(request.getDataId()).build());
                }
                Windmill.GlobalData data = response.getGlobalData(0);
                StateFetcher.this.bytesRead = StateFetcher.this.bytesRead + (long)data.getSerializedSize();
                if (data.getIsReady()) {
                    Iterable<WindowedValue<?>> rawData = data.getData().size() > 0 ? view.getCoderInternal().decode(data.getData().newInput(), Coder.Context.OUTER) : Collections.emptyList();
                    return new SideInputCacheEntry(view.fromIterableInternal(rawData), data.getData().size());
                }
                return SideInputCacheEntry.notReady();
            }
        };
        try {
            if (state == SideInputState.KNOWN_READY) {
                SideInputCacheEntry entry = this.sideInputCache.getIfPresent(id);
                if (entry == null) {
                    return (T)this.sideInputCache.get((SideInputId)id, (Callable<SideInputCacheEntry>)fetchCallable).value;
                }
                if (!entry.isReady()) {
                    SideInputCacheEntry sideInputCacheEntry = entry;
                    synchronized (sideInputCacheEntry) {
                        SideInputCacheEntry newEntry = this.sideInputCache.getIfPresent(id);
                        if (newEntry != null && !newEntry.isReady()) {
                            this.sideInputCache.invalidate(id);
                        }
                    }
                    return (T)this.sideInputCache.get((SideInputId)id, (Callable<SideInputCacheEntry>)fetchCallable).value;
                }
                return (T)entry.value;
            }
            return (T)this.sideInputCache.get((SideInputId)id, (Callable<SideInputCacheEntry>)fetchCallable).value;
        }
        catch (Exception e) {
            LOG.error("Fetch failed: ", (Throwable)e);
            throw new RuntimeException("Exception while fetching side input: ", e);
        }
    }

    static class SideInputCacheEntry {
        public final Object value;
        public final int encodedSize;

        public SideInputCacheEntry(Object value, int encodedSize) {
            this.value = value;
            this.encodedSize = encodedSize;
        }

        public static SideInputCacheEntry notReady() {
            return new SideInputCacheEntry(null, 0);
        }

        public boolean isReady() {
            return this.value != null;
        }
    }

    static class SideInputId {
        private final TupleTag<?> tag;
        private final BoundedWindow window;

        public SideInputId(TupleTag<?> tag, BoundedWindow window) {
            this.tag = tag;
            this.window = window;
        }

        public boolean equals(Object other) {
            if (other instanceof SideInputId) {
                SideInputId otherId = (SideInputId)other;
                return this.tag.equals(otherId.tag) && this.window.equals(otherId.window);
            }
            return false;
        }

        public int hashCode() {
            return Objects.hash(this.tag, this.window);
        }
    }

    public static enum SideInputState {
        CACHED_IN_WORKITEM,
        KNOWN_READY,
        UNKNOWN;

    }
}

