package streamz.camel.fs2.dsl;

import cats.effect.Async;
import cats.effect.ContextShift;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import fs2.Stream;
import fs2.Stream$;
import fs2.internal.FreeC;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.spi.Synchronization;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.reflect.ClassTag;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Failure;
import scala.util.Left;
import scala.util.Success;
import scala.util.Try$;
import streamz.camel.StreamContext;
import streamz.camel.StreamMessage;
import streamz.camel.StreamMessage$;
import streamz.camel.fs2.dsl.Cpackage;

/* compiled from: package.scala */
/* loaded from: input_file:streamz/camel/fs2/dsl/package$.class */
public final class package$ {
    public static final package$ MODULE$ = new package$();

    public <F, A> Cpackage.SendDsl<F, A> SendDsl(FreeC<F, StreamMessage<A>, BoxedUnit> freeC, ContextShift<F> contextShift, Async<F> async) {
        return new Cpackage.SendDsl<>(freeC, contextShift, async);
    }

    public <F, A> Cpackage.SendBodyDsl<F, A> SendBodyDsl(FreeC<F, A, BoxedUnit> freeC, ContextShift<F> contextShift, Async<F> async) {
        return new Cpackage.SendBodyDsl<>(freeC, contextShift, async);
    }

    public <A> Cpackage.SendDslPure<A> SendDslPure(FreeC<Nothing$, StreamMessage<A>, BoxedUnit> freeC) {
        return new Cpackage.SendDslPure<>(freeC);
    }

    public <A> Cpackage.SendBodyDslPure<A> SendBodyDslPure(FreeC<Nothing$, A, BoxedUnit> freeC) {
        return new Cpackage.SendBodyDslPure<>(freeC);
    }

    public <F, A> FreeC<F, StreamMessage<A>, BoxedUnit> receive(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext, ClassTag<A> classTag) {
        return Stream$.MODULE$.filter$extension(consume(str, streamContext, classTag, contextShift, async), streamMessage -> {
            return BoxesRunTime.boxToBoolean($anonfun$receive$1(streamMessage));
        });
    }

    public <F, A> FreeC<F, A, BoxedUnit> receiveBody(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext, ClassTag<A> classTag) {
        return Stream$.MODULE$.map$extension(receive(str, contextShift, async, streamContext, classTag), streamMessage -> {
            return streamMessage.body();
        });
    }

    public <F, A> Function1<Stream<F, StreamMessage<A>>, Stream<F, StreamMessage<A>>> send(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext) {
        return produce(str, ExchangePattern.InOnly, (streamMessage, exchange) -> {
            return streamMessage;
        }, streamContext, contextShift, async);
    }

