package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Category({RegionServerTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.class */
public class TestReplicateToReplica {

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicateToReplica.class);
    private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
    private static byte[] FAMILY = Bytes.toBytes("family");
    private static byte[] QUAL = Bytes.toBytes("qualifier");
    private static ExecutorService EXEC;

    @Rule
    public final TableNameTestRule name = new TableNameTestRule();
    private TableName tableName;
    private Path testDir;
    private TableDescriptor td;
    private RegionServerServices rss;
    private AsyncClusterConnection conn;
    private RegionReplicationBufferManager manager;
    private FlushRequester flushRequester;
    private HRegion primary;
    private HRegion secondary;
    private WALFactory walFactory;
    private boolean queueReqAndResps;
    private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
    private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/TestReplicateToReplica$HRegionForTest.class */
    public static final class HRegionForTest extends HRegion {
        public HRegionForTest(HRegionFileSystem hRegionFileSystem, WAL wal, Configuration configuration, TableDescriptor tableDescriptor, RegionServerServices regionServerServices) {
            super(hRegionFileSystem, wal, configuration, tableDescriptor, regionServerServices);
        }

        public HRegionForTest(Path path, WAL wal, FileSystem fileSystem, Configuration configuration, RegionInfo regionInfo, TableDescriptor tableDescriptor, RegionServerServices regionServerServices) {
            super(path, wal, fileSystem, configuration, regionInfo, tableDescriptor, regionServerServices);
        }

        protected HRegion.PrepareFlushResult internalPrepareFlushCache(WAL wal, long j, Collection<HStore> collection, MonitoredTask monitoredTask, boolean z, FlushLifeCycleTracker flushLifeCycleTracker) throws IOException {
            HRegion.PrepareFlushResult internalPrepareFlushCache = super.internalPrepareFlushCache(wal, j, collection, monitoredTask, z, flushLifeCycleTracker);
            Iterator it = TestReplicateToReplica.TO_ADD_AFTER_PREPARE_FLUSH.iterator();
            while (it.hasNext()) {
                put((Put) it.next());
            }
            TestReplicateToReplica.TO_ADD_AFTER_PREPARE_FLUSH.clear();
            return internalPrepareFlushCache;
        }
    }

    @BeforeClass
    public static void setUpBeforeClass() {
        Configuration configuration = UTIL.getConfiguration();
        configuration.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
        configuration.setBoolean("hbase.region.replica.replication.enabled", true);
        configuration.setBoolean("hbase.region.replica.replication.catalog.enabled", true);
        configuration.setClass("hbase.hregion.impl", HRegionForTest.class, HRegion.class);
        EXEC = new ExecutorService("test");
        ExecutorService executorService = EXEC;
        ExecutorService executorService2 = EXEC;
        executorService2.getClass();
        executorService.startExecutorService(new ExecutorService.ExecutorConfig(executorService2).setCorePoolSize(1).setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
        ChunkCreator.initialize(2097152, false, 0L, 0.0f, 0.0f, (HeapMemoryManager) null, 0.1f);
    }

    @AfterClass
    public static void tearDownAfterClass() {
        EXEC.shutdown();
        UTIL.cleanupTestDir();
    }

    @Before
    public void setUp() throws IOException {
        TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList();
        this.tableName = this.name.getTableName();
        this.testDir = UTIL.getDataTestDir(this.tableName.getNameAsString());
        Configuration configuration = UTIL.getConfiguration();
        configuration.set("hbase.rootdir", this.testDir.toString());
        this.td = TableDescriptorBuilder.newBuilder(this.tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2).setRegionMemStoreReplication(true).build();
        this.reqAndResps = new ArrayDeque();
        this.queueReqAndResps = true;
        this.conn = (AsyncClusterConnection) Mockito.mock(AsyncClusterConnection.class);
        Mockito.when(this.conn.replicate((RegionInfo) ArgumentMatchers.any(), ArgumentMatchers.anyList(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenAnswer(invocationOnMock -> {
            if (!this.queueReqAndResps) {
                return CompletableFuture.completedFuture(null);
            }
            List list = (List) invocationOnMock.getArgument(1, List.class);
            CompletableFuture completableFuture = new CompletableFuture();
            this.reqAndResps.add(Pair.newPair(list, completableFuture));
            return completableFuture;
        });
        this.flushRequester = (FlushRequester) Mockito.mock(FlushRequester.class);
        this.rss = (RegionServerServices) Mockito.mock(RegionServerServices.class);
        Mockito.when(this.rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1L));
        Mockito.when(this.rss.getConfiguration()).thenReturn(configuration);
        Mockito.when(this.rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(configuration));
        Mockito.when(this.rss.getExecutorService()).thenReturn(EXEC);
        Mockito.when(this.rss.getAsyncClusterConnection()).thenReturn(this.conn);
        Mockito.when(this.rss.getFlushRequester()).thenReturn(this.flushRequester);
        this.manager = new RegionReplicationBufferManager(this.rss);
        Mockito.when(this.rss.getRegionReplicationBufferManager()).thenReturn(this.manager);
        RegionInfo build = RegionInfoBuilder.newBuilder(this.td.getTableName()).build();
        RegionInfo regionInfoForReplica = RegionReplicaUtil.getRegionInfoForReplica(build, 1);
        this.walFactory = new WALFactory(configuration, UUID.randomUUID().toString());
        WAL wal = this.walFactory.getWAL(build);
        this.primary = HRegion.createHRegion(build, this.testDir, configuration, this.td, wal);
        this.primary.close();
        this.primary = HRegion.openHRegion(this.testDir, build, this.td, wal, configuration, this.rss, (CancelableProgressable) null);
        this.secondary = HRegion.openHRegion(regionInfoForReplica, this.td, (WAL) null, configuration, this.rss, (CancelableProgressable) null);
        Mockito.when(this.rss.getRegions()).then(invocationOnMock2 -> {
            return Arrays.asList(this.primary, this.secondary);
        });
        replicateAll();
    }

    @After
    public void tearDown() throws IOException {
        this.queueReqAndResps = false;
        failAll();
        HBaseTestingUtil.closeRegionAndWAL(this.primary);
        HBaseTestingUtil.closeRegionAndWAL(this.secondary);
        if (this.walFactory != null) {
            this.walFactory.close();
        }
    }

    private HRegion.FlushResult flushPrimary() throws IOException {
        return this.primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
    }

    private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
        Pair buildReplicateWALEntryRequest = ReplicationProtobufUtil.buildReplicateWALEntryRequest((WAL.Entry[]) ((List) pair.getFirst()).toArray(new WAL.Entry[0]), this.secondary.getRegionInfo().getEncodedNameAsBytes(), (String) null, (Path) null, (Path) null);
        Iterator it = ((AdminProtos.ReplicateWALEntryRequest) buildReplicateWALEntryRequest.getFirst()).getEntryList().iterator();
        while (it.hasNext()) {
            this.secondary.replayWALEntry((AdminProtos.WALEntry) it.next(), (CellScanner) buildReplicateWALEntryRequest.getSecond());
        }
        ((CompletableFuture) pair.getSecond()).complete(null);
    }

    private void replicateOne() throws IOException {
        replicate(this.reqAndResps.remove());
    }

    private void replicateAll() throws IOException {
        while (true) {
            Pair<List<WAL.Entry>, CompletableFuture<Void>> poll = this.reqAndResps.poll();
            if (poll == null) {
                return;
            } else {
                replicate(poll);
            }
        }
    }

    private void failOne() {
        ((CompletableFuture) this.reqAndResps.remove().getSecond()).completeExceptionally(new IOException("Inject error"));
    }

    private void failAll() {
        while (true) {
            Pair<List<WAL.Entry>, CompletableFuture<Void>> poll = this.reqAndResps.poll();
            if (poll == null) {
                return;
            } else {
                ((CompletableFuture) poll.getSecond()).completeExceptionally(new IOException("Inject error"));
            }
        }
    }

    @Test
    public void testNormalReplicate() throws IOException {
        this.primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        replicateOne();
        Assert.assertEquals(1L, Bytes.toInt(this.secondary.get(new Get(r0)).getValue(FAMILY, QUAL)));
    }

    @Test
    public void testNormalFlush() throws IOException {
        byte[] bytes = Bytes.toBytes(0);
        this.primary.put(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
        flushPrimary();
        replicateAll();
        Assert.assertEquals(2L, Bytes.toInt(this.secondary.get(new Get(bytes)).getValue(FAMILY, QUAL)));
        Assert.assertEquals(this.primary.getMemStoreDataSize(), this.secondary.getMemStoreDataSize());
    }

    @Test
    public void testErrorBeforeFlushStart() throws IOException {
        byte[] bytes = Bytes.toBytes(0);
        this.primary.put(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        failOne();
        ((FlushRequester) Mockito.verify(this.flushRequester, Mockito.times(1))).requestFlush((HRegion) ArgumentMatchers.any(), ArgumentMatchers.anyList(), (FlushLifeCycleTracker) ArgumentMatchers.any());
        TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
        flushPrimary();
        replicateAll();
        Assert.assertEquals(2L, Bytes.toInt(this.secondary.get(new Get(bytes)).getValue(FAMILY, QUAL)));
        Assert.assertEquals(this.primary.getMemStoreDataSize(), this.secondary.getMemStoreDataSize());
    }

    @Test
    public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
        this.primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        replicateAll();
        TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
        flushPrimary();
        replicateOne();
        failOne();
        ((FlushRequester) Mockito.verify(this.flushRequester, Mockito.times(1))).requestFlush((HRegion) ArgumentMatchers.any(), ArgumentMatchers.anyList(), (FlushLifeCycleTracker) ArgumentMatchers.any());
        this.primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
        flushPrimary();
        replicateAll();
        for (int i = 0; i < 3; i++) {
            Assert.assertEquals(i + 1, Bytes.toInt(this.secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
        }
        Assert.assertEquals(0L, this.secondary.getMemStoreDataSize());
    }

    @Test
    public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
        byte[] bytes = Bytes.toBytes(0);
        this.primary.put(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        failOne();
        ((FlushRequester) Mockito.verify(this.flushRequester, Mockito.times(1))).requestFlush((HRegion) ArgumentMatchers.any(), ArgumentMatchers.anyList(), (FlushLifeCycleTracker) ArgumentMatchers.any());
        flushPrimary();
        failAll();
        Thread.sleep(2000L);
        ((FlushRequester) Mockito.verify(this.flushRequester, Mockito.times(2))).requestFlush((HRegion) ArgumentMatchers.any(), ArgumentMatchers.anyList(), (FlushLifeCycleTracker) ArgumentMatchers.any());
        Assert.assertEquals(HRegion.FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushPrimary().getResult());
        Assert.assertFalse(this.secondary.get(new Get(bytes).setCheckExistenceOnly(true)).getExists().booleanValue());
        replicateOne();
        Assert.assertEquals(1L, Bytes.toInt(this.secondary.get(new Get(bytes)).getValue(FAMILY, QUAL)));
    }

    @Test
    public void testCatchUpWithReopen() throws IOException {
        byte[] bytes = Bytes.toBytes(0);
        this.primary.put(new Put(bytes).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
        failOne();
        this.primary.close();
        Assert.assertFalse(this.secondary.get(new Get(bytes).setCheckExistenceOnly(true)).getExists().booleanValue());
        this.primary = HRegion.openHRegion(this.testDir, this.primary.getRegionInfo(), this.td, this.primary.getWAL(), UTIL.getConfiguration(), this.rss, (CancelableProgressable) null);
        replicateAll();
        Assert.assertEquals(1L, Bytes.toInt(this.secondary.get(new Get(bytes)).getValue(FAMILY, QUAL)));
    }
}
