package com.scylladb.cdc.debezium.connector;

import com.scylladb.cdc.model.worker.ChangeSchema;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.cql.Cell;
import com.scylladb.cdc.model.worker.cql.CqlDate;
import io.debezium.data.Envelope;
import io.debezium.pipeline.AbstractChangeRecordEmitter;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.util.Clock;
import java.time.Instant;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.ConnectHeaders;

/* loaded from: input_file:com/scylladb/cdc/debezium/connector/ScyllaChangeRecordEmitter.class */
/**
 * Converts one Scylla CDC {@link RawChange}, optionally paired with its pre-image row, into a
 * Debezium change record and passes it to a {@link ChangeRecordEmitter.Receiver}.
 *
 * <p>Key columns are written as plain values into both the key struct and the row struct; every
 * other supported column is wrapped in a per-column cell struct whose
 * {@link ScyllaSchema#CELL_VALUE} field carries the value.
 */
public class ScyllaChangeRecordEmitter extends AbstractChangeRecordEmitter<ScyllaCollectionSchema> {
    /** Prefix of the CDC log column whose boolean value decides whether a null cell is still emitted. */
    private static final String DELETED_COLUMN_PREFIX = "cdc$deleted_";

    private final RawChange change;
    private final ScyllaSchema schema;
    /** Pre-image row accompanying {@link #change}; {@code null} when none was supplied. */
    private final RawChange preImage;

    /**
     * Creates an emitter for a change that has no pre-image.
     *
     * @param rawChange the change to emit
     * @param offsetContext offset context forwarded to the base emitter
     * @param scyllaSchema schema registry used to refresh the collection schema on emit
     * @param clock clock forwarded to the base emitter
     */
    public ScyllaChangeRecordEmitter(RawChange rawChange, OffsetContext offsetContext, ScyllaSchema scyllaSchema, Clock clock) {
        this(null, rawChange, offsetContext, scyllaSchema, clock);
    }

    /**
     * Creates an emitter for a change together with its pre-image.
     *
     * <p>Note the argument order: the pre-image comes first, the change itself second.
     *
     * @param rawChange the pre-image row, or {@code null} if there is none
     * @param rawChange2 the change to emit
     * @param offsetContext offset context forwarded to the base emitter
     * @param scyllaSchema schema registry used to refresh the collection schema on emit
     * @param clock clock forwarded to the base emitter
     */
    public ScyllaChangeRecordEmitter(RawChange rawChange, RawChange rawChange2, OffsetContext offsetContext, ScyllaSchema scyllaSchema, Clock clock) {
        super(offsetContext, clock);
        this.change = rawChange2;
        this.schema = scyllaSchema;
        this.preImage = rawChange;
    }

    /** Returns the change this emitter was created for. */
    public RawChange getChange() {
        return this.change;
    }

    /** Returns the schema registry this emitter was created with. */
    public ScyllaSchema getSchema() {
        return this.schema;
    }

    /**
     * Maps the CDC operation type of the change onto a Debezium envelope operation:
     * {@code ROW_UPDATE} to UPDATE, {@code ROW_INSERT} to CREATE, and both
     * {@code PARTITION_DELETE} and {@code ROW_DELETE} to DELETE.
     *
     * @return the envelope operation for the change
     * @throws RuntimeException if the operation type is anything else, including {@code null}
     */
    protected Envelope.Operation getOperation() {
        RawChange.OperationType operationType = this.change.getOperationType();
        if (operationType == RawChange.OperationType.ROW_UPDATE) {
            return Envelope.Operation.UPDATE;
        }
        if (operationType == RawChange.OperationType.ROW_INSERT) {
            return Envelope.Operation.CREATE;
        }
        if (operationType == RawChange.OperationType.PARTITION_DELETE
                || operationType == RawChange.OperationType.ROW_DELETE) {
            return Envelope.Operation.DELETE;
        }
        throw new RuntimeException(String.format("Unsupported operation type: %s.", operationType));
    }

