package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.class */
public class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
    /** Shared logger; the join processor uses it to warn about records skipped because of a null key. */
    private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.class */
    /**
     * Processor for change records of the left-hand table of a KTable-KTable left join.
     *
     * <p>For every incoming {@code Change<V1>} it looks up the value stored for the same key
     * through the supplied {@link KTableValueGetter}, applies the enclosing join's
     * {@code joiner} to the new (and optionally the old) left value, and forwards the
     * resulting {@code Change<R>} downstream. The looked-up value may be {@code null}; it is
     * still handed to the joiner in that case.
     */
    private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
        private final KTableValueGetter<K, V2> valueGetter;
        private StreamsMetricsImpl metrics;
        private Sensor droppedRecordsSensor;

        /**
         * @param kTableValueGetter getter used to look up the other table's value for a key
         */
        KTableKTableLeftJoinProcessor(KTableValueGetter<K, V2> kTableValueGetter) {
            this.valueGetter = kTableValueGetter;
        }

        /**
         * Initializes the processor: stores the context (via the superclass), obtains the
         * dropped-records sensor for the current thread and task, and initializes the value getter.
         *
         * @param processorContext the context supplied by the runtime
         */
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            this.metrics = (StreamsMetricsImpl) processorContext.metrics();
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(
                Thread.currentThread().getName(),
                processorContext.taskId().toString(),
                this.metrics);
            this.valueGetter.init(processorContext);
        }

        /**
         * Joins one left-hand change record against the value looked up for the same key.
         *
         * <p>Records with a {@code null} key are logged, counted on the dropped-records sensor
         * and skipped. If the looked-up value is {@code null} and the change carries neither a
         * new nor an old value, nothing is forwarded.
         *
         * @param key    the record key; {@code null} keys are dropped
         * @param change the old/new value pair of the left-hand table
         */
        @Override
        public void process(K key, Change<V1> change) {
            if (key == null) {
                KTableKTableLeftJoin.LOG.warn(
                    "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
                    change, context().topic(), context().partition(), context().offset());
                this.droppedRecordsSensor.record();
                return;
            }

            final ValueAndTimestamp<V2> rightEntry = this.valueGetter.get(key);
            final V2 rightValue = ValueAndTimestamp.getValueOrNull(rightEntry);

            final long rightTimestamp;
            if (rightValue != null) {
                rightTimestamp = rightEntry.timestamp();
            } else if (change.newValue == null && change.oldValue == null) {
                // Nothing on either side of the join: there is no result to emit.
                return;
            } else {
                // No looked-up timestamp available; -1 loses Math.max against any non-negative timestamp.
                rightTimestamp = -1;
            }

            final long resultTimestamp = Math.max(context().timestamp(), rightTimestamp);

            R newResult = null;
            if (change.newValue != null) {
                newResult = KTableKTableLeftJoin.this.joiner.apply(change.newValue, rightValue);
            }

            R oldResult = null;
            if (KTableKTableLeftJoin.this.sendOldValues && change.oldValue != null) {
                oldResult = KTableKTableLeftJoin.this.joiner.apply(change.oldValue, rightValue);
            }

            // Forward the key and the typed Change as-is; casting the key to another type would fail at runtime.
            context().forward(key, new Change<>(newResult, oldResult), To.all().withTimestamp(resultTimestamp));
        }

        /** Closes the underlying value getter. */
        @Override
        public void close() {
            this.valueGetter.close();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin$KTableKTableLeftJoinValueGetter.class */
    /**
     * Value getter that computes the left-join result for a key on demand by reading both
     * underlying value getters and applying the enclosing join's {@code joiner}.
     */
    private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> {
        private final KTableValueGetter<K, V1> valueGetter1;
        private final KTableValueGetter<K, V2> valueGetter2;

        /**
         * @param kTableValueGetter  getter for the first (left) table's values
         * @param kTableValueGetter2 getter for the second (right) table's values
         */
        KTableKTableLeftJoinValueGetter(KTableValueGetter<K, V1> kTableValueGetter, KTableValueGetter<K, V2> kTableValueGetter2) {
            this.valueGetter1 = kTableValueGetter;
            this.valueGetter2 = kTableValueGetter2;
        }

        /** Initializes both underlying getters with the given context, first getter first. */
        @Override
        public void init(ProcessorContext processorContext) {
            this.valueGetter1.init(processorContext);
            this.valueGetter2.init(processorContext);
        }

        /**
         * Returns the joined value for {@code k} together with a timestamp.
         *
         * @param k the key to look up
         * @return {@code null} when the first getter has no value for the key; otherwise the
         *         result of {@code ValueAndTimestamp.make} on the joiner output and the larger
         *         of the two timestamps (only the first one when the second getter returned
         *         {@code null})
         */
        @Override
        public ValueAndTimestamp<R> get(K k) {
            final ValueAndTimestamp<V1> leftEntry = this.valueGetter1.get(k);
            final V1 leftValue = ValueAndTimestamp.getValueOrNull(leftEntry);
            if (leftValue == null) {
                // The second getter is deliberately not consulted when there is no left value.
                return null;
            }

            final ValueAndTimestamp<V2> rightEntry = this.valueGetter2.get(k);
            final V2 rightValue = ValueAndTimestamp.getValueOrNull(rightEntry);
            final R joined = KTableKTableLeftJoin.this.joiner.apply(leftValue, rightValue);

            final long resultTimestamp;
            if (rightEntry == null) {
                resultTimestamp = leftEntry.timestamp();
            } else {
                resultTimestamp = Math.max(leftEntry.timestamp(), rightEntry.timestamp());
            }
            return ValueAndTimestamp.make(joined, resultTimestamp);
        }

        /** Closes both underlying getters, first getter first. */
        @Override
        public void close() {
            this.valueGetter1.close();
            this.valueGetter2.close();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin$KTableKTableLeftJoinValueGetterSupplier.class */
    /**
     * Supplier of {@link KTableKTableLeftJoinValueGetter} instances, built from the two
     * value-getter suppliers passed to the superclass.
     */
    private class KTableKTableLeftJoinValueGetterSupplier extends KTableKTableAbstractJoinValueGetterSupplier<K, R, V1, V2> {

        /**
         * @param leftSupplier  supplier of getters for the first table
         * @param rightSupplier supplier of getters for the second table
         */
        KTableKTableLeftJoinValueGetterSupplier(KTableValueGetterSupplier<K, V1> leftSupplier, KTableValueGetterSupplier<K, V2> rightSupplier) {
            super(leftSupplier, rightSupplier);
        }

        /**
         * Obtains one getter from each underlying supplier and wraps them in a new join getter.
         *
         * @return a new left-join value getter
         */
        @Override
        public KTableValueGetter<K, R> get() {
            final KTableValueGetter<K, V1> leftGetter = this.valueGetterSupplier1.get();
            final KTableValueGetter<K, V2> rightGetter = this.valueGetterSupplier2.get();
            return new KTableKTableLeftJoinValueGetter(leftGetter, rightGetter);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the left join; all three arguments are passed straight to the superclass constructor.
     *
     * @param leftTable  the first table of the join
     * @param rightTable the second table of the join
     * @param joiner     function combining a value of each table into a result
     */
    public KTableKTableLeftJoin(KTableImpl<K, ?, V1> leftTable,
                                KTableImpl<K, ?, V2> rightTable,
                                ValueJoiner<? super V1, ? super V2, ? extends R> joiner) {
        super(leftTable, rightTable, joiner);
    }

    /**
     * Creates a new join processor backed by a fresh value getter from {@code valueGetterSupplier2}.
     *
     * @return a processor for {@code Change<V1>} records
     */
    @Override
    public Processor<K, Change<V1>> get() {
        final KTableValueGetter<K, V2> otherTableGetter = this.valueGetterSupplier2.get();
        return new KTableKTableLeftJoinProcessor(otherTableGetter);
    }

    /**
     * Returns a value-getter supplier for the join result, built from this join's two
     * underlying value-getter suppliers.
     *
     * @return a new supplier of left-join value getters
     */
    @Override
    public KTableValueGetterSupplier<K, R> view() {
        final KTableValueGetterSupplier<K, V1> firstSupplier = this.valueGetterSupplier1;
        final KTableValueGetterSupplier<K, V2> secondSupplier = this.valueGetterSupplier2;
        return new KTableKTableLeftJoinValueGetterSupplier(firstSupplier, secondSupplier);
    }
}
