package no.ssb.rawdata.api.persistence;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import no.ssb.config.DynamicConfiguration;

/* loaded from: input_file:no/ssb/rawdata/api/persistence/PersistenceQueue.class */
public class PersistenceQueue<T> implements Closeable {
    private final DynamicConfiguration configuration;
    private final BlockingQueue<T> blockingQueue = new LinkedBlockingQueue();
    private final T noMoreUpStreamItemsAllowed = newNoMoreUpStreamItemsAllowed();
    private volatile boolean closed;

    public PersistenceQueue(DynamicConfiguration dynamicConfiguration) {
        this.configuration = dynamicConfiguration;
    }

    public <T> T newNoMoreUpStreamItemsAllowed() {
        try {
            return (T) Class.forName(getClass().getGenericSuperclass().getTypeName()).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void enqueue(T t) {
        if (this.closed) {
            throw new IllegalStateException("The queue provider is closed!");
        }
        this.blockingQueue.add(t);
    }

    public Flowable<T> toFlowable() {
        return Flowable.create(flowableEmitter -> {
            while (!flowableEmitter.isCancelled()) {
                try {
                    if (this.closed) {
                        flowableEmitter.onComplete();
                        return;
                    }
                    if (flowableEmitter.requested() == 0) {
                        return;
                    }
                    T poll = this.blockingQueue.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        if (poll == this.noMoreUpStreamItemsAllowed) {
                            this.blockingQueue.add(this.noMoreUpStreamItemsAllowed);
                            flowableEmitter.onComplete();
                            return;
                        }
                        flowableEmitter.onNext(poll);
                    }
                } catch (Throwable th) {
                    flowableEmitter.onError(th);
                    return;
                }
            }
        }, BackpressureStrategy.BUFFER);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closed = true;
        this.blockingQueue.add(this.noMoreUpStreamItemsAllowed);
    }
}
