package com.ngdata.sep.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.ngdata.sep.EventListener;
import com.ngdata.sep.PayloadExtractor;
import com.ngdata.sep.SepEvent;
import com.ngdata.sep.util.concurrent.WaitPolicy;
import com.ngdata.sep.util.zookeeper.ZooKeeperItf;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:com/ngdata/sep/impl/SepConsumer.class */
/**
 * Receives replicated HBase mutations for one subscription and hands them to an {@link EventListener}.
 *
 * <p>Each instance creates its own "masterless" {@link HRegionServer} whose client connection
 * implementation is {@link SepConnection}. {@link #start()} runs that region server on a daemon thread
 * and publishes an ephemeral ZooKeeper node for it; {@link #stop()} undoes both. Incoming batches arrive
 * through {@link #replicateBatch(List, Object[], TableName)}, which groups cells per row key, turns every
 * group into a {@link SepEvent} and waits until the listener has processed all of them.
 */
public class SepConsumer {
    private static final AtomicBoolean AT_MOST_ONCE = new AtomicBoolean(true);
    private static final String MASTERLESS_ROOT_ZK_PATH = "/ngdata/sep/hbase-masterless";
    private static final String MASTERLESS_ZK_DIR_SUFFIX = "hbaseindexer.masterless.zkDirSuffix";

    /** Capacity of the bounded work queue backing each single-threaded executor. */
    private static final int EXECUTOR_QUEUE_CAPACITY = 100;

    /**
     * Third constructor argument given to every {@link SepEventExecutor}.
     * NOTE(review): presumably the event batch size; confirm against SepEventExecutor.
     */
    private static final int EVENT_BATCH_SIZE = 100;

    private final String subscriptionId;
    private final long subscriptionTimestamp;
    private final EventListener listener;
    private final ZooKeeperItf zk;
    private final Configuration hbaseConf;
    private final HRegionServer regionServer;
    private final ServerName serverName;
    private final SepMetrics sepMetrics;
    private final PayloadExtractor payloadExtractor;
    private String zkNodePath;
    private final List<ThreadPoolExecutor> executors;
    private final Predicate<TableName> tableNamePredicate;
    private boolean running;
    private final Log log;

    /**
     * Creates a consumer without a payload extractor and with a table-name predicate that matches
     * every table.
     *
     * @param subscriptionName      external name of the subscription
     * @param subscriptionTimestamp timestamp forwarded to the {@link SepConnectionParams} on start
     * @param listener              listener that receives the events
     * @param threadCnt             number of single-threaded executors to create; must be &gt; 0
     * @param hostName              host name used for the ZooKeeper client and region server logins
     * @param zk                    ZooKeeper handle used to publish this consumer's node
     * @param hbaseConf             HBase configuration of the calling application
     * @throws IOException          if the region server cannot be created or a login fails
     * @throws InterruptedException if interrupted while the region server is being created
     */
    public SepConsumer(String subscriptionName, long subscriptionTimestamp, EventListener listener, int threadCnt,
            String hostName, ZooKeeperItf zk, Configuration hbaseConf) throws IOException, InterruptedException {
        this(subscriptionName, subscriptionTimestamp, listener, threadCnt, hostName, zk, hbaseConf, null, null);
    }

