package org.apache.edgent.connectors.kafka.runtime;

import java.nio.charset.StandardCharsets;
import kafka.message.MessageAndMetadata;
import org.apache.edgent.connectors.kafka.KafkaConsumer;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Function;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber.class */
public class KafkaSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
    private static final long serialVersionUID = 1;
    private static final Logger trace = KafkaConsumerConnector.getTrace();
    private String id;
    private final KafkaConsumerConnector connector;
    private Function<ByteConsumerRecord, T> byteToTupleFn;
    private Function<StringConsumerRecord, T> stringToTupleFn;
    private final String[] topics;
    private Consumer<T> eventSubmitter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber$ByteConsumerRecord.class */
    public static class ByteConsumerRecord extends ConsumerRecordBase<byte[], byte[]> implements KafkaConsumer.ByteConsumerRecord {
        ByteConsumerRecord(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
            super(messageAndMetadata);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.edgent.connectors.kafka.runtime.KafkaSubscriber.ConsumerRecordBase, org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public byte[] key() {
            return (byte[]) this.rec.key();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.edgent.connectors.kafka.runtime.KafkaSubscriber.ConsumerRecordBase, org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public byte[] value() {
            return (byte[]) this.rec.message();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber$ConsumerRecordBase.class */
    public static abstract class ConsumerRecordBase<K, V> implements KafkaConsumer.ConsumerRecord<K, V> {
        protected final MessageAndMetadata<byte[], byte[]> rec;

        ConsumerRecordBase(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
            this.rec = messageAndMetadata;
        }

        @Override // org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public abstract K key();

        @Override // org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public abstract V value();

        @Override // org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public String topic() {
            return this.rec.topic();
        }

        @Override // org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public int partition() {
            return this.rec.partition();
        }

        @Override // org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public long offset() {
            return this.rec.offset();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/edgent/connectors/kafka/runtime/KafkaSubscriber$StringConsumerRecord.class */
    public static class StringConsumerRecord extends ConsumerRecordBase<String, String> implements KafkaConsumer.StringConsumerRecord {
        StringConsumerRecord(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
            super(messageAndMetadata);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.edgent.connectors.kafka.runtime.KafkaSubscriber.ConsumerRecordBase, org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public String key() {
            byte[] bArr = (byte[]) this.rec.key();
            if (bArr == null) {
                return null;
            }
            return new String(bArr, StandardCharsets.UTF_8);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.edgent.connectors.kafka.runtime.KafkaSubscriber.ConsumerRecordBase, org.apache.edgent.connectors.kafka.KafkaConsumer.ConsumerRecord
        public String value() {
            return new String((byte[]) this.rec.message(), StandardCharsets.UTF_8);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public KafkaSubscriber(KafkaConsumerConnector kafkaConsumerConnector, Function<? extends KafkaConsumer.ConsumerRecord<?, ?>, T> function, boolean z, String... strArr) {
        this.connector = kafkaConsumerConnector;
        if (z) {
            this.stringToTupleFn = function;
        } else {
            this.byteToTupleFn = function;
        }
        this.topics = strArr;
        kafkaConsumerConnector.addSubscriber(this, this.topics);
    }

    public void accept(Consumer<T> consumer) {
        try {
            this.eventSubmitter = consumer;
            this.connector.start(this);
        } catch (Throwable th) {
            trace.error("{} initialization failure", id(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void accept(MessageAndMetadata<byte[], byte[]> messageAndMetadata) {
        try {
            trace.trace("{} received rec for topic:{} partition:{} offset:{}", new Object[]{id(), messageAndMetadata.topic(), Integer.valueOf(messageAndMetadata.partition()), Long.valueOf(messageAndMetadata.offset())});
            this.eventSubmitter.accept(this.stringToTupleFn != null ? this.stringToTupleFn.apply(new StringConsumerRecord(messageAndMetadata)) : this.byteToTupleFn.apply(new ByteConsumerRecord(messageAndMetadata)));
        } catch (Exception e) {
            trace.error("{} failure processing record from {}", new Object[]{id(), String.format("[%s,%d]", messageAndMetadata.topic(), Integer.valueOf(messageAndMetadata.partition())), e});
        }
    }

    private String id() {
        if (this.id == null) {
            this.id = this.connector.id() + " SUB " + toString().substring(toString().indexOf(64) + 1);
        }
        return this.id;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.connector.close(this);
    }
}
