package org.apache.kafka.connect.runtime.distributed;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.TestFuture;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;

@PrepareForTest({DistributedHerder.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.class */
public class DistributedHerderTest {
    /** Worker settings wrapped in a {@code DistributedConfig} for the herder under test; filled in by the static initializer. */
    private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
    /** Value handed to the herder constructor in the position following the group member. */
    private static final String MEMBER_URL = "memberUrl";
    /** Name of the connector that is present in {@code SNAPSHOT}. */
    private static final String CONN1 = "sourceA";
    /** Name of a second connector; it does not appear in {@code SNAPSHOT}. */
    private static final String CONN2 = "sourceB";
    /** Task id 0 of {@code CONN1}. */
    private static final ConnectorTaskId TASK0;
    /** Task id 1 of {@code CONN1}. */
    private static final ConnectorTaskId TASK1;
    /** Task id 2 of {@code CONN1}. */
    private static final ConnectorTaskId TASK2;
    /** Value written to {@code tasks.max} in the connector configs and expected in {@code connectorTaskConfigs} calls. */
    private static final Integer MAX_TASKS;
    /** Configuration of {@code CONN1} with topics {@code foo,bar}. */
    private static final Map<String, String> CONN1_CONFIG;
    /** Copy of {@code CONN1_CONFIG} whose topics are {@code foo,bar,baz}. */
    private static final Map<String, String> CONN1_CONFIG_UPDATED;
    /** Configuration of {@code CONN2}. */
    private static final Map<String, String> CONN2_CONFIG;
    /** Single task configuration shared by every task in these tests. */
    private static final Map<String, String> TASK_CONFIG;
    /** Three references to {@code TASK_CONFIG}; returned by the mocked {@code connectorTaskConfigs}. */
    private static final List<Map<String, String>> TASK_CONFIGS;
    /** Maps {@code TASK0}, {@code TASK1} and {@code TASK2} to {@code TASK_CONFIG}. */
    private static final HashMap<ConnectorTaskId, Map<String, String>> TASK_CONFIGS_MAP;
    /** Config state built from {@code CONN1_CONFIG} and {@code TASK_CONFIGS_MAP}. */
    private static final ClusterConfigState SNAPSHOT;
    /** Same as {@code SNAPSHOT} but carrying {@code CONN1_CONFIG_UPDATED} for {@code CONN1}. */
    private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG;

    // Mocked config storage; handed to the herder constructor in setUp().
    @Mock
    private KafkaConfigStorage configStorage;

    // Mocked group member; handed to the herder constructor in setUp().
    @Mock
    private WorkerGroupMember member;
    // Mock clock created in setUp() and handed to the herder constructor.
    private MockTime time;
    // Herder under test: a PowerMock partial mock with only "backoff" mocked (see setUp()).
    private DistributedHerder herder;

    // Mocked worker; note that setUp() also assigns this field explicitly via PowerMock.createMock.
    @Mock
    private Worker worker;

    // Mocked callback passed to herder.putConnectorConfig in the create/destroy tests.
    @Mock
    private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
    // Obtained from the herder's private connectorConfigCallback() in setUp(); tests call it with a connector name.
    private Callback<String> connectorConfigCallback;
    // Obtained from the herder's private taskConfigCallback() in setUp(); tests call it with a list of task ids.
    private Callback<List<ConnectorTaskId>> taskConfigCallback;
    // Obtained from the herder's private rebalanceListener() in setUp(); driven by the expectRebalance helpers.
    private WorkerRebalanceListener rebalanceListener;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceConnector.class */
    private abstract class BogusSourceConnector extends SourceConnector {
        private BogusSourceConnector() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerderTest$BogusSourceTask.class */
    private abstract class BogusSourceTask extends SourceTask {
        private BogusSourceTask() {
        }
    }

    /**
     * Builds the fixture for every test: a worker mock, a mock clock, a partially mocked herder
     * (only {@code backoff} is mocked), and the herder's private callbacks/listener fetched reflectively.
     */
    @Before
    public void setUp() throws Exception {
        worker = (Worker) PowerMock.createMock(Worker.class);
        // CONN1 is always reported as a non-sink connector.
        EasyMock.expect(Boolean.valueOf(worker.isSinkConnector(CONN1))).andStubReturn(Boolean.FALSE);
        time = new MockTime();

        final String[] mockedMethods = new String[]{"backoff"};
        final Object[] constructorArgs = new Object[]{new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time};
        herder = (DistributedHerder) PowerMock.createPartialMock(DistributedHerder.class, mockedMethods, constructorArgs);

        // Grab the herder's private hooks so tests can trigger config updates and rebalances directly.
        connectorConfigCallback = (Callback) Whitebox.invokeMethod(herder, "connectorConfigCallback", new Object[0]);
        taskConfigCallback = (Callback) Whitebox.invokeMethod(herder, "taskConfigCallback", new Object[0]);
        rebalanceListener = (WorkerRebalanceListener) Whitebox.invokeMethod(herder, "rebalanceListener", new Object[0]);
    }

    /**
     * One tick with an assignment of CONN1 and TASK1: the connector and the task are expected
     * to be added to the worker.
     */
    @Test
    public void testJoinAssignment() {
        // Rebalance assigns one connector and one task, followed by a config catch-up.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);

        // Expected worker interactions for the assigned connector and task.
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        worker.addTask((ConnectorTaskId) EasyMock.eq(TASK1), (TaskConfig) EasyMock.anyObject());
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * {@code halt()} is expected to stop every connector and task the worker reports,
     * then stop the group member and the config storage.
     */
    @Test
    public void testHaltCleansUpWorker() {
        // The worker reports one running connector, which must be stopped.
        EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall();

        // The worker reports one running task, which must be stopped.
        EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
        worker.stopTask(TASK1);
        PowerMock.expectLastCall();

        member.stop();
        PowerMock.expectLastCall();
        configStorage.stop();
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.halt();

        PowerMock.verifyAll();
    }

    /**
     * Submitting a config for CONN2 (not in SNAPSHOT) is expected to write it to config storage
     * and complete the callback with a "created" result that has no tasks.
     */
    @Test
    public void testCreateConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // The new connector config is stored and the caller is notified of the creation.
        configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
        PowerMock.expectLastCall();
        putConnectorCallback.onCompletion(
                (Throwable) null,
                new Herder.Created(true, new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.emptyList())));
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN2, CONN2_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Submitting a config for CONN1 (already in SNAPSHOT) with the boolean flag set to {@code false}
     * is expected to complete the callback with some throwable and a null result; no write to
     * config storage is expected.
     */
    @Test
    public void testCreateConnectorAlreadyExists() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        member.wakeup();
        PowerMock.expectLastCall();

        // Any error is accepted here; only the null result is pinned.
        putConnectorCallback.onCompletion((Throwable) EasyMock.anyObject(), EasyMock.isNull());
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, CONN1_CONFIG, false, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Submitting a null config for the assigned connector CONN1 is expected to write a null config
     * to storage and complete the callback with a non-created result carrying no connector info.
     */
    @Test
    public void testDestroyConnector() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");

        // CONN1 is assigned to this herder and started on the worker.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);

        // The delete request: wake the member, store a null config, notify the caller.
        member.wakeup();
        PowerMock.expectLastCall();
        configStorage.putConnectorConfig(CONN1, (Map) null);
        PowerMock.expectLastCall();
        putConnectorCallback.onCompletion((Throwable) null, new Herder.Created(false, (Object) null));
        PowerMock.expectLastCall();

        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.putConnectorConfig(CONN1, (Map) null, true, putConnectorCallback);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Three ticks around a "connector config changed" notification for CONN1: the herder starts with
     * an empty assignment, is told about CONN1, is expected to request a rejoin, and after the next
     * rebalance assigns CONN1 it is expected to add the connector to the worker.
     */
    @Test
    public void testConnectorConfigAdded() {
        // Initial rebalance: nothing assigned.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // After the config notification: refresh the snapshot and ask to rejoin the group.
        member.wakeup();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
        member.requestRejoin();
        PowerMock.expectLastCall();

        // Second rebalance: empty revocation lists, CONN1 assigned and started.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Arrays.asList(CONN1), Collections.emptyList());
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        connectorConfigCallback.onCompletion((Throwable) null, CONN1);
        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * A config notification for a connector the worker already runs (CONN1) is expected to make
     * the herder stop the connector and add it again on the following tick.
     */
    @Test
    public void testConnectorConfigUpdate() {
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));

        // First tick: CONN1 is assigned and started.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Second tick, after the config notification: snapshot is re-read and CONN1 is restarted.
        member.wakeup();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall();
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        connectorConfigCallback.onCompletion((Throwable) null, CONN1);
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Three ticks around a "task configs changed" notification: the herder starts with an empty
     * assignment, is told about TASK0..TASK2, is expected to request a rejoin, and after the next
     * rebalance assigns TASK0 it is expected to add that task to the worker.
     */
    @Test
    public void testTaskConfigAdded() {
        // Initial rebalance: nothing assigned.
        EasyMock.expect(member.memberId()).andStubReturn("member");
        expectRebalance(-1L, Collections.emptyList(), Collections.emptyList());
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // After the task config notification: refresh the snapshot and ask to rejoin the group.
        member.wakeup();
        member.ensureActive();
        PowerMock.expectLastCall();
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
        member.requestRejoin();
        PowerMock.expectLastCall();

        // Second rebalance: empty revocation lists, TASK0 assigned and started.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 0, 1L, Collections.emptyList(), Arrays.asList(TASK0));
        worker.addTask((ConnectorTaskId) EasyMock.eq(TASK0), (TaskConfig) EasyMock.anyObject());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        taskConfigCallback.onCompletion((Throwable) null, Arrays.asList(TASK0, TASK1, TASK2));
        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * The first rebalance carries a non-zero code and the subsequent read-to-end times out; the herder
     * is expected to call its private {@code backoff} with 300000 and request a rejoin. The second
     * rebalance succeeds and CONN1 / TASK1 are expected to be started on the worker.
     */
    @Test
    public void testJoinLeaderCatchUpFails() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");

        // First attempt: assignment with code 1, and reading the config log fails with a timeout.
        expectRebalance(Collections.emptyList(), Collections.emptyList(), (short) 1, 1L, Collections.emptyList(), Collections.emptyList());
        TestFuture timedOutReadToEnd = new TestFuture();
        timedOutReadToEnd.resolveOnGet((Throwable) new TimeoutException());
        EasyMock.expect(configStorage.readToEnd()).andReturn(timedOutReadToEnd);
        PowerMock.expectPrivate(herder, "backoff", new Object[]{300000});
        member.requestRejoin();

        // Second attempt: normal assignment and catch-up, then connector and task start.
        expectRebalance(1L, Arrays.asList(CONN1), Arrays.asList(TASK1));
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        worker.addTask((ConnectorTaskId) EasyMock.eq(TASK1), (TaskConfig) EasyMock.anyObject());
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }

    /**
     * Queues the four read-only requests (connector list, connector info, connector config, task configs)
     * before a single tick and checks that each callback completes with the data held in SNAPSHOT.
     */
    @Test
    public void testAccessors() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        expectRebalance(1L, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);

        // Each queued request may wake the member; the count is not pinned.
        member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        FutureCallback listConnectorsCb = new FutureCallback();
        herder.connectors(listConnectorsCb);
        FutureCallback connectorInfoCb = new FutureCallback();
        herder.connectorInfo(CONN1, connectorInfoCb);
        FutureCallback connectorConfigCb = new FutureCallback();
        herder.connectorConfig(CONN1, connectorConfigCb);
        FutureCallback taskConfigsCb = new FutureCallback();
        herder.taskConfigs(CONN1, taskConfigsCb);

        herder.tick();

        Assert.assertTrue(listConnectorsCb.isDone());
        Assert.assertEquals(Collections.singleton(CONN1), listConnectorsCb.get());

        Assert.assertTrue(connectorInfoCb.isDone());
        Assert.assertEquals(new ConnectorInfo(CONN1, CONN1_CONFIG, Arrays.asList(TASK0, TASK1, TASK2)), connectorInfoCb.get());

        Assert.assertTrue(connectorConfigCb.isDone());
        Assert.assertEquals(CONN1_CONFIG, connectorConfigCb.get());

        Assert.assertTrue(taskConfigsCb.isDone());
        Assert.assertEquals(Arrays.asList(new TaskInfo(TASK0, TASK_CONFIG), new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), taskConfigsCb.get());

        PowerMock.verifyAll();
    }

    /**
     * Reads CONN1's config, replaces it with CONN1_CONFIG_UPDATED, and reads it again, one tick per step.
     * The update is expected to be written to config storage, to restart the connector on the worker
     * with the new topics list, and to be visible to the final read.
     */
    @Test
    public void testPutConnectorConfig() throws Exception {
        EasyMock.expect(member.memberId()).andStubReturn("leader");

        // CONN1 is assigned and started.
        expectRebalance(1L, Arrays.asList(CONN1), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        worker.addConnector((ConnectorConfig) EasyMock.anyObject(), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);

        // Queued requests may wake the member any number of times.
        member.wakeup();
        PowerMock.expectLastCall().anyTimes();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // The update: storing the new config immediately fires the herder's connector-config callback.
        member.ensureActive();
        PowerMock.expectLastCall();
        final IAnswer<Object> notifyConfigChanged = new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                connectorConfigCallback.onCompletion((Throwable) null, CONN1);
                return null;
            }
        };
        configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
        PowerMock.expectLastCall().andAnswer(notifyConfigChanged);
        EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);

        // The connector is restarted; capture the config it is restarted with.
        worker.stopConnector(CONN1);
        PowerMock.expectLastCall();
        Capture restartedWithConfig = EasyMock.newCapture();
        worker.addConnector((ConnectorConfig) EasyMock.capture(restartedWithConfig), (ConnectorContext) EasyMock.anyObject());
        PowerMock.expectLastCall();
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS.intValue(), (List) null)).andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // Final read-back tick.
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        // Step 1: the original config is returned.
        FutureCallback initialConfigCb = new FutureCallback();
        herder.connectorConfig(CONN1, initialConfigCb);
        herder.tick();
        Assert.assertTrue(initialConfigCb.isDone());
        Assert.assertEquals(CONN1_CONFIG, initialConfigCb.get());

        // Step 2: the update completes with a non-created result describing the updated connector.
        FutureCallback putConfigCb = new FutureCallback();
        herder.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED, true, putConfigCb);
        herder.tick();
        Assert.assertTrue(putConfigCb.isDone());
        Assert.assertEquals(new Herder.Created(false, new ConnectorInfo(CONN1, CONN1_CONFIG_UPDATED, Arrays.asList(TASK0, TASK1, TASK2))), putConfigCb.get());

        // Step 3: the updated config is returned and was the one used for the restart.
        FutureCallback updatedConfigCb = new FutureCallback();
        herder.connectorConfig(CONN1, updatedConfigCb);
        herder.tick();
        Assert.assertTrue(updatedConfigCb.isDone());
        Assert.assertEquals(CONN1_CONFIG_UPDATED, updatedConfigCb.get());
        Assert.assertEquals(Arrays.asList("foo", "bar", "baz"), ((ConnectorConfig) restartedWithConfig.getValue()).getList("topics"));

        PowerMock.verifyAll();
    }

    @Test
    public void testInconsistentConfigs() throws Exception {
        // NOTE(review): the body is empty, so this test exercises nothing and always passes.
        // Presumably a placeholder for coverage of inconsistent connector/task configs — TODO confirm
        // and either implement it or mark it as ignored so the gap is visible in test reports.
    }

    /**
     * Records a rebalance with no revocation callback and a short code of 0.
     *
     * @param j     long value placed in the assignment (presumably the config offset — confirm against
     *              {@code ConnectProtocol.Assignment})
     * @param list  connector names placed in the assignment
     * @param list2 task ids placed in the assignment
     */
    private void expectRebalance(long j, List<String> list, List<ConnectorTaskId> list2) {
        expectRebalance(null, null, (short) 0, j, list, list2);
    }

    /**
     * Records an expected {@code member.ensureActive()} call that, when it happens, drives the herder's
     * rebalance listener: an optional {@code onRevoked} followed by {@code onAssigned}. Also records an
     * expected {@code member.wakeup()} call.
     *
     * @param collection connector names passed to {@code onRevoked}; when {@code null}, {@code onRevoked} is skipped
     * @param list       task ids passed to {@code onRevoked}
     * @param s          short value placed first in the assignment (presumably an error code — confirm)
     * @param j          long value placed in the assignment (presumably the config offset — confirm)
     * @param list2      connector names placed in the assignment
     * @param list3      task ids placed in the assignment
     */
    private void expectRebalance(final Collection<String> collection, final List<ConnectorTaskId> list, final short s, final long j, final List<String> list2, final List<ConnectorTaskId> list3) {
        final IAnswer<Object> fireRebalanceCallbacks = new IAnswer<Object>() {
            @Override
            public Object answer() throws Throwable {
                boolean hasRevocation = collection != null;
                if (hasRevocation) {
                    rebalanceListener.onRevoked("leader", collection, list);
                }
                ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(s, "leader", "leaderUrl", j, list2, list3);
                rebalanceListener.onAssigned(assignment);
                return null;
            }
        };

        member.ensureActive();
        PowerMock.expectLastCall().andAnswer(fireRebalanceCallbacks);

        member.wakeup();
        PowerMock.expectLastCall();
    }

    /**
     * Records the config catch-up that follows a rebalance: {@code configStorage.readToEnd()} returns a
     * future that resolves to {@code null} when fetched, and {@code configStorage.snapshot()} returns
     * the given state.
     *
     * @param clusterConfigState snapshot the mocked config storage will return
     */
    private void expectPostRebalanceCatchup(ClusterConfigState clusterConfigState) {
        TestFuture completedReadToEnd = new TestFuture();
        // The cast only selects the value overload of resolveOnGet rather than the Throwable one.
        completedReadToEnd.resolveOnGet((TestFuture) null);
        EasyMock.expect(configStorage.readToEnd()).andReturn(completedReadToEnd);
        EasyMock.expect(configStorage.snapshot()).andReturn(clusterConfigState);
    }

    // Builds the shared, class-level fixtures. Order matters: later constants are derived from earlier ones.
    static {
        // Worker-level settings for the DistributedConfig built in setUp().
        HERDER_CONFIG.put("config.storage.topic", "config-topic");
        HERDER_CONFIG.put("bootstrap.servers", "localhost:9092");
        HERDER_CONFIG.put("group.id", "test-connect-group");
        HERDER_CONFIG.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        HERDER_CONFIG.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");

        // Three task ids, all belonging to CONN1.
        TASK0 = new ConnectorTaskId(CONN1, 0);
        TASK1 = new ConnectorTaskId(CONN1, 1);
        TASK2 = new ConnectorTaskId(CONN1, 2);
        MAX_TASKS = 3;

        // Connector configs; the "updated" variant differs from the original only in its topics.
        CONN1_CONFIG = new HashMap<>();
        CONN1_CONFIG.put("name", CONN1);
        CONN1_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN1_CONFIG.put("topics", "foo,bar");
        CONN1_CONFIG.put("connector.class", BogusSourceConnector.class.getName());
        CONN1_CONFIG_UPDATED = new HashMap<>(CONN1_CONFIG);
        CONN1_CONFIG_UPDATED.put("topics", "foo,bar,baz");
        CONN2_CONFIG = new HashMap<>();
        CONN2_CONFIG.put("name", CONN2);
        CONN2_CONFIG.put("tasks.max", MAX_TASKS.toString());
        CONN2_CONFIG.put("topics", "foo,bar");
        CONN2_CONFIG.put("connector.class", BogusSourceConnector.class.getName());

        // Every task shares the same single-entry config.
        TASK_CONFIG = new HashMap<>();
        TASK_CONFIG.put("task.class", BogusSourceTask.class.getName());
        TASK_CONFIGS = new ArrayList<>();
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS.add(TASK_CONFIG);
        TASK_CONFIGS_MAP = new HashMap<>();
        TASK_CONFIGS_MAP.put(TASK0, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK1, TASK_CONFIG);
        TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);

        // Two config states that differ only in which CONN1 config they carry.
        SNAPSHOT = new ClusterConfigState(1L, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.emptySet());
        SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1L, Collections.singletonMap(CONN1, 3), Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.emptySet());
    }
}
