package org.apache.hadoop.ozone.recon.tasks;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.jooq.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.class */
/**
 * {@link ReconTaskController} implementation that fans an OM update event
 * batch out to every registered {@link ReconDBUpdateTask} on a fixed-size
 * thread pool.
 *
 * <p>A batch goes through up to three rounds: an initial {@code process} of all
 * registered tasks, one retry of {@code process} for the tasks that failed, and
 * a {@code reprocess} for the tasks that failed the retry. A task whose
 * reprocess failure count exceeds {@link #TASK_FAILURE_THRESHOLD} is removed
 * from the registry.
 *
 * <p>{@link #consumeOMEvents} is guarded by a single-permit semaphore so only
 * one batch is handled at a time.
 */
public class ReconTaskControllerImpl implements ReconTaskController {
    private static final Logger LOG = LoggerFactory.getLogger(ReconTaskControllerImpl.class);
    private static final int TASK_FAILURE_THRESHOLD = 2;

    private final ExecutorService executorService;
    private final int threadCount;
    private final ReconOMMetadataManager omMetadataManager;
    private final ReconTaskStatusDao reconTaskStatusDao;
    private final Semaphore taskSemaphore = new Semaphore(1);
    private final Map<String, AtomicInteger> taskFailureCounter = new HashMap<>();
    private final Map<String, ReconDBUpdateTask> reconDBUpdateTasks = new HashMap<>();

    /**
     * Creates the controller and its worker pool.
     *
     * @param ozoneConfiguration source of the task thread count
     *        ({@code OZONE_RECON_TASK_THREAD_COUNT_KEY}, default 1)
     * @param reconOMMetadataManager metadata manager handed to tasks on reprocess
     * @param configuration jOOQ configuration used to build the task status DAO
     */
    @Inject
    public ReconTaskControllerImpl(OzoneConfiguration ozoneConfiguration, ReconOMMetadataManager reconOMMetadataManager, Configuration configuration) {
        this.omMetadataManager = reconOMMetadataManager;
        this.threadCount = ozoneConfiguration.getInt(ReconServerConfigKeys.OZONE_RECON_TASK_THREAD_COUNT_KEY, 1);
        this.executorService = Executors.newFixedThreadPool(this.threadCount);
        this.reconTaskStatusDao = new ReconTaskStatusDao(configuration);
    }

    /**
     * Registers a task under its task name, resets its failure counter to zero
     * and inserts an initial status row (timestamp 0, sequence number 0).
     *
     * @param reconDBUpdateTask the task to register
     */
    @Override
    public void registerTask(ReconDBUpdateTask reconDBUpdateTask) {
        String taskName = reconDBUpdateTask.getTaskName();
        LOG.info("Registered task {} with controller.", taskName);
        this.reconDBUpdateTasks.put(taskName, reconDBUpdateTask);
        this.taskFailureCounter.put(taskName, new AtomicInteger(0));
        // NOTE(review): the insert is unconditional; confirm a status row for this
        // task name can never already exist when a task is registered.
        this.reconTaskStatusDao.insert(new ReconTaskStatus(taskName, 0L, 0L));
    }

    /**
     * Runs every registered task against the batch, retries the failures once,
     * and reprocesses the tasks that also failed the retry.
     *
     * <p>An {@link ExecutionException} raised while collecting task results is
     * logged and not propagated.
     *
     * @param oMUpdateEventBatch the batch of OM update events to process
     * @throws InterruptedException if interrupted while waiting for the permit
     *         or for the tasks to finish
     */
    @Override
    public void consumeOMEvents(OMUpdateEventBatch oMUpdateEventBatch) throws InterruptedException {
        this.taskSemaphore.acquire();
        try {
            // Round 1: process the batch with every registered task.
            List<Callable<Pair>> initialRun = new ArrayList<>();
            for (ReconDBUpdateTask task : this.reconDBUpdateTasks.values()) {
                initialRun.add(() -> task.process(oMUpdateEventBatch));
            }
            List<String> failedTasks = processTaskResults(this.executorService.invokeAll(initialRun), oMUpdateEventBatch);
            if (failedTasks.isEmpty()) {
                return;
            }

            // Round 2: retry process() once for the tasks that failed.
            List<String> retryFailedTasks = processTaskResults(
                    this.executorService.invokeAll(buildProcessCalls(failedTasks, oMUpdateEventBatch)), oMUpdateEventBatch);
            if (retryFailedTasks.isEmpty()) {
                return;
            }

            // Round 3: reprocess only the tasks that failed the retry as well;
            // tasks that recovered on retry must not be reprocessed.
            List<String> reprocessFailedTasks = processTaskResults(
                    this.executorService.invokeAll(buildReprocessCalls(retryFailedTasks)), oMUpdateEventBatch);
            for (String taskName : reprocessFailedTasks) {
                recordReprocessFailure(taskName);
            }
        } catch (ExecutionException e) {
            LOG.error("Unexpected error : ", e);
        } finally {
            // Single release point: releasing in the catch block as well would
            // add an extra permit and break the one-batch-at-a-time guarantee.
            this.taskSemaphore.release();
        }
    }

