package cz.o2.proxima.storage.hdfs;

import cz.o2.proxima.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.hadoop.shaded.com.google.common.collect.Maps;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractBulkAttributeWriter;
import cz.o2.proxima.storage.AttributeWriterBase;
import cz.o2.proxima.storage.DataAccessor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.batch.BatchLogObservable;
import cz.o2.proxima.storage.batch.BatchLogObserver;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/hdfs/HdfsDataAccessor.class */
public class HdfsDataAccessor extends AbstractBulkAttributeWriter implements BatchLogObservable, DataAccessor {
    /** Configuration key whose value the constructor parses as an integer into {@code minElementsToFlush}. */
    public static final String HDFS_MIN_ELEMENTS_TO_FLUSH = "hdfs.min-elements-to-flush";
    /** Configuration key whose value the constructor parses as a long into {@code rollInterval}. */
    public static final String HDFS_ROLL_INTERVAL = "hdfs.log-roll-interval";
    /** Configuration key whose value the constructor parses as a long into {@code batchProcessSize}. */
    public static final String HDFS_BATCH_PROCESS_SIZE_MIN = "hdfs.process-size.min";
    /** Default for {@link #HDFS_MIN_ELEMENTS_TO_FLUSH}. */
    private static final int HDFS_MIN_ELEMENTS_TO_FLUSH_DEFAULT = 500;
    /**
     * Default for {@link #HDFS_BATCH_PROCESS_SIZE_MIN}; numerically 100 * 1024 * 1024.
     * NOTE(review): presumably a size in bytes - confirm against {@code HdfsPartition.size()}.
     */
    private static final long HDFS_BATCH_PROCES_SIZE_MIN_DEFAULT = 104857600;
    /** Hadoop configuration built from the constructor's configuration map. */
    private final Configuration conf;
    /** File system obtained from {@code FileSystem.get(uri, conf)} in the constructor. */
    private final FileSystem fs;
    /** Value configured under {@link #HDFS_MIN_ELEMENTS_TO_FLUSH}; not read by any method whose body is visible in this file. */
    private final int minElementsToFlush;
    /** Value configured under {@link #HDFS_ROLL_INTERVAL}; {@code openWriter} rounds stamps down to a multiple of it. */
    private final long rollInterval;
    /** Value configured under {@link #HDFS_BATCH_PROCESS_SIZE_MIN}; {@code getPartitions} closes a partition once its size exceeds it. */
    private final long batchProcessSize;
    /** Formatter with pattern {@code /yyyy/MM/} used by {@code toFinalLocation} to build the directory part of a path. */
    private final DateTimeFormatter DIR_FORMAT;
    /** Currently open sequence file writer; {@code null} until {@code openWriter} runs and after a successful {@code rollback}. */
    private SequenceFile.Writer writer;
    /** Temporary path the current writer was opened at (assigned in {@code openWriter}). */
    private Path writerTmpPath;
    /** Stamp, rounded down to a multiple of {@code rollInterval}, at which the current writer was opened. */
    private long lastRoll;
    /** Reset to zero whenever a writer is opened. NOTE(review): presumably incremented by {@code write} - confirm. */
    private long elementsSinceFlush;
    /** Reset to {@code Long.MAX_VALUE} whenever a writer is opened. */
    private long minElementStamp;
    /** Reset to {@code Long.MIN_VALUE} whenever a writer is opened. */
    private long maxElementStamp;
    /** Initialized to zero in the constructor; no visible method body reads or updates it. */
    private long monothonicTime;
    /** Executor taken from the {@code Context} in {@code getWriter} / {@code getBatchLogObservable}; {@code observe} runs its reading task on it. */
    private Executor executor;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) HdfsDataAccessor.class);
    /** Default for {@link #HDFS_ROLL_INTERVAL}: one hour expressed in milliseconds. */
    private static final long HDFS_ROLL_INTERVAL_DEFAULT = TimeUnit.HOURS.toMillis(1);
    /** Matches file names of the form {@code part-<digits>_<digits>-<suffix>}; the two digit groups are extracted by {@code getMinMaxStamp}. */
    private static final Pattern PART_FILE_PARSER = Pattern.compile("part-([0-9]+)_([0-9]+)-.+");

    /**
     * Creates the accessor for the given entity and storage URI.
     *
     * <p>The configuration map is copied into a Hadoop {@link Configuration}, which is then
     * used to obtain the {@link FileSystem} for {@code uri}. The three tunables
     * ({@link #HDFS_MIN_ELEMENTS_TO_FLUSH}, {@link #HDFS_ROLL_INTERVAL} and
     * {@link #HDFS_BATCH_PROCESS_SIZE_MIN}) are read from the same map, falling back to the
     * declared {@code *_DEFAULT} constants when a key is absent.
     *
     * @param entityDescriptor entity this accessor belongs to (passed to the superclass)
     * @param uri storage URI (passed to the superclass and to {@code FileSystem.get})
     * @param map configuration; values are converted with {@code toString()} before parsing
     * @throws IOException when the file system cannot be obtained
     * @throws NumberFormatException when a configured tunable is not a valid number
     */
    public HdfsDataAccessor(EntityDescriptor entityDescriptor, URI uri, Map<String, Object> map) throws IOException {
        super(entityDescriptor, uri);
        this.DIR_FORMAT = DateTimeFormatter.ofPattern("/yyyy/MM/");
        this.writer = null;
        this.writerTmpPath = null;
        this.lastRoll = 0L;
        this.elementsSinceFlush = 0L;
        // Extremes so that the first observed stamp replaces both bounds.
        this.minElementStamp = Long.MAX_VALUE;
        this.maxElementStamp = Long.MIN_VALUE;
        this.monothonicTime = 0L;
        this.conf = toHadoopConf(map);
        this.fs = FileSystem.get(uri, this.conf);
        // Defaults come from the declared constants rather than repeated literals.
        this.minElementsToFlush = getCfg(
            HDFS_MIN_ELEMENTS_TO_FLUSH, map,
            value -> Integer.valueOf(value.toString()),
            HDFS_MIN_ELEMENTS_TO_FLUSH_DEFAULT);
        this.rollInterval = getCfg(
            HDFS_ROLL_INTERVAL, map,
            value -> Long.valueOf(value.toString()),
            HDFS_ROLL_INTERVAL_DEFAULT);
        this.batchProcessSize = getCfg(
            HDFS_BATCH_PROCESS_SIZE_MIN, map,
            value -> Long.valueOf(value.toString()),
            HDFS_BATCH_PROCES_SIZE_MIN_DEFAULT);
    }

    /**
     * Builds a Hadoop configuration carrying every entry of the given map.
     *
     * @param map source settings; each value is stored as its {@code toString()} form
     * @return a new {@link Configuration} with all entries of {@code map} set on it
     */
    private static Configuration toHadoopConf(Map<String, Object> map) {
        Configuration hadoopConf = new Configuration();
        for (Map.Entry<String, Object> property : map.entrySet()) {
            hadoopConf.set(property.getKey(), property.getValue().toString());
        }
        return hadoopConf;
    }

    /**
     * Closes the currently open writer, if any, and removes this host's temporary files.
     *
     * <p>The writer reference is cleared only after it was closed successfully. When closing
     * fails, the failure is rethrown wrapped in a {@link RuntimeException} and the temporary
     * directory is left untouched.
     */
    public void rollback() {
        SequenceFile.Writer open = this.writer;
        if (open != null) {
            try {
                open.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            this.writer = null;
        }
        clearTmpDir();
    }

    /*  JADX ERROR: Failed to decode insn: 0x0078: MOVE_MULTI, method: cz.o2.proxima.storage.hdfs.HdfsDataAccessor.write(cz.o2.proxima.storage.StreamElement, cz.o2.proxima.storage.CommitCallback):void
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    /**
     * Stub standing in for the element write entry point.
     *
     * <p>The decompiler could not reconstruct this method (see the error report in the comment
     * directly above), so no part of the original logic is present here: the stub
     * unconditionally throws {@link UnsupportedOperationException}. The real body has to be
     * restored from the original sources or bytecode before this class can be used for
     * writing.
     *
     * @param r9 element to write (unused by the stub)
     * @param r10 commit callback (unused by the stub)
     * @throws UnsupportedOperationException always
     */
    public void write(cz.o2.proxima.storage.StreamElement r9, cz.o2.proxima.storage.CommitCallback r10) {
        /*
            Method dump skipped, instructions count: 349
            To view this dump add '--comments-level debug' option
        */
        // Always fails: there is no decompiled body to execute.
        throw new UnsupportedOperationException("Method not decompiled: cz.o2.proxima.storage.hdfs.HdfsDataAccessor.write(cz.o2.proxima.storage.StreamElement, cz.o2.proxima.storage.CommitCallback):void");
    }

    /**
     * Serializes the record key of an element as {@code <key>#<attribute>}.
     *
     * <p>The bytes are always UTF-8 encoded so that the stored form does not depend on the
     * default charset of the JVM that happens to write the file.
     *
     * @param streamElement element whose key and attribute are combined
     * @return UTF-8 bytes of {@code key + "#" + attribute}
     */
    private byte[] toKey(StreamElement streamElement) {
        String combined = streamElement.getKey() + "#" + streamElement.getAttribute();
        return combined.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Opens a fresh sequence file writer in the temporary directory.
     *
     * <p>The given stamp is rounded down to a multiple of {@code rollInterval}; the rounded
     * value names the temporary file and is remembered as {@code lastRoll}. Per-file
     * bookkeeping (element counter, min/max stamps) is reset before the writer is created.
     *
     * @param j stamp from which the roll boundary is derived
     * @throws RuntimeException wrapping any {@link IOException} or {@link URISyntaxException}
     */
    private void openWriter(long j) {
        // Same value as (j / rollInterval) * rollInterval: drops the remainder.
        final long rollStamp = j - (j % this.rollInterval);
        try {
            final Path target = toTmpLocation(rollStamp);
            log.debug("Opening writer at {}", target);
            this.minElementStamp = Long.MAX_VALUE;
            this.maxElementStamp = Long.MIN_VALUE;
            this.elementsSinceFlush = 0L;
            this.lastRoll = rollStamp;
            this.writerTmpPath = target;
            this.writer = SequenceFile.createWriter(
                this.conf,
                SequenceFile.Writer.file(target),
                SequenceFile.Writer.appendIfExists(false),
                SequenceFile.Writer.keyClass(BytesWritable.class),
                SequenceFile.Writer.valueClass(TimestampedNullableBytesWritable.class),
                SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, new GzipCodec()));
        } catch (IOException | URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the name of a temporary part file: {@code part-<stamp>-<hostname>}.
     *
     * <p>Plain concatenation is used on purpose: it always renders the number with ASCII
     * digits, whereas {@code String.format("%d", ...)} localizes digits according to the
     * default locale.
     *
     * @param j stamp embedded in the name
     * @return the part file name
     * @throws UnknownHostException when the local host name cannot be resolved
     */
    String toPartName(long j) throws UnknownHostException {
        return "part-" + j + "-" + getLocalhost();
    }

    /**
     * Builds the name of a finished part file: {@code part-<j>_<j2>-<hostname>}.
     *
     * <p>The two numbers must be rendered with ASCII digits because {@code PART_FILE_PARSER}
     * only recognizes {@code [0-9]}. Plain concatenation guarantees that, whereas
     * {@code String.format("%d", ...)} localizes digits according to the default locale.
     *
     * @param j first stamp embedded in the name (first group of {@code PART_FILE_PARSER})
     * @param j2 second stamp embedded in the name (second group of {@code PART_FILE_PARSER})
     * @return the final file name
     * @throws UnknownHostException when the local host name cannot be resolved
     */
    String toFinalName(long j, long j2) throws UnknownHostException {
        return "part-" + j + "_" + j2 + "-" + getLocalhost();
    }

    /**
     * Computes the final location of a part file under the storage URI.
     *
     * <p>The directory is derived from {@code j} interpreted as epoch milliseconds in UTC and
     * formatted with {@code DIR_FORMAT}; the file name comes from
     * {@code toFinalName(j2, j3)}. The formatted directory already ends with {@code /}, so the
     * concatenated string contains a doubled slash before the file name, exactly as before.
     *
     * @param j epoch milliseconds selecting the year/month directory
     * @param j2 first stamp passed to {@code toFinalName}
     * @param j3 second stamp passed to {@code toFinalName}
     * @return path of the final file
     * @throws URISyntaxException when the assembled string is not a valid URI
     * @throws UnknownHostException when the local host name cannot be resolved
     */
    @VisibleForTesting
    Path toFinalLocation(long j, long j2, long j3) throws URISyntaxException, UnknownHostException {
        // ZoneOffset.UTC yields the same local date-time as the former zero-offset "UTC" zone
        // built via a constant borrowed from HftpFileSystem, without depending on that class.
        LocalDateTime bucket = LocalDateTime.ofInstant(Instant.ofEpochMilli(j), ZoneOffset.UTC);
        String location = getURI().toString() + this.DIR_FORMAT.format(bucket) + "/" + toFinalName(j2, j3);
        return new Path(new URI(location));
    }

    /**
     * Computes the temporary location of a part file: {@code <uri>/.tmp/<part name>}.
     *
     * @param j stamp passed to {@code toPartName}
     * @return path of the temporary file
     * @throws UnknownHostException when the local host name cannot be resolved
     * @throws URISyntaxException when the assembled string is not a valid URI
     */
    @VisibleForTesting
    Path toTmpLocation(long j) throws UnknownHostException, URISyntaxException {
        String location = getURI().toString() + "/.tmp/" + toPartName(j);
        URI tmpUri = new URI(location);
        return new Path(tmpUri);
    }

    /**
     * Reads one setting from the configuration map.
     *
     * @param str key to look up
     * @param map configuration map
     * @param function converter applied to the raw value when the key is present
     * @param t fallback returned when the key is absent (or the converter yields {@code null})
     * @param <T> type of the setting
     * @return the converted value, or {@code t}
     */
    private <T> T getCfg(String str, Map<String, Object> map, Function<Object, T> function, T t) {
        Optional<T> configured = Optional.ofNullable(map.get(str)).map(function);
        if (configured.isPresent()) {
            return configured.get();
        }
        return t;
    }

    /**
     * Deletes the files in {@code <uri>/.tmp/} whose names end with this host's name.
     *
     * <p>Best effort: any {@link IOException} is logged as a warning and otherwise ignored.
     * Nothing happens when the temporary directory does not exist.
     */
    private void clearTmpDir() {
        try {
            Path tmpDir = new Path(getURI().toString() + "/.tmp/");
            if (!this.fs.exists(tmpDir)) {
                return;
            }
            RemoteIterator<LocatedFileStatus> candidates = this.fs.listFiles(tmpDir, false);
            String host = getLocalhost();
            while (candidates.hasNext()) {
                LocatedFileStatus candidate = candidates.next();
                if (!matchesHostname(host, candidate)) {
                    // Belongs to another host; leave it alone.
                    continue;
                }
                this.fs.delete(candidate.getPath(), false);
            }
        } catch (IOException e) {
            log.warn("Failed to clean tmp dir", e);
        }
    }

    /**
     * Tells whether a file was written by the given host, i.e. whether its name ends with
     * {@code -<hostname>} (the suffix produced by {@code toPartName} / {@code toFinalName}).
     *
     * @param str host name to look for
     * @param locatedFileStatus file to test
     * @return {@code true} when the file name ends with {@code "-" + str}
     */
    private boolean matchesHostname(String str, LocatedFileStatus locatedFileStatus) {
        // Literal dash instead of a constant borrowed from commons-cli's HelpFormatter:
        // the separator is part of this class's own file naming scheme.
        String fileName = locatedFileStatus.getPath().getName();
        return fileName.endsWith("-" + str);
    }

    /**
     * Resolves the canonical host name of the local machine.
     *
     * @return canonical host name as reported by {@link InetAddress}
     * @throws UnknownHostException when the local host cannot be resolved
     */
    private String getLocalhost() throws UnknownHostException {
        InetAddress local = InetAddress.getLocalHost();
        return local.getCanonicalHostName();
    }

    /**
     * Groups the stored files overlapping the given stamp range into partitions.
     *
     * <p>All files under the storage URI are listed recursively. A file is selected when the
     * stamp range parsed from its name by {@code getMinMaxStamp} intersects
     * {@code [j, j2]}. Selected files are appended to the current partition; once the
     * partition's size exceeds {@code batchProcessSize} it is closed and a new one is started.
     * A trailing non-empty partition is included as well.
     *
     * @param j lower bound of the requested stamp range (inclusive)
     * @param j2 upper bound of the requested stamp range (inclusive)
     * @return partitions covering the selected files, possibly empty
     * @throws RuntimeException wrapping any {@link IOException} raised while listing files
     */
    public List<Partition> getPartitions(long j, long j2) {
        List<Partition> partitions = new ArrayList<>();
        try {
            RemoteIterator<LocatedFileStatus> files = this.fs.listFiles(new Path(getURI().toString()), true);
            HdfsPartition current = null;
            while (files.hasNext()) {
                LocatedFileStatus file = files.next();
                if (!file.isFile()) {
                    continue;
                }
                if (current == null) {
                    // Partition id is its index in the resulting list.
                    current = new HdfsPartition(partitions.size());
                }
                Map.Entry<Long, Long> range = getMinMaxStamp(file.getPath().getName());
                long minStamp = range.getKey();
                long maxStamp = range.getValue();
                if (maxStamp >= j && minStamp <= j2) {
                    current.add(file);
                    if (current.size() > this.batchProcessSize) {
                        partitions.add(current);
                        current = null;
                    }
                }
            }
            if (current != null && current.size() > 0) {
                partitions.add(current);
            }
            return partitions;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Asynchronously reads all elements of the given partitions and feeds them to the observer.
     *
     * <p>The work is submitted to {@code executor}. For each partition, every file that does
     * not live directly in a {@code .tmp} directory is read sequentially and each record is
     * converted with {@code toStreamElement} and passed to {@code onNext} together with its
     * partition. After all partitions are processed {@code onCompleted} is called; any
     * {@link Throwable} raised on the way is reported through {@code onError} instead.
     *
     * <p>NOTE(review): {@code list2} is not consulted - every stored element is forwarded
     * regardless of its attribute. Confirm whether filtering is expected here.
     *
     * @param list partitions to read; each must be an {@code HdfsPartition}
     * @param list2 requested attributes (currently unused)
     * @param batchLogObserver receiver of elements, completion and errors
     */
    public void observe(List<Partition> list, List<AttributeDescriptor<?>> list2, BatchLogObserver batchLogObserver) {
        this.executor.execute(() -> {
            try {
                for (Partition partition : list) {
                    HdfsPartition hdfsPartition = (HdfsPartition) partition;
                    for (Path path : hdfsPartition.getFiles()) {
                        if (path.getParent().getName().equals(".tmp")) {
                            // Temporary files are not read.
                            continue;
                        }
                        BytesWritable key = new BytesWritable();
                        TimestampedNullableBytesWritable value = new TimestampedNullableBytesWritable();
                        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.conf, SequenceFile.Reader.file(path))) {
                            long position = 0;
                            while (reader.next(key, value)) {
                                // The file path plus the record's position identify the element.
                                batchLogObserver.onNext(toStreamElement(path, position++, key, value), hdfsPartition);
                            }
                        } catch (IOException e) {
                            throw new RuntimeException("Failed to read file " + path, e);
                        }
                    }
                }
                batchLogObserver.onCompleted();
            } catch (Throwable th) {
                batchLogObserver.onError(th);
            }
        });
    }

    /**
     * Converts one stored record into a {@link StreamElement}.
     *
     * <p>The record key is decoded as UTF-8 and split at the first {@code #} into the entity
     * key and the attribute name (so a {@code #} inside the entity key itself would be
     * misinterpreted). The element's identifier is {@code <path>:<j>}. A record that has a
     * value becomes an update; one without a value becomes a delete.
     *
     * @param path file the record was read from
     * @param j position of the record within the file
     * @param bytesWritable serialized {@code <key>#<attribute>}
     * @param timestampedNullableBytesWritable stamp and optional value of the record
     * @return the reconstructed element
     * @throws IllegalArgumentException when the key has no {@code #} or the attribute is
     *     unknown to the entity
     */
    private StreamElement toStreamElement(Path path, long j, BytesWritable bytesWritable, TimestampedNullableBytesWritable timestampedNullableBytesWritable) {
        // Explicit charset: decoding must not depend on the JVM's default charset.
        String rawKey = new String(bytesWritable.copyBytes(), StandardCharsets.UTF_8);
        String[] parts = rawKey.split("#", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid input in key bytes " + rawKey);
        }
        String key = parts[0];
        String attribute = parts[1];
        AttributeDescriptor<?> descriptor = (AttributeDescriptor<?>) getEntityDescriptor().findAttribute(attribute).orElseThrow(() -> {
            return new IllegalArgumentException("Attribute " + attribute + " does not exist in entity " + getEntityDescriptor().getName());
        });
        String uuid = path + ":" + j;
        if (timestampedNullableBytesWritable.hasValue()) {
            return StreamElement.update(getEntityDescriptor(), descriptor, uuid, key, attribute, timestampedNullableBytesWritable.getStamp(), timestampedNullableBytesWritable.getValue());
        }
        return StreamElement.delete(getEntityDescriptor(), descriptor, uuid, key, attribute, timestampedNullableBytesWritable.getStamp());
    }

    /**
     * Extracts the two stamps encoded in a part file name.
     *
     * @param str file name, expected to contain {@code part-<digits>_<digits>-<suffix>}
     * @return entry of (first number, second number), or {@code (-1, -1)} when the name does
     *     not match {@code PART_FILE_PARSER}
     */
    @VisibleForTesting
    static Map.Entry<Long, Long> getMinMaxStamp(String str) {
        Matcher parsed = PART_FILE_PARSER.matcher(str);
        if (!parsed.find()) {
            return Maps.immutableEntry(-1L, -1L);
        }
        Long first = Long.valueOf(parsed.group(1));
        Long second = Long.valueOf(parsed.group(2));
        return Maps.immutableEntry(first, second);
    }

    /**
     * Exposes this accessor as a writer and remembers the context's executor service.
     *
     * @param context context supplying the executor service stored in {@code executor}
     * @return an {@link Optional} holding this instance
     */
    public Optional<AttributeWriterBase> getWriter(Context context) {
        Executor contextExecutor = context.getExecutorService();
        this.executor = contextExecutor;
        AttributeWriterBase self = this;
        return Optional.of(self);
    }

    /**
     * Exposes this accessor as a batch log observable and remembers the context's executor
     * service, on which {@code observe} later runs its reading task.
     *
     * @param context context supplying the executor service stored in {@code executor}
     * @return an {@link Optional} holding this instance
     */
    public Optional<BatchLogObservable> getBatchLogObservable(Context context) {
        Executor contextExecutor = context.getExecutorService();
        this.executor = contextExecutor;
        BatchLogObservable self = this;
        return Optional.of(self);
    }
}
