package com.arakelian.elastic.bulk;

import com.arakelian.core.utils.ExecutorUtils;
import com.arakelian.core.utils.MoreStringUtils;
import com.arakelian.elastic.ElasticClient;
import com.arakelian.elastic.bulk.BulkOperation;
import com.arakelian.elastic.bulk.event.IndexerListener;
import com.arakelian.elastic.model.BulkIndexerConfig;
import com.arakelian.elastic.model.BulkIndexerStats;
import com.arakelian.elastic.model.BulkResponse;
import com.arakelian.elastic.model.ImmutableBulkIndexerStats;
import com.arakelian.elastic.refresh.RefreshLimiter;
import com.arakelian.elastic.utils.ElasticClientUtils;
import com.arakelian.retry.RetryException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import repackaged.com.arakelian.elastic.com.google.common.base.MoreObjects;
import repackaged.com.arakelian.elastic.com.google.common.base.Preconditions;
import repackaged.com.arakelian.elastic.com.google.common.base.Stopwatch;
import repackaged.com.arakelian.elastic.com.google.common.collect.ImmutableList;
import repackaged.com.arakelian.elastic.com.google.common.collect.Lists;
import repackaged.com.arakelian.elastic.com.google.common.collect.UnmodifiableIterator;
import repackaged.com.arakelian.elastic.com.google.common.util.concurrent.Futures;
import repackaged.com.arakelian.elastic.com.google.common.util.concurrent.ListenableFuture;
import repackaged.com.arakelian.elastic.com.google.common.util.concurrent.ListeningExecutorService;
import repackaged.com.arakelian.elastic.com.google.common.util.concurrent.MoreExecutors;
import repackaged.com.arakelian.elastic.com.google.common.util.concurrent.Uninterruptibles;
import repackaged.com.arakelian.elastic.org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/arakelian/elastic/bulk/BulkIndexer.class */
public class BulkIndexer implements Closeable {
    private static final int ONE_KB = 1024;
    private static final AtomicInteger BULK_ID;
    private static final Logger LOGGER;
    private final BulkIndexerConfig config;
    private final ElasticClient elasticClient;
    private final RefreshLimiter refreshLimiter;
    private final ScheduledExecutorService flushExecutor;
    private int totalPendingBytes;
    private final Thread shutdownHook;
    private final LinkedBlockingQueue<Runnable> batchWorkQueue;
    private final LinkedBlockingQueue<Runnable> bulkResponseWorkQueue;
    private final ThreadPoolExecutor batchExecutor;
    private final ListeningExecutorService listeningBatchExecutor;
    private final ThreadPoolExecutor bulkResponseExecutor;
    private final ListeningExecutorService listeningBulkResponseExecutor;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Lock batchLock = new ReentrantLock();
    private final List<BulkOperation> pendingOperations = Lists.newArrayList();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicInteger submitted = new AtomicInteger();
    private final AtomicInteger retries = new AtomicInteger();
    private final AtomicLong totalBytes = new AtomicLong();
    private final AtomicInteger successful = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicInteger versionConflicts = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arakelian/elastic/bulk/BulkIndexer$Batch.class */
    public final class Batch implements Callable<BulkResponse> {
        private final int id;
        private final ImmutableList<BulkOperation> operations;
        private final int totalBytes;
        private final int delayMillis;
        private final int attempt;
        private final Reason reason;

        public Batch(ImmutableList<BulkOperation> immutableList, int i, int i2, int i3, Reason reason) {
            Preconditions.checkNotNull(immutableList);
            this.id = BulkIndexer.BULK_ID.incrementAndGet();
            this.operations = immutableList;
            this.totalBytes = i;
            this.delayMillis = i2;
            this.reason = reason;
            this.attempt = Math.min(i3, 1);
            BulkIndexer.this.totalBytes.addAndGet(i);
        }

