package ru.quipy.streams;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLong;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.random.Random;
import kotlin.time.Duration;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.CoroutineName;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.Job;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.quipy.core.AggregateRegistry;
import ru.quipy.core.EventSourcingProperties;
import ru.quipy.database.EventStore;
import ru.quipy.domain.ActiveEventStreamReader;
import ru.quipy.domain.Aggregate;
import ru.quipy.domain.EventRecord;
import ru.quipy.domain.EventStreamReadIndex;

/* compiled from: EventStoreReader.kt */
@Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��\u008c\u0001\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000e\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\b\n\u0002\u0010 \n\u0002\b\b\n\u0002\u0018\u0002\n\u0002\b\u0002\u0018�� <2\u00020\u0001:\u0001<BC\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\f\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007\u0012\u0006\u0010\t\u001a\u00020\n\u0012\u0006\u0010\u000b\u001a\u00020\f\u0012\u0006\u0010\r\u001a\u00020\u000e\u0012\u0006\u0010\u000f\u001a\u00020\u0010¢\u0006\u0002\u0010\u0011J\u0010\u0010&\u001a\u00020'2\u0006\u0010(\u001a\u00020)H\u0016J\b\u0010*\u001a\u00020'H\u0002J\u0010\u0010+\u001a\u00020'2\u0006\u0010,\u001a\u00020\u001aH\u0002J\u0011\u0010-\u001a\u00020'H\u0082@ø\u0001��¢\u0006\u0002\u0010.J\b\u0010/\u001a\u00020\u0018H\u0002J\b\u00100\u001a\u00020\u001eH\u0002J\u001f\u00101\u001a\b\u0012\u0004\u0012\u00020)022\u0006\u00103\u001a\u00020\u0013H\u0096@ø\u0001��¢\u0006\u0002\u00104J\u0010\u00105\u001a\u00020'2\u0006\u00106\u001a\u00020\u001cH\u0016J\b\u00107\u001a\u00020'H\u0016J\b\u00108\u001a\u00020'H\u0016J\b\u00109\u001a\u00020'H\u0002J\u000e\u0010:\u001a\u00020\u001e*\u0004\u0018\u00010;H\u0002R\u000e\u0010\u0012\u001a\u00020\u0013X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0016\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0019\u001a\u00020\u001aX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001b\u001a\u00020\u001cX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001d\u001a\u00020\u001eX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u001f\u001a\u00020 X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010!\u001a\u00020\u001eX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\"\u001a\u00020\u001aX\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010#\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010$\u001a\u00020%X\u0082\u0004¢\u0006\u0002\n��\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006="}, d2 = {"Lru/quipy/streams/EventStoreReader;", "Lru/quipy/streams/EventReader;", "eventStore", "Lru/quipy/database/EventStore;", "streamName", "", "aggregateInfo", "Lru/quipy/core/AggregateRegistry$BasicAggregateInfo;", "Lru/quipy/domain/Aggregate;", "streamManager", "Lru/quipy/streams/EventStreamReaderManager;", "config", "Lru/quipy/core/EventSourcingProperties;", "eventStreamNotifier", "Lru/quipy/streams/EventStreamNotifier;", "dispatcher", "Lkotlinx/coroutines/CoroutineDispatcher;", "(Lru/quipy/database/EventStore;Ljava/lang/String;Lru/quipy/core/AggregateRegistry$BasicAggregateInfo;Lru/quipy/streams/EventStreamReaderManager;Lru/quipy/core/EventSourcingProperties;Lru/quipy/streams/EventStreamNotifier;Lkotlinx/coroutines/CoroutineDispatcher;)V", "commitIndexEachNMessages", "", "eventStoreReadIndex", "Lru/quipy/domain/EventStreamReadIndex;", "eventStoreTableName", "healthCheckJob", "Lkotlinx/coroutines/Job;", "healthcheckPeriodInMillis", "", "indexResetInfo", "Lru/quipy/streams/ReadIndexResetInfo;", "isHealthcheckActive", "", "logger", "Lorg/slf4j/Logger;", "meIsActiveReader", "processedRecords", "readerId", "version", "Ljava/util/concurrent/atomic/AtomicLong;", "acknowledgeRecord", "", "eventRecord", "Lru/quipy/domain/EventRecord;", "checkAndResetIndexIfRequired", "commitReadIndex", "index", "ensureTableExists", "(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;", "launchEventStoreReaderHealthCheckJob", "performHealthCheck", "read", "", "batchSize", "(ILkotlin/coroutines/Continuation;)Ljava/lang/Object;", "resetReadIndex", "resetInfo", "resume", "stop", "syncReaderIndex", "isMe", "Lru/quipy/domain/ActiveEventStreamReader;", "Companion", "tiny-event-sourcing-lib"})
/* loaded from: input_file:ru/quipy/streams/EventStoreReader.class */
public final class EventStoreReader implements EventReader {

