package org.apache.zookeeper.server.quorum;

import java.lang.Thread;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.server.WorkerService;
import org.apache.zookeeper.server.ZooKeeperServerListener;
import org.apache.zookeeper.test.SessionTrackerCheckTest;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.class */
public class CommitProcessorMetricsTest extends ZKTestCase {
    protected static final Logger LOG = LoggerFactory.getLogger(CommitProcessorMetricsTest.class);
    // Processor under test; created and started by setupProcessors(), shut down in tearDown().
    CommitProcessor commitProcessor;
    // Downstream stub handed to the commit processor by setupProcessors().
    DummyFinalProcessor finalProcessor;
    // Optional latch counted down by TestWorkerService.schedule() after each scheduled work request.
    CountDownLatch requestScheduled = null;
    // Optional latch counted down by TestCommitProcessor.endOfIteration().
    CountDownLatch requestProcessed = null;
    // Optional latch counted down when TestCommitProcessor.waitForEmptyPool() is entered;
    // DummyFinalProcessor.processRequest() awaits it (up to 5 seconds) before sleeping.
    CountDownLatch commitSeen = null;
    // Optional latch counted down after super.waitForEmptyPool() returns in TestCommitProcessor.
    // NOTE(review): the name is a misspelling of "poolEmptied"; renaming it means touching every user.
    CountDownLatch poolEmpytied = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest$DummyFinalProcessor.class */
    public class DummyFinalProcessor implements RequestProcessor {
        int processTime;

        public DummyFinalProcessor(int i) {
            this.processTime = i;
        }

        public void processRequest(Request request) {
            if (this.processTime > 0) {
                try {
                    if (CommitProcessorMetricsTest.this.commitSeen != null) {
                        CommitProcessorMetricsTest.this.commitSeen.await(5L, TimeUnit.SECONDS);
                    }
                    Thread.sleep(this.processTime);
                } catch (Exception e) {
                }
            }
        }

        public void shutdown() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest$TestCommitProcessor.class */
    public class TestCommitProcessor extends CommitProcessor {
        int numWorkerThreads;

        public TestCommitProcessor(RequestProcessor requestProcessor, int i) {
            super(requestProcessor, "1", true, (ZooKeeperServerListener) null);
            this.numWorkerThreads = i;
        }

        public void start() {
            ((CommitProcessor) this).workerPool = new TestWorkerService(this.numWorkerThreads);
            super.start();
            Thread.State state = super.getState();
            while (state != Thread.State.WAITING) {
                try {
                    Thread.sleep(50L);
                } catch (Exception e) {
                }
                state = super.getState();
            }
            CommitProcessorMetricsTest.LOG.info("numWorkerThreads in Test is {}", Integer.valueOf(this.numWorkerThreads));
        }

        protected void endOfIteration() {
            if (CommitProcessorMetricsTest.this.requestProcessed != null) {
                CommitProcessorMetricsTest.this.requestProcessed.countDown();
            }
        }

