package org.apache.zookeeper.server.watch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.DumbWatcher;
import org.apache.zookeeper.server.ServerMetrics;
import org.apache.zookeeper.test.SessionTrackerCheckTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/server/watch/WatchManagerTest.class */
public class WatchManagerTest extends ZKTestCase {
    protected static final Logger LOG = LoggerFactory.getLogger(WatchManagerTest.class);
    private static final String PATH_PREFIX = "/path";
    private ConcurrentHashMap<Integer, DumbWatcher> watchers;
    private Random r;

    /* loaded from: input_file:org/apache/zookeeper/server/watch/WatchManagerTest$AddWatcherWorker.class */
    public class AddWatcherWorker extends Thread {
        private final IWatchManager manager;
        private final int paths;
        private final int watchers;
        private final AtomicInteger watchesAdded;
        private volatile boolean stopped = false;

        public AddWatcherWorker(IWatchManager iWatchManager, int i, int i2, AtomicInteger atomicInteger) {
            this.manager = iWatchManager;
            this.paths = i;
            this.watchers = i2;
            this.watchesAdded = atomicInteger;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                if (this.manager.addWatch(WatchManagerTest.PATH_PREFIX + WatchManagerTest.this.r.nextInt(this.paths), WatchManagerTest.this.createOrGetWatcher(WatchManagerTest.this.r.nextInt(this.watchers)))) {
                    this.watchesAdded.addAndGet(1);
                }
            }
        }

        public void shutdown() {
            this.stopped = true;
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/watch/WatchManagerTest$CreateDeadWatchersWorker.class */
    public class CreateDeadWatchersWorker extends Thread {
        private final IWatchManager manager;
        private final int watchers;
        private final Set<Watcher> removedWatchers;
        private volatile boolean stopped = false;

        public CreateDeadWatchersWorker(IWatchManager iWatchManager, int i, Set<Watcher> set) {
            this.manager = iWatchManager;
            this.watchers = i;
            this.removedWatchers = set;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                Watcher createOrGetWatcher = WatchManagerTest.this.createOrGetWatcher(WatchManagerTest.this.r.nextInt(this.watchers));
                createOrGetWatcher.setStale();
                this.manager.removeWatcher(createOrGetWatcher);
                synchronized (this.removedWatchers) {
                    this.removedWatchers.add(createOrGetWatcher);
                }
                try {
                    Thread.sleep(WatchManagerTest.this.r.nextInt(10));
                } catch (InterruptedException e) {
                }
            }
        }

        public void shutdown() {
            this.stopped = true;
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/watch/WatchManagerTest$RemoveWatcherWorker.class */
    public class RemoveWatcherWorker extends Thread {
        private final IWatchManager manager;
        private final int paths;
        private final int watchers;
        private final AtomicInteger watchesRemoved;
        private volatile boolean stopped = false;

        public RemoveWatcherWorker(IWatchManager iWatchManager, int i, int i2, AtomicInteger atomicInteger) {
            this.manager = iWatchManager;
            this.paths = i;
            this.watchers = i2;
            this.watchesRemoved = atomicInteger;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                if (this.manager.removeWatcher(WatchManagerTest.PATH_PREFIX + WatchManagerTest.this.r.nextInt(this.paths), WatchManagerTest.this.createOrGetWatcher(WatchManagerTest.this.r.nextInt(this.watchers)))) {
                    this.watchesRemoved.addAndGet(1);
                }
                try {
                    Thread.sleep(WatchManagerTest.this.r.nextInt(10));
                } catch (InterruptedException e) {
                }
            }
        }

        public void shutdown() {
            this.stopped = true;
        }
    }

    /* loaded from: input_file:org/apache/zookeeper/server/watch/WatchManagerTest$WatcherTriggerWorker.class */
    public class WatcherTriggerWorker extends Thread {
        private final IWatchManager manager;
        private final int paths;
        private final AtomicInteger triggeredCount;
        private volatile boolean stopped = false;

        public WatcherTriggerWorker(IWatchManager iWatchManager, int i, AtomicInteger atomicInteger) {
            this.manager = iWatchManager;
            this.paths = i;
            this.triggeredCount = atomicInteger;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                WatcherOrBitSet triggerWatch = this.manager.triggerWatch(WatchManagerTest.PATH_PREFIX + WatchManagerTest.this.r.nextInt(this.paths), Watcher.Event.EventType.NodeDeleted);
                if (triggerWatch != null) {
                    this.triggeredCount.addAndGet(triggerWatch.size());
                }
                try {
                    Thread.sleep(WatchManagerTest.this.r.nextInt(10));
                } catch (InterruptedException e) {
                }
            }
        }

        public void shutdown() {
            this.stopped = true;
        }
    }

