package org.apache.phoenix.trace;

import com.google.common.collect.ImmutableMap;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.phoenix.end2end.BaseTenantSpecificViewIndexIT;
import org.apache.phoenix.end2end.HBaseManagedTimeTest;
import org.apache.phoenix.metrics.Metrics;
import org.apache.phoenix.trace.TraceReader;
import org.cloudera.htrace.Sampler;
import org.cloudera.htrace.Span;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({HBaseManagedTimeTest.class})
/* loaded from: input_file:org/apache/phoenix/trace/PhoenixTracingEndToEndIT.class */
public class PhoenixTracingEndToEndIT extends BaseTracingTestIT {
    private static final Log LOG = LogFactory.getLog(PhoenixTracingEndToEndIT.class);
    private static final int MAX_RETRIES = 10;
    private final String table = "ENABLED_FOR_LOGGING";
    private final String index = "ENABALED_FOR_LOGGING_INDEX";
    private static DisableableMetricsWriter sink;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/phoenix/trace/PhoenixTracingEndToEndIT$CountDownConnection.class */
    public static class CountDownConnection extends DelegatingConnection {
        private CountDownLatch commit;

        public CountDownConnection(Connection connection, CountDownLatch countDownLatch) {
            super(connection);
            this.commit = countDownLatch;
        }

        @Override // org.apache.phoenix.trace.DelegatingConnection, java.sql.Connection
        public void commit() throws SQLException {
            this.commit.countDown();
            super.commit();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/phoenix/trace/PhoenixTracingEndToEndIT$TraceChecker.class */
    public abstract class TraceChecker {
        private TraceChecker() {
        }

        public boolean foundTrace(TraceReader.TraceHolder traceHolder) {
            return false;
        }

        public boolean foundTrace(TraceReader.TraceHolder traceHolder, TraceReader.SpanInfo spanInfo) {
            return false;
        }
    }

    @BeforeClass
    public static void setupMetrics() throws Exception {
        PhoenixMetricsSink phoenixMetricsSink = new PhoenixMetricsSink();
        phoenixMetricsSink.initForTesting(getConnectionWithoutTracing());
        sink = new DisableableMetricsWriter(phoenixMetricsSink);
        TracingTestUtil.registerSink(sink);
    }

    @After
    public void cleanup() {
        sink.disable();
        sink.clear();
        sink.enable();
    }

    private static void waitForCommit(CountDownLatch countDownLatch) throws SQLException {
        replaceWriterConnection(new CountDownConnection(getConnectionWithoutTracing(), countDownLatch));
    }

    private static void replaceWriterConnection(Connection connection) throws SQLException {
        sink.disable();
        sink.getDelegate().initForTesting(connection);
        sink.enable();
    }

    @Test
    public void testWriteSpans() throws Exception {
        MetricsSource traceMetricSource = new TraceMetricSource();
        Metrics.initialize().register("testWriteSpans-source", "source for testWriteSpans", traceMetricSource);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        waitForCommit(countDownLatch);
        TraceScope startSpan = Trace.startSpan("Start write test", Sampler.ALWAYS);
        Span span = startSpan.getSpan();
        Span child = span.child("child 1");
        child.addTimelineAnnotation("timeline annotation");
        TracingUtils.addAnnotation(child, "test annotation", MAX_RETRIES);
        child.stop();
        Thread.sleep(100L);
        startSpan.close();
        traceMetricSource.receiveSpan(span);
        countDownLatch.await();
        checkStoredTraces(getConnectionWithoutTracing(), new TraceChecker() { // from class: org.apache.phoenix.trace.PhoenixTracingEndToEndIT.1
            @Override // org.apache.phoenix.trace.PhoenixTracingEndToEndIT.TraceChecker
            public boolean foundTrace(TraceReader.TraceHolder traceHolder, TraceReader.SpanInfo spanInfo) {
                if (!spanInfo.description.equals("child 1")) {
                    return false;
                }
                Assert.assertEquals("Not all annotations present", 1L, spanInfo.annotationCount);
                Assert.assertEquals("Not all tags present", 1L, spanInfo.tagCount);
                boolean z = false;
                Iterator it = spanInfo.annotations.iterator();
                while (it.hasNext()) {
                    if (((String) it.next()).startsWith("test annotation")) {
                        z = true;
                    }
                }
                Assert.assertTrue("Missing the annotations in span: " + spanInfo, z);
                boolean z2 = false;
                Iterator it2 = spanInfo.tags.iterator();
                while (it2.hasNext()) {
                    if (((String) it2.next()).endsWith("timeline annotation")) {
                        z2 = true;
                    }
                }
                Assert.assertTrue("Missing the tags in span: " + spanInfo, z2);
                return true;
            }
        });
    }

    @Test
    public void testClientServerIndexingTracing() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        waitForCommit(countDownLatch);
        Connection connectionWithoutTracing = getConnectionWithoutTracing();
        createTestTable(connectionWithoutTracing, true);
        Connection tracingConnection = getTracingConnection();
        LOG.debug("Doing dummy the writes to the tracked table");
        PreparedStatement prepareStatement = tracingConnection.prepareStatement("UPSERT INTO ENABLED_FOR_LOGGING VALUES (?, ?)");
        prepareStatement.setString(1, "key1");
        prepareStatement.setLong(2, 1L);
        prepareStatement.execute();
        prepareStatement.setString(1, "key2");
        prepareStatement.setLong(2, 2L);
        prepareStatement.execute();
        tracingConnection.commit();
        LOG.debug("Waiting for latch to complete!");
        countDownLatch.await(200L, TimeUnit.SECONDS);
        Assert.assertTrue("Never found indexing updates", checkStoredTraces(connectionWithoutTracing, new TraceChecker() { // from class: org.apache.phoenix.trace.PhoenixTracingEndToEndIT.2
            @Override // org.apache.phoenix.trace.PhoenixTracingEndToEndIT.TraceChecker
            public boolean foundTrace(TraceReader.TraceHolder traceHolder, TraceReader.SpanInfo spanInfo) {
                String traceHolder2 = traceHolder.toString();
                if (traceHolder2.contains("SYSTEM.TRACING_STATS")) {
                    return false;
                }
                return traceHolder2.contains("Completing index");
            }
        }));
    }

