package org.apache.skywalking.oap.server.library.buffer;

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.Parser;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.library.buffer.Offset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/library/buffer/DataStreamReader.class */
/**
 * Reads length-delimited protobuf messages from the data files of a buffer directory and hands
 * each one to a {@link CallBack}.
 *
 * <p>The position of the reader is tracked in the supplied {@link Offset.ReadOffset} (file name
 * plus byte offset). Messages the callback rejects are parked in a bounded
 * {@link BufferDataCollection} and offered to the callback again by {@link #reCall()}.
 *
 * <p>After {@link #initialize()} all reading happens on one scheduler thread created by this
 * class; the class does no locking of its own.
 *
 * @param <MESSAGE_TYPE> protobuf message type stored in the buffer files
 */
public class DataStreamReader<MESSAGE_TYPE extends GeneratedMessageV3> {
    private static final Logger log = LoggerFactory.getLogger(DataStreamReader.class);

    /** Number of rejected messages parked before a retry round is forced. */
    private static final int COLLECTION_SIZE = 100;
    /** Maximum number of times a parked message is offered to the callback again. */
    private static final int MAX_RECALL_ATTEMPTS = 10;
    /** Pause between two retry rounds. */
    private static final long RECALL_PAUSE_MILLIS = 500L;
    /** Pause taken when the stream yields no message although the file looks longer than the offset. */
    private static final long IDLE_PAUSE_SECONDS = 5L;

    private final File directory;
    private final Offset.ReadOffset readOffset;
    private final Parser<MESSAGE_TYPE> parser;
    private final CallBack<MESSAGE_TYPE> callBack;
    private final BufferDataCollection<MESSAGE_TYPE> bufferDataCollection = new BufferDataCollection<>(COLLECTION_SIZE);

    /** File currently being read; {@code null} while no data file is available. */
    private File readingFile;
    /** Open stream on {@link #readingFile}; {@code null} whenever the file has to be (re)opened. */
    private InputStream inputStream;

    /**
     * Consumer of the messages read from the buffer files.
     *
     * @param <MESSAGE_TYPE> protobuf message type stored in the buffer files
     */
    @FunctionalInterface
    public interface CallBack<MESSAGE_TYPE extends GeneratedMessageV3> {
        /**
         * Handles one message.
         *
         * @param bufferData wrapper around the parsed message
         * @return {@code true} when the message was handled; {@code false} makes the reader park
         *         it and offer it again later
         */
        boolean call(BufferData<MESSAGE_TYPE> bufferData);
    }

    /**
     * Creates a reader; nothing is opened until {@link #initialize()} is called.
     *
     * @param file       directory that holds the buffer data files
     * @param readOffset persistent read position (file name and byte offset), updated while reading
     * @param parser     protobuf parser used to decode each length-delimited message
     * @param callBack   consumer invoked for every decoded message
     */
    public DataStreamReader(File file, Offset.ReadOffset readOffset, Parser<MESSAGE_TYPE> parser, CallBack<MESSAGE_TYPE> callBack) {
        this.directory = file;
        this.readOffset = readOffset;
        this.parser = parser;
        this.callBack = callBack;
    }

