package zio.pravega;

import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import java.util.UUID;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Tuple2;
import scala.Tuple2$;
import scala.util.NotGiven$;
import zio.CanFail$;
import zio.Cause;
import zio.Chunk;
import zio.Chunk$;
import zio.Exit;
import zio.Exit$Failure$;
import zio.Exit$Success$;
import zio.IsSubtypeOfError$;
import zio.Scope;
import zio.ZIO;
import zio.ZIO$;
import zio.ZIO$ZIOAutoCloseableOps$;
import zio.pravega.stream.EventWriter$;
import zio.stream.ZChannel;
import zio.stream.ZPipeline;
import zio.stream.ZPipeline$;
import zio.stream.ZPipeline$UnwrapScopedPartiallyApplied$;
import zio.stream.ZSink;
import zio.stream.ZSink$;
import zio.stream.ZSink$UnwrapScopedPartiallyApplied$;
import zio.stream.ZStream;
import zio.stream.ZStream$;
import zio.stream.ZStream$UnwrapScopedPartiallyApplied$;

/* compiled from: PravegaStream.scala */
/* loaded from: input_file:zio/pravega/PravegaStreamImpl.class */
public class PravegaStreamImpl implements PravegaStream {
    private final EventStreamClientFactory eventStreamClientFactory;

    public PravegaStreamImpl(EventStreamClientFactory eventStreamClientFactory) {
        this.eventStreamClientFactory = eventStreamClientFactory;
    }

