/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.services.dataflow.model.SideInputInfo;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Maps;
import com.google.cloud.dataflow.sdk.runners.worker.SideInputUtils;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.DirectSideInputReader;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.PTuple;
import com.google.cloud.dataflow.sdk.util.SideInputReader;
import com.google.cloud.dataflow.sdk.util.Sized;
import com.google.cloud.dataflow.sdk.util.SizedSideInputReader;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.util.Map;
import java.util.Observable;
import java.util.Observer;

public class DataflowSideInputReader
extends SizedSideInputReader.Defaults
implements SizedSideInputReader {
    private final Map<TupleTag<Object>, ByteSizeObserver> observers = Maps.newHashMap();
    private final Map<TupleTag<Object>, Long> overheads = Maps.newHashMap();
    private final SideInputReader subReader;

    private DataflowSideInputReader(Iterable<? extends SideInputInfo> sideInputInfos, PipelineOptions options, ExecutionContext executionContext) throws Exception {
        PTuple sideInputValues = PTuple.empty();
        for (SideInputInfo sideInputInfo : sideInputInfos) {
            TupleTag tag = new TupleTag(sideInputInfo.getTag());
            ByteSizeObserver observer = new ByteSizeObserver();
            Object sideInputValue = SideInputUtils.readSideInput(options, sideInputInfo, observer, executionContext);
            this.overheads.put(tag, observer.getBytes());
            observer.reset();
            this.observers.put(tag, observer);
            sideInputValues = sideInputValues.and(tag, sideInputValue);
        }
        this.subReader = DirectSideInputReader.of(sideInputValues);
    }

    public static DataflowSideInputReader of(Iterable<? extends SideInputInfo> sideInputInfos, PipelineOptions options, ExecutionContext context) throws Exception {
        return new DataflowSideInputReader(sideInputInfos, options, context);
    }

    @Override
    public <T> boolean contains(PCollectionView<T> view) {
        return this.subReader.contains(view);
    }

    @Override
    public boolean isEmpty() {
        return this.subReader.isEmpty();
    }

    @Override
    public <T> Sized<T> getSized(PCollectionView<T> view, BoundedWindow window) {
        T value = this.subReader.get(view, window);
        TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
        ByteSizeObserver observer = this.observers.get(tag);
        long overhead = this.overheads.get(tag);
        long bytesRead = observer.getBytes();
        observer.reset();
        return Sized.of(value, overhead + bytesRead);
    }

    private static class ByteSizeObserver
    implements Observer {
        private long byteCount = 0L;

        private ByteSizeObserver() {
        }

        @Override
        public void update(Observable reader, Object obj) {
            Preconditions.checkArgument(obj instanceof Long, "unexpected parameter object");
            this.byteCount += ((Long)obj).longValue();
        }

        public void reset() {
            this.byteCount = 0L;
        }

        public long getBytes() {
            return this.byteCount;
        }
    }
}

