package org.apache.seatunnel.connectors.cdc.base.source.reader;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotPhaseEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.CompletedSnapshotSplitsReportEvent;
import org.apache.seatunnel.connectors.cdc.base.source.event.SnapshotSplitWatermark;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.IncrementalSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SnapshotSplitState;
import org.apache.seatunnel.connectors.cdc.base.source.split.state.SourceSplitStateBase;
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/source/reader/IncrementalSourceReader.class */
/**
 * Source reader for CDC sources that handles both snapshot splits and incremental splits.
 *
 * <p>Snapshot splits whose read has finished are buffered in {@link #finishedUnackedSplits} and
 * reported to the enumerator as a {@link CompletedSnapshotSplitsReportEvent}; the buffer is cleared
 * right after the event has been sent. Whenever the reader has nothing left to read it asks the
 * enumerator for more work through {@code context.sendSplitRequest()}.
 *
 * @param <T> type of the records emitted downstream
 * @param <C> type of the source configuration
 */
public class IncrementalSourceReader<T, C extends SourceConfig> extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitStateBase> {
    private static final Logger log = LoggerFactory.getLogger(IncrementalSourceReader.class);

    /** Finished snapshot splits, keyed by split id, that still have to be reported to the enumerator. */
    private final Map<String, SnapshotSplit> finishedUnackedSplits;

    /** Becomes {@code true} on the first {@link #pollNext(Collector)} call. */
    private volatile boolean running;

    private final int subtaskId;
    private final C sourceConfig;
    private final DebeziumDeserializationSchema<T> debeziumDeserializationSchema;
    private final DataSourceDialect<C> dataSourceDialect;

    /** Raised by {@link #addSplits(List)} when an assignment contained nothing to read. */
    private final AtomicBoolean needSendSplitRequest;

    /**
     * Creates the reader and wires a single-threaded fetcher manager around the split reader
     * supplier.
     *
     * @param dataSourceDialect dialect that is notified about completed checkpoints
     * @param blockingQueue queue shared between the fetcher and this reader
     * @param supplier factory for the split reader used by the fetcher
     * @param recordEmitter emitter that turns fetched records into downstream elements
     * @param sourceReaderOptions options forwarded to the base reader
     * @param context reader context; also provides the subtask index
     * @param c source configuration
     * @param debeziumDeserializationSchema schema whose produced type is checkpointed and restored
     */
    public IncrementalSourceReader(DataSourceDialect<C> dataSourceDialect, BlockingQueue<RecordsWithSplitIds<SourceRecords>> blockingQueue, Supplier<IncrementalSourceSplitReader<C>> supplier, RecordEmitter<SourceRecords, T, SourceSplitStateBase> recordEmitter, SourceReaderOptions sourceReaderOptions, SourceReader.Context context, C c, DebeziumDeserializationSchema<T> debeziumDeserializationSchema) {
        // Evaluating "supplier::get" already fails fast with a NullPointerException on a null supplier.
        super(blockingQueue, new SingleThreadFetcherManager<>(blockingQueue, supplier::get), recordEmitter, sourceReaderOptions, context);
        this.running = false;
        this.needSendSplitRequest = new AtomicBoolean(false);
        this.dataSourceDialect = dataSourceDialect;
        this.sourceConfig = c;
        this.finishedUnackedSplits = new HashMap<>();
        this.subtaskId = context.getIndexOfSubtask();
        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
    }

    /**
     * Requests splits when needed and then delegates to the base reader.
     *
     * <p>On the very first call a split request is sent if no split is currently assigned. On any
     * call a split request is sent if {@link #addSplits(List)} flagged that one is pending.
     *
     * @param collector collector that receives the emitted records
     * @throws Exception propagated from the base reader
     */
    public void pollNext(Collector<T> collector) throws Exception {
        if (!this.running) {
            if (getNumberOfCurrentlyAssignedSplits() == 0) {
                this.context.sendSplitRequest();
            }
            this.running = true;
        }
        // Test-and-clear in one atomic step: a flag raised again by addSplits after this point
        // stays set and is honoured by the next poll instead of being cleared unseen.
        if (this.needSendSplitRequest.compareAndSet(true, false)) {
            this.context.sendSplitRequest();
        }
        super.pollNext(collector);
    }

