package org.apache.accumulo.test;

import java.time.Duration;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.compaction.CompactionRateLimitingIT;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/accumulo/test/TotalQueuedIT.class */
public class TotalQueuedIT extends ConfigurableMacBase {
    private int SMALL_QUEUE_SIZE = 100000;
    private int LARGE_QUEUE_SIZE = this.SMALL_QUEUE_SIZE * 10;
    private static final long N = 1000000;

    @Override // org.apache.accumulo.harness.AccumuloITBase
    protected Duration defaultTimeout() {
        return Duration.ofMinutes(4L);
    }

    @Override // org.apache.accumulo.test.functional.ConfigurableMacBase
    public void configure(MiniAccumuloConfigImpl miniAccumuloConfigImpl, Configuration configuration) {
        miniAccumuloConfigImpl.setNumTservers(1);
        miniAccumuloConfigImpl.useMiniDFS();
    }

    @Test
    public void test() throws Exception {
        AccumuloClient accumuloClient = (AccumuloClient) Accumulo.newClient().from(getClientProperties()).build();
        try {
            accumuloClient.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), this.SMALL_QUEUE_SIZE);
            String str = getUniqueNames(1)[0];
            accumuloClient.tableOperations().create(str);
            accumuloClient.tableOperations().setProperty(str, Property.TABLE_MAJC_RATIO.getKey(), "9999");
            accumuloClient.tableOperations().setProperty(str, Property.TABLE_FILE_MAX.getKey(), "999");
            UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            byte[] bArr = new byte[250];
            BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
            batchWriterConfig.setMaxWriteThreads(10);
            batchWriterConfig.setMaxLatency(1L, TimeUnit.SECONDS);
            batchWriterConfig.setMaxMemory(CompactionRateLimitingIT.RATE);
            long syncs = getSyncs(accumuloClient);
            long currentTimeMillis = System.currentTimeMillis();
            long j = 0;
            BatchWriter createBatchWriter = accumuloClient.createBatchWriter(str, batchWriterConfig);
            for (int i = 0; i < N; i++) {
                try {
                    random.nextBytes(bArr);
                    Mutation mutation = new Mutation(bArr);
                    mutation.put("", "", "");
                    createBatchWriter.addMutation(mutation);
                    j += mutation.estimatedMemoryUsed();
                } finally {
                }
            }
            if (createBatchWriter != null) {
                createBatchWriter.close();
            }
            double currentTimeMillis2 = (System.currentTimeMillis() - currentTimeMillis) / 1000.0d;
            double d = j / this.SMALL_QUEUE_SIZE;
            System.out.printf("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)%n", Long.valueOf(j), Double.valueOf(currentTimeMillis2), Long.valueOf((long) d), Double.valueOf(d / currentTimeMillis2));
            long syncs2 = getSyncs(accumuloClient);
            System.out.println("Syncs " + (syncs2 - syncs));
            accumuloClient.instanceOperations().setProperty(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX.getKey(), this.LARGE_QUEUE_SIZE);
            accumuloClient.tableOperations().flush(str, (Text) null, (Text) null, true);
            UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            createBatchWriter = accumuloClient.createBatchWriter(str, batchWriterConfig);
            try {
                long currentTimeMillis3 = System.currentTimeMillis();
                long j2 = 0;
                for (int i2 = 0; i2 < N; i2++) {
                    random.nextBytes(bArr);
                    Mutation mutation2 = new Mutation(bArr);
                    mutation2.put("", "", "");
                    createBatchWriter.addMutation(mutation2);
                    j2 += mutation2.estimatedMemoryUsed();
                }
                if (createBatchWriter != null) {
                    createBatchWriter.close();
                }
                double currentTimeMillis4 = (System.currentTimeMillis() - currentTimeMillis3) / 1000.0d;
                double d2 = j2 / this.LARGE_QUEUE_SIZE;
                System.out.printf("Sent %d bytes in %f secs approximately %d syncs (%f syncs per sec)%n", Long.valueOf(j2), Double.valueOf(currentTimeMillis4), Long.valueOf((long) d2), Double.valueOf(d2 / currentTimeMillis4));
                long syncs3 = getSyncs(accumuloClient);
                System.out.println("Syncs " + (syncs3 - syncs2));
                Assertions.assertTrue(syncs3 - syncs2 < syncs2);
                if (accumuloClient != null) {
                    accumuloClient.close();
                }
            } finally {
            }
        } catch (Throwable th) {
            if (accumuloClient != null) {
                try {
                    accumuloClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private long getSyncs(AccumuloClient accumuloClient) throws Exception {
        ServerContext serverContext = getServerContext();
        Iterator it = accumuloClient.instanceOperations().getTabletServers().iterator();
        if (!it.hasNext()) {
            return 0L;
        }
        return ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, HostAndPort.fromString((String) it.next()), serverContext).getTabletServerStatus((TInfo) null, serverContext.rpcCreds()).syncs;
    }
}