    private void createTestTable(Connection connection, boolean z) throws SQLException {
        connection.createStatement().execute("create table if not exists ENABLED_FOR_LOGGING(k varchar not null, c1 bigint CONSTRAINT pk PRIMARY KEY (k))");
        if (z) {
            connection.createStatement().execute("CREATE INDEX IF NOT EXISTS ENABALED_FOR_LOGGING_INDEX on ENABLED_FOR_LOGGING (c1)");
            connection.commit();
        }
    }

    @Test
    public void testScanTracing() throws Exception {
        Connection tracingConnection = getTracingConnection();
        Connection connectionWithoutTracing = getConnectionWithoutTracing();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        waitForCommit(countDownLatch);
        createTestTable(connectionWithoutTracing, false);
        LOG.debug("Doing dummy the writes to the tracked table");
        PreparedStatement prepareStatement = connectionWithoutTracing.prepareStatement("UPSERT INTO ENABLED_FOR_LOGGING VALUES (?, ?)");
        prepareStatement.setString(1, "key1");
        prepareStatement.setLong(2, 1L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        prepareStatement.setString(1, "key2");
        prepareStatement.setLong(2, 2L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        ResultSet executeQuery = tracingConnection.createStatement().executeQuery("SELECT * FROM ENABLED_FOR_LOGGING");
        Assert.assertTrue("Didn't get first result", executeQuery.next());
        Assert.assertTrue("Didn't get second result", executeQuery.next());
        executeQuery.close();
        Assert.assertTrue("Get expected updates to trace table", countDownLatch.await(200L, TimeUnit.SECONDS));
        Assert.assertTrue("Didn't find the parallel scanner in the tracing", checkStoredTraces(connectionWithoutTracing, new TraceChecker() { // from class: org.apache.phoenix.trace.PhoenixTracingEndToEndIT.3
            @Override // org.apache.phoenix.trace.PhoenixTracingEndToEndIT.TraceChecker
            public boolean foundTrace(TraceReader.TraceHolder traceHolder) {
                return traceHolder.toString().contains("Parallel scanner");
            }
        }));
    }

    @Test
    public void testScanTracingOnServer() throws Exception {
        Connection tracingConnection = getTracingConnection();
        Connection connectionWithoutTracing = getConnectionWithoutTracing();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        waitForCommit(countDownLatch);
        createTestTable(connectionWithoutTracing, false);
        LOG.debug("Doing dummy the writes to the tracked table");
        PreparedStatement prepareStatement = connectionWithoutTracing.prepareStatement("UPSERT INTO ENABLED_FOR_LOGGING VALUES (?, ?)");
        prepareStatement.setString(1, "key1");
        prepareStatement.setLong(2, 1L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        prepareStatement.setString(1, "key2");
        prepareStatement.setLong(2, 2L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        ResultSet executeQuery = tracingConnection.createStatement().executeQuery("SELECT COUNT(*) FROM ENABLED_FOR_LOGGING");
        Assert.assertTrue("Didn't get count result", executeQuery.next());
        Assert.assertEquals("Didn't get the expected number of row", 2L, executeQuery.getInt(1));
        executeQuery.close();
        Assert.assertTrue("Get expected updates to trace table", countDownLatch.await(200L, TimeUnit.SECONDS));
        Assert.assertTrue("Didn't find the parallel scanner in the tracing", checkStoredTraces(connectionWithoutTracing, new TraceChecker() { // from class: org.apache.phoenix.trace.PhoenixTracingEndToEndIT.4
            @Override // org.apache.phoenix.trace.PhoenixTracingEndToEndIT.TraceChecker
            public boolean foundTrace(TraceReader.TraceHolder traceHolder) {
                return traceHolder.toString().contains("Scanner opened on server");
            }
        }));
    }

    @Test
    public void testCustomAnnotationTracing() throws Exception {
        Connection tracingConnection = getTracingConnection(ImmutableMap.of("myannot", "a1"), BaseTenantSpecificViewIndexIT.TENANT1_ID);
        Connection connectionWithoutTracing = getConnectionWithoutTracing();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        waitForCommit(countDownLatch);
        createTestTable(connectionWithoutTracing, false);
        LOG.debug("Doing dummy the writes to the tracked table");
        PreparedStatement prepareStatement = connectionWithoutTracing.prepareStatement("UPSERT INTO ENABLED_FOR_LOGGING VALUES (?, ?)");
        prepareStatement.setString(1, "key1");
        prepareStatement.setLong(2, 1L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        prepareStatement.setString(1, "key2");
        prepareStatement.setLong(2, 2L);
        prepareStatement.execute();
        connectionWithoutTracing.commit();
        connectionWithoutTracing.rollback();
        ResultSet executeQuery = tracingConnection.createStatement().executeQuery("SELECT * FROM ENABLED_FOR_LOGGING");
        Assert.assertTrue("Didn't get first result", executeQuery.next());
        Assert.assertTrue("Didn't get second result", executeQuery.next());
        executeQuery.close();
        Assert.assertTrue("Get expected updates to trace table", countDownLatch.await(200L, TimeUnit.SECONDS));
        assertAnnotationPresent("myannot", "a1", connectionWithoutTracing);
        assertAnnotationPresent("TenantId", BaseTenantSpecificViewIndexIT.TENANT1_ID, connectionWithoutTracing);
    }

    private void assertAnnotationPresent(final String str, final String str2, Connection connection) throws Exception {
        Assert.assertTrue("Didn't find the custom annotation in the tracing", checkStoredTraces(connection, new TraceChecker() { // from class: org.apache.phoenix.trace.PhoenixTracingEndToEndIT.5
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // org.apache.phoenix.trace.PhoenixTracingEndToEndIT.TraceChecker
            public boolean foundTrace(TraceReader.TraceHolder traceHolder) {
                return traceHolder.toString().contains(str + " - " + str2);
            }
        }));
    }

    private boolean checkStoredTraces(Connection connection, TraceChecker traceChecker) throws Exception {
        TraceReader traceReader = new TraceReader(connection);
        boolean z = false;
        loop0: for (int i = 0; i < MAX_RETRIES; i++) {
            for (TraceReader.TraceHolder traceHolder : traceReader.readAll(100)) {
                LOG.info("Got trace: " + traceHolder);
                z = traceChecker.foundTrace(traceHolder);
                if (z) {
                    break loop0;
                }
                Iterator it = traceHolder.spans.iterator();
                while (it.hasNext()) {
                    z = traceChecker.foundTrace(traceHolder, (TraceReader.SpanInfo) it.next());
                    if (z) {
                        break loop0;
                    }
                }
            }
            LOG.info("======  Waiting for tracing updates to be propagated ========");
            Thread.sleep(1000L);
        }
        return z;
    }
}