    /**
     * Opens the file the read offset points to and starts the periodic read task
     * (first run after 3 seconds, then 1 second after each run finishes).
     *
     * <p>The scheduler created here is not retained, so this class never shuts it down.
     */
    public void initialize() {
        preRead();
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new RunnableWithExceptionProtection(this::read, th -> {
            log.error("Buffer data pre read failure.", th);
        }), 3L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Positions the reader where {@link #readOffset} points: the recorded file at the recorded
     * offset, or the earliest data file when no file is recorded or the recorded one is gone.
     * On failure {@link #inputStream} is left {@code null} so that {@link #read()} tries again.
     */
    private void preRead() {
        String fileName = this.readOffset.getFileName();
        if (StringUtil.isEmpty(fileName)) {
            openInputStream(readEarliestDataFile());
            return;
        }
        File file = new File(this.directory, fileName);
        if (!file.exists()) {
            openInputStream(readEarliestDataFile());
            return;
        }
        openInputStream(file);
        if (this.inputStream == null) {
            // Opening failed and has been logged; there is nothing to skip in.
            return;
        }
        try {
            skipToOffset(this.readOffset.getOffset());
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            // The stream position is unknown now; drop the stream so the next read reopens it.
            closeInputStream();
        }
    }

    /**
     * Advances the open stream by {@code offset} bytes. {@link InputStream#skip(long)} may skip
     * fewer bytes than requested, so the call is repeated until the offset is reached or the
     * stream ends.
     */
    private void skipToOffset(long offset) throws IOException {
        long remaining = offset;
        while (remaining > 0) {
            long skipped = this.inputStream.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
                continue;
            }
            // skip() made no progress: read one byte to tell "end of stream" from "try again".
            if (this.inputStream.read() < 0) {
                log.warn("Buffer file {} ended {} bytes before the saved read offset {}.", this.readingFile, remaining, offset);
                return;
            }
            remaining--;
        }
    }

    /**
     * Closes the current stream and opens {@code file} instead. A {@code null} file (no data file
     * available) only records that nothing is open. If opening fails the error is logged and
     * {@link #inputStream} stays {@code null}.
     */
    private void openInputStream(File file) {
        closeInputStream();
        this.readingFile = file;
        if (file == null) {
            return;
        }
        try {
            this.inputStream = new FileInputStream(file);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /** Closes the current stream, if any, and clears the field even when closing fails. */
    private void closeInputStream() {
        InputStream current = this.inputStream;
        this.inputStream = null;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Finds the first data file (names starting with {@code "data"}, ordered by
     * {@code BufferFileUtils.sort}) and resets the read offset to its beginning.
     *
     * @return the earliest data file, or {@code null} when the directory holds none
     */
    private File readEarliestDataFile() {
        String[] list = this.directory.list(new PrefixFileFilter("data"));
        if (list == null || list.length <= 0) {
            return null;
        }
        BufferFileUtils.sort(list);
        this.readOffset.setFileName(list[0]);
        this.readOffset.setOffset(0L);
        return new File(this.directory, list[0]);
    }

    /**
     * One run of the periodic task: moves on from a fully consumed file if possible, then decodes
     * and dispatches messages until the read offset reaches the current file length, and finally
     * retries the parked messages.
     */
    private void read() {
        if (log.isDebugEnabled()) {
            log.debug("Read buffer data");
        }
        try {
            if (prepareStream()) {
                while (this.readOffset.getOffset() < this.readingFile.length()) {
                    BufferData<MESSAGE_TYPE> bufferData = new BufferData<>(this.parser.parseDelimitedFrom(this.inputStream));
                    if (bufferData.getMessageType() != null) {
                        boolean handled = this.callBack.call(bufferData);
                        // Bytes consumed = varint length prefix + message body.
                        int serializedSize = bufferData.getMessageType().getSerializedSize();
                        this.readOffset.setOffset(this.readOffset.getOffset() + CodedOutputStream.computeUInt32SizeNoTag(serializedSize) + serializedSize);
                        if (!handled) {
                            if (this.bufferDataCollection.size() == COLLECTION_SIZE) {
                                reCall();
                            }
                            this.bufferDataCollection.add(bufferData);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("collection size: {}, max size: {}", this.bufferDataCollection.size(), COLLECTION_SIZE);
                        }
                    } else if (this.bufferDataCollection.size() > 0) {
                        reCall();
                    } else {
                        // No message although the file is longer than the offset: wait before retrying.
                        try {
                            TimeUnit.SECONDS.sleep(IDLE_PAUSE_SECONDS);
                        } catch (InterruptedException e) {
                            log.error(e.getMessage(), e);
                            // Keep the interrupt visible and leave; looping on would fail the sleep again at once.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
            if (this.bufferDataCollection.size() > 0) {
                reCall();
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            // A failed parse may have consumed part of a message, so the stream no longer matches
            // readOffset. Drop it; the next run reopens the file at the saved offset.
            closeInputStream();
        }
    }

    /**
     * Makes sure a stream is open on the file to read. Reopens from {@link #readOffset} when
     * nothing is open, and switches to the earliest remaining file once the current one is fully
     * consumed and no longer being written.
     *
     * @return {@code true} when {@link #readingFile} and {@link #inputStream} are ready for reading
     * @throws IOException when the consumed file cannot be deleted
     */
    private boolean prepareStream() throws IOException {
        if (this.inputStream == null) {
            preRead();
        }
        if (this.inputStream == null) {
            return false;
        }
        if (this.readOffset.getOffset() == this.readingFile.length() && !this.readOffset.isCurrentWriteFile()) {
            // Release the file handle first: some platforms refuse to delete a file that is still open.
            closeInputStream();
            FileUtils.forceDelete(this.readingFile);
            openInputStream(readEarliestDataFile());
        }
        return this.inputStream != null;
    }

    /**
     * Offers the parked messages to the callback again, up to {@link #MAX_RECALL_ATTEMPTS} rounds
     * with a pause between rounds. Messages still rejected in the last round are not parked again.
     *
     * <p>The loop adds to the collection while iterating {@code export()}, so {@code export()} is
     * presumably a drained snapshot - confirm against {@code BufferDataCollection}.
     */
    private void reCall() {
        boolean interrupted = false;
        for (int attempt = 1; attempt <= MAX_RECALL_ATTEMPTS && this.bufferDataCollection.size() > 0; attempt++) {
            boolean lastAttempt = attempt == MAX_RECALL_ATTEMPTS;
            for (BufferData<MESSAGE_TYPE> bufferData : this.bufferDataCollection.export()) {
                if (!this.callBack.call(bufferData) && !lastAttempt) {
                    this.bufferDataCollection.add(bufferData);
                }
            }
            // Pause only when another round will follow; once interrupted, finish without pausing.
            if (interrupted || lastAttempt || this.bufferDataCollection.size() == 0) {
                continue;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(RECALL_PAUSE_MILLIS);
            } catch (InterruptedException e) {
                log.error(e.getMessage(), e);
                interrupted = true;
            }
        }
        if (interrupted) {
            // Restore the flag for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
}
