package no.ssb.rawdata.api.persistence;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import no.ssb.rawdata.api.state.CompletedPosition;
import no.ssb.rawdata.api.state.StatePersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:no/ssb/rawdata/api/persistence/Subscription.class */
/**
 * Polls a {@link StatePersistence} namespace for completed positions and hands every newly
 * seen position to a consumer.
 *
 * <p>Polling runs on a single-threaded executor owned by this instance. The loop keeps running
 * until {@link #cancel()}, {@link #dispose()} or {@link #close()} is called, or the worker
 * thread is interrupted.
 */
public class Subscription implements Disposable {
    private static final Logger LOG = LoggerFactory.getLogger(Subscription.class);

    /** Pause between two polling passes, and between retries while no first position exists. */
    private static final long POLL_INTERVAL_MILLIS = 250L;

    /** Time each shutdown phase waits for the worker to terminate. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 60L;

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean endOfStream = new AtomicBoolean(false);
    // Stored by the constructor; not read anywhere inside this class.
    private final PersistenceQueue<CompletedPosition> persistenceQueue;
    private final StatePersistence statePersistence;
    private final String namespace;
    private final String fromPosition;

    /**
     * Creates a subscription. No polling happens until {@link #subscribe(Consumer)} is called.
     *
     * @param persistenceQueue queue associated with this subscription (only stored here)
     * @param statePersistence source of first/last positions and of the position stream
     * @param namespace        namespace passed to every {@code statePersistence} call
     * @param fromPosition     position to start reading from; when {@code null} the first
     *                         position reported by {@code statePersistence} is used instead
     */
    public Subscription(PersistenceQueue<CompletedPosition> persistenceQueue, StatePersistence statePersistence, String namespace, String fromPosition) {
        this.persistenceQueue = persistenceQueue;
        this.statePersistence = statePersistence;
        this.namespace = namespace;
        this.fromPosition = fromPosition;
    }

    /**
     * Starts the polling loop on this subscription's executor and returns immediately.
     *
     * <p>A failure that escapes the polling loop is logged; it is not rethrown to the caller.
     *
     * @param consumer receives each completed position that has not been delivered yet
     * @return this subscription, so the caller can cancel or dispose it
     */
    public Disposable subscribe(Consumer<CompletedPosition> consumer) {
        AtomicBoolean delivered = new AtomicBoolean();
        AtomicReference<String> lastPosition = new AtomicReference<>(this.fromPosition);
        CompletableFuture
                .runAsync(() -> pollLoop(consumer, delivered, lastPosition), this.executor)
                .exceptionally(failure -> {
                    // Without this handler an exception thrown by the loop would vanish silently.
                    LOG.error("Subscription polling for namespace {} terminated abnormally", this.namespace, failure);
                    return null;
                });
        return this;
    }

    /**
     * Repeatedly reads positions from the last delivered position up to the current last
     * position and forwards the ones that have not been delivered yet.
     *
     * @param consumer     target for new positions
     * @param delivered    becomes {@code true} once at least one position has been forwarded
     * @param lastPosition start position of the next read; updated after every forwarded item
     */
    private void pollLoop(Consumer<CompletedPosition> consumer, AtomicBoolean delivered, AtomicReference<String> lastPosition) {
        while (!this.endOfStream.get() && !Thread.currentThread().isInterrupted()) {
            if (lastPosition.get() == null) {
                String first = (String) this.statePersistence.getFirstPosition(this.namespace).blockingGet();
                if (first == null) {
                    // Nothing to start from yet: wait and retry rather than reading from a null position.
                    nap(POLL_INTERVAL_MILLIS);
                    continue;
                }
                lastPosition.set(first);
            }
            String from = lastPosition.get();
            String to = (String) this.statePersistence.getLastPosition(this.namespace).blockingGet();
            this.statePersistence.readPositions(this.namespace, from, to).subscribe(completedPosition -> {
                // After the first delivery each read starts at the position delivered last,
                // so an item carrying that same position is skipped instead of being re-sent.
                if (delivered.get() && Objects.equals(lastPosition.get(), completedPosition.position)) {
                    return;
                }
                consumer.accept(completedPosition);
                lastPosition.set(completedPosition.position);
                delivered.set(true);
            }, th -> {
                LOG.error("Failed to read positions for namespace {}", this.namespace, th);
            }, () -> {
            });
            nap(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the interrupt flag is restored
     * and the method returns early; the polling loop observes the flag and stops.
     *
     * @param millis sleep duration in milliseconds
     */
    private void nap(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Signals end of stream, shuts the executor down and waits for the worker to finish.
     * If it has not terminated after the timeout, running work is interrupted via
     * {@code shutdownNow()} and the wait is repeated once.
     */
    void shutdownAndAwaitTermination() {
        LOG.info("Shutdown Subscription..");
        this.endOfStream.set(true);
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
                if (!this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    LOG.error("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: stop the worker and keep the flag set.
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Asks the polling loop to stop after its current pass. The executor is not shut down
     * here; call {@link #close()} for that.
     */
    @Override
    public void cancel() {
        this.endOfStream.set(true);
    }

    /** Equivalent to {@link #close()}. */
    @Override
    public void dispose() {
        close();
    }

    /**
     * Stops polling and shuts the executor down, waiting for termination. Does nothing when
     * the executor has already been shut down.
     */
    @Override
    public void close() {
        if (this.executor.isShutdown()) {
            return;
        }
        shutdownAndAwaitTermination();
    }
}
