/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import java.io.IOException;
import java.util.Collection;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;

/**
 * A {@link Reader} that exposes the work item currently held by a
 * {@link StreamingModeExecutionContext} as a single
 * {@code WindowedValue<KeyedWorkItem<T>>} element.
 *
 * <p>Each iterator obtained from {@link #iterator()} yields exactly one element: a
 * {@link KeyedWorkItem} built from the context's serialized key and its current
 * {@link Windmill.WorkItem}, wrapped via {@code WindowedValue.valueInEmptyWindows}.
 *
 * @param <T> the value type handled by the underlying {@link KvCoder}
 */
class WindowingWindmillReader<T>
extends Reader<WindowedValue<KeyedWorkItem<T>>> {
    /** Coder used to decode the key and to obtain the value coder for the work item. */
    private final KvCoder<?, T> kvCoder;
    /** Coder for a single window, taken from the input coder. */
    private final Coder<? extends BoundedWindow> windowCoder;
    /** Coder for a collection of windows, taken from the input coder. */
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    /** Execution context supplying the serialized key and the work item to read. */
    private final StreamingModeExecutionContext context;

    /**
     * Builds a reader from the given coder and execution context.
     *
     * @param coder coder for the produced values; it is cast to
     *     {@code WindowedValue.FullWindowedValueCoder} and its value coder to
     *     {@code KeyedWorkItem.KeyedWorkItemCoder}, so any other coder shape fails with a
     *     {@link ClassCastException}
     * @param context execution context the reader pulls the key and work item from
     * @throws IllegalArgumentException if the element coder is not a {@link KvCoder}
     */
    // The raw-type casts mirror the original (decompiled) code; the generic parameters of the
    // project coders cannot be recovered here, so the unchecked conversions are kept as-is.
    @SuppressWarnings({"rawtypes", "unchecked"})
    WindowingWindmillReader(Coder<WindowedValue<KeyedWorkItem<T>>> coder, StreamingModeExecutionContext context) {
        WindowedValue.FullWindowedValueCoder inputCoder = (WindowedValue.FullWindowedValueCoder)coder;
        this.windowsCoder = inputCoder.getWindowsCoder();
        this.windowCoder = inputCoder.getWindowCoder();
        Coder elementCoder = ((KeyedWorkItem.KeyedWorkItemCoder)inputCoder.getValueCoder()).getElementCoder();
        if (!(elementCoder instanceof KvCoder)) {
            throw new IllegalArgumentException("WindowingWindmillReader only works with KvCoders.");
        }
        this.kvCoder = (KvCoder)elementCoder;
        this.context = context;
    }

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param coder coder for the produced values (see the constructor for the expected shape)
     * @param context execution context the reader pulls the key and work item from
     * @param <T> the value type handled by the underlying {@link KvCoder}
     * @return a new reader bound to {@code context}
     */
    public static <T> WindowingWindmillReader<T> create(Coder<WindowedValue<KeyedWorkItem<T>>> coder, StreamingModeExecutionContext context) {
        return new WindowingWindmillReader<T>(coder, context);
    }

    /**
     * Returns an iterator over a single element wrapping the context's current work item.
     *
     * <p>The key is decoded from the context's serialized key and the work item is fetched
     * once, when this method is called; the {@link KeyedWorkItem} itself is only built when
     * {@code next()} is invoked.
     *
     * @return a one-element iterator
     * @throws IOException if decoding the key fails
     */
    @Override
    public Reader.ReaderIterator<WindowedValue<KeyedWorkItem<T>>> iterator() throws IOException {
        final Object key = this.kvCoder.getKeyCoder().decode(this.context.getSerializedKey().newInput(), Coder.Context.OUTER);
        final Windmill.WorkItem workItem = this.context.getWork();
        return new Reader.AbstractReaderIterator<WindowedValue<KeyedWorkItem<T>>>(){
            /** Set once the single element has been handed out. */
            boolean consumed = false;

            @Override
            public boolean hasNext() throws IOException {
                return !this.consumed;
            }

            @Override
            public WindowedValue<KeyedWorkItem<T>> next() throws IOException, NoSuchElementException {
                // Honor the declared contract: once the single element has been returned,
                // further calls must fail instead of silently re-emitting the work item.
                if (this.consumed) {
                    throw new NoSuchElementException();
                }
                this.consumed = true;
                return WindowedValue.valueInEmptyWindows(KeyedWorkItem.workItem(key, workItem, WindowingWindmillReader.this.windowCoder, WindowingWindmillReader.this.windowsCoder, WindowingWindmillReader.this.kvCoder.getValueCoder()));
            }
        };
    }

    /**
     * Always reports {@code true}.
     * NOTE(review): the exact restart semantics are defined by {@link Reader}; confirm there.
     */
    @Override
    public boolean supportsRestart() {
        return true;
    }

    /** {@link ReaderFactory} that builds {@link WindowingWindmillReader} instances. */
    static class Factory
    implements ReaderFactory {
        Factory() {
        }

        /**
         * Creates a {@link WindowingWindmillReader} from the supplied coder and context.
         *
         * <p>Only {@code coder} and {@code context} are used; {@code spec}, {@code options},
         * {@code addCounterMutator} and {@code operationName} are ignored. {@code context} is
         * cast to {@link StreamingModeExecutionContext}, so a context of any other type fails
         * with a {@link ClassCastException}.
         */
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
        public Reader<?> create(CloudObject spec, @Nullable Coder<?> coder, @Nullable PipelineOptions options, @Nullable ExecutionContext context, @Nullable CounterSet.AddCounterMutator addCounterMutator, @Nullable String operationName) throws Exception {
            // The factory interface only provides a wildcard coder; the raw local lets it be
            // passed to the generic create() method, as in the original code.
            Coder typedCoder = coder;
            return WindowingWindmillReader.create(typedCoder, (StreamingModeExecutionContext)context);
        }
    }
}

