package org.apache.storm.kafka.spout;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.class */
public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
    private final int maxPollRecords = 10;
    private final int maxRetries = 3;

    public KafkaSpoutSingleTopicTest() {
        super(2000L);
        this.maxPollRecords = 10;
        this.maxRetries = 3;
    }

    @Override // org.apache.storm.kafka.spout.KafkaSpoutAbstractTest
    KafkaSpoutConfig<String, String> createSpoutConfig() {
        return SingleTopicKafkaSpoutConfiguration.setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + this.kafkaUnitExtension.getKafkaUnit().getKafkaPort(), Pattern.compile("test"))).setOffsetCommitPeriodMs(this.commitOffsetPeriodMs).setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0L), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0L), 3, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0L))).setProp("max.poll.records", 10).build();
    }

    @Test
    public void testSeekToCommittedOffsetIfConsumerPositionIsBehindWhenCommitting() throws Exception {
        prepareSpout(20);
        for (int i = 0; i < 20; i++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(20))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        List allValues = forClass.getAllValues();
        for (int i2 = 1; i2 < allValues.size(); i2++) {
            this.spout.ack(allValues.get(i2));
        }
        KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) allValues.get(0);
        this.spout.fail(kafkaSpoutMessageId);
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        this.spout.nextTuple();
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass2.capture());
        MatcherAssert.assertThat("Expected replay of failed tuple", forClass2.getValue(), CoreMatchers.is(kafkaSpoutMessageId));
        Mockito.clearInvocations(new SpoutOutputCollector[]{this.collectorMock});
        Time.advanceTime(500 + this.commitOffsetPeriodMs);
        this.spout.ack(forClass2.getValue());
        this.spout.nextTuple();
        ((KafkaConsumer) Mockito.verify(getKafkaConsumer())).commitSync((Map) this.commitCapture.capture());
        Map map = (Map) this.commitCapture.getValue();
        TopicPartition topicPartition = new TopicPartition("test", 0);
        MatcherAssert.assertThat("Should have committed to the right topic", map, Matchers.hasKey(topicPartition));
        MatcherAssert.assertThat("Should have committed all the acked messages", Long.valueOf(((OffsetAndMetadata) map.get(topicPartition)).offset()), CoreMatchers.is(20L));
        for (int i3 = 0; i3 < 3; i3++) {
            this.spout.nextTuple();
        }
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.anyObject());
    }

    @Test
    public void testClearingWaitingToEmitIfConsumerPositionIsNotBehindWhenCommitting() throws Exception {
        prepareSpout(11);
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(10))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        List allValues = forClass.getAllValues();
        for (int i2 = 1; i2 < allValues.size(); i2++) {
            this.spout.ack(allValues.get(i2));
        }
        KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) allValues.get(0);
        this.spout.fail(kafkaSpoutMessageId);
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        this.spout.nextTuple();
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass2.capture());
        MatcherAssert.assertThat("Expected replay of failed tuple", forClass2.getValue(), CoreMatchers.is(kafkaSpoutMessageId));
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        Time.advanceTime(500 + this.commitOffsetPeriodMs);
        this.spout.ack(forClass2.getValue());
        this.spout.nextTuple();
        ((KafkaConsumer) Mockito.verify(getKafkaConsumer())).commitSync((Map) this.commitCapture.capture());
        Map map = (Map) this.commitCapture.getValue();
        TopicPartition topicPartition = new TopicPartition("test", 0);
        MatcherAssert.assertThat("Should have committed to the right topic", map, Matchers.hasKey(topicPartition));
        MatcherAssert.assertThat("Should have committed all the acked messages", Long.valueOf(((OffsetAndMetadata) map.get(topicPartition)).offset()), CoreMatchers.is(10L));
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass3.capture());
        MatcherAssert.assertThat("Expected emit of the final tuple in the partition", Long.valueOf(((KafkaSpoutMessageId) forClass3.getValue()).offset()), CoreMatchers.is(Long.valueOf(11 - 1)));
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        for (int i3 = 0; i3 < 3; i3++) {
            this.spout.nextTuple();
        }
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.anyObject());
    }

    @Test
    public void testShouldContinueWithSlowDoubleAcks() throws Exception {
        prepareSpout(20);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        this.spout.ack(forClass.getValue());
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
        }
        this.spout.ack(forClass.getValue());
        for (int i2 = 0; i2 < 20; i2++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(20))).emit((String) ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), ArgumentMatchers.anyList(), forClass2.capture());
        Iterator it = forClass2.getAllValues().iterator();
        while (it.hasNext()) {
            this.spout.ack(it.next());
        }
        Time.advanceTime(this.commitOffsetPeriodMs + 500);
        this.spout.nextTuple();
        verifyAllMessagesCommitted(20L);
    }

    @Test
    public void testShouldEmitAllMessages() throws Exception {
        prepareSpout(10);
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
            ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit((String) ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), (List) ArgumentMatchers.eq(new Values(new Object[]{"test", Integer.toString(i), Integer.toString(i)})), forClass.capture());
            this.spout.ack(forClass.getValue());
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        }
        Time.advanceTime(this.commitOffsetPeriodMs + 500);
        this.spout.nextTuple();
        verifyAllMessagesCommitted(10L);
    }

    @Test
    public void testShouldReplayInOrderFailedMessages() throws Exception {
        prepareSpout(10);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        this.spout.ack(forClass.getValue());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass2.capture());
        this.spout.fail(forClass2.getValue());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(9))).emit((String) ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), ArgumentMatchers.anyList(), forClass3.capture());
        Iterator it = forClass3.getAllValues().iterator();
        while (it.hasNext()) {
            this.spout.ack(it.next());
        }
        Time.advanceTime(this.commitOffsetPeriodMs + 500);
        this.spout.nextTuple();
        verifyAllMessagesCommitted(10L);
    }

    @Test
    public void testShouldReplayFirstTupleFailedOutOfOrder() throws Exception {
        prepareSpout(10);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Object.class);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass2.capture());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        this.spout.ack(forClass2.getValue());
        this.spout.fail(forClass.getValue());
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Object.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(9))).emit((String) ArgumentMatchers.eq(SingleTopicKafkaSpoutConfiguration.STREAM), ArgumentMatchers.anyList(), forClass3.capture());
        Iterator it = forClass3.getAllValues().iterator();
        while (it.hasNext()) {
            this.spout.ack(it.next());
        }
        Time.advanceTime(this.commitOffsetPeriodMs + 500);
        this.spout.nextTuple();
        verifyAllMessagesCommitted(10L);
    }

    @Test
    public void testShouldReplayAllFailedTuplesWhenFailedOutOfOrder() throws Exception {
        prepareSpout(10);
        for (int i = 0; i < 10; i++) {
            this.spout.nextTuple();
        }
        ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(10))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass.capture());
        Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        List allValues = forClass.getAllValues();
        this.spout.fail(allValues.get(5));
        this.spout.fail(allValues.get(3));
        this.spout.nextTuple();
        this.spout.fail(allValues.get(2));
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
        for (int i2 = 0; i2 < 10; i2++) {
            this.spout.nextTuple();
        }
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.times(3))).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), forClass2.capture());
        HashSet hashSet = new HashSet();
        hashSet.add(allValues.get(5));
        hashSet.add(allValues.get(3));
        hashSet.add(allValues.get(2));
        MatcherAssert.assertThat("Expected reemits to be the 3 failed tuples", new HashSet(forClass2.getAllValues()), CoreMatchers.is(hashSet));
    }

    @Test
    public void testShouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
        prepareSpout(1);
        for (int i = 0; i <= 3; i++) {
            ArgumentCaptor forClass = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
            this.spout.nextTuple();
            ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyListOf(Object.class), forClass.capture());
            KafkaSpoutMessageId kafkaSpoutMessageId = (KafkaSpoutMessageId) forClass.getValue();
            this.spout.fail(kafkaSpoutMessageId);
            MatcherAssert.assertThat("Expected message id number of failures to match the number of times the message has failed", Integer.valueOf(kafkaSpoutMessageId.numFails()), CoreMatchers.is(Integer.valueOf(i + 1)));
            Mockito.reset(new SpoutOutputCollector[]{this.collectorMock});
        }
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyListOf(Object.class), ArgumentMatchers.anyObject());
    }

    @Test
    public void testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
        SingleTopicKafkaUnitSetupHelper.initializeSpout(this.spout, this.conf, this.topologyContext, this.collectorMock);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock, Mockito.never())).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.any(KafkaSpoutMessageId.class));
        SingleTopicKafkaUnitSetupHelper.populateTopicData(this.kafkaUnitExtension.getKafkaUnit(), "test", 1);
        Time.advanceTime(2500L);
        this.spout.nextTuple();
        ((SpoutOutputCollector) Mockito.verify(this.collectorMock)).emit(ArgumentMatchers.anyString(), ArgumentMatchers.anyList(), ArgumentMatchers.any(KafkaSpoutMessageId.class));
    }

    @Test
    public void testOffsetMetrics() throws Exception {
        prepareSpout(10);
        Map map = (Map) this.spout.getKafkaOffsetMetric().getValueAndReset();
        Assertions.assertEquals(((Long) map.get("test/totalEarliestTimeOffset")).longValue(), 0L);
        Assertions.assertEquals(((Long) map.get("test/totalLatestTimeOffset")).longValue(), 10L);
        Assertions.assertEquals(((Long) map.get("test/totalRecordsInPartitions")).longValue(), 10L);
        Assertions.assertEquals(((Long) map.get("test/totalLatestEmittedOffset")).longValue(), 0L);
        Assertions.assertEquals(((Long) map.get("test/totalLatestCompletedOffset")).longValue(), 0L);
        Assertions.assertEquals(((Long) map.get("test/totalSpoutLag")).longValue(), 10L);
        for (int i = 0; i < 10; i++) {
            nextTuple_verifyEmitted_ack_resetCollector(i);
        }
        commitAndVerifyAllMessagesCommitted(10L);
        Map map2 = (Map) this.spout.getKafkaOffsetMetric().getValueAndReset();
        Assertions.assertEquals(((Long) map2.get("test/totalEarliestTimeOffset")).longValue(), 0L);
        Assertions.assertEquals(((Long) map2.get("test/totalLatestTimeOffset")).longValue(), 10L);
        Assertions.assertEquals(((Long) map2.get("test/totalLatestEmittedOffset")).longValue(), 9L);
        Assertions.assertEquals(((Long) map2.get("test/totalLatestCompletedOffset")).longValue(), 10L);
        Assertions.assertEquals(((Long) map2.get("test/totalSpoutLag")).longValue(), 0L);
    }
}
