package cz.o2.proxima.storage.hdfs;

import cz.o2.proxima.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.hadoop.shaded.com.google.common.collect.Maps;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Context;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.batch.BatchLogObservable;
import cz.o2.proxima.storage.batch.BatchLogObserver;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.regex.Matcher;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/storage/hdfs/HdfsBatchLogObservable.class */
/**
 * {@link BatchLogObservable} that reads sequence files stored under a base URI.
 *
 * <p>Files are grouped into {@link HdfsPartition}s by {@link #getPartitions(long, long)} and
 * read asynchronously by {@link #observe(List, List, BatchLogObserver)} on an executor
 * obtained from the supplied {@link Context}.
 */
public class HdfsBatchLogObservable implements BatchLogObservable, Serializable {
    private static final Logger log = LoggerFactory.getLogger(HdfsBatchLogObservable.class);

    /** Name of the parent directory whose files are skipped when reading. */
    private static final String TMP_DIR_NAME = ".tmp";

    private final EntityDescriptor entityDesc;
    private final URI uri;
    private final Map<String, Object> cfg;
    private final long batchProcessSize;
    private final Context context;

    /** Transient, therefore resolved lazily from {@link #context} on first use. */
    private transient Executor executor;

    /**
     * Creates the observable.
     *
     * @param entityDescriptor entity whose attributes are looked up for each element read
     * @param uri base URI that is listed recursively for files
     * @param cfg configuration handed to {@code HdfsDataAccessor} when accessing the filesystem
     * @param context source of the executor used by {@link #observe}
     * @param batchProcessSize threshold of {@code HdfsPartition.size()} above which a partition
     *     is closed and a new one is started
     */
    public HdfsBatchLogObservable(
            EntityDescriptor entityDescriptor,
            URI uri,
            Map<String, Object> cfg,
            Context context,
            long batchProcessSize) {
        this.entityDesc = entityDescriptor;
        this.cfg = cfg;
        this.uri = uri;
        this.context = context;
        this.batchProcessSize = batchProcessSize;
    }

    /**
     * Lists all files under the base URI (recursively) and groups those whose stamp range,
     * parsed from the file name, intersects {@code [startStamp, endStamp]} into partitions.
     *
     * @param startStamp lower bound; a file is accepted only if its maximal stamp is at least this
     * @param endStamp upper bound; a file is accepted only if its minimal stamp is at most this
     * @return partitions with ids assigned sequentially from zero; empty partitions are omitted
     * @throws RuntimeException wrapping the {@link IOException} when listing fails
     */
    public List<Partition> getPartitions(long startStamp, long endStamp) {
        List<Partition> partitions = new ArrayList<>();
        try {
            RemoteIterator<LocatedFileStatus> files =
                    HdfsDataAccessor.getFs(this.uri, this.cfg)
                            .listFiles(new Path(this.uri.toString()), true);
            HdfsPartition current = new HdfsPartition(partitions.size());
            while (files.hasNext()) {
                LocatedFileStatus status = files.next();
                if (!status.isFile()) {
                    continue;
                }
                Map.Entry<Long, Long> minMax = getMinMaxStamp(status.getPath().getName());
                long minStamp = minMax.getKey();
                long maxStamp = minMax.getValue();
                if (maxStamp >= startStamp && minStamp <= endStamp) {
                    current.add(status);
                }
                // Close the partition once it has outgrown the configured threshold.
                if (current.size() > this.batchProcessSize) {
                    partitions.add(current);
                    current = new HdfsPartition(partitions.size());
                }
            }
            if (current.size() > 0) {
                partitions.add(current);
            }
            return partitions;
        } catch (IOException e) {
            throw new RuntimeException("Failed to retrieve partitions", e);
        }
    }