        protected void waitForEmptyPool() throws InterruptedException {
            if (CommitProcessorMetricsTest.this.commitSeen != null) {
                CommitProcessorMetricsTest.this.commitSeen.countDown();
            }
            super.waitForEmptyPool();
            if (CommitProcessorMetricsTest.this.poolEmpytied != null) {
                CommitProcessorMetricsTest.this.poolEmpytied.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest$TestWorkerService.class */
    private class TestWorkerService extends WorkerService {
        public TestWorkerService(int i) {
            super("CommitProcWork", i, true);
        }

        public void schedule(WorkerService.WorkRequest workRequest, long j) {
            super.schedule(workRequest, j);
            if (CommitProcessorMetricsTest.this.requestScheduled != null) {
                CommitProcessorMetricsTest.this.requestScheduled.countDown();
            }
        }
    }

    /**
     * Resets all server metrics and clears the two commit-processor batch-size system
     * properties before each test.
     */
    @BeforeEach
    public void setup() {
        LOG.info("setup");
        ServerMetrics.getMetrics().resetAll();
        final String[] batchSizeProperties = {
            "zookeeper.commitProcessor.maxReadBatchSize",
            "zookeeper.commitProcessor.maxCommitBatchSize",
        };
        for (String property : batchSizeProperties) {
            System.clearProperty(property);
        }
    }

    /**
     * Builds the processor chain used by a test and starts the commit processor.
     *
     * @param i  number of worker threads for the commit processor's worker pool
     * @param i2 simulated processing delay, in milliseconds, of the final processor
     */
    public void setupProcessors(int i, int i2) {
        final DummyFinalProcessor sink = new DummyFinalProcessor(i2);
        this.finalProcessor = sink;
        final TestCommitProcessor processor = new TestCommitProcessor(sink, i);
        this.commitProcessor = processor;
        processor.start();
    }

    /**
     * Shuts the commit processor down and waits for its thread to exit.
     *
     * <p>Does nothing beyond logging when no processor was created, for example because
     * {@code setupProcessors} failed; otherwise a {@link NullPointerException} thrown here would
     * mask the original test failure.
     *
     * @throws Exception if shutting down or joining the processor thread fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        LOG.info("tearDown starting");
        if (this.commitProcessor == null) {
            return;
        }
        this.commitProcessor.shutdown();
        this.commitProcessor.join();
    }

    /**
     * Asserts the five summary values ({@code min_}, {@code max_}, {@code avg_}, {@code cnt_},
     * {@code sum_}) published for one metric in the current server metrics snapshot.
     *
     * @param str metric base name, appended to each prefix
     * @param j   expected minimum
     * @param j2  expected maximum
     * @param d   expected average, compared with a tolerance of 0.001
     * @param j3  expected sample count
     * @param j4  expected sum
     */
    private void checkMetrics(String str, long j, long j2, double d, long j3, long j4) {
        final Map<String, Object> snapshot = MetricsUtils.currentServerMetrics();

        final Object actualMin = snapshot.get("min_" + str);
        Assertions.assertEquals(Long.valueOf(j), actualMin, "expected min is " + j);

        final Object actualMax = snapshot.get("max_" + str);
        Assertions.assertEquals(Long.valueOf(j2), actualMax, "expected max is: " + j2);

        final double actualAvg = (Double) snapshot.get("avg_" + str);
        Assertions.assertEquals(d, actualAvg, 0.001d, "expected avg is: " + d);

        final Object actualCount = snapshot.get("cnt_" + str);
        Assertions.assertEquals(Long.valueOf(j3), actualCount, "expected cnt is: " + j3);

        final Object actualSum = snapshot.get("sum_" + str);
        Assertions.assertEquals(Long.valueOf(j4), actualSum, "expected sum is: " + j4);
    }

    /**
     * Asserts that a measured duration lies within an inclusive range.
     *
     * @param j  the measured value
     * @param j2 inclusive lower bound
     * @param j3 inclusive upper bound
     */
    private void checkTimeMetric(long j, long j2, long j3) {
        final Long measured = j;
        final Long lowerBound = j2;
        final Long upperBound = j3;
        MatcherAssert.assertThat(measured, Matchers.greaterThanOrEqualTo(lowerBound));
        MatcherAssert.assertThat(measured, Matchers.lessThanOrEqualTo(upperBound));
    }

    /**
     * Builds a request of type 4 with a 10-byte zeroed payload, no connection and no auth info.
     *
     * @param j session id
     * @param i client transaction id (cxid)
     * @return the new request
     */
    private Request createReadRequest(long j, int i) {
        // NOTE(review): 4 is presumably ZooDefs.OpCode.getData — confirm before replacing the literal.
        final int requestType = 4;
        final ByteBuffer payload = ByteBuffer.wrap(new byte[10]);
        return new Request((ServerCnxn) null, j, i, requestType, payload, (List) null);
    }

    /**
     * Builds a request of type 5 with a 10-byte zeroed payload, no connection and no auth info.
     *
     * @param j session id
     * @param i client transaction id (cxid)
     * @return the new request
     */
    private Request createWriteRequest(long j, int i) {
        // NOTE(review): 5 is presumably ZooDefs.OpCode.setData — confirm before replacing the literal.
        final int requestType = 5;
        final ByteBuffer payload = ByteBuffer.wrap(new byte[10]);
        return new Request((ServerCnxn) null, j, i, requestType, payload, (List) null);
    }

    /**
     * Submits a request to the commit processor and blocks until the processor signals the end
     * of an iteration through the {@code requestProcessed} latch.
     *
     * <p>Fails the test if the signal does not arrive within five seconds; the timeout used to be
     * ignored, which let later metric assertions run against a processor that had not caught up.
     *
     * @param request the request to submit
     * @throws Exception if the wait is interrupted
     */
    private void processRequestWithWait(Request request) throws Exception {
        final CountDownLatch iterationDone = new CountDownLatch(1);
        this.requestProcessed = iterationDone;
        this.commitProcessor.processRequest(request);
        Assertions.assertTrue(
                iterationDone.await(5L, TimeUnit.SECONDS),
                "timed out waiting for the commit processor to finish an iteration after processRequest");
    }

    /**
     * Hands a committed request to the commit processor and blocks until the processor signals
     * the end of an iteration through the {@code requestProcessed} latch.
     *
     * <p>Fails the test if the signal does not arrive within five seconds; the timeout used to be
     * ignored, which let later metric assertions run against a processor that had not caught up.
     *
     * @param request the request to commit
     * @throws Exception if the wait is interrupted
     */
    private void commitWithWait(Request request) throws Exception {
        final CountDownLatch iterationDone = new CountDownLatch(1);
        this.requestProcessed = iterationDone;
        this.commitProcessor.commit(request);
        Assertions.assertTrue(
                iterationDone.await(5L, TimeUnit.SECONDS),
                "timed out waiting for the commit processor to finish an iteration after commit");
    }

    /**
     * Checks the {@code requests_in_session_queue} summary after one write, after two further
     * reads on the same session, and after the write is committed.
     */
    @Test
    public void testRequestsInSessionQueue() throws Exception {
        final String metric = "requests_in_session_queue";
        setupProcessors(0, 0);

        final Request pendingWrite = createWriteRequest(1L, 1);
        processRequestWithWait(pendingWrite);
        checkMetrics(metric, 1L, 1L, 1.0d, 1L, 1L);

        // Two reads on the same session while the write is still uncommitted.
        for (int cxid = 2; cxid <= 3; cxid++) {
            processRequestWithWait(createReadRequest(1L, cxid));
        }
        checkMetrics(metric, 1L, 3L, 2.0d, 3L, 6L);

        commitWithWait(pendingWrite);
        checkMetrics(metric, 1L, 3L, 2.25d, 4L, 9L);
    }

    /**
     * Checks that {@code write_final_proc_time_ms} has no sample before the write is committed
     * and exactly one sample, between 1000 and 2000 ms, afterwards.
     */
    @Test
    public void testWriteFinalProcTime() throws Exception {
        setupProcessors(0, SessionTrackerCheckTest.TICK_TIME);
        final Request write = createWriteRequest(1L, 2);

        processRequestWithWait(write);
        final Object countBeforeCommit = MetricsUtils.currentServerMetrics().get("cnt_write_final_proc_time_ms");
        Assertions.assertEquals(0L, countBeforeCommit);

        commitWithWait(write);
        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, metrics.get("cnt_write_final_proc_time_ms"));
        final long slowest = (Long) metrics.get("max_write_final_proc_time_ms");
        checkTimeMetric(slowest, 1000L, 2000L);
    }

    /**
     * Checks that a single read produces exactly one {@code read_final_proc_time_ms} sample
     * whose maximum lies between 1000 and 2000 ms.
     */
    @Test
    public void testReadFinalProcTime() throws Exception {
        setupProcessors(0, SessionTrackerCheckTest.TICK_TIME);
        final Request read = createReadRequest(1L, 1);
        processRequestWithWait(read);

        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, metrics.get("cnt_read_final_proc_time_ms"));
        final long slowest = (Long) metrics.get("max_read_final_proc_time_ms");
        checkTimeMetric(slowest, 1000L, 2000L);
    }

