package lucuma.graphql.routes;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.Ref;
import cats.effect.kernel.Ref$ApplyBuilders$;
import cats.effect.kernel.Ref$Make$;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.syntax.package$all$;
import cats.implicits$;
import cats.syntax.ApplicativeIdOps$;
import clue.model.StreamingMessage;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import fs2.concurrent.SignallingRef$;
import io.circe.Json;
import lucuma.graphql.routes.Subscriptions;
import lucuma.graphql.routes.syntax.JsonOps$;
import lucuma.graphql.routes.syntax.json$;
import org.typelevel.log4cats.Logger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/* compiled from: Subscriptions.scala */
/* loaded from: input_file:lucuma/graphql/routes/Subscriptions$.class */
public final class Subscriptions$ {
    public static final Subscriptions$ MODULE$ = new Subscriptions$();

    public <F> Function1<Stream<F, Either<Throwable, Json>>, Stream<F, StreamingMessage.FromServer>> lucuma$graphql$routes$Subscriptions$$fromServerPipe(String str, GraphQLService<F> graphQLService) {
        return stream -> {
            return stream.flatMap(either -> {
                if (either instanceof Left) {
                    return Stream$.MODULE$.eval(graphQLService.format((Throwable) ((Left) either).value())).map(json -> {
                        return new StreamingMessage.FromServer.Error(str, json);
                    });
                }
                if (!(either instanceof Right)) {
                    throw new MatchError(either);
                }
                return Stream$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new StreamingMessage.FromServer[]{JsonOps$.MODULE$.toStreamingMessage$extension(json$.MODULE$.toJsonOps((Json) ((Right) either).value()), str)}));
            }, NotGiven$.MODULE$.default());
        };
    }

    public <F> F apply(GraphQLService<F> graphQLService, Function1<Option<StreamingMessage.FromServer>, F> function1, Logger<F> logger, GenConcurrent<F, Throwable> genConcurrent) {
        return (F) implicits$.MODULE$.toFunctorOps(Ref$ApplyBuilders$.MODULE$.of$extension(cats.effect.package$.MODULE$.Ref().apply(Ref$Make$.MODULE$.concurrentInstance(genConcurrent)), Predef$.MODULE$.Map().empty()), genConcurrent).map(ref -> {
            return new Subscriptions<F>(graphQLService, function1, genConcurrent, logger, ref) { // from class: lucuma.graphql.routes.Subscriptions$$anon$1
                private final GraphQLService service$2;
                private final Function1 send$1;
                private final GenConcurrent evidence$3$1;
                private final Logger evidence$2$1;
                private final Ref subscriptions$1;

                private Function1<Stream<F, Either<Throwable, Json>>, Stream<F, BoxedUnit>> replySink(String str) {
                    return stream -> {
                        return ((Stream) Subscriptions$.MODULE$.lucuma$graphql$routes$Subscriptions$$fromServerPipe(str, this.service$2).apply(stream)).evalMap(fromServer -> {
                            return this.send$1.apply(new Some(fromServer));
                        });
                    };
                }

                @Override // lucuma.graphql.routes.Subscriptions
                public F add(String str, Stream<F, Either<Throwable, Json>> stream) {
                    return (F) implicits$.MODULE$.toFlatMapOps(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), this.evidence$3$1), this.evidence$3$1).flatMap(signallingRef -> {
                        Stream interruptWhen = stream.through(this.replySink(str)).interruptWhen(signallingRef.discrete().evalTap(obj -> {
                            return $anonfun$add$2(this, BoxesRunTime.unboxToBoolean(obj));
                        }, this.evidence$3$1), this.evidence$3$1);
                        return implicits$.MODULE$.toFlatMapOps(package$.MODULE$.debug(() -> {
                            return new StringBuilder(22).append("starting event stream ").append(str).toString();
                        }, this.evidence$2$1), this.evidence$3$1).flatMap(boxedUnit -> {
                            return implicits$.MODULE$.toFlatMapOps(GenSpawnOps$.MODULE$.start$extension(package$all$.MODULE$.genSpawnOps(interruptWhen.compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(this.evidence$3$1))).drain(), this.evidence$3$1), this.evidence$3$1), this.evidence$3$1).flatMap(fiber -> {
                                return implicits$.MODULE$.toFlatMapOps(package$.MODULE$.debug(() -> {
                                    return new StringBuilder(21).append("started event stream ").append(str).toString();
                                }, this.evidence$2$1), this.evidence$3$1).flatMap(boxedUnit -> {
                                    return this.subscriptions$1.update(map -> {
                                        return map.updated(str, new Subscriptions.Subscription(str, fiber, signallingRef, this.evidence$3$1));
                                    });
                                });
                            });
                        });
                    });
                }

                @Override // lucuma.graphql.routes.Subscriptions
                public F remove(String str) {
                    return (F) implicits$.MODULE$.toFlatMapOps(this.subscriptions$1.getAndUpdate(map -> {
                        return map.removed(str);
                    }), this.evidence$3$1).flatMap(map2 -> {
                        return map2.get(str).fold(() -> {
                            return ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(BoxedUnit.UNIT), this.evidence$3$1);
                        }, subscription -> {
                            return implicits$.MODULE$.catsSyntaxApply(subscription.stop(), this.evidence$3$1).$times$greater(this.send$1.apply(new Some(new StreamingMessage.FromServer.Complete(subscription.id()))));
                        });
                    });
                }

                @Override // lucuma.graphql.routes.Subscriptions
                public F removeAll() {
                    return (F) implicits$.MODULE$.toFlatMapOps(this.subscriptions$1.getAndSet(Predef$.MODULE$.Map().empty()), this.evidence$3$1).flatMap(map -> {
                        return implicits$.MODULE$.toFoldableOps(map.values().toList(), implicits$.MODULE$.catsStdInstancesForList()).traverse_(subscription -> {
                            return implicits$.MODULE$.catsSyntaxApply(subscription.stop(), this.evidence$3$1).$times$greater(this.send$1.apply(new Some(new StreamingMessage.FromServer.Complete(subscription.id()))));
                        }, this.evidence$3$1);
                    });
                }

                public static final /* synthetic */ Object $anonfun$add$2(Subscriptions$$anon$1 subscriptions$$anon$1, boolean z) {
                    return package$.MODULE$.info(() -> {
                        return new StringBuilder(17).append("signalling ref = ").append(z).toString();
                    }, subscriptions$$anon$1.evidence$2$1);
                }

                {
                    this.service$2 = graphQLService;
                    this.send$1 = function1;
                    this.evidence$3$1 = genConcurrent;
                    this.evidence$2$1 = logger;
                    this.subscriptions$1 = ref;
                }
            };
        });
    }

    private Subscriptions$() {
    }
}
