package no.ssb.rawdata.gcs;

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import de.huxhorn.sulky.ulid.ULID;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import no.ssb.rawdata.api.RawdataClosedException;
import no.ssb.rawdata.api.RawdataMessage;
import no.ssb.rawdata.api.RawdataNotBufferedException;
import no.ssb.rawdata.api.RawdataProducer;
import no.ssb.rawdata.gcs.GCSRawdataMessage;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:no/ssb/rawdata/gcs/GCSRawdataProducer.class */
public class GCSRawdataProducer implements RawdataProducer {
    /** Logger used by this producer. */
    static final Logger LOG = LoggerFactory.getLogger(GCSRawdataProducer.class);

    /**
     * Avro schema of one stored message: a 16-byte fixed {@code id}, a nullable string
     * {@code orderingGroup}, a long {@code sequenceNumber} defaulting to 0, a string
     * {@code position} and a {@code data} map with bytes values.
     */
    static final Schema schema = SchemaBuilder.record("RawdataMessage")
            .fields()
            .name("id").type().fixed("ulid").size(16).noDefault()
            .name("orderingGroup").type().nullable().stringType().noDefault()
            .name("sequenceNumber").type().longType().longDefault(0)
            .name("position").type().stringType().noDefault()
            .name("data").type().map().values().bytesType().noDefault()
            .endRecord();

