package org.apache.zookeeper.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.QueryExp;
import javax.management.ReflectionException;
import javax.management.RuntimeMBeanException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.admin.Commands;
import org.apache.zookeeper.server.quorum.DelayRequestProcessor;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.server.util.PortForwarder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/zookeeper/test/ObserverMasterTest.class */
public class ObserverMasterTest extends QuorumPeerTestBase implements Watcher {
    protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);

    /**
     * Parameter supplied by the runner. When true, {@code setUp} adds an
     * {@code observerMasterPort} entry to the server configurations.
     */
    private final Boolean testObserverMaster;

    /**
     * Latch counted down by {@link #process(WatchedEvent)} for every event received.
     * Tests replace it from the test thread while watcher callbacks read it, so it is
     * volatile to guarantee the callback sees the most recently installed latch.
     */
    private volatile CountDownLatch latch;

    /** Client shared by the tests; closed by {@code shutdown()}. */
    ZooKeeper zk;

    /**
     * Last event handed to {@link #process(WatchedEvent)}. Written by the watcher
     * callback and read by the tests, hence volatile.
     */
    private volatile WatchedEvent lastEvent = null;

    // Client ports of the two voting servers and of the observer, assigned in setUp.
    private int CLIENT_PORT_QP1;
    private int CLIENT_PORT_QP2;
    private int CLIENT_PORT_OBS;

    /** Port written to the {@code observerMasterPort} setting, assigned in setUp. */
    private int OM_PORT;

    // Server threads: q1 and q2 are the voting members, q3 is the observer.
    private QuorumPeerTestBase.MainThread q1;
    private QuorumPeerTestBase.MainThread q2;
    private QuorumPeerTestBase.MainThread q3;

    /* loaded from: input_file:org/apache/zookeeper/test/ObserverMasterTest$AsyncWriter.class */
    class AsyncWriter implements Runnable {
        private final ZooKeeper client;
        private final int numTransactions;
        private final boolean issueSync;
        private final CountDownLatch writerLatch;
        private final String root;
        private final CountDownLatch gate;

        AsyncWriter(ZooKeeper zooKeeper, int i, boolean z, CountDownLatch countDownLatch, String str, CountDownLatch countDownLatch2) {
            this.client = zooKeeper;
            this.numTransactions = i;
            this.issueSync = z;
            this.writerLatch = countDownLatch;
            this.root = str;
            this.gate = countDownLatch2;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.gate != null) {
                try {
                    this.gate.await();
                } catch (InterruptedException e) {
                    ObserverMasterTest.LOG.error("Gate interrupted");
                    return;
                }
            }
            for (int i = 0; i < this.numTransactions; i++) {
                final boolean z = i % 100 == 0;
                this.client.create(this.root + i, "inner thread".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { // from class: org.apache.zookeeper.test.ObserverMasterTest.AsyncWriter.1
                    public void processResult(int i2, String str, Object obj, String str2) {
                        AsyncWriter.this.writerLatch.countDown();
                        if (z) {
                            ObserverMasterTest.LOG.info("wrote {}", str);
                        }
                    }
                }, (Object) null);
                if (z) {
                    ObserverMasterTest.LOG.info("async wrote {}{}", this.root, Integer.valueOf(i));
                    if (this.issueSync) {
                        this.client.sync(this.root + "0", (AsyncCallback.VoidCallback) null, (Object) null);
                    }
                }
            }
        }
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param bool value stored as the observer-master flag consulted by the tests
     */
    public ObserverMasterTest(Boolean bool) {
        testObserverMaster = bool;
    }

    /**
     * Supplies the constructor arguments for the parameterized runner: one run with
     * {@code Boolean.TRUE} and one with {@code Boolean.FALSE}.
     *
     * @return a fixed-size list holding the two single-element argument arrays
     */
    @Parameterized.Parameters
    public static List<Object[]> data() {
        final Object[][] runs = {
            {Boolean.TRUE},
            {Boolean.FALSE},
        };
        return Arrays.asList(runs);
    }

    /**
     * Allocates ports, builds the three server threads (q1 and q2 as voting members,
     * q3 as observer), starts q1 and q2 and waits for both to accept clients.
     * q3 is created but not started.
     *
     * <p>When the observer-master flag is set and {@code i} is positive, the observer is
     * configured with {@code i} as its observerMasterPort and a forwarder from {@code i}
     * to {@code OM_PORT} is opened. Otherwise the observer uses {@code OM_PORT} directly
     * and no forwarder is created. Both decisions share one predicate so that
     * {@code i == 0} no longer opens a forwarder the observer never uses.
     *
     * @param i proxy port to put in front of the observer master port; zero or negative for none
     * @return the forwarder that was opened, or null when none was needed
     * @throws IOException propagated from environment setup, forwarder or server thread creation
     */
    private PortForwarder setUp(int i) throws IOException {
        ClientBase.setupTestEnv();

        final int quorumPort1 = PortAssignment.unique();
        final int quorumPort2 = PortAssignment.unique();
        final int quorumPortObs = PortAssignment.unique();
        final int electionPort1 = PortAssignment.unique();
        final int electionPort2 = PortAssignment.unique();
        final int electionPortObs = PortAssignment.unique();
        CLIENT_PORT_QP1 = PortAssignment.unique();
        CLIENT_PORT_QP2 = PortAssignment.unique();
        CLIENT_PORT_OBS = PortAssignment.unique();
        OM_PORT = PortAssignment.unique();

        final String quorumCfg = "server.1=127.0.0.1:" + quorumPort1 + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
                + "\nserver.2=127.0.0.1:" + quorumPort2 + ":" + electionPort2 + ";" + CLIENT_PORT_QP2
                + "\nserver.3=127.0.0.1:" + quorumPortObs + ":" + electionPortObs + ":observer;" + CLIENT_PORT_OBS;

        final boolean observerMasterMode = testObserverMaster;
        final boolean useProxy = observerMasterMode && i > 0;

        final String participantCfg = observerMasterMode
                ? String.format("observerMasterPort=%d%n", OM_PORT)
                : "";
        final String observerCfg = observerMasterMode
                ? String.format("observerMasterPort=%d%n", useProxy ? i : OM_PORT)
                : "";

        final PortForwarder forwarder = useProxy ? new PortForwarder(i, OM_PORT) : null;

        q1 = new QuorumPeerTestBase.MainThread(1, CLIENT_PORT_QP1, quorumCfg, participantCfg);
        q2 = new QuorumPeerTestBase.MainThread(2, CLIENT_PORT_QP2, quorumCfg, participantCfg);
        q3 = new QuorumPeerTestBase.MainThread(3, CLIENT_PORT_OBS, quorumCfg, observerCfg);
        q1.start();
        q2.start();
        Assert.assertTrue("waiting for server 1 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
        return forwarder;
    }

    /**
     * Closes the shared client, shuts down all three server threads and then asserts,
     * in server order, that each client port has stopped responding.
     *
     * @throws InterruptedException propagated from closing the client or stopping a server
     */
    private void shutdown() throws InterruptedException {
        LOG.info("Shutting down all servers");
        zk.close();
        q1.shutdown();
        q2.shutdown();
        q3.shutdown();

        final int[] clientPorts = {CLIENT_PORT_QP1, CLIENT_PORT_QP2, CLIENT_PORT_OBS};
        for (int idx = 0; idx < clientPorts.length; idx++) {
            final String message = "Waiting for server " + (idx + 1) + " to shut down";
            final String hostPort = "127.0.0.1:" + clientPorts[idx];
            Assert.assertTrue(message, ClientBase.waitForServerDown(hostPort, ClientBase.CONNECTION_TIMEOUT));
        }
    }

    /**
     * Lets the observer sync, cuts its forwarded connection, writes more data while a
     * delay processor is injected into the non-leader server (observer-master mode only),
     * then restores the forwarder and checks that a client on the observer can read
     * data written both before and during the outage.
     *
     * @throws Exception on any setup, client or assertion failure
     */
    @Test
    public void testLaggingObserverMaster() throws Exception {
        final int omProxyPort = PortAssignment.unique();
        // Tracks whichever forwarder is currently open so the finally block can release it.
        PortForwarder forwarder = setUp(omProxyPort);
        try {
            final int leaderPort;
            final QuorumPeerTestBase.MainThread leader;
            final QuorumPeerTestBase.MainThread follower;
            if (q1.getQuorumPeer().leader != null) {
                leaderPort = CLIENT_PORT_QP1;
                leader = q1;
                follower = q2;
            } else if (q2.getQuorumPeer().leader != null) {
                leaderPort = CLIENT_PORT_QP2;
                leader = q2;
                follower = q1;
            } else {
                throw new RuntimeException("No leader");
            }

            // Create some data before the observer is started.
            zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
            for (int n = 0; n < 10; n++) {
                zk.create("/bulk" + n, "initial data of some size".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            zk.close();

            q3.start();
            Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

            latch = new CountDownLatch(1);
            zk = new ZooKeeper("127.0.0.1:" + leaderPort, ClientBase.CONNECTION_TIMEOUT, this);
            latch.await();
            Assert.assertEquals(ZooKeeper.States.CONNECTED, zk.getState());
            zk.create("/init", "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // Wait until the observer has logged the same zxid as the leader.
            final long leaderZxid = leader.getQuorumPeer().getLastLoggedZxid();
            waitFor("Timeout waiting for observer sync", new ZKTestCase.WaitForCondition() {
                @Override
                public boolean evaluate() {
                    return leaderZxid == q3.getQuorumPeer().getLastLoggedZxid();
                }
            }, 30);

            // Drop the forwarded connection (if any) before writing more data.
            if (forwarder != null) {
                forwarder.shutdown();
                forwarder = null;
            }

            for (int n = 0; n < 10; n++) {
                zk.create("/basic" + n, "second".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }

            final DelayRequestProcessor delayProcessor = testObserverMaster
                    ? DelayRequestProcessor.injectDelayRequestProcessor(follower.getQuorumPeer().getActiveServer())
                    : null;

            zk.create("/target1", "third".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            zk.create("/target2", "third".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // The non-leader's zxid is only reported in observer-master mode, which is
            // the mode in which the delay processor was injected into it.
            LOG.info(
                    "observer zxid {}{} leader zxid {}",
                    Long.toHexString(q3.getQuorumPeer().getLastLoggedZxid()),
                    testObserverMaster
                            ? " observer master zxid " + Long.toHexString(follower.getQuorumPeer().getLastLoggedZxid())
                            : "",
                    Long.toHexString(leader.getQuorumPeer().getLastLoggedZxid()));

            // Restore the forwarded connection.
            if (testObserverMaster) {
                forwarder = new PortForwarder(omProxyPort, OM_PORT);
            }
            Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));
            Assert.assertNotNull("Leader switched", leader.getQuorumPeer().leader);

            if (delayProcessor != null) {
                delayProcessor.unblockQueue();
            }

            latch = new CountDownLatch(1);
            final ZooKeeper observerClient = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
            latch.await();
            zk.create("/finalop", "fourth".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            Assert.assertEquals("first", new String(observerClient.getData("/init", null, null)));
            Assert.assertEquals("third", new String(observerClient.getData("/target1", null, null)));

            observerClient.close();
            shutdown();
        } finally {
            // Best-effort release of the forwarder, also when the test failed part-way.
            if (forwarder != null) {
                try {
                    forwarder.shutdown();
                } catch (Exception e) {
                    LOG.warn("Ignoring failure while shutting down port forwarder", e);
                }
            }
        }
    }

    /**
     * Exercises a client connected to the observer: basic reads and writes, loss of
     * the connected state when server 2 is shut down, reconnection after it restarts,
     * and finally a fail-over from a forwarded leader connection to a forwarded
     * observer connection while holding an ephemeral node.
     *
     * @throws Exception on any setup, client or assertion failure
     */
    @Test
    public void testObserver() throws Exception {
        // Installed before any client exists so the events of the first client are counted.
        latch = new CountDownLatch(2);
        setUp(-1);
        q3.start();
        Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

        if (testObserverMaster) {
            final int connectedPort = q3.getQuorumPeer().observer.getSocket().getPort();
            LOG.info("port {} {}", connectedPort, OM_PORT);
            Assert.assertEquals("observer failed to connect to observer master", OM_PORT, connectedPort);
        }

        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        zk.create("/obstest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assert.assertEquals("test", new String(zk.getData("/obstest", null, null)));
        zk.sync("/", null, null);
        zk.setData("/obstest", "test2".getBytes(), -1);
        zk.getChildren("/", false);
        Assert.assertEquals(ZooKeeper.States.CONNECTED, zk.getState());

        LOG.info("Shutting down server 2");
        q2.shutdown();
        Assert.assertTrue("Waiting for server 2 to shut down", ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
        LOG.info("Server 2 down");

        latch.await();
        Assert.assertNotSame("Client is still connected to non-quorate cluster", Watcher.Event.KeeperState.SyncConnected, lastEvent.getState());
        LOG.info("Latch returned");

        try {
            Assert.assertNotEquals("Shouldn't get a response when cluster not quorate!", "test", new String(zk.getData("/obstest", null, null)));
        } catch (KeeperException.ConnectionLossException e) {
            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
        }

        latch = new CountDownLatch(1);
        LOG.info("Restarting server 2");
        q2.start();
        LOG.info("Waiting for server 2 to come up");
        Assert.assertTrue("waiting for server 2 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
        LOG.info("Server 2 started, waiting for latch");
        latch.await();
        Assert.assertTrue(
                "Client didn't reconnect to quorate ensemble (state was" + lastEvent.getState() + ")",
                Watcher.Event.KeeperState.SyncConnected == lastEvent.getState()
                        || Watcher.Event.KeeperState.Expired == lastEvent.getState());

        LOG.info("perform a revalidation test");
        final int leaderProxyPort = PortAssignment.unique();
        final int observerProxyPort = PortAssignment.unique();
        // Only the leader's forwarder is open at first, so the client must connect through it.
        PortForwarder leaderForwarder = new PortForwarder(leaderProxyPort, q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1);
        PortForwarder observerForwarder = null;
        try {
            latch = new CountDownLatch(1);
            final ZooKeeper client = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, observerProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
            latch.await();
            client.create("/revalidtest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Assert.assertNotNull("Read-after write failed", client.exists("/revalidtest", null));

            // Open the observer route, then close the leader route to force a fail-over.
            latch = new CountDownLatch(2);
            observerForwarder = new PortForwarder(observerProxyPort, CLIENT_PORT_OBS);
            try {
                leaderForwarder.shutdown();
            } catch (Exception e) {
                // Best effort: the test only needs the leader route to stop working.
                LOG.warn("Ignoring failure while shutting down leader port forwarder", e);
            }
            leaderForwarder = null;

            latch.await();
            Assert.assertEquals("test", new String(client.getData("/revalidtest", null, null)));
            client.close();
            observerForwarder.shutdown();
            observerForwarder = null;
            shutdown();
        } finally {
            // Only non-null when the test failed before reaching the regular shutdown.
            for (PortForwarder leftover : new PortForwarder[] {leaderForwarder, observerForwarder}) {
                if (leftover != null) {
                    try {
                        leftover.shutdown();
                    } catch (Exception e) {
                        LOG.warn("Ignoring failure while shutting down port forwarder", e);
                    }
                }
            }
        }
    }

    /**
     * Connects a client through a forwarder to the leader, creates an ephemeral node,
     * then opens a forwarder to the observer and closes the leader's one, and checks
     * that the node can still be read after the client has moved over.
     *
     * @throws Exception on any setup, client or assertion failure
     */
    @Test
    public void testRevalidation() throws Exception {
        setUp(-1);
        q3.start();
        Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

        final int leaderProxyPort = PortAssignment.unique();
        final int observerProxyPort = PortAssignment.unique();
        // Only the leader's forwarder is open at first, so the client must connect through it.
        PortForwarder leaderForwarder = new PortForwarder(leaderProxyPort, q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP2 : CLIENT_PORT_QP1);
        PortForwarder observerForwarder = null;
        try {
            latch = new CountDownLatch(1);
            zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, observerProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
            latch.await();
            zk.create("/revalidtest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            Assert.assertNotNull("Read-after write failed", zk.exists("/revalidtest", null));

            // Open the observer route, then close the leader route to force a fail-over.
            latch = new CountDownLatch(2);
            observerForwarder = new PortForwarder(observerProxyPort, CLIENT_PORT_OBS);
            try {
                leaderForwarder.shutdown();
            } catch (Exception e) {
                // Best effort: the test only needs the leader route to stop working.
                LOG.warn("Ignoring failure while shutting down leader port forwarder", e);
            }
            leaderForwarder = null;

            latch.await();
            Assert.assertEquals("test", new String(zk.getData("/revalidtest", null, null)));
            observerForwarder.shutdown();
            observerForwarder = null;
            shutdown();
        } finally {
            // Only non-null when the test failed before reaching the regular shutdown.
            for (PortForwarder leftover : new PortForwarder[] {leaderForwarder, observerForwarder}) {
                if (leftover != null) {
                    try {
                        leftover.shutdown();
                    } catch (Exception e) {
                        LOG.warn("Ignoring failure while shutting down port forwarder", e);
                    }
                }
            }
        }
    }

    /**
     * Starts the observer after some initial data exists, waits for it to catch up, and
     * then runs two concurrent {@link AsyncWriter}s, one through the observer and one
     * through the non-leader voting server, until every create callback has fired.
     *
     * @throws Exception on any setup, client or assertion failure
     */
    @Test
    public void testInOrderCommits() throws Exception {
        setUp(-1);

        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, (Watcher) null);
        for (int n = 0; n < 10; n++) {
            zk.create("/bulk" + n, "Initial data of some size".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.close();

        q3.start();
        Assert.assertTrue("waiting for observer to be up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

        latch = new CountDownLatch(1);
        zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this);
        latch.await();
        Assert.assertEquals(ZooKeeper.States.CONNECTED, zk.getState());
        zk.create("/init", "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Wait until the observer has logged the same zxid as server 1.
        final long referenceZxid = q1.getQuorumPeer().getLastLoggedZxid();
        waitFor("Timeout waiting for observer sync", new ZKTestCase.WaitForCondition() {
            @Override
            public boolean evaluate() {
                return referenceZxid == q3.getQuorumPeer().getLastLoggedZxid();
            }
        }, 30);

        final ZooKeeper observerClient = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        // Whichever of q1/q2 is not the leader.
        final int followerPort = q1.getQuorumPeer().leader == null ? CLIENT_PORT_QP1 : CLIENT_PORT_QP2;
        final ZooKeeper followerClient = new ZooKeeper("127.0.0.1:" + followerPort, ClientBase.CONNECTION_TIMEOUT, this);

        final int writes = 10001;
        // Both writers wait on the same gate so that they start together.
        final CountDownLatch gate = new CountDownLatch(1);
        final CountDownLatch observerWritesDone = new CountDownLatch(writes);
        final Thread asyncWriteThread = new Thread(new AsyncWriter(observerClient, writes, true, observerWritesDone, "/obs", gate));
        final CountDownLatch followerWritesDone = new CountDownLatch(writes);
        final Thread followerAsyncWriteThread = new Thread(new AsyncWriter(followerClient, writes, true, followerWritesDone, "/follower", gate));

        LOG.info("ASYNC WRITES");
        asyncWriteThread.start();
        followerAsyncWriteThread.start();
        gate.countDown();

        observerWritesDone.await();
        followerWritesDone.await();

        asyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
        if (asyncWriteThread.isAlive()) {
            LOG.error("asyncWriteThread is still alive");
        }
        followerAsyncWriteThread.join(ClientBase.CONNECTION_TIMEOUT);
        if (followerAsyncWriteThread.isAlive()) {
            // Names the follower thread; previously this repeated the first thread's message.
            LOG.error("followerAsyncWriteThread is still alive");
        }

        observerClient.close();
        followerClient.close();
        shutdown();
    }

    /**
     * Checks the observer-related monitoring output and JMX operations: the
     * {@code mntr} command output, the synced-observer metric of both voting servers,
     * terminating the observer's learner connection through its bean, and changing
     * the observer's {@code LearnerMaster} attribute.
     */
    @Test
    public void testAdminCommands() throws IOException, MBeanException, InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, AttributeNotFoundException, InvalidAttributeValueException, KeeperException {
        // Start from a clean registry.
        for (Object bean : MBeanRegistry.getInstance().getRegisteredBeans()) {
            MBeanRegistry.getInstance().unregister((ZKMBeanInfo) bean);
        }

        JMXEnv.setUp();
        try {
            setUp(-1);
            q3.start();
            Assert.assertTrue("waiting for observer to be up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

            zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
            zk.create("/obstest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assert.assertEquals("test", new String(zk.getData("/obstest", null, null)));

            Assert.assertTrue("observer not emitting observer_master_id", Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), Collections.emptyMap()).toMap().containsKey("observer_master_id"));

            assertSyncedObserversMetric(q1);
            assertSyncedObserversMetric(q2);

            // Find the learner-connection bean that belongs to the observer.
            ObjectName connectionBean = null;
            for (Object candidate : JMXEnv.conn().queryNames(new ObjectName("org.apache.ZooKeeperService:*"), (QueryExp) null)) {
                final ObjectName name = (ObjectName) candidate;
                if (name.getCanonicalName().contains("Learner_Connections") && name.getCanonicalName().contains("id:" + q3.getQuorumPeer().getId())) {
                    connectionBean = name;
                    break;
                }
            }
            Assert.assertNotNull("could not find connection bean", connectionBean);

            latch = new CountDownLatch(1);
            JMXEnv.conn().invoke(connectionBean, "terminateConnection", new Object[0], (String[]) null);
            Assert.assertTrue("server failed to disconnect on terminate", latch.await(ClientBase.CONNECTION_TIMEOUT / 2, TimeUnit.MILLISECONDS));
            Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));

            final long observerId = q3.getQuorumPeer().getId();
            final Set<?> observerBeans = JMXEnv.conn().queryNames(new ObjectName(String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", observerId, observerId)), (QueryExp) null);
            Assert.assertEquals("expecting singular observer bean", 1, observerBeans.size());
            final ObjectName observerBean = (ObjectName) observerBeans.iterator().next();

            if (testObserverMaster) {
                // Server ids are 1 and 2, so "3 - id" names the other voting server.
                final long currentMasterId = q3.getQuorumPeer().observer.getLearnerMasterId();
                latch = new CountDownLatch(1);
                JMXEnv.conn().setAttribute(observerBean, new Attribute("LearnerMaster", Long.toString(3 - currentMasterId)));
                Assert.assertTrue("server failed to disconnect on terminate", latch.await(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
                Assert.assertTrue("waiting for server 3 being up", ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT));
            } else {
                // Without an observer master the attribute change is expected to be rejected.
                try {
                    final long leaderId = q1.getQuorumPeer().leader == null ? 2L : 1L;
                    JMXEnv.conn().setAttribute(observerBean, new Attribute("LearnerMaster", Long.toString(3 - leaderId)));
                    Assert.fail("should have seen an exception on previous command");
                } catch (RuntimeMBeanException e) {
                    Assert.assertEquals("mbean failed for the wrong reason", IllegalArgumentException.class, e.getCause().getClass());
                }
            }

            shutdown();
        } finally {
            // Always release the JMX environment, also when an assertion failed.
            JMXEnv.tearDown();
        }
    }

    /**
     * Asserts the synced-observers metric of one voting server. In observer-master
     * mode the non-leader is expected to report 1 and the leader 0; otherwise the
     * leader is expected to report 1 and the non-leader null.
     */
    private void assertSyncedObserversMetric(QuorumPeerTestBase.MainThread server) {
        final boolean isLeader = server.getQuorumPeer().leader != null;
        // Expected values are boxed explicitly so the (Object, Object) overload is chosen;
        // the accessor is assumed to return a nullable Integer (see the assertNull branch)
        // -- TODO confirm against QuorumPeer.
        if (testObserverMaster) {
            Assert.assertEquals(Integer.valueOf(isLeader ? 0 : 1), server.getQuorumPeer().getSynced_observers_metric());
        } else if (isLeader) {
            Assert.assertEquals(Integer.valueOf(1), server.getQuorumPeer().getSynced_observers_metric());
        } else {
            Assert.assertNull(server.getQuorumPeer().getSynced_observers_metric());
        }
    }

    /**
     * Builds one {@code server.<id>=127.0.0.1:<port>:<port>:<type>;<clientPort>}
     * configuration line, allocating two fresh ports for it.
     *
     * @param str server type text placed before the semicolon
     * @param j   server id
     * @param i   client port placed after the semicolon
     * @return the assembled configuration line
     */
    private String createServerString(String str, long j, int i) {
        final StringBuilder line = new StringBuilder("server.");
        line.append(j).append("=127.0.0.1:");
        line.append(PortAssignment.unique()).append(":");
        line.append(PortAssignment.unique()).append(":");
        line.append(str).append(";").append(i);
        return line.toString();
    }

    /**
     * Asserts that the server on the given local client port comes up within the
     * connection timeout.
     *
     * @param i client port on 127.0.0.1 to probe
     */
    private void waitServerUp(int i) {
        final String hostPort = "127.0.0.1:" + i;
        final boolean up = ClientBase.waitForServerUp(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assert.assertTrue("waiting for server being up", up);
    }

    /**
     * Creates an admin client for the server on the given local port. Sets the
     * super-digest system property, enables reconfig, and adds {@code super:test}
     * digest credentials to the new client.
     *
     * @param i client port on 127.0.0.1 to connect to
     * @return the admin client with the digest credentials added
     * @throws IOException propagated from creating the client
     */
    private ZooKeeperAdmin createAdmin(int i) throws IOException {
        System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU=");
        QuorumPeerConfig.setReconfigEnabled(true);

        final String hostPort = "127.0.0.1:" + i;
        final ZooKeeperAdmin admin = new ZooKeeperAdmin(hostPort, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE);
        final byte[] credentials = "super:test".getBytes();
        admin.addAuthInfo("digest", credentials);
        return admin;
    }

    /**
     * Starts two voting servers and an observer attached to the non-leader's observer
     * master port, adds a third participant through reconfig, and checks that the
     * observer's client sees the new configuration without receiving any further
     * connection-state event.
     *
     * <p>Does nothing unless the observer-master flag is set.
     *
     * <p>NOTE(review): this method carries no {@code @Test} annotation, so the JUnit 4
     * runner will not execute it -- confirm whether that is intentional.
     */
    public void testDynamicReconfig() throws InterruptedException, IOException, KeeperException {
        if (!testObserverMaster) {
            return;
        }

        ClientBase.setupTestEnv();
        final int clientPort1 = PortAssignment.unique();
        final int clientPort2 = PortAssignment.unique();
        final int omPort1 = PortAssignment.unique();
        final int omPort2 = PortAssignment.unique();

        final String quorumCfg = createServerString("participant", 1L, clientPort1) + "\n" + createServerString("participant", 2L, clientPort2);
        final QuorumPeerTestBase.MainThread s1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfg, String.format("observerMasterPort=%d%n", omPort1));
        final QuorumPeerTestBase.MainThread s2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg, String.format("observerMasterPort=%d%n", omPort2));
        s1.start();
        s2.start();
        waitServerUp(clientPort1);
        waitServerUp(clientPort2);

        // Attach the observer to whichever server is not the leader.
        final int observerMasterPort = s1.getQuorumPeer().leader == null ? omPort1 : omPort2;
        final int observerClientPort = PortAssignment.unique();
        final QuorumPeerTestBase.MainThread observer = new QuorumPeerTestBase.MainThread(
                10,
                observerClientPort,
                quorumCfg + "\n" + createServerString("observer", 10, observerClientPort),
                String.format("observerMasterPort=%d%n", observerMasterPort));
        LOG.info("starting observer");
        observer.start();
        waitServerUp(observerClientPort);

        // Every connection-state change seen by the observer's client is queued here.
        final LinkedBlockingQueue<Watcher.Event.KeeperState> states = new LinkedBlockingQueue<>();
        final ZooKeeper observerClient = new ZooKeeper("127.0.0.1:" + observerClientPort, ClientBase.CONNECTION_TIMEOUT, event -> {
            try {
                states.put(event.getState());
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the thread delivering the event.
                Thread.currentThread().interrupt();
            }
        });
        Assert.assertEquals(Watcher.Event.KeeperState.SyncConnected, states.poll(1000L, TimeUnit.MILLISECONDS));

        final List<String> joiningServers = new ArrayList<>();
        joiningServers.add("server.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;localhost:" + PortAssignment.unique());

        final ZooKeeperAdmin admin = createAdmin(clientPort1);
        ReconfigTest.reconfig(admin, joiningServers, null, null, -1L);
        ReconfigTest.testServerHasConfig(observerClient, joiningServers, null);

        // No state change is expected on the observer's client after the reconfig.
        Assert.assertNull(states.poll(1000L, TimeUnit.MILLISECONDS));

        admin.close();
        observerClient.close();
        observer.shutdown();
        s2.shutdown();
        s1.shutdown();
    }

    /**
     * Watcher callback: remembers the event as the most recent one, counts down the
     * current latch when one is installed, and logs the event.
     *
     * @param watchedEvent the event being delivered
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        lastEvent = watchedEvent;
        if (null != latch) {
            latch.countDown();
        }
        LOG.info("Latch got event :: {}", watchedEvent);
    }
}
