/*
 * Decompiled with CFR 0.152.
 */
package ratpack.sse.internal;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.sse.Event;
import ratpack.sse.internal.ServerSentEventDecoder;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;

class ServerSentEventDecodingPublisher
extends BufferingPublisher<Event<?>> {
    ServerSentEventDecodingPublisher(Publisher<? extends ByteBuf> publisher, final ByteBufAllocator allocator) {
        super(Action.noop(), (? super BufferedWriteStream<T> write) -> new Subscription((BufferedWriteStream)write, (Publisher)publisher){
            Subscription upstream;
            ServerSentEventDecoder decoder;
            volatile boolean emitting;
            final /* synthetic */ BufferedWriteStream val$write;
            final /* synthetic */ Publisher val$publisher;
            {
                this.val$write = bufferedWriteStream;
                this.val$publisher = publisher;
                this.decoder = new ServerSentEventDecoder(allocator, this.val$write::item);
            }

            public void request(final long n) {
                if (this.emitting) {
                    return;
                }
                if (this.upstream == null) {
                    this.val$publisher.subscribe((Subscriber)new Subscriber<ByteBuf>(){

                        public void onSubscribe(Subscription s) {
                            upstream = s;
                            upstream.request(n);
                        }

                        public void onNext(ByteBuf event) {
                            emitting = true;
                            try {
                                decoder.decode(event);
                            }
                            catch (Throwable e) {
                                upstream.cancel();
                                this.onError(e);
                                return;
                            }
                            finally {
                                emitting = false;
                            }
                            if (val$write.getRequested() > 0L) {
                                upstream.request(1L);
                            }
                        }

                        public void onError(Throwable t) {
                            decoder.close();
                            val$write.error(t);
                        }

                        public void onComplete() {
                            decoder.close();
                            val$write.complete();
                        }
                    });
                } else {
                    this.upstream.request(n);
                }
            }

            public void cancel() {
                this.decoder.close();
                if (this.upstream != null) {
                    this.upstream.cancel();
                }
            }
        });
    }
}

