package dk.cloudcreate.essentials.components.foundation.postgresql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dk.cloudcreate.essentials.components.foundation.postgresql.TableChangeNotification;
import dk.cloudcreate.essentials.reactive.LocalEventBus;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/postgresql/MultiTableChangeListener.class */
public class MultiTableChangeListener<T extends TableChangeNotification> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MultiTableChangeListener.class);
    private final Jdbi jdbi;
    private final Duration pollingInterval;
    private final ObjectMapper objectMapper;
    private final LocalEventBus localEventBus;
    private final ConcurrentMap<String, Class<? extends T>> listenForNotificationsRelatedToTables = new ConcurrentHashMap();
    private final AtomicReference<Handle> handleReference = new AtomicReference<>();
    private ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(ThreadFactoryBuilder.builder().nameFormat("MultiTableChangeListener").daemon(true).build());
    private ScheduledFuture<?> scheduledFuture;

    public MultiTableChangeListener(Jdbi jdbi, Duration duration, ObjectMapper objectMapper, LocalEventBus localEventBus) {
        this.jdbi = (Jdbi) FailFast.requireNonNull(jdbi, "No jdbi provided");
        this.pollingInterval = (Duration) FailFast.requireNonNull(duration, "No pollingInterval provided");
        this.objectMapper = (ObjectMapper) FailFast.requireNonNull(objectMapper, "No objectMapper provided");
        this.localEventBus = (LocalEventBus) FailFast.requireNonNull(localEventBus, "No localEventBus instance provided");
        this.scheduledFuture = this.executorService.scheduleAtFixedRate(this::pollForNotifications, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        log.info("Closing");
        try {
            this.scheduledFuture.cancel(true);
            Iterator<String> it = this.listenForNotificationsRelatedToTables.keySet().iterator();
            while (it.hasNext()) {
                unlisten(it.next());
            }
            this.scheduledFuture = null;
        } catch (Exception e) {
        }
        log.info("Closed");
    }

    public MultiTableChangeListener listenToNotificationsFor(String str, Class<? extends T> cls) {
        FailFast.requireNonBlank(str, "No tableName was provided");
        FailFast.requireNonNull(cls, "No tableNotificationType was provided");
        if (this.listenForNotificationsRelatedToTables.put(str, cls) == null) {
            listen(str);
        }
        return this;
    }

    public MultiTableChangeListener unlistenToNotificationsFor(String str) {
        FailFast.requireNonBlank(str, "No tableName was provided");
        if (this.listenForNotificationsRelatedToTables.remove(str) != null) {
            unlisten(str);
        }
        return this;
    }

    private void listen(String str) {
        FailFast.requireNonBlank(str, "No tableName provided");
        log.info("Setting up Table change LISTENER for '{}'", str);
        getHandle(null).execute("LISTEN " + ListenNotify.resolveTableChangeChannelName(str), new Object[0]);
    }

    private void unlisten(String str) {
        FailFast.requireNonBlank(str, "No tableName provided");
        log.info("Removing table change LISTENER for '{}'", str);
        getHandle(null).execute("UNLISTEN " + ListenNotify.resolveTableChangeChannelName(str), new Object[0]);
    }

    private void pollForNotifications() {
        log.trace("Polling for notifications related to {} tables: {}", Integer.valueOf(this.listenForNotificationsRelatedToTables.size()), this.listenForNotificationsRelatedToTables.keySet());
        if (this.listenForNotificationsRelatedToTables.isEmpty()) {
            return;
        }
        Handle handle = getHandle(handle2 -> {
            Iterator<String> it = this.listenForNotificationsRelatedToTables.keySet().iterator();
            while (it.hasNext()) {
                listen(it.next());
            }
        });
        try {
            PGNotification[] notifications = ((PGConnection) handle.getConnection().unwrap(PGConnection.class)).getNotifications();
            if (notifications.length > 0) {
                log.debug("Received {} Notification(s)", Integer.valueOf(notifications.length));
                Stream filter = Arrays.stream(notifications).map(pGNotification -> {
                    Class<? extends T> cls = this.listenForNotificationsRelatedToTables.get(pGNotification.getName());
                    if (cls == null) {
                        log.error(MessageFormatter.msg("Couldn't find a concrete {} type for notifications related to table '{}'", new Object[]{TableChangeNotification.class.getSimpleName(), pGNotification.getName()}));
                        return null;
                    }
                    try {
                        return (TableChangeNotification) this.objectMapper.readValue(pGNotification.getParameter(), cls);
                    } catch (JsonProcessingException e) {
                        log.error(MessageFormatter.msg("Failed to deserialize notification payload '{}' to concrete {} related to table '{}'", new Object[]{pGNotification.getParameter(), cls.getName(), pGNotification.getName()}));
                        return null;
                    }
                }).filter((v0) -> {
                    return Objects.nonNull(v0);
                });
                LocalEventBus localEventBus = this.localEventBus;
                Objects.requireNonNull(localEventBus);
                filter.forEach((v1) -> {
                    r1.publish(v1);
                });
            } else {
                log.trace("Didn't receive any Notifications");
            }
        } catch (ConnectionException | SQLException e) {
            log.error(MessageFormatter.msg("Failed to listen for notifications", new Object[0]), e);
            try {
                handle.close();
            } catch (Exception e2) {
                log.error(MessageFormatter.msg("Failed to close the listener Handle", new Object[0]), e);
            }
            this.handleReference.set(null);
        }
    }

    private Handle getHandle(Consumer<Handle> consumer) {
        try {
            Handle handle = this.handleReference.get();
            if (handle == null) {
                AtomicReference<Handle> atomicReference = this.handleReference;
                Handle open = this.jdbi.open();
                handle = open;
                atomicReference.set(open);
                handle.getConnection().setAutoCommit(true);
                handle.getConnection().setReadOnly(true);
                handle.getConnection().setTransactionIsolation(2);
                if (consumer != null) {
                    consumer.accept(handle);
                }
            }
            return handle;
        } catch (SQLException e) {
            throw new RuntimeException("Failed to acquire Handle", e);
        }
    }
}
