package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;

/**
 * Tests for {@link ReplicationSourceManager}, run against a single-node mini cluster.
 *
 * <p>Each test method gets its own root directory, its own replication queue table and a freshly
 * initialized {@link Replication} service built around a mocked {@link Server}.
 */
@Category({ReplicationTests.class, MediumTests.class})
public class TestReplicationSourceManager {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE =
        HBaseClassTestRule.forClass(TestReplicationSourceManager.class);

    private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
    // NOTE(review): the family name is taken from the quota test helper's constant; presumably
    // it is just "f1" - confirm, and consider a local literal to drop the cross-test coupling.
    private static final byte[] F1 = Bytes.toBytes(SpaceQuotaHelperForTests.F1);
    private static final byte[] F2 = Bytes.toBytes("f2");
    private static final TableName TABLE_NAME = TableName.valueOf("test");

    // Populated once in setUpBeforeClass().
    private static Configuration CONF;
    private static FileSystem FS;
    private static RegionInfo RI;
    private static NavigableMap<byte[], Integer> SCOPES;

    @Rule
    public final TestName name = new TestName();

    // Per-test state, rebuilt in setUp().
    private Path oldLogDir;
    private Path logDir;
    private Path remoteLogDir;
    private Server server;
    private Replication replication;
    private ReplicationSourceManager manager;

    /**
     * Replication endpoint whose {@link #replicate} fails whenever the peer's cluster key ends
     * with {@code "error"}, and reports success otherwise.
     */
    public static final class ReplicationEndpointForTest extends DummyReplicationEndpoint {
        // Cluster key of the peer this endpoint belongs to; captured in init().
        private String clusterKey;

        /**
         * @return {@code true} unless the cluster key ends with {@code "error"}
         * @throws RuntimeException with message "Inject error" when the cluster key ends with
         *         {@code "error"}
         */
        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            if (clusterKey.endsWith("error")) {
                throw new RuntimeException("Inject error");
            }
            return true;
        }

