/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.util.internal;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecResult;
import ratpack.exec.Execution;
import ratpack.exec.Operation;
import ratpack.exec.Promise;
import ratpack.exec.util.ParallelBatch;
import ratpack.func.Action;
import ratpack.func.BiAction;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.util.Types;

/**
 * {@link ParallelBatch} implementation that runs every promise of the batch in its own
 * forked {@link Execution} and offers the outcomes as a list, to a per-item consumer, or
 * as a publisher.
 *
 * @param <T> the type of value produced by each promise in the batch
 */
public class DefaultParallelBatch<T> implements ParallelBatch<T> {
    private final Iterable<? extends Promise<T>> promises;
    private final Action<? super Execution> execInit;

    /**
     * Creates a batch over the given promises.
     *
     * @param promises the promises that make up the batch
     * @param execInit the action passed to {@code onStart} of every forked execution
     */
    public DefaultParallelBatch(Iterable<? extends Promise<T>> promises, Action<? super Execution> execInit) {
        this.promises = promises;
        this.execInit = execInit;
    }

    /**
     * Returns a new batch over the same promises that uses the given execution initializer.
     *
     * @param execInit the action passed to {@code onStart} of every forked execution
     * @return a new batch; this instance is left unchanged
     */
    @Override
    public ParallelBatch<T> execInit(Action<? super Execution> execInit) {
        return new DefaultParallelBatch<T>(this.promises, execInit);
    }

    /**
     * Runs every promise in a forked execution and collects each outcome, success or
     * failure, as an {@link ExecResult} at the index of the promise it came from.
     *
     * @return a promise for the list of results; an empty batch yields an empty list
     */
    @Override
    public Promise<List<? extends ExecResult<T>>> yieldAll() {
        List<Promise<T>> promises = Lists.newArrayList(this.promises);
        // The snapshot list is deliberately reused as the result storage: slot i holds the
        // promise until its result arrives and then holds that result.
        List<ExecResult<T>> results = Types.cast(promises);
        AtomicInteger counter = new AtomicInteger(promises.size());
        return Promise.async(d -> {
            if (promises.isEmpty()) {
                // No execution will be forked, so nothing would ever signal completion;
                // complete right away instead of leaving the promise pending forever.
                d.success(results);
                return;
            }
            for (int i = 0; i < promises.size(); ++i) {
                int finalI = i;
                Execution.fork()
                    .onStart(this.execInit)
                    .onComplete(e -> {
                        // The execution that finishes last delivers the collected results.
                        if (counter.decrementAndGet() == 0) {
                            d.success(results);
                        }
                    })
                    .start(e -> promises.get(finalI).result(t -> results.set(finalI, t)));
            }
        });
    }

    /**
     * Runs every promise in a forked execution and collects the values at the index of the
     * promise they came from. Failure handling is that of {@link #forEach(BiAction)}.
     *
     * @return a promise for the list of values, or the error reported by {@code forEach}
     */
    @Override
    public Promise<List<? extends T>> yield() {
        List<Promise<T>> promises = Lists.newArrayList(this.promises);
        // Same storage trick as in yieldAll(): the list is sized by the snapshot and each
        // slot is overwritten with the produced value.
        List<T> results = Types.cast(promises);
        return Promise.async(d -> this.forEach(results::set).onError(d::error).then(() -> d.success(results)));
    }

    /**
     * Runs every promise in a forked execution and passes each successful value, together
     * with the index of its promise, to the consumer.
     *
     * <p>The first failure observed becomes the error of the returned operation; failures
     * observed afterwards are added to it as suppressed exceptions. A promise whose
     * execution starts after a failure has been recorded is not subscribed to. The
     * operation completes only once every forked execution has completed.
     *
     * @param consumer receives the index and the value of each successful promise
     * @return an operation that completes when all forked executions are done
     */
    @Override
    public Operation forEach(BiAction<? super Integer, ? super T> consumer) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        AtomicBoolean done = new AtomicBoolean();
        AtomicInteger wip = new AtomicInteger();
        return Promise.<Void>async(d -> {
            int i = 0;
            Iterator<? extends Promise<T>> iterator = this.promises.iterator();
            while (iterator.hasNext()) {
                Promise<T> promise = iterator.next();
                int finalI = i++;
                wip.incrementAndGet();
                // Mark the batch as fully scheduled before forking the last execution, so
                // an early drop of wip to zero cannot be mistaken for completion.
                if (!iterator.hasNext()) {
                    done.set(true);
                }
                Execution.fork()
                    .onStart(this.execInit)
                    .onComplete(e -> {
                        if (wip.decrementAndGet() == 0 && done.get()) {
                            Throwable t = error.get();
                            if (t == null) {
                                d.success(null);
                            } else {
                                d.error(t);
                            }
                        }
                    })
                    .start(e -> {
                        // Skip the work once some other promise has already failed.
                        if (error.get() == null) {
                            promise.result(t -> {
                                if (t.isError()) {
                                    Throwable thisError = t.getThrowable();
                                    if (!error.compareAndSet(null, thisError)) {
                                        Throwable firstError = error.get();
                                        // Guard against adding an exception to itself.
                                        if (firstError != thisError) {
                                            firstError.addSuppressed(thisError);
                                        }
                                    }
                                } else {
                                    consumer.execute(finalI, t.getValue());
                                }
                            });
                        }
                    });
            }
            if (i == 0) {
                // Empty batch: nothing was forked, so complete here.
                d.success(null);
            }
        }).operation();
    }

    /**
     * Exposes the batch as a publisher. Each unit of downstream demand starts at most one
     * promise in a forked execution; values are written as they are produced and a failed
     * promise is written as an error.
     *
     * @return a publisher of the values produced by the promises
     */
    @Override
    public TransformablePublisher<T> publisher() {
        Iterator<? extends Promise<T>> iterator = this.promises.iterator();
        return new BufferingPublisher<T>(Action.noop(), write -> new Subscription() {
            volatile boolean cancelled;
            // Set once the last promise has been taken from the iterator.
            volatile boolean complete;
            final AtomicLong finished = new AtomicLong();
            volatile long started;

            @Override
            public void request(long n) {
                // NOTE(review): when the iterator is empty from the outset nothing here
                // calls write.complete(); confirm whether an empty batch is expected to
                // signal completion downstream.
                while (n-- > 0L && !this.cancelled) {
                    if (!iterator.hasNext()) {
                        return;
                    }
                    ++this.started;
                    Promise<T> promise = iterator.next();
                    if (!iterator.hasNext()) {
                        this.complete = true;
                    }
                    Execution.fork()
                        .onStart(DefaultParallelBatch.this.execInit)
                        .onComplete(e -> {
                            long finishedCount = this.finished.incrementAndGet();
                            // Complete the stream when the last started execution ends
                            // and no further promises remain.
                            if (finishedCount == this.started && this.complete && !this.cancelled) {
                                write.complete();
                            }
                        })
                        .start(e -> promise.onError(write::error).then(write::item));
                }
            }

            @Override
            public void cancel() {
                this.cancelled = true;
            }
        });
    }
}