    /** Builds one {@code process} call per named task for the given batch. */
    private List<Callable<Pair>> buildProcessCalls(List<String> taskNames, OMUpdateEventBatch oMUpdateEventBatch) {
        List<Callable<Pair>> calls = new ArrayList<>(taskNames.size());
        for (String taskName : taskNames) {
            ReconDBUpdateTask task = this.reconDBUpdateTasks.get(taskName);
            calls.add(() -> task.process(oMUpdateEventBatch));
        }
        return calls;
    }

    /** Builds one {@code reprocess} call per named task. */
    private List<Callable<Pair>> buildReprocessCalls(List<String> taskNames) {
        List<Callable<Pair>> calls = new ArrayList<>(taskNames.size());
        for (String taskName : taskNames) {
            ReconDBUpdateTask task = this.reconDBUpdateTasks.get(taskName);
            calls.add(() -> task.reprocess(this.omMetadataManager));
        }
        return calls;
    }

    /** Counts a reprocess failure and unregisters the task once past the threshold. */
    private void recordReprocessFailure(String taskName) {
        LOG.info("Reprocess step failed for task : {}", taskName);
        if (this.taskFailureCounter.get(taskName).incrementAndGet() > TASK_FAILURE_THRESHOLD) {
            LOG.info("Blacklisting Task since it failed retry and reprocess more than 2 times.");
            this.reconDBUpdateTasks.remove(taskName);
        }
    }

    /** Writes the event's timestamp and sequence number as the task's status row. */
    private void storeLastCompletedTransaction(String str, OMDBUpdateEvent.EventInfo eventInfo) {
        this.reconTaskStatusDao.update(new ReconTaskStatus(str, Long.valueOf(eventInfo.getEventTimestampMillis()), Long.valueOf(eventInfo.getSequenceNumber())));
    }

    /**
     * Returns the registered tasks keyed by task name.
     *
     * @return the controller's own map instance, not a copy
     */
    @Override
    public Map<String, ReconDBUpdateTask> getRegisteredTasks() {
        return this.reconDBUpdateTasks;
    }

    /**
     * Waits for each future and sorts the outcomes. For a successful task the
     * failure counter is reset and the batch's last event info is stored as its
     * status; a failed task's name is collected.
     *
     * @param list futures whose result pairs hold the task name on the left and
     *        a {@code Boolean} success flag on the right
     * @param oMUpdateEventBatch the batch whose last event info is recorded
     * @return names of the tasks that reported failure, in result order
     * @throws ExecutionException if a task threw instead of returning a result
     * @throws InterruptedException if interrupted while waiting for a result
     */
    private List<String> processTaskResults(List<Future<Pair>> list, OMUpdateEventBatch oMUpdateEventBatch) throws ExecutionException, InterruptedException {
        List<String> failedTasks = new ArrayList<>();
        for (Future<Pair> future : list) {
            Pair result = future.get();
            String taskName = result.getLeft().toString();
            if (((Boolean) result.getRight()).booleanValue()) {
                this.taskFailureCounter.get(taskName).set(0);
                storeLastCompletedTransaction(taskName, oMUpdateEventBatch.getLastEventInfo());
            } else {
                LOG.info("Failed task : {}", taskName);
                failedTasks.add(taskName);
            }
        }
        return failedTasks;
    }
}