        /** Delegates to the parent and remembers the cluster key from the peer's config. */
        @Override
        public void init(ReplicationEndpoint.Context context) throws IOException {
            super.init(context);
            clusterKey = context.getReplicationPeer().getPeerConfig().getClusterKey();
        }
    }

    /** Starts the mini cluster and builds the configuration, region info and scopes shared by all tests. */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        UTIL.startMiniCluster(1);
        FS = UTIL.getTestFileSystem();
        CONF = new Configuration(UTIL.getConfiguration());
        CONF.setLong("replication.sleep.before.failover", 0L);
        RI = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
        SCOPES = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        SCOPES.put(F1, 1);
        SCOPES.put(F2, 0);
    }

    /** Shuts the mini cluster down. */
    @AfterClass
    public static void tearDownAfterClass() throws IOException {
        UTIL.shutdownMiniCluster();
    }

    /**
     * Prepares a per-test root directory (with {@code oldWALs}, {@code WALs} and
     * {@code remoteWALs} below it), a mocked {@link Server}, a dedicated replication queue table
     * named {@code replication_<method>}, and an initialized {@link Replication} service.
     */
    @Before
    public void setUp() throws Exception {
        Path rootDir = UTIL.getDataTestDirOnTestFS(name.getMethodName());
        CommonFSUtils.setRootDir(CONF, rootDir);

        ServerName serverName = ServerName.valueOf("hostname.example.org", 1234, 1L);
        server = Mockito.mock(Server.class);
        Mockito.when(server.getConfiguration()).thenReturn(CONF);
        Mockito.when(server.getZooKeeper()).thenReturn(UTIL.getZooKeeperWatcher());
        Mockito.when(server.getConnection()).thenReturn(UTIL.getConnection());
        Mockito.when(server.getServerName()).thenReturn(serverName);

        oldLogDir = new Path(rootDir, "oldWALs");
        FS.mkdirs(oldLogDir);
        logDir = new Path(rootDir, "WALs");
        FS.mkdirs(logDir);
        remoteLogDir = new Path(rootDir, "remoteWALs");
        FS.mkdirs(remoteLogDir);

        // One queue table per test method, so tests do not see each other's queues.
        TableName queueTableName = TableName.valueOf("replication_" + name.getMethodName());
        UTIL.getAdmin().createTable(
            ReplicationStorageFactory.createReplicationQueueTableDescriptor(queueTableName));
        CONF.set("hbase.replication.queue.table.name", queueTableName.getNameAsString());

        replication = new Replication();
        replication.initialize(server, FS, new Path(logDir, serverName.toString()), oldLogDir,
            new WALFactory(CONF, server.getServerName(), (Abortable) null));
        manager = replication.getReplicationManager();
    }

    /** Stops the replication service created in {@link #setUp()}. */
    @After
    public void tearDown() {
        replication.stopReplicationService();
    }

    /**
     * Registers a peer that uses {@link ReplicationEndpointForTest} and blocks (up to 20 seconds)
     * until the manager reports an active source for it.
     *
     * @param peerId id of the peer to add
     * @param clusterKeySuffix appended after {@code ":/"} to the mini ZK cluster address to form
     *        the peer's cluster key
     * @param syncReplication when {@code true} the peer additionally gets a table-CFs map for
     *        {@link #TABLE_NAME}, a remote WAL dir, and is stored with state
     *        {@code DOWNGRADE_ACTIVE} instead of {@code NONE}
     */
    private void addPeerAndWait(String peerId, String clusterKeySuffix, boolean syncReplication)
            throws ReplicationException, IOException {
        ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder()
            .setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/" + clusterKeySuffix)
            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
        if (syncReplication) {
            builder.setTableCFsMap(ImmutableMap.of(TABLE_NAME, Collections.emptyList()))
                .setRemoteWALDir(FS.makeQualified(remoteLogDir).toString());
        }
        SyncReplicationState initialState =
            syncReplication ? SyncReplicationState.DOWNGRADE_ACTIVE : SyncReplicationState.NONE;
        manager.getReplicationPeers().getPeerStorage()
            .addPeer(peerId, builder.build(), true, initialState);
        manager.addPeer(peerId);
        UTIL.waitFor(20000L, () -> {
            ReplicationSourceInterface source = manager.getSource(peerId);
            return source != null && source.isSourceActive();
        });
    }

    /**
     * Removes a peer from storage and from the manager, then blocks (up to 20 seconds) until the
     * peer, its normal source and any old (recovered) source for it are all gone.
     *
     * @param peerId id of the peer to remove
     */
    private void removePeerAndWait(String peerId) throws Exception {
        ReplicationPeers peers = manager.getReplicationPeers();
        peers.getPeerStorage().removePeer(peerId);
        manager.removePeer(peerId);
        UTIL.waitFor(20000L, () -> {
            if (peers.getPeer(peerId) != null || manager.getSource(peerId) != null) {
                return false;
            }
            return manager.getOldSources().stream()
                .noneMatch(oldSource -> oldSource.getPeerId().equals(peerId));
        });
    }

    /**
     * Writes a WAL file containing a single entry with one Put cell for each of the two families.
     *
     * @param file location of the WAL file to create
     */
    private void createWALFile(Path file) throws Exception {
        ProtobufLogWriter writer = new ProtobufLogWriter();
        try {
            writer.init(FS, file, CONF, false, FS.getDefaultBlockSize(file),
                (StreamSlowMonitor) null);
            WALKeyImpl key = new WALKeyImpl(RI.getEncodedNameAsBytes(), TABLE_NAME,
                EnvironmentEdgeManager.currentTime(), SCOPES);
            WALEdit edit = new WALEdit();
            edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F1)
                .setFamily(F1).setQualifier(F1).setType(Cell.Type.Put).setValue(F1).build());
            edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(F2)
                .setFamily(F2).setQualifier(F2).setType(Cell.Type.Put).setValue(F2).build());
            writer.append(new WAL.Entry(key, edit));
            writer.sync(false);
        } finally {
            // Close exactly once, whether or not writing succeeded.
            writer.close();
        }
    }

    /**
     * Claims a queue that belonged to another server for a peer whose endpoint always fails, and
     * checks that exactly one old source exists afterwards.
     */
    @Test
    public void testClaimQueue() throws Exception {
        String peerId = "1";
        // The "error" suffix makes ReplicationEndpointForTest.replicate() throw.
        addPeerAndWait(peerId, "error", false);

        ServerName otherServer = ServerName.valueOf("hostname0.example.org", 12345, 123L);
        createWALFile(new Path(oldLogDir, otherServer.toString() + ".1"));

        ReplicationQueueId queueId = new ReplicationQueueId(otherServer, peerId);
        manager.getQueueStorage().setOffset(queueId, "", new ReplicationGroupOffset("1", 0L),
            Collections.emptyMap());
        manager.claimQueue(queueId);

        MatcherAssert.assertThat(manager.getOldSources(), Matchers.hasSize(1));
    }

    /**
     * Rolls two WALs whose names share a leading prefix and checks that the manager reports both
     * of them as latest paths.
     */
    @Test
    public void testSameWALPrefix() throws IOException {
        String walName1 = "localhost,8080,12345-45678-Peer.34567";
        String walName2 = "localhost,8080,12345.56789";
        manager.postLogRoll(new Path(walName1));
        manager.postLogRoll(new Path(walName2));

        Set<String> latestWals =
            manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet());
        MatcherAssert.assertThat(latestWals, Matchers.<Set<String>>both(Matchers.hasSize(2))
            .and(Matchers.hasItems(walName1, walName2)));
    }

    /** @return the process-wide replication metrics source obtained from the singleton factory */
    private MetricsReplicationSourceSource getGlobalSource() {
        return CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)
            .getGlobalSource();
    }

    /**
     * Checks that removing a peer restores the global log-queue size metric to its initial value,
     * and that re-adding the peer keeps global and per-source metrics consistent.
     */
    @Test
    public void testRemovePeerMetricsCleanup() throws Exception {
        String peerId = "DummyPeer";
        MetricsReplicationSourceSource globalSource = getGlobalSource();
        int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();

        // Adding the peer alone is expected to leave the global queue size untouched.
        addPeerAndWait(peerId, "hbase", false);
        Assert.assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

        ReplicationSourceInterface source = manager.getSource(peerId);
        Assert.assertNotNull(source);
        int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();

        // Enqueue one log and verify that both per-source and global metrics grow by one.
        Path serverLogDir = new Path(logDir, server.getServerName().toString());
        source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
        Assert.assertEquals(1 + sizeOfSingleLogQueue,
            source.getSourceMetrics().getSizeOfLogQueue());
        Assert.assertEquals(
            source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
            globalSource.getSizeOfLogQueue());

        // Removing the peer must bring the global metric back to where it started.
        removePeerAndWait(peerId);
        Assert.assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

        // Re-add the same peer and re-fetch its source; the old reference belongs to the
        // removed peer.
        addPeerAndWait(peerId, "hbase", false);
        source = manager.getSource(peerId);
        Assert.assertNotNull(source);
        Assert.assertEquals(
            source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
            globalSource.getSizeOfLogQueue());
    }

    /**
     * Checks that refreshing a peer's sources restores both the global and the per-source
     * log-queue size metrics to their values from before a log was enqueued. The peer is always
     * removed at the end, even when an assertion fails.
     */
    @Test
    public void testDisablePeerMetricsCleanup() throws Exception {
        String peerId = "DummyPeer";
        try {
            MetricsReplicationSourceSource globalSource = getGlobalSource();
            int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue();

            addPeerAndWait(peerId, "hbase", false);
            Assert.assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

            ReplicationSourceInterface source = manager.getSource(peerId);
            Assert.assertNotNull(source);
            int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue();

            // Enqueue one log and verify that both per-source and global metrics grow by one.
            Path serverLogDir = new Path(logDir, server.getServerName().toString());
            source.enqueueLog(new Path(serverLogDir, server.getServerName() + ".1"));
            Assert.assertEquals(1 + sizeOfSingleLogQueue,
                source.getSourceMetrics().getSizeOfLogQueue());
            Assert.assertEquals(
                source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
                globalSource.getSizeOfLogQueue());

            // Refreshing must undo the enqueue in the global metric ...
            manager.refreshSources(peerId);
            Assert.assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue());

            // ... and in the per-source metric of the source the manager now returns.
            source = manager.getSource(peerId);
            Assert.assertNotNull(source);
            Assert.assertEquals(sizeOfSingleLogQueue,
                source.getSourceMetrics().getSizeOfLogQueue());
            Assert.assertEquals(
                source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial,
                globalSource.getSizeOfLogQueue());
        } finally {
            removePeerAndWait(peerId);
        }
    }

    /**
     * Adds a sync-replication peer, rolls two WALs (only the second has a file in the peer's
     * remote WAL directory) and checks that {@code cleanOldLogs} deletes that remote file.
     */
    @Test
    public void testRemoveRemoteWALs() throws Exception {
        String peerId = "2";
        addPeerAndWait(peerId, "hbase", true);

        // No file is created for this WAL name anywhere; only the roll is reported.
        manager.postLogRoll(new Path(logDir, "remoteWAL-12345-2.12345.syncrep"));

        Path remoteLogDirForPeer = new Path(remoteLogDir, peerId);
        FS.mkdirs(remoteLogDirForPeer);
        String walName = "remoteWAL-12345-2.23456.syncrep";
        Path remoteWAL = new Path(remoteLogDirForPeer, walName)
            .makeQualified(FS.getUri(), FS.getWorkingDirectory());
        FS.create(remoteWAL).close();
        manager.postLogRoll(new Path(logDir, walName));

        ReplicationSourceInterface source = manager.getSource(peerId);
        manager.cleanOldLogs(walName, true, source);
        Assert.assertFalse(FS.exists(remoteWAL));
    }
}
