package dk.cloudcreate.essentials.components.foundation.postgresql;

import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.jdbi.v3.core.ConnectionException;
import org.jdbi.v3.core.Handle;
import org.jdbi.v3.core.Jdbi;
import org.postgresql.PGConnection;
import org.postgresql.PGNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/postgresql/ListenNotify.class */
public final class ListenNotify {
    private static final Logger log = LoggerFactory.getLogger(ListenNotify.class);
    public static final String TABLE_NAME = "table_name";
    public static final String SQL_OPERATION = "sql_operation";

    /* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/postgresql/ListenNotify$SqlOperation.class */
    public enum SqlOperation {
        INSERT,
        UPDATE,
        DELETE,
        TRUNCATE
    }

    public static String resolveTableChangeChannelName(String str) {
        return (String) FailFast.requireNonNull(str, "No tableName provided");
    }

    public static void addChangeNotificationTriggerToTable(Handle handle, String str, List<SqlOperation> list, String... strArr) {
        FailFast.requireNonNull(handle, "No handle provided");
        FailFast.requireNonBlank(str, "No tableName provided");
        FailFast.requireNonEmpty(list, "No triggerOnSqlOperations entries provided");
        String str2 = "table_name, sql_operation";
        String str3 = "TG_TABLE_NAME, TG_OP";
        if (strArr != null && strArr.length > 0) {
            str2 = str2 + ", " + ((String) Arrays.stream(strArr).reduce((str4, str5) -> {
                return str4 + ", " + str5;
            }).get());
            str3 = str3 + ", " + ((String) Arrays.stream(strArr).map(str6 -> {
                return "NEW." + str6;
            }).reduce((str7, str8) -> {
                return str7 + ", " + str8;
            }).get()) + "\n";
        }
        String bind = MessageFormatter.bind("CREATE OR REPLACE FUNCTION notify_{:tableName}_change()\n       RETURNS trigger AS $$\n       BEGIN\n         PERFORM (\n            WITH payload({:additionalColumnsPayLoadStatement}) AS (\n              SELECT {:additionalColumnsSelectStatement}            )\n            SELECT pg_notify('{:channelName}', row_to_json(payload)::text) FROM payload);\n         RETURN NULL;\n       END;\n       $$ LANGUAGE PLPGSQL;", new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg("tableName", str), MessageFormatter.NamedArgumentBinding.arg("channelName", resolveTableChangeChannelName(str)), MessageFormatter.NamedArgumentBinding.arg("additionalColumnsPayLoadStatement", str2), MessageFormatter.NamedArgumentBinding.arg("additionalColumnsSelectStatement", str3)});
        log.debug("Notify Function for changes on '{}' SQL:\n{}", str, bind);
        handle.createUpdate(bind).execute();
        log.info("Added Notification FUNCTION 'notify_{}_change' for table '{}'", str, str);
        String bind2 = MessageFormatter.bind("CREATE OR REPLACE TRIGGER notify_on_{:tableName}_changes\n      {:when} {:on} ON {:tableName}\n      FOR EACH ROW\n         EXECUTE FUNCTION notify_{:tableName}_change();", new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg("tableName", str), MessageFormatter.NamedArgumentBinding.arg("when", "AFTER"), MessageFormatter.NamedArgumentBinding.arg("on", list.stream().map((v0) -> {
            return v0.name();
        }).reduce((str9, str10) -> {
            return str9 + " OR " + str10;
        }).get())});
        log.debug("Trigger for '{}' changes SQL:\n{}", str, bind2);
        handle.createUpdate(bind2).execute();
        log.info("Added Notification TRIGGER 'notify_on_{}_changes' for table '{}'", str, str);
    }

    public static void removeChangeNotificationTriggerFromTable(Handle handle, String str) {
        FailFast.requireNonNull(handle, "No handle provided");
        FailFast.requireNonBlank(str, "No tableName provided");
        handle.createUpdate(MessageFormatter.bind("DROP FUNCTION IF EXISTS notify_{:tableName}_change() CASCADE", new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg("tableName", str)})).execute();
        log.info("Removed Notification FUNCTION 'notify_{}_change' for table '{}'", str, str);
        handle.createUpdate(MessageFormatter.bind("DROP TRIGGER IF EXISTS notify_on_{:tableName}_changes ON {:tableName} CASCADE", new MessageFormatter.NamedArgumentBinding[]{MessageFormatter.NamedArgumentBinding.arg("tableName", str)})).execute();
        log.info("Removed Notification TRIGGER 'notify_on_{}_changes' for table '{}'", str, str);
    }

    public static Flux<String> listen(Jdbi jdbi, String str, Duration duration) {
        FailFast.requireNonNull(jdbi, "No jdbi provided");
        FailFast.requireNonBlank(str, "No tableName provided");
        FailFast.requireNonNull(duration, "No pollingInterval provided");
        AtomicReference atomicReference = new AtomicReference();
        return Flux.defer(() -> {
            Handle handle = (Handle) atomicReference.get();
            if (handle == null) {
                try {
                    Handle open = jdbi.open();
                    handle = open;
                    atomicReference.set(open);
                    handle.getConnection().setAutoCommit(true);
                    handle.getConnection().setReadOnly(true);
                    handle.getConnection().setTransactionIsolation(2);
                    log.info("Setting up LISTENER for table '{}'", str);
                    handle.execute("LISTEN " + resolveTableChangeChannelName(str), new Object[0]);
                } catch (ConnectionException | SQLException e) {
                    log.error(MessageFormatter.msg("Failed to listen for notification for table '{}'", new Object[]{str}), e);
                    try {
                        handle.close();
                        atomicReference.set(null);
                        return Flux.empty();
                    } catch (Exception e2) {
                        log.error(MessageFormatter.msg("Failed to close the listener Handle for table '{}'", new Object[]{str}), e);
                        return Flux.error(e2);
                    }
                }
            }
            PGNotification[] notifications = ((PGConnection) handle.getConnection().unwrap(PGConnection.class)).getNotifications();
            if (notifications.length > 0) {
                log.debug("Received {} Notification(s) for table '{}'", Integer.valueOf(notifications.length), str);
                return Flux.fromStream(Arrays.stream(notifications).map((v0) -> {
                    return v0.getParameter();
                }));
            }
            log.trace("Didn't receive any Notifications for table '{}'", str);
            return Flux.empty();
        }).doOnCancel(() -> {
            Handle handle = (Handle) atomicReference.get();
            if (handle != null) {
                log.info("Removing LISTENER for table '{}'", str);
                try {
                    handle.execute("UNLISTEN " + str, new Object[0]);
                } catch (Exception e) {
                }
                try {
                    handle.close();
                } catch (Exception e2) {
                }
            }
        }).repeatWhen(flux -> {
            return Flux.interval(duration).onBackpressureDrop().publishOn(Schedulers.newSingle("Postgresql-Listener-" + str, true));
        });
    }
}
