package com.leonarduk.clearcheckbook.processor.parallel;

import com.leonarduk.clearcheckbook.ClearcheckbookException;
import com.leonarduk.clearcheckbook.calls.BulkProcessable;
import com.leonarduk.clearcheckbook.dto.AbstractDataType;
import com.leonarduk.clearcheckbook.processor.ClearCheckBookTaskProcessor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/leonarduk/clearcheckbook/processor/parallel/ClearCheckBookDataTypeParallelProcessor.class */
/**
 * Processes a list of data items in parallel using one producer thread and a fixed pool of
 * consumer tasks that share a bounded queue.
 *
 * <p>The producer is handed the input list and the queue; each consumer is handed the queue, the
 * {@link BulkProcessable} call and a shared, synchronized result list. {@link #processQueue(List)}
 * blocks until the result list holds one entry per input item, until no consumer task is left to
 * make progress, or until the calling thread is interrupted.
 *
 * @param <T> the data type of the items being processed
 */
public class ClearCheckBookDataTypeParallelProcessor<T extends AbstractDataType<?>> implements ClearCheckBookTaskProcessor<T> {
    private static final Logger _logger = Logger.getLogger(ClearCheckBookDataTypeParallelProcessor.class);

    /** Pause, in milliseconds, between checks of the shared result list while waiting for consumers. */
    private static final long RESULT_POLL_INTERVAL_MILLIS = 500L;

    private final BulkProcessable<T> call;
    private final int queueSize;
    private final int numberOfConsumers;

    /**
     * Creates a processor.
     *
     * @param bulkProcessable the call handed to every consumer
     * @param queueSize capacity of the bounded queue between the producer and the consumers
     * @param numberOfConsumers number of consumer tasks, which is also the size of the thread pool
     */
    public ClearCheckBookDataTypeParallelProcessor(BulkProcessable<T> bulkProcessable, int queueSize, int numberOfConsumers) {
        this.call = bulkProcessable;
        this.queueSize = queueSize;
        this.numberOfConsumers = numberOfConsumers;
    }

    /**
     * Runs the producer/consumer pipeline over {@code list} and returns the collected results.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and the
     * results gathered so far are returned. The same happens if every consumer task ends before
     * all results have arrived, since nothing could add the missing entries any more.
     *
     * @param list the items to process
     * @return the synchronized list the consumers were given to record their results in
     * @throws ClearcheckbookException declared by the interface; not thrown directly by this method
     * @throws IllegalArgumentException if the configured queue size or consumer count is not positive
     */
    // The queue, producer and consumers are used as raw types because their declarations are not
    // part of this file, so their type parameters cannot be stated here with confidence.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override // com.leonarduk.clearcheckbook.processor.ClearCheckBookTaskProcessor
    public List<String> processQueue(List<T> list) throws ClearcheckbookException {
        _logger.info("Starting processor for " + list.size() + " data items with queue size of " + this.queueSize + " and " + this.numberOfConsumers + " Consumers");
        ArrayBlockingQueue workQueue = new ArrayBlockingQueue(this.queueSize);
        // Build the pool before starting any thread: an invalid consumer count is rejected here,
        // and failing first means no producer thread is left running behind the exception.
        ThreadPoolExecutor consumerPool = new ThreadPoolExecutor(this.numberOfConsumers, this.numberOfConsumers, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<String> results = Collections.synchronizedList(new ArrayList<String>(this.numberOfConsumers));
        MonitorThread monitor = null;
        try {
            _logger.debug("starting producer to produce messages in queue");
            ClearCheckBookDataTypeProducer producer = new ClearCheckBookDataTypeProducer(workQueue, list);
            Thread producerThread = new Thread(producer, producer.getClass().getName());
            producerThread.start();

            // NOTE(review): the meaning of the second argument (3) is defined by MonitorThread,
            // which is not visible here - presumably a reporting delay; confirm.
            monitor = new MonitorThread(consumerPool, 3);
            new Thread(monitor).start();

            for (int i = 0; i < this.numberOfConsumers; i++) {
                consumerPool.execute(new ClearCheckBookDataTypeConsumer(workQueue, this.call, results));
            }

            _logger.debug("Waiting for producer to complete");
            boolean completed = awaitProducer(producerThread);
            _logger.info("There are " + workQueue.size() + " still to process");
            if (completed) {
                completed = awaitResults(results, list.size(), consumerPool);
            }
            if (completed) {
                // Every item has a result, so nothing should be left waiting in the queue.
                assert workQueue.size() == 0 : "queue not drained although all results were collected";
            }
            _logger.info("There are " + workQueue.size() + " still to process");
        } finally {
            // Always release the pool and the monitor, including on failure or interruption.
            // shutdown() stops the pool from accepting new tasks; it does not interrupt tasks
            // that are already running.
            _logger.debug("Killing consumers");
            consumerPool.shutdown();
            if (monitor != null) {
                monitor.shutdown();
            }
        }
        _logger.info("Processor finished");
        return results;
    }

    /**
     * Waits for the producer thread to terminate.
     *
     * @return {@code true} if the producer finished, {@code false} if the wait was interrupted
     */
    private boolean awaitProducer(Thread producerThread) {
        try {
            producerThread.join();
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of discarding it.
            Thread.currentThread().interrupt();
            _logger.warn("Interrupted while waiting for producer to complete", e);
            return false;
        }
    }

    /**
     * Polls the shared result list until it contains {@code expected} entries.
     *
     * @return {@code true} if all results arrived; {@code false} if the wait was interrupted or
     *     every consumer task ended before all results were recorded
     */
    private boolean awaitResults(List<String> results, int expected, ThreadPoolExecutor consumerPool) {
        while (results.size() < expected) {
            // Read the completed-task count BEFORE re-reading the result size: once every consumer
            // task has ended, no further result can be added, so the size read afterwards is final.
            boolean consumersFinished = consumerPool.getCompletedTaskCount() >= this.numberOfConsumers;
            if (consumersFinished) {
                int collected = results.size();
                if (collected >= expected) {
                    return true;
                }
                _logger.error("All consumers ended with only " + collected + " of " + expected + " results; giving up waiting");
                return false;
            }
            try {
                Thread.sleep(RESULT_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: looping on would make sleep() throw again at once.
                Thread.currentThread().interrupt();
                _logger.warn("Interrupted while waiting for consumers to finish", e);
                return false;
            }
        }
        return true;
    }
}
