package org.apache.asterix.replication.management;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
import org.apache.asterix.common.replication.IPartitionReplica;
import org.apache.asterix.common.replication.IReplicationDestination;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.api.ReplicationDestination;
import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/replication/management/IndexReplicationManager.class */
/**
 * Pushes index replication jobs to the currently registered replication destinations.
 *
 * <p>Jobs are handed in through {@link #accept(IReplicationJob)}. Jobs whose execution type is
 * {@code ASYNC} are queued and drained by a background {@link ReplicationJobsProcessor}; all other
 * jobs are processed on the calling thread.
 *
 * <p>The destination sets ({@code destinations}, {@code failedDest}) are only read or modified
 * while holding {@code transferLock}.
 */
public class IndexReplicationManager {
    private static final Logger LOGGER = LogManager.getLogger();
    private final IReplicationManager replicationManager;
    private final IReplicationStrategy replicationStrategy;
    private final PersistentLocalResourceRepository resourceRepository;
    private final INcApplicationContext appCtx;
    // Destinations that currently receive replicated index files; guarded by transferLock.
    private final Set<ReplicationDestination> destinations = new HashSet<>();
    // Jobs submitted with ASYNC execution type, waiting for the background processor.
    private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
    private final Object transferLock = new Object();
    // Registered destinations whose failure was already reported; guarded by transferLock.
    private final Set<ReplicationDestination> failedDest = new HashSet<>();