    /**
     * Asynchronously reads every file of every given partition and feeds the elements to the
     * observer. {@code onCompleted()} is called after all files were processed; any
     * {@link Throwable} raised while reading is passed to {@code onError}.
     *
     * @param partitions partitions to read; each one is cast to {@link HdfsPartition}
     * @param attributeDescriptors not used by this implementation
     * @param observer receiver of elements and of the completion or error signal
     */
    public void observe(
            List<Partition> partitions,
            List<AttributeDescriptor<?>> attributeDescriptors,
            BatchLogObserver observer) {
        if (this.executor == null) {
            this.executor = this.context.getExecutorService();
        }
        this.executor.execute(() -> {
            try {
                for (Partition partition : partitions) {
                    HdfsPartition hdfsPartition = (HdfsPartition) partition;
                    for (URI file : hdfsPartition.getFiles()) {
                        processFile(observer, hdfsPartition, new Path(file));
                    }
                }
                observer.onCompleted();
            } catch (Throwable err) {
                observer.onError(err);
            }
        });
    }

    /**
     * Reads a single sequence file and passes each record to the observer. Files located
     * directly in a directory named {@code .tmp} are skipped.
     *
     * @param observer receiver of the parsed elements
     * @param partition partition the file belongs to; passed along with every element
     * @param path file to read
     * @throws RuntimeException wrapping the {@link IOException} when reading fails
     */
    private void processFile(BatchLogObserver observer, HdfsPartition partition, Path path) {
        Path parent = path.getParent();
        if (parent != null && TMP_DIR_NAME.equals(parent.getName())) {
            return;
        }
        BytesWritable key = new BytesWritable();
        TimestampedNullableBytesWritable value = new TimestampedNullableBytesWritable();
        // try-with-resources guarantees the reader is closed even when reading or the
        // observer callback throws.
        try (SequenceFile.Reader reader =
                new SequenceFile.Reader(
                        HdfsDataAccessor.toHadoopConf(this.cfg), SequenceFile.Reader.file(path))) {
            long elementIndex = 0L;
            while (reader.next(key, value)) {
                observer.onNext(toStreamElement(path, elementIndex++, key, value), partition);
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to read file " + path, e);
        }
    }

    /**
     * Converts one key/value record into a {@link StreamElement}.
     *
     * <p>The key bytes are expected in the form {@code <entityKey>#<attribute>}, split on the
     * first {@code #}. A record with a value becomes an update, a record without one a delete.
     * The element's uuid is {@code <path>:<index>}.
     *
     * @param path file the record was read from
     * @param index zero-based position of the record within the file
     * @param keyBytes raw key of the record
     * @param value stamped, possibly absent, value of the record
     * @return the resulting update or delete element
     * @throws IllegalArgumentException if the key has no {@code #} separator or the attribute
     *     is not found in the entity
     */
    private StreamElement toStreamElement(
            Path path, long index, BytesWritable keyBytes, TimestampedNullableBytesWritable value) {
        // NOTE(review): decodes with the platform default charset; confirm it matches the
        // encoding used by the writer of these files.
        String rawKey = new String(keyBytes.copyBytes());
        String[] parts = rawKey.split("#", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid input in key bytes " + rawKey);
        }
        String entityKey = parts[0];
        String attribute = parts[1];
        AttributeDescriptor<?> attributeDesc =
                this.entityDesc
                        .findAttribute(attribute)
                        .orElseThrow(
                                () ->
                                        new IllegalArgumentException(
                                                "Attribute "
                                                        + attribute
                                                        + " does not exist in entity "
                                                        + this.entityDesc.getName()));
        String uuid = path + ":" + index;
        if (value.hasValue()) {
            return StreamElement.update(
                    this.entityDesc,
                    attributeDesc,
                    uuid,
                    entityKey,
                    attribute,
                    value.getStamp(),
                    value.getValue());
        }
        return StreamElement.delete(
                this.entityDesc, attributeDesc, uuid, entityKey, attribute, value.getStamp());
    }

    /**
     * Parses the minimal and maximal stamp from a file name using
     * {@code HdfsDataAccessor.PART_FILE_PARSER}.
     *
     * @param name file name (without directories)
     * @return entry of (group 1, group 2) of the pattern as (min, max), or {@code (-1, -1)}
     *     when the name does not match
     */
    @VisibleForTesting
    static Map.Entry<Long, Long> getMinMaxStamp(String name) {
        Matcher matcher = HdfsDataAccessor.PART_FILE_PARSER.matcher(name);
        if (matcher.find()) {
            return Maps.immutableEntry(
                    Long.valueOf(matcher.group(1)), Long.valueOf(matcher.group(2)));
        }
        return Maps.immutableEntry(-1L, -1L);
    }
}
