package org.apache.accumulo.server.tabletserver.log;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.logger.LogFileKey;
import org.apache.accumulo.server.logger.LogFileValue;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:org/apache/accumulo/server/tabletserver/log/LogSorter.class */
public class LogSorter {
    private static final Logger log = Logger.getLogger(LogSorter.class);
    FileSystem fs;
    AccumuloConfiguration conf;
    private final Map<String, LogProcessor> currentWork = Collections.synchronizedMap(new HashMap());
    ThreadPoolExecutor threadPool;
    private final Instance instance;

    /* loaded from: input_file:org/apache/accumulo/server/tabletserver/log/LogSorter$LogProcessor.class */
    class LogProcessor implements DistributedWorkQueue.Processor {
        private FSDataInputStream input;
        private DataInputStream decryptingInput;
        private long bytesCopied = -1;
        private long sortStart = 0;
        private long sortStop = -1;

        LogProcessor() {
        }

        @Override // org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor
        public DistributedWorkQueue.Processor newProcessor() {
            return new LogProcessor();
        }

        @Override // org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor
        public void process(String str, byte[] bArr) {
            String str2 = Constants.getRecoveryDir(LogSorter.this.conf) + "/" + str;
            String str3 = new String(bArr);
            String name = new Path(str3).getName();
            synchronized (LogSorter.this.currentWork) {
                if (LogSorter.this.currentWork.containsKey(name)) {
                    return;
                }
                LogSorter.this.currentWork.put(name, this);
                try {
                    LogSorter.log.info("Copying " + str3 + " to " + str2);
                    sort(name, new Path(str3), str2);
                    LogSorter.this.currentWork.remove(name);
                } catch (Throwable th) {
                    LogSorter.this.currentWork.remove(name);
                    throw th;
                }
            }
        }

        public void sort(String str, Path path, String str2) {
            synchronized (this) {
                this.sortStart = System.currentTimeMillis();
            }
            String name = Thread.currentThread().getName();
            int i = 0;
            try {
                try {
                    LogSorter.this.fs.delete(new Path(str2), true);
                    LogSorter.this.fs.open(path);
                    HashMap hashMap = new HashMap();
                    FSDataInputStream readHeader = DfsLogger.readHeader(LogSorter.this.fs, path, hashMap);
                    if (hashMap.containsKey(Property.CRYPTO_MODULE_CLASS.getKey())) {
                        String str3 = (String) hashMap.get(Property.CRYPTO_MODULE_CLASS.getKey());
                        if (str3 == null) {
                            str3 = AccumuloConfiguration.getDefaultConfiguration().get(Property.CRYPTO_MODULE_CLASS);
                        }
                        synchronized (this) {
                            this.input = readHeader;
                        }
                        DataInputStream dataInputStream = new DataInputStream(CryptoModuleFactory.getCryptoModule(str3).getDecryptingInputStream(this.input, hashMap));
                        synchronized (this) {
                            this.decryptingInput = dataInputStream;
                        }
                    } else {
                        LogSorter.log.debug("Log file " + str + " not encrypted");
                        synchronized (this) {
                            this.input = readHeader;
                            this.decryptingInput = readHeader;
                        }
                    }
                    long memoryInBytes = LogSorter.this.conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
                    Thread.currentThread().setName("Sorting " + str + " for recovery");
                    while (true) {
                        ArrayList<Pair<LogFileKey, LogFileValue>> arrayList = new ArrayList<>();
                        try {
                            long pos = this.input.getPos();
                            while (this.input.getPos() - pos < memoryInBytes) {
                                LogFileKey logFileKey = new LogFileKey();
                                LogFileValue logFileValue = new LogFileValue();
                                logFileKey.readFields(this.decryptingInput);
                                logFileValue.readFields(this.decryptingInput);
                                arrayList.add(new Pair<>(logFileKey, logFileValue));
                            }
                            int i2 = i;
                            i++;
                            writeBuffer(str2, arrayList, i2);
                            arrayList.clear();
                        } catch (EOFException e) {
                            int i3 = i;
                            int i4 = i + 1;
                            writeBuffer(str2, arrayList, i3);
                            LogSorter.this.fs.create(new Path(str2, "finished")).close();
                            LogSorter.log.info("Finished log sort " + str + " " + getBytesCopied() + " bytes " + i4 + " parts in " + getSortTime() + "ms");
                            Thread.currentThread().setName(name);
                            try {
                                close();
                            } catch (Exception e2) {
                                LogSorter.log.error("Error during cleanup sort/copy " + str, e2);
                            }
                            synchronized (this) {
                                this.sortStop = System.currentTimeMillis();
                                return;
                            }
                        }
                    }
                } catch (Throwable th) {
                    Thread.currentThread().setName(name);
                    try {
                        close();
                    } catch (Exception e3) {
                        LogSorter.log.error("Error during cleanup sort/copy " + str, e3);
                    }
                    synchronized (this) {
                        this.sortStop = System.currentTimeMillis();
                        throw th;
                    }
                }
            } catch (Throwable th2) {
                try {
                    LogSorter.this.fs.mkdirs(new Path(str2));
                    LogSorter.this.fs.create(new Path(str2, "failed")).close();
                } catch (IOException e4) {
                    LogSorter.log.error("Error creating failed flag file " + str, e4);
                }
                LogSorter.log.error(th2, th2);
                Thread.currentThread().setName(name);
                try {
                    close();
                } catch (Exception e5) {
                    LogSorter.log.error("Error during cleanup sort/copy " + str, e5);
                }
                synchronized (this) {
                    this.sortStop = System.currentTimeMillis();
                }
            }
        }