        private CharSequence buildPayload() {
            StringBuilder sb = new StringBuilder(BulkIndexer.roundAllocation(this.totalBytes));
            UnmodifiableIterator<BulkOperation> it = this.operations.iterator();
            while (it.hasNext()) {
                sb.append(it.next().getOperation());
            }
            return sb;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public BulkResponse call() throws IOException, InterruptedException {
            if (this.delayMillis != 0) {
                BulkIndexer.LOGGER.info("Waiting {} before sending retry of {}", MoreStringUtils.toString(this.delayMillis, TimeUnit.MILLISECONDS), this);
                Thread.sleep(this.delayMillis);
            }
            BulkIndexer.LOGGER.info("Sending {}", this);
            CharSequence buildPayload = buildPayload();
            try {
                try {
                    BulkResponse bulkResponse = (BulkResponse) BulkIndexer.this.config.getRetryer().call(() -> {
                        return BulkIndexer.this.elasticClient.bulk(buildPayload, false);
                    });
                    refreshIndexes();
                    return bulkResponse;
                } catch (ExecutionException e) {
                    throw new IOException("Unable to index " + this, e.getCause());
                } catch (RetryException e2) {
                    throw new IOException("Unable to index " + this, e2);
                }
            } catch (Throwable th) {
                refreshIndexes();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void failed(Throwable th) {
            IndexerListener listener = BulkIndexer.this.config.getListener();
            UnmodifiableIterator<BulkOperation> it = this.operations.iterator();
            while (it.hasNext()) {
                BulkOperation next = it.next();
                BulkIndexer.this.failed.incrementAndGet();
                listener.onFailure(next, th);
            }
        }

        private void refreshIndexes() {
            UnmodifiableIterator<BulkOperation> it = this.operations.iterator();
            while (it.hasNext()) {
                String name = it.next().getIndex().getName();
                try {
                    BulkIndexer.this.refreshLimiter.enqueueRefresh(name);
                } catch (RejectedExecutionException e) {
                    BulkIndexer.LOGGER.warn("Unable to queue refresh of index \"{}\"", name, e);
                }
            }
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).omitNullValues().add("id", this.id).add("operations", this.operations.size()).add("totalBytes", this.totalBytes).add("attempt", this.attempt).add("reason", this.reason).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arakelian/elastic/bulk/BulkIndexer$BatchListener.class */
    public final class BatchListener implements Runnable {
        private final Stopwatch queued;
        private final Batch batch;
        private final ListenableFuture<BulkResponse> future;
        static final /* synthetic */ boolean $assertionsDisabled;

        private BatchListener(ListenableFuture<BulkResponse> listenableFuture, Batch batch, Stopwatch stopwatch) {
            this.future = listenableFuture;
            this.batch = batch;
            this.queued = stopwatch;
        }

        public void onFailure(Throwable th) {
            BulkIndexer.LOGGER.warn("{} failed after {}", new Object[]{this.batch, this.queued, th});
            this.batch.failed(th);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void onSuccess(BulkResponse bulkResponse) throws RejectedExecutionException {
            BulkIndexer.LOGGER.debug("Completed {} after {}", this.batch, this.queued);
            IndexerListener listener = BulkIndexer.this.config.getListener();
            List<BulkResponse.Item> items = bulkResponse.getItems();
            int size = items.size();
            Preconditions.checkState(size == this.batch.operations.size(), "Number of responses (%s) does not match number of batch operations (%s)", size, this.batch.operations.size());
            ArrayList arrayList = null;
            for (int i = 0; i < size; i++) {
                BulkOperation bulkOperation = (BulkOperation) this.batch.operations.get(i);
                BulkResponse.BulkOperationResponse bulkOperationResponse = items.get(i).get();
                int status = bulkOperationResponse.getStatus();
                if (status >= 200 && status < 300) {
                    BulkIndexer.this.successful.incrementAndGet();
                    listener.onSuccess(bulkOperation, status);
                } else if (bulkOperation.getAction() == BulkOperation.Action.DELETE && status == 404) {
                    BulkIndexer.this.successful.incrementAndGet();
                    listener.onSuccess(bulkOperation, status);
                } else if (BulkIndexer.this.isClosed() || !ElasticClientUtils.retryIfResponse(status)) {
                    BulkIndexer.this.failed.incrementAndGet();
                    if (status == 409) {
                        BulkIndexer.this.versionConflicts.incrementAndGet();
                    }
                    listener.onFailure(bulkOperation, bulkOperationResponse);
                } else {
                    Preconditions.checkState(StringUtils.equals(bulkOperation.getId(), bulkOperationResponse.getId()), "Response id %s did not match request type %s", bulkOperationResponse.getId(), bulkOperation.getId());
                    Preconditions.checkState(StringUtils.equals(bulkOperation.getType(), bulkOperationResponse.getType()), "Response type %s did not match request type %s", bulkOperationResponse.getType(), bulkOperation.getType());
                    BulkIndexer.this.retries.incrementAndGet();
                    if (arrayList == null) {
                        arrayList = new ArrayList(size);
                    }
                    arrayList.add(bulkOperation);
                }
            }
            if (arrayList != null) {
                int i2 = 0;
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    i2 += ((BulkOperation) it.next()).getOperation().length();
                }
                ListenableFuture submitBatch = BulkIndexer.this.submitBatch(new Batch(ImmutableList.copyOf((Collection) arrayList), i2, BulkIndexer.this.config.getPartialRetryDelayMillis(), this.batch.attempt + 1, Reason.RETRY));
                if (!$assertionsDisabled && submitBatch == null) {
                    throw new AssertionError();
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                BulkResponse bulkResponse = (BulkResponse) Uninterruptibles.getUninterruptibly(this.future);
                Preconditions.checkArgument(bulkResponse != null, "Batch response must be non-null");
                onSuccess(bulkResponse);
            } catch (Error e) {
                onFailure(e);
            } catch (RuntimeException e2) {
                onFailure(e2);
            } catch (ExecutionException e3) {
                onFailure(e3.getCause());
            }
        }

        static {
            $assertionsDisabled = !BulkIndexer.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/arakelian/elastic/bulk/BulkIndexer$PeriodicFlush.class */
    private final class PeriodicFlush implements Runnable {
        private PeriodicFlush() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (BulkIndexer.this.isClosed()) {
                return;
            }
            BulkIndexer.this.flushQuietly();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/arakelian/elastic/bulk/BulkIndexer$Reason.class */
    public enum Reason {
        FORCE,
        MAX_OPERATIONS,
        MAX_BYTES,
        RETRY
    }

    public static int roundAllocation(int i) {
        return (((i + ONE_KB) - 1) / ONE_KB) * ONE_KB;
    }

    public BulkIndexer(ElasticClient elasticClient, BulkIndexerConfig bulkIndexerConfig, RefreshLimiter refreshLimiter) {
        this.config = (BulkIndexerConfig) Preconditions.checkNotNull(bulkIndexerConfig, "config must be non-null");
        this.elasticClient = (ElasticClient) Preconditions.checkNotNull(elasticClient, "elasticClient must be non-null");
        this.refreshLimiter = (RefreshLimiter) Preconditions.checkNotNull(refreshLimiter, "refreshLimiter must be non-null");
        RejectedExecutionHandler blockCallerPolicy = bulkIndexerConfig.isBlockingQueue() ? new BlockCallerPolicy() : new ThreadPoolExecutor.AbortPolicy();
        this.batchWorkQueue = new LinkedBlockingQueue<>(bulkIndexerConfig.getQueueSize());
        this.batchExecutor = new ThreadPoolExecutor(1, bulkIndexerConfig.getMaximumThreads(), 0L, TimeUnit.MILLISECONDS, this.batchWorkQueue, ExecutorUtils.newThreadFactory(getClass(), "-batch", false), blockCallerPolicy);
        this.listeningBatchExecutor = MoreExecutors.listeningDecorator(this.batchExecutor);
        this.bulkResponseWorkQueue = new LinkedBlockingQueue<>(bulkIndexerConfig.getQueueSize());
        this.bulkResponseExecutor = new ThreadPoolExecutor(1, bulkIndexerConfig.getMaximumThreads(), 0L, TimeUnit.MILLISECONDS, this.bulkResponseWorkQueue, ExecutorUtils.newThreadFactory(getClass(), "-response", true), blockCallerPolicy);
        this.listeningBulkResponseExecutor = MoreExecutors.listeningDecorator(this.bulkResponseExecutor);
        int automaticFlushMillis = bulkIndexerConfig.getAutomaticFlushMillis();
        if (automaticFlushMillis != 0) {
            this.flushExecutor = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1, ExecutorUtils.newThreadFactory(getClass(), "-flush", false)), 1L, TimeUnit.MINUTES);
            this.flushExecutor.scheduleWithFixedDelay(new PeriodicFlush(), automaticFlushMillis, automaticFlushMillis, TimeUnit.MILLISECONDS);
        } else {
            this.flushExecutor = null;
        }
        this.shutdownHook = ExecutorUtils.createShutdownHook(this);
    }

    public Optional<ListenableFuture<BulkResponse>> add(BulkOperation bulkOperation, boolean z) throws RejectedExecutionException {
        ListenableFuture<BulkResponse> enqueue = enqueue(bulkOperation, z);
        if (enqueue != null) {
            return Optional.of(enqueue);
        }
        Preconditions.checkState(!z, "Expected bulk operation to result in future");
        return Optional.empty();
    }

    public Optional<ListenableFuture<List<BulkResponse>>> add(List<BulkOperation> list, boolean z) throws RejectedExecutionException {
        ArrayList arrayList = null;
        int i = 0;
        int size = list.size();
        while (i < size) {
            ListenableFuture<BulkResponse> enqueue = enqueue(list.get(i), z && i == size - 1);
            if (!$assertionsDisabled && z && enqueue == null) {
                throw new AssertionError();
            }
            if (enqueue != null) {
                if (arrayList == null) {
                    arrayList = new ArrayList();
                }
                arrayList.add(enqueue);
            }
            i++;
        }
        return (arrayList == null || arrayList.size() == 0) ? Optional.empty() : Optional.of(Futures.allAsList(arrayList));
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws BulkIndexerFailed {
        this.batchLock.lock();
        try {
            LOGGER.info("Closing {}", this);
            flushQuietly();
            if (this.closed.compareAndSet(false, true)) {
                long shutdownTimeout = this.config.getShutdownTimeout();
                TimeUnit shutdownTimeoutUnit = this.config.getShutdownTimeoutUnit();
                if (this.flushExecutor != null) {
                    ExecutorUtils.shutdown(this.flushExecutor, shutdownTimeout, shutdownTimeoutUnit, true);
                }
                ExecutorUtils.shutdown(this.listeningBatchExecutor, shutdownTimeout, shutdownTimeoutUnit, true);
                ExecutorUtils.shutdown(this.listeningBulkResponseExecutor, shutdownTimeout, shutdownTimeoutUnit, true);
                ExecutorUtils.removeShutdownHook(this.shutdownHook);
                BulkIndexerStats stats = getStats();
                this.config.getListener().closed(stats);
                if (stats.getSuccessful() != stats.getSubmitted() + stats.getRetries()) {
                    throw new BulkIndexerFailed(stats);
                }
            }
        } finally {
            this.batchLock.unlock();
        }
    }

    private Batch createBatch(boolean z) throws RejectedExecutionException {
        this.batchLock.lock();
        try {
            int size = this.pendingOperations.size();
            if (size == 0) {
                return null;
            }
            ensureOpen();
            Reason reason = z ? Reason.FORCE : size >= this.config.getMaxBulkOperations() ? Reason.MAX_OPERATIONS : this.totalPendingBytes > this.config.getMaxBulkOperationBytes() ? Reason.MAX_BYTES : null;
            if (reason == null) {
                this.batchLock.unlock();
                return null;
            }
            Batch batch = new Batch(ImmutableList.copyOf((Collection) this.pendingOperations), this.totalPendingBytes, 0, 1, reason);
            this.pendingOperations.clear();
            this.totalPendingBytes = 0;
            this.batchLock.unlock();
            return batch;
        } finally {
            this.batchLock.unlock();
        }
    }

    private ListenableFuture<BulkResponse> enqueue(BulkOperation bulkOperation, boolean z) throws RejectedExecutionException {
        Preconditions.checkArgument(bulkOperation != null, "bulkOperation must be non-null");
        this.batchLock.lock();
        try {
            ensureOpen();
            CharSequence operation = bulkOperation.getOperation();
            Preconditions.checkState(operation.charAt(operation.length() - 1) == '\n', "Bulk operations must end with newline");
            this.totalPendingBytes += operation.length();
            this.pendingOperations.add(bulkOperation);
            this.submitted.incrementAndGet();
            Batch createBatch = createBatch(z);
            this.batchLock.unlock();
            if (createBatch != null) {
                return submitBatch(createBatch);
            }
            return null;
        } catch (Throwable th) {
            this.batchLock.unlock();
            throw th;
        }
    }

    private void ensureOpen() throws RejectedExecutionException {
        if (isClosed()) {
            throw new AlreadyClosedException("Bulk indexer is closed");
        }
    }

    public ListenableFuture<BulkResponse> flush() throws RejectedExecutionException {
        Batch createBatch = createBatch(true);
        if (createBatch != null) {
            return submitBatch(createBatch);
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flushQuietly() {
        try {
            flush();
        } catch (Exception e) {
            LOGGER.warn("Unable to flush {}", this, e);
        }
    }

    public final BulkIndexerConfig getConfig() {
        return this.config;
    }

    public RefreshLimiter getRefreshLimiter() {
        return this.refreshLimiter;
    }

    public int getResponseQueueActiveThreads() {
        return this.bulkResponseExecutor.getActiveCount();
    }

    public int getResponseQueueSize() {
        return this.bulkResponseWorkQueue.size();
    }

    public BulkIndexerStats getStats() {
        return ImmutableBulkIndexerStats.builder().submitted(this.submitted.get()).retries(this.retries.get()).totalBytes(this.totalBytes.get()).successful(this.successful.get()).failed(this.failed.get()).versionConflicts(this.versionConflicts.get()).build();
    }

    public int getWorkQueueActiveThreads() {
        return this.batchExecutor.getActiveCount();
    }

    public int getWorkQueueSize() {
        return this.batchWorkQueue.size();
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public boolean isIdle() {
        return this.batchWorkQueue.isEmpty() && this.bulkResponseWorkQueue.isEmpty() && getWorkQueueActiveThreads() == 0 && getResponseQueueActiveThreads() == 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ListenableFuture<BulkResponse> submitBatch(Batch batch) throws RejectedExecutionException {
        int maxPartialRetries = this.config.getMaxPartialRetries();
        if (batch.attempt > maxPartialRetries) {
            throw new RejectedExecutionException("Bulk indexer rejected after " + maxPartialRetries + " attempts: " + batch);
        }
        LOGGER.info("Queuing {}", batch);
        Stopwatch createStarted = Stopwatch.createStarted();
        try {
            ListenableFuture<BulkResponse> submit = this.listeningBatchExecutor.submit((Callable) batch);
            submit.addListener(new BatchListener(submit, batch, createStarted), this.listeningBulkResponseExecutor);
            return submit;
        } catch (RejectedExecutionException e) {
            batch.failed(e);
            throw new RejectedExecutionException("Bulk indexer failed to process " + batch, e);
        }
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).omitNullValues().add("config", this.config).toString();
    }

    static {
        $assertionsDisabled = !BulkIndexer.class.desiredAssertionStatus();
        BULK_ID = new AtomicInteger(0);
        LOGGER = LoggerFactory.getLogger(BulkIndexer.class);
    }
}
