package com.marklogic.client.datamovement.impl;

import com.fasterxml.jackson.databind.JsonNode;
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.MarkLogicInternalException;
import com.marklogic.client.Transaction;
import com.marklogic.client.datamovement.Forest;
import com.marklogic.client.datamovement.ForestConfiguration;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.RowBatchFailureListener;
import com.marklogic.client.datamovement.RowBatchSuccessListener;
import com.marklogic.client.datamovement.RowBatcher;
import com.marklogic.client.expression.PlanBuilder;
import com.marklogic.client.impl.DatabaseClientImpl;
import com.marklogic.client.io.BaseHandle;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.JacksonHandle;
import com.marklogic.client.io.StringHandle;
import com.marklogic.client.io.marker.AbstractWriteHandle;
import com.marklogic.client.io.marker.ContentHandle;
import com.marklogic.client.io.marker.StructureReadHandle;
import com.marklogic.client.row.RawPlanDefinition;
import com.marklogic.client.row.RawQueryDSLPlan;
import com.marklogic.client.row.RowManager;
import com.marklogic.client.util.RequestLogger;
import com.marklogic.client.util.RequestParameters;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl.class */
public class RowBatcherImpl<T> extends BatcherImpl implements RowBatcher<T> {
    /** Default batch size; the constructor and start() apply the same value (1000). */
    private static final int DEFAULT_BATCH_SIZE = 1000;
    /** All 64 bits set, i.e. the largest value when the long is read as unsigned. */
    private static final long MAX_UNSIGNED_LONG = -1;
    private static Logger logger = LoggerFactory.getLogger((Class<?>) RowBatcherImpl.class);
    /** Name of the plan parameter that readRows() binds to the lower bound of a batch. */
    private static final String LOWER_BOUND = "ML_LOWER_BOUND";
    /** Name of the plan parameter that readRows() binds to the upper bound of a batch. */
    private static final String UPPER_BOUND = "ML_UPPER_BOUND";
    /** Width of the unsigned 64-bit slice covered by one batch; computed in start(). */
    private long batchSize;
    /** Number of batches the job is split into; computed in start() from the row estimate. */
    private long batchCount;
    /** Executor that runs the batch callables; created in start(). */
    private RowBatcherImpl<T>.BatchThreadPoolExecutor threadPool;
    /** Number of the most recently claimed batch; incremented at the top of readRows(). */
    private final AtomicLong batchNum;
    /** Count of batches that finished with an unrecovered failure. */
    private final AtomicLong failedBatches;
    /** Set to the thread count in start() and decremented by endThread(). */
    private final AtomicInteger runningThreads;
    private RowBatchFailureListener[] failureListeners;
    private RowBatchSuccessListener[] successListeners;
    /** Plan built from the "modifiedPlan" node returned by the view analysis in analyzePlan(). */
    private RawPlanDefinition pagedPlan;
    /** Row estimate read from the "rowCount" node returned by the view analysis. */
    private long rowCount;
    /** Per-host clients and row managers built by forestHosts(); may be null. */
    private HostInfo[] hostInfos;
    /** True once withConsistentSnapshot() has been called. */
    private boolean consistentSnapshot;
    /** Snapshot timestamp shared by all batches; -1 until one has been established. */
    private final AtomicLong serverTimestamp;
    /** Handle supplied by the caller; start() calls newHandle() on it for each worker. */
    private final ContentHandle<T> rowsHandle;
    /** Row manager created from the primary client in the constructor. */
    private final RowManager defaultRowManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$BatchThreadPoolExecutor.class */
    /**
     * Fixed-size pool used to run batch requests.
     *
     * <p>The pool has exactly {@code i} threads and a bounded queue of capacity {@code i}.
     * When the queue is full, {@link ThreadPoolExecutor.CallerRunsPolicy} makes the
     * submitting thread run the task itself instead of rejecting it.
     */
    public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
        /**
         * @param i number of pool threads and capacity of the work queue
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        BatchThreadPoolExecutor(int i) {
            // Typed queue instead of the raw LinkedBlockingQueue to avoid an unchecked conversion.
            super(i, i, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(i), new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$HostInfo.class */
    /**
     * Holder for the per-host objects kept in the host rotation: the host name,
     * the client obtained for that host and that client's row manager.
     * Instances are created and populated field by field in forestHosts().
     */
    public static class HostInfo {
        /** Host name; forestHosts() uses it as the key when reusing existing entries. */
        private String hostName;
        /** Client returned by getMoveMgr().getHostClient(hostName). */
        private DatabaseClient client;
        /** Row manager created from the client; only assigned for DIRECT connections. */
        private RowManager rowMgr;

        // Fields are assigned by the enclosing class after construction.
        private HostInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$RowBatchCallable.class */
    /**
     * Unit of work executed by the thread pool: reads one batch of rows through
     * the owning batcher, using a handle that belongs to this callable.
     *
     * @param <T> content type produced by the rows handle
     */
    public static class RowBatchCallable<T> implements Callable<Boolean> {
        // Typed (not raw) so the call into readRows() is checked by the compiler.
        private final RowBatcherImpl<T> rowBatcher;
        private final ContentHandle<T> handle;