    /**
     * Read records are not produced by this emitter.
     *
     * @throws UnsupportedOperationException always
     */
    public void emitReadRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        throw new UnsupportedOperationException("Read records are not supported by ScyllaChangeRecordEmitter.");
    }

    /**
     * Emits a CREATE record whose {@code after} section is filled from the change and whose
     * {@code before} section is filled from the pre-image when one is present.
     *
     * @param receiver sink for the generated record
     * @param scyllaCollectionSchema schema of the collection the change belongs to
     * @throws InterruptedException if the receiver is interrupted while accepting the record
     */
    public void emitCreateRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        emitUpsertRecord(receiver, scyllaCollectionSchema, Envelope.Operation.CREATE);
    }

    /**
     * Emits an UPDATE record whose {@code after} section is filled from the change and whose
     * {@code before} section is filled from the pre-image when one is present.
     *
     * @param receiver sink for the generated record
     * @param scyllaCollectionSchema schema of the collection the change belongs to
     * @throws InterruptedException if the receiver is interrupted while accepting the record
     */
    public void emitUpdateRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        emitUpsertRecord(receiver, scyllaCollectionSchema, Envelope.Operation.UPDATE);
    }

    /**
     * Emits a DELETE record. The key and the {@code before} section are filled from the pre-image
     * when one is present, otherwise from the change itself.
     *
     * @param receiver sink for the generated record
     * @param scyllaCollectionSchema schema of the collection the change belongs to
     * @throws InterruptedException if the receiver is interrupted while accepting the record
     */
    public void emitDeleteRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema) throws InterruptedException {
        ScyllaCollectionSchema refreshedSchema = refreshCollectionSchema(scyllaCollectionSchema);
        Struct key = new Struct(refreshedSchema.keySchema());
        Struct before = new Struct(refreshedSchema.beforeSchema());
        RawChange rowSource = this.preImage != null ? this.preImage : this.change;
        fillStructWithChange(refreshedSchema, key, before, rowSource);

        Struct envelope = refreshedSchema.getEnvelopeSchema().delete(before, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        receiver.changeRecord(refreshedSchema, getOperation(), key, envelope, getOffset(), (ConnectHeaders) null);
    }

    /** Shared body of the CREATE and UPDATE paths; {@code operation} selects which envelope is built. */
    private void emitUpsertRecord(ChangeRecordEmitter.Receiver receiver, ScyllaCollectionSchema scyllaCollectionSchema, Envelope.Operation operation) throws InterruptedException {
        ScyllaCollectionSchema refreshedSchema = refreshCollectionSchema(scyllaCollectionSchema);
        Struct key = new Struct(refreshedSchema.keySchema());
        Struct after = new Struct(refreshedSchema.afterSchema());
        fillStructWithChange(refreshedSchema, key, after, this.change);

        Struct envelope;
        if (this.preImage != null) {
            // The key was already filled from the change, so only the "before" row is populated here.
            Struct before = new Struct(refreshedSchema.beforeSchema());
            fillStructWithChange(refreshedSchema, null, before, this.preImage);
            envelope = generalizedEnvelope(refreshedSchema.getEnvelopeSchema().schema(), before, after, getOffset().getSourceInfo(), getClock().currentTimeAsInstant(), operation);
        } else if (operation == Envelope.Operation.CREATE) {
            envelope = refreshedSchema.getEnvelopeSchema().create(after, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        } else {
            envelope = refreshedSchema.getEnvelopeSchema().update((Object) null, after, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
        }
        receiver.changeRecord(refreshedSchema, getOperation(), key, envelope, getOffset(), (ConnectHeaders) null);
    }

    /** Asks the schema registry for the collection schema matching the change's current CDC schema. */
    private ScyllaCollectionSchema refreshCollectionSchema(ScyllaCollectionSchema scyllaCollectionSchema) {
        // NOTE(review): m3id() looks like a decompiler-assigned name for the collection id
        // accessor - confirm against ScyllaCollectionSchema before renaming.
        return this.schema.updateChangeSchema(scyllaCollectionSchema.m3id(), this.change.getSchema());
    }

    /**
     * Copies every supported non-CDC column of {@code source} into the given structs.
     *
     * @param collectionSchema provides the per-column cell schema
     * @param keyStruct receives partition and clustering key values; may be {@code null} to skip
     * @param valueStruct receives key values as-is and other columns wrapped in a cell struct
     * @param source the row to read column values from
     */
    private void fillStructWithChange(ScyllaCollectionSchema collectionSchema, Struct keyStruct, Struct valueStruct, RawChange source) {
        for (ChangeSchema.ColumnDefinition column : source.getSchema().getNonCdcColumnDefinitions()) {
            if (!ScyllaSchema.isSupportedColumnSchema(column)) {
                continue;
            }
            String columnName = column.getColumnName();
            Object value = translateCellToKafka(source.getCell(columnName));

            ChangeSchema.ColumnType columnType = column.getBaseTableColumnType();
            if (columnType == ChangeSchema.ColumnType.PARTITION_KEY || columnType == ChangeSchema.ColumnType.CLUSTERING_KEY) {
                valueStruct.put(columnName, value);
                if (keyStruct != null) {
                    keyStruct.put(columnName, value);
                }
                continue;
            }

            // NOTE(review): the deletion flag is always read from this.change, even when
            // `source` is the pre-image - confirm this is intended.
            Boolean deleted = this.change.getCell(DELETED_COLUMN_PREFIX + columnName).getBoolean();
            if (value != null || Boolean.TRUE.equals(deleted)) {
                Struct cellStruct = new Struct(collectionSchema.cellSchema(columnName));
                cellStruct.put(ScyllaSchema.CELL_VALUE, value);
                valueStruct.put(columnName, cellStruct);
            }
        }
    }

    /**
     * Builds an envelope struct by hand, setting only the sections that are non-null.
     *
     * @param envelopeSchema schema of the envelope struct
     * @param before value for the {@code before} field, or {@code null} to leave it unset
     * @param after value for the {@code after} field, or {@code null} to leave it unset
     * @param source value for the {@code source} field, or {@code null} to leave it unset
     * @param timestamp instant written to {@code ts_ms} as epoch milliseconds, or {@code null}
     * @param operation operation whose code is written to the {@code op} field
     * @return the populated envelope struct
     */
    private Struct generalizedEnvelope(Schema envelopeSchema, Object before, Object after, Struct source, Instant timestamp, Envelope.Operation operation) {
        Struct envelope = new Struct(envelopeSchema);
        envelope.put("op", operation.code());
        if (before != null) {
            envelope.put("before", before);
        }
        if (after != null) {
            envelope.put("after", after);
        }
        if (source != null) {
            envelope.put("source", source);
        }
        if (timestamp != null) {
            envelope.put("ts_ms", Long.valueOf(timestamp.toEpochMilli()));
        }
        return envelope;
    }

    /**
     * Converts a CDC cell into the object stored in the Kafka Connect struct.
     *
     * <p>DECIMAL, VARINT, UUID, TIMEUUID and DURATION become their {@code toString()} form, INET
     * becomes the host address string, DATE becomes a {@link Date} at UTC midnight, and any other
     * type is passed through as returned by {@code Cell.getAsObject()}.
     *
     * @param cell the cell to convert
     * @return the converted value, or {@code null} if the cell holds no value
     */
    private Object translateCellToKafka(Cell cell) {
        ChangeSchema.DataType dataType = cell.getColumnDefinition().getCdcLogDataType();
        if (cell.getAsObject() == null) {
            return null;
        }
        ChangeSchema.CqlType cqlType = dataType.getCqlType();
        if (cqlType == ChangeSchema.CqlType.DECIMAL) {
            return cell.getDecimal().toString();
        }
        if (cqlType == ChangeSchema.CqlType.UUID || cqlType == ChangeSchema.CqlType.TIMEUUID) {
            return cell.getUUID().toString();
        }
        if (cqlType == ChangeSchema.CqlType.VARINT) {
            return cell.getVarint().toString();
        }
        if (cqlType == ChangeSchema.CqlType.INET) {
            return cell.getInet().getHostAddress();
        }
        if (cqlType == ChangeSchema.CqlType.DURATION) {
            return cell.getDuration().toString();
        }
        if (cqlType == ChangeSchema.CqlType.DATE) {
            CqlDate date = cell.getDate();
            return toUtcMidnight(date.getYear(), date.getMonth(), date.getDay());
        }
        return cell.getAsObject();
    }

    /**
     * Returns the {@link Date} for midnight UTC of the given calendar day.
     *
     * @param year calendar year
     * @param month month of the year; 1 is subtracted before it is handed to {@link Calendar}
     * @param dayOfMonth day of the month
     */
    private static Date toUtcMidnight(int year, int month, int dayOfMonth) {
        // Locale.ROOT pins the calendar system: without it the default locale may select a
        // Buddhist or Japanese Imperial calendar, which would read `year` in a different era.
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
        calendar.clear();
        // Calendar months are zero-based, hence the -1.
        calendar.set(year, month - 1, dayOfMonth);
        return Date.from(calendar.toInstant());
    }
}
