/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util.common.worker;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Operation;
import com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.StateSampler;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link Operation} that pulls every element from a {@link Reader} and hands it to the
 * first output receiver.
 *
 * <p>While the read loop runs, the operation publishes the iterator's progress through
 * {@link #getProgress()} and accepts dynamic split requests through
 * {@link #requestDynamicSplit}. The number of bytes reported by the reader (via its
 * observer notifications) is accumulated in {@link #byteCount}.
 */
public class ReadOperation
extends Operation {
    private static final Logger LOG = LoggerFactory.getLogger(ReadOperation.class);

    /** Default interval between progress refresh requests: one second. */
    private static final long DEFAULT_PROGRESS_UPDATE_PERIOD_MS = TimeUnit.SECONDS.toMillis(1L);

    /** The source this operation reads from. */
    public final Reader<?> reader;

    /** Sum of the byte counts the reader reports to its observers. */
    final Counter<Long> byteCount;

    /**
     * Iterator of the current read loop; {@code null} until {@link #runReadLoop()} has
     * opened it. Assigned while holding {@code initializationStateLock}.
     */
    volatile Reader.ReaderIterator<?> readerIterator = null;

    /** Most recent progress successfully obtained from the iterator. */
    private final AtomicReference<Reader.Progress> progress = new AtomicReference<>();

    private long progressUpdatePeriodMs = DEFAULT_PROGRESS_UPDATE_PERIOD_MS;

    /**
     * Set by the updater thread to ask the read loop for a progress refresh; cleared by
     * the read loop when it honours the request. Starts out {@code true}.
     */
    private final AtomicBoolean isProgressUpdateRequested = new AtomicBoolean(true);

    /**
     * Creates a read operation and registers it as an observer of {@code reader}.
     *
     * <p>NOTE: this constructor calls the overridable {@link #bytesCounterName}, so an
     * override runs before the subclass constructor body has executed.
     *
     * @param operationName name of this operation, also used to derive the byte counter name
     * @param reader the source to read from
     * @param receivers output receivers; only the first one is fed by the read loop
     * @param counterPrefix prefix forwarded to the superclass and to {@link #bytesCounterName}
     * @param addCounterMutator used to register the byte counter
     * @param stateSampler sampler forwarded to the superclass and to the reader
     */
    public ReadOperation(String operationName, Reader<?> reader, OutputReceiver[] receivers, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
        super(operationName, receivers, counterPrefix, addCounterMutator, stateSampler, reader.getStateSamplerStateKind());
        this.reader = reader;
        this.byteCount = addCounterMutator.addCounter(Counter.longs(this.bytesCounterName(counterPrefix, operationName), Counter.AggregationKind.SUM));
        reader.addObserver(new ReaderObserver());
        reader.setStateSamplerAndOperationName(stateSampler, operationName);
    }

    /**
     * Convenience constructor: a single receiver and the fixed operation name
     * {@code "ReadOperation"}.
     */
    ReadOperation(Reader<?> reader, OutputReceiver outputReceiver, String counterPrefix, CounterSet.AddCounterMutator addCounterMutator, StateSampler stateSampler) {
        this("ReadOperation", reader, new OutputReceiver[]{outputReceiver}, counterPrefix, addCounterMutator, stateSampler);
    }

    /**
     * Sets how often the read loop refreshes the published progress.
     *
     * <p>As used by {@link #runReadLoop()}: a positive value starts a background thread
     * that requests a refresh at that interval; {@code 0} refreshes after every record and
     * starts no thread; a negative value starts no thread either.
     *
     * @param millis the refresh period in milliseconds
     */
    public void setProgressUpdatePeriodMs(long millis) {
        this.progressUpdatePeriodMs = millis;
    }

    /**
     * Returns the name of the byte counter. This implementation ignores
     * {@code counterPrefix} and returns {@code operationName + "-ByteCount"}.
     */
    protected String bytesCounterName(String counterPrefix, String operationName) {
        return operationName + "-ByteCount";
    }

    /** Returns the reader this operation reads from. */
    public Reader<?> getReader() {
        return this.reader;
    }

    /**
     * Starts the operation and runs the complete read loop inside the start state scope.
     *
     * @throws Exception if the superclass start or the read loop fails
     */
    @Override
    public void start() throws Exception {
        try (StateSampler.ScopedState start = this.stateSampler.scopedState(this.startState)) {
            assert start != null;
            super.start();
            this.runReadLoop();
        }
    }

    /** Delegates to the reader's {@code supportsRestart()}. */
    @Override
    public boolean supportsRestart() {
        return this.reader.supportsRestart();
    }

    /**
     * Reads every element from the reader and passes it to the first receiver, refreshing
     * the published progress before the first record, after the last record, and whenever
     * a refresh has been requested (or on every record when the period is {@code 0}).
     *
     * <p>Returns immediately, without opening the reader, when the first receiver is
     * {@code null}.
     *
     * @throws Exception if reading, processing, closing the iterator, or waiting for the
     *     updater thread fails
     */
    protected void runReadLoop() throws Exception {
        OutputReceiver receiver = this.receivers[0];
        if (receiver == null) {
            return;
        }
        try (StateSampler.ScopedState process = this.stateSampler.scopedState(this.processState)) {
            assert process != null;
            // Publish the iterator under the same lock requestDynamicSplit() takes.
            synchronized (this.initializationStateLock) {
                this.readerIterator = this.reader.iterator();
            }
            Thread updateRequester = null;
            if (this.progressUpdatePeriodMs > 0L) {
                updateRequester = new Thread() {
                    @Override
                    public void run() {
                        while (true) {
                            ReadOperation.this.isProgressUpdateRequested.set(true);
                            try {
                                Thread.sleep(ReadOperation.this.progressUpdatePeriodMs);
                            } catch (InterruptedException e) {
                                // Interruption is the shutdown signal from the read loop.
                                return;
                            }
                        }
                    }
                };
                updateRequester.start();
            }
            try {
                this.setProgressFromIterator();
                while (this.readerIterator.hasNext()) {
                    Object value = this.readerIterator.next();
                    if (this.isProgressUpdateRequested.getAndSet(false) || this.progressUpdatePeriodMs == 0L) {
                        this.setProgressFromIterator();
                    }
                    receiver.process(value);
                }
                this.setProgressFromIterator();
            } finally {
                try {
                    this.readerIterator.close();
                } finally {
                    // Key the shutdown on whether a thread was actually started rather than
                    // re-reading the period: the period may be negative (no thread was
                    // created) or may have been changed while the loop was running. The
                    // nested finally also stops the thread when close() throws.
                    if (updateRequester != null) {
                        updateRequester.interrupt();
                        updateRequester.join();
                    }
                }
            }
        }
    }

    /**
     * Copies the iterator's current progress into {@link #progress}. Best effort: failures
     * never propagate to the caller.
     */
    private void setProgressFromIterator() {
        try {
            this.progress.set(this.readerIterator.getProgress());
        } catch (UnsupportedOperationException ignored) {
            // Deliberately silent: the previously published progress is left in place.
        } catch (Exception e) {
            LOG.warn("Progress estimation failed", e);
        }
    }

    /**
     * Returns the most recently published progress, or {@code null} if none has been
     * published yet.
     */
    public Reader.Progress getProgress() {
        return this.progress.get();
    }

    /**
     * Forwards a dynamic split request to the current iterator.
     *
     * @param splitRequest the requested split
     * @return the iterator's split result, or {@code null} when the operation is already
     *     finished, the iterator has not been opened yet, or the iterator itself returns
     *     {@code null}
     */
    public Reader.DynamicSplitResult requestDynamicSplit(Reader.DynamicSplitRequest splitRequest) {
        synchronized (this.initializationStateLock) {
            if (this.isFinished()) {
                LOG.warn("Iterator is in the Finished state, returning null stop position.");
                return null;
            }
            if (this.readerIterator == null) {
                LOG.warn("Iterator has not been initialized, refusing to split at {}", splitRequest);
                return null;
            }
            Reader.DynamicSplitResult result = this.readerIterator.requestDynamicSplit(splitRequest);
            if (result != null) {
                // A non-null result is followed by a refresh of the published progress.
                this.setProgressFromIterator();
            }
            return result;
        }
    }

    /** Adds the {@code Long} values notified by the reader to the byte counter. */
    private class ReaderObserver
    implements Observer {
        private ReaderObserver() {
        }

        @Override
        public void update(Observable obs, Object obj) {
            Preconditions.checkArgument(obs == ReadOperation.this.reader, "unexpected observable");
            Preconditions.checkArgument(obj instanceof Long, "unexpected parameter object");
            ReadOperation.this.byteCount.addValue((Long) obj);
        }
    }
}

