package com.opendxl.databus.consumer;

import com.opendxl.databus.common.TopicPartition;
import com.opendxl.databus.common.internal.util.GlobalConstants;
import com.opendxl.databus.consumer.DatabusPushConsumerStatus;
import com.opendxl.databus.credential.Credential;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.serialization.Deserializer;
import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/opendxl/databus/consumer/DatabusPushConsumer.class */
/**
 * A {@link DatabusConsumer} that, besides manual polling, can push the records it reads to a
 * {@link DatabusPushConsumerListener} from a background thread.
 *
 * <p>{@link #pushAsync(Duration)} starts a single worker thread that repeatedly polls, pauses the
 * assigned partitions, hands the records to the listener on a second single-thread executor and
 * then acts on the {@link DatabusPushConsumerListenerResponse} the listener returns. While push
 * mode is active, or after {@link #close()}, the public {@code poll} methods throw
 * {@link DatabusClientRuntimeException}.
 *
 * @param <P> payload type handled by the {@link Deserializer} given to the constructor
 */
public final class DatabusPushConsumer<P> extends DatabusConsumer<P> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(DatabusPushConsumer.class);

    /** Poll timeout, in milliseconds, used by {@link #pushAsync()}. */
    private static final long POLL_TIMEOUT_MS = 1000;

    /** How long {@link #close()} waits for the listener executor to terminate, in milliseconds. */
    private static final long LISTENER_SHUTDOWN_TIMEOUT_MS = 5L;

    /** Listener that receives every non-empty batch of records read in push mode. */
    private final DatabusPushConsumerListener consumerListener;

    /** Single thread on which the listener is invoked. */
    private final ExecutorService executor = Executors.newFixedThreadPool(1);

    /** Set once the consumer must stop: by {@link #close()}, a STOP response or a listener failure. */
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    /** Counted down when the push worker ends; shared with the {@link DatabusPushConsumerFuture}. */
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    /** Listener invocation in flight; written by the push worker, read by {@link #close()}. */
    private volatile Future<DatabusPushConsumerListenerResponse> listenerFuture;

    /** Non-null while push mode is active; reset to {@code null} by {@link #close()}. */
    private volatile DatabusPushConsumerFuture databusPushConsumerFuture;

    /** Executor running the push worker; created by {@link #pushAsync(Duration)}. */
    private ExecutorService pushAsyncExecutor;

    /**
     * Creates a push consumer from a configuration map.
     *
     * @param map configuration forwarded to the {@link DatabusConsumer} constructor
     * @param deserializer deserializer forwarded to the {@link DatabusConsumer} constructor
     * @param databusPushConsumerListener listener invoked with the records read in push mode
     */
    public DatabusPushConsumer(Map<String, Object> map, Deserializer<P> deserializer, DatabusPushConsumerListener databusPushConsumerListener) {
        super(map, deserializer);
        this.consumerListener = databusPushConsumerListener;
    }

    /**
     * Creates a push consumer from a configuration map and a credential.
     *
     * @param map configuration forwarded to the {@link DatabusConsumer} constructor
     * @param deserializer deserializer forwarded to the {@link DatabusConsumer} constructor
     * @param databusPushConsumerListener listener invoked with the records read in push mode
     * @param credential credential forwarded to the {@link DatabusConsumer} constructor
     */
    public DatabusPushConsumer(Map<String, Object> map, Deserializer<P> deserializer, DatabusPushConsumerListener databusPushConsumerListener, Credential credential) {
        super(map, deserializer, credential);
        this.consumerListener = databusPushConsumerListener;
    }

    /**
     * Creates a push consumer from {@link Properties}.
     *
     * @param properties configuration forwarded to the {@link DatabusConsumer} constructor
     * @param deserializer deserializer forwarded to the {@link DatabusConsumer} constructor
     * @param databusPushConsumerListener listener invoked with the records read in push mode
     */
    public DatabusPushConsumer(Properties properties, Deserializer<P> deserializer, DatabusPushConsumerListener databusPushConsumerListener) {
        // NOTE(review): raw cast kept as-is; the superclass overloads are not visible from this file.
        super(properties, (Deserializer) deserializer);
        this.consumerListener = databusPushConsumerListener;
    }

    /**
     * Creates a push consumer from {@link Properties} and a credential.
     *
     * @param properties configuration forwarded to the {@link DatabusConsumer} constructor
     * @param deserializer deserializer forwarded to the {@link DatabusConsumer} constructor
     * @param databusPushConsumerListener listener invoked with the records read in push mode
     * @param credential credential forwarded to the {@link DatabusConsumer} constructor
     */
    public DatabusPushConsumer(Properties properties, Deserializer<P> deserializer, DatabusPushConsumerListener databusPushConsumerListener, Credential credential) {
        // NOTE(review): raw cast kept as-is; the superclass overloads are not visible from this file.
        super(properties, (Deserializer) deserializer, credential);
        this.consumerListener = databusPushConsumerListener;
    }

    /**
     * Polls manually, provided the consumer is neither closed nor in push mode.
     *
     * @param duration timeout forwarded to the superclass {@code poll}
     * @return the records returned by the superclass {@code poll}
     * @throws DatabusClientRuntimeException if the consumer is closed or a pushAsync is running
     */
    @Override
    public ConsumerRecords poll(Duration duration) {
        ensureManualPollAllowed("poll method cannot be performed because DatabusPushConsumer is closed.");
        return super.poll(duration);
    }

    /**
     * Polls manually, provided the consumer is neither closed nor in push mode.
     *
     * @param j timeout forwarded to the superclass {@code poll}
     * @return the records returned by the superclass {@code poll}
     * @throws DatabusClientRuntimeException if the consumer is closed or a pushAsync is running
     */
    @Override
    public ConsumerRecords poll(long j) {
        ensureManualPollAllowed("poll cannot be performed because DatabusPushConsumer is closed.");
        return super.poll(j);
    }

    /**
     * Rejects a manual poll when the consumer is closed or push mode is active.
     *
     * @param closedMessage exception message to use when the consumer has been stopped
     */
    private void ensureManualPollAllowed(String closedMessage) {
        if (this.stopRequested.get()) {
            throw new DatabusClientRuntimeException(closedMessage, DatabusPushConsumer.class);
        }
        if (this.databusPushConsumerFuture != null) {
            throw new DatabusClientRuntimeException("poll cannot be performed because a pushAsync is already working.", DatabusPushConsumer.class);
        }
    }

    /**
     * Starts push mode on a background thread. Calling it again while push mode is active returns
     * the future created by the first call without starting another worker.
     *
     * @param duration poll timeout; also the interval at which the worker re-checks the listener
     * @return the future tracking the push worker
     * @throws DatabusClientRuntimeException if the consumer has been stopped
     */
    public DatabusPushConsumerFuture pushAsync(Duration duration) {
        if (this.stopRequested.get()) {
            throw new DatabusClientRuntimeException("pushAsync cannot be performed because DatabusPushConsumer is closed.", DatabusPushConsumer.class);
        }
        DatabusPushConsumerFuture running = this.databusPushConsumerFuture;
        if (running != null) {
            return running;
        }
        final DatabusPushConsumerFuture started = new DatabusPushConsumerFuture(new DatabusPushConsumerStatus.Builder().build(), this.countDownLatch);
        this.databusPushConsumerFuture = started;
        this.pushAsyncExecutor = Executors.newFixedThreadPool(1);
        // Hand the worker the local reference: close() resets the field to null and the task may
        // only start running after that has happened.
        this.pushAsyncExecutor.submit(() -> {
            push(started, duration);
        });
        return started;
    }

    /**
     * Starts push mode with the default poll timeout of {@value #POLL_TIMEOUT_MS} ms.
     *
     * @return the future tracking the push worker
     * @throws DatabusClientRuntimeException if the consumer has been stopped
     */
    public DatabusPushConsumerFuture pushAsync() {
        return pushAsync(Duration.ofMillis(POLL_TIMEOUT_MS));
    }

    /**
     * Push worker loop: poll, pause the assignment, run the listener, apply its response; repeated
     * until a stop is requested. The latch is always counted down when the loop ends.
     *
     * @param databusPushConsumerFuture future whose status is updated as the listener progresses
     * @param duration poll timeout and listener wait interval
     */
    private void push(DatabusPushConsumerFuture databusPushConsumerFuture, Duration duration) {
        LOG.info("Consumer " + super.getClientId() + " start");
        try {
            while (!this.stopRequested.get()) {
                Set<TopicPartition> assignment = super.assignment();
                // Positions are captured before polling so that a RETRY response can rewind to them.
                Map<TopicPartition, Long> positionsBeforePoll = getCurrentConsumerPosition(assignment);
                ConsumerRecords records = super.poll(duration);
                LOG.info("Consumer " + super.getClientId() + " number of records read: " + records.count());
                if (records.count() > 0) {
                    super.pause(assignment);
                    LOG.info("Consumer " + super.getClientId() + " is paused");
                    Future<DatabusPushConsumerListenerResponse> pending = runListenerAsync(databusPushConsumerFuture, records);
                    this.listenerFuture = pending;
                    awaitListenerResponse(pending, databusPushConsumerFuture, duration, positionsBeforePoll);
                }
            }
        } catch (DatabusClientRuntimeException e) {
            // close() sets stopRequested and calls wakeup(); a WakeupException seen after that is
            // the normal shutdown path. Anything else is a real failure and must be reported.
            boolean wakeupDuringShutdown = (e.getCause() instanceof WakeupException) && this.stopRequested.get();
            if (!wakeupDuringShutdown) {
                LOG.error("Consumer " + super.getClientId() + " Error: " + e.getMessage(), e);
            }
        } catch (Exception e) {
            LOG.error("Consumer " + super.getClientId() + " Error: " + e.getMessage(), e);
        } finally {
            this.countDownLatch.countDown();
            LOG.info("Consumer " + super.getClientId() + " end");
        }
    }

    /**
     * Waits for the listener to finish, polling with a zero timeout each time the wait times out,
     * then applies the listener response and resumes the consumer. A listener failure, interruption
     * or cancellation is recorded on the future and requests a stop.
     *
     * @param pending listener invocation to wait for
     * @param pushFuture future whose status is updated with the outcome
     * @param duration maximum time to wait for the listener before polling again
     * @param positionsBeforePoll positions to rewind to on a RETRY response
     */
    private void awaitListenerResponse(Future<DatabusPushConsumerListenerResponse> pending, DatabusPushConsumerFuture pushFuture,
            Duration duration, Map<TopicPartition, Long> positionsBeforePoll) {
        boolean responded = false;
        while (!responded && !this.stopRequested.get()) {
            try {
                DatabusPushConsumerListenerResponse response = pending.get(duration.toMillis(), TimeUnit.MILLISECONDS);
                responded = true;
                applyListenerResponse(response, positionsBeforePoll);
                resume(assignment());
                pushFuture.setDatabusPushConsumerListenerStatus(new DatabusPushConsumerStatus.Builder().withListenerResult(response).build());
                LOG.info("Consumer " + super.getClientId() + " is resumed");
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Consumer " + super.getClientId() + " listener throws an Exception while it was working: " + e.getMessage(), e);
                pushFuture.setDatabusPushConsumerListenerStatus(new DatabusPushConsumerStatus.Builder().withException(e).build());
                this.stopRequested.set(true);
                if (e instanceof InterruptedException) {
                    // Keep the interrupt visible; no further consumer calls are made after a stop.
                    Thread.currentThread().interrupt();
                }
            } catch (CancellationException e) {
                LOG.warn("Consumer " + super.getClientId() + " was cancelled: " + e.getMessage(), e);
                pushFuture.setDatabusPushConsumerListenerStatus(new DatabusPushConsumerStatus.Builder().build());
                this.stopRequested.set(true);
            } catch (TimeoutException e) {
                // Listener still busy: poll with a zero timeout, which the log message below
                // describes as a heartbeat to the coordinator, then wait again.
                super.poll(Duration.ofMillis(0L));
                LOG.info("Consumer " + super.getClientId() + " sends heartbeat to coordinator. The listener continue processing messages...");
            } catch (Exception e) {
                LOG.warn("Consumer " + super.getClientId() + " exception: " + e.getMessage(), e);
                pushFuture.setDatabusPushConsumerListenerStatus(new DatabusPushConsumerStatus.Builder().withException(e).build());
                this.stopRequested.set(true);
            }
        }
    }

    /**
     * Carries out what the listener asked for: stop and/or commit, or rewind for a retry.
     *
     * @param response value returned by the listener
     * @param positionsBeforePoll positions to rewind to on a RETRY response
     */
    private void applyListenerResponse(DatabusPushConsumerListenerResponse response, Map<TopicPartition, Long> positionsBeforePoll) {
        switch (response) {
            case STOP_AND_COMMIT:
                this.stopRequested.set(true);
                super.commitSync();
                break;
            case STOP_NO_COMMIT:
                this.stopRequested.set(true);
                break;
            case RETRY:
                seek(positionsBeforePoll);
                break;
            case CONTINUE_AND_COMMIT:
            default:
                super.commitSync();
                break;
        }
        LOG.info("Consumer " + getClientId() + " listener returns " + response);
    }

    /**
     * Seeks every currently assigned partition that has an entry in {@code map} to that offset.
     *
     * @param map offsets keyed by partition
     */
    private void seek(Map<TopicPartition, Long> map) {
        for (TopicPartition partition : super.assignment()) {
            Long offset = map.get(partition);
            if (offset != null) {
                super.seek(partition, offset.longValue());
            }
        }
    }

    /**
     * Submits the listener call to the listener executor and marks the future as PROCESSING.
     *
     * @param databusPushConsumerFuture future whose status is set to PROCESSING
     * @param consumerRecords records handed to the listener
     * @return future of the listener response
     */
    private Future<DatabusPushConsumerListenerResponse> runListenerAsync(DatabusPushConsumerFuture databusPushConsumerFuture, ConsumerRecords consumerRecords) {
        Future<DatabusPushConsumerListenerResponse> pending = this.executor.submit(() -> {
            return this.consumerListener.onConsume(consumerRecords);
        });
        databusPushConsumerFuture.setDatabusPushConsumerListenerStatus(new DatabusPushConsumerStatus.Builder().withStatus(DatabusPushConsumerStatus.Status.PROCESSING).build());
        LOG.info("Consumer " + getClientId() + " Listener was called");
        return pending;
    }

    /**
     * Reads the current position of each given partition.
     *
     * @param set partitions to query
     * @return position per partition
     */
    private Map<TopicPartition, Long> getCurrentConsumerPosition(Set<TopicPartition> set) {
        Map<TopicPartition, Long> positions = new HashMap<>(set.size());
        for (TopicPartition partition : set) {
            positions.put(partition, Long.valueOf(super.position(partition)));
        }
        return positions;
    }

    /**
     * Stops push mode and closes the consumer: requests a stop, wakes up the consumer, cancels the
     * listener call in flight, shuts down both executors and finally closes the superclass. The
     * executors are shut down and the superclass is closed even if an earlier step fails.
     */
    @Override
    public void close() {
        boolean interrupted = false;
        try {
            this.stopRequested.set(true);
            wakeup();
            this.databusPushConsumerFuture = null;
            Future<DatabusPushConsumerListenerResponse> pending = this.listenerFuture;
            if (pending != null) {
                pending.cancel(true);
            }
            this.executor.shutdown();
            this.executor.awaitTermination(LISTENER_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Remember the interrupt but restore it only after super.close(), so the underlying
            // consumer is not asked to close on an already-interrupted thread.
            interrupted = true;
        } finally {
            try {
                this.executor.shutdownNow();
                ExecutorService pushWorker = this.pushAsyncExecutor;
                if (pushWorker != null) {
                    pushWorker.shutdownNow();
                    this.pushAsyncExecutor = null;
                }
                super.close();
                LOG.info("Consumer " + super.getClientId() + " is closed");
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
