package org.apache.phoenix.flume;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.sink.DefaultSinkFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
import org.apache.phoenix.flume.serializer.CustomSerializer;
import org.apache.phoenix.flume.serializer.EventSerializers;
import org.apache.phoenix.flume.sink.NullPhoenixSink;
import org.apache.phoenix.flume.sink.PhoenixSink;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/phoenix/flume/PhoenixSinkIT.class */
public class PhoenixSinkIT extends BaseHBaseManagedTimeIT {
    private Context sinkContext;
    private PhoenixSink sink;

    @Test
    public void testSinkCreation() {
        Sink create = new DefaultSinkFactory().create("PhoenixSink__", "org.apache.phoenix.flume.sink.PhoenixSink");
        Assert.assertNotNull(create);
        Assert.assertTrue(PhoenixSink.class.isInstance(create));
    }

    @Test
    public void testConfiguration() {
        this.sinkContext = new Context();
        this.sinkContext.put("table", "test");
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.REGEX.name());
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
    }

    @Test(expected = NullPointerException.class)
    public void testInvalidConfiguration() {
        this.sinkContext = new Context();
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.REGEX.name());
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
    }

    @Test(expected = RuntimeException.class)
    public void testInvalidConfigurationOfSerializer() {
        this.sinkContext = new Context();
        this.sinkContext.put("table", "test");
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", "unknown");
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
    }

    @Test
    public void testInvalidTable() {
        this.sinkContext = new Context();
        this.sinkContext.put("table", "flume_test");
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.REGEX.name());
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        this.sink.setChannel(initChannel());
        try {
            this.sink.start();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage(), e.getMessage().contains("ERROR 1012 (42M03): Table undefined."));
        }
    }

    @Test
    public void testSinkLifecycle() {
        String generateUniqueName = generateUniqueName();
        this.sinkContext = new Context();
        this.sinkContext.put("table", generateUniqueName);
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.REGEX.name());
        this.sinkContext.put("ddl", "CREATE TABLE " + generateUniqueName + "  (flume_time timestamp not null, col1 varchar , col2 varchar  CONSTRAINT pk PRIMARY KEY (flume_time))\n");
        this.sinkContext.put("serializer.regex", "^([^\t]+)\t([^\t]+)$");
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        this.sink.setChannel(initChannel());
        this.sink.start();
        Assert.assertEquals(LifecycleState.START, this.sink.getLifecycleState());
        this.sink.stop();
        Assert.assertEquals(LifecycleState.STOP, this.sink.getLifecycleState());
    }

    @Test
    public void testCreateTable() throws Exception {
        String generateUniqueName = generateUniqueName();
        this.sinkContext = new Context();
        this.sinkContext.put("table", generateUniqueName);
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", EventSerializers.REGEX.name());
        this.sinkContext.put("ddl", "CREATE TABLE " + generateUniqueName + "   (flume_time timestamp not null, col1 varchar , col2 varchar  CONSTRAINT pk PRIMARY KEY (flume_time))\n");
        this.sinkContext.put("serializer.regex", "^([^\t]+)\t([^\t]+)$");
        this.sinkContext.put("serializer.columns", "col1,col2");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        this.sink = new PhoenixSink();
        Configurables.configure(this.sink, this.sinkContext);
        Assert.assertEquals(LifecycleState.IDLE, this.sink.getLifecycleState());
        this.sink.setChannel(initChannel());
        this.sink.start();
        HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
        try {
            Assert.assertTrue(admin.tableExists(generateUniqueName));
            admin.close();
        } catch (Throwable th) {
            admin.close();
            throw th;
        }
    }

    @Test
    public void testExtendedSink() throws Exception {
        PhoenixSink phoenixSink = (PhoenixSink) Mockito.mock(NullPhoenixSink.class);
        this.sinkContext = new Context();
        this.sinkContext.put("table", "FLUME_TEST_EXTENDED");
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", CustomSerializer.class.getName());
        this.sinkContext.put("serializer.columns", "ID, COUNTS");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        Configurables.configure(phoenixSink, this.sinkContext);
        ((PhoenixSink) Mockito.verify(phoenixSink)).configure(this.sinkContext);
    }

    @Test
    public void testExtendedSerializer() throws Exception {
        Connection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES));
        connection.createStatement().execute("CREATE TABLE FLUME_TEST_EXTENDED (ID BIGINT NOT NULL PRIMARY KEY, COUNTS UNSIGNED_LONG)");
        connection.commit();
        this.sinkContext = new Context();
        this.sinkContext.put("table", "FLUME_TEST_EXTENDED");
        this.sinkContext.put("jdbcUrl", getUrl());
        this.sinkContext.put("serializer", CustomSerializer.class.getName());
        this.sinkContext.put("serializer.columns", "ID, COUNTS");
        this.sinkContext.put("serializer.rowkeyType", DefaultKeyGenerator.TIMESTAMP.name());
        PhoenixSink phoenixSink = new PhoenixSink();
        Configurables.configure(phoenixSink, this.sinkContext);
        Channel initChannel = initChannel();
        phoenixSink.setChannel(initChannel);
        phoenixSink.start();
        Transaction transaction = initChannel.getTransaction();
        transaction.begin();
        initChannel.put(EventBuilder.withBody(Bytes.toBytes("test event")));
        transaction.commit();
        transaction.close();
        phoenixSink.process();
        phoenixSink.stop();
        ResultSet executeQuery = connection.createStatement().executeQuery("SELECT * FROM FLUME_TEST_EXTENDED");
        Assert.assertTrue(executeQuery.next());
        Assert.assertTrue(executeQuery.getLong(1) == 1);
    }

    private Channel initChannel() {
        Context context = new Context();
        context.put("capacity", "10000");
        context.put("transactionCapacity", "200");
        MemoryChannel memoryChannel = new MemoryChannel();
        memoryChannel.setName("memorychannel");
        Configurables.configure(memoryChannel, context);
        return memoryChannel;
    }
}
