package org.apache.asterix.replication.management;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.IndexFileProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.logging.ReplicationLogBuffer;
import org.apache.asterix.replication.logging.TxnLogReplicator;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.IClusterLifecycleListener;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.replication.IReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.util.StorageUtil;

/* loaded from: input_file:org/apache/asterix/replication/management/ReplicationManager.class */
public class ReplicationManager implements IReplicationManager {
    // Value that replicationFactor starts with.
    private static final int INITIAL_REPLICATION_FACTOR = 1;
    // NOTE(review): presumably a wait limit for job-commit ACKs; the unit and the use site are not visible here — confirm.
    private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
    // Id of this node, as passed to the constructor.
    private final String nodeId;
    // Cached thread pool that runs one TxnLogsReplicationResponseListener per replica socket.
    private ExecutorService replicationListenerThreads;
    private final ReplicaResourcesManager replicaResourcesManager;
    private final ILogManager logManager;
    private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider;
    private final ReplicationProperties replicationProperties;
    // Remote replica id -> ids of the partitions that replica receives files for (built in the constructor).
    private final Map<String, Set<Integer>> replica2PartitionsMap;
    // Thread that drains replicationJobsQ.
    private ReplicationJobsProccessor replicationJobsProcessor;
    // First three characters of this node's replica IP address (see the constructor).
    private String hostIPAddressFirstOctet;
    // Log buffers that are free to be (re)used; filled in the constructor.
    private LinkedBlockingQueue<ReplicationLogBuffer> emptyLogBuffersQ;
    // Log buffers that have been handed out for appending and still have to be flushed.
    private LinkedBlockingQueue<ReplicationLogBuffer> pendingFlushLogBuffersQ;
    // Buffer that appendToLogBuffer currently writes into.
    protected ReplicationLogBuffer currentTxnLogBuffer;
    private TxnLogReplicator txnlogReplicator;
    private Future<? extends Object> txnLogReplicatorTask;
    // One socket per active remote replica, filled during the txn log replication handshake.
    private SocketChannel[] logsRepSockets;
    // Decides (via isMatch) whether a dataset's files are replicated at all.
    private final IReplicationStrategy replicationStrategy;
    // Resolves a job file path to its dataset id and partition id.
    private final PersistentLocalResourceRepository localResourceRepo;
    private static final Logger LOGGER = Logger.getLogger(ReplicationManager.class.getName());
    // Initial capacity of request buffers: 4 KB.
    private static final int INITIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(4, StorageUtil.StorageUnit.KILOBYTE);
    // Sentinel job; ReplicationJobsProccessor compares against it by identity and stops when it is dequeued.
    private static final IReplicationJob REPLICATION_JOB_POISON_PILL = new ReplicationJob(IReplicationJob.ReplicationJobType.METADATA, IReplicationJob.ReplicationOperation.REPLICATE, IReplicationJob.ReplicationExecutionType.ASYNC, (Set) null);
    private int replicationFactor = INITIAL_REPLICATION_FACTOR;
    private final ByteBuffer txnLogsBatchSizeBuffer = ByteBuffer.allocate(4);
    // Remote replicas of this node, keyed by replica id.
    private final Map<String, Replica> replicas = new HashMap();
    // ASYNC replication jobs waiting for the ReplicationJobsProccessor thread.
    private final LinkedBlockingQueue<IReplicationJob> replicationJobsQ = new LinkedBlockingQueue<>();
    // Cluster events waiting for the ReplicasEventsMonitor thread.
    private final LinkedBlockingQueue<ReplicaEvent> replicaEventsQ = new LinkedBlockingQueue<>();
    // Set to make the ReplicationJobsProccessor thread leave its loop.
    private AtomicBoolean terminateJobsReplication = new AtomicBoolean(false);
    // True while the jobs processor is not running; also used as the monitor that suspendReplication waits on.
    private AtomicBoolean jobsReplicationSuspended = new AtomicBoolean(true);
    // While true, submitJob (SYNC jobs) and replicateLog block; also used as the monitor they wait on.
    private final AtomicBoolean replicationSuspended = new AtomicBoolean(true);
    // Job id -> ids of the nodes that acknowledged the job; replicateLog seeds each set with this node's id.
    private final Map<Integer, Set<String>> jobCommitAcks = new ConcurrentHashMap();
    private final Map<Integer, ILogRecord> replicationJobsPendingAcks = new ConcurrentHashMap();
    // Ids of replicas that announced a shutdown; guarded by, and notified on, its own monitor.
    private final Set<String> shuttingDownReplicaIds = new HashSet();
    private ByteBuffer dataBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
    // Thread that consumes replicaEventsQ.
    private final ReplicasEventsMonitor replicationMonitor = new ReplicasEventsMonitor(this, null);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.asterix.replication.management.ReplicationManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationManager$1.class */
    /**
     * Compiler-generated lookup table for switching over
     * {@link IClusterLifecycleListener.ClusterEventType}: the array is indexed by the
     * constant's ordinal and holds the case index (1 = NODE_FAILURE, 2 = NODE_JOIN,
     * 3 = NODE_SHUTTING_DOWN). Slots of constants that are not listed stay 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hyracks$api$application$IClusterLifecycleListener$ClusterEventType = new int[IClusterLifecycleListener.ClusterEventType.values().length];

        static {
            // Each entry is guarded separately so that a constant missing from the enum
            // version present at runtime only disables its own case.
            try {
                // Plain literal: the case index is unrelated to the replication factor
                // constant that merely happens to have the same value.
                $SwitchMap$org$apache$hyracks$api$application$IClusterLifecycleListener$ClusterEventType[IClusterLifecycleListener.ClusterEventType.NODE_FAILURE.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$hyracks$api$application$IClusterLifecycleListener$ClusterEventType[IClusterLifecycleListener.ClusterEventType.NODE_JOIN.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
            try {
                $SwitchMap$org$apache$hyracks$api$application$IClusterLifecycleListener$ClusterEventType[IClusterLifecycleListener.ClusterEventType.NODE_SHUTTING_DOWN.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at runtime; its slot keeps the default 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationManager$ReplicasEventsMonitor.class */
    /**
     * Thread that takes cluster events from {@code replicaEventsQ} and applies them to
     * the replica bookkeeping of the enclosing {@link ReplicationManager}.
     */
    public class ReplicasEventsMonitor extends Thread {
        /** Event most recently taken from the queue. */
        ReplicaEvent event;

        private ReplicasEventsMonitor() {
        }

        /**
         * Handles events one at a time until the thread is interrupted. Each event type
         * is handled by exactly one branch; event types without a branch are ignored.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    this.event = ReplicationManager.this.replicaEventsQ.take();
                    switch (this.event.getEventType()) {
                        case NODE_FAILURE:
                            handleReplicaFailure(this.event.getReplica().getId());
                            break;
                        case NODE_JOIN:
                            ReplicationManager.this.updateReplicaInfo(this.event.getReplica());
                            ReplicationManager.this.checkReplicaState(this.event.getReplica().getId(), false, true);
                            break;
                        case NODE_SHUTTING_DOWN:
                            handleShutdownEvent(this.event.getReplica().getId());
                            break;
                        default:
                            break;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: looping again would make take() throw
                    // immediately, spinning without ever handling another event.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /**
         * Marks a replica as DEAD and cleans its invalid LSM components. Does nothing
         * when the replica is already recorded as DEAD.
         *
         * @param str id of the failed replica
         * @throws InterruptedException declared for callers; propagated from the state update
         */
        public void handleReplicaFailure(String str) throws InterruptedException {
            if (((Replica) ReplicationManager.this.replicas.get(str)).getState() == Replica.ReplicaState.DEAD) {
                return;
            }
            ReplicationManager.this.updateReplicaState(str, Replica.ReplicaState.DEAD, true);
            ReplicationManager.this.replicaResourcesManager.cleanInvalidLSMComponents(str);
        }