    public <F, A> Function1<Stream<F, A>, Stream<F, A>> sendBody(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext) {
        return obj -> {
            return new Stream($anonfun$sendBody$1(str, contextShift, async, streamContext, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    public <F, A, B> Function1<Stream<F, StreamMessage<A>>, Stream<F, StreamMessage<B>>> sendRequest(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext, ClassTag<B> classTag) {
        return produce(str, ExchangePattern.InOut, (streamMessage, exchange) -> {
            return StreamMessage$.MODULE$.from(exchange.getOut(), classTag);
        }, streamContext, contextShift, async);
    }

    public <F, A, B> Function1<Stream<F, A>, Stream<F, B>> sendRequestBody(String str, ContextShift<F> contextShift, Async<F> async, StreamContext streamContext, ClassTag<B> classTag) {
        return obj -> {
            return new Stream($anonfun$sendRequestBody$1(str, contextShift, async, streamContext, classTag, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    private <F, A> FreeC<F, StreamMessage<A>, BoxedUnit> consume(String str, StreamContext streamContext, ClassTag<A> classTag, ContextShift<F> contextShift, Async<F> async) {
        long duration = streamContext.config().getDuration("streamz.camel.consumer.receive.timeout", TimeUnit.MILLISECONDS);
        return Stream$.MODULE$.repeatEval(FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(contextShift.shift(), async), () -> {
            return async.async(function1 -> {
                $anonfun$consume$2(streamContext, str, duration, classTag, function1);
                return BoxedUnit.UNIT;
            });
        }, async));
    }

    private <F, A, B> Function1<Stream<F, StreamMessage<A>>, Stream<F, StreamMessage<B>>> produce(String str, ExchangePattern exchangePattern, Function2<StreamMessage<A>, Exchange, StreamMessage<B>> function2, StreamContext streamContext, ContextShift<F> contextShift, Async<F> async) {
        return obj -> {
            return new Stream($anonfun$produce$1(contextShift, async, streamContext, str, exchangePattern, function2, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    public static final /* synthetic */ boolean $anonfun$receive$1(StreamMessage streamMessage) {
        return streamMessage != null;
    }

    public static final /* synthetic */ FreeC $anonfun$sendBody$1(String str, ContextShift contextShift, Async async, StreamContext streamContext, FreeC freeC) {
        return Stream$.MODULE$.map$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.map$extension(freeC, obj -> {
            return new StreamMessage(obj, StreamMessage$.MODULE$.apply$default$2());
        }), MODULE$.send(str, contextShift, async, streamContext)), streamMessage -> {
            return streamMessage.body();
        });
    }

    public static final /* synthetic */ FreeC $anonfun$sendRequestBody$1(String str, ContextShift contextShift, Async async, StreamContext streamContext, ClassTag classTag, FreeC freeC) {
        return Stream$.MODULE$.map$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.map$extension(freeC, obj -> {
            return new StreamMessage(obj, StreamMessage$.MODULE$.apply$default$2());
        }), MODULE$.sendRequest(str, contextShift, async, streamContext, classTag)), streamMessage -> {
            return streamMessage.body();
        });
    }

    public static final /* synthetic */ void $anonfun$consume$2(StreamContext streamContext, String str, long j, ClassTag classTag, Function1 function1) {
        boolean z = false;
        Success success = null;
        Failure apply = Try$.MODULE$.apply(() -> {
            return streamContext.consumerTemplate().receive(str, j);
        });
        if (apply instanceof Success) {
            z = true;
            success = (Success) apply;
            if (((Exchange) success.value()) == null) {
                return;
            }
        }
        if (z) {
            Exchange exchange = (Exchange) success.value();
            if (exchange.getException() != null) {
                function1.apply(new Left(exchange.getException()));
                streamContext.consumerTemplate().doneUoW(exchange);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (!z) {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            return;
        }
        Exchange exchange2 = (Exchange) success.value();
        Success apply2 = Try$.MODULE$.apply(() -> {
            return StreamMessage$.MODULE$.from(exchange2.getIn(), classTag);
        });
        if (apply2 instanceof Success) {
        } else {
            if (!(apply2 instanceof Failure)) {
                throw new MatchError(apply2);
            }
            Throwable exception = ((Failure) apply2).exception();
            function1.apply(new Left(exception));
            exchange2.setException(exception);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        streamContext.consumerTemplate().doneUoW(exchange2);
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    public static final /* synthetic */ void $anonfun$produce$4(StreamContext streamContext, String str, final StreamMessage streamMessage, ExchangePattern exchangePattern, final Function2 function2, final Function1 function1) {
        streamContext.producerTemplate().asyncCallback(str, streamContext.createExchange(streamMessage, exchangePattern), new Synchronization(function1, function2, streamMessage) { // from class: streamz.camel.fs2.dsl.package$$anon$1
            private final Function1 callback$1;
            private final Function2 result$1;
            private final StreamMessage message$1;

            public void onFailure(Exchange exchange) {
                this.callback$1.apply(new Left(exchange.getException()));
            }

            public void onComplete(Exchange exchange) {
                Success apply = Try$.MODULE$.apply(() -> {
                    return (StreamMessage) this.result$1.apply(this.message$1, exchange);
                });
                if (apply instanceof Success) {
                } else {
                    if (!(apply instanceof Failure)) {
                        throw new MatchError(apply);
                    }
                }
            }

            {
                this.callback$1 = function1;
                this.result$1 = function2;
                this.message$1 = streamMessage;
            }
        });
    }

    public static final /* synthetic */ FreeC $anonfun$produce$2(ContextShift contextShift, Async async, StreamContext streamContext, String str, ExchangePattern exchangePattern, Function2 function2, StreamMessage streamMessage) {
        return Stream$.MODULE$.eval(FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(contextShift.shift(), async), () -> {
            return async.async(function1 -> {
                $anonfun$produce$4(streamContext, str, streamMessage, exchangePattern, function2, function1);
                return BoxedUnit.UNIT;
            });
        }, async));
    }

    public static final /* synthetic */ FreeC $anonfun$produce$1(ContextShift contextShift, Async async, StreamContext streamContext, String str, ExchangePattern exchangePattern, Function2 function2, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(freeC, streamMessage -> {
            return new Stream($anonfun$produce$2(contextShift, async, streamContext, str, exchangePattern, function2, streamMessage));
        });
    }

    private package$() {
    }
}
