/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.worker;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Predicate;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Predicates;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.FluentIterable;
import com.google.cloud.dataflow.sdk.runners.worker.WindmillSink;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.ElementByteSizeObserver;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/**
 * Pairs a key with the {@link Windmill.WorkItem} that carries its timers and input messages,
 * and exposes both as decoded SDK objects.
 *
 * <p>Timers are surfaced as {@link TimerInternals.TimerData} and messages as
 * {@link WindowedValue}s. Equality and hashing consider only the key and the underlying work
 * item; the coders are not part of an instance's identity.
 *
 * @param <ElemT> the type of the values decoded from the work item's messages
 */
public class KeyedWorkItem<ElemT> {
    /** Character that terminates the state-namespace prefix of a timer tag. */
    private static final char TAG_NAMESPACE_SEPARATOR = '+';

    /** Matches timers whose type is {@code WATERMARK}. */
    private static final Predicate<Windmill.Timer> IS_WATERMARK = new Predicate<Windmill.Timer>() {
        @Override
        public boolean apply(Windmill.Timer input) {
            return input.getType() == Windmill.Timer.Type.WATERMARK;
        }
    };

    private final Windmill.WorkItem workItem;
    private final Object key;
    private final transient Coder<? extends BoundedWindow> windowCoder;
    private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private final transient Coder<ElemT> valueCoder;

    /**
     * Creates a {@code KeyedWorkItem} for the given key and work item.
     *
     * @param key the key the work item belongs to
     * @param workItem the work item holding the timers and message bundles
     * @param windowCoder coder handed to {@link StateNamespaces#fromString} when parsing timer tags
     * @param windowsCoder coder used to decode the windows stored in each message's metadata
     * @param valueCoder coder used to decode each message's data
     * @return the new keyed work item
     */
    public static <ElemT> KeyedWorkItem<ElemT> workItem(Object key, Windmill.WorkItem workItem, Coder<? extends BoundedWindow> windowCoder, Coder<Collection<? extends BoundedWindow>> windowsCoder, Coder<ElemT> valueCoder) {
        return new KeyedWorkItem<ElemT>(key, workItem, windowCoder, windowsCoder, valueCoder);
    }

    KeyedWorkItem(Object key, Windmill.WorkItem workItem, Coder<? extends BoundedWindow> windowCoder, Coder<Collection<? extends BoundedWindow>> windowsCoder, Coder<ElemT> valueCoder) {
        this.key = key;
        this.workItem = workItem;
        this.windowCoder = windowCoder;
        this.windowsCoder = windowsCoder;
        this.valueCoder = valueCoder;
    }

    /** Returns the key this work item was created with. */
    public Object key() {
        return this.key;
    }

    /**
     * Returns the work item's timers converted to {@link TimerInternals.TimerData}.
     *
     * <p>All {@code WATERMARK} timers come first, followed by every other timer; within each
     * group the order of the work item's timer list is kept. The result is built with
     * {@link FluentIterable}, so the conversion (and any exception it raises) is expected to
     * happen while the result is iterated rather than when this method returns.
     *
     * @return the timers, watermark timers first
     */
    public Iterable<TimerInternals.TimerData> timersIterable() {
        FluentIterable<Windmill.Timer> allTimers = FluentIterable.from(this.workItem.getTimers().getTimersList());
        FluentIterable<Windmill.Timer> eventTimers = allTimers.filter(IS_WATERMARK);
        FluentIterable<Windmill.Timer> nonEventTimers = allTimers.filter(Predicates.not(IS_WATERMARK));
        return eventTimers.append(nonEventTimers).transform(new Function<Windmill.Timer, TimerInternals.TimerData>() {
            @Override
            public TimerInternals.TimerData apply(Windmill.Timer timer) {
                return toTimerData(timer);
            }
        });
    }

    /**
     * Returns every message of every bundle in the work item, decoded into a
     * {@link WindowedValue}.
     *
     * <p>An {@link IOException} raised while decoding a message is rethrown unchecked via
     * {@link Throwables#propagate}.
     *
     * @return the decoded elements, in bundle order and then message order
     */
    public Iterable<WindowedValue<ElemT>> elementsIterable() {
        return FluentIterable.from(this.workItem.getMessageBundlesList()).transformAndConcat(new Function<Windmill.InputMessageBundle, Iterable<Windmill.Message>>() {
            @Override
            public Iterable<Windmill.Message> apply(Windmill.InputMessageBundle input) {
                return input.getMessagesList();
            }
        }).transform(new Function<Windmill.Message, WindowedValue<ElemT>>() {
            @Override
            public WindowedValue<ElemT> apply(Windmill.Message message) {
                return toWindowedValue(message);
            }
        });
    }

