package org.apache.carbondata.streaming;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
import org.apache.carbondata.spark.HandoffResultImpl;
import org.apache.spark.sql.SparkSession;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: StreamHandoffRDD.scala */
/* loaded from: input_file:org/apache/carbondata/streaming/StreamHandoffRDD$.class */
/**
 * Singleton (Scala companion-object module) with the driver-side entry points for
 * handing off streaming segments of a Carbon table.
 *
 * <p>This source is the Java view of a class compiled from Scala. Scala has no checked
 * exceptions, so the methods below may propagate {@link Exception} without declaring it.
 */
public final class StreamHandoffRDD$ implements Serializable {
    /** The single instance; created eagerly when the class is initialised. */
    public static final StreamHandoffRDD$ MODULE$ = new StreamHandoffRDD$();

    private final LogService LOGGER;

    private StreamHandoffRDD$() {
        this.LOGGER = LogServiceFactory.getLogService(getClass().getCanonicalName());
    }

    private LogService LOGGER() {
        return this.LOGGER;
    }

    /**
     * Hands off streaming segments one at a time while holding the table's
     * {@code handoff.lock}.
     *
     * <p>Each round re-reads the load metadata under the table-status lock, filters it with
     * {@code StreamHandoffRDD$$anonfun$2} (presumably selecting segments that are ready for
     * handoff; the predicate is defined outside this file) and hands off the first match.
     * The loop ends when the metadata could not be read or no segment matches. If the
     * handoff lock cannot be acquired the method returns without doing any work.
     *
     * @param carbonLoadModel load model describing the target table
     * @param sparkSession    session whose Spark context runs the handoff job
     */
    public void iterateStreamingHandoff(CarbonLoadModel carbonLoadModel, SparkSession sparkSession) {
        CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
        AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
        CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(identifier);
        ICarbonLock handoffLock = CarbonLockFactory.getCarbonLockObj(identifier, "handoff.lock");
        try {
            if (handoffLock.lockWithRetries()) {
                LOGGER().info("Acquired the handoff lock for table "
                        + carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
                boolean continueHandoff;
                do {
                    continueHandoff = false;
                    // The table-status lock is already released when this call returns, so a
                    // failing handoff below cannot cause the same lock to be unlocked twice.
                    LoadMetadataDetails[] allSegments = readLoadMetadataUnderStatusLock(identifier, tablePath);
                    if (allSegments != null) {
                        LoadMetadataDetails[] candidates = (LoadMetadataDetails[]) Predef$.MODULE$
                                .refArrayOps(allSegments)
                                .filter(new StreamHandoffRDD$$anonfun$2());
                        continueHandoff = candidates.length > 0;
                        if (continueHandoff) {
                            executeStreamingHandoff(carbonLoadModel, sparkSession, candidates[0].getLoadName());
                        }
                    }
                } while (continueHandoff);
            }
        } finally {
            if (handoffLock != null) {
                handoffLock.unlock();
            }
        }
    }

    /**
     * Reads the table's load metadata while holding the table-status lock.
     *
     * @return the load metadata entries, or {@code null} when the table-status lock could
     *         not be acquired
     */
    private LoadMetadataDetails[] readLoadMetadataUnderStatusLock(
            AbsoluteTableIdentifier identifier, CarbonTablePath tablePath) {
        ICarbonLock statusLock = new SegmentStatusManager(identifier).getTableStatusLock();
        try {
            if (statusLock.lockWithRetries()) {
                return SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath());
            }
            return null;
        } finally {
            if (statusLock != null) {
                statusLock.unlock();
            }
        }
    }

    /**
     * Starts a new thread that runs
     * {@link #iterateStreamingHandoff(CarbonLoadModel, SparkSession)} and returns immediately.
     *
     * @param carbonLoadModel load model passed through to the handoff loop
     * @param sparkSession    session passed through to the handoff loop
     */
    public void startStreamingHandoffThread(final CarbonLoadModel carbonLoadModel, final SparkSession sparkSession) {
        new Thread() {
            @Override
            public void run() {
                StreamHandoffRDD$.MODULE$.iterateStreamingHandoff(carbonLoadModel, sparkSession);
            }
        }.start();
    }

    /**
     * Hands off one streaming segment: records a new in-progress load entry, runs a
     * {@code StreamHandoffRDD} job for the segment and then updates the table status.
     *
     * <p>On failure the table status is marked as failed, the new segment is deleted and an
     * {@link Exception} is thrown whose message starts with {@code "Handoff failure"}. The
     * exception caught from the job, if any, is attached as its cause. An {@link Exception}
     * is also thrown when the job succeeds but the table status cannot be updated.
     *
     * @param carbonLoadModel load model of the target table; its fact timestamp is overwritten
     * @param sparkSession    session whose Spark context runs the handoff job
     * @param str             name of the streaming segment to hand off
     */
    public void executeStreamingHandoff(CarbonLoadModel carbonLoadModel, SparkSession sparkSession, String str) {
        // Mutable holder because the per-result callback below receives it and may update it.
        ObjectRef statusRef = ObjectRef.create(SegmentStatus.SUCCESS);
        String errorMessage = "Handoff failure";
        Exception failure = null;
        try {
            LoadMetadataDetails newSegment = new LoadMetadataDetails();
            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis());
            CarbonLoaderUtil.populateNewLoadMetaEntry(
                    newSegment, SegmentStatus.INSERT_IN_PROGRESS, carbonLoadModel.getFactTimeStamp(), false);
            CarbonLoaderUtil.recordNewLoadMetadata(newSegment, carbonLoadModel, true, false);
            Tuple2[] results = (Tuple2[]) new StreamHandoffRDD(
                    sparkSession.sparkContext(), new HandoffResultImpl(), carbonLoadModel, str).collect();
            Predef$.MODULE$.refArrayOps(results)
                    .foreach(new StreamHandoffRDD$$anonfun$executeStreamingHandoff$1(statusRef));
        } catch (Exception e) {
            failure = e;
            statusRef.elem = SegmentStatus.LOAD_FAILURE;
            // getCause() is null for exceptions created without a cause; fall back to the
            // exception itself so building the message can never raise a NullPointerException.
            Throwable root = e.getCause() != null ? e.getCause() : e;
            errorMessage = errorMessage + ": " + root.getMessage();
            LOGGER().error(errorMessage);
            LOGGER().error(e, "Handoff failed on streaming segment " + str);
        }

