package no.ssb.rawdata.provider.postgres;

import de.huxhorn.sulky.ulid.ULID;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import no.ssb.rawdata.api.RawdataClosedException;
import no.ssb.rawdata.api.RawdataConsumer;
import no.ssb.rawdata.api.RawdataMessage;
import no.ssb.rawdata.provider.postgres.tx.TransactionFactory;

/* loaded from: input_file:no/ssb/rawdata/provider/postgres/PostgresRawdataConsumer.class */
class PostgresRawdataConsumer implements RawdataConsumer {
    static final CountDownLatch OPEN_LATCH = new CountDownLatch(0);
    final int dbPrefetchPollIntervalWhenEmptyMilliseconds;
    final TransactionFactory transactionFactory;
    final String topic;
    final AtomicReference<PostgresCursor> position = new AtomicReference<>();
    final AtomicBoolean closed = new AtomicBoolean(false);
    final Lock pollLock = new ReentrantLock();
    final Condition condition = this.pollLock.newCondition();
    final Deque<PostgresRawdataMessage> messageBuffer = new ConcurrentLinkedDeque();
    final AtomicReference<CompletableFuture<Integer>> pendingPrefetch = new AtomicReference<>(CompletableFuture.completedFuture(0));
    final AtomicReference<Long> pendingPrefetchExpiry = new AtomicReference<>(Long.valueOf(System.currentTimeMillis()));
    final int prefetchSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PostgresRawdataConsumer(TransactionFactory transactionFactory, String str, PostgresCursor postgresCursor, int i, int i2) {
        this.transactionFactory = transactionFactory;
        this.prefetchSize = i;
        this.dbPrefetchPollIntervalWhenEmptyMilliseconds = i2;
        this.topic = str;
        this.position.set(postgresCursor == null ? new PostgresCursor(RawdataConsumer.beginningOfTime(), true) : postgresCursor);
    }

    public String topic() {
        return this.topic;
    }