    /**
     * Checks that a single read produces exactly one {@code commit_process_time} sample whose
     * maximum lies between 0 and 1000.
     */
    @Test
    public void testCommitProcessTime() throws Exception {
        setupProcessors(0, 0);
        final Request read = createReadRequest(1L, 1);
        processRequestWithWait(read);

        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, metrics.get("cnt_commit_process_time"));
        final long slowest = (Long) metrics.get("max_commit_process_time");
        checkTimeMetric(slowest, 0L, 1000L);
    }

    /**
     * Checks that committing a write that was never submitted through {@code processRequest}
     * produces exactly one {@code server_write_committed_time_ms} sample between 0 and 1000 ms.
     */
    @Test
    public void testServerWriteCommittedTime() throws Exception {
        setupProcessors(0, 0);
        final Request committedOnly = createWriteRequest(1L, 1);
        commitWithWait(committedOnly);

        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, metrics.get("cnt_server_write_committed_time_ms"));
        final long slowest = (Long) metrics.get("max_server_write_committed_time_ms");
        checkTimeMetric(slowest, 0L, 1000L);
    }

    /**
     * Checks {@code local_write_committed_time_ms} for two writes that are first submitted and
     * then committed. The second commit is delayed by one second, yet the maximum is still
     * expected to stay within 0..1000 ms.
     */
    @Test
    public void testLocalWriteCommittedTime() throws Exception {
        setupProcessors(0, 0);

        final Request firstWrite = createWriteRequest(1L, 2);
        processRequestWithWait(firstWrite);
        commitWithWait(firstWrite);

        final Map<String, Object> afterFirst = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, afterFirst.get("cnt_local_write_committed_time_ms"));
        final long maxAfterFirst = (Long) afterFirst.get("max_local_write_committed_time_ms");
        checkTimeMetric(maxAfterFirst, 0L, 1000L);

