package org.apache.phoenix.end2end.index;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that runs Phoenix mutable indexes on top of HBase replication.
 *
 * <p>Two HBase mini clusters are started against a single shared mini ZooKeeper cluster; the
 * second cluster is registered as replication peer {@code "1"} of the first. The test creates a
 * data table and an index through Phoenix on the first cluster, copies the table descriptors to
 * the second cluster, switches the source column families to the global replication scope,
 * writes one row through Phoenix and then polls the second cluster until rows show up.
 */
@Category({NeedsOwnMiniClusterTest.class})
public class MutableIndexReplicationIT extends BaseTest {
    public static final String SCHEMA_NAME = "";
    public static final String DATA_TABLE_NAME = "T";
    public static final String INDEX_TABLE_NAME = "I";
    /** Pause between two consecutive checks of the replication target. */
    private static final long REPLICATION_WAIT_TIME_MILLIS = 10000;
    protected static PhoenixTestDriver driver;
    private static String URL;
    protected static Configuration conf2;
    protected static ZooKeeperWatcher zkw1;
    protected static ZooKeeperWatcher zkw2;
    protected static ReplicationAdmin admin;
    protected static HBaseTestingUtility utility1;
    protected static HBaseTestingUtility utility2;
    /** Upper bound on the number of polling iterations while waiting for replicated edits. */
    protected static final int REPLICATION_RETRIES = 100;
    private static final Log LOG = LogFactory.getLog(MutableIndexReplicationIT.class);
    public static final String DATA_TABLE_FULL_NAME =
            SchemaUtil.getTableName(SCHEMA_NAME, DATA_TABLE_NAME);
    public static final String INDEX_TABLE_FULL_NAME =
            SchemaUtil.getTableName(SCHEMA_NAME, INDEX_TABLE_NAME);
    protected static Configuration conf1 = HBaseConfiguration.create();
    protected static final byte[] tableName = Bytes.toBytes("test");
    protected static final byte[] row = Bytes.toBytes("row");