    /**
     * Creates a consumer and its masterless region server. Nothing is started until {@link #start()}.
     *
     * @param subscriptionName      external name of the subscription
     * @param subscriptionTimestamp timestamp forwarded to the {@link SepConnectionParams} on start
     * @param listener              listener that receives the events
     * @param threadCnt             number of single-threaded executors to create; must be &gt; 0
     * @param hostName              host name used for the ZooKeeper client and region server logins
     * @param zk                    ZooKeeper handle used to publish this consumer's node
     * @param hbaseConf             HBase configuration of the calling application
     * @param payloadExtractor      extracts a payload from cells, or {@code null} for no payloads
     * @param tableNamePredicate    filter on table names, or {@code null} to match every table
     * @throws IllegalArgumentException if {@code threadCnt} is not positive
     * @throws IOException              if the region server cannot be created or a login fails
     * @throws InterruptedException     if interrupted while the region server is being created
     */
    public SepConsumer(String subscriptionName, long subscriptionTimestamp, EventListener listener, int threadCnt,
            String hostName, ZooKeeperItf zk, Configuration hbaseConf, PayloadExtractor payloadExtractor,
            Predicate<TableName> tableNamePredicate) throws IOException, InterruptedException {
        this.running = false;
        this.log = LogFactory.getLog(getClass());
        Preconditions.checkArgument(threadCnt > 0, "Thread count must be > 0");
        this.subscriptionId = SepModelImpl.toInternalSubscriptionName(subscriptionName);
        this.subscriptionTimestamp = subscriptionTimestamp;
        this.listener = listener;
        this.zk = zk;
        this.hbaseConf = hbaseConf;
        this.sepMetrics = new SepMetrics(subscriptionName);
        this.payloadExtractor = payloadExtractor;
        this.executors = Lists.newArrayListWithCapacity(threadCnt);
        this.tableNamePredicate = tableNamePredicate == null
                ? TableNamePredicates.getAlwaysMatchingTableNamePredicate()
                : tableNamePredicate;

        // The region server gets a configuration of its own, built from the classpath resources below
        // rather than from the caller's hbaseConf.
        Configuration masterlessConf = HBaseConfiguration.create();
        masterlessConf.addResource("hbase-indexer-site-masterless-defaults.xml");
        masterlessConf.addResource("hbase-indexer-site.xml");
        masterlessConf.set("hbase.client.connection.impl", SepConnection.class.getName());
        masterlessConf.set("hbase.replication.sink.walentrysinkfilter", SepWALEntrySinkFilter.class.getName());

        String zkDirSuffix = masterlessConf.get(MASTERLESS_ZK_DIR_SUFFIX);
        String znodeParent = zkDirSuffix == null
                ? MASTERLESS_ROOT_ZK_PATH
                : MASTERLESS_ROOT_ZK_PATH + zkDirSuffix.trim();
        masterlessConf.set("zookeeper.znode.parent", znodeParent);
        this.log.info("Creating masterless RegionServer using zookeeper.znode.parent:" + znodeParent);

        masterlessConf.setInt("hbase.regionserver.port", 0);

        // Only the ZooKeeper client port is taken over from the caller's configuration.
        String zkClientPort = hbaseConf.get("hbase.zookeeper.property.clientPort");
        if (zkClientPort != null) {
            masterlessConf.set("hbase.zookeeper.property.clientPort", zkClientPort);
        }
        masterlessConf.set(SepConnection.SUBSCRIPTION_ID_PARAM_NAME, subscriptionName);
        DefaultMetricsSystem.setMiniClusterMode(true);

        // The (large) configuration dump is logged by the first consumer created in this JVM only.
        if (AT_MOST_ONCE.getAndSet(false)) {
            dumpConfiguration("Creating masterless RegionServer using", masterlessConf);
        }

        this.regionServer = new HRegionServer(masterlessConf);
        this.serverName = this.regionServer.getServerName();

        ZKUtil.loginClient(hbaseConf, "hbase.zookeeper.client.keytab.file",
                "hbase.zookeeper.client.kerberos.principal", hostName);
        User.login(hbaseConf, "hbase.regionserver.keytab.file",
                "hbase.regionserver.kerberos.principal", hostName);

        for (int n = 0; n < threadCnt; n++) {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 10L, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(EXECUTOR_QUEUE_CAPACITY));
            executor.setRejectedExecutionHandler(new WaitPolicy());
            this.executors.add(executor);
        }
    }

    /**
     * Registers this consumer with {@link SepConnection}, runs the region server on a daemon thread,
     * waits for it to come online and publishes an ephemeral ZooKeeper node for it.
     *
     * <p>If any step fails, the region server is asked to stop and the {@link SepConnection}
     * registration is removed again before the exception propagates, because {@link #stop()} skips
     * that cleanup for a consumer that never reached the running state.
     *
     * @throws InterruptedException if interrupted while creating the ZooKeeper node
     * @throws KeeperException      if the ZooKeeper node cannot be created
     */
    public void start() throws InterruptedException, KeeperException {
        SepConnection.PARAMS_MAP.put(this.subscriptionId,
                new SepConnectionParams(this.tableNamePredicate, this.subscriptionTimestamp, this));
        boolean started = false;
        try {
            Thread serverThread = new Thread() {
                @Override
                public void run() {
                    SepConsumer.this.regionServer.run();
                }
            };
            serverThread.setDaemon(true);
            serverThread.start();
            this.regionServer.waitForServerOnline();

            String znodeParent = this.hbaseConf.get("hbasesep.zookeeper.znode.parent", "/ngdata/sep/hbase-slave");
            this.zkNodePath = znodeParent + "/" + this.subscriptionId + "/rs/" + this.serverName.getServerName();
            this.log.debug("Publishing our existence in zk at zkNodePathForSlave:" + this.zkNodePath);
            this.zk.create(this.zkNodePath, null, CreateMode.EPHEMERAL);

            this.running = true;
            started = true;
        } finally {
            if (!started) {
                this.regionServer.stop("Failed to start masterless regionserver for subscriptionId:"
                        + this.subscriptionId);
                SepConnection.PARAMS_MAP.remove(this.subscriptionId);
            }
        }
    }

    /**
     * Stops the region server, removes the {@link SepConnection} registration and deletes the
     * ZooKeeper node if this consumer is running; always shuts down the metrics and the executors.
     * A failure to delete the ZooKeeper node is logged at debug level and otherwise ignored.
     */
    public void stop() {
        if (this.running) {
            this.running = false;
            this.regionServer.stop("Stopping masterless regionserver for subscriptionId:" + this.subscriptionId);
            SepConnection.PARAMS_MAP.remove(this.subscriptionId);
            try {
                this.zk.delete(this.zkNodePath, -1);
            } catch (InterruptedException e) {
                this.log.debug("Exception while removing zookeeper node", e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Best effort: cleanup of the remaining resources below must still happen.
                this.log.debug("Exception while removing zookeeper node", e);
            }
        }
        this.sepMetrics.shutdown();
        for (ThreadPoolExecutor executor : this.executors) {
            executor.shutdown();
        }
    }

    /**
     * Converts a batch of mutations into {@link SepEvent}s (one per row key per mutation), schedules
     * them on the executors and blocks until all of them have been processed. The highest cell
     * timestamp seen is reported to the metrics when it is positive.
     *
     * @param rows      the rows to process; every element must be a {@link Mutation}
     * @param results   not used by this method (presumably the result array of the originating
     *                  batch call; confirm against SepConnection)
     * @param tableName table the rows belong to
     * @throws IOException      if scanning cells fails or the thread is interrupted while waiting
     * @throws RuntimeException if an element is not a {@link Mutation}, or wrapping the first
     *                          failure when processing of any event batch failed
     */
    public void replicateBatch(List<? extends Row> rows, Object[] results, TableName tableName) throws IOException {
        if (this.log.isDebugEnabled()) {
            this.log.debug("replicateBatch() received " + rows.size() + " events from table " + tableName
                    + " for subscriptionId " + this.subscriptionId);
        }

        long maxTimestamp = -1;
        SepEventExecutor eventExecutor =
                new SepEventExecutor(this.listener, this.executors, EVENT_BATCH_SIZE, this.sepMetrics);

        for (Row row : rows) {
            if (!(row instanceof Mutation)) {
                throw new RuntimeException("Unreachable code for row class: " + row.getClass().getName());
            }
            Mutation mutation = (Mutation) row;

            // Cells and the chosen payload, keyed by row key.
            ArrayListMultimap<ByteBuffer, Cell> cellsPerRow = ArrayListMultimap.create();
            HashMap<ByteBuffer, byte[]> payloadPerRow = Maps.newHashMap();

            CellScanner cellScanner = mutation.cellScanner();
            while (cellScanner.advance()) {
                Cell cell = cellScanner.current();
                maxTimestamp = Math.max(maxTimestamp, cell.getTimestamp());
                ByteBuffer rowKey = ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                KeyValue keyValue = KeyValueUtil.ensureKeyValue(cell);

                if (this.payloadExtractor != null) {
                    byte[] payload = this.payloadExtractor.extractPayload(tableName.toBytes(), keyValue);
                    if (payload != null) {
                        if (payloadPerRow.containsKey(rowKey)) {
                            // The first payload found for a row wins; later ones are only reported.
                            this.log.error("Multiple payloads encountered for row " + Bytes.toStringBinary(rowKey)
                                    + ", choosing " + Bytes.toStringBinary(payloadPerRow.get(rowKey)));
                        } else {
                            payloadPerRow.put(rowKey, payload);
                        }
                    }
                }
                cellsPerRow.put(rowKey, keyValue);
            }

            for (ByteBuffer rowKey : cellsPerRow.keySet()) {
                List<Cell> cells = cellsPerRow.get(rowKey);
                eventExecutor.scheduleSepEvent(new SepEvent(tableName.toBytes(), CellUtil.cloneRow(cells.get(0)),
                        cells, payloadPerRow.get(rowKey)));
            }
        }

        waitOnSepEventCompletion(eventExecutor.flush());
        if (maxTimestamp > 0) {
            this.sepMetrics.reportSepTimestamp(maxTimestamp);
        }
    }

    /**
     * Waits for every future, also after one of them has failed, and then rethrows the first failure.
     *
     * @param futures the pending event batches
     * @throws IOException      if the thread is interrupted while waiting; the interrupt flag is restored
     * @throws RuntimeException wrapping the first failure if at least one batch failed
     */
    private void waitOnSepEventCompletion(List<Future<?>> futures) throws IOException {
        List<Exception> failures = new ArrayList<Exception>();
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted in processing events.", e);
            } catch (Exception e) {
                this.log.warn("Error processing a batch of SEP events, the error will be forwarded to HBase for retry", e);
                failures.add(e);
            }
        }

        if (!failures.isEmpty()) {
            this.log.error("Encountered exceptions on " + failures.size() + " batches (out of " + futures.size()
                    + " total batches)");
            throw new RuntimeException(failures.get(0));
        }
    }

    /**
     * Logs the given configuration, including all of its properties, at info level.
     *
     * @param msg  text that precedes the configuration in the log line
     * @param conf the configuration to dump
     * @throws IOException if the configuration cannot be written out
     */
    private void dumpConfiguration(String msg, Configuration conf) throws IOException {
        StringWriter properties = new StringWriter();
        Configuration.dumpConfiguration(conf, properties);
        this.log.info(msg + " " + conf + " with properties: " + properties.toString());
    }
}
