package no.ssb.rawdata.gcs;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import de.huxhorn.sulky.ulid.ULID;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import no.ssb.rawdata.api.RawdataClosedException;
import no.ssb.rawdata.api.RawdataConsumer;
import no.ssb.rawdata.api.RawdataMessage;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;

/* loaded from: input_file:no/ssb/rawdata/gcs/GCSRawdataConsumer.class */
class GCSRawdataConsumer implements RawdataConsumer {
    final String bucket;
    final String topic;
    final GCSTopicAvroFileCache gcsTopicAvroFileCache;
    final AtomicReference<Long> activeBlobFromKeyRef = new AtomicReference<>(-1L);
    final AtomicReference<DataFileReader<GenericRecord>> activeBlobDataFileReaderRef = new AtomicReference<>(null);
    final AtomicBoolean closed = new AtomicBoolean(false);
    final Deque<GCSRawdataMessage> preloadedMessages = new ConcurrentLinkedDeque();

    /* JADX INFO: Access modifiers changed from: package-private */
    public GCSRawdataConsumer(Storage storage, String str, String str2, GCSCursor gCSCursor, int i) {
        this.bucket = str;
        this.topic = str2;
        this.gcsTopicAvroFileCache = new GCSTopicAvroFileCache(storage, str, str2, i);
        if (gCSCursor == null) {
            seek(0L);
            return;
        }
        seek(gCSCursor.ulid.timestamp());
        while (true) {
            try {
                GCSRawdataMessage gCSRawdataMessage = (GCSRawdataMessage) receive(2, TimeUnit.SECONDS);
                if (gCSRawdataMessage == null) {
                    break;
                }
                if (gCSRawdataMessage.ulid().equals(gCSCursor.ulid)) {
                    if (gCSCursor.inclusive) {
                        this.preloadedMessages.addFirst(gCSRawdataMessage);
                    }
                } else if (gCSRawdataMessage.timestamp() > gCSCursor.ulid.timestamp()) {
                    this.preloadedMessages.addFirst(gCSRawdataMessage);
                    break;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public String topic() {
        return this.topic;
    }

    public RawdataMessage receive(int i, TimeUnit timeUnit) throws InterruptedException, RawdataClosedException {
        long currentTimeMillis = System.currentTimeMillis();
        GCSRawdataMessage poll = this.preloadedMessages.poll();
        if (poll != null) {
            return poll;
        }
        DataFileReader<GenericRecord> dataFileReader = this.activeBlobDataFileReaderRef.get();
        if (dataFileReader == null) {
            Map.Entry<Long, Blob> findNextGCSBlob = findNextGCSBlob(i, timeUnit, currentTimeMillis);
            if (findNextGCSBlob == null) {
                return null;
            }
            this.activeBlobFromKeyRef.set(findNextGCSBlob.getKey());
            dataFileReader = setDataFileReader(findNextGCSBlob.getValue());
        }
        if (dataFileReader.hasNext()) {
            return toRawdataMessage((GenericRecord) dataFileReader.next());
        }
        Map.Entry<Long, Blob> findNextGCSBlob2 = findNextGCSBlob(i, timeUnit, currentTimeMillis);
        if (findNextGCSBlob2 == null) {
            return null;
        }
        this.activeBlobFromKeyRef.set(findNextGCSBlob2.getKey());
        setDataFileReader(findNextGCSBlob2.getValue());
        return receive(i, timeUnit);
    }

    private Map.Entry<Long, Blob> findNextGCSBlob(int i, TimeUnit timeUnit, long j) throws InterruptedException {
        Long l = this.activeBlobFromKeyRef.get();
        Map.Entry<Long, Blob> higherEntry = this.gcsTopicAvroFileCache.blobsByTimestamp().higherEntry(l);
        while (true) {
            Map.Entry<Long, Blob> entry = higherEntry;
            if (entry != null) {
                return entry;
            }
            if (System.currentTimeMillis() - j > timeUnit.toMillis(i)) {
                return null;
            }
            Thread.sleep(1000L);
            higherEntry = this.gcsTopicAvroFileCache.blobsByTimestamp().higherEntry(l);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static GCSRawdataMessage toRawdataMessage(GenericRecord genericRecord) {
        return new GCSRawdataMessage(ULID.fromBytes(((GenericData.Fixed) genericRecord.get("id")).bytes()), (String) Optional.ofNullable(genericRecord.get("orderingGroup")).map((v0) -> {
            return v0.toString();
        }).orElse(null), ((Long) genericRecord.get("sequenceNumber")).longValue(), genericRecord.get("position").toString(), (Map) ((Map) genericRecord.get("data")).entrySet().stream().collect(Collectors.toMap(entry -> {
            return ((Utf8) entry.getKey()).toString();
        }, entry2 -> {
            return ((ByteBuffer) entry2.getValue()).array();
        })));
    }

    public CompletableFuture<? extends RawdataMessage> receiveAsync() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return receive(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void seek(long j) {
        this.preloadedMessages.clear();
        DataFileReader<GenericRecord> andSet = this.activeBlobDataFileReaderRef.getAndSet(null);
        if (andSet != null) {
            try {
                andSet.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        NavigableMap<Long, Blob> blobsByTimestamp = this.gcsTopicAvroFileCache.blobsByTimestamp();
        Map.Entry<Long, Blob> floorEntry = blobsByTimestamp.floorEntry(Long.valueOf(j));
        if (floorEntry == null) {
            floorEntry = blobsByTimestamp.ceilingEntry(Long.valueOf(j));
        }
        if (floorEntry == null) {
            this.activeBlobFromKeyRef.set(-1L);
            return;
        }
        this.activeBlobFromKeyRef.set(floorEntry.getKey());
        DataFileReader<GenericRecord> dataFileReader = setDataFileReader(floorEntry.getValue());
        GenericRecord genericRecord = null;
        while (dataFileReader.hasNext()) {
            try {
                genericRecord = (GenericRecord) dataFileReader.next(genericRecord);
                GCSRawdataMessage rawdataMessage = toRawdataMessage(genericRecord);
                if (rawdataMessage.timestamp() >= j) {
                    this.preloadedMessages.add(rawdataMessage);
                    return;
                }
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    private DataFileReader<GenericRecord> setDataFileReader(Blob blob) {
        try {
            DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(new GCSSeekableInput(blob.reader(new Blob.BlobSourceOption[0]), blob.getSize().longValue()), new GenericDatumReader(GCSRawdataProducer.schema));
            this.activeBlobDataFileReaderRef.set(dataFileReader);
            return dataFileReader;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public void close() throws Exception {
        if (this.closed.compareAndSet(false, true)) {
            DataFileReader<GenericRecord> andSet = this.activeBlobDataFileReaderRef.getAndSet(null);
            if (andSet != null) {
                andSet.close();
            }
            this.activeBlobFromKeyRef.set(null);
            this.gcsTopicAvroFileCache.clear();
            this.preloadedMessages.clear();
        }
    }
}