    /** Starts both mini clusters and registers the Phoenix test driver against the first one. */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        setupConfigsAndStartCluster();
        setupDriver();
    }

    /**
     * Destroys the Phoenix driver and shuts down both mini clusters.
     *
     * <p>The nested {@code finally} blocks guarantee that a failure in one step does not prevent
     * the remaining steps from running.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        try {
            destroyDriver(driver);
        } finally {
            try {
                shutdownCluster(utility2);
            } finally {
                shutdownCluster(utility1);
            }
        }
    }

    /**
     * Shuts down the given mini cluster; does nothing when the utility was never created
     * (for example because class setup failed early).
     */
    private static void shutdownCluster(HBaseTestingUtility utility) throws Exception {
        if (utility != null) {
            utility.shutdownMiniCluster();
        }
    }

    /**
     * Configures the two clusters, wires cluster 2 up as replication peer {@code "1"} of
     * cluster 1 and starts both mini clusters with two region servers each.
     */
    private static void setupConfigsAndStartCluster() throws Exception {
        setUpConfigForMiniCluster(conf1);
        conf1.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
        conf1.setInt("replication.source.size.capacity", 10240);
        conf1.setLong("replication.source.sleepforretries", 100L);
        conf1.setInt("hbase.regionserver.maxlogs", 10);
        conf1.setLong("hbase.master.logcleaner.ttl", 10L);
        conf1.setInt("zookeeper.recovery.retry", 1);
        conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
        conf1.setBoolean("hbase.replication", true);
        conf1.setBoolean("dfs.support.append", true);
        conf1.setLong("hbase.server.thread.wakefrequency", 100L);
        conf1.setInt("replication.stats.thread.period.seconds", 5);
        conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);

        utility1 = new HBaseTestingUtility(conf1);
        utility1.startMiniZKCluster();
        MiniZooKeeperCluster sharedZkCluster = utility1.getZkCluster();
        // Pick up the configuration as adjusted by the testing utility.
        conf1 = utility1.getConfiguration();
        zkw1 = new ZooKeeperWatcher(conf1, "cluster1", (Abortable) null, true);
        admin = new ReplicationAdmin(conf1);
        LOG.info("Setup first Zk");

        // The second cluster starts from a copy of the first configuration but uses its own
        // parent znode on the shared ZooKeeper cluster.
        conf2 = HBaseConfiguration.create(conf1);
        conf2.set("zookeeper.znode.parent", "/2");
        conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
        conf2.setBoolean("hbase.replication", true);
        conf2.setBoolean("dfs.support.append", true);
        conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);

        utility2 = new HBaseTestingUtility(conf2);
        utility2.setZkCluster(sharedZkCluster);
        zkw2 = new ZooKeeperWatcher(conf2, "cluster2", (Abortable) null, true);

        admin.addPeer("1", utility2.getClusterKey());
        LOG.info("Setup second Zk");

        utility1.startMiniCluster(2);
        utility2.startMiniCluster(2);
    }

    /** Registers the Phoenix test driver for the JDBC URL of the first cluster. */
    private static void setupDriver() throws Exception {
        LOG.info("Setting up phoenix driver");
        HashMap<String, String> props = Maps.newHashMapWithExpectedSize(3);
        props.put("phoenix.index.mutableBatchSizeThreshold", Integer.toString(2));
        props.put("phoenix.schema.dropMetaData", Boolean.toString(true));
        URL = getLocalClusterUrl(utility1);
        LOG.info("Connecting driver to " + URL);
        driver = initAndRegisterDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
    }

    /**
     * Creates a data table plus index on cluster 1, enables replication for both, writes one
     * row and waits until rows appear in the first table listed by cluster 2.
     */
    @Test
    public void testReplicationWithMutableIndexes() throws Exception {
        HBaseAdmin remoteAdmin = utility2.getHBaseAdmin();

        Connection conn = getConnection();
        try {
            conn.createStatement().execute("CREATE TABLE " + DATA_TABLE_FULL_NAME
                    + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
            conn.createStatement().execute(
                    "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1)");

            // Both tables start out empty.
            Assert.assertFalse(conn.createStatement()
                    .executeQuery("SELECT * FROM " + DATA_TABLE_FULL_NAME).next());
            Assert.assertFalse(conn.createStatement()
                    .executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME).next());

            HBaseAdmin sourceAdmin = utility1.getHBaseAdmin();
            ArrayList<String> replicatedTables = new ArrayList<String>();
            replicatedTables.add(DATA_TABLE_FULL_NAME);
            replicatedTables.add(INDEX_TABLE_FULL_NAME);
            for (String table : replicatedTables) {
                enableReplication(sourceAdmin, remoteAdmin, table);
            }

            upsertAndVerifyIndex(conn);
        } finally {
            // Close even when an assertion above fails so the connection does not leak.
            conn.close();
        }

        // The target cluster is inspected through a plain HTable on the first listed table.
        LOG.info("Looking up tables in replication target");
        HTable remoteTable =
                new HTable(utility2.getConfiguration(), remoteAdmin.listTableNames()[0]);
        try {
            waitForReplicatedRows(remoteTable);
        } finally {
            remoteTable.close();
        }
    }

    /**
     * Creates {@code table} on the target cluster from the source descriptor, then marks the
     * (single) column family of the source table with the global replication scope.
     */
    private static void enableReplication(HBaseAdmin sourceAdmin, HBaseAdmin targetAdmin,
            String table) throws Exception {
        HTableDescriptor descriptor = sourceAdmin.getTableDescriptor(TableName.valueOf(table));
        // The target table is created from the descriptor as it is before the scope changes.
        targetAdmin.createTable(descriptor);

        LOG.info("Enabling replication on source table: " + table);
        HColumnDescriptor[] families = descriptor.getColumnFamilies();
        Assert.assertEquals(1L, families.length);

        HColumnDescriptor family = descriptor.removeFamily(families[0].getName());
        family.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
        descriptor.addFamily(family);

        // The table is taken offline while its descriptor is modified.
        sourceAdmin.disableTable(descriptor.getTableName());
        sourceAdmin.modifyTable(table, descriptor);
        sourceAdmin.enableTable(descriptor.getTableName());
        LOG.info("Replication enabled on source table: " + table);
    }

    /** Upserts a single row into the data table and checks that the index table contains it. */
    private static void upsertAndVerifyIndex(Connection conn) throws Exception {
        PreparedStatement upsert =
                conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
        upsert.setString(1, "a");
        upsert.setString(2, "x");
        upsert.setString(3, "1");
        upsert.execute();
        conn.commit();

        ResultSet indexRows =
                conn.createStatement().executeQuery("SELECT * FROM " + INDEX_TABLE_FULL_NAME);
        Assert.assertTrue(indexRows.next());
        Assert.assertEquals("x", indexRows.getString(1));
        Assert.assertFalse(indexRows.next());
    }

    /**
     * Polls {@code remoteTable} until it contains at least one row, sleeping
     * {@link #REPLICATION_WAIT_TIME_MILLIS} between checks, and fails the test once the last
     * of the {@link #REPLICATION_RETRIES} iterations is reached.
     */
    private void waitForReplicatedRows(HTable remoteTable) throws Exception {
        for (int attempt = 0; attempt < REPLICATION_RETRIES; attempt++) {
            if (attempt >= REPLICATION_RETRIES - 1) {
                Assert.fail("Waited too much time for put replication on table "
                        + remoteTable.getTableDescriptor().getNameAsString());
            }
            if (ensureAnyRows(remoteTable)) {
                return;
            }
            LOG.info("Sleeping for " + REPLICATION_WAIT_TIME_MILLIS
                    + " for edits to get replicated");
            Thread.sleep(REPLICATION_WAIT_TIME_MILLIS);
        }
    }

    /**
     * Runs a raw scan over the whole table and logs every result.
     *
     * @param hTable table to scan
     * @return {@code true} if the scan returned at least one result
     * @throws IOException if the scan fails
     */
    private boolean ensureAnyRows(HTable hTable) throws IOException {
        Scan rawScan = new Scan();
        rawScan.setRaw(true);
        ResultScanner scanner = hTable.getScanner(rawScan);
        try {
            boolean foundRow = false;
            for (Result result : scanner) {
                LOG.info("got row: " + result);
                foundRow = true;
            }
            return foundRow;
        } finally {
            // Release the scanner even if iteration throws.
            scanner.close();
        }
    }

    /** Opens a new Phoenix connection to the first cluster using a copy of the test properties. */
    private static Connection getConnection() throws Exception {
        return DriverManager.getConnection(URL, PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
    }
}
