package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.class */
public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
    private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
    protected final RecoveredReplicationSource source;
    private final ReplicationQueues replicationQueues;

    public RecoveredReplicationSourceShipper(Configuration configuration, String str, PriorityBlockingQueue<Path> priorityBlockingQueue, RecoveredReplicationSource recoveredReplicationSource, ReplicationQueues replicationQueues) {
        super(configuration, str, priorityBlockingQueue, recoveredReplicationSource);
        this.source = recoveredReplicationSource;
        this.replicationQueues = replicationQueues;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper, java.lang.Thread, java.lang.Runnable
    public void run() {
        setWorkerState(ReplicationSourceShipper.WorkerState.RUNNING);
        while (isActive()) {
            int i = 1;
            if (this.source.isPeerEnabled()) {
                while (this.entryReader == null) {
                    if (this.source.sleepForRetries("Replication WAL entry reader thread not initialized", i)) {
                        i++;
                    }
                }
                try {
                    ReplicationSourceWALReader.WALEntryBatch take = this.entryReader.take();
                    shipEdits(take);
                    if (take.getWalEntries().isEmpty()) {
                        LOG.debug("Finished recovering queue for group " + this.walGroupId + " of peer " + this.source.getPeerClusterZnode());
                        this.source.getSourceMetrics().incrCompletedRecoveryQueue();
                        setWorkerState(ReplicationSourceShipper.WorkerState.FINISHED);
                    }
                } catch (InterruptedException e) {
                    LOG.trace("Interrupted while waiting for next replication entry batch", e);
                    Thread.currentThread().interrupt();
                }
            } else if (this.source.sleepForRetries("Replication is disabled", 1)) {
                int i2 = 1 + 1;
            }
        }
        this.source.tryFinish();
        if (isFinished()) {
            return;
        }
        setWorkerState(ReplicationSourceShipper.WorkerState.STOPPED);
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper
    public long getStartPosition() {
        long recoveredQueueStartPos = getRecoveredQueueStartPos();
        for (int i = 0; i <= this.maxRetriesMultiplier; i++) {
            try {
                this.source.locateRecoveredPaths(this.queue);
                break;
            } catch (IOException e) {
                LOG.error("Error while locating recovered queue paths, attempt #" + i);
            }
        }
        return recoveredQueueStartPos;
    }

    private long getRecoveredQueueStartPos() {
        long j = 0;
        String peerClusterZnode = this.source.getPeerClusterZnode();
        try {
            j = this.replicationQueues.getLogPosition(peerClusterZnode, this.queue.peek().getName());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " + j);
            }
        } catch (ReplicationException e) {
            terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
        }
        return j;
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper
    protected void updateLogPosition(long j) {
        this.source.getSourceManager().logPositionAndCleanOldLogs(this.currentPath, this.source.getPeerClusterZnode(), j, true, false);
        this.lastLoggedPosition = j;
    }

    private void terminate(String str, Exception exc) {
        if (exc == null) {
            LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + str);
        } else {
            LOG.error("Closing worker for wal group " + this.walGroupId + " because an error occurred: " + str, exc);
        }
        this.entryReader.interrupt();
        Threads.shutdown(this.entryReader, this.sleepForRetries);
        interrupt();
        Threads.shutdown(this, this.sleepForRetries);
        LOG.info("ReplicationSourceWorker " + getName() + " terminated");
    }
}