        /**
         * @param rowBatcherImpl batcher whose readRows() is invoked by {@link #call()}
         * @param contentHandle  handle that receives the rows of each batch read by this callable
         */
        RowBatchCallable(RowBatcherImpl<T> rowBatcherImpl, ContentHandle<T> contentHandle) {
            this.rowBatcher = rowBatcherImpl;
            this.handle = contentHandle;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the handle owned by this callable */
        public ContentHandle<T> getHandle() {
            return this.handle;
        }

        /**
         * Reads the next batch.
         *
         * @return the result of readRows(), or {@code false} if anything was thrown;
         *         the throwable is logged and never propagated to the executor
         */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() {
            try {
                return Boolean.valueOf(this.rowBatcher.readRows(this));
            } catch (Throwable th) {
                RowBatcherImpl.logger.error("internal error", th);
                return Boolean.FALSE;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$RowBatchEventImpl.class */
    /**
     * Base event for a row batch: carries the job batch number and the
     * lower and upper bound strings that delimit the batch.
     */
    public static class RowBatchEventImpl extends BatchEventImpl {
        private String lowerBound;
        private String upperBound;

        /**
         * @param batchNumber job batch number recorded on the event
         * @param lower       lower bound of the batch
         * @param upper       upper bound of the batch
         */
        private RowBatchEventImpl(long batchNumber, String lower, String upper) {
            this.lowerBound = lower;
            this.upperBound = upper;
            withJobBatchNumber(batchNumber);
        }

        /** @return the lower bound supplied at construction */
        public String getLowerBound() {
            return lowerBound;
        }

        /** @return the upper bound supplied at construction */
        public String getUpperBound() {
            return upperBound;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$RowBatchFailureEventImpl.class */
    /**
     * Failure event handed to failure listeners. Besides the batch bounds it carries
     * the disposition (initially SKIP), the retry limit (initially
     * {@link #DEFAULT_MAX_RETRIES}), the number of retries made so far for the batch
     * and the number of batches that have failed in the job.
     */
    public static class RowBatchFailureEventImpl extends RowBatchEventImpl implements RowBatchFailureListener.RowBatchFailureEvent {
        private static final int DEFAULT_MAX_RETRIES = 10;
        private RowBatchFailureListener.BatchFailureDisposition disposition = RowBatchFailureListener.BatchFailureDisposition.SKIP;
        private int maxRetries = DEFAULT_MAX_RETRIES;
        private int batchRetries = 0;
        private long failedJobBatches = 0L;

        /**
         * @param batchNumber job batch number
         * @param lower       lower bound of the failed batch
         * @param upper       upper bound of the failed batch
         */
        private RowBatchFailureEventImpl(long batchNumber, String lower, String upper) {
            super(batchNumber, lower, upper);
        }

        /** @return number of retries recorded for this batch */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public int getBatchRetries() {
            return batchRetries;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Records the retry count and returns this event for chaining. */
        public RowBatchFailureEventImpl withBatchRetries(int retries) {
            batchRetries = retries;
            return this;
        }

        /** @return number of failed batches recorded on this event */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public long getFailedJobBatches() {
            return failedJobBatches;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Records the failed-batch count and returns this event for chaining. */
        public RowBatchFailureEventImpl withFailedJobBatches(long failed) {
            failedJobBatches = failed;
            return this;
        }

        /** @return the current disposition */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public RowBatchFailureListener.BatchFailureDisposition getDisposition() {
            return disposition;
        }

        /** Replaces the disposition and returns this event for chaining. */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public RowBatchFailureListener.RowBatchFailureEvent withDisposition(RowBatchFailureListener.BatchFailureDisposition batchFailureDisposition) {
            disposition = batchFailureDisposition;
            return this;
        }

        /** @return the current retry limit */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public int getMaxRetries() {
            return maxRetries;
        }

        /** Replaces the retry limit and returns this event for chaining. */
        @Override // com.marklogic.client.datamovement.RowBatchFailureListener.RowBatchFailureEvent
        public RowBatchFailureEventImpl withMaxRetries(int i) {
            maxRetries = i;
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/marklogic/client/datamovement/impl/RowBatcherImpl$RowBatchResponseEventImpl.class */
    /**
     * Success event for one batch; exposes the content retrieved for that batch.
     *
     * @param <T> type of the retrieved rows content
     */
    public static class RowBatchResponseEventImpl<T> extends RowBatchEventImpl implements RowBatchSuccessListener.RowBatchResponseEvent<T> {
        private final T handle;

        /**
         * @param batchNumber job batch number
         * @param lower       lower bound of the batch
         * @param upper       upper bound of the batch
         * @param rows        content retrieved for the batch
         */
        private RowBatchResponseEventImpl(long batchNumber, String lower, String upper, T rows) {
            super(batchNumber, lower, upper);
            this.handle = rows;
        }

        /** @return the content supplied at construction */
        @Override // com.marklogic.client.datamovement.RowBatchSuccessListener.RowBatchResponseEvent
        public T getRowsDoc() {
            return handle;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RowBatcherImpl(DataMovementManagerImpl dataMovementManagerImpl, ContentHandle<T> contentHandle) {
        super(dataMovementManagerImpl);
        this.batchSize = 0L;
        this.batchCount = 0L;
        this.batchNum = new AtomicLong(0L);
        this.failedBatches = new AtomicLong(0L);
        this.runningThreads = new AtomicInteger(0);
        this.rowCount = 0L;
        this.consistentSnapshot = false;
        this.serverTimestamp = new AtomicLong(-1L);
        validateRowsHandle(contentHandle);
        this.rowsHandle = contentHandle;
        this.defaultRowManager = getPrimaryClient().newRowManager();
        super.withBatchSize(1000);
        if (dataMovementManagerImpl.getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
            withForestConfig(dataMovementManagerImpl.getForestConfig());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Checks that the rows handle can be used by this batcher.
     *
     * <p>The handle must be non-null, implement {@code StructureReadHandle}, extend
     * {@code BaseHandle}, declare a format other than {@code Format.UNKNOWN}, and have a
     * non-null content class that is registered with the handle registry of
     * {@code DatabaseClientFactory}.
     *
     * @param contentHandle handle to validate
     * @throws IllegalArgumentException if any of the requirements is not met
     */
    private void validateRowsHandle(ContentHandle<T> contentHandle) {
        // A reference must be compared with null; comparing it with the int 0 does not compile.
        if (contentHandle == null) {
            throw new IllegalArgumentException("Cannot create RowBatcher with null rows manager");
        }
        if (!(contentHandle instanceof StructureReadHandle)) {
            throw new IllegalArgumentException("Rows handle must also be StructureReadHandle");
        }
        if (!(contentHandle instanceof BaseHandle)) {
            throw new IllegalArgumentException("Rows handle must also be BaseHandle");
        }
        if (((BaseHandle) contentHandle).getFormat() == Format.UNKNOWN) {
            throw new IllegalArgumentException("Rows handle must specify a format");
        }
        Class<?> contentClass = contentHandle.getContentClass();
        if (contentClass == null) {
            throw new IllegalArgumentException("Rows handle cannot have a null content class");
        }
        if (!DatabaseClientFactory.getHandleRegistry().isRegistered(contentClass)) {
            throw new IllegalArgumentException("Rows handle must be registered with DatabaseClientFactory.HandleFactoryRegistry");
        }
    }

    /** @return the row manager created from the primary client in the constructor */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowManager getRowManager() {
        return this.defaultRowManager;
    }

    /**
     * Sets the view to batch from a built plan. The plan is exported as JSON and
     * handed to analyzePlan().
     *
     * @param modifyPlan plan to analyze
     * @return this batcher
     * @throws IllegalArgumentException if the plan is null
     * @throws IllegalStateException    if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> withBatchView(PlanBuilder.ModifyPlan modifyPlan) {
        if (modifyPlan != null) {
            StringHandle jsonTarget = new StringHandle().withFormat(Format.JSON);
            AbstractWriteHandle exportedPlan = (AbstractWriteHandle) modifyPlan.export(jsonTarget);
            analyzePlan(exportedPlan);
            return this;
        }
        throw new IllegalArgumentException("Plan cannot be null");
    }

    /**
     * Sets the view to batch from a raw plan definition; its handle is passed to analyzePlan().
     *
     * @param rawPlanDefinition raw plan to analyze
     * @return this batcher
     * @throws IllegalArgumentException if the definition is null
     * @throws IllegalStateException    if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> withBatchView(RawPlanDefinition rawPlanDefinition) {
        if (rawPlanDefinition != null) {
            analyzePlan(rawPlanDefinition.getHandle());
            return this;
        }
        throw new IllegalArgumentException("Raw plan definition cannot be null");
    }

    /**
     * Sets the view to batch from a raw query DSL plan; its handle is passed to analyzePlan().
     *
     * @param rawQueryDSLPlan DSL plan to analyze
     * @return this batcher
     * @throws IllegalArgumentException if the plan is null
     * @throws IllegalStateException    if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> withBatchView(RawQueryDSLPlan rawQueryDSLPlan) {
        if (rawQueryDSLPlan != null) {
            analyzePlan(rawQueryDSLPlan.getHandle());
            return this;
        }
        throw new IllegalArgumentException("Raw query DSL plan cannot be null");
    }

    /**
     * Posts the plan to the "internal/viewinfo" resource and stores the outcome:
     * the "rowCount" value becomes the row estimate and the "modifiedPlan" node
     * becomes the plan used for every batch. The schema name (optional in the
     * response), the view name and the row estimate are logged.
     *
     * @param abstractWriteHandle handle holding the plan to analyze
     * @throws IllegalStateException if the job has already started
     */
    private void analyzePlan(AbstractWriteHandle abstractWriteHandle) {
        requireNotStarted("Must specify batch view before starting job");

        DatabaseClientImpl primaryClient = (DatabaseClientImpl) getPrimaryClient();
        JacksonHandle response = (JacksonHandle) primaryClient.getServices().postResource((RequestLogger) null, "internal/viewinfo", (Transaction) null, (RequestParameters) null, abstractWriteHandle, new JacksonHandle());
        JsonNode viewInfo = response.get();

        this.rowCount = viewInfo.get("rowCount").asLong(0L);
        this.pagedPlan = getRowManager().newRawPlanDefinition(new JacksonHandle(viewInfo.get("modifiedPlan")));

        // "schemaName" may be missing from the response; "viewName" is read unconditionally.
        JsonNode schemaNode = viewInfo.get("schemaName");
        String schemaName = (schemaNode == null) ? null : schemaNode.asText(null);
        String viewName = viewInfo.get("viewName").asText(null);
        logger.info("plan analysis schema name: {}, view name: {}, row estimate: {}", schemaName, viewName, Long.valueOf(this.rowCount));
    }

    /**
     * Sets the batch size by delegating to the superclass.
     *
     * @param i requested batch size; start() replaces a value of 0 or less with 1000
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public RowBatcher<T> withBatchSize(int i) {
        requireNotStarted("Must set batch size before starting job");
        super.withBatchSize(i);
        return this;
    }

    /**
     * Sets the thread count by delegating to the superclass.
     *
     * @param i requested thread count
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public RowBatcher<T> withThreadCount(int i) {
        requireNotStarted("Must set thread count before starting job");
        super.withThreadCount(i);
        return this;
    }

    /**
     * Appends a success listener, or removes all success listeners when passed null.
     *
     * @param rowBatchSuccessListener listener to add, or null to clear the list
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> onSuccess(RowBatchSuccessListener rowBatchSuccessListener) {
        requireNotStarted("Must set success listener before starting job");
        if (rowBatchSuccessListener == null) {
            this.successListeners = null;
            return this;
        }
        RowBatchSuccessListener[] existing = this.successListeners;
        if (existing == null || existing.length == 0) {
            this.successListeners = new RowBatchSuccessListener[]{rowBatchSuccessListener};
            return this;
        }
        // Grow by one and put the new listener in the last slot.
        RowBatchSuccessListener[] extended = Arrays.copyOf(existing, existing.length + 1);
        extended[existing.length] = rowBatchSuccessListener;
        this.successListeners = extended;
        return this;
    }

    /**
     * Appends a failure listener, or removes all failure listeners when passed null.
     *
     * @param rowBatchFailureListener listener to add, or null to clear the list
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> onFailure(RowBatchFailureListener rowBatchFailureListener) {
        requireNotStarted("Must set failure listener before starting job");
        if (rowBatchFailureListener == null) {
            this.failureListeners = null;
            return this;
        }
        RowBatchFailureListener[] existing = this.failureListeners;
        if (existing == null || existing.length == 0) {
            this.failureListeners = new RowBatchFailureListener[]{rowBatchFailureListener};
            return this;
        }
        // Grow by one and put the new listener in the last slot.
        RowBatchFailureListener[] extended = Arrays.copyOf(existing, existing.length + 1);
        extended[existing.length] = rowBatchFailureListener;
        this.failureListeners = extended;
        return this;
    }

    /**
     * Sets the job id by delegating to the superclass setter.
     *
     * @param str job id
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.Batcher
    public RowBatcher<T> withJobId(String str) {
        requireNotStarted("Must set job id before starting job");
        super.setJobId(str);
        return this;
    }

    /**
     * Sets the job name by delegating to the superclass.
     *
     * @param str job name
     * @return this batcher
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public RowBatcher<T> withJobName(String str) {
        requireNotStarted("Must set job name before starting job");
        super.withJobName(str);
        return this;
    }

    /**
     * Turns on consistent-snapshot mode, in which readRows() establishes one server
     * timestamp and applies it to the handles of all workers.
     *
     * @return this batcher
     * @throws IllegalStateException if the job has already started or the rows handle
     *                               does not extend BaseHandle
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatcher<T> withConsistentSnapshot() {
        requireNotStarted("Must set consistent snapshot before starting job");
        // The timestamp accessors used in readRows() are called on BaseHandle.
        if (!(this.rowsHandle instanceof BaseHandle)) {
            throw new IllegalStateException("Content handle for consistent snapshot must extend BaseHandle");
        }
        this.consistentSnapshot = true;
        return this;
    }

    /** @return the internal success listener array itself (not a copy); may be null */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatchSuccessListener[] getSuccessListeners() {
        return this.successListeners;
    }

    /** @return the internal failure listener array itself (not a copy); may be null */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public RowBatchFailureListener[] getFailureListeners() {
        return this.failureListeners;
    }

    /**
     * Replaces all success listeners. The given array is stored as is, without copying.
     *
     * @param rowBatchSuccessListenerArr new listeners
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public void setSuccessListeners(RowBatchSuccessListener... rowBatchSuccessListenerArr) {
        requireNotStarted("Must set success listeners before starting job");
        this.successListeners = rowBatchSuccessListenerArr;
    }

    /**
     * Replaces all failure listeners. The given array is stored as is, without copying.
     *
     * @param rowBatchFailureListenerArr new listeners
     * @throws IllegalStateException if the job has already started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public void setFailureListeners(RowBatchFailureListener... rowBatchFailureListenerArr) {
        requireNotStarted("Must set failure listeners before starting job");
        this.failureListeners = rowBatchFailureListenerArr;
    }

    /**
     * Attaches the primary client and the job ticket to an event before it is
     * passed to listeners. getJobTicket() throws IllegalStateException when the job
     * has not been started, so this may only be used on a started job.
     */
    private void initRequestEvent(RowBatchEventImpl rowBatchEventImpl) {
        rowBatchEventImpl.withClient(getPrimaryClient());
        rowBatchEventImpl.withJobTicket(getJobTicket());
    }

    /**
     * Delivers a success event to every registered success listener, in order.
     * Anything thrown by a listener is logged at info level and does not stop
     * delivery to the remaining listeners.
     *
     * @param rowBatchResponseEvent event describing the retrieved batch
     */
    private void notifySuccess(RowBatchSuccessListener.RowBatchResponseEvent<T> rowBatchResponseEvent) {
        RowBatchSuccessListener[] listeners = this.successListeners;
        if (listeners != null && listeners.length > 0) {
            for (RowBatchSuccessListener listener : listeners) {
                try {
                    listener.processEvent(rowBatchResponseEvent);
                } catch (Throwable listenerError) {
                    logger.info("error in success listener: {}", listenerError.toString());
                }
            }
        }
    }

    /**
     * Delivers a failure event to every registered failure listener, in order, and
     * constrains what each listener may change on the event:
     * <ul>
     *   <li>the retry limit can be lowered but not raised (a raised value is reset);</li>
     *   <li>the disposition can only escalate: SKIP may become RETRY or STOP, RETRY may
     *       become STOP but not SKIP, and STOP can never be changed;</li>
     *   <li>a null disposition is treated as "no change" and the previous value is restored,
     *       so a later listener does not hit a NullPointerException in the switch.</li>
     * </ul>
     * Anything thrown by a listener is logged at info level and delivery continues.
     * Does nothing when no failure listeners are registered.
     *
     * @param rowBatchFailureEventImpl event shared by all listeners for this failure
     * @param th                       the error that made the batch fail
     * @throws MarkLogicInternalException if the previous disposition is not a known value
     */
    private void notifyFailure(RowBatchFailureEventImpl rowBatchFailureEventImpl, Throwable th) {
        // Same guard as notifySuccess(): nothing to do without listeners.
        if (this.failureListeners == null || this.failureListeners.length == 0) {
            return;
        }
        for (RowBatchFailureListener rowBatchFailureListener : this.failureListeners) {
            // Remember the state before the listener runs so illegal changes can be undone.
            RowBatchFailureListener.BatchFailureDisposition disposition = rowBatchFailureEventImpl.getDisposition();
            int maxRetries = rowBatchFailureEventImpl.getMaxRetries();
            try {
                rowBatchFailureListener.processFailure((RowBatchFailureListener.RowBatchFailureEvent) rowBatchFailureEventImpl, th);
            } catch (Throwable th2) {
                logger.info("error in failure listener: {}", th2.toString());
            }
            if (maxRetries < rowBatchFailureEventImpl.getMaxRetries()) {
                rowBatchFailureEventImpl.withMaxRetries(maxRetries);
            }
            RowBatchFailureListener.BatchFailureDisposition disposition2 = rowBatchFailureEventImpl.getDisposition();
            if (disposition2 == null) {
                // The listener cleared the disposition; keep the one that was in effect.
                rowBatchFailureEventImpl.withDisposition(disposition);
                continue;
            }
            if (disposition != disposition2) {
                switch (disposition) {
                    case SKIP:
                        // Any change away from SKIP is an escalation and is accepted.
                        break;
                    case RETRY:
                        if (disposition2 == RowBatchFailureListener.BatchFailureDisposition.SKIP) {
                            rowBatchFailureEventImpl.withDisposition(disposition);
                        }
                        break;
                    case STOP:
                        rowBatchFailureEventImpl.withDisposition(disposition);
                        break;
                    default:
                        throw new MarkLogicInternalException("unknown failure disposition: " + disposition.toString());
                }
            }
        }
    }

    /**
     * Waits, effectively without a time limit, until the job's thread pool has terminated.
     *
     * @return the result of {@link #awaitCompletion(long, TimeUnit)}, or {@code false}
     *         if the waiting thread was interrupted; in that case the thread's interrupt
     *         status is restored so callers further up can still observe the interruption
     * @throws IllegalStateException if the job has not been started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public boolean awaitCompletion() {
        try {
            return awaitCompletion(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again rather than losing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Waits up to the given time for the job's thread pool to terminate.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if there is no thread pool, otherwise the result of the
     *         pool's {@code awaitTermination}
     * @throws InterruptedException  if interrupted while waiting
     * @throws IllegalStateException if the job has not been started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public boolean awaitCompletion(long j, TimeUnit timeUnit) throws InterruptedException {
        requireStarted("Must start job before awaiting completion");
        if (this.threadPool == null) {
            return true;
        }
        return this.threadPool.awaitTermination(j, timeUnit);
    }

    /**
     * @return the row count stored by analyzePlan() when the batch view was supplied
     * @throws IllegalStateException if no batch view has been supplied yet
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public long getRowEstimate() {
        if (this.pagedPlan == null) {
            throw new IllegalStateException("Must supply plan before getting the row estimate");
        }
        return this.rowCount;
    }

    /**
     * Returns the current value of the batch counter that readRows() increments each
     * time it is entered. Note that this is the running counter (batchNum), not the
     * planned number of batches computed in start() (batchCount).
     *
     * @return the current batch counter value
     * @throws IllegalStateException if the job has not been started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public long getBatchCount() {
        requireStarted("Must start job before getting batch count");
        return this.batchNum.get();
    }

    /**
     * @return the number of batches that ended with a failure so far
     * @throws IllegalStateException if the job has not been started
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public long getFailedBatches() {
        requireStarted("Must start job before getting failed batches");
        return this.failedBatches.get();
    }

    /**
     * @return the job ticket held by the superclass
     * @throws IllegalStateException if the job has not been started
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public JobTicket getJobTicket() {
        requireStarted("Must start job before getting ticket");
        return super.getJobTicket();
    }

    /**
     * @return the snapshot timestamp, or {@code null} while none has been
     *         established (the internal value is still negative)
     */
    @Override // com.marklogic.client.datamovement.RowBatcher
    public Long getServerTimestamp() {
        long snapshot = this.serverTimestamp.get();
        return (snapshot > -1) ? Long.valueOf(snapshot) : null;
    }

    /**
     * Guard for configuration methods.
     *
     * @param str message of the exception to throw
     * @throws IllegalStateException if isStarted() reports that the job has started
     */
    private void requireNotStarted(String str) {
        if (isStarted()) {
            throw new IllegalStateException(str);
        }
    }

    /**
     * Guard for methods that only make sense on a started job.
     *
     * @param str message of the exception to throw
     * @throws IllegalStateException if isStarted() reports that the job has not started
     */
    private void requireStarted(String str) {
        if (!isStarted()) {
            throw new IllegalStateException(str);
        }
    }

    /**
     * Stops the job immediately: marks it stopped, calls {@code shutdownNow()} on the
     * thread pool (if one exists) and records the job end time. Calling it on an
     * already stopped job has no effect.
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl
    public void stop() {
        if (!super.getStopped().get()) {
            super.getStopped().set(true);
            if (this.threadPool != null) {
                this.threadPool.shutdownNow();
            }
            super.setJobEndTime();
        }
    }

    /**
     * Stops the job gracefully: marks it stopped, calls {@code shutdown()} on the
     * thread pool (if one exists) and records the job end time. Has no effect on
     * an already stopped job.
     */
    private void orderlyStop() {
        if (!super.getStopped().get()) {
            super.getStopped().set(true);
            if (this.threadPool != null) {
                this.threadPool.shutdown();
            }
            super.setJobEndTime();
        }
    }

    /**
     * Starts the job.
     *
     * <p>Steps: verify that a plan and at least one success listener are present; install
     * a logging failure listener if none is registered; fall back to the default batch
     * size when the configured one is not positive; derive the number of batches from the
     * row estimate and split the unsigned 64-bit range evenly between them; copy the row
     * manager's datatype and row structure styles to the per-host row managers (DIRECT
     * connections only); create the thread pool and start one chain of batch requests per
     * thread.
     *
     * <p>In consistent-snapshot mode the first batch is read synchronously on the calling
     * thread so that the snapshot timestamp is established before other workers start.
     * readRows() itself resubmits the callable (or ends the chain), so that callable is
     * not submitted a second time here; doing so would run two chains on the same handle
     * and create one more chain than {@code runningThreads} accounts for.
     *
     * @param jobTicket ticket identifying the job
     * @throws IllegalStateException if the job was already started, no plan was supplied,
     *                               or no success listener is registered
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl
    public synchronized void start(JobTicket jobTicket) {
        requireNotStarted("Job already started");
        if (this.pagedPlan == null) {
            throw new IllegalStateException("Plan must be supplied before starting the job");
        }
        if (this.successListeners == null || this.successListeners.length == 0) {
            throw new IllegalStateException("No listener for rows");
        }
        if (this.failureListeners == null || this.failureListeners.length == 0) {
            logger.warn("starting job with default failure listener");
            onFailure((rowBatchFailureEvent, th) -> {
                logger.warn("batch " + rowBatchFailureEvent.getJobBatchNumber() + " failed with error: " + th.getMessage());
            });
        }
        if (super.getBatchSize() <= 0) {
            logger.warn("batchSize must be 1 or greater--setting batchSize to 1000");
            super.withBatchSize(DEFAULT_BATCH_SIZE);
        }

        // One extra batch covers the remainder; each batch gets an equal share of the unsigned range.
        this.batchCount = (getRowEstimate() / super.getBatchSize()) + 1;
        this.batchSize = Long.divideUnsigned(MAX_UNSIGNED_LONG, this.batchCount);
        if (!logger.isDebugEnabled() || this.batchSize <= 0) {
            logger.info("batch count: {}", Long.valueOf(this.batchCount));
        } else {
            logger.debug("batch count: {}, calculated batch size: {}", Long.valueOf(this.batchCount), Long.valueOf(this.batchSize));
        }

        if (this.hostInfos != null && getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
            RowManager.RowSetPart datatypeStyle = getRowManager().getDatatypeStyle();
            RowManager.RowStructure rowStructureStyle = getRowManager().getRowStructureStyle();
            for (HostInfo hostInfo : this.hostInfos) {
                hostInfo.rowMgr.setDatatypeStyle(datatypeStyle);
                hostInfo.rowMgr.setRowStructureStyle(rowStructureStyle);
            }
        }

        this.threadPool = new BatchThreadPoolExecutor(super.getThreadCount());
        this.runningThreads.set(super.getThreadCount());
        super.setJobTicket(jobTicket);
        super.setJobStartTime();
        super.getStarted().set(true);

        for (int i = 0; i < super.getThreadCount(); i++) {
            RowBatchCallable<T> rowBatchCallable = new RowBatchCallable<>(this, this.rowsHandle.newHandle());
            if (i == 0 && this.consistentSnapshot) {
                // Establishes the snapshot timestamp; readRows() continues this chain on its own.
                readRows(rowBatchCallable);
            } else {
                submit(rowBatchCallable);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Claims the next batch number, requests that batch and notifies listeners.
     *
     * <p>The batch covers the unsigned range starting at {@code (batch - 1) * batchSize};
     * the last batch always extends to the maximum unsigned long. The bounds are bound
     * to the plan parameters {@code ML_LOWER_BOUND} and {@code ML_UPPER_BOUND}.
     *
     * <p>The request is attempted once and then repeated for as long as
     * shouldRequestBatch() allows (failure listeners asked for RETRY, the retry limit is
     * not reached and the job is not stopped). With DIRECT connections and known hosts,
     * each attempt uses the row manager of the next host in the rotation.
     *
     * <p>Any attempt that completes without throwing counts as success and clears a
     * failure recorded by an earlier attempt, even when the server returned no rows
     * (a null result). Previously an empty result on a retry left the failure in place,
     * so the batch was requested again until the retry limit and then counted as failed.
     * Success listeners are only notified when there is content.
     *
     * <p>In consistent-snapshot mode the first successful response establishes the
     * shared timestamp, which is then set as the point-in-time timestamp on the handle.
     *
     * <p>Afterwards the callable is resubmitted if batches remain, the chain is ended if
     * not, or the whole job is stopped in an orderly way if the final disposition is STOP.
     *
     * @param rowBatchCallable callable whose handle receives the rows
     * @return {@code true} if the batch ended without an unrecovered failure;
     *         {@code false} if it failed or no batch was left to claim
     */
    public boolean readRows(RowBatchCallable<T> rowBatchCallable) {
        long currentBatch = this.batchNum.incrementAndGet();
        if (currentBatch > this.batchCount) {
            endThread();
            return false;
        }

        // Bounds are unsigned 64-bit values; signed overflow in the arithmetic is intended.
        long lower = (currentBatch - 1) * this.batchSize;
        String lowerBound = Long.toUnsignedString(lower);
        String upperBound = Long.toUnsignedString(currentBatch == this.batchCount ? MAX_UNSIGNED_LONG : lower + (this.batchSize - 1));
        logger.debug("current batch: {}, lower bound: {}, upper bound: {}", Long.valueOf(currentBatch), lowerBound, upperBound);

        PlanBuilder.Plan boundPlan = this.pagedPlan.bindParam(LOWER_BOUND, lowerBound).bindParam(UPPER_BOUND, upperBound);
        ContentHandle<T> handle = rowBatchCallable.getHandle();
        boolean rotateHosts = this.hostInfos != null && getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT;

        RowBatchFailureEventImpl failureEvent = null;
        for (int attempt = 0; shouldRequestBatch(failureEvent, attempt); attempt++) {
            RowManager rowManager = rotateHosts ? this.hostInfos[(int) ((currentBatch + attempt) % this.hostInfos.length)].rowMgr : getRowManager();
            Throwable failure = null;
            T rowsDoc = null;
            try {
                BaseHandle baseHandle = (BaseHandle) handle;
                if (this.consistentSnapshot && baseHandle.getPointInTimeQueryTimestamp() == -1) {
                    long establishedTimestamp = this.serverTimestamp.get();
                    if (establishedTimestamp > -1) {
                        logger.info("Initializing thread snapshot timestamp=[{}]", Long.valueOf(establishedTimestamp));
                        baseHandle.setPointInTimeQueryTimestamp(establishedTimestamp);
                    }
                }
                if (rowManager.resultDoc(boundPlan, (StructureReadHandle) handle) != null) {
                    rowsDoc = handle.get();
                }
                if (this.consistentSnapshot && this.serverTimestamp.get() == -1) {
                    long responseTimestamp = baseHandle.getServerTimestamp();
                    // Only the first thread to get here sets the shared timestamp; others adopt it.
                    if (this.serverTimestamp.compareAndSet(-1L, responseTimestamp)) {
                        logger.info("Established snapshot timestamp=[{}]", Long.valueOf(responseTimestamp));
                        baseHandle.setPointInTimeQueryTimestamp(responseTimestamp);
                    } else {
                        logger.info("Correcting thread snapshot timestamp=[{}]", Long.valueOf(responseTimestamp));
                        baseHandle.setPointInTimeQueryTimestamp(this.serverTimestamp.get());
                    }
                }
            } catch (Throwable th) {
                failure = th;
            }

            if (failure != null) {
                logger.debug("failed for batch: {}, retry: {}", Long.valueOf(currentBatch), Integer.valueOf(attempt));
                if (failureEvent == null) {
                    failureEvent = new RowBatchFailureEventImpl(currentBatch, lowerBound, upperBound);
                    initRequestEvent(failureEvent);
                }
                notifyFailure(failureEvent.withBatchRetries(attempt).withFailedJobBatches(getFailedBatches()), failure);
            } else {
                // The request went through, so an earlier failure of this batch is recovered,
                // whether or not any rows came back.
                failureEvent = null;
                if (rowsDoc != null) {
                    RowBatchResponseEventImpl<T> responseEvent = new RowBatchResponseEventImpl<>(currentBatch, lowerBound, upperBound, rowsDoc);
                    initRequestEvent(responseEvent);
                    notifySuccess(responseEvent);
                }
            }
        }

        if (failureEvent != null) {
            this.failedBatches.incrementAndGet();
        }
        if (failureEvent == null || failureEvent.getDisposition() != RowBatchFailureListener.BatchFailureDisposition.STOP) {
            logger.debug("finished batch: {}", Long.valueOf(currentBatch));
            if (this.batchNum.get() >= this.batchCount) {
                logger.debug("finished thread after batch: {}", Long.valueOf(currentBatch));
                endThread();
            } else {
                submit(rowBatchCallable);
            }
        } else {
            logger.debug("stopped for failed batch: {}", Long.valueOf(currentBatch));
            orderlyStop();
        }
        return failureEvent == null;
    }

    /**
     * Decides whether readRows() should make another request for the current batch.
     *
     * @param rowBatchFailureEventImpl failure recorded for the batch so far, or null
     * @param i                        zero-based attempt number
     * @return {@code true} for the first attempt; afterwards only if a failure is
     *         recorded, the job is not stopped, the disposition is RETRY and
     *         {@code i} is below the event's retry limit
     */
    private boolean shouldRequestBatch(RowBatchFailureEventImpl rowBatchFailureEventImpl, int i) {
        if (i == 0) {
            return true;
        }
        if (rowBatchFailureEventImpl == null || super.getStopped().get()) {
            return false;
        }
        boolean retryRequested = rowBatchFailureEventImpl.getDisposition() == RowBatchFailureListener.BatchFailureDisposition.RETRY;
        return retryRequested && i < rowBatchFailureEventImpl.getMaxRetries();
    }

    /**
     * Marks one chain of batch requests as finished. When the running-thread
     * counter reaches zero, the job is stopped via orderlyStop().
     */
    private void endThread() {
        if (this.runningThreads.decrementAndGet() == 0) {
            orderlyStop();
        }
    }

    /**
     * Applies a forest configuration: passes it to the superclass and rebuilds the
     * per-host entries with forestHosts(), handing in the current entries so that
     * entries for hosts that are still present can be reused.
     *
     * @param forestConfiguration forest configuration to apply
     * @return this batcher
     */
    @Override // com.marklogic.client.datamovement.impl.BatcherImpl, com.marklogic.client.datamovement.Batcher
    public synchronized RowBatcher<T> withForestConfig(ForestConfiguration forestConfiguration) {
        super.withForestConfig(forestConfiguration);
        this.hostInfos = forestHosts(forestConfiguration, this.hostInfos);
        return this;
    }

    /** Wraps the callable in a FutureTask and hands it to the thread pool. */
    private void submit(Callable<Boolean> callable) {
        submit(new FutureTask<>(callable));
    }

    /** Passes the task to the thread pool's execute(); the pool must already exist. */
    private void submit(FutureTask<Boolean> futureTask) {
        this.threadPool.execute(futureTask);
    }

    /**
     * Builds the per-host entries for the hosts of the given forest configuration.
     *
     * <p>An entry from {@code hostInfoArr} is reused when its host name is still among
     * the hosts. For every other host a new entry is created with the host client
     * from the data movement manager and, for DIRECT connections only, a row manager
     * created from that client.
     *
     * <p>The log statement reads {@code forests[0]}, so this assumes forests() never
     * returns an empty array — TODO confirm against BatcherImpl.
     *
     * @param forestConfiguration configuration whose forests determine the hosts
     * @param hostInfoArr         existing entries to reuse, or null
     * @return one entry per host, in the iteration order of the host set
     */
    synchronized HostInfo[] forestHosts(ForestConfiguration forestConfiguration, HostInfo[] hostInfoArr) {
        Forest[] forests = forests(forestConfiguration);
        Set<String> hosts = hosts(forests);

        // Index the existing entries by host name; typed map, so no cast is needed on lookup.
        HashMap<String, HostInfo> existingByHost = new HashMap<>();
        if (hostInfoArr != null) {
            for (HostInfo hostInfo : hostInfoArr) {
                existingByHost.put(hostInfo.hostName, hostInfo);
            }
        }
        logger.info("(withForestConfig) Using forests on {} hosts for \"{}\"", hosts, forests[0].getDatabaseName());

        HostInfo[] result = new HostInfo[hosts.size()];
        int i = 0;
        for (String host : hosts) {
            HostInfo existing = existingByHost.get(host);
            if (existing != null) {
                result[i] = existing;
            } else {
                HostInfo created = new HostInfo();
                result[i] = created;
                created.hostName = host;
                created.client = getMoveMgr().getHostClient(host);
                if (getMoveMgr().getConnectionType() == DatabaseClient.ConnectionType.DIRECT) {
                    logger.info("Adding DatabaseClient on port {} for host \"{}\" to the rotation", Integer.valueOf(created.client.getPort()), host);
                    created.rowMgr = created.client.newRowManager();
                }
            }
            i++;
        }
        return result;
    }
}
