package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({RegionServerTests.class, MediumTests.class})
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck.class */
public class TestAsyncFSWALRollStuck {
    private static ScheduledExecutorService EXECUTOR;
    private static TableName TN;
    private static RegionInfo RI;
    private static MultiVersionConcurrencyControl MVCC;
    private static AsyncFSWAL WAL;
    private static ExecutorService ROLL_EXEC;

    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncFSWALRollStuck.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestAsyncFSWALRollStuck.class);
    private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
    private static EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
    private static Class<? extends Channel> CHANNEL_CLASS = NioSocketChannel.class;
    private static BlockingQueue<CompletableFuture<Long>> FUTURES = new ArrayBlockingQueue(3);
    private static AtomicInteger SYNC_COUNT = new AtomicInteger(0);
    private static CountDownLatch ARRIVE = new CountDownLatch(1);
    private static CountDownLatch RESUME = new CountDownLatch(1);

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWALRollStuck$TestAsyncWriter.class */
    public static final class TestAsyncWriter extends AsyncProtobufLogWriter {
        public TestAsyncWriter(EventLoopGroup eventLoopGroup, Class<? extends Channel> cls) {
            super(eventLoopGroup, cls);
        }

        public CompletableFuture<Long> sync(boolean z) {
            int incrementAndGet = TestAsyncFSWALRollStuck.SYNC_COUNT.incrementAndGet();
            if (incrementAndGet < 3) {
                CompletableFuture<Long> completableFuture = new CompletableFuture<>();
                TestAsyncFSWALRollStuck.FUTURES.offer(completableFuture);
                return completableFuture;
            }
            if (incrementAndGet != 3) {
                return super.sync(z);
            }
            TestAsyncFSWALRollStuck.ARRIVE.countDown();
            try {
                TestAsyncFSWALRollStuck.RESUME.await();
            } catch (InterruptedException e) {
            }
            return super.sync(z);
        }
    }

    @BeforeClass
    public static void setUp() throws Exception {
        Configuration configuration = UTIL.getConfiguration();
        configuration.setClass("hbase.regionserver.wal.async.writer.impl", TestAsyncWriter.class, AsyncFSWALProvider.AsyncWriter.class);
        configuration.setLong("hbase.wal.batch.size", 1L);
        TN = TableName.valueOf("test");
        RI = RegionInfoBuilder.newBuilder(TN).build();
        MVCC = new MultiVersionConcurrencyControl();
        EXECUTOR = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setDaemon(true).build());
        Path dataTestDir = UTIL.getDataTestDir();
        ROLL_EXEC = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).build());
        WAL = new AsyncFSWAL(UTIL.getTestFileSystem(), (Abortable) null, dataTestDir, "log", "oldlog", configuration, Arrays.asList(new WALActionsListener() { // from class: org.apache.hadoop.hbase.regionserver.wal.TestAsyncFSWALRollStuck.1
            public void logRollRequested(WALActionsListener.RollRequestReason rollRequestReason) {
                TestAsyncFSWALRollStuck.ROLL_EXEC.execute(() -> {
                    try {
                        TestAsyncFSWALRollStuck.WAL.rollWriter();
                    } catch (Exception e) {
                        TestAsyncFSWALRollStuck.LOG.warn("failed to roll writer", e);
                    }
                });
            }
        }), true, (String) null, (String) null, (FileSystem) null, (Path) null, EVENT_LOOP_GROUP, CHANNEL_CLASS, StreamSlowMonitor.create(configuration, "monitor"));
        WAL.init();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        EXECUTOR.shutdownNow();
        ROLL_EXEC.shutdownNow();
        Closeables.close(WAL, true);
        UTIL.cleanupTestDir();
    }

    @Test
    public void testRoll() throws Exception {
        byte[] bytes = Bytes.toBytes("family");
        WALEdit wALEdit = new WALEdit();
        wALEdit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setFamily(bytes).setQualifier(bytes).setRow(bytes).setValue(bytes).setTimestamp(EnvironmentEdgeManager.currentTime()).setType(Cell.Type.Put).build());
        WALKeyImpl wALKeyImpl = new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, EnvironmentEdgeManager.currentTime(), MVCC);
        WAL.appendData(RI, wALKeyImpl, wALEdit);
        long appendData = WAL.appendData(RI, new WALKeyImpl(RI.getEncodedNameAsBytes(), TN, wALKeyImpl.getWriteTime() + 1, MVCC), wALEdit);
        UTIL.waitFor(10000L, () -> {
            return FUTURES.size() == 2;
        });
        FUTURES.poll().completeExceptionally(new IOException("inject error"));
        FUTURES.poll().completeExceptionally(new IOException("inject error"));
        ARRIVE.await();
        EXECUTOR.schedule(() -> {
            RESUME.countDown();
        }, 1L, TimeUnit.SECONDS);
        WAL.rollWriter();
        WAL.sync(appendData);
    }
}