        final Request secondWrite = createWriteRequest(1L, 2);
        processRequestWithWait(secondWrite);
        // Hold the commit back for a second; the bounds asserted below are unchanged.
        Thread.sleep(1000L);
        commitWithWait(secondWrite);

        final Map<String, Object> afterSecond = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(2L, afterSecond.get("cnt_local_write_committed_time_ms"));
        final long maxAfterSecond = (Long) afterSecond.get("max_local_write_committed_time_ms");
        checkTimeMetric(maxAfterSecond, 0L, 1000L);
    }

    /**
     * Checks {@code write_commitproc_time_ms} for two writes. The first is committed right
     * away (maximum within 0..1000 ms); the second is committed after a one-second pause
     * (maximum within 1000..2000 ms).
     */
    @Test
    public void testWriteCommitProcTime() throws Exception {
        setupProcessors(0, 0);

        final Request firstWrite = createWriteRequest(1L, 2);
        processRequestWithWait(firstWrite);
        commitWithWait(firstWrite);

        final Map<String, Object> afterFirst = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, afterFirst.get("cnt_write_commitproc_time_ms"));
        final long maxAfterFirst = (Long) afterFirst.get("max_write_commitproc_time_ms");
        checkTimeMetric(maxAfterFirst, 0L, 1000L);

        final Request secondWrite = createWriteRequest(1L, 2);
        processRequestWithWait(secondWrite);
        // Pause between submission and commit so the second sample is at least one second.
        Thread.sleep(1000L);
        commitWithWait(secondWrite);

        final Map<String, Object> afterSecond = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(2L, afterSecond.get("cnt_write_commitproc_time_ms"));
        final long maxAfterSecond = (Long) afterSecond.get("max_write_commitproc_time_ms");
        checkTimeMetric(maxAfterSecond, 1000L, 2000L);
    }

    /**
     * Checks {@code read_commitproc_time_ms} for a standalone read (maximum within 0..1000 ms)
     * and for a read submitted after an uncommitted write on the same session, where the write
     * is committed one second later (maximum within 1000..2000 ms).
     */
    @Test
    public void testReadCommitProcTime() throws Exception {
        setupProcessors(0, 0);

        processRequestWithWait(createReadRequest(1L, 1));
        final Map<String, Object> afterFirstRead = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(1L, afterFirstRead.get("cnt_read_commitproc_time_ms"));
        final long maxAfterFirstRead = (Long) afterFirstRead.get("max_read_commitproc_time_ms");
        checkTimeMetric(maxAfterFirstRead, 0L, 1000L);

        final Request pendingWrite = createWriteRequest(1L, 2);
        processRequestWithWait(pendingWrite);
        processRequestWithWait(createReadRequest(1L, 3));
        // Delay the commit of the preceding write by one second.
        Thread.sleep(1000L);
        commitWithWait(pendingWrite);

        final Map<String, Object> afterSecondRead = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(2L, afterSecondRead.get("cnt_read_commitproc_time_ms"));
        final long maxAfterSecondRead = (Long) afterSecondRead.get("max_read_commitproc_time_ms");
        checkTimeMetric(maxAfterSecondRead, 1000L, 2000L);
    }

    /**
     * Submits three reads (sessions 0, 1 and 2) to a processor with one worker thread, then
     * commits a write and checks that {@code max_time_waiting_empty_pool_in_commit_processor_read_ms}
     * lies between 2500 and 3500 ms.
     */
    @Test
    public void testTimeWaitingEmptyPoolInCommitProcessorRead() throws Exception {
        final int readCount = 3;
        setupProcessors(1, SessionTrackerCheckTest.TICK_TIME);

        this.requestScheduled = new CountDownLatch(readCount);
        for (int k = 0; k < readCount; k++) {
            // Session ids 0..2 paired with cxids 2..4.
            this.commitProcessor.processRequest(createReadRequest((long) k, k + 2));
        }
        this.requestScheduled.await(5L, TimeUnit.SECONDS);

        this.poolEmpytied = new CountDownLatch(1);
        this.commitProcessor.commit(createWriteRequest(1L, 1));
        this.poolEmpytied.await(5L, TimeUnit.SECONDS);

        final Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
        final long longestWait = (Long) metrics.get("max_time_waiting_empty_pool_in_commit_processor_read_ms");
        checkTimeMetric(longestWait, 2500L, 3500L);
    }

    /**
     * Submits three reads on session 1 to a processor with three worker threads, holds them
     * behind the {@code commitSeen} latch until a write is committed, and checks that
     * {@code max_concurrent_request_processing_in_commit_processor} is 3.
     */
    @Test
    public void testConcurrentRequestProcessingInCommitProcessor() throws Exception {
        setupProcessors(3, SessionTrackerCheckTest.TICK_TIME);
        this.commitSeen = new CountDownLatch(1);

        this.requestScheduled = new CountDownLatch(3);
        for (int cxid = 2; cxid <= 4; cxid++) {
            this.commitProcessor.processRequest(createReadRequest(1L, cxid));
        }
        this.requestScheduled.await(5L, TimeUnit.SECONDS);

        this.poolEmpytied = new CountDownLatch(1);
        this.commitProcessor.commit(createWriteRequest(1L, 1));
        this.poolEmpytied.await(5L, TimeUnit.SECONDS);

        final Object peak = MetricsUtils.currentServerMetrics().get("max_concurrent_request_processing_in_commit_processor");
        Assertions.assertEquals(3L, peak);
    }

    /**
     * Submits one read, one write and then three more reads on session 1, commits the write,
     * and checks that {@code reads_after_write_in_session_queue} recorded a single sample of 3.
     */
    @Test
    public void testReadsAfterWriteInSessionQueue() throws Exception {
        setupProcessors(0, 0);

        processRequestWithWait(createReadRequest(1L, 1));
        final Request pendingWrite = createWriteRequest(1L, 1);
        processRequestWithWait(pendingWrite);

        // Three reads submitted after the still-uncommitted write.
        for (int cxid = 2; cxid <= 4; cxid++) {
            processRequestWithWait(createReadRequest(1L, cxid));
        }

        commitWithWait(pendingWrite);
        checkMetrics("reads_after_write_in_session_queue", 3L, 3L, 3.0d, 1L, 3L);
    }

    /**
     * Submits two reads one after the other and checks that {@code read_commit_proc_req_queued}
     * recorded two samples of 1.
     */
    @Test
    public void testReadsQueuedInCommitProcessor() throws Exception {
        setupProcessors(0, 0);
        for (int cxid = 1; cxid <= 2; cxid++) {
            processRequestWithWait(createReadRequest(1L, cxid));
        }
        checkMetrics("read_commit_proc_req_queued", 1L, 1L, 1.0d, 2L, 2L);
    }

    /**
     * Tracks the {@code write_commit_proc_req_queued} summary while two writes are submitted,
     * committed in order, and finally followed by a read.
     */
    @Test
    public void testWritesQueuedInCommitProcessor() throws Exception {
        final String metric = "write_commit_proc_req_queued";
        setupProcessors(0, 0);

        final Request firstWrite = createWriteRequest(1L, 1);
        processRequestWithWait(firstWrite);
        final Request secondWrite = createWriteRequest(1L, 2);
        processRequestWithWait(secondWrite);
        checkMetrics(metric, 1L, 2L, 1.5d, 2L, 3L);

        commitWithWait(firstWrite);
        checkMetrics(metric, 1L, 2L, 1.6667d, 3L, 5L);

        commitWithWait(secondWrite);
        checkMetrics(metric, 1L, 2L, 1.5d, 4L, 6L);

        // The sample taken for the trailing read is expected to be 0 (min drops, sum unchanged).
        processRequestWithWait(createReadRequest(1L, 1));
        checkMetrics(metric, 0L, 2L, 1.2d, 5L, 6L);
    }

    /**
     * Commits two writes one after the other and checks that
     * {@code commit_commit_proc_req_queued} recorded two samples of 1.
     */
    @Test
    public void testCommitsQueuedInCommitProcessor() throws Exception {
        setupProcessors(0, 0);
        for (int cxid = 1; cxid <= 2; cxid++) {
            commitWithWait(createWriteRequest(1L, cxid));
        }
        checkMetrics("commit_commit_proc_req_queued", 1L, 1L, 1.0d, 2L, 2L);
    }

    /**
     * Commits two writes and checks that the {@code request_commit_queued} value is 2.
     */
    @Test
    public void testCommitsQueued() throws Exception {
        setupProcessors(0, 0);
        for (int cxid = 1; cxid <= 2; cxid++) {
            commitWithWait(createWriteRequest(1L, cxid));
        }
        final long commitsQueued = (Long) MetricsUtils.currentServerMetrics().get("request_commit_queued");
        Assertions.assertEquals(2L, commitsQueued);
    }

    /**
     * Submits one write on session 1 and two writes on session 2, then commits them one at a
     * time and checks the {@code pending_session_queue_size} summary after each commit.
     */
    @Test
    public void testPendingSessionQueueSize() throws Exception {
        final String metric = "pending_session_queue_size";
        setupProcessors(0, 0);

        final Request sessionOneWrite = createWriteRequest(1L, 1);
        processRequestWithWait(sessionOneWrite);
        final Request sessionTwoFirstWrite = createWriteRequest(2L, 2);
        processRequestWithWait(sessionTwoFirstWrite);
        final Request sessionTwoSecondWrite = createWriteRequest(2L, 3);
        processRequestWithWait(sessionTwoSecondWrite);

        commitWithWait(sessionOneWrite);
        checkMetrics(metric, 2L, 2L, 2.0d, 1L, 2L);

        commitWithWait(sessionTwoFirstWrite);
        checkMetrics(metric, 1L, 2L, 1.5d, 2L, 3L);

        commitWithWait(sessionTwoSecondWrite);
        checkMetrics(metric, 1L, 2L, 1.333d, 3L, 4L);
    }
}