    @NotNull
    private final EventStore eventStore;

    @NotNull
    private final String streamName;

    @NotNull
    private final EventStreamReaderManager streamManager;

    @NotNull
    private final EventStreamNotifier eventStreamNotifier;

    @NotNull
    private final CoroutineDispatcher dispatcher;

    @NotNull
    private final String eventStoreTableName;
    private final int commitIndexEachNMessages;
    private final long healthcheckPeriodInMillis;

    @NotNull
    private final Logger logger;

    @NotNull
    private final String readerId;

    @NotNull
    private final AtomicLong version;
    private volatile boolean meIsActiveReader;
    private volatile boolean isHealthcheckActive;

    @NotNull
    private Job healthCheckJob;

    @NotNull
    private EventStreamReadIndex eventStoreReadIndex;

    @NotNull
    private ReadIndexResetInfo indexResetInfo;
    private long processedRecords;

    @NotNull
    public static final Companion Companion = new Companion(null);

    @NotNull
    private static final ReadIndexResetInfo NO_RESET_REQUIRED = new ReadIndexResetInfo(-1);

    /* compiled from: EventStoreReader.kt */
    @Metadata(mv = {1, 6, 0}, k = 1, xi = 48, d1 = {"��\u0012\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0005"}, d2 = {"Lru/quipy/streams/EventStoreReader$Companion;", "", "()V", "NO_RESET_REQUIRED", "Lru/quipy/streams/ReadIndexResetInfo;", "tiny-event-sourcing-lib"})
    /* loaded from: input_file:ru/quipy/streams/EventStoreReader$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public EventStoreReader(@NotNull EventStore eventStore, @NotNull String str, @NotNull AggregateRegistry.BasicAggregateInfo<Aggregate> basicAggregateInfo, @NotNull EventStreamReaderManager eventStreamReaderManager, @NotNull EventSourcingProperties eventSourcingProperties, @NotNull EventStreamNotifier eventStreamNotifier, @NotNull CoroutineDispatcher coroutineDispatcher) {
        Intrinsics.checkNotNullParameter(eventStore, "eventStore");
        Intrinsics.checkNotNullParameter(str, "streamName");
        Intrinsics.checkNotNullParameter(basicAggregateInfo, "aggregateInfo");
        Intrinsics.checkNotNullParameter(eventStreamReaderManager, "streamManager");
        Intrinsics.checkNotNullParameter(eventSourcingProperties, "config");
        Intrinsics.checkNotNullParameter(eventStreamNotifier, "eventStreamNotifier");
        Intrinsics.checkNotNullParameter(coroutineDispatcher, "dispatcher");
        this.eventStore = eventStore;
        this.streamName = str;
        this.streamManager = eventStreamReaderManager;
        this.eventStreamNotifier = eventStreamNotifier;
        this.dispatcher = coroutineDispatcher;
        this.eventStoreTableName = basicAggregateInfo.getAggregateEventsTableName();
        this.commitIndexEachNMessages = eventSourcingProperties.getRecordReadIndexCommitPeriod();
        this.healthcheckPeriodInMillis = Duration.getInWholeMilliseconds-impl(eventSourcingProperties.m5getEventReaderHealthCheckPeriodUwyO8pc()) + Random.Default.nextLong(Duration.getInWholeMilliseconds-impl(eventSourcingProperties.m5getEventReaderHealthCheckPeriodUwyO8pc()) / 5);
        Logger logger = LoggerFactory.getLogger(EventStoreReader.class);
        Intrinsics.checkNotNullExpressionValue(logger, "getLogger(EventStoreReader::class.java)");
        this.logger = logger;
        String uuid = UUID.randomUUID().toString();
        Intrinsics.checkNotNullExpressionValue(uuid, "randomUUID().toString()");
        this.readerId = uuid;
        this.version = new AtomicLong(1L);
        this.isHealthcheckActive = true;
        this.healthCheckJob = launchEventStoreReaderHealthCheckJob();
        this.eventStoreReadIndex = new EventStreamReadIndex(this.streamName, 0L, 0L);
        this.indexResetInfo = NO_RESET_REQUIRED;
    }

    @Override // ru.quipy.streams.EventReader
    @Nullable
    public Object read(int i, @NotNull Continuation<? super List<EventRecord>> continuation) {
        if (!this.meIsActiveReader) {
            this.logger.debug("Skip reading by reader id " + this.readerId + ", stream " + this.streamName + " has another active reader");
            return CollectionsKt.emptyList();
        }
        checkAndResetIndexIfRequired();
        List<EventRecord> findBatchOfEventRecordAfter = this.eventStore.findBatchOfEventRecordAfter(this.eventStoreTableName, this.eventStoreReadIndex.getReadIndex(), i);
        this.eventStreamNotifier.onBatchRead(this.streamName, findBatchOfEventRecordAfter.size());
        return findBatchOfEventRecordAfter;
    }

    @Override // ru.quipy.streams.EventReader
    public void acknowledgeRecord(@NotNull EventRecord eventRecord) {
        Intrinsics.checkNotNullParameter(eventRecord, "eventRecord");
        if (!this.meIsActiveReader || eventRecord.getCreatedAt() < this.eventStoreReadIndex.getReadIndex()) {
            return;
        }
        long createdAt = eventRecord.getCreatedAt();
        this.eventStoreReadIndex = new EventStreamReadIndex(this.streamName, createdAt, this.eventStoreReadIndex.getVersion());
        long j = this.processedRecords;
        this.processedRecords = j + 1;
        if (j % this.commitIndexEachNMessages == 0) {
            commitReadIndex(createdAt);
        }
    }

    @Override // ru.quipy.streams.EventReader
    public void resetReadIndex(@NotNull ReadIndexResetInfo readIndexResetInfo) {
        Intrinsics.checkNotNullParameter(readIndexResetInfo, "resetInfo");
        if (this.eventStoreReadIndex.getVersion() < 1) {
            throw new IllegalArgumentException("Can't reset to non existing version: " + this.eventStoreReadIndex.getVersion());
        }
        this.indexResetInfo = readIndexResetInfo;
    }

    @Override // ru.quipy.streams.EventReader
    public void stop() {
        if (this.meIsActiveReader) {
            this.meIsActiveReader = false;
        }
        if (this.isHealthcheckActive) {
            this.isHealthcheckActive = false;
            Job.DefaultImpls.cancel$default(this.healthCheckJob, (CancellationException) null, 1, (Object) null);
        }
    }

    @Override // ru.quipy.streams.EventReader
    public void resume() {
        this.isHealthcheckActive = true;
        this.healthCheckJob = launchEventStoreReaderHealthCheckJob();
    }

    private final void commitReadIndex(long j) {
        EventStreamReadIndex eventStreamReadIndex = new EventStreamReadIndex(this.streamName, j, this.eventStoreReadIndex.getVersion() + 1);
        this.logger.debug("Committing index for " + this.streamName + '-' + this.readerId + ", index: " + j + ", updated version: " + eventStreamReadIndex.getVersion());
        this.eventStore.commitStreamReadIndex(eventStreamReadIndex);
        this.eventStreamNotifier.onReadIndexCommitted(this.streamName, eventStreamReadIndex.getReadIndex());
        syncReaderIndex();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Job launchEventStoreReaderHealthCheckJob() {
        Job launch$default = BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope(new CoroutineName("reading-" + this.streamName + "-event-store-coroutine").plus(this.dispatcher)), (CoroutineContext) null, (CoroutineStart) null, new EventStoreReader$launchEventStoreReaderHealthCheckJob$1(this, null), 3, (Object) null);
        launch$default.invokeOnCompletion(new Function1<Throwable, Unit>() { // from class: ru.quipy.streams.EventStoreReader$launchEventStoreReaderHealthCheckJob$2$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final void invoke(@Nullable Throwable th) {
                boolean z;
                Logger logger;
                String str;
                Logger logger2;
                String str2;
                Job launchEventStoreReaderHealthCheckJob;
                z = EventStoreReader.this.isHealthcheckActive;
                if (!z) {
                    logger = EventStoreReader.this.logger;
                    StringBuilder append = new StringBuilder().append("Stopped event store reader coroutine of stream ");
                    str = EventStoreReader.this.streamName;
                    logger.warn(append.append(str).toString());
                    return;
                }
                logger2 = EventStoreReader.this.logger;
                StringBuilder append2 = new StringBuilder().append("Unexpected error in event store reader ");
                str2 = EventStoreReader.this.streamName;
                logger2.error(append2.append(str2).append(". Relaunching...").toString(), th);
                EventStoreReader eventStoreReader = EventStoreReader.this;
                launchEventStoreReaderHealthCheckJob = EventStoreReader.this.launchEventStoreReaderHealthCheckJob();
                eventStoreReader.healthCheckJob = launchEventStoreReaderHealthCheckJob;
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((Throwable) obj);
                return Unit.INSTANCE;
            }
        });
        return launch$default;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean performHealthCheck() {
        return this.streamManager.tryUpdateReaderState(this.streamName, this.readerId, this.eventStoreReadIndex.getReadIndex());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final boolean isMe(ActiveEventStreamReader activeEventStreamReader) {
        return activeEventStreamReader != null && Intrinsics.areEqual(activeEventStreamReader.getReaderId(), this.readerId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Removed duplicated region for block: B:12:0x0068  */
    /* JADX WARN: Removed duplicated region for block: B:22:0x0082  */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00ca  */
    /* JADX WARN: Removed duplicated region for block: B:8:0x0054  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public final java.lang.Object ensureTableExists(kotlin.coroutines.Continuation<? super kotlin.Unit> r7) {
        /*
            r6 = this;
            r0 = r7
            boolean r0 = r0 instanceof ru.quipy.streams.EventStoreReader$ensureTableExists$1
            if (r0 == 0) goto L26
            r0 = r7
            ru.quipy.streams.EventStoreReader$ensureTableExists$1 r0 = (ru.quipy.streams.EventStoreReader$ensureTableExists$1) r0
            r9 = r0
            r0 = r9
            int r0 = r0.label
            r1 = -2147483648(0xffffffff80000000, float:-0.0)
            r0 = r0 & r1
            if (r0 == 0) goto L26
            r0 = r9
            r1 = r0
            int r1 = r1.label
            r2 = -2147483648(0xffffffff80000000, float:-0.0)
            int r1 = r1 - r2
            r0.label = r1
            goto L30
        L26:
            ru.quipy.streams.EventStoreReader$ensureTableExists$1 r0 = new ru.quipy.streams.EventStoreReader$ensureTableExists$1
            r1 = r0
            r2 = r6
            r3 = r7
            r1.<init>(r2, r3)
            r9 = r0
        L30:
            r0 = r9
            java.lang.Object r0 = r0.result
            r8 = r0
            java.lang.Object r0 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
            r10 = r0
            r0 = r9
            int r0 = r0.label
            switch(r0) {
                case 0: goto L54;
                case 1: goto L82;
                default: goto Lca;
            }
        L54:
            r0 = r8
            kotlin.ResultKt.throwOnFailure(r0)
        L58:
            r0 = r6
            ru.quipy.database.EventStore r0 = r0.eventStore
            r1 = r6
            java.lang.String r1 = r1.eventStoreTableName
            boolean r0 = r0.tableExists(r1)
            if (r0 != 0) goto Lc6
            r0 = 2000(0x7d0, double:9.88E-321)
            r1 = r9
            r2 = r9
            r3 = r6
            r2.L$0 = r3
            r2 = r9
            r3 = 1
            r2.label = r3
            java.lang.Object r0 = kotlinx.coroutines.DelayKt.delay(r0, r1)
            r1 = r0
            r2 = r10
            if (r1 != r2) goto L8f
            r1 = r10
            return r1
        L82:
            r0 = r9
            java.lang.Object r0 = r0.L$0
            ru.quipy.streams.EventStoreReader r0 = (ru.quipy.streams.EventStoreReader) r0
            r6 = r0
            r0 = r8
            kotlin.ResultKt.throwOnFailure(r0)
            r0 = r8
        L8f:
            r0 = r6
            org.slf4j.Logger r0 = r0.logger
            java.lang.StringBuilder r1 = new java.lang.StringBuilder
            r2 = r1
            r2.<init>()
            java.lang.String r2 = "Event stream "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r6
            java.lang.String r2 = r2.streamName
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " is waiting for "
            java.lang.StringBuilder r1 = r1.append(r2)
            r2 = r6
            java.lang.String r2 = r2.eventStoreTableName
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r2 = " to be created"
            java.lang.StringBuilder r1 = r1.append(r2)
            java.lang.String r1 = r1.toString()
            r0.trace(r1)
            goto L58
        Lc6:
            kotlin.Unit r0 = kotlin.Unit.INSTANCE
            return r0
        Lca:
            java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
            r1 = r0
            java.lang.String r2 = "call to 'resume' before 'invoke' with coroutine"
            r1.<init>(r2)
            throw r0
        */
        throw new UnsupportedOperationException("Method not decompiled: ru.quipy.streams.EventStoreReader.ensureTableExists(kotlin.coroutines.Continuation):java.lang.Object");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void syncReaderIndex() {
        EventStreamReadIndex findStreamReadIndex = this.eventStore.findStreamReadIndex(this.streamName);
        if (findStreamReadIndex != null) {
            this.eventStoreReadIndex = findStreamReadIndex;
            this.logger.debug("Reader index synced for " + this.streamName + ". Index: " + findStreamReadIndex.getReadIndex() + ", version: " + findStreamReadIndex.getVersion());
            this.eventStreamNotifier.onReadIndexSynced(this.streamName, findStreamReadIndex.getReadIndex());
        }
    }

    private final void checkAndResetIndexIfRequired() {
        if (Intrinsics.areEqual(this.indexResetInfo, NO_RESET_REQUIRED)) {
            return;
        }
        this.eventStoreReadIndex = new EventStreamReadIndex(this.streamName, this.indexResetInfo.getResetIndex(), this.eventStoreReadIndex.getVersion());
        this.logger.warn("Index for stream " + this.streamName + " forcibly reset to " + this.indexResetInfo.getResetIndex());
        this.indexResetInfo = NO_RESET_REQUIRED;
        this.eventStreamNotifier.onStreamReset(this.streamName, this.indexResetInfo.getResetIndex());
    }
}
