package org.apache.gobblin.instrumented.extractor;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import io.reactivex.Flowable;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.records.RecordStreamWithMetadata;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.FinalState;

/* loaded from: input_file:org/apache/gobblin/instrumented/extractor/InstrumentedExtractorBase.class */
public abstract class InstrumentedExtractorBase<S, D> implements Extractor<S, D>, Instrumentable, Closeable, FinalState {
    private final boolean instrumentationEnabled;
    private MetricContext metricContext;
    private Optional<Meter> readRecordsMeter;
    private Optional<Meter> dataRecordExceptionsMeter;
    private Optional<Timer> extractorTimer;
    protected final Closer closer;

    public InstrumentedExtractorBase(WorkUnitState workUnitState) {
        this(workUnitState, Optional.absent());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public InstrumentedExtractorBase(WorkUnitState workUnitState, Optional<Class<?>> optional) {
        this.closer = Closer.create();
        this.instrumentationEnabled = GobblinMetrics.isEnabled(workUnitState);
        this.metricContext = this.closer.register(Instrumented.getMetricContext(workUnitState, (Class) optional.or(getClass()), generateTags(workUnitState)));
        regenerateMetrics();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public void switchMetricContext(List<Tag<?>> list) {
        this.metricContext = this.closer.register(Instrumented.newContextFromReferenceContext(this.metricContext, list, Optional.absent()));
        regenerateMetrics();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public void switchMetricContext(MetricContext metricContext) {
        this.metricContext = metricContext;
        regenerateMetrics();
    }

    protected void regenerateMetrics() {
        if (isInstrumentationEnabled()) {
            this.readRecordsMeter = Optional.of(this.metricContext.meter("gobblin.extractor.records.read"));
            this.dataRecordExceptionsMeter = Optional.of(this.metricContext.meter("gobblin.extractor.records.failed"));
            this.extractorTimer = Optional.of(this.metricContext.timer("gobblin.extractor.extract.time"));
        } else {
            this.readRecordsMeter = Optional.absent();
            this.dataRecordExceptionsMeter = Optional.absent();
            this.extractorTimer = Optional.absent();
        }
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public boolean isInstrumentationEnabled() {
        return this.instrumentationEnabled;
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public List<Tag<?>> generateTags(State state) {
        return Lists.newArrayList();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public RecordEnvelope<D> readRecordEnvelope() throws DataRecordException, IOException {
        if (!isInstrumentationEnabled()) {
            return readRecordEnvelopeImpl();
        }
        try {
            long nanoTime = System.nanoTime();
            beforeRead();
            RecordEnvelope<D> readRecordEnvelopeImpl = readRecordEnvelopeImpl();
            afterRead(readRecordEnvelopeImpl == null ? null : readRecordEnvelopeImpl.getRecord(), nanoTime);
            return readRecordEnvelopeImpl;
        } catch (IOException e) {
            onException(e);
            throw e;
        } catch (DataRecordException e2) {
            onException(e2);
            throw e2;
        }
    }

    public RecordStreamWithMetadata<D, S> recordStream(AtomicBoolean atomicBoolean) throws IOException {
        return new RecordStreamWithMetadata<>(Flowable.generate(() -> {
            return atomicBoolean;
        }, (atomicBoolean2, emitter) -> {
            if (atomicBoolean2.get()) {
                emitter.onComplete();
            }
            try {
                long j = 0;
                if (isInstrumentationEnabled()) {
                    j = System.nanoTime();
                    beforeRead();
                }
                RecordEnvelope readStreamEntityImpl = readStreamEntityImpl();
                if (isInstrumentationEnabled()) {
                    D d = null;
                    if (readStreamEntityImpl instanceof RecordEnvelope) {
                        d = readStreamEntityImpl.getRecord();
                    }
                    afterRead(d, j);
                }
                if (readStreamEntityImpl != null) {
                    emitter.onNext(readStreamEntityImpl);
                } else {
                    emitter.onComplete();
                }
            } catch (DataRecordException | IOException e) {
                if (isInstrumentationEnabled()) {
                    onException(e);
                }
                emitter.onError(e);
            }
        }).doFinally(this::close), GlobalMetadata.builder().schema(getSchema()).build());
    }

    public void beforeRead() {
    }

    public void afterRead(D d, long j) {
        Instrumented.updateTimer(this.extractorTimer, System.nanoTime() - j, TimeUnit.NANOSECONDS);
        if (d != null) {
            Instrumented.markMeter(this.readRecordsMeter);
        }
    }

    public void onException(Exception exc) {
        if (DataRecordException.class.isInstance(exc)) {
            Instrumented.markMeter(this.dataRecordExceptionsMeter);
        }
    }

    protected StreamEntity<D> readStreamEntityImpl() throws DataRecordException, IOException {
        return readRecordEnvelopeImpl();
    }

    @SuppressWarnings(value = {"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"}, justification = "Findbugs believes readRecord(null) is non-null. This is not true.")
    protected RecordEnvelope<D> readRecordEnvelopeImpl() throws DataRecordException, IOException {
        D readRecordImpl = readRecordImpl(null);
        if (readRecordImpl == null) {
            return null;
        }
        return new RecordEnvelope<>(readRecordImpl);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public D readRecordImpl(D d) throws DataRecordException, IOException {
        throw new UnsupportedOperationException();
    }

    public State getFinalState() {
        return new State();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closer.close();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public MetricContext getMetricContext() {
        return this.metricContext;
    }
}
