package org.apache.seatunnel.engine.checkpoint.storage.api;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.common.ProtoStuffSerializer;
import org.apache.seatunnel.engine.checkpoint.storage.common.Serializer;
import org.apache.seatunnel.engine.checkpoint.storage.common.StorageThreadFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.class */
/**
 * Base class for {@link CheckpointStorage} implementations.
 *
 * <p>Provides checkpoint file naming, (de)serialization of {@link PipelineState}, helpers that
 * pick the newest checkpoint file out of a list of file names, and an asynchronous store
 * operation backed by a lazily created thread pool.
 *
 * <p>Checkpoint file names produced by {@link #getCheckPointName(PipelineState)} have the form
 * {@code <timestampMillis>-<random>-<pipelineId>-<checkpointId>.ser}; the parsing helpers in this
 * class rely on that layout.
 */
public abstract class AbstractCheckpointStorage implements CheckpointStorage {
    public static final String DEFAULT_CHECKPOINT_FILE_PATH_SPLIT = "/";
    /** Separator between the segments of a checkpoint file name. */
    public static final String FILE_NAME_SPLIT = "-";
    /** Index of the pipeline id segment in a split file name. */
    public static final int FILE_NAME_PIPELINE_ID_INDEX = 2;
    /** Index of the checkpoint id segment (still carrying the file extension) in a split file name. */
    public static final int FILE_NAME_CHECKPOINT_ID_INDEX = 3;
    /** Index of the timestamp segment used to order files from oldest to newest. */
    public static final int FILE_SORT_ID_INDEX = 0;
    /** Exclusive upper bound of the random segment in a checkpoint file name. */
    public static final int FILE_NAME_RANDOM_RANGE = 1000;
    /** File extension (without the dot) of checkpoint files. */
    public static final String FILE_FORMAT = "ser";

    private static final Logger log = LoggerFactory.getLogger(AbstractCheckpointStorage.class);
    private static final int DEFAULT_THREAD_POOL_QUEUE_SIZE = 1024;
    private static final int DEFAULT_THREAD_POOL_MIN_SIZE = (Runtime.getRuntime().availableProcessors() * 2) + 1;
    private static final int DEFAULT_THREAD_POOL_MAX_SIZE = (Runtime.getRuntime().availableProcessors() * 4) + 1;

    /** Created on first use by {@link #initExecutor()}; volatile so the unsynchronized check sees it. */
    private volatile ExecutorService executorService;
    private final Serializer serializer = new ProtoStuffSerializer();
    private String storageNameSpace = "/seatunnel/checkpoint/";

    /**
     * Initializes the underlying storage.
     *
     * @param map storage configuration
     * @throws CheckpointStorageException if the storage cannot be initialized
     */
    public abstract void initStorage(Map<String, String> map) throws CheckpointStorageException;

    /**
     * Returns the current storage namespace.
     *
     * @return the namespace, {@code "/seatunnel/checkpoint/"} unless changed through
     *     {@link #setStorageNameSpace(String)}
     */
    public String getStorageParentDirectory() {
        return this.storageNameSpace;
    }

    /**
     * Builds a file name for the given state:
     * {@code <currentTimeMillis>-<random in [0, 1000)>-<pipelineId>-<checkpointId>.ser}.
     *
     * @param pipelineState state whose pipeline id and checkpoint id are embedded in the name
     * @return the generated file name
     */
    public String getCheckPointName(PipelineState pipelineState) {
        return System.currentTimeMillis()
                + FILE_NAME_SPLIT
                + ThreadLocalRandom.current().nextInt(FILE_NAME_RANDOM_RANGE)
                + FILE_NAME_SPLIT
                + pipelineState.getPipelineId()
                + FILE_NAME_SPLIT
                + pipelineState.getCheckpointId()
                + "."
                + FILE_FORMAT;
    }

    /**
     * Serializes a pipeline state with this storage's serializer.
     *
     * @param pipelineState state to serialize
     * @return the serialized bytes
     * @throws IOException if serialization fails
     */
    public byte[] serializeCheckPointData(PipelineState pipelineState) throws IOException {
        return this.serializer.serialize(pipelineState);
    }

    /**
     * Deserializes bytes previously produced by {@link #serializeCheckPointData(PipelineState)}.
     *
     * @param bArr serialized pipeline state
     * @return the deserialized state
     * @throws IOException if deserialization fails
     */
    public PipelineState deserializeCheckPointData(byte[] bArr) throws IOException {
        return (PipelineState) this.serializer.deserialize(bArr, PipelineState.class);
    }

    /**
     * Replaces the storage namespace. A {@code null} argument is ignored and the current
     * namespace is kept.
     *
     * @param str new namespace, or {@code null} to leave it unchanged
     */
    public void setStorageNameSpace(String str) {
        if (str != null) {
            this.storageNameSpace = str;
        }
    }