    /**
     * Compares by key and underlying work item only; the coders are ignored.
     */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof KeyedWorkItem)) {
            return false;
        }
        KeyedWorkItem<?> that = (KeyedWorkItem<?>) other;
        return Objects.equals(this.key, that.key) && Objects.equals(this.workItem, that.workItem);
    }

    /** Hashes the same two fields that {@link #equals} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(this.key, this.workItem);
    }

    /** Maps a Windmill timer type onto the SDK time domain it represents. */
    private static TimeDomain toTimeDomain(Windmill.Timer.Type type) {
        switch (type) {
            case REALTIME:
                return TimeDomain.PROCESSING_TIME;
            case DEPENDENT_REALTIME:
                return TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
            case WATERMARK:
                return TimeDomain.EVENT_TIME;
            default:
                throw new IllegalArgumentException("Unsupported timer type " + type);
        }
    }

    /**
     * Converts one Windmill timer: the tag prefix before the first {@code '+'} is parsed as the
     * state namespace, and the timestamp is converted from microseconds to milliseconds.
     */
    private TimerInternals.TimerData toTimerData(Windmill.Timer timer) {
        String tag = timer.getTag().toStringUtf8();
        int separatorIndex = tag.indexOf(TAG_NAMESPACE_SEPARATOR);
        if (separatorIndex < 0) {
            // Without this check substring(0, -1) fails with an index error that hides the bad tag.
            throw new IllegalArgumentException("Timer tag has no '+' namespace separator: " + tag);
        }
        StateNamespace namespace = StateNamespaces.fromString(tag.substring(0, separatorIndex), this.windowCoder);
        Instant timestamp = new Instant(TimeUnit.MICROSECONDS.toMillis(timer.getTimestamp()));
        return TimerInternals.TimerData.of(namespace, timestamp, toTimeDomain(timer.getType()));
    }

    /**
     * Decodes one Windmill message: windows and pane come from the message metadata, the value
     * from the message data (decoded in the {@code OUTER} context), and the timestamp is
     * converted from microseconds to milliseconds.
     */
    private WindowedValue<ElemT> toWindowedValue(Windmill.Message message) {
        try {
            Instant timestamp = new Instant(TimeUnit.MICROSECONDS.toMillis(message.getTimestamp()));
            Collection<? extends BoundedWindow> windows = WindmillSink.decodeMetadataWindows(this.windowsCoder, message.getMetadata());
            PaneInfo pane = WindmillSink.decodeMetadataPane(message.getMetadata());
            InputStream inputStream = message.getData().newInput();
            ElemT value = this.valueCoder.decode(inputStream, Coder.Context.OUTER);
            return WindowedValue.of(value, timestamp, windows, pane);
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Coder placeholder for {@link KeyedWorkItem}: it records the element coder and reports
     * byte sizes, but does not support encoding or decoding.
     *
     * @param <T> the element type of the keyed work items
     */
    public static class KeyedWorkItemCoder<T>
    extends StandardCoder<KeyedWorkItem<T>> {
        final Coder<T> elemCoder;

        /** Returns a coder for keyed work items whose elements use {@code elemCoder}. */
        public static <T> KeyedWorkItemCoder<T> of(Coder<T> elemCoder) {
            return new KeyedWorkItemCoder<T>(elemCoder);
        }

        /**
         * JSON factory: builds the coder from its component encodings, using the first
         * component as the element coder.
         *
         * @param components the component encodings; the first entry must be a {@link Coder}
         */
        @JsonCreator
        public static KeyedWorkItemCoder<?> of(@JsonProperty(value="component_encodings") List<Object> components) {
            return KeyedWorkItemCoder.of((Coder<?>) components.get(0));
        }

        protected KeyedWorkItemCoder(Coder<T> elemCoder) {
            this.elemCoder = elemCoder;
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void encode(KeyedWorkItem<T> value, OutputStream outStream, Coder.Context context) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public KeyedWorkItem<T> decode(InputStream inStream, Coder.Context context) {
            throw new UnsupportedOperationException();
        }

        /** Always {@code true}: the size is read directly from the underlying work item. */
        @Override
        public boolean isRegisterByteSizeObserverCheap(KeyedWorkItem<T> value, Coder.Context context) {
            return true;
        }

        /** Reports the serialized size of the value's underlying work item to the observer. */
        @Override
        public void registerByteSizeObserver(KeyedWorkItem<T> value, ElementByteSizeObserver observer, Coder.Context context) throws Exception {
            observer.update(value.workItem.getSerializedSize());
        }

        /** Performs no check and never throws. */
        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }

        /** Returns a single-element list holding the element coder. */
        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.elemCoder);
        }

        /** Returns the coder for the elements of the keyed work items. */
        public Coder<T> getElementCoder() {
            return this.elemCoder;
        }
    }
}