    /** Helper used to copy the finished local Avro file to a GCS blob. */
    final GCSRawdataUtils gcsRawdataUtils;
    /** Bucket passed on when deriving the target blob id of an upload. */
    final String bucket;
    /** Folder under which the per-topic directory and the local Avro file are created. */
    final Path tmpFolder;
    /** Age in seconds of the current window after which publish() rolls the Avro file. */
    final long avroMaxSeconds;
    /** Local file size in bytes above which publish() rolls the Avro file. */
    final long avroMaxBytes;
    /** Estimated number of bytes per block after which publish() calls sync() on the writer. */
    final int avroSyncInterval;
    /** Topic this producer writes to; also the name of the local sub-directory. */
    final String topic;
    /** Set to true exactly once, by close(). */
    final AtomicBoolean closed = new AtomicBoolean(false);
    /** Generator for message ids. */
    final ULID ulid = new ULID();
    /** Id of the most recently published message; seeds the next generated id. */
    final AtomicReference<ULID.Value> prevUlid = new AtomicReference<>(this.ulid.nextValue());
    /** Buffered, not yet published message builders keyed by position. */
    final Map<String, GCSRawdataMessage.Builder> buffer = new ConcurrentHashMap<>();
    /** Writer of the currently open local Avro file, or null when no file is open. */
    final AtomicReference<DataFileWriter<GenericRecord>> dataFileWriterRef = new AtomicReference<>();
    /** Path of the local Avro file; the same path is reused every time the file is rolled. */
    final AtomicReference<Path> pathRef = new AtomicReference<>();
    /** Millisecond timestamp of the first message in the current window; -1 until one is published. */
    final AtomicLong timestampOfFirstMessageInWindow = new AtomicLong(-1);
    /** Metadata about the Avro file being written; the upload blob id is derived from it. */
    final GCSAvroFileMetadata activeAvrofileMetadata = new GCSAvroFileMetadata();
    /** Estimated bytes appended since the last explicit sync(). */
    final AtomicLong avroBytesWrittenInBlock = new AtomicLong(0);
    /** Guards creation, writing and closing/uploading of the local Avro file. */
    final ReentrantLock lock = new ReentrantLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a producer and opens a fresh local Avro file for it.
     *
     * <p>A directory named after the topic is created under {@code path} (if missing), a
     * temporary {@code .avro} file is created inside it, and an Avro writer is opened on
     * that file.
     *
     * @param storage GCS client handed to {@link GCSRawdataUtils}
     * @param str     bucket name
     * @param path    local folder under which the topic directory is created
     * @param j       maximum window age in seconds before the Avro file is rolled
     * @param j2      maximum local file size in bytes before the Avro file is rolled
     * @param i       estimated bytes per block between explicit syncs
     * @param str2    topic name
     * @throws RuntimeException wrapping the {@link IOException} if the directory or the
     *                          temporary file cannot be created
     */
    public GCSRawdataProducer(Storage storage, String str, Path path, long j, long j2, int i, String str2) {
        this.gcsRawdataUtils = new GCSRawdataUtils(storage);
        this.bucket = str;
        this.tmpFolder = path;
        this.avroMaxSeconds = j;
        this.avroMaxBytes = j2;
        this.avroSyncInterval = i;
        this.topic = str2;

        final Path localAvroFile;
        try {
            Path topicFolder = path.resolve(str2);
            Files.createDirectories(topicFolder);
            localAvroFile = Files.createTempFile(topicFolder, "", ".avro");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.pathRef.set(localAvroFile);
        createOrOverwriteLocalAvroFile(localAvroFile);
    }

    /**
     * Resets the active file metadata and opens a new Avro writer on {@code path},
     * replacing whatever file is already there.
     *
     * <p>The writer is published through {@code dataFileWriterRef} only after the file has
     * been created successfully, so a failed create never leaves an unopened writer behind
     * for a later close to trip over.
     *
     * @param path local file to create or overwrite
     * @throws IllegalStateException if the lock cannot be acquired within 5 minutes
     * @throws RuntimeException      wrapping an {@link IOException} from Avro, or an
     *                               {@link InterruptedException} if interrupted while waiting
     *                               for the lock (the interrupt flag is restored first)
     */
    private void createOrOverwriteLocalAvroFile(Path path) {
        final boolean acquired;
        try {
            acquired = this.lock.tryLock(5L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (!acquired) {
            throw new IllegalStateException("Unable to acquire lock within 5 minutes");
        }
        try {
            this.activeAvrofileMetadata.clear();
            DataFileWriter<GenericRecord> dataFileWriter =
                    new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
            // Very large automatic sync interval; publish() calls sync() explicitly based on
            // avroSyncInterval instead.
            dataFileWriter.setSyncInterval(1073741824);
            dataFileWriter.setFlushOnEveryBlock(true);
            try {
                dataFileWriter.create(schema, path.toFile());
                this.activeAvrofileMetadata.setSyncOfLastBlock(dataFileWriter.sync());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.dataFileWriterRef.set(dataFileWriter);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Flushes and closes the current Avro writer (if any) and, when at least one record
     * was written, verifies the local file and copies it to its GCS blob.
     *
     * <p>The writer reference is cleared before closing, so a subsequent call finds no
     * writer. The upload is skipped when the active file metadata reports a count of 0.
     *
     * @throws IllegalStateException if the lock cannot be acquired within 5 minutes
     * @throws RuntimeException      wrapping an {@link IOException} from closing, verifying
     *                               or copying, or an {@link InterruptedException} if
     *                               interrupted while waiting for the lock (the interrupt
     *                               flag is restored first)
     */
    private void closeAvroFileAndUploadToGCS() {
        final boolean acquired;
        try {
            acquired = this.lock.tryLock(5L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (!acquired) {
            throw new IllegalStateException("Unable to acquire lock within 5 minutes");
        }
        try {
            DataFileWriter<GenericRecord> writer = this.dataFileWriterRef.getAndSet(null);
            if (writer != null) {
                // try-with-resources: close() is attempted even if flush() fails.
                try (DataFileWriter<GenericRecord> closing = writer) {
                    closing.flush();
                }
            }
            Path path = this.pathRef.get();
            if (path != null && this.activeAvrofileMetadata.getCount() > 0) {
                BlobId blobId = this.activeAvrofileMetadata.toBlobId(this.bucket, this.topic);
                // Fail before uploading if the offset encoded in the blob id is not seekable.
                verifySeekableToLastBlockOffsetAsGivenByFilename(path, GCSRawdataUtils.getOffsetOfLastBlock(blobId));
                LOG.info("Copying Avro file {} ({}) to BlobId: {}",
                        path.getFileName(),
                        GCSRawdataUtils.humanReadableByteCount(path.toFile().length(), false),
                        blobId);
                this.gcsRawdataUtils.copyLocalFileToGCSBlob(path.toFile(), blobId);
                LOG.info("Copy COMPLETE! Avro file {}", path.getFileName());
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Checks that the Avro file at {@code path} can be opened and that a reader can seek
     * to offset {@code j} and probe for a following record.
     *
     * <p>The file input is declared as its own resource so that it is closed even when
     * constructing the {@link DataFileReader} fails.
     *
     * @param path local Avro file to check
     * @param j    byte offset to seek to
     * @throws IOException if the file cannot be opened, or seeking/reading fails
     */
    static void verifySeekableToLastBlockOffsetAsGivenByFilename(Path path, long j) throws IOException {
        try (SeekableFileInput input = new SeekableFileInput(path.toFile());
             DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(input, new GenericDatumReader<GenericRecord>(schema))) {
            reader.seek(j);
            // Result is irrelevant; the call only has to complete without throwing.
            reader.hasNext();
        }
    }

    /**
     * @return the topic this producer was created for
     */
    public String topic() {
        return topic;
    }

    /**
     * Creates a new, empty message builder for this producer.
     *
     * @return a fresh {@link GCSRawdataMessage.Builder}
     * @throws RawdataClosedException if this producer has been closed, consistent with
     *                                {@code buffer} and {@code publish}
     */
    public RawdataMessage.Builder builder() throws RawdataClosedException {
        if (isClosed()) {
            throw new RawdataClosedException();
        }
        return new GCSRawdataMessage.Builder();
    }

    /**
     * Stores a message builder under its position until it is published.
     * A builder buffered earlier under the same position is replaced.
     *
     * @param builder builder to buffer; must be a {@link GCSRawdataMessage.Builder}
     * @return this producer
     * @throws RawdataClosedException if this producer has been closed
     */
    public RawdataProducer buffer(RawdataMessage.Builder builder) throws RawdataClosedException {
        // The cast happens before the closed check, so a foreign builder type fails first.
        final GCSRawdataMessage.Builder gcsBuilder = (GCSRawdataMessage.Builder) builder;
        if (!isClosed()) {
            final String position = gcsBuilder.position;
            this.buffer.put(position, gcsBuilder);
            return this;
        }
        throw new RawdataClosedException();
    }

    /**
     * Writes the buffered messages at the given positions, in the given order, to the
     * local Avro file.
     *
     * <p>For each position: if the current window is older than {@code avroMaxSeconds},
     * the file is closed, uploaded and recreated first. The buffered builder is then
     * removed, given an id if it has none, built and appended. After appending, the file
     * is rolled again if its size exceeds {@code avroMaxBytes}. Positions handled before a
     * failure stay written.
     *
     * @param strArr positions of previously buffered messages
     * @throws RawdataClosedException      if this producer has been closed
     * @throws RawdataNotBufferedException if a position has not been buffered
     * @throws IllegalStateException       if the lock cannot be acquired within 5 minutes
     * @throws RuntimeException            wrapping an {@link IOException} from Avro, or an
     *                                     {@link InterruptedException} if interrupted while
     *                                     waiting for the lock (the interrupt flag is
     *                                     restored first)
     */
    public void publish(String... strArr) throws RawdataClosedException, RawdataNotBufferedException {
        if (isClosed()) {
            throw new RawdataClosedException();
        }
        final boolean acquired;
        try {
            acquired = this.lock.tryLock(5L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (!acquired) {
            throw new IllegalStateException("Unable to acquire lock within 5 minutes");
        }
        try {
            for (String position : strArr) {
                long now = System.currentTimeMillis();
                // The first published message starts the window.
                this.timestampOfFirstMessageInWindow.compareAndSet(-1L, now);
                if (this.timestampOfFirstMessageInWindow.get() + (1000 * this.avroMaxSeconds) < now) {
                    // Window expired: ship the current file and start a new one.
                    closeAvroFileAndUploadToGCS();
                    createOrOverwriteLocalAvroFile(this.pathRef.get());
                    this.timestampOfFirstMessageInWindow.set(now);
                }

                GCSRawdataMessage.Builder builder = this.buffer.remove(position);
                if (builder == null) {
                    throw new RawdataNotBufferedException(String.format("position %s has not been buffered", position));
                }
                if (builder.ulid == null) {
                    builder.m5ulid(RawdataProducer.nextMonotonicUlid(this.ulid, this.prevUlid.get()));
                }
                // NOTE(review): m5ulid/m2build look like decompiler-renamed ulid(...)/build() - confirm.
                GCSRawdataMessage message = builder.m2build();
                this.prevUlid.set(message.ulid());
                this.activeAvrofileMetadata.setIdOfFirstRecord(message.ulid());
                this.activeAvrofileMetadata.setPositionOfFirstRecord(position);

                try {
                    GenericData.Record record = toAvroRecord(message);
                    if (this.avroBytesWrittenInBlock.get() >= this.avroSyncInterval) {
                        // Enough estimated bytes since the last sync: end the block here.
                        this.activeAvrofileMetadata.setSyncOfLastBlock(this.dataFileWriterRef.get().sync());
                        this.avroBytesWrittenInBlock.set(0L);
                    }
                    this.dataFileWriterRef.get().append(record);
                    this.activeAvrofileMetadata.incrementCounter(1L);
                    this.avroBytesWrittenInBlock.addAndGet(estimateAvroSizeOfRawdataMessage(message));
                    if (this.pathRef.get().toFile().length() > this.avroMaxBytes) {
                        // File grew past the size limit: ship it and start a new one.
                        closeAvroFileAndUploadToGCS();
                        createOrOverwriteLocalAvroFile(this.pathRef.get());
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /** Converts a built message into an Avro record matching {@code schema}. */
    private static GenericData.Record toAvroRecord(GCSRawdataMessage message) {
        GenericData.Record record = new GenericData.Record(schema);
        record.put("id", new GenericData.Fixed(schema.getField("id").schema(), message.ulid().toBytes()));
        record.put("orderingGroup", message.orderingGroup());
        record.put("sequenceNumber", Long.valueOf(message.sequenceNumber()));
        record.put("position", message.position());
        // Each byte[] value is wrapped in a ByteBuffer before being stored in the record.
        record.put("data", message.data().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> ByteBuffer.wrap(entry.getValue()))));
        return record;
    }

    /**
     * Estimates how many bytes a message contributes to the Avro file.
     *
     * <p>The estimate is 18 + length of the ordering group (0 when null) + 6 + 2 + length
     * of the position, plus, for every data entry, 2 + key length + 4 + value length.
     * String lengths are counted in chars.
     *
     * @param gCSRawdataMessage message to size; its position must not be null
     * @return the estimated size in bytes
     */
    static long estimateAvroSizeOfRawdataMessage(GCSRawdataMessage gCSRawdataMessage) {
        String orderingGroup = gCSRawdataMessage.orderingGroup();
        int orderingGroupLength = (orderingGroup == null) ? 0 : orderingGroup.length();
        int fixedPart = 18 + orderingGroupLength + 6 + 2 + gCSRawdataMessage.position().length();

        long dataPart = 0L;
        for (Map.Entry<String, byte[]> entry : gCSRawdataMessage.data().entrySet()) {
            int entrySize = 2 + entry.getKey().length() + 4 + entry.getValue().length;
            dataPart += entrySize;
        }
        return fixedPart + dataPart;
    }

    /**
     * Runs {@code publish} for the given positions asynchronously via
     * {@link CompletableFuture#runAsync(Runnable)}.
     *
     * @param strArr positions of previously buffered messages
     * @return a future that completes when the asynchronous publish has finished
     * @throws RawdataClosedException if this producer is already closed when called
     */
    public CompletableFuture<Void> publishAsync(String... strArr) {
        if (!isClosed()) {
            return CompletableFuture.runAsync(() -> publish(strArr));
        }
        throw new RawdataClosedException();
    }

    /**
     * @return {@code true} once {@code close()} has been called on this producer
     */
    public boolean isClosed() {
        return closed.get();
    }

    /**
     * Closes this producer. Only the first call has any effect: it closes and uploads the
     * current Avro file and then discards all buffered, unpublished builders.
     *
     * <p>The buffer is cleared even when closing or uploading fails, since a closed
     * producer can no longer publish what it holds.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            closeAvroFileAndUploadToGCS();
        } finally {
            this.buffer.clear();
        }
    }
}
