package org.apache.asterix.replication.management;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.logging.RemoteLogMapping;
import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.util.StorageUtil;

/* loaded from: input_file:org/apache/asterix/replication/management/ReplicationChannel.class */
public class ReplicationChannel extends Thread implements IReplicationChannel {
    /**
     * Number of remaining bytes in a received log batch that signals the end of a log
     * replication session (compared against the buffer in handleLogReplication).
     */
    private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
    // Executor service for replication work. NOTE(review): presumably runs ReplicationThread instances - confirm.
    private final ExecutorService replicationThreads;
    // Id of the local node; used as the prefix of job replication ACK messages.
    private final String localNodeID;
    // Local log manager; supplies the current append LSN and the log page size.
    private final ILogManager logManager;
    // Manages replica files on local storage (index paths, component masks, LSN maps).
    private final ReplicaResourcesManager replicaResourcesManager;
    // Receives replica events reported by remote nodes.
    private final IReplicationManager replicationManager;
    // Replication configuration; source of the replication strategy.
    private final ReplicationProperties replicationProperties;
    // Gives access to the application context and the dataset lifecycle manager.
    private final IAppRuntimeContextProvider appContextProvider;
    private final Set<Integer> nodeHostedPartitions;
    // Used to flush datasets matching the replication strategy before replica files are served.
    private final IDatasetLifecycleManager dsLifecycleManager;
    // Resolves an index file path to its file reference (and thereby its dataset id).
    private final PersistentLocalResourceRepository localResourceRep;
    private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName());
    // Initial capacity (4 KB) of the per-connection input and output buffers.
    private static final int INTIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
    private SocketChannel socketChannel = null;
    private ServerSocketChannel serverSocketChannel = null;
    // Monitor the LSN sync service waits on until the mapping for a flush log becomes available.
    private final Object flushLogslock = new Object();
    // LSN synchronization tasks produced by file replication and consumed by LSMComponentsSyncService.
    private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>();
    // Remote log records awaiting a job replication ACK; consumed by ReplicationNotifier.
    private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
    // LSM component id -> properties of the component currently being replicated.
    private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
    // Node-unique LSN -> mapping from the remote log to its local LSN.
    private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<>();
    private final LSMComponentsSyncService lsmComponentLSNMappingService = new LSMComponentsSyncService(this, null);
    private final ReplicationNotifier replicationNotifier = new ReplicationNotifier(this, null);

    /* renamed from: org.apache.asterix.replication.management.ReplicationChannel$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationChannel$1.class */
    /**
     * Compiler-generated lookup table that backs a {@code switch} over
     * {@link ReplicationProtocol.ReplicationRequestType}.
     *
     * <p>The array is indexed by the enum constant's {@code ordinal()} and holds the case
     * number (1..8) for that constant. Slots that are never assigned keep the default value
     * 0, which matches no case number and therefore selects the {@code default} branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType = new int[ReplicationProtocol.ReplicationRequestType.values().length];

        static {
            // Each assignment is guarded separately: a NoSuchFieldError means the enum constant
            // is missing from the ReplicationRequestType class loaded at run time, in which case
            // its slot is intentionally left at 0.
            try {
                // Plain case number; deliberately not tied to any unrelated constant that happens to equal 1.
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.LSM_COMPONENT_PROPERTIES.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.REPLICATE_FILE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.DELETE_FILE.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.REPLICA_EVENT.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.GET_REPLICA_MAX_LSN.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.GET_REPLICA_FILES.ordinal()] = 7;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
            try {
                $SwitchMap$org$apache$asterix$replication$functions$ReplicationProtocol$ReplicationRequestType[ReplicationProtocol.ReplicationRequestType.FLUSH_INDEX.ordinal()] = 8;
            } catch (NoSuchFieldError ignored) {
                // constant absent at run time; slot stays 0
            }
        }
    }

    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationChannel$LSMComponentsSyncService.class */
    /**
     * Background thread that consumes {@link LSMComponentLSNSyncTask}s and stamps each
     * replicated LSM component file with the LSN that is valid on this replica.
     */
    private class LSMComponentsSyncService extends Thread {
        // Original LSN value treated as the special case in syncLSMComponentFlushLSN
        // (by its name, a bulk-loaded component).
        private static final int BULKLOAD_LSN = 0;

        private LSMComponentsSyncService() {
        }

        /**
         * Takes sync tasks off the queue until the thread is interrupted. For each task the
         * component's replica LSN is synchronized and the component's remaining-files
         * bookkeeping is updated. Failures of a single task are logged and do not stop the
         * service.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("LSMComponentsSyncService Thread");
            while (true) {
                try {
                    LSMComponentLSNSyncTask syncTask = ReplicationChannel.this.lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
                    LSMComponentProperties componentProperties = ReplicationChannel.this.lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
                    syncLSMComponentFlushLSN(componentProperties, syncTask);
                    ReplicationChannel.this.updateLSMComponentRemainingFiles(componentProperties.getComponentId());
                } catch (InterruptedException e) {
                    // Restore the flag and stop. Looping again with the flag set would make
                    // take() throw immediately on every iteration and spin the CPU forever.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    if (ReplicationChannel.LOGGER.isLoggable(Level.SEVERE)) {
                        ReplicationChannel.LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", (Throwable) e);
                    }
                }
            }
        }

        /**
         * Determines the replica LSN of a component and writes it into the component file at
         * the byte offset carried by the task.
         *
         * @param componentProperties properties of the component the file belongs to
         * @param syncTask task naming the component file and the LSN byte offset inside it
         * @throws InterruptedException if interrupted while waiting for the flush log mapping
         * @throws IOException if the component file cannot be opened or written
         */
        private void syncLSMComponentFlushLSN(LSMComponentProperties componentProperties, LSMComponentLSNSyncTask syncTask) throws InterruptedException, IOException {
            if (componentProperties.getOriginalLSN() == BULKLOAD_LSN) {
                // Special case: only the replica LSN is recorded (current append LSN); the file is not touched.
                componentProperties.setReplicaLSN(Long.valueOf(ReplicationChannel.this.logManager.getAppendLSN()));
                return;
            }
            Path componentFile = Paths.get(syncTask.getComponentFilePath());
            if (componentProperties.getReplicaLSN() == null) {
                if (componentProperties.getOpType() == LSMOperationType.FLUSH) {
                    // Wait until the mapping of the flush log is available, giving up if the
                    // component file disappears in the meantime.
                    // NOTE(review): the mapping is checked outside the monitor, so a notification
                    // issued between the check and wait() would be missed - confirm against the notifying side.
                    RemoteLogMapping remoteLogMapping = ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.get(componentProperties.getNodeUniqueLSN());
                    while (remoteLogMapping == null && Files.exists(componentFile)) {
                        synchronized (ReplicationChannel.this.flushLogslock) {
                            ReplicationChannel.this.flushLogslock.wait();
                        }
                        remoteLogMapping = ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.get(componentProperties.getNodeUniqueLSN());
                    }
                    if (remoteLogMapping == null) {
                        // The file vanished before the mapping arrived; nothing to stamp.
                        return;
                    }
                    componentProperties.setReplicaLSN(Long.valueOf(remoteLogMapping.getLocalLSN()));
                } else if (componentProperties.getOpType() == LSMOperationType.MERGE) {
                    // Look the original LSN up in the index LSN map; fall back to the current append LSN.
                    Long replicaLSN = ReplicationChannel.this.replicaResourcesManager.getReplicaIndexLSNMap(componentProperties.getReplicaComponentPath(ReplicationChannel.this.replicaResourcesManager)).get(Long.valueOf(componentProperties.getOriginalLSN()));
                    if (replicaLSN == null) {
                        replicaLSN = Long.valueOf(ReplicationChannel.this.logManager.getAppendLSN());
                    }
                    componentProperties.setReplicaLSN(replicaLSN);
                }
            }
            if (Files.notExists(componentFile)) {
                return;
            }
            File file = new File(syncTask.getComponentFilePath());
            // 8 bytes: one long holding the replica LSN.
            ByteBuffer lsnBuffer = ByteBuffer.allocate(8);
            lsnBuffer.putLong(componentProperties.getReplicaLSN().longValue());
            lsnBuffer.flip();
            // try-with-resources closes the channel and then the file, and keeps the primary
            // exception when a close fails as well.
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
                    FileChannel channel = randomAccessFile.getChannel()) {
                long position = syncTask.getLSNByteOffset();
                // A positional write may be partial; continue until the whole LSN is written.
                while (lsnBuffer.hasRemaining()) {
                    position += channel.write(lsnBuffer, position);
                }
                channel.force(true);
            }
        }

        /* synthetic */ LSMComponentsSyncService(ReplicationChannel replicationChannel, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationChannel$ReplicationNotifier.class */
    /**
     * Background thread that sends a job replication ACK for every log record placed on
     * the pending-notification queue.
     */
    private class ReplicationNotifier extends Thread {
        private ReplicationNotifier() {
        }

        /**
         * Takes log records off the queue until the thread is interrupted and writes one ACK
         * line per record to the socket of the replication thread that received it. A failed
         * send is logged and the service continues with the next record.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("ReplicationNotifier Thread");
            while (true) {
                try {
                    LogRecord logRecord = ReplicationChannel.this.pendingNotificationRemoteLogsQ.take();
                    // ACK line: <local node id><ACK marker><job id><line separator>.
                    // NOTE(review): getBytes() encodes with the platform default charset - confirm the receiver decodes the same way.
                    byte[] ack = (ReplicationChannel.this.localNodeID + ReplicationProtocol.JOB_REPLICATION_ACK + logRecord.getJobId() + System.lineSeparator()).getBytes();
                    logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream().write(ack);
                } catch (IOException e) {
                    if (ReplicationChannel.LOGGER.isLoggable(Level.WARNING)) {
                        ReplicationChannel.LOGGER.log(Level.WARNING, "Failed to send job replication ACK", (Throwable) e);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop. Looping again with the flag set would make
                    // take() throw immediately on every iteration and spin the CPU forever.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /* synthetic */ ReplicationNotifier(ReplicationChannel replicationChannel, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationChannel$ReplicationThread.class */
    private class ReplicationThread implements IReplicationThread {
        // Connection to the remote node whose requests this thread serves.
        private final SocketChannel socketChannel;
        // Buffer holding the request currently being read; reassigned to whatever buffer readRequest returns.
        private ByteBuffer inBuffer = ByteBuffer.allocate(ReplicationChannel.INTIAL_BUFFER_SIZE);
        // Buffer used to build responses; reassigned to whatever buffer the protocol writers return.
        private ByteBuffer outBuffer = ByteBuffer.allocate(ReplicationChannel.INTIAL_BUFFER_SIZE);
        // Single log record instance owned by this thread.
        // NOTE(review): presumably reused while processing replicated log batches - confirm.
        private final LogRecord remoteLog = new LogRecord();

        /**
         * Creates a handler bound to one replication connection.
         *
         * @param socketChannel channel requests are read from and responses are written to
         */
        public ReplicationThread(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        /**
         * Serves requests from the remote node until it sends GOODBYE.
         *
         * <p>Each request type is dispatched to exactly one handler, after which the next
         * request type is read from the socket. An unrecognized request type raises
         * {@link IllegalStateException}. Any exception ends the session and is logged; the
         * socket is closed on every exit path.
         */
        public void run() {
            Thread.currentThread().setName("Replication Thread");
            try {
                ReplicationProtocol.ReplicationRequestType requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                while (requestType != ReplicationProtocol.ReplicationRequestType.GOODBYE) {
                    // Every case ends with break so that exactly one handler runs per request.
                    switch (requestType) {
                        case REPLICATE_LOG:
                            handleLogReplication();
                            break;
                        case LSM_COMPONENT_PROPERTIES:
                            handleLSMComponentProperties();
                            break;
                        case REPLICATE_FILE:
                            handleReplicateFile();
                            break;
                        case DELETE_FILE:
                            handleDeleteFile();
                            break;
                        case REPLICA_EVENT:
                            handleReplicaEvent();
                            break;
                        case GET_REPLICA_MAX_LSN:
                            handleGetReplicaMaxLSN();
                            break;
                        case GET_REPLICA_FILES:
                            handleGetReplicaFiles();
                            break;
                        case FLUSH_INDEX:
                            handleFlushIndex();
                            break;
                        default:
                            throw new IllegalStateException("Unknown replication request");
                    }
                    requestType = ReplicationProtocol.getRequestType(socketChannel, inBuffer);
                }
            } catch (Exception e) {
                if (ReplicationChannel.LOGGER.isLoggable(Level.WARNING)) {
                    ReplicationChannel.LOGGER.log(Level.WARNING, "Unexpectedly error during replication.", (Throwable) e);
                }
            } finally {
                // Single cleanup point for normal completion, handled exceptions and errors.
                if (socketChannel.isOpen()) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        if (ReplicationChannel.LOGGER.isLoggable(Level.WARNING)) {
                            ReplicationChannel.LOGGER.log(Level.WARNING, "Filed to close replication socket.", (Throwable) e);
                        }
                    }
                }
            }
        }

        /**
         * Serves a flush-index request.
         *
         * <p>Reads the set of lagging resource ids, then walks the open indexes: an index in
         * that set is removed from it when it already has a pending flush, or when its current
         * mutable component is non-empty (in which case its dataset is also scheduled for a
         * flush). The scheduled datasets are flushed and the ids left in the set are sent back.
         *
         * @throws IOException if reading the request, flushing, or writing the response fails
         */
        private void handleFlushIndex() throws IOException {
            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
            Set<Long> stillLagging = ReplicationProtocol.readReplicaIndexFlushRequest(inBuffer).getLaggingRescouresIds();
            IDatasetLifecycleManager lifecycleManager = ReplicationChannel.this.appContextProvider.getDatasetLifecycleManager();
            List<IndexInfo> openIndexes = lifecycleManager.getOpenIndexesInfo();
            Set<Integer> datasetsToFlush = new HashSet<>();
            for (IndexInfo openIndex : openIndexes) {
                if (!stillLagging.contains(Long.valueOf(openIndex.getResourceId()))) {
                    continue;
                }
                if (openIndex.getIndex().getIOOperationCallback().hasPendingFlush()) {
                    // A flush is already on its way for this index.
                    stillLagging.remove(Long.valueOf(openIndex.getResourceId()));
                } else if (!openIndex.getIndex().isCurrentMutableComponentEmpty()) {
                    // There is data to flush: schedule the owning dataset.
                    datasetsToFlush.add(Integer.valueOf(openIndex.getDatasetId()));
                    stillLagging.remove(Long.valueOf(openIndex.getResourceId()));
                }
            }
            for (Integer datasetId : datasetsToFlush) {
                lifecycleManager.flushDataset(datasetId.intValue(), true);
            }
            // Report the ids that were neither pending nor scheduled.
            outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(outBuffer, new ReplicaIndexFlushRequest(stillLagging));
            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
        }

        /**
         * Reads the properties of an LSM component that is about to be replicated, creates
         * its remote component mask, and registers the properties under the component id.
         *
         * @throws IOException if the request cannot be read or the mask cannot be created
         */
        private void handleLSMComponentProperties() throws IOException {
            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
            LSMComponentProperties componentProperties = ReplicationProtocol.readLSMPropertiesRequest(inBuffer);
            // The mask is created before the component is registered in the map.
            ReplicationChannel.this.replicaResourcesManager.createRemoteLSMComponentMask(componentProperties);
            String componentId = componentProperties.getComponentId();
            ReplicationChannel.this.lsmComponentId2PropertiesMap.put(componentId, componentProperties);
        }

        /**
         * Receives one replicated file from the remote node and stores it under the replica
         * index path.
         *
         * <p>After the content is downloaded and forced to disk, an ACK is sent when the
         * request asks for one. For an LSM component file, either an LSN synchronization task
         * is queued (when the request carries an LSN byte offset greater than -1) or the
         * component's remaining-files bookkeeping is updated directly. For any other file the
         * replica index LSN map is initialized with the current append LSN.
         *
         * @throws IOException if the request cannot be read or the file cannot be written
         */
        private void handleReplicateFile() throws IOException {
            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
            LSMIndexFileProperties fileProperties = ReplicationProtocol.readFileReplicationRequest(inBuffer);
            String indexPath = ReplicationChannel.this.replicaResourcesManager.getIndexPath(fileProperties);
            File destination = new File(indexPath + File.separator + fileProperties.getFileName());
            destination.createNewFile();
            // try-with-resources closes the channel and then the file on every path and keeps
            // the primary exception when a close fails as well.
            try (RandomAccessFile randomAccessFile = new RandomAccessFile(destination, "rw");
                    FileChannel channel = randomAccessFile.getChannel()) {
                randomAccessFile.setLength(fileProperties.getFileSize());
                NetworkingUtil.downloadFile(channel, socketChannel);
                channel.force(true);
                if (fileProperties.requiresAck()) {
                    ReplicationProtocol.sendAck(socketChannel);
                }
                if (fileProperties.isLSMComponentFile()) {
                    String componentId = LSMComponentProperties.getLSMComponentID(fileProperties.getFilePath());
                    if (fileProperties.getLSNByteOffset() > -1) {
                        // The file stores an LSN: let the sync service rewrite it before the file is counted.
                        ReplicationChannel.this.lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(new LSMComponentLSNSyncTask(componentId, destination.getAbsolutePath(), fileProperties.getLSNByteOffset()));
                    } else {
                        ReplicationChannel.this.updateLSMComponentRemainingFiles(componentId);
                    }
                } else {
                    ReplicationChannel.this.replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, ReplicationChannel.this.logManager.getAppendLSN());
                }
            }
        }

        /**
         * Replies with the local log manager's current append LSN, written as a single long.
         *
         * @throws IOException if the response cannot be written to the socket
         */
        private void handleGetReplicaMaxLSN() throws IOException {
            final long maxLSN = ReplicationChannel.this.logManager.getAppendLSN();
            // Reset the output buffer, write the LSN, then switch it to read mode for sending.
            outBuffer.clear();
            outBuffer.putLong(maxLSN);
            outBuffer.flip();
            NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
        }

        /**
         * Sends the requester every index file of the requested partitions that it does not
         * already have.
         *
         * <p>Datasets matching the replication strategy are flushed first. A file is sent only
         * if its dataset matches the strategy and its relative path is not in the requester's
         * list of existing files. Each file is announced with a REPLICATE_FILE request followed
         * by its content; a GOODBYE is sent after the last file.
         *
         * @throws IOException if the request cannot be read or a file cannot be read or sent
         */
        private void handleGetReplicaFiles() throws IOException {
            inBuffer = ReplicationProtocol.readRequest(socketChannel, inBuffer);
            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(inBuffer);
            // One properties object is re-initialized for every file that is sent.
            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
            Set<Integer> partitionIds = request.getPartitionIds();
            Set<String> existingFiles = request.getExistingFiles();
            SortedMap<?, ?> clusterPartitions = ReplicationChannel.this.appContextProvider.getAppContext().getMetadataProperties().getClusterPartitions();
            IReplicationStrategy replicationStrategy = ReplicationChannel.this.replicationProperties.getReplicationStrategy();
            ReplicationChannel.this.dsLifecycleManager.flushDataset(replicationStrategy);
            for (Integer partitionId : partitionIds) {
                ClusterPartition partition = (ClusterPartition) clusterPartitions.get(partitionId);
                for (String filePath : ReplicationChannel.this.replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false)) {
                    boolean replicated = replicationStrategy.isMatch(ReplicationChannel.this.localResourceRep.getIndexFileRef(filePath).getDatasetId());
                    if (!replicated || existingFiles.contains(StoragePathUtil.getIndexFileRelativePath(filePath))) {
                        continue;
                    }
                    // try-with-resources closes the channel and then the file on every path and
                    // keeps the primary exception when a close fails as well.
                    try (RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "r");
                            FileChannel channel = randomAccessFile.getChannel()) {
                        fileProperties.initialize(filePath, channel.size(), partition.getNodeId(), false, -1L, false);
                        outBuffer = ReplicationProtocol.writeFileReplicationRequest(outBuffer, fileProperties, ReplicationProtocol.ReplicationRequestType.REPLICATE_FILE);
                        NetworkingUtil.transferBufferToChannel(socketChannel, outBuffer);
                        NetworkingUtil.sendFile(channel, socketChannel);
                    }
                }
            }
            ReplicationProtocol.sendGoodbye(socketChannel);
        }

        /**
         * Reads the next request from the client socket, decodes it as a replica event
         * request and reports the decoded event to the enclosing channel's replication manager.
         * The buffer returned by the read replaces {@code inBuffer}.
         *
         * @throws IOException propagated from the protocol read/decode calls
         */
        private void handleReplicaEvent() throws IOException {
            final ByteBuffer request = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            this.inBuffer = request;
            ReplicationChannel.this.replicationManager.reportReplicaEvent(
                    ReplicationProtocol.readReplicaEventRequest(request));
        }

        /**
         * Reads a file replication request from the client socket and asks the replica
         * resources manager to delete the index file it describes. An ACK is sent back on
         * the same socket only when the request says it requires one.
         *
         * @throws IOException propagated from the protocol calls or the file deletion
         */
        private void handleDeleteFile() throws IOException {
            final ByteBuffer request = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            this.inBuffer = request;
            final LSMIndexFileProperties fileProperties = ReplicationProtocol.readFileReplicationRequest(request);
            ReplicationChannel.this.replicaResourcesManager.deleteIndexFile(fileProperties);
            if (!fileProperties.requiresAck()) {
                return;
            }
            ReplicationProtocol.sendAck(this.socketChannel);
        }

        /**
         * Receives batches of replicated logs from the client socket until the end-of-replication
         * handshake arrives.
         * <p>
         * {@code inBuffer} is first re-allocated to the log manager's log page size. Every request
         * read afterwards is handed to {@link #processLogsBatch(ByteBuffer)}, except a request whose
         * remaining byte count equals {@code LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE}, which ends the
         * loop without being processed.
         *
         * @throws IOException   propagated from reading a request off the socket
         * @throws ACIDException propagated from processing a batch
         */
        private void handleLogReplication() throws IOException, ACIDException {
            this.inBuffer = ByteBuffer.allocate(ReplicationChannel.this.logManager.getLogPageSize());
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            while (this.inBuffer.remaining() != ReplicationChannel.LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE) {
                processLogsBatch(this.inBuffer);
                this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            }
        }

        /**
         * Decodes every log record contained in {@code byteBuffer} and dispatches it by log type.
         * <p>
         * The records are consumed from {@code byteBuffer} itself, so the loop condition and the
         * reads always refer to the same buffer (previously the loop tested the parameter but read
         * from the {@code inBuffer} field, which only worked because the caller passes that field).
         * <p>
         * NOTE(review): the numeric case labels are decompiler output; the named constants used as
         * labels are unrelated protocol sizes that merely share the values 1 and 4. They should be
         * replaced with the real log-type constants once those are confirmed.
         *
         * @param byteBuffer buffer positioned at the first record of the batch; fully consumed on return
         * @throws ACIDException propagated from the local log manager
         */
        private void processLogsBatch(ByteBuffer byteBuffer) throws ACIDException {
            while (byteBuffer.hasRemaining()) {
                // Each record is preceded by an int that is read and discarded here
                // (presumably the record size -- confirm against the sender).
                byteBuffer.getInt();
                this.remoteLog.readRemoteLog(byteBuffer);
                this.remoteLog.setLogSource((byte) 1);
                switch (this.remoteLog.getLogType()) {
                    case 0:
                    case 2:
                        // Only logged locally when the record's partition is one this node hosts.
                        if (ReplicationChannel.this.nodeHostedPartitions.contains(Integer.valueOf(this.remoteLog.getResourcePartition()))) {
                            ReplicationChannel.this.logManager.log(this.remoteLog);
                        }
                        break;
                    case ReplicationChannel.LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE /* 1 */:
                    case 3: {
                        // A fresh job-terminate record is logged instead of the remote one; the
                        // boolean flag is true only for log type 1.
                        LogRecord jobTerminationLog = new LogRecord();
                        TransactionUtil.formJobTerminateLogRecord(jobTerminationLog, this.remoteLog.getJobId(), this.remoteLog.getLogType() == ReplicationChannel.LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE);
                        jobTerminationLog.setReplicationThread(this);
                        jobTerminationLog.setLogSource((byte) 1);
                        ReplicationChannel.this.logManager.log(jobTerminationLog);
                        break;
                    }
                    case ReplicationProtocol.REPLICATION_REQUEST_TYPE_SIZE /* 4 */: {
                        RemoteLogMapping flushMapping = new RemoteLogMapping();
                        flushMapping.setRemoteNodeID(this.remoteLog.getNodeId());
                        // The remote LSN must be captured before logging: the LSN read back after
                        // log() is stored as the local LSN.
                        flushMapping.setRemoteLSN(this.remoteLog.getLSN());
                        ReplicationChannel.this.logManager.log(this.remoteLog);
                        flushMapping.setLocalLSN(this.remoteLog.getLSN());
                        flushMapping.numOfFlushedIndexes.set(this.remoteLog.getNumOfFlushedIndexes());
                        ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.put(flushMapping.getNodeUniqueLSN(), flushMapping);
                        // Wake one thread waiting on the flush-logs lock.
                        synchronized (ReplicationChannel.this.flushLogslock) {
                            ReplicationChannel.this.flushLogslock.notify();
                        }
                        break;
                    }
                    default:
                        ReplicationChannel.LOGGER.severe("Unsupported LogType: " + ((int) this.remoteLog.getLogType()));
                        break;
                }
            }
        }

        /**
         * Offers {@code logRecord} to the enclosing channel's queue of pending remote-log
         * notifications. The result of {@code offer} is not inspected.
         *
         * @param logRecord the log record to enqueue
         */
        public void notifyLogReplicationRequester(LogRecord logRecord) {
            ReplicationChannel.this.pendingNotificationRemoteLogsQ
                    .offer(logRecord);
        }

        /**
         * @return the socket channel this replication thread holds in its {@code socketChannel} field
         */
        public SocketChannel getReplicationClientSocket() {
            return socketChannel;
        }
    }

    /**
     * Creates the replication channel for the node identified by {@code str}.
     * <p>
     * Besides storing its collaborators, the constructor creates a cached thread pool from the
     * service context's thread factory and computes {@code nodeHostedPartitions}: the ids of all
     * partitions belonging to the nodes returned by
     * {@code replicationProperties.getRemotePrimaryReplicasIds(str)}.
     *
     * @param str                        id of the local node
     * @param replicationProperties      replication configuration
     * @param iLogManager                local log manager
     * @param iReplicaResourcesManager   replica resources manager; must be a {@code ReplicaResourcesManager}
     * @param iReplicationManager        replication manager
     * @param iNCServiceContext          service context supplying the thread factory
     * @param iAppRuntimeContextProvider provider of the dataset lifecycle manager, the local resource
     *                                   repository and the metadata properties
     */
    public ReplicationChannel(String str, ReplicationProperties replicationProperties, ILogManager iLogManager, IReplicaResourcesManager iReplicaResourcesManager, IReplicationManager iReplicationManager, INCServiceContext iNCServiceContext, IAppRuntimeContextProvider iAppRuntimeContextProvider) {
        this.logManager = iLogManager;
        this.localNodeID = str;
        this.replicaResourcesManager = (ReplicaResourcesManager) iReplicaResourcesManager;
        this.replicationManager = iReplicationManager;
        this.replicationProperties = replicationProperties;
        this.appContextProvider = iAppRuntimeContextProvider;
        this.dsLifecycleManager = iAppRuntimeContextProvider.getDatasetLifecycleManager();
        this.localResourceRep = iAppRuntimeContextProvider.getLocalResourceRepository();
        this.replicationThreads = Executors.newCachedThreadPool(iNCServiceContext.getThreadFactory());

        Map<String, ClusterPartition[]> nodePartitions = iAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
        Set<String> remotePrimaryReplicasIds = replicationProperties.getRemotePrimaryReplicasIds(str);
        List<Integer> partitionIds = new ArrayList<>();
        // Visit every partition of every remote primary replica. The stride is deliberately not
        // tied to any protocol constant: each array element must be visited exactly once.
        for (String replicaId : remotePrimaryReplicasIds) {
            for (ClusterPartition partition : nodePartitions.get(replicaId)) {
                partitionIds.add(Integer.valueOf(partition.getPartitionId()));
            }
        }
        this.nodeHostedPartitions = new HashSet<>(partitionIds);
    }

    /**
     * Opens a blocking server socket channel on this node's replica address and data replication
     * port, starts the LSN-mapping service and the replication notifier, and then accepts client
     * connections forever, handing each accepted socket to a new {@code ReplicationThread} on the
     * replication thread pool.
     *
     * @throws IllegalStateException wrapping any {@link IOException} raised while opening, binding
     *                               or accepting
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Thread.currentThread().setName("Replication Channel Thread");
        final String replicaIPAddress = this.replicationProperties.getReplicaIPAddress(this.localNodeID);
        final int dataReplicationPort = this.replicationProperties.getDataReplicationPort(this.localNodeID);
        // Shared by the success log line and the failure message.
        final String endpoint = replicaIPAddress + ":" + dataReplicationPort;
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(true);
            this.serverSocketChannel.socket().bind(
                    new InetSocketAddress(InetAddress.getByName(replicaIPAddress), dataReplicationPort));
            this.lsmComponentLSNMappingService.start();
            this.replicationNotifier.start();
            LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + endpoint);
            // Accept loop: only left through an exception.
            for (;;) {
                this.socketChannel = this.serverSocketChannel.accept();
                this.socketChannel.configureBlocking(true);
                this.replicationThreads.execute(new ReplicationThread(this.socketChannel));
            }
        } catch (IOException e) {
            throw new IllegalStateException("Could not open replication channel @ IP Address: " + endpoint, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks one more file of the LSM component {@code str} as received and, once no files remain,
     * finalizes the component's replication: the replica is marked valid, the component is removed
     * from {@code lsmComponentId2PropertiesMap} and a completion message is logged.
     * <p>
     * For a FLUSH component with a non-null replica LSN, the flushed-index counter of the matching
     * remote log mapping is decremented, and the mapping is dropped when the counter reaches zero.
     * The mapping is fetched with a single {@code get} and null-checked, rather than
     * {@code containsKey} followed by {@code get}, so a mapping removed in between by another
     * thread cannot cause a {@link NullPointerException}.
     *
     * @param str id of the LSM component whose file has just been completed
     * @throws IOException propagated from marking the replica as valid
     */
    public void updateLSMComponentRemainingFiles(String str) throws IOException {
        LSMComponentProperties lSMComponentProperties = this.lsmComponentId2PropertiesMap.get(str);
        if (lSMComponentProperties.markFileComplete() != 0) {
            // More files of this component are still expected.
            return;
        }
        if (lSMComponentProperties.getOpType() == LSMOperationType.FLUSH && lSMComponentProperties.getReplicaLSN() != null) {
            RemoteLogMapping remoteLogMapping = this.replicaUniqueLSN2RemoteMapping.get(lSMComponentProperties.getNodeUniqueLSN());
            if (remoteLogMapping != null && remoteLogMapping.numOfFlushedIndexes.decrementAndGet() == 0) {
                this.replicaUniqueLSN2RemoteMapping.remove(lSMComponentProperties.getNodeUniqueLSN());
            }
        }
        this.replicaResourcesManager.markLSMComponentReplicaAsValid(lSMComponentProperties);
        this.lsmComponentId2PropertiesMap.remove(str);
        LOGGER.log(Level.INFO, "Completed LSMComponent " + str + " Replication.");
    }

    /**
     * Closes the server socket channel if it has been opened and is still open.
     * <p>
     * The previous condition was inverted: it returned early when the channel was open and only
     * called {@code close()} on an already-closed channel, so the listening socket was never
     * released. A null check covers the case where this is called before {@code run()} has
     * assigned the channel.
     * <p>
     * NOTE: a thread blocked in {@code accept()} inside {@code run()} will receive an
     * {@link IOException} when the channel is closed here, which {@code run()} rethrows as an
     * {@link IllegalStateException}.
     *
     * @throws IOException if closing the channel fails
     */
    public void close() throws IOException {
        final ServerSocketChannel channel = this.serverSocketChannel;
        if (channel == null || !channel.isOpen()) {
            // Nothing to close.
            return;
        }
        channel.close();
        LOGGER.log(Level.INFO, "Replication channel closed.");
    }
}