    /**
     * Selects, for every pipeline id found in the given file names, the file with the greatest
     * timestamp segment.
     *
     * @param collection checkpoint file names in the format produced by
     *     {@link #getCheckPointName(PipelineState)}
     * @return the newest file name of each pipeline; empty if {@code collection} is empty
     * @throws NumberFormatException if a timestamp segment is not a valid {@code long}
     * @throws ArrayIndexOutOfBoundsException if a file name has fewer than three segments
     */
    public Set<String> getLatestPipelineNames(Collection<String> collection) {
        Map<String, String> latestFileNameByPipeline = new HashMap<>();
        Map<String, Long> latestTimestampByPipeline = new HashMap<>();
        for (String fileName : collection) {
            String[] segments = getFileNameSegments(fileName);
            long timestamp = Long.parseLong(segments[FILE_SORT_ID_INDEX]);
            String pipelineId = segments[FILE_NAME_PIPELINE_ID_INDEX];
            Long knownTimestamp = latestTimestampByPipeline.get(pipelineId);
            // Strictly greater: on equal timestamps the first file encountered wins.
            if (Objects.isNull(knownTimestamp) || timestamp > knownTimestamp) {
                latestTimestampByPipeline.put(pipelineId, timestamp);
                latestFileNameByPipeline.put(pipelineId, fileName);
            }
        }
        return latestFileNameByPipeline.values().stream().collect(Collectors.toSet());
    }

    /**
     * Finds the file with the greatest timestamp segment among those belonging to the given
     * pipeline.
     *
     * @param list checkpoint file names in the format produced by
     *     {@link #getCheckPointName(PipelineState)}
     * @param str pipeline id to match against the pipeline id segment of each file name
     * @return the newest matching file name, or {@code null} if no file matches (only
     *     timestamps greater than zero are considered)
     * @throws NumberFormatException if a timestamp segment is not a valid {@code long}
     * @throws ArrayIndexOutOfBoundsException if a file name has fewer than three segments
     */
    public String getLatestCheckpointFileNameByJobIdAndPipelineId(List<String> list, String str) {
        String latestFileName = null;
        long latestTimestamp = 0L;
        for (String fileName : list) {
            String[] segments = getFileNameSegments(fileName);
            long timestamp = Long.parseLong(segments[FILE_SORT_ID_INDEX]);
            if (str.equals(segments[FILE_NAME_PIPELINE_ID_INDEX]) && timestamp > latestTimestamp) {
                latestTimestamp = timestamp;
                latestFileName = fileName;
            }
        }
        return latestFileName;
    }

    /** Splits a checkpoint file name on {@link #FILE_NAME_SPLIT}. */
    private String[] getFileNameSegments(String str) {
        return str.split(FILE_NAME_SPLIT);
    }

    /**
     * Extracts the pipeline id segment from a checkpoint file name.
     *
     * @param str checkpoint file name
     * @return the pipeline id segment
     * @throws ArrayIndexOutOfBoundsException if the name has fewer than three segments
     */
    public String getPipelineIdByFileName(String str) {
        return getFileNameSegments(str)[FILE_NAME_PIPELINE_ID_INDEX];
    }

    /**
     * Extracts the checkpoint id from a checkpoint file name, dropping everything from the first
     * dot of the last segment onward (i.e. the file extension).
     *
     * @param str checkpoint file name
     * @return the checkpoint id
     * @throws ArrayIndexOutOfBoundsException if the name has fewer than four segments
     */
    public String getCheckpointIdByFileName(String str) {
        return getFileNameSegments(str)[FILE_NAME_CHECKPOINT_ID_INDEX].split("\\.")[0];
    }

    /**
     * Stores the given state on the internal thread pool. Exceptions thrown by
     * {@code storeCheckPoint} are logged and not propagated to the caller.
     *
     * @param pipelineState state to store
     */
    @Override // org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage
    public void asyncStoreCheckPoint(PipelineState pipelineState) {
        initExecutor();
        this.executorService.submit(() -> {
            try {
                storeCheckPoint(pipelineState);
            } catch (Exception e) {
                log.error(
                        String.format(
                                "store checkpoint failed, job id : %s, pipeline id : %d",
                                pipelineState.getJobId(),
                                pipelineState.getPipelineId()),
                        e);
            }
        });
    }

    /**
     * Creates the thread pool if it does not exist yet or has been shut down. Uses double-checked
     * locking on {@code this} so that concurrent callers create at most one pool.
     */
    private void initExecutor() {
        if (null == this.executorService || this.executorService.isShutdown()) {
            synchronized (this) {
                if (null == this.executorService || this.executorService.isShutdown()) {
                    this.executorService =
                            new ThreadPoolExecutor(
                                    DEFAULT_THREAD_POOL_MIN_SIZE,
                                    DEFAULT_THREAD_POOL_MAX_SIZE,
                                    0L,
                                    TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<>(DEFAULT_THREAD_POOL_QUEUE_SIZE),
                                    new StorageThreadFactory());
                }
            }
        }
    }
}
