package org.apache.curator.framework.recipes.queue;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.ErrorListenerPathAndBytesable;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.queue.DistributedQueue;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/curator/framework/recipes/queue/TestDistributedQueue.class */
public class TestDistributedQueue extends BaseClassForTests {
    private static final String QUEUE_PATH = "/a/queue";
    private static final QueueSerializer<TestQueueItem> serializer = new QueueItemSerializer();

    /**
     * Checks that an item is delivered a second time after its consumer throws.
     *
     * <p>The consumer records the name of the first child node found under {@code QUEUE_PATH} on
     * every call, throws on the first call and returns normally on the second. The test then
     * asserts that exactly two deliveries took place and that both recorded node names have the
     * same length.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testRetryAfterFailure_Curator56() throws Exception {
        DistributedQueue<TestQueueItem> queue = null;
        final CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        try {
            final CountDownLatch deliveries = new CountDownLatch(2);
            final List<String> nodeNames = new ArrayList<String>();
            QueueConsumer<TestQueueItem> consumer = new QueueConsumer<TestQueueItem>() {
                @Override
                public void consumeMessage(TestQueueItem message) throws Exception {
                    nodeNames.add(client.getChildren().forPath(QUEUE_PATH).get(0));
                    if (deliveries.getCount() > 1) {
                        // First delivery: count it, then fail so the item has to be retried.
                        deliveries.countDown();
                        throw new Exception("Something went wrong");
                    }
                    deliveries.countDown();
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // connection state is irrelevant to this test
                }
            };
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH)
                    .lockPath("/lock")
                    .buildQueue();
            queue.start();
            queue.put(new TestQueueItem("test"));

            // The boolean result is not needed: the assertion below checks the remaining count.
            deliveries.await(10L, TimeUnit.SECONDS);
            Assertions.assertEquals(
                    0L,
                    deliveries.getCount(),
                    "Queue item was not consumed. Retry counter is " + deliveries.getCount());
            Assertions.assertEquals(2, nodeNames.size());
            Assertions.assertEquals(
                    nodeNames.get(0).length(),
                    nodeNames.get(1).length(),
                    "name1: " + nodeNames.get(0) + " - name2: " + nodeNames.get(1));
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
        }
    }

    /**
     * Runs a queue on a caller-supplied cached thread pool and checks that no item node is handed
     * to {@code processWithLockSafety} more than once.
     *
     * <p>1000 string items are put; the consumer counts each delivery down on a latch. A subclass
     * of {@link DistributedQueue} records every node name it is asked to process and remembers any
     * name seen twice. The test passes when all deliveries arrive in time and no name was repeated.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testCustomExecutor() throws Exception {
        final int itemQty = 1000;
        Timing timing = new Timing();
        DistributedQueue<String> queue = null;
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        client.start();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            final CountDownLatch consumed = new CountDownLatch(itemQty);
            QueueConsumer<String> consumer = new QueueConsumer<String>() {
                @Override
                public void consumeMessage(String message) throws Exception {
                    consumed.countDown();
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // connection state is irrelevant to this test
                }
            };
            QueueSerializer<String> stringSerializer = new QueueSerializer<String>() {
                @Override
                public byte[] serialize(String item) {
                    return item.getBytes();
                }

                // Must be named "deserialize" to implement the interface method.
                @Override
                public String deserialize(byte[] bytes) {
                    return new String(bytes);
                }
            };

            final Set<String> seenNodes = Sets.newHashSet();
            final Set<String> repeatedNodes = Sets.newHashSet();
            queue = new DistributedQueue<String>(
                    client,
                    consumer,
                    stringSerializer,
                    QUEUE_PATH,
                    QueueBuilder.defaultThreadFactory,
                    executor,
                    Integer.MAX_VALUE,
                    false,
                    "/lock",
                    Integer.MAX_VALUE,
                    true,
                    timing.milliseconds()) {
                @Override
                protected boolean processWithLockSafety(String itemNode, DistributedQueue.ProcessType type)
                        throws Exception {
                    // The queue is given a multi-threaded executor, so this may presumably run on
                    // several pool threads at once; guard the plain HashSets.
                    synchronized (seenNodes) {
                        if (!seenNodes.add(itemNode)) {
                            repeatedNodes.add(itemNode);
                        }
                    }
                    if (client.getState() != CuratorFrameworkState.STARTED) {
                        return false;
                    }
                    return super.processWithLockSafety(itemNode, type);
                }
            };
            queue.start();

            for (int i = 0; i < itemQty; i++) {
                queue.put(Integer.toString(i));
            }

            Assertions.assertTrue(timing.awaitLatch(consumed));
            synchronized (seenNodes) {
                Assertions.assertTrue(repeatedNodes.size() == 0, repeatedNodes.toString());
            }
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            // The pool was created by this test, so the test is responsible for stopping it.
            executor.shutdownNow();
        }
    }

    /**
     * Checks that a registered {@link QueuePutListener} is told about every single-item put.
     *
     * <p>A producer puts 10 items on a background thread. The test polls the consumer (up to
     * roughly 10 seconds) until all 10 have arrived, checks they arrived in order "0".."9", and
     * finally checks that {@code putCompleted} was invoked 10 times.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testPutListener() throws Exception {
        final int itemQty = 10;
        DistributedQueue<TestQueueItem> queue = null;
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        ExecutorService producerPool = Executors.newCachedThreadPool();
        try {
            BlockingQueueConsumer<TestQueueItem> consumer =
                    new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue();
            queue.start();

            QueueTestProducer producer = new QueueTestProducer(queue, itemQty, 0);

            final AtomicInteger listenerCalls = new AtomicInteger(0);
            queue.getPutListenerContainer().addListener(new QueuePutListener<TestQueueItem>() {
                @Override
                public void putCompleted(TestQueueItem item) {
                    listenerCalls.incrementAndGet();
                }

                @Override
                public void putMultiCompleted(MultiItem<TestQueueItem> items) {
                    // multi-puts are not exercised here
                }
            });

            producerPool.submit(producer);

            // Poll once per second; give up (fail) on the tenth unsuccessful check.
            int attempts = 0;
            while (consumer.size() < itemQty) {
                Assertions.assertTrue(++attempts < 10);
                Thread.sleep(1000L);
            }

            int expectedIndex = 0;
            for (TestQueueItem item : consumer.getItems()) {
                Assertions.assertEquals(Integer.toString(expectedIndex++), item.str);
            }

            Assertions.assertEquals(itemQty, listenerCalls.get());
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            producerPool.shutdownNow();
        }
    }

    /**
     * Exercises the queue's behaviour when the consumer throws, before and after switching to
     * {@link ErrorMode#DELETE}.
     *
     * <p>The consumer throws on its first call after each counter reset and counts the latch down
     * on any later call. Phase one (error mode untouched) expects the latch to be released after
     * exactly two consumer calls. Phase two sets {@code ErrorMode.DELETE}, resets the counter and
     * latch, and expects the latch to stay closed for 5 seconds with exactly one consumer call.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testErrorMode() throws Exception {
        Timing timing = new Timing();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        client.start();
        try {
            final AtomicReference<CountDownLatch> latchRef =
                    new AtomicReference<CountDownLatch>(new CountDownLatch(1));
            final AtomicInteger consumerCalls = new AtomicInteger(0);
            QueueConsumer<TestQueueItem> consumer = new QueueConsumer<TestQueueItem>() {
                @Override
                public void consumeMessage(TestQueueItem message) throws Exception {
                    if (consumerCalls.incrementAndGet() < 2) {
                        throw new Exception();
                    }
                    latchRef.get().countDown();
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // connection state is irrelevant to this test
                }
            };
            DistributedQueue<TestQueueItem> queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH)
                    .lockPath("/locks")
                    .buildQueue();
            try {
                queue.start();

                // Phase one: the failed item must reach the consumer a second time.
                queue.put(new TestQueueItem("1"));
                Assertions.assertTrue(timing.awaitLatch(latchRef.get()));
                Assertions.assertEquals(2, consumerCalls.get());

                // Phase two: with DELETE the failed item must not come back.
                queue.setErrorMode(ErrorMode.DELETE);
                consumerCalls.set(0);
                latchRef.set(new CountDownLatch(1));
                queue.put(new TestQueueItem("1"));
                Assertions.assertFalse(latchRef.get().await(5L, TimeUnit.SECONDS));
                Assertions.assertEquals(1, consumerCalls.get());
            } finally {
                queue.close();
            }
        } finally {
            client.close();
        }
    }

    /**
     * Checks that four competing lock-safe consumers never see the same item twice.
     *
     * <p>A consumer-less queue first puts 1000 items, flushes and is closed. Four further clients
     * then each start a queue on the same path (lock path {@code /a/locks}) sharing one consumer
     * that records every item string and remembers any string it sees a second time. After waiting
     * on the delivery latch the test asserts that no duplicate was recorded.
     *
     * @throws Exception if a client or queue fails unexpectedly
     */
    @Test
    public void testNoDuplicateProcessing() throws Exception {
        final int itemQty = 1000;
        final int consumerQty = 4;
        Timing timing = new Timing();
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(),
                timing.session(),
                timing.connection(),
                new ExponentialBackoffRetry(100, 3));
        client.start();
        try {
            DistributedQueue<TestQueueItem> producerQueue =
                    QueueBuilder.builder(client, null, serializer, QUEUE_PATH).buildQueue();
            try {
                producerQueue.start();
                for (int i = 0; i < itemQty; i++) {
                    producerQueue.put(new TestQueueItem(Integer.toString(i)));
                }
                producerQueue.flushPuts(timing.multiple(2.0d).seconds(), TimeUnit.SECONDS);
            } finally {
                producerQueue.close();
            }

            final Set<String> consumedMessages = Sets.newHashSet();
            final Set<String> duplicateMessages = Sets.newHashSet();
            final CountDownLatch consumed = new CountDownLatch(itemQty);
            List<DistributedQueue<TestQueueItem>> consumerQueues = Lists.newArrayList();
            List<CuratorFramework> consumerClients = Lists.newArrayList();
            try {
                QueueConsumer<TestQueueItem> sharedConsumer = new QueueConsumer<TestQueueItem>() {
                    @Override
                    public void consumeMessage(TestQueueItem message) {
                        // One monitor guards both sets, as the consumer is shared by all queues.
                        synchronized (consumedMessages) {
                            if (consumedMessages.contains(message.str)) {
                                duplicateMessages.add(message.str);
                            }
                            consumedMessages.add(message.str);
                        }
                        consumed.countDown();
                    }

                    @Override
                    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                        // connection state is irrelevant to this test
                    }
                };

                for (int i = 0; i < consumerQty; i++) {
                    CuratorFramework consumerClient =
                            CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
                    // Register before start() so the client is closed even if start() throws.
                    consumerClients.add(consumerClient);
                    consumerClient.start();
                    consumerQueues.add(QueueBuilder.builder(consumerClient, sharedConsumer, serializer, QUEUE_PATH)
                            .lockPath("/a/locks")
                            .buildQueue());
                }
                for (DistributedQueue<TestQueueItem> consumerQueue : consumerQueues) {
                    consumerQueue.start();
                }

                // The latch result is deliberately not asserted (as before); only duplicates seen
                // within the wait window are checked.
                timing.awaitLatch(consumed);
                Assertions.assertTrue(duplicateMessages.size() == 0, duplicateMessages.toString());
            } finally {
                for (DistributedQueue<TestQueueItem> consumerQueue : consumerQueues) {
                    CloseableUtils.closeQuietly(consumerQueue);
                }
                for (CuratorFramework consumerClient : consumerClients) {
                    CloseableUtils.closeQuietly(consumerClient);
                }
            }
        } finally {
            client.close();
        }
    }

    /**
     * Checks lock-safe consumption when one of two consumers starts failing part-way through.
     *
     * <p>A producer puts 100 items. Consumer 1 throws for every item it receives once more than 10
     * items have been taken overall, remembering the item it last threw on; consumer 2 always
     * succeeds. The test waits until 100 distinct items have been taken, then asserts that they are
     * "0".."99" in set order, that consumer 1 threw at least once, that consumer 2 ended up with
     * the item consumer 1 last threw on, and that the two consumers' item sets do not overlap.
     *
     * @throws Exception if a client or queue fails unexpectedly
     */
    @Test
    public void testSafetyWithCrash() throws Exception {
        final int itemQty = 100;

        DistributedQueue<TestQueueItem> producerQueue = null;
        DistributedQueue<TestQueueItem> consumerQueue1 = null;
        DistributedQueue<TestQueueItem> consumerQueue2 = null;

        CuratorFramework producerClient =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        CuratorFramework consumerClient1 =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        CuratorFramework consumerClient2 =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        ExecutorService producerPool = Executors.newCachedThreadPool();
        try {
            producerClient.start();
            consumerClient1.start();
            consumerClient2.start();

            producerQueue = QueueBuilder.builder(producerClient, null, serializer, QUEUE_PATH)
                    .buildQueue();
            producerQueue.start();
            producerPool.submit(new QueueTestProducer(producerQueue, itemQty, 0));

            final Set<TestQueueItem> takenItems = Sets.newTreeSet();
            final Set<TestQueueItem> takenByConsumer1 = Sets.newTreeSet();
            final Set<TestQueueItem> takenByConsumer2 = Sets.newTreeSet();
            final AtomicReference<TestQueueItem> thrownByConsumer1 = new AtomicReference<TestQueueItem>(null);

            QueueConsumer<TestQueueItem> failingConsumer = new QueueConsumer<TestQueueItem>() {
                @Override
                public void consumeMessage(TestQueueItem message) throws Exception {
                    synchronized (takenItems) {
                        if (takenItems.size() > 10) {
                            // Simulated crash: remember the item and refuse it.
                            thrownByConsumer1.set(message);
                            throw new Exception("dummy");
                        }
                    }
                    addToTakenItems(message, takenItems, itemQty);
                    synchronized (takenByConsumer1) {
                        takenByConsumer1.add(message);
                    }
                    Thread.sleep((long) (Math.random() * 5.0d));
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // connection state is irrelevant to this test
                }
            };
            consumerQueue1 = QueueBuilder.builder(consumerClient1, failingConsumer, serializer, QUEUE_PATH)
                    .lockPath("/a/locks")
                    .buildQueue();
            consumerQueue1.start();

            QueueConsumer<TestQueueItem> reliableConsumer = new QueueConsumer<TestQueueItem>() {
                @Override
                public void consumeMessage(TestQueueItem message) throws Exception {
                    addToTakenItems(message, takenItems, itemQty);
                    synchronized (takenByConsumer2) {
                        takenByConsumer2.add(message);
                    }
                    Thread.sleep((long) (Math.random() * 5.0d));
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // connection state is irrelevant to this test
                }
            };
            consumerQueue2 = QueueBuilder.builder(consumerClient2, reliableConsumer, serializer, QUEUE_PATH)
                    .lockPath("/a/locks")
                    .buildQueue();
            consumerQueue2.start();

            // Timed wait so the size is re-checked at least once per second. NOTE(review): there
            // is no overall deadline, so a lost item makes this loop wait indefinitely.
            synchronized (takenItems) {
                while (takenItems.size() < itemQty) {
                    takenItems.wait(1000L);
                }
            }

            int expectedIndex = 0;
            for (TestQueueItem item : takenItems) {
                Assertions.assertEquals(Integer.toString(expectedIndex++), item.str);
            }

            Assertions.assertNotNull(thrownByConsumer1.get());
            Assertions.assertTrue(takenByConsumer2.contains(thrownByConsumer1.get()));
            Assertions.assertTrue(
                    Sets.intersection(takenByConsumer1, takenByConsumer2).size() == 0);
        } finally {
            CloseableUtils.closeQuietly(producerQueue);
            CloseableUtils.closeQuietly(consumerQueue1);
            CloseableUtils.closeQuietly(consumerQueue2);

            CloseableUtils.closeQuietly(producerClient);
            CloseableUtils.closeQuietly(consumerClient1);
            CloseableUtils.closeQuietly(consumerClient2);

            producerPool.shutdownNow();
        }
    }

    /**
     * Adds {@code testQueueItem} to {@code set} and wakes any thread waiting on the set's monitor
     * once the set holds at least {@code i} items.
     *
     * <p>The comparison is {@code >=}: a waiter expecting exactly {@code i} items would never be
     * notified by a strict {@code >} check and would have to rely on its own timed wait.
     *
     * <p>Kept {@code public}; a decompiler note on the original indicated the method had been
     * widened from {@code private}.
     *
     * @param testQueueItem the item that was just consumed
     * @param set the shared set of consumed items; also used as the monitor
     * @param i the number of items at which waiters are notified
     */
    public void addToTakenItems(TestQueueItem testQueueItem, Set<TestQueueItem> set, int i) {
        synchronized (set) {
            set.add(testQueueItem);
            if (set.size() >= i) {
                set.notifyAll();
            }
        }
    }

    /**
     * Checks that a lock-safe queue (lock path {@code /a/locks}) delivers 10 items in put order.
     *
     * <p>A producer puts items "0".."9" on one background thread while a second background task
     * takes 10 items from the consumer, asserting each one's string, and then releases a latch.
     * The test fails if the latch is not released within 10 seconds.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testSafetyBasic() throws Exception {
        final int itemQty = 10;
        DistributedQueue<TestQueueItem> queue = null;
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            final BlockingQueueConsumer<TestQueueItem> consumer =
                    new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH)
                    .lockPath("/a/locks")
                    .buildQueue();
            queue.start();

            pool.submit(new QueueTestProducer(queue, itemQty, 0));

            final CountDownLatch allTaken = new CountDownLatch(1);
            pool.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    // A failed assertion here ends the task before countDown(), so the main
                    // thread's await below times out and fails the test.
                    for (int i = 0; i < itemQty; i++) {
                        Assertions.assertEquals(Integer.toString(i), consumer.take().str);
                    }
                    allTaken.countDown();
                    return null;
                }
            });
            Assertions.assertTrue(allTaken.await(10L, TimeUnit.SECONDS));
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            pool.shutdownNow();
        }
    }

    /**
     * Checks that {@code putMulti} delivers every item supplied by a {@link MultiItem}, in order.
     *
     * <p>The {@code MultiItem} hands out items "0".."99" and then {@code null}. The test takes 100
     * items from the consumer, allowing one second per item, and asserts each equals the expected
     * {@code TestQueueItem}.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testPutMulti() throws Exception {
        final int itemQty = 100;
        DistributedQueue<TestQueueItem> queue = null;
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        try {
            BlockingQueueConsumer<TestQueueItem> consumer =
                    new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue();
            queue.start();

            queue.putMulti(new MultiItem<TestQueueItem>() {
                private int index = 0;

                // Must be named "nextItem" to implement the interface method.
                @Override
                public TestQueueItem nextItem() throws Exception {
                    if (index >= itemQty) {
                        return null; // signals the end of the batch
                    }
                    return new TestQueueItem(Integer.toString(index++));
                }
            });

            for (int i = 0; i < itemQty; i++) {
                TestQueueItem taken = consumer.take(1, TimeUnit.SECONDS);
                Assertions.assertNotNull(taken);
                Assertions.assertEquals(new TestQueueItem(Integer.toString(i)), taken);
            }
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
        }
    }

    /**
     * Checks that two concurrent producers feeding one consumer yield 100 distinct items.
     *
     * <p>One producer puts 50 items starting at index 0, the other 50 items starting at index 50.
     * The test polls the consumer (up to roughly 10 seconds) until 100 items have arrived and then
     * asserts the received list contains no duplicates.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testMultiPutterSingleGetter() throws Exception {
        final int itemQty = 100;
        DistributedQueue<TestQueueItem> queue = null;
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        ExecutorService producerPool = Executors.newCachedThreadPool();
        try {
            BlockingQueueConsumer<TestQueueItem> consumer =
                    new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue();
            queue.start();

            QueueTestProducer lowerHalf = new QueueTestProducer(queue, itemQty / 2, 0);
            QueueTestProducer upperHalf = new QueueTestProducer(queue, itemQty / 2, itemQty / 2);
            producerPool.submit(lowerHalf);
            producerPool.submit(upperHalf);

            // Poll once per second; give up (fail) on the tenth unsuccessful check.
            int attempts = 0;
            while (consumer.size() < itemQty) {
                Assertions.assertTrue(++attempts < 10);
                Thread.sleep(1000L);
            }

            List<TestQueueItem> items = consumer.getItems();
            // De-duplicating must not shrink the list.
            Assertions.assertEquals(items.size(), Sets.newHashSet(items).size());
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            producerPool.shutdownNow();
        }
    }

    /**
     * Checks that {@code flushPuts} reports {@code false} while a put is still outstanding and
     * {@code true} once it has completed.
     *
     * <p>The queue subclass diverts the first {@code internalCreateNode} call to a background task
     * that blocks on a latch, sleeps briefly and only then issues the background create. The first
     * {@code flushPuts} is expected to time out; after the latch is released the second is expected
     * to succeed.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testFlush() throws Exception {
        final Timing timing = new Timing();
        final CountDownLatch releaseCreate = new CountDownLatch(1);
        DistributedQueue<TestQueueItem> queue = null;
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
                server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
        client.start();
        // Owned by the test so it can be shut down afterwards instead of leaking a thread.
        final ExecutorService delayedCreator = Executors.newSingleThreadExecutor();
        try {
            final AtomicBoolean firstCreate = new AtomicBoolean(true);
            queue = new DistributedQueue<TestQueueItem>(
                    client,
                    null,
                    serializer,
                    "/test",
                    new ThreadFactoryBuilder().build(),
                    MoreExecutors.directExecutor(),
                    10,
                    true,
                    null,
                    Integer.MAX_VALUE,
                    true,
                    0) {
                @Override
                void internalCreateNode(final String path, final byte[] bytes, final BackgroundCallback callback)
                        throws Exception {
                    if (!firstCreate.compareAndSet(true, false)) {
                        super.internalCreateNode(path, bytes, callback);
                        return;
                    }
                    // First create only: hold it back until the test releases the latch.
                    delayedCreator.submit(new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            releaseCreate.await();
                            timing.sleepABit();
                            client.create()
                                    .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
                                    .inBackground(callback)
                                    .forPath(path, bytes);
                            return null;
                        }
                    });
                }
            };
            queue.start();

            queue.put(new TestQueueItem("1"));
            Assertions.assertFalse(queue.flushPuts(timing.forWaiting().seconds(), TimeUnit.SECONDS));
            releaseCreate.countDown();

            Assertions.assertTrue(queue.flushPuts(timing.forWaiting().seconds(), TimeUnit.SECONDS));
        } finally {
            // Make sure the delayed task is never left blocked on the latch.
            if (releaseCreate.getCount() > 0) {
                releaseCreate.countDown();
            }
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            delayedCreator.shutdownNow();
        }
    }

    /**
     * Checks the basic put/consume path: 10 items put by one producer arrive in order.
     *
     * <p>The test polls the consumer (up to roughly 10 seconds) until 10 items have arrived and
     * then asserts their strings are "0".."9" in that order.
     *
     * @throws Exception if the client or the queue fails unexpectedly
     */
    @Test
    public void testSimple() throws Exception {
        final int itemQty = 10;
        DistributedQueue<TestQueueItem> queue = null;
        CuratorFramework client =
                CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
        client.start();
        ExecutorService producerPool = Executors.newCachedThreadPool();
        try {
            BlockingQueueConsumer<TestQueueItem> consumer =
                    new BlockingQueueConsumer<TestQueueItem>(Mockito.mock(ConnectionStateListener.class));
            queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).buildQueue();
            queue.start();

            producerPool.submit(new QueueTestProducer(queue, itemQty, 0));

            // Poll once per second; give up (fail) on the tenth unsuccessful check.
            int attempts = 0;
            while (consumer.size() < itemQty) {
                Assertions.assertTrue(++attempts < 10);
                Thread.sleep(1000L);
            }

            int expectedIndex = 0;
            for (TestQueueItem item : consumer.getItems()) {
                Assertions.assertEquals(Integer.toString(expectedIndex++), item.str);
            }
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            producerPool.shutdownNow();
        }
    }
}
