package com.microsoft.azure.eventhubs.impl;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/azure-eventhubs-1.2.1.jar:com/microsoft/azure/eventhubs/impl/ReceivePump.class */
/**
 * Drives a repeating receive loop for one partition.
 *
 * <p>Each iteration asks the {@link IPartitionReceiver} for a batch of events, passes the
 * outcome to the {@link PartitionReceiveHandler}, and then re-submits this pump to the
 * supplied {@link Executor}. The loop ends once {@link #stop()} has been called or an error
 * has marked the pump unhealthy; the future returned by {@link #stop()} is completed at
 * that point.
 */
public class ReceivePump implements Runnable {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReceivePump.class);

    private final IPartitionReceiver receiver;
    private final PartitionReceiveHandler onReceiveHandler;
    private final boolean invokeOnTimeout;
    private final Executor executor;
    private final CompletableFuture<Void> stopPump = new CompletableFuture<>();
    private final ProcessAndReschedule processAndReschedule = new ProcessAndReschedule();
    private final AtomicBoolean stopPumpRaised = new AtomicBoolean(false);

    // Cleared by any receive, user-code, or scheduling failure; read by shouldContinue().
    private volatile boolean isPumpHealthy = true;

    /** Source of event batches consumed by the pump. */
    public interface IPartitionReceiver {
        /**
         * @return the partition identifier; this class only uses it in log messages
         */
        String getPartitionId();

        /**
         * Requests the next batch of events.
         *
         * @param i the value the pump obtains from the handler's {@code getMaxEventCount()}
         * @return a future yielding the batch; the pump forwards a {@code null} batch to the
         *         handler only when it was constructed with {@code invokeOnTimeout} set
         */
        CompletableFuture<Iterable<EventData>> receive(int i);
    }

    /**
     * Completion callback attached to every receive call: reports failures, invokes the
     * user handler, and schedules the next pump iteration.
     */
    public final class ProcessAndReschedule implements BiFunction<Iterable<EventData>, Throwable, Void> {
        private ProcessAndReschedule() {
        }

        /**
         * Handles the outcome of one receive call.
         *
         * @param iterable the received batch, or {@code null} when nothing was received
         * @param th the receive failure, or {@code null} when the receive succeeded
         * @return always {@code null}
         */
        @Override // java.util.function.BiFunction
        public Void apply(Iterable<EventData> iterable, Throwable th) {
            try {
                ReceivePump.this.handleClientExceptions(th);
                try {
                    boolean deliver = iterable != null || ReceivePump.this.invokeOnTimeout;
                    if (ReceivePump.this.shouldContinue() && deliver) {
                        ReceivePump.this.onReceiveHandler.onReceive(iterable);
                    }
                } catch (Throwable th2) {
                    ReceivePump.this.handleUserCodeExceptions(th2);
                }
            } finally {
                // Always reschedule, even if the handler's onError threw: the next run() is
                // what observes the unhealthy/stopped state and completes the stop future.
                ReceivePump.this.schedulePump();
            }
            return null;
        }
    }

    /**
     * Creates a pump; nothing is received until the pump is run.
     *
     * @param iPartitionReceiver receiver that supplies event batches
     * @param partitionReceiveHandler handler given received batches and errors
     * @param z when {@code true}, the handler's {@code onReceive} is also invoked with
     *          {@code null} for receives that produced no batch
     * @param executor executor used for the receive callback and for rescheduling the pump
     */
    public ReceivePump(IPartitionReceiver iPartitionReceiver, PartitionReceiveHandler partitionReceiveHandler, boolean z, Executor executor) {
        this.receiver = iPartitionReceiver;
        this.onReceiveHandler = partitionReceiveHandler;
        this.invokeOnTimeout = z;
        this.executor = executor;
    }

    /**
     * Runs one pump iteration via {@link #receiveAndProcess()}, logging and rethrowing any
     * exception it raises.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            receiveAndProcess();
        } catch (Exception e) {
            if (TRACE_LOGGER.isErrorEnabled()) {
                TRACE_LOGGER.error(String.format("Receive pump for partition (%s) encountered unrecoverable error and exited with exception %s.", this.receiver.getPartitionId(), e.toString()));
            }
            throw e;
        }
    }

    /**
     * Starts the next receive when the pump should continue; otherwise logs the reason and
     * completes the future returned by {@link #stop()}.
     */
    public void receiveAndProcess() {
        if (shouldContinue()) {
            this.receiver.receive(this.onReceiveHandler.getMaxEventCount())
                    .handleAsync(this.processAndReschedule, this.executor);
            return;
        }
        if (TRACE_LOGGER.isInfoEnabled()) {
            String reason = this.stopPumpRaised.get() ? "per the request." : "pump ran into errors.";
            TRACE_LOGGER.info(String.format("Stopping receive pump for partition (%s) as %s", this.receiver.getPartitionId(), reason));
        }
        this.stopPump.complete(null);
    }

    /**
     * Requests the pump to stop.
     *
     * @return a future that is completed once the pump has actually stopped
     */
    public CompletableFuture<Void> stop() {
        this.stopPumpRaised.set(true);
        return this.stopPump;
    }

    /**
     * @return {@code true} until the stop future has been completed
     */
    public boolean isRunning() {
        return !this.stopPump.isDone();
    }

    /**
     * @return {@code true} while the pump is healthy and no stop has been requested
     */
    public boolean shouldContinue() {
        return this.isPumpHealthy && !this.stopPumpRaised.get();
    }

    /**
     * Handles a receive failure: marks the pump unhealthy, logs, and notifies the handler.
     * Does nothing when {@code th} is {@code null}.
     *
     * @param th the failure reported by the receive call, or {@code null}
     */
    public void handleClientExceptions(Throwable th) {
        if (th == null) {
            return;
        }
        this.isPumpHealthy = false;
        if (TRACE_LOGGER.isWarnEnabled()) {
            TRACE_LOGGER.warn(String.format("Receive pump for partition (%s) exiting after receive exception %s", this.receiver.getPartitionId(), th.toString()));
        }
        this.onReceiveHandler.onError(th);
    }

    /**
     * Handles a throwable raised by the handler's {@code onReceive}: marks the pump
     * unhealthy, logs, notifies the handler, and re-asserts the thread's interrupt flag
     * when the throwable is an {@link InterruptedException}.
     *
     * @param th the throwable raised by user code
     */
    public void handleUserCodeExceptions(Throwable th) {
        this.isPumpHealthy = false;
        if (TRACE_LOGGER.isErrorEnabled()) {
            TRACE_LOGGER.error(String.format("Receive pump for partition (%s) exiting after user-code exception %s", this.receiver.getPartitionId(), th.toString()));
        }
        this.onReceiveHandler.onError(th);
        if (th instanceof InterruptedException) {
            if (TRACE_LOGGER.isInfoEnabled()) {
                TRACE_LOGGER.info(String.format("Interrupting receive pump for partition (%s)", this.receiver.getPartitionId()));
            }
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Submits this pump to the executor for its next iteration. If the executor rejects the
     * task, the pump is marked unhealthy, the handler is notified, and the stop future is
     * completed because no further iteration will run to complete it.
     */
    public void schedulePump() {
        try {
            this.executor.execute(this);
        } catch (RejectedExecutionException e) {
            this.isPumpHealthy = false;
            if (TRACE_LOGGER.isWarnEnabled()) {
                TRACE_LOGGER.warn(String.format("Receive pump for partition (%s) exiting with error: %s", this.receiver.getPartitionId(), e.toString()));
            }
            try {
                this.onReceiveHandler.onError(e);
            } finally {
                // run() will never be invoked again, so the pump has effectively stopped;
                // without this, stop() would hang and isRunning() would stay true.
                this.stopPump.complete(null);
            }
        }
    }
}