        /**
         * Records that a replica is shutting down and wakes every thread waiting on
         * {@code shuttingDownReplicaIds}.
         *
         * @param str id of the replica that is shutting down
         */
        public void handleShutdownEvent(String str) {
            synchronized (ReplicationManager.this.shuttingDownReplicaIds) {
                ReplicationManager.this.shuttingDownReplicaIds.add(str);
                ReplicationManager.this.shuttingDownReplicaIds.notifyAll();
            }
        }

        /* synthetic */ ReplicasEventsMonitor(ReplicationManager replicationManager, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationManager$ReplicationJobsProccessor.class */
    /**
     * Thread that executes the jobs queued in {@code replicationJobsQ}. Connections to
     * the replicas are opened lazily, reused across consecutive jobs and closed as soon
     * as the queue runs empty.
     */
    public class ReplicationJobsProccessor extends Thread {
        /** Open connections keyed by replica id; {@code null} while no connection is open. */
        Map<String, SocketChannel> replicaSockets;
        /** Request buffer handed to every processed job. */
        ByteBuffer reusableBuffer;

        private ReplicationJobsProccessor() {
            this.reusableBuffer = ByteBuffer.allocate(ReplicationManager.INITIAL_BUFFER_SIZE);
        }

        /**
         * Processes jobs until termination is requested (flag or poison pill) or the
         * thread is interrupted. A job that fails with an {@link IOException} is logged
         * and skipped. On every exit path the sockets are closed and threads waiting on
         * {@code jobsReplicationSuspended} are released.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("ReplicationJobsProccessor Thread");
            ReplicationManager.this.terminateJobsReplication.set(false);
            ReplicationManager.this.jobsReplicationSuspended.set(false);
            try {
                while (!ReplicationManager.this.terminateJobsReplication.get()) {
                    try {
                        processNextJob();
                    } catch (IOException e) {
                        if (ReplicationManager.LOGGER.isLoggable(Level.WARNING)) {
                            ReplicationManager.LOGGER.log(Level.WARNING, "Couldn't complete processing replication job", (Throwable) e);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop; retrying take() would throw again right away.
                Thread.currentThread().interrupt();
            } finally {
                try {
                    closeSockets();
                } finally {
                    // Always signal, otherwise a thread waiting for the suspension blocks forever.
                    synchronized (ReplicationManager.this.jobsReplicationSuspended) {
                        ReplicationManager.this.jobsReplicationSuspended.set(true);
                        ReplicationManager.this.jobsReplicationSuspended.notifyAll();
                    }
                    ReplicationManager.LOGGER.log(Level.INFO, "ReplicationJobsProccessor stopped. ");
                }
            }
        }

        /**
         * Blocks for the next queued job and executes it. Dequeuing the poison pill only
         * raises the termination flag.
         */
        private void processNextJob() throws IOException, InterruptedException {
            IReplicationJob job = ReplicationManager.this.replicationJobsQ.take();
            if (job == ReplicationManager.REPLICATION_JOB_POISON_PILL) {
                ReplicationManager.this.terminateJobsReplication.set(true);
                return;
            }
            if (this.replicaSockets == null) {
                this.replicaSockets = ReplicationManager.this.getActiveRemoteReplicasSockets();
            }
            ReplicationManager.this.processJob(job, this.replicaSockets, this.reusableBuffer);
            if (ReplicationManager.this.replicationJobsQ.isEmpty()) {
                ReplicationManager.LOGGER.log(Level.INFO, "No pending replication jobs. Closing connections to replicas");
                closeSockets();
            }
        }

        /** Closes and forgets the current replica connections, if any. */
        private void closeSockets() {
            if (this.replicaSockets != null) {
                ReplicationManager.this.closeReplicaSockets(this.replicaSockets);
                this.replicaSockets.clear();
                this.replicaSockets = null;
            }
        }

        /* synthetic */ ReplicationJobsProccessor(ReplicationManager replicationManager, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/asterix/replication/management/ReplicationManager$TxnLogsReplicationResponseListener.class */
    /**
     * Reads line-based log ACK messages from one replica socket and registers each ACK
     * with the enclosing {@link ReplicationManager}.
     */
    public class TxnLogsReplicationResponseListener implements Runnable {
        final SocketChannel replicaSocket;
        final String replicaId;

        /**
         * @param str           id of the replica on the other end of the socket
         * @param socketChannel socket the ACK messages are read from
         */
        public TxnLogsReplicationResponseListener(String str, SocketChannel socketChannel) {
            this.replicaId = str;
            this.replicaSocket = socketChannel;
        }

        /**
         * Consumes ACK lines until the stream ends. An asynchronous close of the socket
         * is logged as a regular stop; any other I/O error is reported as a replication
         * failure of this socket. The reader is closed on every path.
         */
        @Override // java.lang.Runnable
        public void run() {
            Thread.currentThread().setName("TxnLogs Replication Listener Thread");
            ReplicationManager.LOGGER.log(Level.INFO, "Started listening on socket: " + this.replicaSocket.socket().getRemoteSocketAddress());
            try (BufferedReader ackReader = new BufferedReader(new InputStreamReader(this.replicaSocket.socket().getInputStream()))) {
                String ackLine;
                while ((ackLine = ackReader.readLine()) != null) {
                    ReplicationManager.this.addAckToJob(ReplicationProtocol.getJobIdFromLogAckMessage(ackLine), ReplicationProtocol.getNodeIdFromLogAckMessage(ackLine));
                }
            } catch (AsynchronousCloseException e) {
                if (ReplicationManager.LOGGER.isLoggable(Level.INFO)) {
                    ReplicationManager.LOGGER.log(Level.INFO, "Replication listener stopped for remote replica: " + this.replicaId, (Throwable) e);
                }
            } catch (IOException e2) {
                ReplicationManager.this.handleReplicationFailure(this.replicaSocket, e2);
            }
        }
    }

    /**
     * Creates the replication manager of one node: registers the node's remote
     * replicas, records which partitions each of them receives, and pre-allocates the
     * transaction log buffers.
     *
     * @param str                        id of this node
     * @param replicationProperties      replication configuration (strategy, addresses, log buffer sizing)
     * @param iReplicaResourcesManager   must be a {@link ReplicaResourcesManager}
     * @param iLogManager                log manager of this node
     * @param iAppRuntimeContextProvider source of the local resource repository and of the node partitions
     */
    public ReplicationManager(String str, ReplicationProperties replicationProperties, IReplicaResourcesManager iReplicaResourcesManager, ILogManager iLogManager, IAppRuntimeContextProvider iAppRuntimeContextProvider) {
        this.nodeId = str;
        this.replicationProperties = replicationProperties;
        this.replicationStrategy = replicationProperties.getReplicationStrategy();
        this.replicaResourcesManager = (ReplicaResourcesManager) iReplicaResourcesManager;
        this.asterixAppRuntimeContextProvider = iAppRuntimeContextProvider;
        this.logManager = iLogManager;
        this.localResourceRepo = iAppRuntimeContextProvider.getLocalResourceRepository();
        this.hostIPAddressFirstOctet = replicationProperties.getReplicaIPAddress(str).substring(0, 3);
        Set<Replica> remoteReplicas = replicationProperties.getReplicationStrategy().getRemoteReplicas(str);
        this.replicationListenerThreads = Executors.newCachedThreadPool();
        this.replicationJobsProcessor = new ReplicationJobsProccessor(this, null);

        Map<String, ClusterPartition[]> nodePartitions = iAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
        this.replica2PartitionsMap = new HashMap<>(remoteReplicas.size());
        for (Replica replica : remoteReplicas) {
            this.replicas.put(replica.getId(), replica);
            // A replica receives the partitions of every node listed as one of its remote primaries.
            Set<String> primaryReplicaIds = replicationProperties.getRemotePrimaryReplicasIds(replica.getId());
            Set<Integer> partitionIds = new HashSet<>();
            for (String primaryReplicaId : primaryReplicaIds) {
                for (ClusterPartition partition : nodePartitions.get(primaryReplicaId)) {
                    partitionIds.add(partition.getPartitionId());
                }
            }
            this.replica2PartitionsMap.put(replica.getId(), partitionIds);
        }

        int logBufferNumOfPages = replicationProperties.getLogBufferNumOfPages();
        this.emptyLogBuffersQ = new LinkedBlockingQueue<>(logBufferNumOfPages);
        this.pendingFlushLogBuffersQ = new LinkedBlockingQueue<>(logBufferNumOfPages);
        int logBufferPageSize = replicationProperties.getLogBufferPageSize();
        int logBatchSize = replicationProperties.getLogBatchSize();
        for (int page = 0; page < logBufferNumOfPages; page++) {
            this.emptyLogBuffersQ.offer(new ReplicationLogBuffer(this, logBufferPageSize, logBatchSize));
        }
    }

    /**
     * Submits a replication job. ASYNC jobs are queued for the jobs processor thread and
     * the call returns immediately. Any other job is executed on the calling thread
     * once replication is no longer suspended.
     *
     * @param iReplicationJob job to replicate
     * @throws IOException if executing a non-ASYNC job fails
     */
    public void submitJob(IReplicationJob iReplicationJob) throws IOException {
        if (iReplicationJob.getExecutionType() == IReplicationJob.ReplicationExecutionType.ASYNC) {
            this.replicationJobsQ.offer(iReplicationJob);
            return;
        }
        boolean interrupted = false;
        try {
            // The flag is tested while holding the monitor so that a notifyAll() issued
            // between the test and wait() cannot be missed.
            synchronized (this.replicationSuspended) {
                while (this.replicationSuspended.get()) {
                    try {
                        this.replicationSuspended.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting, but remember the interrupt: re-flagging here would
                        // make every following wait() throw immediately (busy loop).
                        interrupted = true;
                    }
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        processJob(iReplicationJob, null, null);
    }

    /**
     * Appends a log record to the current replication log buffer. For records of log
     * type 1 or 3 the call first blocks while replication is suspended and then starts
     * the job's ACK set with this node's own id.
     *
     * @param iLogRecord record to replicate
     * @throws InterruptedException if interrupted while waiting for replication to resume
     *                              or for a free log buffer
     */
    public void replicateLog(ILogRecord iLogRecord) throws InterruptedException {
        // NOTE(review): 1 and 3 are presumably the JOB_COMMIT and ABORT log types — confirm against LogType.
        final int logType = iLogRecord.getLogType();
        if (logType == 1 || logType == 3) {
            // The flag is tested while holding the monitor so that a notifyAll() issued
            // between the test and wait() cannot be missed.
            synchronized (this.replicationSuspended) {
                while (this.replicationSuspended.get()) {
                    this.replicationSuspended.wait();
                }
            }
            Set<String> ackedNodeIds = Collections.synchronizedSet(new HashSet<String>());
            ackedNodeIds.add(this.nodeId);
            this.jobCommitAcks.put(Integer.valueOf(iLogRecord.getJobId()), ackedNodeIds);
        }
        appendToLogBuffer(iLogRecord);
    }

    /**
     * Replaces the current log buffer with a freshly allocated one of the requested
     * size and offers it to the pending-flush queue.
     *
     * @param i size to allocate for the new buffer
     */
    protected void getAndInitNewLargePage(int i) {
        final ReplicationLogBuffer largeBuffer =
                new ReplicationLogBuffer(this, i, this.replicationProperties.getLogBufferPageSize());
        this.currentTxnLogBuffer = largeBuffer;
        this.pendingFlushLogBuffersQ.offer(largeBuffer);
    }

    /**
     * Replaces the current log buffer with one taken from the empty-buffers queue,
     * blocking until one is available, then resets it and offers it to the
     * pending-flush queue.
     *
     * @throws InterruptedException if interrupted while waiting for an empty buffer;
     *                              the current buffer is {@code null} in that case
     */
    protected void getAndInitNewPage() throws InterruptedException {
        this.currentTxnLogBuffer = null;
        do {
            this.currentTxnLogBuffer = this.emptyLogBuffersQ.take();
        } while (this.currentTxnLogBuffer == null);
        final ReplicationLogBuffer freshBuffer = this.currentTxnLogBuffer;
        freshBuffer.reset();
        this.pendingFlushLogBuffersQ.offer(freshBuffer);
    }

    /**
     * Appends a record to the current log buffer. When the buffer has no room it is
     * marked full and replaced first: by a dedicated large buffer if the record exceeds
     * the log page size, otherwise by a regular empty page.
     *
     * @param iLogRecord record to append
     * @throws InterruptedException if interrupted while waiting for an empty page
     */
    private synchronized void appendToLogBuffer(ILogRecord iLogRecord) throws InterruptedException {
        if (this.currentTxnLogBuffer.hasSpace(iLogRecord)) {
            this.currentTxnLogBuffer.append(iLogRecord);
            return;
        }
        this.currentTxnLogBuffer.isFull(true);
        final boolean exceedsPage = iLogRecord.getLogSize() > getLogPageSize();
        if (exceedsPage) {
            getAndInitNewLargePage(iLogRecord.getLogSize());
        } else {
            getAndInitNewPage();
        }
        this.currentTxnLogBuffer.append(iLogRecord);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Finally extract failed */
    /**
     * Executes one replication job against the remote replicas.
     *
     * <p>The first job file is used to look up the dataset and partition of the job.
     * Jobs whose dataset the replication strategy does not match are skipped. For a
     * REPLICATE job each existing file is sent (request header, then file content) to
     * every connected replica registered for the job's partition; an LSM component job
     * sends the component properties first. For a DELETE job a delete request is sent
     * per file. A replica whose socket fails is reported through
     * {@code handleReplicationFailure} and removed from the socket map, and the job
     * continues with the remaining replicas.</p>
     *
     * <p>Sockets are closed at the end of a SYNC job, and
     * {@code exitReplicatedLSMComponent} is invoked exactly once, whether the job
     * succeeds, is skipped or fails.</p>
     *
     * @param iReplicationJob job to execute
     * @param map             open replica sockets keyed by replica id, or {@code null} to open
     *                        new connections; failed replicas are removed from this map
     * @param byteBuffer      request buffer to reuse, or {@code null} to allocate one
     * @throws IOException if a job file cannot be read or a request cannot be sent
     */
    public void processJob(IReplicationJob iReplicationJob, Map<String, SocketChannel> map, ByteBuffer byteBuffer) throws IOException {
        try {
            IndexFileProperties indexFileRef = this.localResourceRepo.getIndexFileRef(iReplicationJob.getJobFiles().iterator().next());
            if (!this.replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
                return;
            }
            final int partitionId = indexFileRef.getPartitionId();
            final LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
            ByteBuffer requestBuffer = byteBuffer != null ? byteBuffer : ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
            final boolean isLsmComponentFile = iReplicationJob.getJobType() == IReplicationJob.ReplicationJobType.LSM_COMPONENT;
            Map<String, SocketChannel> replicaSockets = map;
            try {
                if (replicaSockets == null) {
                    replicaSockets = getActiveRemoteReplicasSockets();
                }
                int remainingFiles = iReplicationJob.getJobFiles().size();
                if (iReplicationJob.getOperation() == IReplicationJob.ReplicationOperation.REPLICATE) {
                    ILSMIndexReplicationJob lsmComponentJob = null;
                    if (isLsmComponentFile) {
                        // The component properties precede the component's files.
                        lsmComponentJob = (ILSMIndexReplicationJob) iReplicationJob;
                        requestBuffer = ReplicationProtocol.writeLSMComponentPropertiesRequest(new LSMComponentProperties(lsmComponentJob, this.nodeId), requestBuffer);
                        sendRequest(replicaSockets, requestBuffer);
                    }
                    for (String filePath : iReplicationJob.getJobFiles()) {
                        remainingFiles--;
                        if (Files.notExists(Paths.get(filePath))) {
                            LOGGER.log(Level.SEVERE, "File deleted before replication: " + filePath);
                            continue;
                        }
                        LOGGER.log(Level.INFO, "Replicating file: " + filePath);
                        try (RandomAccessFile sourceFile = new RandomAccessFile(filePath, "r");
                                FileChannel fileChannel = sourceFile.getChannel()) {
                            final long fileSize = fileChannel.size();
                            final boolean isLastFile = remainingFiles == 0;
                            if (lsmComponentJob != null) {
                                // The LSN offset is computed against the first component to be replicated.
                                ILSMDiskComponent diskComponent = (ILSMDiskComponent) lsmComponentJob.getLSMIndexOperationContext().getComponentsToBeReplicated().get(0);
                                long lsnOffset = LSMIndexUtil.getComponentFileLSNOffset(lsmComponentJob.getLSMIndex(), diskComponent, filePath);
                                fileProperties.initialize(filePath, fileSize, this.nodeId, isLsmComponentFile, lsnOffset, isLastFile);
                            } else {
                                fileProperties.initialize(filePath, fileSize, this.nodeId, isLsmComponentFile, -1L, isLastFile);
                            }
                            requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer, fileProperties, ReplicationProtocol.ReplicationRequestType.REPLICATE_FILE);
                            Iterator<Map.Entry<String, SocketChannel>> socketIterator = replicaSockets.entrySet().iterator();
                            while (socketIterator.hasNext()) {
                                Map.Entry<String, SocketChannel> replicaEntry = socketIterator.next();
                                if (!this.replica2PartitionsMap.get(replicaEntry.getKey()).contains(Integer.valueOf(partitionId))) {
                                    // This replica is not registered for the job's partition.
                                    continue;
                                }
                                SocketChannel replicaChannel = replicaEntry.getValue();
                                try {
                                    NetworkingUtil.transferBufferToChannel(replicaChannel, requestBuffer);
                                    NetworkingUtil.sendFile(fileChannel, replicaChannel);
                                    // The ACK is awaited inside the failure handler so that a missing
                                    // ACK only drops this replica instead of aborting the whole job.
                                    if (fileProperties.requiresAck() && waitForResponse(replicaChannel, null) != ReplicationProtocol.ReplicationRequestType.ACK) {
                                        throw new IOException("Could not receive ACK from replica " + replicaEntry.getKey());
                                    }
                                } catch (IOException e) {
                                    handleReplicationFailure(replicaChannel, e);
                                    socketIterator.remove();
                                } finally {
                                    // Rewind so the same request can be sent to the next replica.
                                    requestBuffer.position(0);
                                }
                            }
                        }
                    }
                } else if (iReplicationJob.getOperation() == IReplicationJob.ReplicationOperation.DELETE) {
                    for (String filePath : iReplicationJob.getJobFiles()) {
                        remainingFiles--;
                        fileProperties.initialize(filePath, -1L, this.nodeId, isLsmComponentFile, -1L, remainingFiles == 0);
                        // Keep the returned buffer, as the REPLICATE branch does.
                        requestBuffer = ReplicationProtocol.writeFileReplicationRequest(requestBuffer, fileProperties, ReplicationProtocol.ReplicationRequestType.DELETE_FILE);
                        Iterator<Map.Entry<String, SocketChannel>> socketIterator = replicaSockets.entrySet().iterator();
                        while (socketIterator.hasNext()) {
                            Map.Entry<String, SocketChannel> replicaEntry = socketIterator.next();
                            if (!this.replica2PartitionsMap.get(replicaEntry.getKey()).contains(Integer.valueOf(partitionId))) {
                                continue;
                            }
                            SocketChannel replicaChannel = replicaEntry.getValue();
                            try {
                                // NOTE(review): sendRequest receives the whole socket map, so the request
                                // appears to go to every replica once per interested replica — confirm
                                // whether only replicaChannel should be written here.
                                sendRequest(replicaSockets, requestBuffer);
                                if (fileProperties.requiresAck()) {
                                    waitForResponse(replicaChannel, null);
                                }
                            } catch (IOException e) {
                                handleReplicationFailure(replicaChannel, e);
                                socketIterator.remove();
                            } finally {
                                requestBuffer.position(0);
                            }
                        }
                    }
                }
            } finally {
                // SYNC jobs do not reuse their connections, so close them on success and on failure.
                if (replicaSockets != null && iReplicationJob.getExecutionType() == IReplicationJob.ReplicationExecutionType.SYNC) {
                    closeReplicaSockets(replicaSockets);
                }
            }
        } finally {
            exitReplicatedLSMComponent(iReplicationJob);
        }
    }

    /**
     * Ends the replication of an LSM index job. Only REPLICATE jobs that are
     * {@link ILSMIndexReplicationJob} instances are affected; every other job is ignored.
     *
     * @param iReplicationJob job that finished processing
     * @throws HyracksDataException if ending the replication fails
     */
    private static void exitReplicatedLSMComponent(IReplicationJob iReplicationJob) throws HyracksDataException {
        if (iReplicationJob.getOperation() != IReplicationJob.ReplicationOperation.REPLICATE) {
            return;
        }
        if (!(iReplicationJob instanceof ILSMIndexReplicationJob)) {
            return;
        }
        final ILSMIndexReplicationJob lsmJob = (ILSMIndexReplicationJob) iReplicationJob;
        lsmJob.endReplication();
    }

    /**
     * Reads the next request type from a replica socket.
     *
     * @param socketChannel socket to read from
     * @param byteBuffer    buffer to read into; it is cleared before use. When {@code null},
     *                      a 4-byte buffer is allocated for this call
     * @return the request type read from the socket
     * @throws IOException if reading from the socket fails
     */
    private static ReplicationProtocol.ReplicationRequestType waitForResponse(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException {
        final ByteBuffer responseBuffer;
        if (byteBuffer != null) {
            byteBuffer.clear();
            responseBuffer = byteBuffer;
        } else {
            responseBuffer = ByteBuffer.allocate(4);
        }
        return ReplicationProtocol.getRequestType(socketChannel, responseBuffer);
    }

    /**
     * @return whether the replication properties report this node as a participant
     */
    public boolean isReplicationEnabled() {
        final boolean participant = this.replicationProperties.isParticipant(this.nodeId);
        return participant;
    }

    /**
     * Copies the cluster IP of the given replica onto the locally tracked replica with the same
     * node id, unless the tracked replica is currently {@code ACTIVE}.
     *
     * @param replica the replica carrying the new cluster IP
     */
    public synchronized void updateReplicaInfo(Replica replica) {
        final Replica tracked = this.replicas.get(replica.getNode().getId());
        if (tracked.getState() != Replica.ReplicaState.ACTIVE) {
            tracked.getNode().setClusterIp(replica.getNode().getClusterIp());
        }
    }

    /**
     * Suspends job replication and then ends the transaction-log replication handshake.
     *
     * <p>If the jobs processor thread is alive, a poison pill is queued and this method blocks
     * until {@code jobsReplicationSuspended} becomes true. Afterwards, if a txn log replicator
     * exists, {@link #endTxnLogReplicationHandshake()} is invoked.
     *
     * @param z when {@code true}, the {@code terminateJobsReplication} flag is set before the
     *        poison pill is queued
     */
    private void suspendReplication(boolean z) {
        if (this.replicationJobsProcessor != null && this.replicationJobsProcessor.isAlive()) {
            if (z) {
                this.terminateJobsReplication.set(true);
            }
            this.replicationJobsQ.offer(REPLICATION_JOB_POISON_PILL);
            // Re-asserting the interrupt inside the loop would make every following wait() throw
            // immediately and turn the loop into a busy spin; remember it and restore it on exit.
            boolean interrupted = false;
            try {
                synchronized (this.jobsReplicationSuspended) {
                    while (!this.jobsReplicationSuspended.get()) {
                        try {
                            this.jobsReplicationSuspended.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (this.txnlogReplicator != null) {
            endTxnLogReplicationHandshake();
        }
    }

    /**
     * Opens a socket to every active remote replica, starts one response listener per socket and
     * sends each of them the {@code REPLICATE_LOG} request type as a handshake.
     *
     * <p>The opened sockets are stored in {@code logsRepSockets}. A socket whose handshake write
     * fails with an {@link IOException} is handed to
     * {@link #handleReplicationFailure(SocketChannel, Throwable)}.
     */
    private void establishTxnLogReplicationHandshake() {
        Map<String, SocketChannel> activeRemoteReplicasSockets = getActiveRemoteReplicasSockets();
        SocketChannel[] sockets = new SocketChannel[activeRemoteReplicasSockets.size()];
        this.logsRepSockets = sockets;
        // Array slots advance by exactly one per replica; the stride must not depend on
        // INITIAL_REPLICATION_FACTOR, which is an unrelated constant.
        int nextSlot = 0;
        for (Map.Entry<String, SocketChannel> entry : activeRemoteReplicasSockets.entrySet()) {
            sockets[nextSlot++] = entry.getValue();
            this.replicationListenerThreads.execute(new TxnLogsReplicationResponseListener(entry.getKey(), entry.getValue()));
        }
        ByteBuffer handshake = ByteBuffer.allocate(4).putInt(ReplicationProtocol.ReplicationRequestType.REPLICATE_LOG.ordinal());
        handshake.flip();
        for (SocketChannel socket : sockets) {
            try {
                NetworkingUtil.transferBufferToChannel(socket, handshake);
            } catch (IOException e) {
                handleReplicationFailure(socket, e);
            } finally {
                // Rewind so the same handshake bytes can be sent to the next socket.
                handshake.position(0);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a failed replication request on the given socket: logs the failure, closes the
     * socket if it is still open (a close failure is only logged) and reports the replica that
     * owns the socket as failed.
     *
     * @param socketChannel the socket on which the request failed
     * @param th the failure to log
     */
    public void handleReplicationFailure(SocketChannel socketChannel, Throwable th) {
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.log(Level.WARNING, "Could not complete replication request.", th);
        }
        try {
            if (socketChannel.isOpen()) {
                socketChannel.close();
            }
        } catch (IOException closeFailure) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Could not close socket.", closeFailure);
            }
        }
        final String failedReplicaId = getReplicaIdBySocket(socketChannel);
        reportFailedReplica(failedReplicaId);
    }

    /* JADX WARN: Code restructure failed: missing block: B:42:0x00ea, code lost:
    
        org.apache.asterix.replication.management.ReplicationManager.LOGGER.log(java.util.logging.Level.SEVERE, "Timeout before receving all job ACKs from replicas. Pending jobs (" + r5.jobCommitAcks.keySet().toString() + ")");
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the method that ends the transaction-log replication handshake.
     *
     * <p>The decompiler could not recover the body, so this version unconditionally throws
     * {@link UnsupportedOperationException}. The decompiler note preceding this method shows that
     * the lost code logged a SEVERE timeout message listing the pending keys of
     * {@code jobCommitAcks}. NOTE(review): {@code suspendReplication} calls this method whenever
     * {@code txnlogReplicator} is non-null, so that path fails until the real body is restored
     * from the original sources.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private void endTxnLogReplicationHandshake() {
        /*
            Method dump skipped, instructions count: 421
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.asterix.replication.management.ReplicationManager.endTxnLogReplicationHandshake():void");
    }

    /**
     * Sends a {@code NODE_SHUTTING_DOWN} replica event describing this node to every active
     * remote replica and then closes the sockets that were opened for it.
     *
     * @throws IOException if building the event request or resolving the host address fails
     */
    private void sendShutdownNotifiction() throws IOException {
        final Node self = new Node();
        self.setId(this.nodeId);
        self.setClusterIp(NetworkingUtil.getHostAddress(this.hostIPAddressFirstOctet));
        final ReplicaEvent shutdownEvent = new ReplicaEvent(new Replica(self), IClusterLifecycleListener.ClusterEventType.NODE_SHUTTING_DOWN);
        final ByteBuffer eventRequest = ReplicationProtocol.writeReplicaEventRequest(shutdownEvent);
        final Map<String, SocketChannel> targets = getActiveRemoteReplicasSockets();
        sendRequest(targets, eventRequest);
        closeReplicaSockets(targets);
    }

    /**
     * Writes the buffer to every socket in the map.
     *
     * <p>A socket whose write fails with an {@link IOException} is passed to
     * {@link #handleReplicationFailure(SocketChannel, Throwable)} and removed from the map. The
     * buffer position is reset to 0 after every attempt, successful or not.
     *
     * @param map replica id to socket; failed entries are removed from it
     * @param byteBuffer the request bytes to send
     */
    private void sendRequest(Map<String, SocketChannel> map, ByteBuffer byteBuffer) {
        for (Iterator<Map.Entry<String, SocketChannel>> entries = map.entrySet().iterator(); entries.hasNext();) {
            final SocketChannel target = entries.next().getValue();
            try {
                NetworkingUtil.transferBufferToChannel(target, byteBuffer);
            } catch (IOException e) {
                handleReplicationFailure(target, e);
                entries.remove();
            } finally {
                byteBuffer.position(0);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends the goodbye buffer to every socket in the map and then closes each socket that is
     * still open. A close failure is routed to
     * {@link #handleReplicationFailure(SocketChannel, Throwable)}.
     *
     * @param map replica id to socket
     */
    public void closeReplicaSockets(Map<String, SocketChannel> map) {
        sendRequest(map, ReplicationProtocol.getGoodbyeBuffer());
        for (SocketChannel socket : map.values()) {
            if (!socket.isOpen()) {
                continue;
            }
            try {
                socket.close();
            } catch (IOException e) {
                handleReplicationFailure(socket, e);
            }
        }
    }

    /**
     * Runs a blocking state check for every known replica, one after another.
     */
    public void initializeReplicasState() {
        for (Replica replica : this.replicas.values()) {
            checkReplicaState(replica.getNode().getId(), false, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a {@code ReplicaStateChecker} for the given replica to the shared thread executor.
     *
     * @param str id of the replica to check
     * @param z when {@code true} the method returns right after submitting; otherwise it polls
     *        once per second until the submitted task is done
     * @param z2 passed through unchanged to the {@code ReplicaStateChecker} constructor
     */
    public void checkReplicaState(String str, boolean z, boolean z2) {
        Future<?> pendingCheck = this.asterixAppRuntimeContextProvider.getThreadExecutor().submit(new ReplicaStateChecker(this.replicas.get(str), this.replicationProperties.getReplicationTimeOut(), this, this.replicationProperties, z2));
        if (z) {
            return;
        }
        // Re-asserting the interrupt inside the loop would make every following sleep() throw
        // immediately and turn the poll into a busy spin; remember it and restore it on exit.
        boolean interrupted = false;
        try {
            while (!pendingCheck.isDone()) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Moves a replica to a new state and adjusts the replication factor accordingly.
     *
     * <p>Nothing happens when the replica is already in the requested state. The factor grows by
     * {@code INITIAL_REPLICATION_FACTOR} on {@code ACTIVE} and shrinks by the same amount on
     * {@code DEAD}, but never below {@code INITIAL_REPLICATION_FACTOR}.
     *
     * @param str id of the replica whose state changes
     * @param replicaState the new state
     * @param z when {@code true}, replication is suspended before the change and the replication
     *        threads are restarted afterwards; if the new state is {@code DEAD}, an ACK from this
     *        replica is first recorded for every job currently tracked in {@code jobCommitAcks}
     * @throws InterruptedException if restarting the replication threads is interrupted
     */
    public synchronized void updateReplicaState(String str, Replica.ReplicaState replicaState, boolean z) throws InterruptedException {
        final Replica target = this.replicas.get(str);
        if (target.getState() == replicaState) {
            return;
        }
        final boolean nowDead = replicaState == Replica.ReplicaState.DEAD;
        if (z) {
            this.replicationSuspended.set(true);
            if (nowDead) {
                synchronized (this.jobCommitAcks) {
                    for (Integer trackedJobId : this.jobCommitAcks.keySet()) {
                        addAckToJob(trackedJobId.intValue(), str);
                    }
                }
            }
            suspendReplication(true);
        }
        target.setState(replicaState);
        if (replicaState == Replica.ReplicaState.ACTIVE) {
            this.replicationFactor += INITIAL_REPLICATION_FACTOR;
        } else if (nowDead && this.replicationFactor > INITIAL_REPLICATION_FACTOR) {
            this.replicationFactor -= INITIAL_REPLICATION_FACTOR;
        }
        LOGGER.log(Level.WARNING, "Replica " + str + " state changed to: " + replicaState.name() + ". Replication factor changed to: " + this.replicationFactor);
        if (z) {
            startReplicationThreads();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records that a replica acknowledged a job commit.
     *
     * <p>An ACK for a job that is not tracked in {@code jobCommitAcks} is logged as invalid and
     * ignored. Once the number of ACKs equals the replication factor and a log record is pending
     * for the job, all threads waiting on that log record are notified.
     *
     * @param i the job id
     * @param str id of the acknowledging replica
     */
    public void addAckToJob(int i, String str) {
        synchronized (this.jobCommitAcks) {
            final Integer jobKey = Integer.valueOf(i);
            if (this.jobCommitAcks.containsKey(jobKey)) {
                this.jobCommitAcks.get(jobKey).add(str);
                final boolean fullyAcked = this.jobCommitAcks.get(jobKey).size() == this.replicationFactor;
                if (fullyAcked && this.replicationJobsPendingAcks.containsKey(jobKey)) {
                    final ILogRecord pendingRecord = this.replicationJobsPendingAcks.get(jobKey);
                    synchronized (pendingRecord) {
                        pendingRecord.notifyAll();
                    }
                }
                return;
            }
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("Invalid job replication ACK received for jobId(" + i + ")");
            }
        }
    }

    /**
     * Tells whether the job of the given log record has been acknowledged by all replicas.
     *
     * <p>A job without an entry in {@code jobCommitAcks} counts as replicated. When the ACK count
     * equals the replication factor, the job is removed from both tracking maps and, if no jobs
     * remain, waiters on {@code jobCommitAcks} are notified. Otherwise the log record is
     * registered as pending (unless one already is) and {@code false} is returned.
     *
     * @param iLogRecord the log record whose job is checked
     * @return {@code true} if the job is untracked or fully acknowledged
     */
    public boolean hasBeenReplicated(ILogRecord iLogRecord) {
        final Integer jobKey = Integer.valueOf(iLogRecord.getJobId());
        if (!this.jobCommitAcks.containsKey(jobKey)) {
            return true;
        }
        synchronized (this.jobCommitAcks) {
            final boolean fullyAcked = this.jobCommitAcks.get(jobKey).size() == this.replicationFactor;
            if (fullyAcked) {
                this.jobCommitAcks.remove(jobKey);
                this.replicationJobsPendingAcks.remove(jobKey);
                if (this.jobCommitAcks.size() == 0) {
                    this.jobCommitAcks.notifyAll();
                }
                return true;
            }
            this.replicationJobsPendingAcks.putIfAbsent(jobKey, iLogRecord);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Opens a new socket to every replica whose state is {@code ACTIVE}.
     *
     * <p>A replica whose socket cannot be opened is logged, reported as failed and left out of
     * the result.
     *
     * @return a new map from replica id to its freshly opened socket
     */
    public Map<String, SocketChannel> getActiveRemoteReplicasSockets() {
        final Map<String, SocketChannel> socketsByReplicaId = new HashMap<>();
        for (Replica replica : this.replicas.values()) {
            if (replica.getState() != Replica.ReplicaState.ACTIVE) {
                continue;
            }
            try {
                socketsByReplicaId.put(replica.getId(), getReplicaSocket(replica.getId()));
            } catch (IOException e) {
                if (LOGGER.isLoggable(Level.WARNING)) {
                    LOGGER.log(Level.WARNING, "Could not get replica socket", e);
                }
                reportFailedReplica(replica.getId());
            }
        }
        return socketsByReplicaId;
    }

    /**
     * Opens a blocking socket channel connected to the given replica.
     *
     * <p>If configuring or connecting the channel fails, the channel is closed before the
     * failure is rethrown so that no open channel is leaked; a failure of that close is attached
     * to the original exception as suppressed.
     *
     * @param str id of the replica to connect to
     * @return a connected channel; the caller is responsible for closing it
     * @throws IOException if the channel cannot be opened, configured or connected
     */
    private SocketChannel getReplicaSocket(String str) throws IOException {
        Replica replicaById = this.replicationProperties.getReplicaById(str);
        SocketChannel channel = SocketChannel.open();
        try {
            channel.configureBlocking(true);
            InetSocketAddress address = replicaById.getAddress(this.replicationProperties);
            channel.connect(new InetSocketAddress(address.getHostString(), address.getPort()));
            return channel;
        } catch (IOException | RuntimeException e) {
            // Closing an already-closed channel is a no-op, so this is safe on every path.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * @return a new mutable set with the node ids of all replicas whose state is {@code DEAD}
     */
    public Set<String> getDeadReplicasIds() {
        return this.replicas.values().stream()
                .filter(replica -> replica.getState() == Replica.ReplicaState.DEAD)
                .map(replica -> replica.getNode().getId())
                .collect(Collectors.toCollection(HashSet::new));
    }

    /**
     * @return a new mutable set with the node ids of all replicas whose state is {@code ACTIVE}
     */
    public Set<String> getActiveReplicasIds() {
        final Set<String> activeIds = new HashSet<>();
        for (Replica replica : this.replicas.values()) {
            if (replica.getState() != Replica.ReplicaState.ACTIVE) {
                continue;
            }
            activeIds.add(replica.getNode().getId());
        }
        return activeIds;
    }

    /**
     * @return the number of replicas whose state is {@code ACTIVE}
     */
    public int getActiveReplicasCount() {
        final Set<String> activeIds = getActiveReplicasIds();
        return activeIds.size();
    }

    /**
     * No-op: the body is empty in this source.
     */
    public void start() {
    }

    /**
     * No-op: the body is empty in this source, so nothing is written to the stream.
     *
     * @param outputStream unused
     * @throws IOException declared but never thrown by this body
     */
    public void dumpState(OutputStream outputStream) throws IOException {
    }

    /**
     * Stops the replication manager.
     *
     * <p>Replication is suspended first. If this node has remote replicas, they are sent a
     * shutdown notification. The method then blocks until every remote primary replica that is
     * currently active appears in {@code shuttingDownReplicaIds}, and finally closes the
     * replication channel.
     *
     * @param z unused by this method
     * @param outputStream unused by this method
     * @throws IOException if sending the shutdown notification or closing the channel fails
     */
    public void stop(boolean z, OutputStream outputStream) throws IOException {
        suspendReplication(false);
        if (!this.replicationStrategy.getRemoteReplicas(this.nodeId).isEmpty()) {
            sendShutdownNotifiction();
        }
        // Only primaries that are still active can be expected to announce their shutdown.
        Set<String> awaitedReplicaIds = this.replicationStrategy.getRemotePrimaryReplicas(this.nodeId).stream()
                .map(Replica::getId)
                .filter(getActiveReplicasIds()::contains)
                .collect(Collectors.toSet());
        if (!awaitedReplicaIds.isEmpty()) {
            // Re-asserting the interrupt inside the loop would make every following wait() throw
            // immediately and turn the loop into a busy spin; remember it and restore it on exit.
            boolean interrupted = false;
            try {
                synchronized (this.shuttingDownReplicaIds) {
                    while (!this.shuttingDownReplicaIds.containsAll(awaitedReplicaIds)) {
                        try {
                            this.shuttingDownReplicaIds.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        LOGGER.log(Level.INFO, "Got shutdown notification from all remote replicas");
        this.asterixAppRuntimeContextProvider.getAppContext().getReplicationChannel().close();
        LOGGER.log(Level.INFO, "Replication manager stopped.");
    }

    /**
     * Queues a replica event on {@code replicaEventsQ}; the result of {@code offer} is ignored.
     *
     * @param replicaEvent the event to queue
     */
    public void reportReplicaEvent(ReplicaEvent replicaEvent) {
        final ReplicaEvent queuedEvent = replicaEvent;
        this.replicaEventsQ.offer(queuedEvent);
    }

    /**
     * Reports a replica as failed: sets the {@code terminateJobsReplication} flag and queues a
     * {@code NODE_FAILURE} event for it. Unknown replicas and replicas already in state
     * {@code DEAD} are ignored.
     *
     * @param str id of the failed replica
     */
    public void reportFailedReplica(String str) {
        final Replica failed = this.replicas.get(str);
        if (failed == null) {
            return;
        }
        if (failed.getState() == Replica.ReplicaState.DEAD) {
            return;
        }
        this.terminateJobsReplication.set(true);
        final ReplicaEvent failureEvent = new ReplicaEvent(failed, IClusterLifecycleListener.ClusterEventType.NODE_FAILURE);
        reportReplicaEvent(failureEvent);
    }

    /**
     * Finds the replica whose configured address has the same host name and port as the address
     * obtained from the given socket.
     *
     * @param socketChannel the socket to look up
     * @return the id of the first matching replica, or {@code null} if none matches
     */
    private String getReplicaIdBySocket(SocketChannel socketChannel) {
        final InetSocketAddress remote = NetworkingUtil.getSocketAddress(socketChannel);
        for (Replica candidate : this.replicas.values()) {
            final InetSocketAddress configured = candidate.getAddress(this.replicationProperties);
            if (!configured.getHostName().equals(remote.getHostName())) {
                continue;
            }
            if (configured.getPort() != remote.getPort()) {
                continue;
            }
            return candidate.getId();
        }
        return null;
    }

    /**
     * Starts (or restarts) the replication threads and marks replication as resumed.
     *
     * <p>A new jobs processor is created on every call. The transaction-log side (handshake, new
     * page, log replicator task) is only set up while {@code logsRepSockets} is still
     * {@code null}.
     *
     * @throws InterruptedException declared by this method; which callee raises it is not visible
     *         here
     */
    public void startReplicationThreads() throws InterruptedException {
        // NOTE(review): the (this, null) arguments look like a decompiler artifact of an
        // inner-class constructor; confirm against the class declaration.
        this.replicationJobsProcessor = new ReplicationJobsProccessor(this, null);
        if (this.logsRepSockets == null) {
            establishTxnLogReplicationHandshake();
            getAndInitNewPage();
            this.txnlogReplicator = new TxnLogReplicator(this.emptyLogBuffersQ, this.pendingFlushLogBuffersQ);
            this.txnLogReplicatorTask = this.asterixAppRuntimeContextProvider.getThreadExecutor().submit(this.txnlogReplicator);
        }
        this.replicationJobsProcessor.start();
        // The monitor is started only if it is not already alive.
        if (!this.replicationMonitor.isAlive()) {
            this.replicationMonitor.start();
        }
        // Clear the suspended flag and wake waiters while holding the flag's monitor.
        synchronized (this.replicationSuspended) {
            LOGGER.log(Level.INFO, "Replication started/resumed");
            this.replicationSuspended.set(false);
            this.replicationSuspended.notifyAll();
        }
    }

    /**
     * Asks every active replica to flush its lagging indexes and updates the LSN map of the
     * indexes the replica reports back.
     *
     * <p>For each active replica with at least one lagging index (as returned by
     * {@code getLaggingReplicaIndexesId2PathMap}), a flush request listing those indexes is sent
     * over a dedicated socket. If the replica answers with {@code FLUSH_INDEX}, the resource ids
     * in that answer get the entry with key {@code -1L} of their LSN map set to the append LSN
     * captured when this method started.
     *
     * @param j the LSN passed to {@code getLaggingReplicaIndexesId2PathMap} to identify lagging
     *        indexes
     * @throws IOException if communication with a replica fails; the socket is closed in any
     *         case and a close failure is attached as suppressed instead of masking the cause
     */
    public void requestFlushLaggingReplicaIndexes(long j) throws IOException {
        long appendLSN = this.logManager.getAppendLSN();
        Set<String> activeReplicasIds = getActiveReplicasIds();
        if (activeReplicasIds.isEmpty()) {
            return;
        }
        ByteBuffer requestBuffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
        for (String replicaId : activeReplicasIds) {
            Map<Long, String> laggingIndexes = this.replicaResourcesManager.getLaggingReplicaIndexesId2PathMap(replicaId, j);
            if (laggingIndexes.isEmpty()) {
                continue;
            }
            ReplicaIndexFlushRequest flushResponse = null;
            try (SocketChannel replicaSocket = getReplicaSocket(replicaId)) {
                requestBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(requestBuffer, new ReplicaIndexFlushRequest(laggingIndexes.keySet()));
                NetworkingUtil.transferBufferToChannel(replicaSocket, requestBuffer);
                if (waitForResponse(replicaSocket, requestBuffer) == ReplicationProtocol.ReplicationRequestType.FLUSH_INDEX) {
                    requestBuffer = ReplicationProtocol.readRequest(replicaSocket, requestBuffer);
                    flushResponse = ReplicationProtocol.readReplicaIndexFlushRequest(requestBuffer);
                }
                ReplicationProtocol.sendGoodbye(replicaSocket);
            }
            if (flushResponse != null) {
                for (Long resourceId : flushResponse.getLaggingRescouresIds()) {
                    String indexPath = laggingIndexes.get(resourceId);
                    Map<Long, Long> lsnMap = this.replicaResourcesManager.getReplicaIndexLSNMap(indexPath);
                    lsnMap.put(-1L, Long.valueOf(appendLSN));
                    this.replicaResourcesManager.updateReplicaIndexLSNMap(indexPath, lsnMap);
                }
            }
        }
    }

    /**
     * Asks each of the given replicas for its maximum LSN and returns the largest answer.
     *
     * <p>A socket is opened per replica, the request is sent to all of them, and an 8-byte
     * response is then read from each. All opened sockets are closed before returning, also on
     * failure.
     *
     * @param set ids of the replicas to query
     * @return the largest LSN reported, or 0 if none is larger
     * @throws IOException if opening a socket, sending or reading fails
     */
    public long getMaxRemoteLSN(Set<String> set) throws IOException {
        long maxRemoteLsn = 0;
        ReplicationProtocol.writeGetReplicaMaxLSNRequest(this.dataBuffer);
        final Map<String, SocketChannel> socketsByReplicaId = new HashMap<>();
        try {
            for (String replicaId : set) {
                socketsByReplicaId.put(replicaId, getReplicaSocket(replicaId));
            }
            for (SocketChannel socket : socketsByReplicaId.values()) {
                NetworkingUtil.transferBufferToChannel(socket, this.dataBuffer);
                this.dataBuffer.position(0);
            }
            for (SocketChannel socket : socketsByReplicaId.values()) {
                NetworkingUtil.readBytes(socket, this.dataBuffer, 8);
                maxRemoteLsn = Math.max(maxRemoteLsn, this.dataBuffer.getLong());
            }
        } finally {
            closeReplicaSockets(socketsByReplicaId);
        }
        return maxRemoteLsn;
    }

    /**
     * Requests replica files from the given replica and stores each received file locally.
     *
     * <p>After the request is sent, responses are processed until the replica answers
     * {@code GOODBYE}. For each response the file is created under the index path resolved by
     * {@code replicaResourcesManager}, sized to the announced length, downloaded from the socket
     * and forced to disk. For a file that is not an LSM component file and belongs to another
     * node, the replica index LSN map is initialized with the current append LSN.
     *
     * @param str id of the replica to request the files from
     * @param set first argument of the {@code ReplicaFilesRequest}
     * @param set2 second argument of the {@code ReplicaFilesRequest}
     * @throws IOException if communication or file I/O fails; every opened resource is closed in
     *         any case and close failures are attached as suppressed instead of masking the cause
     */
    public void requestReplicaFiles(String str, Set<Integer> set, Set<String> set2) throws IOException {
        this.dataBuffer = ReplicationProtocol.writeGetReplicaFilesRequest(this.dataBuffer, new ReplicaFilesRequest(set, set2));
        try (SocketChannel replicaSocket = getReplicaSocket(str)) {
            NetworkingUtil.transferBufferToChannel(replicaSocket, this.dataBuffer);
            ReplicationProtocol.ReplicationRequestType responseType = ReplicationProtocol.getRequestType(replicaSocket, this.dataBuffer);
            while (responseType != ReplicationProtocol.ReplicationRequestType.GOODBYE) {
                this.dataBuffer = ReplicationProtocol.readRequest(replicaSocket, this.dataBuffer);
                LSMIndexFileProperties fileProperties = ReplicationProtocol.readFileReplicationRequest(this.dataBuffer);
                String indexPath = this.replicaResourcesManager.getIndexPath(fileProperties);
                File destFile = new File(indexPath + File.separator + fileProperties.getFileName());
                destFile.createNewFile();
                try (RandomAccessFile destAccess = new RandomAccessFile(destFile, "rw");
                        FileChannel destChannel = destAccess.getChannel()) {
                    destAccess.setLength(fileProperties.getFileSize());
                    NetworkingUtil.downloadFile(destChannel, replicaSocket);
                    destChannel.force(true);
                }
                if (!fileProperties.isLSMComponentFile() && !fileProperties.getNodeId().equals(this.nodeId)) {
                    this.replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, this.logManager.getAppendLSN());
                }
                responseType = ReplicationProtocol.getRequestType(replicaSocket, this.dataBuffer);
            }
            ReplicationProtocol.sendGoodbye(replicaSocket);
        }
    }

    /**
     * @return the log buffer page size taken from the replication properties
     */
    public int getLogPageSize() {
        final int pageSize = this.replicationProperties.getLogBufferPageSize();
        return pageSize;
    }

    /**
     * Sends a batch of transaction log bytes to every socket in {@code logsRepSockets}.
     *
     * <p>While replication is suspended the call blocks until it is resumed. Each socket first
     * receives the batch size as an int and then the batch itself. A socket whose write fails
     * with an {@link IOException} is handed to
     * {@link #handleReplicationFailure(SocketChannel, Throwable)}. On return the buffer's
     * position equals its limit.
     *
     * @param byteBuffer the batch to send; its remaining bytes are sent to each socket
     */
    public void replicateTxnLogBatch(ByteBuffer byteBuffer) {
        if (this.replicationSuspended.get()) {
            // The flag is re-checked while holding the monitor: startReplicationThreads() clears
            // it and notifies under the same monitor, so a resume cannot slip in between the
            // check and wait(). The interrupt is restored only after the wait ends; re-asserting
            // it inside the loop would make wait() throw immediately and busy-spin.
            boolean interrupted = false;
            try {
                synchronized (this.replicationSuspended) {
                    while (this.replicationSuspended.get()) {
                        try {
                            this.replicationSuspended.wait();
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.txnLogsBatchSizeBuffer.clear();
        this.txnLogsBatchSizeBuffer.putInt(byteBuffer.remaining());
        this.txnLogsBatchSizeBuffer.flip();
        byteBuffer.mark();
        // Visit every socket; the stride must not depend on INITIAL_REPLICATION_FACTOR.
        for (SocketChannel socket : this.logsRepSockets) {
            try {
                NetworkingUtil.transferBufferToChannel(socket, this.txnLogsBatchSizeBuffer);
                NetworkingUtil.transferBufferToChannel(socket, byteBuffer);
            } catch (IOException e) {
                handleReplicationFailure(socket, e);
            } finally {
                // Rewind both buffers so the next socket receives the same bytes.
                this.txnLogsBatchSizeBuffer.position(0);
                byteBuffer.reset();
            }
        }
        byteBuffer.position(byteBuffer.limit());
    }
}
