package com.bigdata.rdf.load;

import com.bigdata.counters.CounterSet;
import com.bigdata.counters.Instrument;
import com.bigdata.counters.OneShotInstrument;
import com.bigdata.jini.start.config.ServiceConfiguration;
import com.bigdata.service.AbstractFederation;
import com.bigdata.service.IBigdataFederation;
import com.bigdata.util.concurrent.DaemonThreadFactory;
import com.bigdata.util.concurrent.ThreadPoolExecutorStatisticsTask;
import java.io.File;
import java.io.IOException;
import java.lang.Runnable;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/rdf/load/ConcurrentDataLoader.class */
public class ConcurrentDataLoader<T extends Runnable, F> {
    protected static final Logger log = Logger.getLogger(ConcurrentDataLoader.class);
    protected final boolean WARN;
    protected final ThreadPoolExecutor loadService;
    protected final ReentrantLock lock;
    protected final WorkflowTaskCounters counters;
    protected final LinkedBlockingQueue<WorkflowTask<T, F>> errorQueue;
    protected final LinkedBlockingQueue<WorkflowTask<T, F>> failedQueue;
    protected final int maxtries;
    protected final int nthreads;
    protected final AtomicInteger taskedCount;
    protected final AbstractFederation fed;
    protected final ScheduledFuture loadServiceStatisticsFuture;
    protected final long rejectedExecutionDelay;
    private CounterSet counterSet;

    public int getTaskedCount() {
        return this.taskedCount.get();
    }

    public ConcurrentDataLoader(IBigdataFederation iBigdataFederation, int i) {
        this(iBigdataFederation, i, Math.max(100, i * 2), 250L, 3);
    }

    public ConcurrentDataLoader(IBigdataFederation iBigdataFederation, int i, int i2, long j, int i3) {
        BlockingQueue linkedBlockingQueue;
        this.WARN = log.getEffectiveLevel().toInt() <= Level.WARN.toInt();
        this.lock = new ReentrantLock();
        this.counters = new WorkflowTaskCounters();
        this.errorQueue = new LinkedBlockingQueue<>();
        this.failedQueue = new LinkedBlockingQueue<>();
        this.taskedCount = new AtomicInteger(0);
        if (iBigdataFederation == null) {
            throw new IllegalArgumentException();
        }
        if (i <= 0) {
            throw new IllegalArgumentException();
        }
        if (i2 < 0) {
            throw new IllegalArgumentException();
        }
        if (j < 1) {
            throw new IllegalArgumentException();
        }
        if (i3 < 1) {
            throw new IllegalArgumentException();
        }
        this.fed = (AbstractFederation) iBigdataFederation;
        this.nthreads = i;
        this.rejectedExecutionDelay = j;
        this.maxtries = i3;
        if (i == Integer.MAX_VALUE) {
            this.loadService = (ThreadPoolExecutor) Executors.newCachedThreadPool(new DaemonThreadFactory(getClass().getName() + ".loadService"));
        } else {
            switch (i2) {
                case 0:
                    linkedBlockingQueue = new SynchronousQueue();
                    break;
                case Integer.MAX_VALUE:
                    linkedBlockingQueue = new LinkedBlockingQueue();
                    break;
                default:
                    linkedBlockingQueue = new LinkedBlockingQueue(i2);
                    break;
            }
            this.loadService = new ThreadPoolExecutor(i, i, 2147483647L, TimeUnit.NANOSECONDS, (BlockingQueue<Runnable>) linkedBlockingQueue, new DaemonThreadFactory(getClass().getName() + ".loadService"));
        }
        CounterSet counters = getCounters();
        ThreadPoolExecutorStatisticsTask threadPoolExecutorStatisticsTask = new ThreadPoolExecutorStatisticsTask("Load Service", this.loadService, this.counters);
        this.loadServiceStatisticsFuture = this.fed.addScheduledTask(threadPoolExecutorStatisticsTask, 0L, 1000L, TimeUnit.MILLISECONDS);
        counters.makePath("Load Service").attach(threadPoolExecutorStatisticsTask.getCounters());
    }

    protected void finalize() throws Throwable {
        shutdownNow();
        this.loadServiceStatisticsFuture.cancel(true);
        super.finalize();
    }

    public void shutdown() {
        this.loadService.shutdown();
        this.fed.reportCounters();
    }

    public void shutdownNow() {
        this.loadService.shutdownNow();
    }