        if (java.util.Objects.equals((SegmentStatus) statusRef.elem, SegmentStatus.LOAD_FAILURE)) {
            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel);
            LOGGER().info("********starting clean up**********");
            CarbonLoaderUtil.deleteSegment(carbonLoadModel, Integer.parseInt(carbonLoadModel.getSegmentId()));
            LOGGER().info("********clean up done**********");
            LOGGER().audit("Handoff is failed for "
                    + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName());
            LOGGER().warn("Cannot write load metadata file as handoff failed");
            // Keep the original exception (may be null) as the cause for diagnosis.
            throw new Exception(errorMessage, failure);
        }

        // Any status other than SUCCESS needs no table status update.
        if (!java.util.Objects.equals((SegmentStatus) statusRef.elem, SegmentStatus.SUCCESS)) {
            return;
        }
        if (updateLoadMetadata(str, carbonLoadModel)) {
            return;
        }
        LOGGER().audit("Handoff is failed for "
                + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName());
        LOGGER().error("Handoff failed due to failure in table status updation.");
        throw new Exception("Handoff failed due to failure in table status updation.");
    }

    /**
     * Rewrites the table status file after a successful handoff: the entry selected by
     * {@code StreamHandoffRDD$$anonfun$3} (built from the load model) is set to
     * {@code SUCCESS} with the current time as load end time, and the entry selected by
     * {@code StreamHandoffRDD$$anonfun$4} (built from {@code str}) is set to
     * {@code COMPACTED}. Both predicates are defined outside this file.
     *
     * <p>An {@link Exception} is thrown when either entry is missing from the metadata.
     *
     * @param str             name of the streaming segment that was handed off
     * @param carbonLoadModel load model of the target table
     * @return {@code true} when the status file was written; {@code false} when the
     *         table-status lock could not be acquired
     */
    private boolean updateLoadMetadata(String str, CarbonLoadModel carbonLoadModel) {
        CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
        String metaDataFilepath = carbonTable.getMetaDataFilepath();
        AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
        CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(identifier);
        String metadataDirectory = tablePath.getMetadataDirectoryPath();
        FileFactory.FileType fileType = FileFactory.getFileType(metadataDirectory);
        if (!FileFactory.isFileExist(metadataDirectory, fileType)) {
            FileFactory.mkdirs(metadataDirectory, fileType);
        }
        String tableStatusFilePath = tablePath.getTableStatusFilePath();
        ICarbonLock statusLock = new SegmentStatusManager(identifier).getTableStatusLock();
        boolean updated = false;
        try {
            if (statusLock.lockWithRetries()) {
                LOGGER().info("Acquired lock for table" + carbonLoadModel.getDatabaseName() + "."
                        + carbonLoadModel.getTableName() + " for table status updation");
                LoadMetadataDetails[] allSegments = SegmentStatusManager.readLoadMetadata(metaDataFilepath);

                Option newSegment = Predef$.MODULE$.refArrayOps(allSegments)
                        .find(new StreamHandoffRDD$$anonfun$3(carbonLoadModel));
                if (newSegment.isEmpty()) {
                    throw new Exception("Failed to update table status for new segment");
                }
                LoadMetadataDetails newSegmentDetails = (LoadMetadataDetails) newSegment.get();
                newSegmentDetails.setSegmentStatus(SegmentStatus.SUCCESS);
                newSegmentDetails.setLoadEndTime(System.currentTimeMillis());

                Option streamSegment = Predef$.MODULE$.refArrayOps(allSegments)
                        .find(new StreamHandoffRDD$$anonfun$4(str));
                if (streamSegment.isEmpty()) {
                    throw new Exception("Failed to update table status for streaming segment");
                }
                ((LoadMetadataDetails) streamSegment.get()).setSegmentStatus(SegmentStatus.COMPACTED);

                SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFilePath, allSegments);
                updated = true;
            } else {
                LOGGER().error("Not able to acquire the lock for Table status updation for table "
                        + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName());
            }
            return updated;
        } finally {
            if (statusLock.unlock()) {
                LOGGER().info("Table unlocked successfully after table status updation"
                        + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName());
            } else {
                LOGGER().error("Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
                        + carbonLoadModel.getTableName() + " during table status updation");
            }
        }
    }

    /** Keeps the singleton unique across Java deserialisation. */
    private Object readResolve() {
        return MODULE$;
    }
}
