package com.indeed.lsmtree.recordlog;

import com.google.common.collect.Lists;
import com.indeed.lsmtree.recordlog.RecordFile;
import com.indeed.util.core.io.Closeables2;
import com.indeed.util.io.checkpointer.Checkpointer;
import com.indeed.util.varexport.Export;
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:com/indeed/lsmtree/recordlog/GenericRecordLogDirectoryPoller.class */
public class GenericRecordLogDirectoryPoller<T> implements Runnable, Closeable {
    private static final Logger log = Logger.getLogger(GenericRecordLogDirectoryPoller.class);
    private static final int SYNC_FREQUENCY = 10000;
    private long lastPosition;
    private final RecordLogDirectory<T> recordLogDirectory;
    private final Checkpointer<Long> checkpointer;
    private final boolean loop;
    private final boolean gc;
    private final boolean skipFirst;
    private final AtomicBoolean isClosed;
    private final List<Functions<T>> functionsList;
    private Thread pollerThread;

    /* loaded from: input_file:com/indeed/lsmtree/recordlog/GenericRecordLogDirectoryPoller$Functions.class */
    public interface Functions<T> {
        void process(long j, T t) throws IOException;

        void sync() throws IOException;
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer) throws IOException {
        this(recordLogDirectory, checkpointer, true);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean z) throws IOException {
        this(recordLogDirectory, checkpointer, z, false);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean z, boolean z2) throws IOException {
        this(recordLogDirectory, checkpointer, z, z2, false);
    }

    public GenericRecordLogDirectoryPoller(RecordLogDirectory<T> recordLogDirectory, Checkpointer<Long> checkpointer, boolean z, boolean z2, boolean z3) throws IOException {
        this.recordLogDirectory = recordLogDirectory;
        this.checkpointer = checkpointer;
        this.loop = z;
        this.gc = z2;
        this.skipFirst = z3;
        this.isClosed = new AtomicBoolean(false);
        this.functionsList = Lists.newArrayList();
        this.lastPosition = ((Long) checkpointer.getCheckpoint()).longValue();
    }

    public void registerFunctions(Functions<T> functions) {
        this.functionsList.add(functions);
    }

    public void start() {
        this.pollerThread = new Thread(this);
        this.pollerThread.start();
    }

    @Override // java.lang.Runnable
    public void run() {
        do {
            try {
                try {
                    RecordFile.Reader<T> reader = this.recordLogDirectory.reader(this.lastPosition);
                    int i = 0;
                    long j = this.lastPosition;
                    try {
                        if (this.skipFirst ? reader.next() : true) {
                            while (reader.next()) {
                                this.lastPosition = reader.getPosition();
                                T t = reader.get();
                                Iterator<Functions<T>> it = this.functionsList.iterator();
                                while (it.hasNext()) {
                                    it.next().process(this.lastPosition, t);
                                }
                                i++;
                                if (i % SYNC_FREQUENCY == 0) {
                                    Iterator<Functions<T>> it2 = this.functionsList.iterator();
                                    while (it2.hasNext()) {
                                        it2.next().sync();
                                    }
                                    checkpointPosition(this.lastPosition);
                                }
                                long j2 = this.lastPosition;
                            }
                        }
                        try {
                            reader.close();
                        } catch (IOException e) {
                            log.error("error closing reader", e);
                        }
                        if (j != this.lastPosition) {
                            Iterator<Functions<T>> it3 = this.functionsList.iterator();
                            while (it3.hasNext()) {
                                try {
                                    it3.next().sync();
                                } catch (Exception e2) {
                                    log.error("sync error", e2);
                                    throw new RuntimeException(e2);
                                }
                            }
                            try {
                                checkpointPosition(this.lastPosition);
                            } catch (Exception e3) {
                                log.error("error writing last position file", e3);
                            }
                        }
                        if (this.loop) {
                            Thread.sleep(5000L);
                        }
                    } catch (Exception e4) {
                        log.error("error reading segment file", e4);
                        this.lastPosition = j;
                        Closeables2.closeQuietly(reader, log);
                        Thread.sleep(5000L);
                    }
                } catch (Exception e5) {
                    log.error("error seeking to last valid position", e5);
                    Thread.sleep(5000L);
                }
                if (this.isClosed.get()) {
                    return;
                }
            } catch (InterruptedException e6) {
                return;
            }
        } while (this.loop);
    }

    private void checkpointPosition(long j) throws IOException {
        this.checkpointer.setCheckpoint(Long.valueOf(j));
        if (this.gc) {
            this.recordLogDirectory.garbageCollect(j);
        }
    }

    @Export(name = "last-position", doc = "The last position poller has read")
    public long getLastPosition() {
        return this.lastPosition;
    }

    @Export(name = "current-segment-timestamp", doc = "Timestamp of the current segment being read")
    public long getCurrentSegmentTimestamp() throws IOException {
        return this.recordLogDirectory.getSegmentTimestamp(getCurrentSegmentNum());
    }

    @Export(name = "current-segment-timestring", doc = "Same as current-segment-timestamp in human readable form")
    public String getCurrentSegmentTimestring() throws IOException {
        return new DateTime(getCurrentSegmentTimestamp()).toString();
    }

    @Export(name = "max-segment-timestamp", doc = "Timestamp of the max segment that exists in directory")
    public long getMaxSegmentTimestamp() throws IOException {
        return this.recordLogDirectory.getSegmentTimestamp(getMaxSegmentNum());
    }

    @Export(name = "max-segment-timestring", doc = "Same as max-segment-timestamp in human readable form")
    public String getMaxSegmentTimestring() throws IOException {
        return new DateTime(getMaxSegmentTimestamp()).toString();
    }

    @Export(name = "current-segment-num", doc = "The current segment that the poller is reading")
    public int getCurrentSegmentNum() {
        return this.recordLogDirectory.getSegmentNum(getLastPosition());
    }

    @Export(name = "max-segment-num", doc = "The max segment number that exists in directory")
    public int getMaxSegmentNum() throws IOException {
        return this.recordLogDirectory.getMaxSegmentNum();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.isClosed.set(true);
        if (this.pollerThread != null) {
            while (this.pollerThread.isAlive()) {
                Thread.yield();
            }
        }
    }

    public boolean isAlive() {
        return this.pollerThread.isAlive();
    }
}
