package org.apache.seatunnel.connectors.cdc.base.relational;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkEvent;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;

/* loaded from: input_file:org/apache/seatunnel/connectors/cdc/base/relational/JdbcSourceEventDispatcher.class */
/**
 * An {@link EventDispatcher} for {@link TableId}-keyed sources that keeps a handle on the
 * underlying change event queue so that watermark events can be enqueued directly.
 *
 * <p>The superclass is always constructed with an explicitly created {@link Heartbeat}; see
 * {@link #getHeartbeatInterval(CommonConnectorConfig)} for how its interval is chosen.
 */
public class JdbcSourceEventDispatcher extends EventDispatcher<TableId> {

    /** Heartbeat interval, in milliseconds, used when the configured interval is zero. */
    private static final long DEFAULT_HEARTBEAT_INTERVAL_MILLIS = 5000L;

    /** The queue handed to the superclass; also used directly for watermark events. */
    private final ChangeEventQueue<DataChangeEvent> queue;

    /** Primary topic of the topic selector; used as the topic of every watermark event. */
    private final String topic;

    /**
     * Creates the dispatcher and remembers the queue and the selector's primary topic.
     *
     * <p>All arguments are forwarded to the superclass constructor, together with a heartbeat
     * built from the connector configuration and the selector's heartbeat topic.
     */
    public JdbcSourceEventDispatcher(
            CommonConnectorConfig commonConnectorConfig,
            TopicSelector<TableId> topicSelector,
            DatabaseSchema<TableId> databaseSchema,
            ChangeEventQueue<DataChangeEvent> changeEventQueue,
            DataCollectionFilters.DataCollectionFilter<TableId> dataCollectionFilter,
            ChangeEventCreator changeEventCreator,
            EventMetadataProvider eventMetadataProvider,
            SchemaNameAdjuster schemaNameAdjuster) {
        super(
                commonConnectorConfig,
                topicSelector,
                databaseSchema,
                changeEventQueue,
                dataCollectionFilter,
                changeEventCreator,
                eventMetadataProvider,
                buildHeartbeat(commonConnectorConfig, topicSelector),
                schemaNameAdjuster);
        queue = changeEventQueue;
        topic = topicSelector.getPrimaryTopic();
    }

    /**
     * Returns the change event queue this dispatcher was constructed with.
     *
     * @return the same queue instance that was passed to the constructor
     */
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return queue;
    }

    /**
     * Builds a watermark event for the given split and puts it on the queue.
     *
     * @param map passed through unchanged as the first argument of {@code WatermarkEvent.create}
     * @param sourceSplitBase the split whose id is attached to the watermark event
     * @param offset the offset carried by the watermark event
     * @param watermarkKind the kind of watermark to emit
     * @throws InterruptedException propagated from the queue's {@code enqueue} call
     */
    public void dispatchWatermarkEvent(
            Map<String, ?> map,
            SourceSplitBase sourceSplitBase,
            Offset offset,
            WatermarkKind watermarkKind)
            throws InterruptedException {
        final String splitId = sourceSplitBase.splitId();
        queue.enqueue(
                new DataChangeEvent(
                        WatermarkEvent.create(map, topic, splitId, watermarkKind, offset)));
    }

    /**
     * Creates the heartbeat handed to the superclass: interval from {@link
     * #getHeartbeatInterval(CommonConnectorConfig)}, the selector's heartbeat topic, and the
     * connector's logical name.
     */
    private static Heartbeat buildHeartbeat(
            CommonConnectorConfig connectorConfig, TopicSelector<TableId> selector) {
        return Heartbeat.create(
                getHeartbeatInterval(connectorConfig),
                selector.getHeartbeatTopic(),
                connectorConfig.getLogicalName());
    }

    /**
     * Reads {@link Heartbeat#HEARTBEAT_INTERVAL} (in milliseconds) from the connector
     * configuration, substituting {@value #DEFAULT_HEARTBEAT_INTERVAL_MILLIS} ms when the
     * configured value is zero.
     */
    private static Duration getHeartbeatInterval(CommonConnectorConfig connectorConfig) {
        final Duration configured =
                connectorConfig
                        .getConfig()
                        .getDuration(Heartbeat.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        if (configured.isZero()) {
            return Duration.ofMillis(DEFAULT_HEARTBEAT_INTERVAL_MILLIS);
        }
        return configured;
    }
}
