package dk.cloudcreate.essentials.components.foundation.fencedlock;

import dk.cloudcreate.essentials.components.foundation.fencedlock.DBFencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockEvents;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.reactive.EventBus;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.network.Network;
import java.time.Clock;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/fencedlock/DBFencedLockManager.class */
public class DBFencedLockManager<UOW extends UnitOfWork, LOCK extends DBFencedLock> implements FencedLockManager {
    private final FencedLockStorage<UOW, LOCK> lockStorage;
    private final ConcurrentMap<LockName, LOCK> locksAcquiredByThisLockManager;
    private final ConcurrentMap<LockName, ScheduledFuture<?>> asyncLockAcquirings;
    private final Duration lockTimeOut;
    private final Duration lockConfirmationInterval;
    private final String lockManagerInstanceId;
    protected final UnitOfWorkFactory<? extends UOW> unitOfWorkFactory;
    private final Optional<EventBus> eventBus;
    private volatile boolean started;
    private volatile boolean stopping;
    private volatile boolean paused;
    private ScheduledExecutorService lockConfirmationExecutor;
    private ScheduledExecutorService asyncLockAcquiringExecutor;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected int syncAcquireLockPauseIntervalMs = 100;

    protected DBFencedLockManager(FencedLockStorage<UOW, LOCK> fencedLockStorage, UnitOfWorkFactory<? extends UOW> unitOfWorkFactory, Optional<String> optional, Duration duration, Duration duration2, Optional<EventBus> optional2) {
        FailFast.requireNonNull(optional, "No lockManagerInstanceId option provided");
        this.lockStorage = (FencedLockStorage) FailFast.requireNonNull(fencedLockStorage, "No lockStorage provided");
        this.unitOfWorkFactory = (UnitOfWorkFactory) FailFast.requireNonNull(unitOfWorkFactory, "No unitOfWorkFactory provided");
        this.lockManagerInstanceId = optional.orElseGet(Network::hostName);
        this.lockTimeOut = (Duration) FailFast.requireNonNull(duration, "No lockTimeOut value provided");
        this.lockConfirmationInterval = (Duration) FailFast.requireNonNull(duration2, "No lockConfirmationInterval value provided");
        if (duration2.compareTo(duration) >= 1) {
            throw new IllegalArgumentException(MessageFormatter.msg("lockConfirmationInterval {} duration MUST not be larger than the lockTimeOut {} duration, because locks will then always timeout", new Object[]{duration2, duration}));
        }
        this.eventBus = (Optional) FailFast.requireNonNull(optional2, "No eventBus option provided");
        this.locksAcquiredByThisLockManager = new ConcurrentHashMap();
        this.asyncLockAcquirings = new ConcurrentHashMap();
        this.log.info("[{}] Initializing '{}' using storage '{}' and lockConfirmationInterval: {} ms, lockTimeOut: {} ms", new Object[]{this.lockManagerInstanceId, getClass().getName(), fencedLockStorage.getClass().getName(), Long.valueOf(duration2.toMillis()), Long.valueOf(duration.toMillis())});
        unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            fencedLockStorage.initializeLockStorage(this, unitOfWork);
        });
    }

    /**
     * Starts the lock manager unless it is already started.
     * <p>
     * Creates the single-threaded lock confirmation executor and the two-threaded async lock acquiring
     * executor (both using daemon threads), schedules {@code confirmAllLocallyAcquiredLocks} at a fixed
     * rate of {@code lockConfirmationInterval} and finally publishes a
     * {@link FencedLockEvents.FencedLockManagerStarted} event.
     */
    @Override
    public void start() {
        if (this.started) {
            this.log.debug("[{}] Lock Manager was already started", this.lockManagerInstanceId);
            return;
        }
        this.log.info("[{}] Starting lock manager", this.lockManagerInstanceId);
        this.stopping = false;
        this.lockConfirmationExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().nameFormat(this.lockManagerInstanceId + "-FencedLock-Confirmation-%d").daemon(true).build());
        this.asyncLockAcquiringExecutor = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder.builder().nameFormat(this.lockManagerInstanceId + "-Lock-Acquiring-%d").daemon(true).build());
        // The first confirmation run happens one full interval after start
        this.lockConfirmationExecutor.scheduleAtFixedRate(this::confirmAllLocallyAcquiredLocks, this.lockConfirmationInterval.toMillis(), this.lockConfirmationInterval.toMillis(), TimeUnit.MILLISECONDS);
        this.started = true;
        this.log.info("[{}] Started lock manager", this.lockManagerInstanceId);
        // Announce the start; the Stopped event is reserved for stop()
        notify(new FencedLockEvents.FencedLockManagerStarted(this));
    }

    /**
     * Publishes the given event on the event bus, if one was configured; otherwise does nothing.
     *
     * @param fencedLockEvents the event to publish
     */
    protected void notify(FencedLockEvents fencedLockEvents) {
        eventBus.ifPresent(bus -> bus.publish(fencedLockEvents));
    }

    /**
     * Marks the lock manager as paused. While paused, the scheduled lock confirmation and the
     * async lock acquiring tasks skip their work.
     */
    public void pause() {
        log.info("[{}] Pausing async lock acquiring and lock confirmation", lockManagerInstanceId);
        paused = true;
    }

    /**
     * Clears the paused flag so the scheduled lock confirmation and async lock acquiring tasks
     * perform their work again.
     */
    public void resume() {
        log.info("[{}] Resuming async lock acquiring and lock confirmation", lockManagerInstanceId);
        paused = false;
    }

    /**
     * Confirms, in a single unit of work, every lock currently tracked as acquired by this lock manager.
     * <p>
     * The run is skipped while the manager is stopping, when no locks are tracked, or while the manager
     * is paused. A lock that is confirmed in storage is marked as confirmed and a
     * {@link FencedLockEvents.LockConfirmed} event is published. A lock that could not be confirmed
     * (the storage reported failure or threw an exception) is released.
     */
    private void confirmAllLocallyAcquiredLocks() {
        if (this.stopping) {
            this.log.debug("[{}] Shutting down, skipping confirmAllLocallyAcquiredLocks", this.lockManagerInstanceId);
            return;
        }
        if (this.locksAcquiredByThisLockManager.isEmpty()) {
            this.log.debug("[{}] No locks to confirm for this Lock Manager instance", this.lockManagerInstanceId);
            return;
        }
        if (this.paused) {
            this.log.info("[{}] Lock Manager is paused, skipping confirmAllLocallyAcquiredLocks", this.lockManagerInstanceId);
            return;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("[{}] Confirming {} locks acquired by this Lock Manager Instance: {}", this.lockManagerInstanceId, this.locksAcquiredByThisLockManager.size(), this.locksAcquiredByThisLockManager.keySet());
        } else {
            this.log.debug("[{}] Confirming {} locks acquired by this Lock Manager Instance", this.lockManagerInstanceId, this.locksAcquiredByThisLockManager.size());
        }
        // All locks confirmed in this run share the same confirmation timestamp
        OffsetDateTime now = OffsetDateTime.now(Clock.systemUTC());
        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
            this.locksAcquiredByThisLockManager.forEach((lockName, fencedLock) -> {
                this.log.trace("[{}] Attempting to confirm lock '{}': {}", this.lockManagerInstanceId, fencedLock.getName(), fencedLock);
                boolean confirmedWithSuccess = false;
                try {
                    confirmedWithSuccess = this.lockStorage.confirmLockInDB(this, unitOfWork, fencedLock, now);
                } catch (Exception e) {
                    // A failing confirmation is treated like an unconfirmed lock (handled below)
                    this.log.error(MessageFormatter.msg("[{}] Attempting to confirm lock '{}': {}", this.lockManagerInstanceId, fencedLock.getName(), fencedLock), e);
                }
                if (!confirmedWithSuccess) {
                    this.log.info("[{}] Failed to confirm lock '{}', someone has taken over the lock: {}", this.lockManagerInstanceId, fencedLock.getName(), fencedLock);
                    fencedLock.release();
                } else {
                    fencedLock.markAsConfirmed(now);
                    this.log.debug("[{}] Confirmed lock '{}': {}", this.lockManagerInstanceId, fencedLock.getName(), fencedLock);
                    // A confirmation is not a new acquisition, so publish the confirmation event
                    notify(new FencedLockEvents.LockConfirmed(fencedLock, this));
                }
            });
        });
        if (this.log.isTraceEnabled()) {
            this.log.trace("[{}] Completed confirmation of locks acquired by this Lock Manager Instance. Number of Locks acquired locally after confirmation {}: {}", this.lockManagerInstanceId, this.locksAcquiredByThisLockManager.size(), this.locksAcquiredByThisLockManager.keySet());
        } else {
            this.log.debug("[{}] Completed confirmation of locks acquired by this Lock Manager Instance. Number of Locks acquired locally after confirmation {}", this.lockManagerInstanceId, this.locksAcquiredByThisLockManager.size());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Releases a lock held by this lock manager instance.
     * <p>
     * The release is attempted in storage; regardless of the storage outcome the lock is marked as
     * released locally, removed from the locally tracked locks and a {@code LockReleased} event is
     * published. When storage did not release the lock, the currently stored lock (if any) is looked up
     * purely to log who holds it.
     *
     * @param lock the lock to release
     * @throws IllegalArgumentException if the lock isn't locked by this lock manager instance
     */
    public void releaseLock(LOCK lock) {
        if (!lock.isLockedByThisLockManagerInstance()) {
            throw new IllegalArgumentException(MessageFormatter.msg("[{}] Cannot release Lock '{}' since it isn't locked by the current Lock Manager Node. Details: {}",
                                                                    lockManagerInstanceId,
                                                                    lock.getName(),
                                                                    lock));
        }

        Boolean releasedInStorage = unitOfWorkFactory.withUnitOfWork(uow -> lockStorage.releaseLockInDB(this, uow, lock));

        lock.markAsReleased();
        locksAcquiredByThisLockManager.remove(lock.getName());
        notify(new FencedLockEvents.LockReleased(lock, this));

        if (!releasedInStorage) {
            unitOfWorkFactory.usingUnitOfWork(uow -> lockStorage.lookupLockInDB(this, uow, lock.getName())
                                                                .ifPresent(storedLock -> log.debug("[{}] Couldn't release Lock '{}' as it was already acquired by another JVM Node: {}",
                                                                                                   lockManagerInstanceId,
                                                                                                   lock.getName(),
                                                                                                   storedLock.getLockedByLockManagerInstanceId())));
            return;
        }
        log.debug("[{}] Released Lock '{}': {}", lockManagerInstanceId, lock.getName(), lock);
    }

    /**
     * Looks up the lock with the given name in storage, inside a unit of work.
     *
     * @param lockName the name of the lock (required)
     * @return the stored lock, or an empty {@link Optional} if storage has no lock with that name
     * @throws IllegalStateException if the lock manager isn't started
     */
    @Override
    public Optional<FencedLock> lookupLock(LockName lockName) {
        FailFast.requireNonNull(lockName, "No lockName provided");
        if (!this.started) {
            throw new IllegalStateException(MessageFormatter.msg("The {} isn't started", getClass().getSimpleName()));
        }
        Optional<FencedLock> lookupResult = this.unitOfWorkFactory.withUnitOfWork(unitOfWork -> {
            Optional<LOCK> storedLock = this.lockStorage.lookupLockInDB(this, unitOfWork, lockName);
            // Widen the storage specific lock type to the FencedLock API type
            return storedLock.map(FencedLock.class::cast);
        });
        this.log.trace("[{}] Lookup FencedLock with name '{}' result: {}", this.lockManagerInstanceId, lockName, lookupResult);
        return lookupResult;
    }

    /**
     * Stops the lock manager unless it is already stopped.
     * <p>
     * Shuts down both executors, releases every lock tracked as acquired by this lock manager and
     * publishes a {@code FencedLockManagerStopped} event.
     */
    @Override
    public void stop() {
        if (!started) {
            log.debug("[{}] Lock Manager was already stopped", lockManagerInstanceId);
            return;
        }

        log.debug("[{}] Stopping lock manager", lockManagerInstanceId);
        // Raised so a concurrently running confirmation task skips its work
        stopping = true;

        if (asyncLockAcquiringExecutor != null) {
            asyncLockAcquiringExecutor.shutdownNow();
            asyncLockAcquiringExecutor = null;
        }
        if (lockConfirmationExecutor != null) {
            lockConfirmationExecutor.shutdownNow();
            lockConfirmationExecutor = null;
        }

        locksAcquiredByThisLockManager.values().forEach(DBFencedLock::release);

        started = false;
        stopping = false;
        log.debug("[{}] Stopped lock manager", lockManagerInstanceId);
        notify(new FencedLockEvents.FencedLockManagerStopped(this));
    }

    /**
     * @return {@code true} after {@link #start()} has completed and until {@link #stop()} completes
     */
    @Override
    public boolean isStarted() {
        return started;
    }

    /**
     * Makes a single attempt to acquire the lock.
     *
     * @param lockName the name of the lock
     * @return the acquired lock, or an empty {@link Optional} if the attempt yielded no lock
     */
    @Override
    public Optional<FencedLock> tryAcquireLock(LockName lockName) {
        FencedLock acquiredLock = _tryAcquireLock(lockName).block();
        return Optional.ofNullable(acquiredLock);
    }

    /**
     * Tries to acquire the lock, re-attempting with a pause of {@code syncAcquireLockPauseIntervalMs}
     * milliseconds between attempts for as long as an attempt yields no lock.
     *
     * @param lockName the name of the lock
     * @param duration the timeout handed to {@code Mono.block(Duration)} (required)
     * @return the acquired lock, or an empty {@link Optional} if the blocking call completed without a lock
     */
    @Override
    public Optional<FencedLock> tryAcquireLock(LockName lockName, Duration duration) {
        FailFast.requireNonNull(duration, "No timeout value provided");
        // _tryAcquireLock evaluates eagerly and returns an already resolved Mono. Deferring it makes
        // every re-subscription triggered by repeatWhenEmpty perform a fresh acquire attempt instead
        // of replaying the first (empty) result forever.
        Mono<LOCK> acquireAttempt = Mono.defer(() -> _tryAcquireLock(lockName));
        // NOTE(review): the pause below sleeps on the subscribing thread - confirm that the
        // block(duration) timeout is actually honoured while the retry loop is running.
        Mono<LOCK> acquireWithRetry = acquireAttempt.repeatWhenEmpty(emptyAttempts -> emptyAttempts.doOnNext(attemptNumber -> {
            try {
                Thread.sleep(this.syncAcquireLockPauseIntervalMs);
            } catch (InterruptedException ignored) {
                // Interruption only cuts the pause short; the next attempt follows immediately
            }
        }));
        return Optional.ofNullable((FencedLock) acquireWithRetry.block(duration));
    }

    /**
     * Performs one acquire attempt for the given lock name.
     * <p>
     * A locally tracked lock that is still locked and not timed out is returned directly. Otherwise the
     * lock is looked up in storage (or an uninitialized lock is created when storage has none) and
     * resolved inside a unit of work.
     *
     * @param lockName the name of the lock (required)
     * @return a {@link Mono} holding the acquired lock, or an empty {@link Mono} if the lock wasn't acquired
     * @throws IllegalStateException if the lock manager isn't started
     */
    private Mono<LOCK> _tryAcquireLock(LockName lockName) {
        FailFast.requireNonNull(lockName, "No lockName provided");
        if (!started) {
            throw new IllegalStateException(MessageFormatter.msg("The {} isn't started", getClass().getSimpleName()));
        }
        log.debug("[{}] Attempting to acquire lock '{}'", lockManagerInstanceId, lockName);

        LOCK locallyTrackedLock = locksAcquiredByThisLockManager.get(lockName);
        boolean canReuseLocalLock = locallyTrackedLock != null
                && locallyTrackedLock.isLocked()
                && !isLockTimedOut(locallyTrackedLock);
        if (canReuseLocalLock) {
            log.debug("[{}] Returned cached locally acquired lock '{}", lockManagerInstanceId, lockName);
            return Mono.just(locallyTrackedLock);
        }

        return unitOfWorkFactory.withUnitOfWork(uow -> {
            LOCK storedOrNewLock = lockStorage.lookupLockInDB(this, uow, lockName)
                                              .orElseGet(() -> lockStorage.createUninitializedLock(this, lockName));
            return resolveLock(uow, storedOrNewLock);
        });
    }

    /**
     * Decides how to acquire the given lock based on its current state:
     * <ul>
     *     <li>locked by this lock manager instance: the lock is tracked locally and returned</li>
     *     <li>locked by someone else and not timed out: nothing is acquired</li>
     *     <li>locked by someone else but timed out: an update with the next token is attempted</li>
     *     <li>not locked and carrying the uninitialized token: an insert is attempted</li>
     *     <li>not locked otherwise: an update with the next token is attempted</li>
     * </ul>
     *
     * @param uow  the unit of work to perform storage operations in (required)
     * @param lock the lock as found in storage, or a newly created uninitialized lock (required)
     * @return a {@link Mono} holding the acquired lock, or an empty {@link Mono} if it wasn't acquired
     */
    private Mono<LOCK> resolveLock(UOW uow, LOCK lock) {
        FailFast.requireNonNull(uow, "No uow provided");
        FailFast.requireNonNull(lock, "No existingLock provided");

        if (lock.isLocked()) {
            if (lock.isLockedByThisLockManagerInstance()) {
                log.debug("[{}] lock '{}' was already acquired by this JVM node: {}", lockManagerInstanceId, lock.getName(), lock);
                locksAcquiredByThisLockManager.put(lock.getName(), lock);
                return Mono.just(lock);
            }
            if (isLockTimedOut(lock)) {
                OffsetDateTime takeOverTimestamp = OffsetDateTime.now(Clock.systemUTC());
                LOCK takeOverLock = lockStorage.createInitializedLock(this,
                                                                      lock.getName(),
                                                                      lock.getCurrentToken().longValue() + 1,
                                                                      lockManagerInstanceId,
                                                                      takeOverTimestamp,
                                                                      takeOverTimestamp);
                log.debug("[{}] Found a TIMED-OUT lock '{}', that was acquired by Lock Manager '{}'. Will attempt to acquire the lock. Timed-out lock: {} - New lock: {}",
                          lockManagerInstanceId,
                          lock.getName(),
                          lock.getLockedByLockManagerInstanceId(),
                          lock,
                          takeOverLock);
                return updateLock(uow, lock, takeOverLock);
            }
            // Held by another lock manager and still valid
            return Mono.empty();
        }

        if (Objects.equals(lock.getCurrentToken(), lockStorage.getUninitializedTokenValue())) {
            // The uninitialized token means the lock must be inserted first
            return insertLock(uow, lock);
        }

        OffsetDateTime acquireTimestamp = OffsetDateTime.now(Clock.systemUTC());
        LOCK freshLock = lockStorage.createInitializedLock(this,
                                                           lock.getName(),
                                                           lock.getCurrentToken().longValue() + 1,
                                                           lockManagerInstanceId,
                                                           acquireTimestamp,
                                                           acquireTimestamp);
        log.debug("[{}] Found un-acquired lock '{}'. Have Acquired lock. Existing lock: {} - New lock: {}", lockManagerInstanceId, lock.getName(), lock, freshLock);
        return updateLock(uow, lock, freshLock);
    }

    /**
     * Attempts to acquire a lock for the first time by inserting it into storage.
     * <p>
     * On success the lock is marked as locked with the storage's initial token value, tracked locally
     * and a {@code LockAcquired} event is published.
     *
     * @param uow  the unit of work to perform the insert in (required)
     * @param lock the lock to insert (required)
     * @return a {@link Mono} holding the acquired lock, or an empty {@link Mono} if the insert failed
     */
    private Mono<LOCK> insertLock(UOW uow, LOCK lock) {
        FailFast.requireNonNull(uow, "No uow provided");
        FailFast.requireNonNull(lock, "No initialLock provided");

        OffsetDateTime acquiredAt = OffsetDateTime.now(Clock.systemUTC());
        boolean inserted = lockStorage.insertLockIntoDB(this, uow, lock, acquiredAt);
        if (inserted) {
            lock.markAsLocked(acquiredAt, lockManagerInstanceId, lockStorage.getInitialTokenValue());
            log.debug("[{}] Acquired lock '{}' for the first time (insert): {}", lockManagerInstanceId, lock.getName(), lock);
            locksAcquiredByThisLockManager.put(lock.getName(), lock);
            notify(new FencedLockEvents.LockAcquired(lock, this));
            return Mono.just(lock);
        }

        log.debug("[{}] Failed to acquire lock '{}' for the first time (insert)", lockManagerInstanceId, lock.getName());
        return Mono.empty();
    }

    /**
     * Attempts to acquire an existing lock by updating it in storage to the given new lock state.
     * <p>
     * On success the new lock is tracked locally, marked as locked using its own timestamp, owner and
     * token, and a {@code LockAcquired} event is published.
     *
     * @param uow   the unit of work to perform the update in (required)
     * @param lock  the lock as it currently exists in storage (required)
     * @param lock2 the new lock state to acquire (required)
     * @return a {@link Mono} holding the new lock, or an empty {@link Mono} if the update failed
     */
    private Mono<LOCK> updateLock(UOW uow, LOCK lock, LOCK lock2) {
        FailFast.requireNonNull(uow, "No uow provided");
        FailFast.requireNonNull(lock, "No timedOutLock provided");
        FailFast.requireNonNull(lock2, "No newLockReadyToBeAcquiredLocally provided");

        boolean updated = lockStorage.updateLockInDB(this, uow, lock, lock2);
        if (updated) {
            log.debug("[{}] Acquired lock '{}' (update): {}", lockManagerInstanceId, lock.getName(), lock2);
            locksAcquiredByThisLockManager.put(lock.getName(), lock2);
            lock2.markAsLocked(lock2.getLockAcquiredTimestamp(),
                               lock2.getLockedByLockManagerInstanceId(),
                               lock2.getCurrentToken().longValue());
            notify(new FencedLockEvents.LockAcquired(lock2, this));
            return Mono.just(lock2);
        }

        // The current storage state is looked up only to include it in the log statement
        log.debug("[{}] Didn't acquire timed out lock '{}', someone else acquired it in the mean time(update): {}",
                  lockManagerInstanceId,
                  lock.getName(),
                  lockStorage.lookupLockInDB(this, uow, lock.getName()));
        return Mono.empty();
    }

    /**
     * Checks whether the time since the lock was last confirmed is strictly longer than the
     * configured lock time-out.
     *
     * @param lock the lock to check (required)
     * @return {@code true} if the lock has timed out
     */
    private boolean isLockTimedOut(LOCK lock) {
        FailFast.requireNonNull(lock, "No lock provided");
        Duration sinceLastConfirmation = lock.getDurationSinceLastConfirmation();
        return sinceLastConfirmation.compareTo(lockTimeOut) > 0;
    }

    /**
     * Acquires the lock, blocking the calling thread and re-attempting with a pause of
     * {@code syncAcquireLockPauseIntervalMs} milliseconds between attempts until an attempt yields a lock.
     *
     * @param lockName the name of the lock
     * @return the acquired lock
     */
    @Override
    public FencedLock acquireLock(LockName lockName) {
        // _tryAcquireLock evaluates eagerly and returns an already resolved Mono. Deferring it makes
        // every re-subscription triggered by repeatWhenEmpty perform a fresh acquire attempt instead
        // of replaying the first (empty) result forever.
        Mono<LOCK> acquireAttempt = Mono.defer(() -> _tryAcquireLock(lockName));
        return (FencedLock) acquireAttempt.repeatWhenEmpty(emptyAttempts -> emptyAttempts.doOnNext(attemptNumber -> {
            try {
                Thread.sleep(this.syncAcquireLockPauseIntervalMs);
            } catch (InterruptedException ignored) {
                // Interruption only cuts the pause short; the next attempt follows immediately
            }
        })).block();
    }

    /**
     * Checks in storage whether the named lock is currently locked.
     *
     * @param lockName the name of the lock
     * @return {@code true} if storage contains the lock and it is locked; {@code false} otherwise
     */
    @Override
    public boolean isLockAcquired(LockName lockName) {
        Optional<LOCK> storedLock = unitOfWorkFactory.withUnitOfWork(uow -> lockStorage.lookupLockInDB(this, uow, lockName));
        return storedLock.filter(DBFencedLock::isLocked).isPresent();
    }

    /**
     * Checks in storage whether the named lock is locked by this lock manager instance.
     *
     * @param lockName the name of the lock
     * @return {@code true} if storage contains the lock and it reports being locked by this lock
     * manager instance; {@code false} otherwise
     */
    @Override
    public boolean isLockedByThisLockManagerInstance(LockName lockName) {
        Optional<LOCK> storedLock = unitOfWorkFactory.withUnitOfWork(uow -> lockStorage.lookupLockInDB(this, uow, lockName));
        return storedLock.filter(DBFencedLock::isLockedByThisLockManagerInstance).isPresent();
    }

    /**
     * Checks in storage whether the named lock is locked by a different lock manager instance.
     *
     * @param lockName the name of the lock
     * @return {@code true} if storage contains the lock, it is locked, and it isn't locked by this
     * lock manager instance; {@code false} otherwise
     */
    @Override
    public boolean isLockAcquiredByAnotherLockManagerInstance(LockName lockName) {
        Optional<LOCK> storedLock = unitOfWorkFactory.withUnitOfWork(uow -> lockStorage.lookupLockInDB(this, uow, lockName));
        return storedLock.filter(DBFencedLock::isLocked)
                         .filter(lockedLock -> !lockedLock.isLockedByThisLockManagerInstance())
                         .isPresent();
    }

    /**
     * Starts a background task, at most one per lock name, that periodically tries to acquire the lock.
     * <p>
     * The task runs immediately and then at a fixed rate of {@code lockConfirmationInterval}. Each run:
     * <ul>
     *     <li>does nothing if the locally tracked lock is still locked by this lock manager instance</li>
     *     <li>stops tracking the lock and calls {@code lockCallback.lockReleased} if the locally tracked
     *     lock is no longer locked by this instance</li>
     *     <li>skips acquiring while the lock manager is paused</li>
     *     <li>otherwise tries to acquire the lock and, on success, registers the callback on the lock,
     *     tracks it locally and calls {@code lockCallback.lockAcquired}</li>
     * </ul>
     *
     * @param lockName     the name of the lock (required)
     * @param lockCallback the callback notified about acquisition and release (required)
     * @throws IllegalStateException if the lock manager isn't started
     */
    @Override
    public void acquireLockAsync(LockName lockName, LockCallback lockCallback) {
        FailFast.requireNonNull(lockName, "You must supply a lockName");
        FailFast.requireNonNull(lockCallback, "You must supply a lockCallback");
        if (!this.started) {
            throw new IllegalStateException(MessageFormatter.msg("The {} isn't started", getClass().getSimpleName()));
        }
        this.asyncLockAcquirings.computeIfAbsent(lockName, ignoredKey -> {
            this.log.debug("[{}] Starting async Lock acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
            return this.asyncLockAcquiringExecutor.scheduleAtFixedRate(() -> {
                LOCK existingLock = this.locksAcquiredByThisLockManager.get(lockName);
                if (existingLock != null) {
                    if (existingLock.isLockedByThisLockManagerInstance()) {
                        // Still ours - nothing to do in this run
                        return;
                    }
                    this.log.debug("[{}] Noticed that lock '{}' isn't locked by this Lock Manager instance anymore. Releasing the lock", this.lockManagerInstanceId, lockName);
                    this.locksAcquiredByThisLockManager.remove(lockName);
                    lockCallback.lockReleased(existingLock);
                    return;
                }
                if (this.paused) {
                    this.log.info("[{}] Lock Manager is paused, skipping async acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
                    return;
                }
                Optional<FencedLock> acquisition = tryAcquireLock(lockName);
                if (!acquisition.isPresent()) {
                    if (this.log.isTraceEnabled()) {
                        this.log.trace("[{}] Couldn't async Acquire lock '{}' as it is acquired by another Lock Manager instance: {}", this.lockManagerInstanceId, lockName, lookupLock(lockName));
                    }
                } else {
                    this.log.debug("[{}] Async Acquired lock '{}'", this.lockManagerInstanceId, lockName);
                    // tryAcquireLock only exposes the FencedLock API type; the map of locally tracked
                    // locks requires the storage specific LOCK type, hence the (unchecked) cast
                    @SuppressWarnings("unchecked")
                    LOCK acquiredLock = (LOCK) acquisition.get();
                    acquiredLock.registerCallback(lockCallback);
                    this.locksAcquiredByThisLockManager.put(lockName, acquiredLock);
                    lockCallback.lockAcquired(acquisition.get());
                }
            }, 0L, this.lockConfirmationInterval.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Cancels the background acquiring task for the given lock name, if one exists, and releases the
     * lock if it is currently tracked as locked by this lock manager instance.
     * <p>
     * Does nothing if no async acquiring is registered for the lock name.
     *
     * @param lockName the name of the lock (required)
     */
    @Override
    public void cancelAsyncLockAcquiring(LockName lockName) {
        FailFast.requireNonNull(lockName, "You must supply a lockName");
        ScheduledFuture<?> acquiringTask = this.asyncLockAcquirings.remove(lockName);
        if (acquiringTask != null) {
            this.log.debug("[{}] Canceling async Lock acquiring for lock '{}'", this.lockManagerInstanceId, lockName);
            acquiringTask.cancel(true);
            // The lock may never have been acquired (e.g. another instance holds it), in which case
            // nothing is tracked locally and remove() returns null
            LOCK acquiredLock = this.locksAcquiredByThisLockManager.remove(lockName);
            if (acquiredLock != null && acquiredLock.isLockedByThisLockManagerInstance()) {
                this.log.debug("[{}] Releasing Lock due to cancelling the lock acquiring '{}'", this.lockManagerInstanceId, lockName);
                acquiredLock.release();
            }
        }
    }

    /**
     * @return the instance id of this lock manager: the id supplied at construction, or the value of
     * {@code Network.hostName()} when none was supplied
     */
    @Override
    public String getLockManagerInstanceId() {
        return lockManagerInstanceId;
    }

    /**
     * @return the simple class name followed by the lock manager instance id
     */
    @Override
    public String toString() {
        String simpleClassName = getClass().getSimpleName();
        return simpleClassName + "{lockManagerInstanceId='" + lockManagerInstanceId + "'}";
    }

    /**
     * @return the token value the lock storage uses for locks that haven't been initialized yet
     * (delegates to the lock storage)
     */
    public Long getUninitializedTokenValue() {
        return lockStorage.getUninitializedTokenValue();
    }

    /**
     * @return the token value the lock storage assigns to a lock that is acquired for the first time
     * (delegates to the lock storage)
     */
    public long getInitialTokenValue() {
        return lockStorage.getInitialTokenValue();
    }

    /**
     * Asks the lock storage, inside a unit of work, to delete all locks.
     */
    public void deleteAllLocksInDB() {
        unitOfWorkFactory.usingUnitOfWork(uow -> lockStorage.deleteAllLocksInDB(this, uow));
    }
}