    public synchronized CounterSet getCounters() {
        if (this.counterSet == null) {
            this.counterSet = new CounterSet();
            this.counterSet.addCounter("#threads", new OneShotInstrument(Integer.valueOf(this.nthreads)));
            CounterSet makePath = this.counterSet.makePath("Load Service");
            makePath.addCounter("#tasked", new Instrument<Long>() { // from class: com.bigdata.rdf.load.ConcurrentDataLoader.1
                @Override // com.bigdata.counters.Instrument
                protected void sample() {
                    setValue(Long.valueOf(ConcurrentDataLoader.this.taskedCount.get()));
                }
            });
            makePath.addCounter("taskRejectCount", new Instrument<Long>() { // from class: com.bigdata.rdf.load.ConcurrentDataLoader.2
                @Override // com.bigdata.counters.Instrument
                protected void sample() {
                    setValue(Long.valueOf(ConcurrentDataLoader.this.counters.taskRejectCount.get()));
                }
            });
            makePath.addCounter("taskRetryCount", new Instrument<Long>() { // from class: com.bigdata.rdf.load.ConcurrentDataLoader.3
                @Override // com.bigdata.counters.Instrument
                protected void sample() {
                    setValue(Long.valueOf(ConcurrentDataLoader.this.counters.taskRetryCount.get()));
                }
            });
            makePath.addCounter("taskCancelCount", new Instrument<Long>() { // from class: com.bigdata.rdf.load.ConcurrentDataLoader.4
                @Override // com.bigdata.counters.Instrument
                protected void sample() {
                    setValue(Long.valueOf(ConcurrentDataLoader.this.counters.taskCancelCount.get()));
                }
            });
            makePath.addCounter("taskFatalCount", new Instrument<Long>() { // from class: com.bigdata.rdf.load.ConcurrentDataLoader.5
                @Override // com.bigdata.counters.Instrument
                protected void sample() {
                    setValue(Long.valueOf(ConcurrentDataLoader.this.counters.taskFatalCount.get()));
                }
            });
        }
        return this.counterSet;
    }

    protected boolean consumeErrorTask() throws InterruptedException {
        WorkflowTask<T, F> poll = this.errorQueue.poll();
        if (poll == null) {
            return false;
        }
        if (log.isInfoEnabled()) {
            log.info("Re-submitting task=" + poll.target);
        }
        try {
            new WorkflowTask(poll).submit();
            this.counters.taskRetryCount.incrementAndGet();
            return true;
        } catch (RejectedExecutionException e) {
            Thread.sleep(this.rejectedExecutionDelay);
            return false;
        }
    }

    public boolean awaitCompletion(long j, TimeUnit timeUnit) throws InterruptedException {
        if (log.isInfoEnabled()) {
            log.info(this.counters.toString());
        }
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = currentTimeMillis;
        while (true) {
            this.lock.lockInterruptibly();
            try {
                int activeCount = this.loadService.getActiveCount();
                int size = this.loadService.getQueue().size();
                int size2 = this.errorQueue.size();
                int size3 = this.failedQueue.size();
                long currentTimeMillis2 = System.currentTimeMillis();
                if (log.isDebugEnabled()) {
                    log.debug("Awaiting completion: loadActiveCount=" + activeCount + ", loadQueueSize=" + size + ", errorQueueSize=" + size2 + ", failedQueueSize=" + size3 + ", elapsedWait=" + (currentTimeMillis2 - currentTimeMillis) + ", " + this.counters);
                }
                if (consumeErrorTask()) {
                    this.lock.unlock();
                } else {
                    if (activeCount == 0 && size == 0 && size2 == 0) {
                        if (log.isInfoEnabled()) {
                            log.info("complete");
                        }
                        return true;
                    }
                    if (TimeUnit.NANOSECONDS.convert(System.currentTimeMillis() - currentTimeMillis, timeUnit) > j) {
                        if (this.WARN) {
                            log.warn(ServiceConfiguration.Options.TIMEOUT);
                        }
                        this.lock.unlock();
                        return false;
                    }
                    if (log.isInfoEnabled() && currentTimeMillis2 - j2 > DefaultMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL) {
                        j2 = currentTimeMillis2;
                        log.info("Awaiting completion: loadActiveCount=" + activeCount + ", loadQueueSize=" + size + ", errorQueueSize=" + size2 + ", failedQueueSize=" + size3 + ", elapsedWait=" + (currentTimeMillis2 - currentTimeMillis) + ", " + this.counters);
                    }
                    Thread.sleep(100L);
                    this.lock.unlock();
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    public Future submitTask(String str, ITaskFactory<T> iTaskFactory) throws InterruptedException, Exception {
        if (log.isDebugEnabled()) {
            log.debug("Processing: resource=" + str);
        }
        if (new File(str).isDirectory()) {
            throw new IOException(str + " is a directory.");
        }
        T newTask = iTaskFactory.newTask(str);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            consumeErrorTask();
        } catch (InterruptedException e) {
            throw e;
        } catch (Throwable th) {
            log.warn(this, th);
        }
        while (true) {
            try {
                Future<F> submit = newWorkflowTask(newTask, this.loadService, this.lock, this.errorQueue, this.failedQueue, this.counters, this.maxtries).submit();
                this.taskedCount.incrementAndGet();
                return submit;
            } catch (RejectedExecutionException e2) {
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 - currentTimeMillis > DefaultMessageListenerContainer.DEFAULT_RECOVERY_INTERVAL) {
                    currentTimeMillis = currentTimeMillis2;
                    if (log.isInfoEnabled()) {
                        log.info("loadService queue full: queueSize=" + this.loadService.getQueue().size() + ", poolSize=" + this.loadService.getPoolSize() + ", active=" + this.loadService.getActiveCount() + ", completed=" + this.loadService.getCompletedTaskCount() + ", " + this.counters);
                    }
                }
                Thread.sleep(this.rejectedExecutionDelay);
            }
        }
    }

    protected WorkflowTask<T, F> newWorkflowTask(T t, ExecutorService executorService, ReentrantLock reentrantLock, Queue<WorkflowTask<T, F>> queue, Queue<WorkflowTask<T, F>> queue2, WorkflowTaskCounters workflowTaskCounters, int i) {
        return new WorkflowTask<>(t, executorService, reentrantLock, queue, queue2, workflowTaskCounters, i);
    }
}
