package com.gruelbox.transactionoutbox;

import com.gruelbox.transactionoutbox.TransactionOutbox;
import com.gruelbox.transactionoutbox.spi.ProxyFactory;
import com.gruelbox.transactionoutbox.spi.Utils;
import java.lang.reflect.InvocationTargetException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.event.Level;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/gruelbox/transactionoutbox/TransactionOutboxImpl.class */
public final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
    private static final Logger log = LoggerFactory.getLogger(TransactionOutboxImpl.class);
    private final TransactionManager transactionManager;
    private final Persistor persistor;
    private final Instantiator instantiator;
    private final Submitter submitter;
    private final Duration attemptFrequency;
    private final Level logLevelTemporaryFailure;
    private final int blockAfterAttempts;
    private final int flushBatchSize;
    private final Supplier<Clock> clockProvider;
    private final TransactionOutboxListener listener;
    private final boolean serializeMdc;
    private final Validator validator;
    private final Duration retentionThreshold;
    private final AtomicBoolean initialized = new AtomicBoolean();
    private final ProxyFactory proxyFactory = new ProxyFactory();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /* loaded from: input_file:com/gruelbox/transactionoutbox/TransactionOutboxImpl$ParameterizedScheduleBuilderImpl.class */
    private class ParameterizedScheduleBuilderImpl implements TransactionOutbox.ParameterizedScheduleBuilder {
        private String uniqueRequestId;
        private String ordered;
        private Duration delayForAtLeast;

        private ParameterizedScheduleBuilderImpl() {
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.ParameterizedScheduleBuilder
        public <T> T schedule(Class<T> cls) {
            if (this.uniqueRequestId == null || this.uniqueRequestId.length() <= 250) {
                return (T) TransactionOutboxImpl.this.schedule(cls, this.uniqueRequestId, this.ordered, this.delayForAtLeast);
            }
            throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters");
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.ParameterizedScheduleBuilder
        public ParameterizedScheduleBuilderImpl uniqueRequestId(String str) {
            this.uniqueRequestId = str;
            return this;
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.ParameterizedScheduleBuilder
        public ParameterizedScheduleBuilderImpl ordered(String str) {
            this.ordered = str;
            return this;
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.ParameterizedScheduleBuilder
        public ParameterizedScheduleBuilderImpl delayForAtLeast(Duration duration) {
            this.delayForAtLeast = duration;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/gruelbox/transactionoutbox/TransactionOutboxImpl$TransactionOutboxBuilderImpl.class */
    public static class TransactionOutboxBuilderImpl extends TransactionOutbox.TransactionOutboxBuilder {
        TransactionOutboxBuilderImpl() {
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.TransactionOutboxBuilder
        public TransactionOutboxImpl build() {
            Validator validator = new Validator(this.clockProvider);
            TransactionOutboxImpl transactionOutboxImpl = new TransactionOutboxImpl(this.transactionManager, this.persistor, (Instantiator) Utils.firstNonNull(this.instantiator, Instantiator::usingReflection), (Submitter) Utils.firstNonNull(this.submitter, Submitter::withDefaultExecutor), (Duration) Utils.firstNonNull(this.attemptFrequency, () -> {
                return Duration.of(2L, ChronoUnit.MINUTES);
            }), (Level) Utils.firstNonNull(this.logLevelTemporaryFailure, () -> {
                return Level.WARN;
            }), this.blockAfterAttempts < 1 ? 5 : this.blockAfterAttempts, this.flushBatchSize < 1 ? 4096 : this.flushBatchSize, this.clockProvider == null ? Clock::systemDefaultZone : this.clockProvider, (TransactionOutboxListener) Utils.firstNonNull(this.listener, () -> {
                return TransactionOutboxListener.EMPTY;
            }), this.serializeMdc == null || this.serializeMdc.booleanValue(), validator, this.retentionThreshold == null ? Duration.ofDays(7L) : this.retentionThreshold);
            validator.validate(transactionOutboxImpl);
            if (this.initializeImmediately == null || this.initializeImmediately.booleanValue()) {
                transactionOutboxImpl.initialize();
            }
            return transactionOutboxImpl;
        }

        @Override // com.gruelbox.transactionoutbox.TransactionOutbox.TransactionOutboxBuilder
        public String toString() {
            return "TransactionOutboxImpl.TransactionOutboxBuilderImpl()";
        }
    }

    @Override // com.gruelbox.transactionoutbox.Validatable
    public void validate(Validator validator) {
        validator.notNull("transactionManager", this.transactionManager);
        validator.valid("persistor", this.persistor);
        validator.valid("instantiator", this.instantiator);
        validator.valid("submitter", this.submitter);
        validator.notNull("attemptFrequency", this.attemptFrequency);
        validator.notNull("logLevelTemporaryFailure", this.logLevelTemporaryFailure);
        validator.min("blockAfterAttempts", this.blockAfterAttempts, 1);
        validator.min("flushBatchSize", this.flushBatchSize, 1);
        validator.notNull("clockProvider", this.clockProvider);
        validator.notNull("listener", this.listener);
        validator.notNull("retentionThreshold", this.retentionThreshold);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static TransactionOutbox.TransactionOutboxBuilder builder() {
        return new TransactionOutboxBuilderImpl();
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public void initialize() {
        if (this.initialized.compareAndSet(false, true)) {
            try {
                this.persistor.migrate(this.transactionManager);
            } catch (Exception e) {
                this.initialized.set(false);
                throw e;
            }
        }
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public <T> T schedule(Class<T> cls) {
        return (T) schedule(cls, null, null, null);
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public TransactionOutbox.ParameterizedScheduleBuilder with() {
        return new ParameterizedScheduleBuilderImpl();
    }

    private boolean flushStale(Instant instant) {
        log.debug("Flushing stale tasks");
        List list = (List) this.transactionManager.inTransactionReturns(transaction -> {
            ArrayList arrayList = new ArrayList(this.flushBatchSize);
            ((List) Utils.uncheckedly(() -> {
                return this.persistor.selectBatch(transaction, this.flushBatchSize, instant);
            })).forEach(transactionOutboxEntry -> {
                log.debug("Reprocessing {}", transactionOutboxEntry.description());
                try {
                    pushBack(transaction, transactionOutboxEntry);
                    arrayList.add(transactionOutboxEntry);
                } catch (OptimisticLockException e) {
                    log.debug("Beaten to optimistic lock on {}", transactionOutboxEntry.description());
                }
            });
            return arrayList;
        });
        log.debug("Got batch of {}", Integer.valueOf(list.size()));
        list.forEach(this::submitNow);
        log.debug("Submitted batch");
        return !list.isEmpty();
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public boolean flush(Executor executor) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("Not initialized");
        }
        Instant instant = this.clockProvider.get().instant();
        ArrayList arrayList = new ArrayList();
        arrayList.add(CompletableFuture.supplyAsync(() -> {
            return Boolean.valueOf(flushStale(instant));
        }, executor));
        arrayList.add(CompletableFuture.runAsync(() -> {
            expireIdempotencyProtection(instant);
        }, executor).thenApply(r2 -> {
            return false;
        }));
        List list = (List) this.transactionManager.inTransactionReturns(transaction -> {
            return (List) Utils.uncheckedly(() -> {
                return this.persistor.selectActiveTopics(transaction);
            });
        });
        if (!list.isEmpty()) {
            Stream map = list.stream().map(str -> {
                return CompletableFuture.supplyAsync(() -> {
                    return Boolean.valueOf(processNextInTopic(str));
                }, executor);
            });
            Objects.requireNonNull(arrayList);
            map.forEach((v1) -> {
                r1.add(v1);
            });
        }
        return ((Boolean) arrayList.stream().reduce((completableFuture, completableFuture2) -> {
            return completableFuture.thenCombine((CompletionStage) completableFuture2, (bool, bool2) -> {
                return Boolean.valueOf(bool.booleanValue() || bool2.booleanValue());
            });
        }).map((v0) -> {
            return v0.join();
        }).orElse(false)).booleanValue();
    }

    private void expireIdempotencyProtection(Instant instant) {
        int intValue;
        long j = 0;
        do {
            intValue = ((Integer) this.transactionManager.inTransactionReturns(transaction -> {
                return (Integer) Utils.uncheckedly(() -> {
                    return Integer.valueOf(this.persistor.deleteProcessedAndExpired(transaction, this.flushBatchSize, instant));
                });
            })).intValue();
            j += intValue;
        } while (intValue > 0);
        if (j > 0) {
            log.info("Expired idempotency protection on {} requests completed more than {} ago", Long.valueOf(j), String.format("%dd:%02dh:%02dm:%02ds", Long.valueOf(this.retentionThreshold.toDaysPart()), Integer.valueOf(this.retentionThreshold.toHoursPart()), Integer.valueOf(this.retentionThreshold.toMinutesPart()), Integer.valueOf(this.retentionThreshold.toSecondsPart())));
        } else {
            log.debug("No records found to delete as of {}", instant);
        }
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public boolean unblock(String str) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("Not initialized");
        }
        if (!(this.transactionManager instanceof ThreadLocalContextTransactionManager)) {
            throw new UnsupportedOperationException("This method requires a ThreadLocalContextTransactionManager");
        }
        log.info("Unblocking entry {} for retry.", str);
        try {
            return ((Boolean) ((ThreadLocalContextTransactionManager) this.transactionManager).requireTransactionReturns(transaction -> {
                return Boolean.valueOf(this.persistor.unblock(transaction, str));
            })).booleanValue();
        } catch (Exception e) {
            throw ((RuntimeException) Utils.uncheckAndThrow(e));
        }
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public boolean unblock(String str, Object obj) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("Not initialized");
        }
        if (!(this.transactionManager instanceof ParameterContextTransactionManager)) {
            throw new UnsupportedOperationException("This method requires a ParameterContextTransactionManager");
        }
        log.info("Unblocking entry {} for retry", str);
        try {
            if (obj instanceof Transaction) {
                return this.persistor.unblock((Transaction) obj, str);
            }
            return this.persistor.unblock(((ParameterContextTransactionManager) this.transactionManager).transactionFromContext(obj), str);
        } catch (Exception e) {
            throw ((RuntimeException) Utils.uncheckAndThrow(e));
        }
    }

    private <T> T schedule(Class<T> cls, String str, String str2, Duration duration) {
        if (this.initialized.get()) {
            return (T) this.proxyFactory.createProxy(cls, (method, objArr) -> {
                return Utils.uncheckedly(() -> {
                    TransactionalInvocation extractTransaction = this.transactionManager.extractTransaction(method, objArr);
                    TransactionOutboxEntry newEntry = newEntry(extractTransaction.getClazz(), extractTransaction.getMethodName(), extractTransaction.getParameters(), extractTransaction.getArgs(), str, str2);
                    if (duration != null) {
                        newEntry.setNextAttemptTime(newEntry.getNextAttemptTime().plus((TemporalAmount) duration));
                    }
                    this.validator.validate(newEntry);
                    this.persistor.save(extractTransaction.getTransaction(), newEntry);
                    extractTransaction.getTransaction().addPostCommitHook(() -> {
                        this.listener.scheduled(newEntry);
                        if (newEntry.getTopic() != null) {
                            log.debug("Queued {} in topic {}", newEntry.description(), str2);
                            return;
                        }
                        if (duration == null) {
                            submitNow(newEntry);
                            log.debug("Scheduled {} for post-commit execution", newEntry.description());
                        } else if (duration.compareTo(this.attemptFrequency) >= 0) {
                            log.info("Queued {} for execution after at least {}", newEntry.description(), duration);
                        } else {
                            this.scheduler.schedule(() -> {
                                submitNow(newEntry);
                            }, duration.toMillis(), TimeUnit.MILLISECONDS);
                            log.info("Scheduled {} for post-commit execution after at least {}", newEntry.description(), duration);
                        }
                    });
                    return null;
                });
            });
        }
        throw new IllegalStateException("Not initialized");
    }

    private void submitNow(TransactionOutboxEntry transactionOutboxEntry) {
        this.submitter.submit(transactionOutboxEntry, this::processNow);
    }

    @Override // com.gruelbox.transactionoutbox.TransactionOutbox
    public void processNow(TransactionOutboxEntry transactionOutboxEntry) {
        initialize();
        Boolean bool = null;
        try {
            bool = (Boolean) this.transactionManager.inTransactionReturnsThrows(transaction -> {
                if (!this.persistor.lock(transaction, transactionOutboxEntry)) {
                    return false;
                }
                processWithExistingLock(transaction, transactionOutboxEntry);
                return true;
            });
        } catch (InvocationTargetException e) {
            updateAttemptCount(transactionOutboxEntry, e.getCause());
        } catch (Exception e2) {
            updateAttemptCount(transactionOutboxEntry, e2);
        }
        if (bool != null) {
            if (!bool.booleanValue()) {
                log.debug("Skipped task {} - may be locked or already processed", transactionOutboxEntry.getId());
            } else {
                log.info("Processed {}", transactionOutboxEntry.description());
                this.listener.success(transactionOutboxEntry);
            }
        }
    }

    private boolean processNextInTopic(String str) {
        AtomicReference atomicReference = new AtomicReference();
        boolean z = false;
        try {
            z = ((Boolean) this.transactionManager.inTransactionReturnsThrows(transaction -> {
                Optional<TransactionOutboxEntry> nextInTopic = this.persistor.nextInTopic(transaction, str);
                if (!nextInTopic.isPresent()) {
                    return false;
                }
                if (!nextInTopic.get().getNextAttemptTime().isBefore(this.clockProvider.get().instant())) {
                    log.info("Topic {}: ignoring until {}", str, ZonedDateTime.ofInstant(nextInTopic.get().getNextAttemptTime(), ZoneId.of("UTC")));
                    return false;
                }
                log.info("Topic {}: processing seq {}", str, nextInTopic.get().getSequence());
                atomicReference.set(nextInTopic.get());
                processWithExistingLock(transaction, nextInTopic.get());
                return true;
            })).booleanValue();
        } catch (InvocationTargetException e) {
            if (atomicReference.get() != null) {
                updateAttemptCount((TransactionOutboxEntry) atomicReference.get(), e.getCause());
            }
        } catch (Exception e2) {
            if (atomicReference.get() != null) {
                updateAttemptCount((TransactionOutboxEntry) atomicReference.get(), e2);
            }
        }
        if (z) {
            log.info("Processed {}", ((TransactionOutboxEntry) atomicReference.get()).description());
            this.listener.success((TransactionOutboxEntry) atomicReference.get());
        }
        return z;
    }

    private void processWithExistingLock(Transaction transaction, TransactionOutboxEntry transactionOutboxEntry) throws Exception {
        initialize();
        transactionOutboxEntry.getInvocation().withinMDC(() -> {
            log.info("Processing {}", transactionOutboxEntry.description());
            invoke(transactionOutboxEntry, transaction);
            if (transactionOutboxEntry.getUniqueRequestId() == null) {
                this.persistor.delete(transaction, transactionOutboxEntry);
            } else {
                log.debug("Deferring deletion of {} by {}", transactionOutboxEntry.description(), this.retentionThreshold);
                transactionOutboxEntry.setProcessed(true);
                transactionOutboxEntry.setLastAttemptTime(Instant.now(this.clockProvider.get()));
                transactionOutboxEntry.setNextAttemptTime(after(this.retentionThreshold));
                this.persistor.update(transaction, transactionOutboxEntry);
            }
            return true;
        });
    }

    private void invoke(TransactionOutboxEntry transactionOutboxEntry, Transaction transaction) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        Object instantiator = this.instantiator.getInstance(transactionOutboxEntry.getInvocation().getClassName());
        log.debug("Created instance {}", instantiator);
        this.transactionManager.injectTransaction(transactionOutboxEntry.getInvocation(), transaction).invoke(instantiator, this.listener);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [com.gruelbox.transactionoutbox.TransactionOutboxEntry$TransactionOutboxEntryBuilder] */
    private TransactionOutboxEntry newEntry(Class<?> cls, String str, Class<?>[] clsArr, Object[] objArr, String str2, String str3) {
        return TransactionOutboxEntry.builder().id(UUID.randomUUID().toString()).invocation(new Invocation(this.instantiator.getName(cls), str, clsArr, objArr, (!this.serializeMdc || MDC.getMDCAdapter() == null) ? null : MDC.getCopyOfContextMap())).lastAttemptTime(null).nextAttemptTime(this.clockProvider.get().instant()).uniqueRequestId(str2).topic(str3).build();
    }

    private void pushBack(Transaction transaction, TransactionOutboxEntry transactionOutboxEntry) throws OptimisticLockException {
        try {
            transactionOutboxEntry.setLastAttemptTime(this.clockProvider.get().instant());
            transactionOutboxEntry.setNextAttemptTime(after(this.attemptFrequency));
            this.validator.validate(transactionOutboxEntry);
            this.persistor.update(transaction, transactionOutboxEntry);
        } catch (OptimisticLockException e) {
            throw e;
        } catch (Exception e2) {
            Utils.uncheckAndThrow(e2);
        }
    }

    private Instant after(Duration duration) {
        return this.clockProvider.get().instant().plus((TemporalAmount) duration).truncatedTo(ChronoUnit.MILLIS);
    }

    private void updateAttemptCount(TransactionOutboxEntry transactionOutboxEntry, Throwable th) {
        try {
            transactionOutboxEntry.setAttempts(transactionOutboxEntry.getAttempts() + 1);
            boolean z = transactionOutboxEntry.getTopic() == null && transactionOutboxEntry.getAttempts() >= this.blockAfterAttempts;
            transactionOutboxEntry.setBlocked(z);
            this.transactionManager.inTransactionThrows(transaction -> {
                pushBack(transaction, transactionOutboxEntry);
            });
            this.listener.failure(transactionOutboxEntry, th);
            if (z) {
                log.error("Blocking failing entry {} after {} attempts: {}", new Object[]{transactionOutboxEntry.getId(), Integer.valueOf(transactionOutboxEntry.getAttempts()), transactionOutboxEntry.description(), th});
                this.listener.blocked(transactionOutboxEntry, th);
            } else {
                Utils.logAtLevel(log, this.logLevelTemporaryFailure, "Temporarily failed to process entry {} : {}", transactionOutboxEntry.getId(), transactionOutboxEntry.description(), th);
            }
        } catch (Exception e) {
            log.error("Failed to update attempt count for {}. It may be retried more times than expected.", transactionOutboxEntry.description(), e);
        }
    }

    private TransactionOutboxImpl(TransactionManager transactionManager, Persistor persistor, Instantiator instantiator, Submitter submitter, Duration duration, Level level, int i, int i2, Supplier<Clock> supplier, TransactionOutboxListener transactionOutboxListener, boolean z, Validator validator, Duration duration2) {
        this.transactionManager = transactionManager;
        this.persistor = persistor;
        this.instantiator = instantiator;
        this.submitter = submitter;
        this.attemptFrequency = duration;
        this.logLevelTemporaryFailure = level;
        this.blockAfterAttempts = i;
        this.flushBatchSize = i2;
        this.clockProvider = supplier;
        this.listener = transactionOutboxListener;
        this.serializeMdc = z;
        this.validator = validator;
        this.retentionThreshold = duration2;
    }
}
