package org.apache.accumulo.test.replication;

import com.google.common.collect.Iterators;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/test/replication/MultiInstanceReplicationIT.class */
public class MultiInstanceReplicationIT extends ConfigurableMacBase {
    private static final Logger log = LoggerFactory.getLogger(MultiInstanceReplicationIT.class);
    private ExecutorService executor;

    @Override // org.apache.accumulo.harness.AccumuloITBase
    public int defaultTimeoutSeconds() {
        return 600;
    }

    @Before
    public void createExecutor() {
        this.executor = Executors.newSingleThreadExecutor();
    }

    @After
    public void stopExecutor() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    @Override // org.apache.accumulo.test.functional.ConfigurableMacBase
    public void configure(MiniAccumuloConfigImpl miniAccumuloConfigImpl, Configuration configuration) {
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "15s");
        miniAccumuloConfigImpl.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
        miniAccumuloConfigImpl.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
        miniAccumuloConfigImpl.setProperty(Property.GC_CYCLE_START, "1s");
        miniAccumuloConfigImpl.setProperty(Property.GC_CYCLE_DELAY, "5s");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
        miniAccumuloConfigImpl.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, "master");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
        miniAccumuloConfigImpl.setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX, "1M");
        configuration.set("fs.file.impl", RawLocalFileSystem.class.getName());
    }

    private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl miniAccumuloConfigImpl, MiniAccumuloConfigImpl miniAccumuloConfigImpl2) {
        Map siteConfig = miniAccumuloConfigImpl.getSiteConfig();
        if ("true".equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
            HashMap hashMap = new HashMap();
            hashMap.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
            String str = (String) siteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
            Assert.assertNotNull("Keystore Path was null", str);
            hashMap.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), str);
            String str2 = (String) siteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
            Assert.assertNotNull("Truststore Path was null", str2);
            hashMap.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), str2);
            String str3 = (String) siteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
            if (str3 != null) {
                hashMap.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), str3);
            }
            String str4 = (String) siteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
            if (str4 != null) {
                hashMap.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), str4);
            }
            System.out.println("Setting site configuration for peer " + hashMap);
            miniAccumuloConfigImpl2.setSiteConfig(hashMap);
        }
        String str5 = (String) siteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
        if (str5 != null) {
            Map siteConfig2 = miniAccumuloConfigImpl2.getSiteConfig();
            siteConfig2.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), str5);
            miniAccumuloConfigImpl2.setSiteConfig(siteConfig2);
        }
    }

    @Test(timeout = 600000)
    public void dataWasReplicatedToThePeer() throws Exception {
        MiniAccumuloConfigImpl miniAccumuloConfigImpl = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.setInstanceName("peer");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), miniAccumuloConfigImpl);
        MiniAccumuloClusterImpl miniAccumuloClusterImpl = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl);
        miniAccumuloClusterImpl.start();
        try {
            AccumuloClient accumuloClient = (AccumuloClient) Accumulo.newClient().from(getClientProperties()).build();
            try {
                AccumuloClient createAccumuloClient = miniAccumuloClusterImpl.createAccumuloClient("root", new PasswordToken("testRootPassword1"));
                try {
                    ReplicationTable.setOnline(accumuloClient);
                    createAccumuloClient.securityOperations().createLocalUser("peer", new PasswordToken("foo"));
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "peer");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "foo");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl.getInstanceName(), miniAccumuloClusterImpl.getZooKeepers())));
                    createAccumuloClient.tableOperations().create("peer", new NewTableConfiguration());
                    String str = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer");
                    Assert.assertNotNull(str);
                    createAccumuloClient.securityOperations().grantTablePermission("peer", "peer", TablePermission.WRITE);
                    HashMap hashMap = new HashMap();
                    hashMap.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str);
                    accumuloClient.tableOperations().create("master", new NewTableConfiguration().setProperties(hashMap));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master"));
                    BatchWriter createBatchWriter = accumuloClient.createBatchWriter("master");
                    for (int i = 0; i < 5000; i++) {
                        try {
                            Mutation mutation = new Mutation(Integer.toString(i));
                            for (int i2 = 0; i2 < 100; i2++) {
                                String num = Integer.toString(i2);
                                mutation.put(num, "", num);
                            }
                            createBatchWriter.addMutation(mutation);
                        } catch (Throwable th) {
                            if (createBatchWriter != null) {
                                try {
                                    createBatchWriter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    log.info("Wrote all data to master cluster");
                    Set referencedFiles = accumuloClient.replicationOperations().referencedFiles("master");
                    log.info("Files to replicate: " + referencedFiles);
                    Iterator it = ((Collection) this.cluster.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
                    while (it.hasNext()) {
                        this.cluster.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it.next());
                    }
                    this.cluster.exec(TabletServer.class, new String[0]);
                    log.info("TabletServer restarted");
                    Iterators.size(ReplicationTable.getScanner(accumuloClient).iterator());
                    log.info("TabletServer is online");
                    while (!ReplicationTable.isOnline(accumuloClient)) {
                        log.info("Replication table still offline, waiting");
                        Thread.sleep(5000L);
                    }
                    log.info("");
                    log.info("Fetching metadata records:");
                    for (Map.Entry entry : accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
                        if (MetadataSchema.ReplicationSection.COLF.equals(((Key) entry.getKey()).getColumnFamily())) {
                            log.info("{} {}", ((Key) entry.getKey()).toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(((Value) entry.getValue()).get())));
                        } else {
                            log.info("{} {}", ((Key) entry.getKey()).toStringNoTruncate(), entry.getValue());
                        }
                    }
                    log.info("");
                    log.info("Fetching replication records:");
                    for (Map.Entry entry2 : ReplicationTable.getScanner(accumuloClient)) {
                        log.info("{} {}", ((Key) entry2.getKey()).toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(((Value) entry2.getValue()).get())));
                    }
                    Future submit = this.executor.submit(() -> {
                        long currentTimeMillis = System.currentTimeMillis();
                        accumuloClient.replicationOperations().drain("master", referencedFiles);
                        log.info("Drain completed in " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
                        return true;
                    });
                    try {
                        try {
                            submit.get(60L, TimeUnit.SECONDS);
                            this.executor.shutdownNow();
                        } catch (TimeoutException e) {
                            submit.cancel(true);
                            Assert.fail("Drain did not finish within 60 seconds");
                            this.executor.shutdownNow();
                        }
                        log.info("drain completed");
                        log.info("");
                        log.info("Fetching metadata records:");
                        for (Map.Entry entry3 : accumuloClient.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
                            if (MetadataSchema.ReplicationSection.COLF.equals(((Key) entry3.getKey()).getColumnFamily())) {
                                log.info("{} {}", ((Key) entry3.getKey()).toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(((Value) entry3.getValue()).get())));
                            } else {
                                log.info("{} {}", ((Key) entry3.getKey()).toStringNoTruncate(), entry3.getValue());
                            }
                        }
                        log.info("");
                        log.info("Fetching replication records:");
                        for (Map.Entry entry4 : ReplicationTable.getScanner(accumuloClient)) {
                            log.info("{} {}", ((Key) entry4.getKey()).toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(((Value) entry4.getValue()).get())));
                        }
                        Scanner createScanner = accumuloClient.createScanner("master", Authorizations.EMPTY);
                        try {
                            createScanner = createAccumuloClient.createScanner("peer", Authorizations.EMPTY);
                            try {
                                Iterator it2 = createScanner.iterator();
                                Iterator it3 = createScanner.iterator();
                                Map.Entry entry5 = null;
                                Map.Entry entry6 = null;
                                while (it2.hasNext() && it3.hasNext()) {
                                    entry5 = (Map.Entry) it2.next();
                                    entry6 = (Map.Entry) it3.next();
                                    Assert.assertEquals(entry5.getKey() + " was not equal to " + entry6.getKey(), 0L, ((Key) entry5.getKey()).compareTo((Key) entry6.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
                                    Assert.assertEquals(entry5.getValue(), entry6.getValue());
                                }
                                log.info("Last master entry: {}", entry5);
                                log.info("Last peer entry: {}", entry6);
                                Assert.assertFalse("Had more data to read from the master", it2.hasNext());
                                Assert.assertFalse("Had more data to read from the peer", it3.hasNext());
                                if (createScanner != null) {
                                    createScanner.close();
                                }
                                if (createScanner != null) {
                                    createScanner.close();
                                }
                                if (createAccumuloClient != null) {
                                    createAccumuloClient.close();
                                }
                                if (accumuloClient != null) {
                                    accumuloClient.close();
                                }
                            } finally {
                                if (createScanner != null) {
                                    try {
                                        createScanner.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                }
                            }
                        } catch (Throwable th4) {
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        this.executor.shutdownNow();
                        throw th5;
                    }
                } catch (Throwable th6) {
                    if (createAccumuloClient != null) {
                        try {
                            createAccumuloClient.close();
                        } catch (Throwable th7) {
                            th6.addSuppressed(th7);
                        }
                    }
                    throw th6;
                }
            } finally {
            }
        } finally {
            miniAccumuloClusterImpl.stop();
        }
    }

    @Test
    public void dataReplicatedToCorrectTable() throws Exception {
        MiniAccumuloConfigImpl miniAccumuloConfigImpl = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.setInstanceName("peer");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), miniAccumuloConfigImpl);
        MiniAccumuloClusterImpl miniAccumuloClusterImpl = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl);
        miniAccumuloClusterImpl.start();
        try {
            AccumuloClient accumuloClient = (AccumuloClient) Accumulo.newClient().from(getClientProperties()).build();
            try {
                AccumuloClient createAccumuloClient = miniAccumuloClusterImpl.createAccumuloClient("root", new PasswordToken("testRootPassword1"));
                try {
                    createAccumuloClient.securityOperations().createLocalUser("peer", new PasswordToken("foo"));
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "peer");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "foo");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl.getInstanceName(), miniAccumuloClusterImpl.getZooKeepers())));
                    createAccumuloClient.tableOperations().create("peer1", new NewTableConfiguration());
                    String str = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer1");
                    Assert.assertNotNull(str);
                    createAccumuloClient.tableOperations().create("peer2", new NewTableConfiguration());
                    String str2 = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer2");
                    Assert.assertNotNull(str2);
                    HashMap hashMap = new HashMap();
                    hashMap.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str);
                    accumuloClient.tableOperations().create("master1", new NewTableConfiguration().setProperties(hashMap));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master1"));
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap2.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str2);
                    accumuloClient.tableOperations().create("master2", new NewTableConfiguration().setProperties(hashMap2));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master2"));
                    createAccumuloClient.securityOperations().grantTablePermission("peer", "peer1", TablePermission.WRITE);
                    createAccumuloClient.securityOperations().grantTablePermission("peer", "peer2", TablePermission.WRITE);
                    long j = 0;
                    BatchWriter createBatchWriter = accumuloClient.createBatchWriter("master1");
                    for (int i = 0; i < 2500; i++) {
                        try {
                            Mutation mutation = new Mutation("master1" + i);
                            for (int i2 = 0; i2 < 100; i2++) {
                                String num = Integer.toString(i2);
                                mutation.put(num, "", num);
                                j++;
                            }
                            createBatchWriter.addMutation(mutation);
                        } finally {
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    long j2 = 0;
                    createBatchWriter = accumuloClient.createBatchWriter("master2");
                    for (int i3 = 0; i3 < 2500; i3++) {
                        try {
                            Mutation mutation2 = new Mutation("master2" + i3);
                            for (int i4 = 0; i4 < 100; i4++) {
                                String num2 = Integer.toString(i4);
                                mutation2.put(num2, "", num2);
                                j2++;
                            }
                            createBatchWriter.addMutation(mutation2);
                        } finally {
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    log.info("Wrote all data to master cluster");
                    Set referencedFiles = accumuloClient.replicationOperations().referencedFiles("master1");
                    Set referencedFiles2 = accumuloClient.replicationOperations().referencedFiles("master2");
                    log.info("Files to replicate for table1: " + referencedFiles);
                    log.info("Files to replicate for table2: " + referencedFiles2);
                    Iterator it = ((Collection) this.cluster.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
                    while (it.hasNext()) {
                        this.cluster.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it.next());
                    }
                    this.cluster.exec(TabletServer.class, new String[0]);
                    log.info("Restarted the tserver");
                    Iterators.size(accumuloClient.createScanner("master1", Authorizations.EMPTY).iterator());
                    while (!ReplicationTable.isOnline(accumuloClient)) {
                        log.info("Replication table still offline, waiting");
                        Thread.sleep(5000L);
                    }
                    log.info("Waiting for {} for {}", referencedFiles, "master1");
                    accumuloClient.replicationOperations().drain("master1", referencedFiles);
                    log.info("Waiting for {} for {}", referencedFiles2, "master2");
                    accumuloClient.replicationOperations().drain("master2", referencedFiles2);
                    long j3 = 0;
                    for (Map.Entry entry : createAccumuloClient.createScanner("peer1", Authorizations.EMPTY)) {
                        j3++;
                        Assert.assertTrue("Found unexpected key-value" + ((Key) entry.getKey()).toStringNoTruncate() + " " + entry.getValue(), ((Key) entry.getKey()).getRow().toString().startsWith("master1"));
                    }
                    log.info("Found {} records in {}", Long.valueOf(j3), "peer1");
                    Assert.assertEquals(j, j3);
                    long j4 = 0;
                    for (Map.Entry entry2 : createAccumuloClient.createScanner("peer2", Authorizations.EMPTY)) {
                        j4++;
                        Assert.assertTrue("Found unexpected key-value" + ((Key) entry2.getKey()).toStringNoTruncate() + " " + entry2.getValue(), ((Key) entry2.getKey()).getRow().toString().startsWith("master2"));
                    }
                    log.info("Found {} records in {}", Long.valueOf(j4), "peer2");
                    Assert.assertEquals(j2, j4);
                    if (createAccumuloClient != null) {
                        createAccumuloClient.close();
                    }
                    if (accumuloClient != null) {
                        accumuloClient.close();
                    }
                } catch (Throwable th) {
                    if (createAccumuloClient != null) {
                        try {
                            createAccumuloClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } finally {
            miniAccumuloClusterImpl.stop();
        }
    }

    @Test
    public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
        MiniAccumuloConfigImpl miniAccumuloConfigImpl = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.setInstanceName("peer");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), miniAccumuloConfigImpl);
        MiniAccumuloClusterImpl miniAccumuloClusterImpl = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl);
        miniAccumuloClusterImpl.start();
        try {
            AccumuloClient accumuloClient = (AccumuloClient) Accumulo.newClient().from(getClientProperties()).build();
            try {
                AccumuloClient createAccumuloClient = miniAccumuloClusterImpl.createAccumuloClient("root", new PasswordToken("testRootPassword1"));
                try {
                    createAccumuloClient.securityOperations().createLocalUser("repl", new PasswordToken("passwd"));
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl.getInstanceName(), miniAccumuloClusterImpl.getZooKeepers())));
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "repl");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "passwd");
                    createAccumuloClient.tableOperations().create("peer", new NewTableConfiguration());
                    String str = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer");
                    Assert.assertNotNull(str);
                    createAccumuloClient.securityOperations().grantTablePermission("repl", "peer", TablePermission.WRITE);
                    HashMap hashMap = new HashMap();
                    hashMap.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str);
                    accumuloClient.tableOperations().create("master", new NewTableConfiguration().setProperties(hashMap));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master"));
                    BatchWriter createBatchWriter = accumuloClient.createBatchWriter("master");
                    for (int i = 0; i < 5000; i++) {
                        try {
                            Mutation mutation = new Mutation(Integer.toString(i));
                            for (int i2 = 0; i2 < 100; i2++) {
                                String num = Integer.toString(i2);
                                mutation.put(num, "", num);
                            }
                            createBatchWriter.addMutation(mutation);
                        } catch (Throwable th) {
                            if (createBatchWriter != null) {
                                try {
                                    createBatchWriter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    log.info("Wrote all data to master cluster");
                    Set referencedFiles = accumuloClient.replicationOperations().referencedFiles("master");
                    log.info("Files to replicate:" + referencedFiles);
                    Iterator it = ((Collection) this.cluster.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
                    while (it.hasNext()) {
                        this.cluster.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it.next());
                    }
                    this.cluster.exec(TabletServer.class, new String[0]);
                    while (!ReplicationTable.isOnline(accumuloClient)) {
                        log.info("Replication table still offline, waiting");
                        Thread.sleep(5000L);
                    }
                    Iterators.size(accumuloClient.createScanner("master", Authorizations.EMPTY).iterator());
                    for (Map.Entry entry : ReplicationTable.getScanner(accumuloClient)) {
                        log.debug("{} {}", ((Key) entry.getKey()).toStringNoTruncate(), ProtobufUtil.toString(Replication.Status.parseFrom(((Value) entry.getValue()).get())));
                    }
                    accumuloClient.replicationOperations().drain("master", referencedFiles);
                    Scanner createScanner = accumuloClient.createScanner("master", Authorizations.EMPTY);
                    try {
                        createScanner = createAccumuloClient.createScanner("peer", Authorizations.EMPTY);
                        try {
                            Iterator it2 = createScanner.iterator();
                            Iterator it3 = createScanner.iterator();
                            while (it2.hasNext() && it3.hasNext()) {
                                Map.Entry entry2 = (Map.Entry) it2.next();
                                Map.Entry entry3 = (Map.Entry) it3.next();
                                Assert.assertEquals(entry3.getKey() + " was not equal to " + entry3.getKey(), 0L, ((Key) entry2.getKey()).compareTo((Key) entry3.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
                                Assert.assertEquals(entry2.getValue(), entry3.getValue());
                            }
                            Assert.assertFalse("Had more data to read from the master", it2.hasNext());
                            Assert.assertFalse("Had more data to read from the peer", it3.hasNext());
                            if (createScanner != null) {
                                createScanner.close();
                            }
                            if (createScanner != null) {
                                createScanner.close();
                            }
                            if (createAccumuloClient != null) {
                                createAccumuloClient.close();
                            }
                            if (accumuloClient != null) {
                                accumuloClient.close();
                            }
                        } finally {
                            if (createScanner != null) {
                                try {
                                    createScanner.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            }
                        }
                    } catch (Throwable th4) {
                        throw th4;
                    }
                } catch (Throwable th5) {
                    if (createAccumuloClient != null) {
                        try {
                            createAccumuloClient.close();
                        } catch (Throwable th6) {
                            th5.addSuppressed(th6);
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } finally {
            miniAccumuloClusterImpl.stop();
        }
    }

    @Test
    public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
        MiniAccumuloConfigImpl miniAccumuloConfigImpl = new MiniAccumuloConfigImpl(createTestDir(getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), "testRootPassword1");
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.setInstanceName("peer");
        miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, "peer");
        updatePeerConfigFromPrimary(getCluster().getConfig(), miniAccumuloConfigImpl);
        MiniAccumuloClusterImpl miniAccumuloClusterImpl = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl);
        miniAccumuloClusterImpl.start();
        try {
            AccumuloClient accumuloClient = (AccumuloClient) Accumulo.newClient().from(getClientProperties()).build();
            try {
                AccumuloClient createAccumuloClient = miniAccumuloClusterImpl.createAccumuloClient("root", new PasswordToken("testRootPassword1"));
                try {
                    createAccumuloClient.securityOperations().createLocalUser("repl", new PasswordToken("passwd"));
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + "peer", "repl");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + "peer", "passwd");
                    accumuloClient.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "peer", ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl.getInstanceName(), miniAccumuloClusterImpl.getZooKeepers())));
                    createAccumuloClient.tableOperations().create("peer1", new NewTableConfiguration());
                    String str = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer1");
                    Assert.assertNotNull(str);
                    createAccumuloClient.tableOperations().create("peer2", new NewTableConfiguration());
                    String str2 = (String) createAccumuloClient.tableOperations().tableIdMap().get("peer2");
                    Assert.assertNotNull(str2);
                    HashMap hashMap = new HashMap();
                    hashMap.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str2);
                    accumuloClient.tableOperations().create("master1", new NewTableConfiguration().setProperties(hashMap));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master1"));
                    HashMap hashMap2 = new HashMap();
                    hashMap2.put(Property.TABLE_REPLICATION.getKey(), "true");
                    hashMap2.put(Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str2);
                    accumuloClient.tableOperations().create("master2", new NewTableConfiguration().setProperties(hashMap2));
                    Assert.assertNotNull((String) accumuloClient.tableOperations().tableIdMap().get("master2"));
                    createAccumuloClient.securityOperations().grantTablePermission("repl", "peer1", TablePermission.WRITE);
                    createAccumuloClient.securityOperations().grantTablePermission("repl", "peer2", TablePermission.WRITE);
                    accumuloClient.tableOperations().setProperty("master1", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str);
                    accumuloClient.tableOperations().setProperty("master2", Property.TABLE_REPLICATION_TARGET.getKey() + "peer", str2);
                    BatchWriter createBatchWriter = accumuloClient.createBatchWriter("master1");
                    for (int i = 0; i < 2500; i++) {
                        try {
                            Mutation mutation = new Mutation("master1" + i);
                            for (int i2 = 0; i2 < 100; i2++) {
                                String num = Integer.toString(i2);
                                mutation.put(num, "", num);
                            }
                            createBatchWriter.addMutation(mutation);
                        } finally {
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    createBatchWriter = accumuloClient.createBatchWriter("master2");
                    for (int i3 = 0; i3 < 2500; i3++) {
                        try {
                            Mutation mutation2 = new Mutation("master2" + i3);
                            for (int i4 = 0; i4 < 100; i4++) {
                                String num2 = Integer.toString(i4);
                                mutation2.put(num2, "", num2);
                            }
                            createBatchWriter.addMutation(mutation2);
                        } finally {
                        }
                    }
                    if (createBatchWriter != null) {
                        createBatchWriter.close();
                    }
                    log.info("Wrote all data to master cluster");
                    Iterator it = ((Collection) this.cluster.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
                    while (it.hasNext()) {
                        this.cluster.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it.next());
                    }
                    this.cluster.exec(TabletServer.class, new String[0]);
                    while (!ReplicationTable.isOnline(accumuloClient)) {
                        log.info("Replication table still offline, waiting");
                        Thread.sleep(5000L);
                    }
                    boolean z = false;
                    for (int i5 = 0; i5 < 10 && !z; i5++) {
                        UtilWaitThread.sleepUninterruptibly(2L, TimeUnit.SECONDS);
                        Scanner scanner = ReplicationTable.getScanner(accumuloClient);
                        try {
                            ReplicationSchema.WorkSection.limit(scanner);
                            Iterator it2 = scanner.iterator();
                            while (it2.hasNext()) {
                                if (StatusUtil.isFullyReplicated(Replication.Status.parseFrom(((Value) ((Map.Entry) it2.next()).getValue()).get()))) {
                                    z |= true;
                                }
                            }
                            if (scanner != null) {
                                scanner.close();
                            }
                        } catch (Throwable th) {
                            if (scanner != null) {
                                try {
                                    scanner.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            }
                            throw th;
                        }
                    }
                    Assert.assertNotEquals(0, Boolean.valueOf(z));
                    long j = 0;
                    for (int i6 = 0; i6 < 10; i6++) {
                        for (Map.Entry entry : createAccumuloClient.createScanner("peer1", Authorizations.EMPTY)) {
                            j++;
                            Assert.assertTrue("Found unexpected key-value" + ((Key) entry.getKey()).toStringNoTruncate() + " " + entry.getValue(), ((Key) entry.getKey()).getRow().toString().startsWith("master1"));
                        }
                        log.info("Found {} records in {}", Long.valueOf(j), "peer1");
                        if (j != 0) {
                            break;
                        }
                        Thread.sleep(5000L);
                    }
                    Assert.assertTrue("Found no records in peer1 in the peer cluster", j > 0);
                    for (int i7 = 0; i7 < 10; i7++) {
                        j = 0;
                        for (Map.Entry entry2 : createAccumuloClient.createScanner("peer2", Authorizations.EMPTY)) {
                            j++;
                            Assert.assertTrue("Found unexpected key-value" + ((Key) entry2.getKey()).toStringNoTruncate() + " " + entry2.getValue(), ((Key) entry2.getKey()).getRow().toString().startsWith("master2"));
                        }
                        log.info("Found {} records in {}", Long.valueOf(j), "peer2");
                        if (j != 0) {
                            break;
                        }
                        Thread.sleep(5000L);
                    }
                    Assert.assertTrue("Found no records in peer2 in the peer cluster", j > 0);
                    if (createAccumuloClient != null) {
                        createAccumuloClient.close();
                    }
                    if (accumuloClient != null) {
                        accumuloClient.close();
                    }
                } catch (Throwable th3) {
                    if (createAccumuloClient != null) {
                        try {
                            createAccumuloClient.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } finally {
            miniAccumuloClusterImpl.stop();
        }
    }
}