    PostgresRawdataMessage findNextMessage() {
        CountDownLatch countDownLatch = OPEN_LATCH;
        if (this.messageBuffer.size() < 1 + (this.prefetchSize / 2) && this.pendingPrefetch.get().isDone() && (this.pendingPrefetch.get().join().intValue() > 0 || this.pendingPrefetchExpiry.get().longValue() <= System.currentTimeMillis())) {
            AtomicReference<CompletableFuture<Integer>> atomicReference = this.pendingPrefetch;
            CountDownLatch countDownLatch2 = new CountDownLatch(1);
            countDownLatch = countDownLatch2;
            atomicReference.set(fetchNextBatchAsync(countDownLatch2));
            this.pendingPrefetchExpiry.set(Long.valueOf(System.currentTimeMillis() + this.dbPrefetchPollIntervalWhenEmptyMilliseconds));
        }
        while (this.messageBuffer.isEmpty() && !this.pendingPrefetch.get().isDone()) {
            try {
                countDownLatch.await(5L, TimeUnit.SECONDS);
                if (this.messageBuffer.isEmpty()) {
                    this.pendingPrefetch.get().join();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return this.messageBuffer.pollFirst();
    }

    private CompletableFuture<Integer> fetchNextBatchAsync(CountDownLatch countDownLatch) {
        return this.transactionFactory.runAsyncInIsolatedTransaction(transaction -> {
            try {
                try {
                    PostgresCursor postgresCursor = this.position.get();
                    Object[] objArr = new Object[3];
                    objArr[0] = this.topic;
                    objArr[1] = postgresCursor.inclusive ? ">=" : ">";
                    objArr[2] = this.topic;
                    PreparedStatement prepareStatement = transaction.connection().prepareStatement(String.format("SELECT c.name, c.data, p.ulid, p.position, p.ordering_group, p.sequence_number FROM (SELECT ulid, ordering_group, sequence_number, position FROM \"%s_positions\" WHERE ulid %s ? ORDER BY ulid LIMIT ?) p LEFT JOIN \"%s_content\" c ON c.ulid = p.ulid ORDER BY p.ulid, c.name", objArr));
                    prepareStatement.setObject(1, new UUID(postgresCursor.startKey.getMostSignificantBits(), postgresCursor.startKey.getLeastSignificantBits()));
                    prepareStatement.setInt(2, this.prefetchSize);
                    ResultSet executeQuery = prepareStatement.executeQuery();
                    PostgresRawdataMessage postgresRawdataMessage = null;
                    ULID.Value value = null;
                    String str = null;
                    long j = 0;
                    String str2 = null;
                    LinkedHashMap linkedHashMap = new LinkedHashMap();
                    int i = 0;
                    while (executeQuery.next()) {
                        String string = executeQuery.getString(1);
                        byte[] bytes = executeQuery.getBytes(2);
                        UUID uuid = (UUID) executeQuery.getObject(3);
                        String string2 = executeQuery.getString(4);
                        String string3 = executeQuery.getString(5);
                        long j2 = executeQuery.getLong(6);
                        ULID.Value value2 = new ULID.Value(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
                        if (value == null) {
                            value = value2;
                            str2 = string2;
                            str = string3;
                            j = j2;
                        }
                        if (!value2.equals(value)) {
                            Deque<PostgresRawdataMessage> deque = this.messageBuffer;
                            PostgresRawdataMessage postgresRawdataMessage2 = new PostgresRawdataMessage(value, str, j, str2, linkedHashMap);
                            postgresRawdataMessage = postgresRawdataMessage2;
                            deque.add(postgresRawdataMessage2);
                            int i2 = i;
                            i++;
                            if (i2 == 0) {
                                countDownLatch.countDown();
                            }
                            linkedHashMap = new LinkedHashMap();
                        }
                        linkedHashMap.put(string, bytes);
                        value = value2;
                        str2 = string2;
                        str = string3;
                        j = j2;
                    }
                    if (value != null) {
                        i++;
                        Deque<PostgresRawdataMessage> deque2 = this.messageBuffer;
                        PostgresRawdataMessage postgresRawdataMessage3 = new PostgresRawdataMessage(value, str, j, str2, linkedHashMap);
                        postgresRawdataMessage = postgresRawdataMessage3;
                        deque2.add(postgresRawdataMessage3);
                    }
                    if (postgresRawdataMessage != null) {
                        this.position.set(new PostgresCursor(postgresRawdataMessage.ulid(), false));
                    }
                    Integer valueOf = Integer.valueOf(i);
                    countDownLatch.countDown();
                    return valueOf;
                } catch (SQLException e) {
                    throw new PersistenceException(e);
                }
            } catch (Throwable th) {
                countDownLatch.countDown();
                throw th;
            }
        }, true);
    }

    /* renamed from: receive, reason: merged with bridge method [inline-methods] */
    public PostgresRawdataMessage m3receive(int i, TimeUnit timeUnit) throws InterruptedException {
        if (isClosed()) {
            throw new RawdataClosedException();
        }
        long nanoTime = System.nanoTime() + timeUnit.toNanos(i);
        if (!this.pollLock.tryLock()) {
            throw new RuntimeException("Concurrent access between calls to receive and seek not allowed");
        }
        try {
            PostgresRawdataMessage findNextMessage = findNextMessage();
            while (findNextMessage == null) {
                long nanoTime2 = nanoTime - System.nanoTime();
                if (nanoTime2 <= 0) {
                    return null;
                }
                this.condition.await(Math.min(nanoTime2, 250000000), TimeUnit.NANOSECONDS);
                findNextMessage = findNextMessage();
            }
            PostgresRawdataMessage postgresRawdataMessage = findNextMessage;
            this.pollLock.unlock();
            return postgresRawdataMessage;
        } finally {
            this.pollLock.unlock();
        }
    }

    public CompletableFuture<? extends RawdataMessage> receiveAsync() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return m3receive(5, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void seek(long j) {
        if (!this.pollLock.tryLock()) {
            throw new RuntimeException("Concurrent access between calls to receive and seek not allowed");
        }
        try {
            this.position.set(new PostgresCursor(RawdataConsumer.beginningOf(j), true));
            this.pendingPrefetch.get().join();
            this.messageBuffer.clear();
            this.pendingPrefetchExpiry.set(Long.valueOf(System.currentTimeMillis()));
            this.pollLock.unlock();
        } catch (Throwable th) {
            this.pollLock.unlock();
            throw th;
        }
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public void close() {
        this.closed.set(true);
    }

    public String toString() {
        return "PostgresRawdataConsumer{topic='" + this.topic + "'position=" + this.position.get() + ", closed=" + this.closed + "}";
    }
}
