package org.apache.asterix.replication.logging;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.replication.messaging.ReplicateLogsTask;
import org.apache.asterix.replication.messaging.ReplicationProtocol;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/replication/logging/RemoteLogsNotifier.class */
/**
 * Drains a queue of {@link RemoteLogRecord}s and reacts to each one according to its log type:
 * <ul>
 *   <li>types {@code 1} and {@code 3}: writes an acknowledgement line
 *       ({@code nodeId + LOG_REPLICATION_ACK + txnId + lineSeparator}) to the socket of the
 *       replication worker attached to the record;</li>
 *   <li>type {@code 4}: calls {@code masterFlush} on the index checkpoint managers selected by
 *       {@link #checkpointReplicaIndexes(RemoteLogRecord, int)};</li>
 *   <li>any other type: fails with an {@link IllegalStateException}.</li>
 * </ul>
 * NOTE(review): the numeric log types presumably correspond to named constants of the
 * transaction log-type enumeration (commit / abort / flush) -- confirm against that class and
 * replace the literals once it is imported here.
 */
class RemoteLogsNotifier implements Runnable {
    private static final Logger LOGGER = LogManager.getLogger();
    private final PersistentLocalResourceRepository localResourceRep;
    private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
    private final LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ;
    private final INcApplicationContext appCtx;

    /**
     * Creates a notifier bound to the given application context and queue.
     *
     * @param appCtx      application context supplying the node id, the local resource repository,
     *                    the index checkpoint manager provider and the replica manager
     * @param remoteLogsQ queue from which log records are taken by {@link #run()}
     */
    public RemoteLogsNotifier(INcApplicationContext appCtx, LinkedBlockingQueue<RemoteLogRecord> remoteLogsQ) {
        this.appCtx = appCtx;
        this.remoteLogsQ = remoteLogsQ;
        // Explicit cast: the field needs the persistent implementation. The cast is redundant
        // (and harmless) if the accessor is already declared to return that type.
        this.localResourceRep = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
        this.indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
    }

    /**
     * Names the current thread after the node id and then processes queued records until the
     * thread is interrupted.
     * <p>
     * An {@link IOException} raised while handling a record is logged and the loop continues with
     * the next record. An interruption while waiting on the queue restores the interrupt flag,
     * which ends the loop. An unexpected log type is not caught here and terminates the loop with
     * an {@link IllegalStateException}.
     */
    @Override
    public void run() {
        final String nodeId = appCtx.getServiceContext().getNodeId();
        Thread.currentThread().setName(nodeId + RemoteLogsNotifier.class.getSimpleName());
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // The whole dispatch lives inside the try block so that the record is always
                // assigned before use and I/O failures of the handlers reach the catch below.
                final RemoteLogRecord logRecord = remoteLogsQ.take();
                switch (logRecord.getLogType()) {
                    case 1:
                    case 3:
                        // Acknowledge the transaction to the node that replicated the log.
                        logRecord.getReplicationWorker().getChannel().getSocketChannel().socket().getOutputStream()
                                .write((nodeId + ReplicationProtocol.LOG_REPLICATION_ACK + logRecord.getTxnId()
                                        + System.lineSeparator()).getBytes());
                        break;
                    case 4:
                        checkpointReplicaIndexes(logRecord, logRecord.getDatasetId());
                        break;
                    default:
                        throw new IllegalStateException("Unexpected log type: " + ((int) logRecord.getLogType()));
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                LOGGER.error("Failed to process replicated log", e);
            }
        }
    }

    /**
     * Calls {@code masterFlush} on the checkpoint manager of every local resource that belongs to
     * the given dataset and whose partition is <em>not</em> contained in the partition set
     * reported by the replica manager.
     *
     * @param remoteLogRecord record providing the master LSN and the LSN passed to
     *                        {@code masterFlush}
     * @param datasetId       id of the dataset whose resources are selected
     * @throws HyracksDataException if looking up the resources or a checkpoint manager call fails
     */
    private void checkpointReplicaIndexes(RemoteLogRecord remoteLogRecord, int datasetId)
            throws HyracksDataException {
        final Set<Integer> partitions = appCtx.getReplicaManager().getPartitions();
        final List<DatasetResourceReference> replicaIndexRefs = localResourceRep.getResources(localResource -> {
            DatasetLocalResource resource = (DatasetLocalResource) localResource.getResource();
            return resource.getDatasetId() == datasetId && !partitions.contains(resource.getPartition());
        }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
        for (DatasetResourceReference replicaIndexRef : replicaIndexRefs) {
            final IIndexCheckpointManager checkpointManager = indexCheckpointManagerProvider.get(replicaIndexRef);
            // The call is made while holding the manager's monitor, as in the original code.
            synchronized (checkpointManager) {
                checkpointManager.masterFlush(remoteLogRecord.getMasterLsn(), remoteLogRecord.getLSN());
            }
        }
    }
}