    private <A> ZIO<Scope, Throwable, EventStreamWriter<A>> createEventWriter(String str, WriterSettings<A> writerSettings) {
        return ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(unsafe -> {
            return this.eventStreamClientFactory.createEventWriter(str, writerSettings.serializer(), writerSettings.eventWriterConfig());
        }, "zio.pravega.PravegaStreamImpl.createEventWriter(PravegaStream.scala:38)")), "zio.pravega.PravegaStreamImpl.createEventWriter(PravegaStream.scala:39)");
    }

    private <A> ZIO<Scope, Throwable, EventStreamReader<A>> createEventStreamReader(String str, ReaderSettings<A> readerSettings) {
        return ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(unsafe -> {
            return this.eventStreamClientFactory.createReader((String) readerSettings.readerId().getOrElse(PravegaStreamImpl::createEventStreamReader$$anonfun$1$$anonfun$1), str, readerSettings.serializer(), readerSettings.readerConfig());
        }, "zio.pravega.PravegaStreamImpl.createEventStreamReader(PravegaStream.scala:48)")), "zio.pravega.PravegaStreamImpl.createEventStreamReader(PravegaStream.scala:49)");
    }

    @Override // zio.pravega.PravegaStream
    public <A> ZChannel sink(String str, WriterSettings<A> writerSettings) {
        return ZSink$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZSink$.MODULE$.unwrapScoped(), () -> {
            return r2.sink$$anonfun$1(r3, r4);
        }, "zio.pravega.PravegaStreamImpl.sink(PravegaStream.scala:55)");
    }

    @Override // zio.pravega.PravegaStream
    public <A> ZPipeline<Object, Throwable, A, A> writeFlow(String str, WriterSettings<A> writerSettings) {
        return ZPipeline$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZPipeline$.MODULE$.unwrapScoped(), () -> {
            return r2.writeFlow$$anonfun$1(r3, r4);
        }, "zio.pravega.PravegaStreamImpl.writeFlow(PravegaStream.scala:63)");
    }

    private <A> ZIO<Scope, Throwable, TransactionalEventStreamWriter<A>> createTxEventWriter(String str, WriterSettings<A> writerSettings) {
        return ZIO$ZIOAutoCloseableOps$.MODULE$.withFinalizerAuto$extension(ZIO$.MODULE$.ZIOAutoCloseableOps(ZIO$.MODULE$.attemptBlocking(unsafe -> {
            return this.eventStreamClientFactory.createTransactionalEventWriter(str, writerSettings.serializer(), writerSettings.eventWriterConfig());
        }, "zio.pravega.PravegaStreamImpl.createTxEventWriter(PravegaStream.scala:69)")), "zio.pravega.PravegaStreamImpl.createTxEventWriter(PravegaStream.scala:70)");
    }

    private <A> ZIO<Scope, Throwable, Transaction<A>> beginTransaction(TransactionalEventStreamWriter<A> transactionalEventStreamWriter) {
        return ZIO$.MODULE$.acquireReleaseExit(() -> {
            return beginTransaction$$anonfun$1(r1);
        }, (transaction, exit) -> {
            Tuple2 apply = Tuple2$.MODULE$.apply(transaction, exit);
            if (apply == null) {
                throw new MatchError(apply);
            }
            Transaction transaction = (Transaction) apply._1();
            Exit.Failure failure = (Exit) apply._2();
            if (failure instanceof Exit.Failure) {
                Cause _1 = Exit$Failure$.MODULE$.unapply(failure)._1();
                return ZIO$.MODULE$.logCause(() -> {
                    return beginTransaction$$anonfun$2$$anonfun$1(r1);
                }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:75)").$times$greater(() -> {
                    return beginTransaction$$anonfun$2$$anonfun$2(r1);
                }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:75)");
            }
            if (!(failure instanceof Exit.Success)) {
                throw new MatchError(failure);
            }
            Exit$Success$.MODULE$.unapply((Exit.Success) failure)._1();
            return ZIO$.MODULE$.attemptBlocking(unsafe -> {
                transaction.commit();
            }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:77)").orDie(IsSubtypeOfError$.MODULE$.impl($less$colon$less$.MODULE$.refl()), CanFail$.MODULE$.canFail(NotGiven$.MODULE$.value()), "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:77)");
        }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:79)");
    }

    @Override // zio.pravega.PravegaStream
    public <A> ZChannel sinkTx(String str, WriterSettings<A> writerSettings) {
        return ZSink$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZSink$.MODULE$.unwrapScoped(), () -> {
            return r2.sinkTx$$anonfun$1(r3, r4);
        }, "zio.pravega.PravegaStreamImpl.sinkTx(PravegaStream.scala:86)");
    }

    private <A> ZIO<Object, Throwable, Chunk<A>> readNextEvent(EventStreamReader<A> eventStreamReader, long j) {
        return ZIO$.MODULE$.attemptBlocking(unsafe -> {
            Object event;
            EventRead readNextEvent = eventStreamReader.readNextEvent(j);
            if (!readNextEvent.isCheckpoint() && (event = readNextEvent.getEvent()) != null) {
                return Chunk$.MODULE$.single(event);
            }
            return Chunk$.MODULE$.empty();
        }, "zio.pravega.PravegaStreamImpl.readNextEvent(PravegaStream.scala:94)");
    }

    @Override // zio.pravega.PravegaStream
    public <A> ZStream<Object, Throwable, A> stream(String str, ReaderSettings<A> readerSettings) {
        return ZStream$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZStream$.MODULE$.unwrapScoped(), () -> {
            return r2.stream$$anonfun$1(r3, r4);
        }, "zio.pravega.PravegaStreamImpl.stream(PravegaStream.scala:100)");
    }

    @Override // zio.pravega.PravegaStream
    public <A> ZStream<Object, Throwable, EventRead<A>> eventStream(String str, ReaderSettings<A> readerSettings) {
        return ZStream$UnwrapScopedPartiallyApplied$.MODULE$.apply$extension(ZStream$.MODULE$.unwrapScoped(), () -> {
            return r2.eventStream$$anonfun$1(r3, r4);
        }, "zio.pravega.PravegaStreamImpl.eventStream(PravegaStream.scala:115)");
    }

    private static final String createEventStreamReader$$anonfun$1$$anonfun$1() {
        return UUID.randomUUID().toString();
    }

    private static final /* synthetic */ ZChannel sink$$anonfun$1$$anonfun$2(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        return ZSink$.MODULE$.foreach((Function1) tuple2._2(), "zio.pravega.PravegaStreamImpl.sink(PravegaStream.scala:54)");
    }

    private final ZIO sink$$anonfun$1(String str, WriterSettings writerSettings) {
        return createEventWriter(str, writerSettings).map(eventStreamWriter -> {
            return Tuple2$.MODULE$.apply(eventStreamWriter, EventWriter$.MODULE$.writeEventTask(eventStreamWriter, writerSettings));
        }, "zio.pravega.PravegaStreamImpl.sink(PravegaStream.scala:53)").map(tuple2 -> {
            return new ZSink(sink$$anonfun$1$$anonfun$2(tuple2));
        }, "zio.pravega.PravegaStreamImpl.sink(PravegaStream.scala:54)");
    }

    private final ZIO writeFlow$$anonfun$1(String str, WriterSettings writerSettings) {
        return createEventWriter(str, writerSettings).map(eventStreamWriter -> {
            return Tuple2$.MODULE$.apply(eventStreamWriter, EventWriter$.MODULE$.writeEventTask(eventStreamWriter, writerSettings));
        }, "zio.pravega.PravegaStreamImpl.writeFlow(PravegaStream.scala:61)").map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return ZPipeline$.MODULE$.tap((Function1) tuple2._2(), "zio.pravega.PravegaStreamImpl.writeFlow(PravegaStream.scala:62)");
        }, "zio.pravega.PravegaStreamImpl.writeFlow(PravegaStream.scala:62)");
    }

    private static final ZIO beginTransaction$$anonfun$1(TransactionalEventStreamWriter transactionalEventStreamWriter) {
        return ZIO$.MODULE$.attemptBlocking(unsafe -> {
            return transactionalEventStreamWriter.beginTxn();
        }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:72)");
    }

    private static final Cause beginTransaction$$anonfun$2$$anonfun$1(Cause cause) {
        return cause;
    }

    private static final ZIO beginTransaction$$anonfun$2$$anonfun$2(Transaction transaction) {
        return ZIO$.MODULE$.attemptBlocking(unsafe -> {
            transaction.abort();
        }, "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:75)").orDie(IsSubtypeOfError$.MODULE$.impl($less$colon$less$.MODULE$.refl()), CanFail$.MODULE$.canFail(NotGiven$.MODULE$.value()), "zio.pravega.PravegaStreamImpl.beginTransaction(PravegaStream.scala:75)");
    }

    private static final /* synthetic */ ZChannel sinkTx$$anonfun$1$$anonfun$1$$anonfun$2(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        return ZSink$.MODULE$.foreach((Function1) tuple2._2(), "zio.pravega.PravegaStreamImpl.sinkTx(PravegaStream.scala:85)");
    }

    private final ZIO sinkTx$$anonfun$1(String str, WriterSettings writerSettings) {
        return createTxEventWriter(str, writerSettings).flatMap(transactionalEventStreamWriter -> {
            return beginTransaction(transactionalEventStreamWriter).map(transaction -> {
                return Tuple2$.MODULE$.apply(transaction, EventWriter$.MODULE$.writeEventTask(transaction, writerSettings));
            }, "zio.pravega.PravegaStreamImpl.sinkTx(PravegaStream.scala:84)").map(tuple2 -> {
                return new ZSink(sinkTx$$anonfun$1$$anonfun$1$$anonfun$2(tuple2));
            }, "zio.pravega.PravegaStreamImpl.sinkTx(PravegaStream.scala:85)");
        }, "zio.pravega.PravegaStreamImpl.sinkTx(PravegaStream.scala:85)");
    }

    private static final ZIO stream$$anonfun$1$$anonfun$2$$anonfun$1(ZIO zio2) {
        return zio2;
    }

    private final ZIO stream$$anonfun$1(String str, ReaderSettings readerSettings) {
        return createEventStreamReader(str, readerSettings).map(eventStreamReader -> {
            return Tuple2$.MODULE$.apply(eventStreamReader, readNextEvent(eventStreamReader, readerSettings.timeout()));
        }, "zio.pravega.PravegaStreamImpl.stream(PravegaStream.scala:98)").map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ZIO zio2 = (ZIO) tuple2._2();
            return ZStream$.MODULE$.repeatZIOChunk(() -> {
                return stream$$anonfun$1$$anonfun$2$$anonfun$1(r1);
            }, "zio.pravega.PravegaStreamImpl.stream(PravegaStream.scala:99)");
        }, "zio.pravega.PravegaStreamImpl.stream(PravegaStream.scala:99)");
    }

    private static final ZIO eventStream$$anonfun$1$$anonfun$1$$anonfun$1(ReaderSettings readerSettings, EventStreamReader eventStreamReader) {
        return ZIO$.MODULE$.attemptBlocking(unsafe -> {
            EventRead readNextEvent = eventStreamReader.readNextEvent(readerSettings.timeout());
            if (!readNextEvent.isCheckpoint() && readNextEvent.getEvent() == null) {
                return Chunk$.MODULE$.empty();
            }
            return Chunk$.MODULE$.single(readNextEvent);
        }, "zio.pravega.PravegaStreamImpl.eventStream(PravegaStream.scala:113)");
    }

    private final ZIO eventStream$$anonfun$1(String str, ReaderSettings readerSettings) {
        return createEventStreamReader(str, readerSettings).map(eventStreamReader -> {
            return ZStream$.MODULE$.repeatZIOChunk(() -> {
                return eventStream$$anonfun$1$$anonfun$1$$anonfun$1(r1, r2);
            }, "zio.pravega.PravegaStreamImpl.eventStream(PravegaStream.scala:113)");
        }, "zio.pravega.PravegaStreamImpl.eventStream(PravegaStream.scala:114)");
    }
}