    /**
     * Background task that takes queued jobs one at a time and processes them until the
     * executing thread is interrupted.
     */
    private class ReplicationJobsProcessor implements Runnable {
        private ReplicationJobsProcessor() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName(ReplicationJobsProcessor.class.getSimpleName());
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    IReplicationJob job = replicationJobsQ.take();
                    process(job);
                } catch (InterruptedException e) {
                    LOGGER.warn(() -> ReplicationJobsProcessor.class.getSimpleName() + " interrupted.", e);
                    // Restore the interrupt flag so the loop condition terminates the task.
                    Thread.currentThread().interrupt();
                }
            }
            LOGGER.warn("{} stopped.", ReplicationJobsProcessor.class.getSimpleName());
        }
    }

    /**
     * Creates the manager and submits its background jobs processor to the application
     * context's thread executor.
     *
     * @param iNcApplicationContext application context supplying the local resource repository
     *                              and the thread executor
     * @param iReplicationManager   replication manager supplying the replication strategy and
     *                              receiving failure notifications
     */
    public IndexReplicationManager(INcApplicationContext iNcApplicationContext, IReplicationManager iReplicationManager) {
        this.appCtx = iNcApplicationContext;
        this.replicationManager = iReplicationManager;
        this.resourceRepository = iNcApplicationContext.getLocalResourceRepository();
        this.replicationStrategy = iReplicationManager.getReplicationStrategy();
        iNcApplicationContext.getThreadExecutor().execute(new ReplicationJobsProcessor());
    }

    /**
     * Adds a destination to the set of replication targets and clears any failure
     * previously recorded for it.
     *
     * @param replicationDestination the destination to start replicating to
     */
    public void register(ReplicationDestination replicationDestination) {
        synchronized (this.transferLock) {
            LOGGER.info(() -> "register " + replicationDestination);
            this.destinations.add(replicationDestination);
            this.failedDest.remove(replicationDestination);
        }
    }

    /**
     * Removes a destination from the set of replication targets and from the failed set.
     *
     * @param iReplicationDestination the destination to stop replicating to
     */
    public void unregister(IReplicationDestination iReplicationDestination) {
        synchronized (this.transferLock) {
            LOGGER.info(() -> "unregister " + iReplicationDestination);
            this.destinations.remove(iReplicationDestination);
            this.failedDest.remove(iReplicationDestination);
        }
    }

    /**
     * Records a destination failure and notifies the replication manager, at most once per
     * destination until it is registered or unregistered again.
     *
     * @param replicationDestination the destination that failed
     * @param exc                    the failure cause
     */
    private void handleFailure(ReplicationDestination replicationDestination, Exception exc) {
        synchronized (this.transferLock) {
            if (this.failedDest.contains(replicationDestination)) {
                // Failure already reported for this destination.
                return;
            }
            LOGGER.error("Replica failed", exc);
            if (this.destinations.contains(replicationDestination)) {
                this.failedDest.add(replicationDestination);
            }
            this.replicationManager.notifyFailure(replicationDestination, exc);
        }
    }

    /**
     * Accepts a replication job: {@code ASYNC} jobs are queued for the background processor,
     * any other job is processed immediately on the calling thread.
     *
     * @param iReplicationJob the job to replicate
     */
    public void accept(IReplicationJob iReplicationJob) {
        if (iReplicationJob.getExecutionType() == IReplicationJob.ReplicationExecutionType.ASYNC) {
            this.replicationJobsQ.add(iReplicationJob);
        } else {
            process(iReplicationJob);
        }
    }

    /**
     * Replicates a single job to every registered destination that has a replica of the
     * job's partition.
     *
     * <p>A failure on one destination is routed to {@link #handleFailure} and does not prevent
     * the remaining destinations from being synchronized. {@link #afterReplication} runs exactly
     * once for every invocation, whether the job was skipped, had no destinations, completed,
     * or failed.
     *
     * @param iReplicationJob the job to replicate
     */
    public void process(IReplicationJob iReplicationJob) {
        try {
            if (skip(iReplicationJob)) {
                return;
            }
            synchronized (this.transferLock) {
                if (this.destinations.isEmpty()) {
                    return;
                }
                IndexSynchronizer indexSynchronizer = new IndexSynchronizer(iReplicationJob, this.appCtx);
                int jobPartition = getJobPartition(iReplicationJob);
                for (ReplicationDestination replicationDestination : this.destinations) {
                    try {
                        Optional<IPartitionReplica> partitionReplica =
                                replicationDestination.getPartitionReplica(jobPartition);
                        if (partitionReplica.isPresent()) {
                            indexSynchronizer.sync((PartitionReplica) partitionReplica.get());
                        }
                    } catch (Exception e) {
                        // Covers both the replica lookup and the sync itself so that one bad
                        // destination cannot abort replication to the others.
                        handleFailure(replicationDestination, e);
                    }
                }
                closeChannels();
            }
        } finally {
            // Single completion point: the early returns above must not call this themselves.
            afterReplication(iReplicationJob);
        }
    }

    /**
     * Decides whether a job should be ignored.
     *
     * @param iReplicationJob the job to inspect
     * @return {@code true} if no local resource reference exists for the job's file or the
     *         replication strategy does not match the referenced dataset id
     * @throws IllegalStateException if looking up the resource reference fails
     */
    private boolean skip(IReplicationJob iReplicationJob) {
        try {
            String anyFile = iReplicationJob.getAnyFile();
            Optional<DatasetResourceReference> localResourceReference =
                    this.resourceRepository.getLocalResourceReference(anyFile);
            if (!localResourceReference.isPresent()) {
                LOGGER.warn("skipping replication of {} due to missing dataset resource reference", anyFile);
                return true;
            }
            return !this.replicationStrategy.isMatch(localResourceReference.get().getDatasetId());
        } catch (HyracksDataException e) {
            throw new IllegalStateException("Couldn't find resource for " + iReplicationJob.getAnyFile(), e);
        }
    }

    /**
     * Returns the partition number derived from one of the job's files.
     *
     * @param iReplicationJob the job to inspect
     * @return the partition number of the resource reference built from the job's file
     */
    private int getJobPartition(IReplicationJob iReplicationJob) {
        return ResourceReference.of(iReplicationJob.getAnyFile()).getPartitionNum();
    }

    /**
     * Closes every replica of every registered destination, but only when no queued jobs
     * remain. Invoked by {@link #process} while it holds {@code transferLock}.
     */
    private void closeChannels() {
        if (!this.replicationJobsQ.isEmpty()) {
            return;
        }
        LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
        for (ReplicationDestination destination : this.destinations) {
            destination.getReplicas().stream().map(PartitionReplica.class::cast).forEach(PartitionReplica::close);
        }
    }

    /**
     * Signals the end of replication on LSM index jobs whose operation is {@code REPLICATE};
     * does nothing for any other job.
     *
     * @param iReplicationJob the job that finished processing
     * @throws ReplicationException if ending the replication fails
     */
    private static void afterReplication(IReplicationJob iReplicationJob) {
        try {
            if (iReplicationJob.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE
                    && (iReplicationJob instanceof ILSMIndexReplicationJob)) {
                ((ILSMIndexReplicationJob) iReplicationJob).endReplication();
            }
        } catch (HyracksDataException e) {
            throw new ReplicationException(e);
        }
    }
}
