package org.apache.skywalking.apm.collector.storage.es.base.dao;

import java.lang.reflect.Field;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/apm/collector/storage/es/base/dao/BatchProcessEsDAO.class */
public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
    private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
    private BulkProcessor bulkProcessor;
    private final int bulkActions;
    private final int bulkSize;
    private final int flushInterval;
    private final int concurrentRequests;

    public BatchProcessEsDAO(ElasticSearchClient elasticSearchClient, int i, int i2, int i3, int i4) {
        super(elasticSearchClient);
        this.bulkActions = i;
        this.bulkSize = i2;
        this.flushInterval = i3;
        this.concurrentRequests = i4;
    }

    @GraphComputingMetric(name = "/persistence/batchPersistence/")
    public void batchPersistence(List<?> list) {
        if (this.bulkProcessor == null) {
            this.bulkProcessor = createBulkProcessor();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("bulk data size: {}", Integer.valueOf(list.size()));
        }
        if (CollectionUtils.isNotEmpty(list)) {
            list.forEach(obj -> {
                if (obj instanceof IndexRequestBuilder) {
                    this.bulkProcessor.add(((IndexRequestBuilder) obj).request());
                }
                if (obj instanceof UpdateRequestBuilder) {
                    this.bulkProcessor.add(((UpdateRequestBuilder) obj).request());
                }
            });
        }
    }

    private BulkProcessor createBulkProcessor() {
        ElasticSearchClient client = getClient();
        try {
            Field declaredField = client.getClass().getDeclaredField("client");
            declaredField.setAccessible(true);
            return BulkProcessor.builder((Client) declaredField.get(client), new BulkProcessor.Listener() { // from class: org.apache.skywalking.apm.collector.storage.es.base.dao.BatchProcessEsDAO.1
                public void beforeBulk(long j, BulkRequest bulkRequest) {
                }

                public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                }

                public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
                    BatchProcessEsDAO.logger.error("{} data bulk failed, reason: {}", Integer.valueOf(bulkRequest.numberOfActions()), th);
                }
            }).setBulkActions(this.bulkActions).setBulkSize(new ByteSizeValue(this.bulkSize, ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueSeconds(this.flushInterval)).setConcurrentRequests(this.concurrentRequests).setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100L), 3)).build();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new UnexpectedException(e.getMessage());
        }
    }
}