    public static Stream<Arguments> data() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{WatchManager.class.getName()}), Arguments.of(new Object[]{WatchManagerOptimized.class.getName()})});
    }

    @BeforeEach
    public void setUp() {
        ServerMetrics.getMetrics().resetAll();
        this.watchers = new ConcurrentHashMap<>();
        this.r = new Random(System.nanoTime());
    }

    public IWatchManager getWatchManager(String str) throws IOException {
        System.setProperty("zookeeper.watchManagerName", str);
        return WatchManagerFactory.createWatchManager();
    }

    public DumbWatcher createOrGetWatcher(int i) {
        if (!this.watchers.containsKey(Integer.valueOf(i))) {
            this.watchers.putIfAbsent(Integer.valueOf(i), new DumbWatcher(i));
        }
        return this.watchers.get(Integer.valueOf(i));
    }

    @MethodSource({"data"})
    @Timeout(90)
    @ParameterizedTest
    public void testAddAndTriggerWatcher(String str) throws IOException {
        IWatchManager watchManager = getWatchManager(str);
        AtomicInteger atomicInteger = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            WatcherTriggerWorker watcherTriggerWorker = new WatcherTriggerWorker(watchManager, 1, atomicInteger);
            arrayList.add(watcherTriggerWorker);
            watcherTriggerWorker.start();
        }
        AtomicInteger atomicInteger2 = new AtomicInteger();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 5; i2++) {
            AddWatcherWorker addWatcherWorker = new AddWatcherWorker(watchManager, 1, SessionTrackerCheckTest.CONNECTION_TIMEOUT, atomicInteger2);
            arrayList2.add(addWatcherWorker);
            addWatcherWorker.start();
        }
        while (atomicInteger2.get() < 100000) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            ((AddWatcherWorker) it.next()).shutdown();
        }
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e2) {
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((WatcherTriggerWorker) it2.next()).shutdown();
        }
        Assertions.assertTrue(atomicInteger2.get() > 0);
        Assertions.assertEquals(atomicInteger2.get(), atomicInteger.get());
    }

    @MethodSource({"data"})
    @Timeout(90)
    @ParameterizedTest
    public void testRemoveWatcherOnPath(String str) throws IOException {
        IWatchManager watchManager = getWatchManager(str);
        AtomicInteger atomicInteger = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            RemoveWatcherWorker removeWatcherWorker = new RemoveWatcherWorker(watchManager, 10, SessionTrackerCheckTest.CONNECTION_TIMEOUT, atomicInteger);
            arrayList.add(removeWatcherWorker);
            removeWatcherWorker.start();
        }
        AtomicInteger atomicInteger2 = new AtomicInteger();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 5; i2++) {
            AddWatcherWorker addWatcherWorker = new AddWatcherWorker(watchManager, 10, SessionTrackerCheckTest.CONNECTION_TIMEOUT, atomicInteger2);
            arrayList2.add(addWatcherWorker);
            addWatcherWorker.start();
        }
        while (atomicInteger2.get() < 100000) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((RemoveWatcherWorker) it.next()).shutdown();
        }
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            ((AddWatcherWorker) it2.next()).shutdown();
        }
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e2) {
        }
        Assertions.assertTrue(atomicInteger2.get() > 0);
        Assertions.assertTrue(atomicInteger.get() > 0);
        Assertions.assertTrue(watchManager.size() > 0);
        Assertions.assertEquals(atomicInteger2.get(), atomicInteger.get() + watchManager.size());
    }

    @MethodSource({"data"})
    @Timeout(90)
    @ParameterizedTest
    public void testDeadWatchers(String str) throws IOException {
        System.setProperty("zookeeper.watcherCleanThreshold", "10");
        System.setProperty("zookeeper.watcherCleanIntervalInSeconds", "1");
        IWatchManager watchManager = getWatchManager(str);
        HashSet hashSet = new HashSet();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 5; i++) {
            CreateDeadWatchersWorker createDeadWatchersWorker = new CreateDeadWatchersWorker(watchManager, 100000, hashSet);
            arrayList.add(createDeadWatchersWorker);
            createDeadWatchersWorker.start();
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 0; i2 < 5; i2++) {
            AddWatcherWorker addWatcherWorker = new AddWatcherWorker(watchManager, 1, 100000, atomicInteger);
            arrayList2.add(addWatcherWorker);
            addWatcherWorker.start();
        }
        while (atomicInteger.get() < 50000) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((CreateDeadWatchersWorker) it.next()).shutdown();
        }
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            ((AddWatcherWorker) it2.next()).shutdown();
        }
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e2) {
        }
        WatchesReport watches = watchManager.getWatches();
        Iterator it3 = hashSet.iterator();
        while (it3.hasNext()) {
            Assertions.assertFalse(watches.hasPaths(((Watcher) it3.next()).getSessionId()));
        }
    }

    private void checkMetrics(String str, long j, long j2, double d, long j3, long j4) {
        Map<String, Object> currentServerMetrics = MetricsUtils.currentServerMetrics();
        Assertions.assertEquals(Long.valueOf(j), currentServerMetrics.get("min_" + str));
        Assertions.assertEquals(Long.valueOf(j2), currentServerMetrics.get("max_" + str));
        Assertions.assertEquals(d, ((Double) currentServerMetrics.get("avg_" + str)).doubleValue(), 1.0E-6d);
        Assertions.assertEquals(Long.valueOf(j3), currentServerMetrics.get("cnt_" + str));
        Assertions.assertEquals(Long.valueOf(j4), currentServerMetrics.get("sum_" + str));
    }

    @MethodSource({"data"})
    @ParameterizedTest
    public void testWatcherMetrics(String str) throws IOException {
        IWatchManager watchManager = getWatchManager(str);
        ServerMetrics.getMetrics().resetAll();
        DumbWatcher dumbWatcher = new DumbWatcher(1L);
        DumbWatcher dumbWatcher2 = new DumbWatcher(2L);
        watchManager.addWatch("/path1", dumbWatcher);
        watchManager.addWatch("/path1", dumbWatcher2);
        watchManager.addWatch("/path2", dumbWatcher);
        watchManager.triggerWatch("/path3", Watcher.Event.EventType.NodeCreated);
        checkMetrics("node_created_watch_count", 0L, 0L, 0.0d, 0L, 0L);
        watchManager.triggerWatch("/path1", Watcher.Event.EventType.NodeCreated);
        checkMetrics("node_created_watch_count", 2L, 2L, 2.0d, 1L, 2L);
        watchManager.triggerWatch("/path2", Watcher.Event.EventType.NodeCreated);
        checkMetrics("node_created_watch_count", 1L, 2L, 1.5d, 2L, 3L);
        watchManager.triggerWatch("/path1", Watcher.Event.EventType.NodeDataChanged);
        checkMetrics("node_changed_watch_count", 0L, 0L, 0.0d, 0L, 0L);
        watchManager.addWatch("/path1", dumbWatcher);
        watchManager.addWatch("/path1", dumbWatcher2);
        watchManager.addWatch("/path2", dumbWatcher);
        watchManager.triggerWatch("/path1", Watcher.Event.EventType.NodeDataChanged);
        checkMetrics("node_changed_watch_count", 2L, 2L, 2.0d, 1L, 2L);
        watchManager.triggerWatch("/path2", Watcher.Event.EventType.NodeDeleted);
        checkMetrics("node_deleted_watch_count", 1L, 1L, 1.0d, 1L, 1L);
        checkMetrics("node_created_watch_count", 1L, 2L, 1.5d, 2L, 3L);
    }
}