    /**
     * Forwards the checkpoint-complete notification to the data source dialect.
     *
     * @param j id of the completed checkpoint
     * @throws Exception propagated from the dialect
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        this.dataSourceDialect.notifyCheckpointComplete(j);
    }

    /**
     * Accepts newly assigned splits.
     *
     * <p>Snapshot splits that are already read-finished are only recorded for reporting; every
     * other split is handed to the base reader. If nothing remains to be read, a split request is
     * scheduled for the next {@link #pollNext(Collector)} call.
     *
     * @param list splits assigned to this reader
     */
    public void addSplits(List<SourceSplitBase> list) {
        List<SourceSplitBase> unfinishedSplits = new ArrayList<>();
        log.info("subtask {} add splits: {}", this.subtaskId, list.stream().map(SourceSplitBase::splitId).collect(Collectors.joining(",")));
        for (SourceSplitBase split : list) {
            if (!split.isSnapshotSplit()) {
                unfinishedSplits.add(split.asIncrementalSplit());
                continue;
            }
            SnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.isSnapshotReadFinished()) {
                this.finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                log.info("subtask {} add finished split: {}", this.subtaskId, snapshotSplit.splitId());
            } else {
                unfinishedSplits.add(split);
            }
        }
        reportFinishedSnapshotSplitsIfNeed();
        if (unfinishedSplits.isEmpty()) {
            this.needSendSplitRequest.set(true);
        } else {
            super.addSplits(unfinishedSplits);
        }
    }

    /**
     * Records the finished snapshot splits, reports them to the enumerator and asks for more work.
     *
     * @param map states of the finished splits, keyed by split id
     * @throws IllegalStateException if a finished split is not a read-finished snapshot split
     */
    protected void onSplitFinished(Map<String, SourceSplitStateBase> map) {
        for (SourceSplitStateBase splitState : map.values()) {
            SourceSplitBase sourceSplit = splitState.toSourceSplit();
            // Template overload: the message is only built when the check actually fails.
            Preconditions.checkState(sourceSplit.isSnapshotSplit() && sourceSplit.asSnapshotSplit().isSnapshotReadFinished(), "Only snapshot split could finish, but the actual split is incremental split %s", sourceSplit);
            this.finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
        }
        reportFinishedSnapshotSplitsIfNeed();
        this.context.sendSplitRequest();
    }

    /**
     * Sends the watermarks of all buffered finished snapshot splits to the enumerator and clears
     * the buffer. Does nothing when the buffer is empty.
     */
    private void reportFinishedSnapshotSplitsIfNeed() {
        if (this.finishedUnackedSplits.isEmpty()) {
            return;
        }
        List<SnapshotSplitWatermark> watermarks = new ArrayList<>(this.finishedUnackedSplits.size());
        for (SnapshotSplit snapshotSplit : this.finishedUnackedSplits.values()) {
            watermarks.add(new SnapshotSplitWatermark(snapshotSplit.splitId(), snapshotSplit.getLowWatermark(), snapshotSplit.getHighWatermark()));
        }
        CompletedSnapshotSplitsReportEvent reportEvent = new CompletedSnapshotSplitsReportEvent();
        reportEvent.setCompletedSnapshotSplitWatermarks(watermarks);
        this.context.sendSourceEventToEnumerator(reportEvent);
        this.finishedUnackedSplits.clear();
        log.debug("The subtask {} reports offsets of finished snapshot splits {}.", this.subtaskId, watermarks);
    }

    /**
     * Builds the mutable state object for a split.
     *
     * <p>For an incremental split carrying a checkpointed data type, that type is restored into
     * the deserialization schema first. If the new state reports that it auto-entered the pure
     * increment phase, a {@link CompletedSnapshotPhaseEvent} is sent to the enumerator.
     *
     * @param sourceSplitBase split to create the state for
     * @return a {@link SnapshotSplitState} or an {@link IncrementalSplitState}
     */
    public SourceSplitStateBase initializedState(SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new SnapshotSplitState(sourceSplitBase.asSnapshotSplit());
        }
        IncrementalSplit incrementalSplit = sourceSplitBase.asIncrementalSplit();
        if (incrementalSplit.getCheckpointDataType() != null) {
            log.info("The incremental split[{}] has checkpoint datatype {} for restore.", incrementalSplit.splitId(), incrementalSplit.getCheckpointDataType());
            this.debeziumDeserializationSchema.restoreCheckpointProducedType(incrementalSplit.getCheckpointDataType());
        }
        IncrementalSplitState incrementalSplitState = new IncrementalSplitState(incrementalSplit);
        if (incrementalSplitState.autoEnterPureIncrementPhaseIfAllowed()) {
            log.info("The incremental split[{}] startup position {} is equal the maxSnapshotSplitsHighWatermark {}, auto enter pure increment phase.", incrementalSplit.splitId(), incrementalSplitState.getStartupOffset(), incrementalSplitState.getMaxSnapshotSplitsHighWatermark());
            log.info("Clean the IncrementalSplit#completedSnapshotSplitInfos to empty.");
            this.context.sendSourceEventToEnumerator(new CompletedSnapshotPhaseEvent(incrementalSplitState.getTableIds()));
        }
        return incrementalSplitState;
    }

    /**
     * Snapshots the splits owned by this reader.
     *
     * <p>The result is the base reader's snapshot, minus any split that is also buffered as
     * finished, plus all buffered finished splits. When the result is exactly one incremental
     * split, it is replaced by a copy that also carries the currently produced data type.
     *
     * @param j id of the checkpoint being taken
     * @return the splits to persist
     */
    public List<SourceSplitBase> snapshotState(long j) {
        // Collect into an explicit ArrayList: Collectors.toList() gives no mutability guarantee
        // and the result is extended right below.
        List<SourceSplitBase> splits = super.snapshotState(j).stream()
                .filter(split -> !this.finishedUnackedSplits.containsKey(split.splitId()))
                .collect(Collectors.toCollection(ArrayList::new));
        splits.addAll(this.finishedUnackedSplits.values());
        return isIncrementalSplitPhase(splits) ? snapshotCheckpointDataType(splits) : splits;
    }

    /**
     * Converts a split state back into its split.
     *
     * @param str split id (unused)
     * @param sourceSplitStateBase state to convert
     * @return the split represented by the state
     */
    public SourceSplitBase toSplitType(String str, SourceSplitStateBase sourceSplitStateBase) {
        return sourceSplitStateBase.toSourceSplit();
    }

    /** Returns whether the list consists of exactly one split and that split is incremental. */
    private boolean isIncrementalSplitPhase(List<SourceSplitBase> list) {
        return list.size() == 1 && list.get(0).isIncrementalSplit();
    }

    /**
     * Wraps the single incremental split into a copy that carries the schema's produced type.
     *
     * @param list list holding exactly one incremental split
     * @return a one-element list with the enriched split
     * @throws IllegalStateException if the list is not a single incremental split
     */
    private List<SourceSplitBase> snapshotCheckpointDataType(List<SourceSplitBase> list) {
        if (!isIncrementalSplitPhase(list)) {
            throw new IllegalStateException("The splits should be incremental split when snapshot  checkpoint datatype");
        }
        IncrementalSplit currentSplit = list.get(0).asIncrementalSplit();
        SeaTunnelDataType<T> producedType = this.debeziumDeserializationSchema.getProducedType();
        IncrementalSplit splitWithDataType = new IncrementalSplit(currentSplit, producedType);
        log.debug("Snapshot checkpoint datatype {} into split[{}] state.", producedType, currentSplit.splitId());
        return Arrays.asList(splitWithDataType);
    }
}
