package org.apache.seatunnel.translation.flink.source;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.class */
public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row> implements CheckpointListener, ResultTypeQueryable<Row>, CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BaseSeaTunnelSourceFunction.class);
    protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
    protected volatile transient BaseSourceFunction<SeaTunnelRow> internalSource;
    protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
    protected volatile transient Map<Integer, List<byte[]>> restoredState = new HashMap();
    protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
    protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
    private volatile boolean running = true;

    public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource) {
        this.source = seaTunnelSource;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.internalSource = createInternalSource();
        this.internalSource.open();
    }

    protected abstract BaseSourceFunction<SeaTunnelRow> createInternalSource();

    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
        this.internalSource.run(new RowCollector(sourceContext, sourceContext.getCheckpointLock(), this.source.getProducedType()));
        long j = this.latestTriggerCheckpointId.get();
        if ((getRuntimeContext() instanceof StreamingRuntimeContext) && getRuntimeContext().isCheckpointingEnabled()) {
            while (this.running && j >= this.latestCompletedCheckpointId.get()) {
                Thread.sleep(100L);
            }
        }
    }

    public void close() throws Exception {
        cancel();
        LOG.debug("Close the SeaTunnelSourceFunction of Flink.");
    }

    public void cancel() {
        this.running = false;
        try {
            if (this.internalSource != null) {
                LOG.debug("Cancel the SeaTunnelSourceFunction of Flink.");
                this.internalSource.close();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.internalSource.notifyCheckpointComplete(j);
        this.latestCompletedCheckpointId.set(j);
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        this.internalSource.notifyCheckpointAborted(j);
    }

    public TypeInformation<Row> getProducedType() {
        return TypeConverterUtils.convert(this.source.getProducedType());
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        long checkpointId = functionSnapshotContext.getCheckpointId();
        this.latestTriggerCheckpointId.set(checkpointId);
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            this.sourceState.clear();
            this.sourceState.add(this.internalSource.snapshotState(checkpointId));
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.sourceState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor(getStateName(), Types.MAP(BasicTypeInfo.INT_TYPE_INFO, Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO))));
        if (!functionInitializationContext.isRestored()) {
            LOG.info("Consumer subtask {} has no restore state.", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        } else {
            ((Iterable) this.sourceState.get()).forEach(map -> {
                this.restoredState.putAll(map);
            });
            LOG.info("Consumer subtask {} restored state", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        }
    }

    protected abstract String getStateName();
}