        private void writeBuffer(String str, ArrayList<Pair<LogFileKey, LogFileValue>> arrayList, int i) throws IOException {
            int i2 = i + 1;
            MapFile.Writer writer = new MapFile.Writer(LogSorter.this.fs.getConf(), LogSorter.this.fs, str + String.format("/part-r-%05d", Integer.valueOf(i)), LogFileKey.class, LogFileValue.class);
            try {
                Collections.sort(arrayList, new Comparator<Pair<LogFileKey, LogFileValue>>() { // from class: org.apache.accumulo.server.tabletserver.log.LogSorter.LogProcessor.1
                    @Override // java.util.Comparator
                    public int compare(Pair<LogFileKey, LogFileValue> pair, Pair<LogFileKey, LogFileValue> pair2) {
                        return ((LogFileKey) pair.getFirst()).compareTo((LogFileKey) pair2.getFirst());
                    }
                });
                Iterator<Pair<LogFileKey, LogFileValue>> it = arrayList.iterator();
                while (it.hasNext()) {
                    Pair<LogFileKey, LogFileValue> next = it.next();
                    writer.append((WritableComparable) next.getFirst(), (Writable) next.getSecond());
                }
            } finally {
                writer.close();
            }
        }

        synchronized void close() throws IOException {
            this.bytesCopied = this.input.getPos();
            this.input.close();
            this.decryptingInput.close();
            this.input = null;
        }

        public synchronized long getSortTime() {
            if (this.sortStart > 0) {
                return this.sortStop > 0 ? this.sortStop - this.sortStart : System.currentTimeMillis() - this.sortStart;
            }
            return 0L;
        }

        synchronized long getBytesCopied() throws IOException {
            return this.input == null ? this.bytesCopied : this.input.getPos();
        }
    }

    public LogSorter(Instance instance, FileSystem fileSystem, AccumuloConfiguration accumuloConfiguration) {
        this.instance = instance;
        this.fs = fileSystem;
        this.conf = accumuloConfiguration;
        this.threadPool = new SimpleThreadPool(accumuloConfiguration.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT), getClass().getName());
    }

    public void startWatchingForRecoveryLogs(ThreadPoolExecutor threadPoolExecutor) throws KeeperException, InterruptedException {
        this.threadPool = threadPoolExecutor;
        new DistributedWorkQueue(ZooUtil.getRoot(this.instance) + "/recovery").startProcessing(new LogProcessor(), this.threadPool);
    }

    public List<RecoveryStatus> getLogSorts() {
        ArrayList arrayList = new ArrayList();
        synchronized (this.currentWork) {
            for (Map.Entry<String, LogProcessor> entry : this.currentWork.entrySet()) {
                RecoveryStatus recoveryStatus = new RecoveryStatus();
                recoveryStatus.name = entry.getKey();
                try {
                    recoveryStatus.progress = entry.getValue().getBytesCopied() / (0.0d + this.conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
                } catch (IOException e) {
                    log.warn("Error getting bytes read");
                }
                recoveryStatus.runtime = (int) entry.getValue().getSortTime();
                arrayList.add(recoveryStatus);
            }
        }
        return arrayList;
    }
}
