package org.apache.zookeeper;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/zookeeper/ClientCnxnSocketFragilityTest.class */
public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
    /** Number of quorum servers in the test ensemble. */
    private static final int SERVER_COUNT = 3;
    /** Session timeout handed to the client under test (presumably milliseconds -- confirm against the ZooKeeper constructor). */
    private static final int SESSION_TIMEOUT = 40000;
    /** Upper bound for connection-related waits; the value matches the server-up wait in the test (presumably milliseconds). */
    public static final int CONNECTION_TIMEOUT = 30000;
    /** Two-party rendezvous shared by {@code CustomClientCnxn.onConnecting} and {@code CustomClientCnxn.disconnect}. */
    private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator();
    /** Client under test; assigned by the test method. */
    private volatile CustomZooKeeper zk = null;
    /** Socket captured in {@code CustomZooKeeper.createConnection}; lets the test mute and unmute the transport. */
    private volatile FragileClientCnxnSocketNIO socket = null;
    /** Connection captured in {@code CustomZooKeeper.createConnection}; lets the test drive the close sequence. */
    private volatile CustomClientCnxn cnxn = null;

    /**
     * Watcher that remembers whether the session was ever reported as expired.
     *
     * <p>The flag is written in {@link #process(WatchedEvent)} and polled by the test
     * method through {@link #isSessionExpired()}. It is {@code volatile} so that a write
     * made on the thread delivering events (presumably the client's event thread --
     * confirm) is guaranteed to be visible to the polling thread.
     */
    class ClientWatcher implements Watcher {
        // Stored by watchFor(); not read anywhere in this class.
        private ZooKeeper zk;
        private volatile boolean sessionExpired = false;

        ClientWatcher() {
        }

        /**
         * Associates this watcher with the client it observes.
         *
         * @param zooKeeper the client this watcher was registered on
         */
        void watchFor(ZooKeeper zooKeeper) {
            this.zk = zooKeeper;
        }

        /**
         * Logs every event and latches the expired flag once an {@code Expired}
         * state is seen. The flag is never reset.
         *
         * @param watchedEvent the event delivered by the client
         */
        public void process(WatchedEvent watchedEvent) {
            ClientCnxnSocketFragilityTest.LOG.info("Watcher got {}", watchedEvent);
            if (watchedEvent.getState() == Watcher.Event.KeeperState.Expired) {
                this.sessionExpired = true;
            }
        }

        /**
         * @return {@code true} if an {@code Expired} event has been processed
         */
        boolean isSessionExpired() {
            return this.sessionExpired;
        }
    }

    /**
     * {@link ClientCnxn} subclass that lets the test line up a connect attempt with
     * {@link #disconnect()}.
     *
     * <p>Once {@link #attemptClose()} has been called, both {@link #onConnecting} and
     * {@link #disconnect()} call {@code unsafeCoordinator.sync(closing)}, so the two
     * callers rendezvous on the coordinator's two-party latch before either proceeds.
     */
    class CustomClientCnxn extends ClientCnxn {
        private volatile boolean closing;
        private volatile boolean hitUnsafeRegion;

        public CustomClientCnxn(String str, HostProvider hostProvider, int i, ZKClientConfig zKClientConfig, Watcher watcher, ClientCnxnSocket clientCnxnSocket, boolean z) throws IOException {
            super(str, hostProvider, i, zKClientConfig, watcher, clientCnxnSocket, z);
            this.closing = false;
            this.hitUnsafeRegion = false;
        }

        /** Arms the coordination logic: later connect attempts and disconnects will synchronize. */
        void attemptClose() {
            this.closing = true;
        }

        /**
         * Blocks until {@link #onConnecting} has been entered while closing.
         *
         * <p>The wait is deliberately uninterruptible, because the test must not continue
         * before the region is reached. If the thread is interrupted while waiting, the
         * interrupt is remembered and re-asserted on exit instead of being discarded.
         */
        void waitUntilHitUnsafeRegion() {
            boolean interrupted = false;
            try {
                while (!this.hitUnsafeRegion) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100L);
                    } catch (InterruptedException e) {
                        // Keep polling, but do not lose the interrupt request.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Connect hook (presumably invoked by the parent class before a connection
         * attempt -- confirm against {@code ClientCnxn}). While closing, it flags that the
         * unsafe region was reached and then waits for {@link #disconnect()} to arrive at
         * the same rendezvous.
         *
         * @param inetSocketAddress the address about to be connected to
         */
        protected void onConnecting(InetSocketAddress inetSocketAddress) {
            if (this.closing) {
                ClientCnxnSocketFragilityTest.LOG.info("Attempt to connnecting {} {} {}", inetSocketAddress, this.closing, this.state);
                // Publish the flag before blocking so waitUntilHitUnsafeRegion() can return.
                this.hitUnsafeRegion = true;
                ClientCnxnSocketFragilityTest.this.unsafeCoordinator.sync(this.closing);
            }
        }

        /**
         * Shuts the connection down, but only after meeting the connecting thread at the
         * coordinator, so that the send thread is closed first and the pending connect
         * attempt is released afterwards.
         *
         * <p>Must only be called after {@link #attemptClose()}; this is asserted.
         */
        public void disconnect() {
            Assertions.assertTrue(this.closing);
            ClientCnxnSocketFragilityTest.LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), this.closing, this.state);
            this.sendThread.close();
            // Rendezvous with onConnecting(): the connect attempt continues only after close().
            ClientCnxnSocketFragilityTest.this.unsafeCoordinator.sync(this.closing);
            try {
                this.sendThread.join();
            } catch (InterruptedException e) {
                ClientCnxnSocketFragilityTest.LOG.warn("Got interrupted while waiting for the sender thread to close", e);
            }
            this.eventThread.queueEventOfDeath();
            if (this.zooKeeperSaslClient != null) {
                this.zooKeeperSaslClient.shutdown();
            }
        }
    }

    /**
     * {@link ZooKeeper} client that builds its connection as a {@link CustomClientCnxn}
     * and publishes both the connection and its socket to the enclosing test instance.
     */
    class CustomZooKeeper extends ZooKeeper {
        public CustomZooKeeper(String str, int i, Watcher watcher) throws IOException {
            super(str, i, watcher);
        }

        /**
         * @return whether the state of this client's own connection reports itself alive
         */
        public boolean isAlive() {
            // "this.cnxn" is the field of the ZooKeeper parent, not the enclosing test's field.
            return this.cnxn.getState().isAlive();
        }

        /**
         * Factory for the client connection. Requires the supplied socket to be a
         * {@link FragileClientCnxnSocketNIO} and records it, then the new connection,
         * on the enclosing test so the test method can manipulate both.
         *
         * @return the newly created {@link CustomClientCnxn}
         * @throws IOException if constructing the connection fails
         */
        ClientCnxn createConnection(String str, HostProvider hostProvider, int i, ZKClientConfig zKClientConfig, Watcher watcher, ClientCnxnSocket clientCnxnSocket, boolean z) throws IOException {
            Assertions.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
            final ClientCnxnSocketFragilityTest owner = ClientCnxnSocketFragilityTest.this;
            // The socket is published first, before the connection is constructed.
            owner.socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
            final CustomClientCnxn created =
                    new CustomClientCnxn(str, hostProvider, i, zKClientConfig, watcher, clientCnxnSocket, z);
            owner.cnxn = created;
            return created;
        }
    }

    /**
     * NIO client socket with a switchable "mute" mode. While muted, every call to
     * {@code doTransport} or {@code connect} fails with an {@link IOException} instead
     * of delegating to the parent implementation.
     */
    public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO {
        private volatile boolean mute = false;

        public FragileClientCnxnSocketNIO(ZKClientConfig zKClientConfig) throws IOException {
            super(zKClientConfig);
        }

        /** Turns mute mode on; does nothing (and logs nothing) if it is already on. */
        synchronized void mute() {
            if (!this.mute) {
                ClientCnxnSocketFragilityTest.LOG.info("Fire socket mute");
                this.mute = true;
            }
        }

        /** Turns mute mode off; does nothing (and logs nothing) if it is already off. */
        synchronized void unmute() {
            if (!this.mute) {
                return;
            }
            ClientCnxnSocketFragilityTest.LOG.info("Fire socket unmute");
            this.mute = false;
        }

        /**
         * Delegates to the parent transport unless the socket is muted.
         *
         * @throws IOException if the socket is muted, or from the parent implementation
         */
        void doTransport(int i, Queue<ClientCnxn.Packet> queue, ClientCnxn clientCnxn) throws IOException, InterruptedException {
            failIfMuted();
            super.doTransport(i, queue, clientCnxn);
        }

        /**
         * Delegates to the parent connect unless the socket is muted.
         *
         * @throws IOException if the socket is muted, or from the parent implementation
         */
        void connect(InetSocketAddress inetSocketAddress) throws IOException {
            failIfMuted();
            super.connect(inetSocketAddress);
        }

        /** Throws the simulated failure when mute mode is on. */
        private void failIfMuted() throws IOException {
            if (this.mute) {
                throw new IOException("Socket is mute");
            }
        }
    }

    /**
     * Daemon thread that keeps issuing {@code getData} on one path until told to stop.
     * Failures of individual calls are logged and the loop carries on.
     */
    class GetDataRetryForeverBackgroundTask extends Thread {
        private volatile boolean alive = false;
        private final CustomZooKeeper zk;
        private final String path;

        /**
         * @param customZooKeeper client used for the reads
         * @param str path of the znode to read
         */
        GetDataRetryForeverBackgroundTask(CustomZooKeeper customZooKeeper, String str) {
            this.zk = customZooKeeper;
            this.path = str;
            setDaemon(true);
        }

        /** Marks the task as running and starts the thread. */
        void startTask() {
            this.alive = true;
            start();
        }

        /**
         * Asks the loop to stop and waits for the thread to finish.
         *
         * @throws InterruptedException if the caller is interrupted while joining
         */
        void syncCloseTask() throws InterruptedException {
            this.alive = false;
            join();
        }

        @Override
        public void run() {
            while (this.alive) {
                try {
                    this.zk.getData(this.path, false, new Stat());
                    // Pause between successful reads; a failed read retries immediately.
                    TimeUnit.MILLISECONDS.sleep(500L);
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a getData failure:
                    // restore the flag and leave the loop instead of retrying.
                    Thread.currentThread().interrupt();
                    ClientCnxnSocketFragilityTest.LOG.info("getData retry task interrupted on path {}, stopping", this.path);
                    return;
                } catch (Exception e) {
                    // Best effort by design: any other failure is logged and retried.
                    ClientCnxnSocketFragilityTest.LOG.info("zookeeper getData failed on path {}", this.path);
                }
            }
        }
    }

    /**
     * One-shot rendezvous for two threads, backed by a {@link CountDownLatch} of two.
     * Each participating caller counts down once and then waits until the count
     * reaches zero, i.e. until the other participant has arrived as well.
     */
    class UnsafeCoordinator {
        private final CountDownLatch syncLatch = new CountDownLatch(2);

        UnsafeCoordinator() {
        }

        /**
         * Joins the rendezvous when {@code z} is {@code true}; otherwise only logs.
         *
         * @param z whether the caller should take part in the rendezvous
         * @throws RuntimeException wrapping the {@link InterruptedException} if the wait
         *         is interrupted; the thread's interrupt flag is restored first
         */
        void sync(boolean z) {
            ClientCnxnSocketFragilityTest.LOG.info("Attempt to sync with {}", z);
            if (!z) {
                return;
            }
            this.syncLatch.countDown();
            try {
                this.syncLatch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for code further up the stack before rethrowing.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds a comma-separated connect string of the form
     * {@code 127.0.0.1:<port>,127.0.0.1:<port>,...} from the given ports.
     *
     * @param iArr the ports to include, in order
     * @return the joined host list; empty when {@code iArr} is empty
     */
    private String getCxnString(int[] iArr) {
        StringBuilder hosts = new StringBuilder();
        String separator = "";
        for (int port : iArr) {
            hosts.append(separator).append("127.0.0.1:").append(port);
            // Every entry after the first is preceded by a comma.
            separator = ",";
        }
        return hosts.toString();
    }

    /**
     * Closes the given client on a separate thread so the caller is not blocked
     * while {@code close()} is in progress.
     *
     * <p>The executor is shut down immediately after submission. {@code shutdown()}
     * still lets the already-submitted task run to completion, and afterwards the
     * worker thread terminates instead of lingering for the rest of the JVM's life.
     *
     * @param zooKeeper the client to close asynchronously
     */
    private void closeZookeeper(ZooKeeper zooKeeper) {
        ExecutorService closer = Executors.newSingleThreadExecutor();
        closer.submit(() -> {
            try {
                LOG.info("closeZookeeper is fired");
                zooKeeper.close();
            } catch (InterruptedException e) {
                // Restore the flag and leave a trace rather than dropping the interrupt silently.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while closing the ZooKeeper client", e);
            }
        });
        // No further tasks are accepted; the pending close task is unaffected.
        closer.shutdown();
    }

    /**
     * Exercises the client against a socket that can be muted.
     *
     * <ol>
     *   <li>Starts a {@code SERVER_COUNT}-node ensemble and connects a
     *       {@link CustomZooKeeper} using {@link FragileClientCnxnSocketNIO}.</li>
     *   <li>Mutes the socket and expects {@code getData} to fail with a
     *       {@link KeeperException} that is not a session expiration.</li>
     *   <li>Starts a background reader, mutes the socket again, arms the close
     *       coordination, waits until a connect attempt is parked in the unsafe
     *       region, and closes the client asynchronously.</li>
     *   <li>Expects the client to be no longer alive and that no session-expired
     *       event was ever delivered.</li>
     * </ol>
     *
     * <p>The system properties changed by the test are restored, and the background
     * task and servers are stopped, even when an assertion fails.
     */
    @Test
    public void testClientCnxnSocketFragility() throws Exception {
        final String socketProperty = "zookeeper.clientCnxnSocket";
        final String requestTimeoutProperty = "zookeeper.request.timeout";
        final String path = "/testClientCnxnSocketFragility";
        final String payload = "balabala";

        // Remember prior values so other tests in the same JVM are not affected.
        final String previousSocketImpl = System.getProperty(socketProperty);
        final String previousRequestTimeout = System.getProperty(requestTimeoutProperty);
        System.setProperty(socketProperty, FragileClientCnxnSocketNIO.class.getName());
        System.setProperty(requestTimeoutProperty, "1000");

        final QuorumPeerTestBase.MainThread[] servers = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
        GetDataRetryForeverBackgroundTask retryTask = null;
        try {
            final int[] clientPorts = new int[SERVER_COUNT];
            final StringBuilder quorumConfig = new StringBuilder();
            for (int i = 0; i < SERVER_COUNT; i++) {
                clientPorts[i] = PortAssignment.unique();
                quorumConfig.append("server.").append(i)
                        .append("=127.0.0.1:").append(PortAssignment.unique())
                        .append(':').append(PortAssignment.unique())
                        .append(":participant;127.0.0.1:").append(clientPorts[i])
                        .append('\n');
            }
            final String quorumConfigSection = quorumConfig.toString();

            for (int i = 0; i < SERVER_COUNT; i++) {
                servers[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], quorumConfigSection, false);
                servers[i].start();
            }
            for (int i = 0; i < SERVER_COUNT; i++) {
                Assertions.assertTrue(
                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                        "waiting for server " + i + " being up");
            }

            final ClientWatcher watcher = new ClientWatcher();
            this.zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher);
            watcher.watchFor(this.zk);

            // Sanity check: a plain write/read round trip works before any fault is injected.
            this.zk.create(path, payload.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assertions.assertEquals(
                    payload,
                    new String(this.zk.getData(path, false, new Stat()), StandardCharsets.UTF_8));
            Assertions.assertFalse(watcher.isSessionExpired());

            // A muted socket must surface as a KeeperException, but not as a session expiration.
            this.socket.mute();
            boolean keeperExceptionCaught = false;
            try {
                this.zk.getData(path, false, new Stat());
            } catch (KeeperException e) {
                keeperExceptionCaught = true;
                Assertions.assertFalse(e instanceof KeeperException.SessionExpiredException);
            }
            this.socket.unmute();
            Assertions.assertTrue(keeperExceptionCaught);
            Assertions.assertFalse(watcher.isSessionExpired());

            // Keep requests flowing in the background while the client is being closed.
            retryTask = new GetDataRetryForeverBackgroundTask(this.zk, path);
            retryTask.startTask();

            this.socket.mute();
            this.cnxn.attemptClose();
            // Wait for a connect attempt to park in onConnecting() before triggering close().
            this.cnxn.waitUntilHitUnsafeRegion();
            closeZookeeper(this.zk);
            // Give the asynchronous close time to complete.
            TimeUnit.MILLISECONDS.sleep(3000L);

            Assertions.assertFalse(this.zk.isAlive());
            Assertions.assertFalse(watcher.isSessionExpired());
        } finally {
            if (retryTask != null) {
                retryTask.syncCloseTask();
            }
            for (QuorumPeerTestBase.MainThread server : servers) {
                // Entries are null if startup failed before reaching them.
                if (server != null) {
                    server.shutdown();
                }
            }
            resetSystemProperty(socketProperty, previousSocketImpl);
            resetSystemProperty(requestTimeoutProperty, previousRequestTimeout);
        }
    }

    /** Restores a system property to its earlier value, clearing it if it was unset. */
    private static void resetSystemProperty(String key, String previousValue) {
        if (previousValue == null) {
            System.clearProperty(key);
        } else {
            System.setProperty(key, previousValue);
        }
    }
}
