/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Supplier;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillStateReader;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.CombiningValueStateInternal;
import com.google.cloud.dataflow.sdk.util.state.MergingStateInternals;
import com.google.cloud.dataflow.sdk.util.state.State;
import com.google.cloud.dataflow.sdk.util.state.StateContents;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateTable;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.ValueState;
import com.google.cloud.dataflow.sdk.util.state.WatermarkStateInternal;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class WindmillStateInternals
extends MergingStateInternals {
    private final StateTable inMemoryState = new StateTable(){

        @Override
        protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace) {
            return new StateTag.StateBinder(){

                @Override
                public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
                    return new WindmillBag(WindmillStateInternals.this.encodeKey(namespace, address), WindmillStateInternals.this.stateFamily, elemCoder, WindmillStateInternals.this.reader, WindmillStateInternals.this.scopedReadStateSupplier);
                }

                @Override
                public <T> WatermarkStateInternal bindWatermark(StateTag<WatermarkStateInternal> address) {
                    return new WindmillWatermarkState(WindmillStateInternals.this.encodeKey(namespace, address), WindmillStateInternals.this.stateFamily, WindmillStateInternals.this.reader, WindmillStateInternals.this.scopedReadStateSupplier);
                }

                @Override
                public <InputT, AccumT, OutputT> CombiningValueStateInternal<InputT, AccumT, OutputT> bindCombiningValue(StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> address, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
                    return new WindmillCombiningValue(WindmillStateInternals.this.encodeKey(namespace, address), WindmillStateInternals.this.stateFamily, accumCoder, combineFn, WindmillStateInternals.this.reader, WindmillStateInternals.this.scopedReadStateSupplier);
                }

                @Override
                public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
                    return new WindmillValue(WindmillStateInternals.this.encodeKey(namespace, address), WindmillStateInternals.this.stateFamily, coder, WindmillStateInternals.this.reader, WindmillStateInternals.this.scopedReadStateSupplier);
                }
            };
        }
    };
    private final String prefix;
    private final String stateFamily;
    private final WindmillStateReader reader;
    private final boolean useStateFamilies;
    private final Supplier<StateSampler.ScopedState> scopedReadStateSupplier;

    public WindmillStateInternals(String prefix, boolean useStateFamilies, WindmillStateReader reader, Supplier<StateSampler.ScopedState> scopedReadStateSupplier) {
        this.prefix = prefix;
        this.stateFamily = useStateFamilies ? prefix : "";
        this.reader = reader;
        this.useStateFamilies = useStateFamilies;
        this.scopedReadStateSupplier = scopedReadStateSupplier;
    }

    public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) {
        for (State location : this.inMemoryState.values()) {
            if (!(location instanceof WindmillState)) {
                throw new IllegalStateException(String.format("%s wasn't created by %s -- unable to persist it", location.getClass().getSimpleName(), this.getClass().getSimpleName()));
            }
            try {
                ((WindmillState)((Object)location)).persist(commitBuilder);
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to persist state", e);
            }
        }
        this.reader.startBatchAndBlock();
        this.inMemoryState.clear();
    }

    private ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
        if (this.useStateFamilies) {
            return ByteString.copyFromUtf8((String)String.format("%s+%s", namespace.stringKey(), address.getId()));
        }
        return ByteString.copyFromUtf8((String)String.format("%s%s+%s", this.prefix, namespace.stringKey(), address.getId()));
    }

    @Override
    public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
        return this.inMemoryState.get(namespace, address);
    }

    private static class WindmillCombiningValue<InputT, AccumT, OutputT>
    implements CombiningValueStateInternal<InputT, AccumT, OutputT>,
    WindmillState {
        private final WindmillBag<AccumT> bag;
        private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
        private AccumT localAdditionsAccum;
        private boolean hasLocalAdditions = false;

        private WindmillCombiningValue(ByteString stateKey, String stateFamily, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn, WindmillStateReader reader, Supplier<StateSampler.ScopedState> readStateSupplier) {
            this.bag = new WindmillBag(stateKey, stateFamily, accumCoder, reader, readStateSupplier);
            this.combineFn = combineFn;
            this.localAdditionsAccum = combineFn.createAccumulator();
        }

        @Override
        public StateContents<OutputT> get() {
            final StateContents<AccumT> accum = this.getAccum();
            return new StateContents<OutputT>(){

                @Override
                public OutputT read() {
                    return WindmillCombiningValue.this.combineFn.extractOutput(accum.read());
                }
            };
        }

        @Override
        public void add(InputT input) {
            this.hasLocalAdditions = true;
            this.localAdditionsAccum = this.combineFn.addInput(this.localAdditionsAccum, input);
        }

        @Override
        public void clear() {
            this.bag.clear();
            this.localAdditionsAccum = this.combineFn.createAccumulator();
            this.hasLocalAdditions = false;
        }

        @Override
        public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) throws IOException {
            if (this.hasLocalAdditions) {
                this.bag.add(this.localAdditionsAccum);
                this.localAdditionsAccum = this.combineFn.createAccumulator();
                this.hasLocalAdditions = false;
            }
            this.bag.persist(commitBuilder);
        }

        @Override
        public StateContents<AccumT> getAccum() {
            final StateContents<Iterable<AccumT>> future = this.bag.get();
            return new StateContents<AccumT>(){

                @Override
                public AccumT read() {
                    Iterable<Object> accums = Iterables.concat((Iterable)future.read(), Collections.singleton(WindmillCombiningValue.this.localAdditionsAccum));
                    Object merged = WindmillCombiningValue.this.combineFn.mergeAccumulators(accums);
                    WindmillCombiningValue.this.bag.clear();
                    WindmillCombiningValue.this.localAdditionsAccum = merged;
                    WindmillCombiningValue.this.hasLocalAdditions = true;
                    return merged;
                }
            };
        }

        @Override
        public StateContents<Boolean> isEmpty() {
            final StateContents<Boolean> isEmptyFuture = this.bag.isEmpty();
            return new StateContents<Boolean>(){

                @Override
                public Boolean read() {
                    return !WindmillCombiningValue.this.hasLocalAdditions && (Boolean)isEmptyFuture.read() != false;
                }
            };
        }

        @Override
        public void addAccum(AccumT accum) {
            this.hasLocalAdditions = true;
            this.localAdditionsAccum = this.combineFn.mergeAccumulators(Arrays.asList(this.localAdditionsAccum, accum));
        }
    }

    private static class WindmillWatermarkState
    implements WatermarkStateInternal,
    WindmillState {
        private final ByteString stateKey;
        private final String stateFamily;
        private final WindmillStateReader reader;
        private final Supplier<StateSampler.ScopedState> readStateSupplier;
        private boolean cleared = false;
        private Instant localAdditions = null;

        private WindmillWatermarkState(ByteString stateKey, String stateFamily, WindmillStateReader reader, Supplier<StateSampler.ScopedState> readStateSupplier) {
            this.stateKey = stateKey;
            this.stateFamily = stateFamily;
            this.reader = reader;
            this.readStateSupplier = readStateSupplier;
        }

        @Override
        public void clear() {
            this.cleared = true;
            this.localAdditions = null;
        }

        @Override
        public StateContents<Instant> get() {
            final ListenableFuture<Object> persistedData = this.cleared ? Futures.immediateFuture(null) : this.reader.watermarkFuture(this.stateKey, this.stateFamily);
            return new StateContents<Instant>(){

                @Override
                public Instant read() {
                    Instant value = WindmillWatermarkState.this.localAdditions;
                    if (!WindmillWatermarkState.this.cleared) {
                        try (StateSampler.ScopedState scope = (StateSampler.ScopedState)WindmillWatermarkState.this.readStateSupplier.get();){
                            Instant persisted = (Instant)persistedData.get();
                            if (value == null || persisted != null && persisted.isBefore((ReadableInstant)value)) {
                                value = persisted;
                            }
                        }
                        catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException("Unable to read state", e);
                        }
                    }
                    return value;
                }
            };
        }

        @Override
        public StateContents<Boolean> isEmpty() {
            final ListenableFuture<Object> persistedData = this.cleared ? Futures.immediateFuture(null) : this.reader.watermarkFuture(this.stateKey, this.stateFamily);
            return new StateContents<Boolean>(){

                /*
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public Boolean read() {
                    try (StateSampler.ScopedState scope = (StateSampler.ScopedState)WindmillWatermarkState.this.readStateSupplier.get();){
                        Boolean bl = WindmillWatermarkState.this.localAdditions == null && (WindmillWatermarkState.this.cleared || persistedData.get() == null);
                        return bl;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read state", e);
                    }
                }
            };
        }

        @Override
        public void add(Instant watermarkHold) {
            if (this.localAdditions == null || watermarkHold.isBefore((ReadableInstant)this.localAdditions)) {
                this.localAdditions = watermarkHold;
            }
        }

        @Override
        public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) {
            if (this.cleared) {
                this.reader.watermarkFuture(this.stateKey, this.stateFamily);
                commitBuilder.addWatermarkHoldsBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).setReset(true);
            }
            if (this.localAdditions != null) {
                commitBuilder.addWatermarkHoldsBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).addTimestamps(TimeUnit.MILLISECONDS.toMicros(this.localAdditions.getMillis()));
            }
        }
    }

    private static class WindmillBag<T>
    implements BagState<T>,
    WindmillState {
        private final ByteString stateKey;
        private final String stateFamily;
        private final Coder<T> elemCoder;
        private final WindmillStateReader reader;
        private final Supplier<StateSampler.ScopedState> readStateSupplier;
        private boolean cleared = false;
        private final List<T> localAdditions = new ArrayList<T>();

        private WindmillBag(ByteString stateKey, String stateFamily, Coder<T> elemCoder, WindmillStateReader reader, Supplier<StateSampler.ScopedState> readStateSupplier) {
            this.stateKey = stateKey;
            this.stateFamily = stateFamily;
            this.elemCoder = elemCoder;
            this.reader = reader;
            this.readStateSupplier = readStateSupplier;
        }

        @Override
        public void clear() {
            this.cleared = true;
            this.localAdditions.clear();
        }

        @Override
        public StateContents<Iterable<T>> get() {
            final ListenableFuture persistedData = this.cleared ? Futures.immediateFuture(Collections.emptyList()) : this.reader.listFuture(this.stateKey, this.stateFamily, this.elemCoder);
            return new StateContents<Iterable<T>>(){

                /*
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public Iterable<T> read() {
                    try (StateSampler.ScopedState scope = (StateSampler.ScopedState)WindmillBag.this.readStateSupplier.get();){
                        List input = WindmillBag.this.cleared ? Collections.emptyList() : (Iterable)persistedData.get();
                        Iterable iterable = Iterables.concat(input, WindmillBag.this.localAdditions);
                        return iterable;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read state", e);
                    }
                }
            };
        }

        @Override
        public StateContents<Boolean> isEmpty() {
            final ListenableFuture persistedData = this.cleared ? Futures.immediateFuture(Collections.emptyList()) : this.reader.listFuture(this.stateKey, this.stateFamily, this.elemCoder);
            return new StateContents<Boolean>(){

                /*
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public Boolean read() {
                    try (StateSampler.ScopedState scope = (StateSampler.ScopedState)WindmillBag.this.readStateSupplier.get();){
                        List input = WindmillBag.this.cleared ? Collections.emptyList() : (Iterable)persistedData.get();
                        Boolean bl = Iterables.isEmpty(input) && Iterables.isEmpty(WindmillBag.this.localAdditions);
                        return bl;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read state", e);
                    }
                }
            };
        }

        @Override
        public void add(T input) {
            this.localAdditions.add(input);
        }

        @Override
        public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) throws IOException {
            if (this.cleared) {
                this.reader.listFuture(this.stateKey, this.stateFamily, this.elemCoder);
                commitBuilder.addListUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).setEndTimestamp(Long.MAX_VALUE);
            }
            if (!this.localAdditions.isEmpty()) {
                byte[] zero = new byte[]{0};
                Windmill.TagList.Builder listUpdatesBuilder = commitBuilder.addListUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily);
                for (T value : this.localAdditions) {
                    ByteString.Output stream = ByteString.newOutput();
                    stream.write(zero);
                    this.elemCoder.encode(value, (OutputStream)stream, Coder.Context.OUTER);
                    listUpdatesBuilder.addValuesBuilder().setData(stream.toByteString()).setTimestamp(Long.MAX_VALUE);
                }
            }
        }
    }

    private static class WindmillValue<T>
    implements ValueState<T>,
    WindmillState {
        private final ByteString stateKey;
        private final String stateFamily;
        private final Coder<T> coder;
        private final WindmillStateReader reader;
        private final Supplier<StateSampler.ScopedState> readStateSupplier;
        private boolean modified = false;
        private T modifiedValue;

        private WindmillValue(ByteString stateKey, String stateFamily, Coder<T> coder, WindmillStateReader reader, Supplier<StateSampler.ScopedState> readStateSupplier) {
            this.stateKey = stateKey;
            this.stateFamily = stateFamily;
            this.coder = coder;
            this.reader = reader;
            this.readStateSupplier = readStateSupplier;
        }

        @Override
        public void clear() {
            this.modified = true;
            this.modifiedValue = null;
        }

        @Override
        public StateContents<T> get() {
            final Future<T> future = this.modified ? null : this.reader.valueFuture(this.stateKey, this.stateFamily, this.coder);
            return new StateContents<T>(){

                /*
                 * Enabled aggressive block sorting
                 * Enabled unnecessary exception pruning
                 * Enabled aggressive exception aggregation
                 */
                @Override
                public T read() {
                    try (StateSampler.ScopedState scope = (StateSampler.ScopedState)WindmillValue.this.readStateSupplier.get();){
                        Object object = WindmillValue.this.modified ? WindmillValue.this.modifiedValue : future.get();
                        return object;
                    }
                    catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException("Unable to read value from state", e);
                    }
                }
            };
        }

        @Override
        public void set(T value) {
            this.modified = true;
            this.modifiedValue = value;
        }

        @Override
        public void persist(Windmill.WorkItemCommitRequest.Builder commitBuilder) throws IOException {
            if (!this.modified) {
                return;
            }
            this.reader.valueFuture(this.stateKey, this.stateFamily, this.coder);
            ByteString.Output stream = ByteString.newOutput();
            if (this.modifiedValue != null) {
                this.coder.encode(this.modifiedValue, (OutputStream)stream, Coder.Context.OUTER);
            }
            commitBuilder.addValueUpdatesBuilder().setTag(this.stateKey).setStateFamily(this.stateFamily).getValueBuilder().setData(stream.toByteString()).setTimestamp(Long.MAX_VALUE);
        }
    }

    private static interface WindmillState {
        public void persist(Windmill.WorkItemCommitRequest.Builder var1) throws IOException;
    }
}

