package org.apache.gobblin.writer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.gobblin.ack.Ackable;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.exception.NonTransientException;
import org.apache.gobblin.instrumented.Instrumentable;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.FinalState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/writer/AsyncWriterManager.class */
public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWriter<D>, Instrumentable, Closeable, FinalState {
    private static final long MILLIS_TO_NANOS = 1000000;
    public static final long COMMIT_TIMEOUT_MILLIS_DEFAULT = 60000;
    public static final long COMMIT_STEP_WAITTIME_MILLIS_DEFAULT = 500;
    public static final double FAILURE_ALLOWANCE_RATIO_DEFAULT = 0.0d;
    public static final boolean RETRIES_ENABLED_DEFAULT = true;
    public static final int NUM_RETRIES_DEFAULT = 5;
    public static final int MIN_RETRY_INTERVAL_MILLIS_DEFAULT = 3;
    public static final int MAX_OUTSTANDING_WRITES_DEFAULT = 1000;
    private final boolean instrumentationEnabled;
    private MetricContext metricContext;
    protected final Closer closer;

    @VisibleForTesting
    Meter recordsAttempted;

    @VisibleForTesting
    Meter recordsIn;

    @VisibleForTesting
    Meter recordsSuccess;

    @VisibleForTesting
    Meter recordsFailed;

    @VisibleForTesting
    Meter bytesWritten;

    @VisibleForTesting
    Optional<Timer> dataWriterTimer;
    private final long commitTimeoutMillis;
    private final long commitStepWaitTimeMillis;
    private final double failureAllowanceRatio;
    private final AsyncDataWriter asyncDataWriter;
    private final int numRetries;
    private final int minRetryIntervalMillis;
    private final Optional<ScheduledThreadPoolExecutor> retryThreadPool;
    private final Logger log;

    @VisibleForTesting
    final Optional<LinkedBlockingQueue<AsyncWriterManager<D>.Attempt>> retryQueue;
    private final int maxOutstandingWrites;
    private final Semaphore writePermits;
    private volatile Throwable cachedWriteException = null;

    /* loaded from: input_file:org/apache/gobblin/writer/AsyncWriterManager$AsyncWriterManagerBuilder.class */
    public static class AsyncWriterManagerBuilder {
        private AsyncDataWriter asyncDataWriter;
        private Config config = ConfigFactory.empty();
        private long commitTimeoutMillis = AsyncWriterManager.COMMIT_TIMEOUT_MILLIS_DEFAULT;
        private long commitStepWaitTimeMillis = 500;
        private double failureAllowanceRatio = AsyncWriterManager.FAILURE_ALLOWANCE_RATIO_DEFAULT;
        private boolean retriesEnabled = true;
        private int numRetries = 5;
        private int maxOutstandingWrites = AsyncWriterManager.MAX_OUTSTANDING_WRITES_DEFAULT;
        private Optional<Logger> logger = Optional.absent();

        public AsyncWriterManagerBuilder config(Config config) {
            this.config = config;
            return this;
        }

        public AsyncWriterManagerBuilder commitTimeoutMillis(long j) {
            this.commitTimeoutMillis = j;
            return this;
        }

        public AsyncWriterManagerBuilder commitStepWaitTimeInMillis(long j) {
            this.commitStepWaitTimeMillis = j;
            return this;
        }

        public AsyncWriterManagerBuilder failureAllowanceRatio(double d) {
            Preconditions.checkArgument(d <= 1.0d && d >= AsyncWriterManager.FAILURE_ALLOWANCE_RATIO_DEFAULT, "Failure Allowance must be a ratio between 0 and 1");
            this.failureAllowanceRatio = d;
            return this;
        }

        public AsyncWriterManagerBuilder asyncDataWriter(AsyncDataWriter asyncDataWriter) {
            this.asyncDataWriter = asyncDataWriter;
            return this;
        }

        public AsyncWriterManagerBuilder retriesEnabled(boolean z) {
            this.retriesEnabled = z;
            return this;
        }

        public AsyncWriterManagerBuilder numRetries(int i) {
            this.numRetries = i;
            return this;
        }

        public AsyncWriterManagerBuilder maxOutstandingWrites(int i) {
            this.maxOutstandingWrites = i;
            return this;
        }

        public AsyncWriterManagerBuilder logger(Optional<Logger> optional) {
            this.logger = optional;
            return this;
        }

        public AsyncWriterManager build() {
            return new AsyncWriterManager(this.config, this.commitTimeoutMillis, this.commitStepWaitTimeMillis, this.failureAllowanceRatio, this.retriesEnabled, this.numRetries, 3, this.maxOutstandingWrites, this.asyncDataWriter, this.logger);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/gobblin/writer/AsyncWriterManager$Attempt.class */
    public class Attempt {
        private final D record;
        private final Ackable ackable;
        private int attemptNum = 1;
        private Throwable prevAttemptFailure = null;
        private long prevAttemptTimestampNanos = -1;

        void incAttempt() {
            this.attemptNum++;
        }

        Attempt(D d, Ackable ackable) {
            this.record = d;
            this.ackable = ackable;
        }

        public D getRecord() {
            return this.record;
        }

        public Ackable getAckable() {
            return this.ackable;
        }

        public int getAttemptNum() {
            return this.attemptNum;
        }

        public Throwable getPrevAttemptFailure() {
            return this.prevAttemptFailure;
        }

        public long getPrevAttemptTimestampNanos() {
            return this.prevAttemptTimestampNanos;
        }

        public void setPrevAttemptFailure(Throwable th) {
            this.prevAttemptFailure = th;
        }

        public void setPrevAttemptTimestampNanos(long j) {
            this.prevAttemptTimestampNanos = j;
        }
    }

    /* loaded from: input_file:org/apache/gobblin/writer/AsyncWriterManager$RetryRunner.class */
    private class RetryRunner implements Runnable {
        private final LinkedBlockingQueue<AsyncWriterManager<D>.Attempt> retryQueue;
        private final long minRetryIntervalNanos;

        public RetryRunner() {
            Preconditions.checkArgument(AsyncWriterManager.this.retryQueue.isPresent(), "RetryQueue must be present for RetryRunner");
            this.retryQueue = (LinkedBlockingQueue) AsyncWriterManager.this.retryQueue.get();
            this.minRetryIntervalNanos = AsyncWriterManager.this.minRetryIntervalMillis * AsyncWriterManager.MILLIS_TO_NANOS;
        }

        private void maybeSleep(long j) throws InterruptedException {
            long nanoTime = this.minRetryIntervalNanos - (System.nanoTime() - j);
            if (nanoTime > 0) {
                Thread.sleep(nanoTime / AsyncWriterManager.MILLIS_TO_NANOS);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    AsyncWriterManager<D>.Attempt take = this.retryQueue.take();
                    if (take != null) {
                        maybeSleep(take.getPrevAttemptTimestampNanos());
                        AsyncWriterManager.this.log.debug("Retry thread will retry record: {}", take.getRecord().toString());
                        AsyncWriterManager.this.attemptWrite(take);
                    }
                } catch (InterruptedException e) {
                    AsyncWriterManager.this.log.info("Retry thread interrupted... will exit");
                    Throwables.propagate(e);
                }
            }
        }
    }

    @Override // org.apache.gobblin.writer.WatermarkAwareWriter
    public boolean isWatermarkCapable() {
        return true;
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public void switchMetricContext(List<Tag<?>> list) {
        this.metricContext = this.closer.register(Instrumented.newContextFromReferenceContext(this.metricContext, list, Optional.absent()));
        regenerateMetrics();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public void switchMetricContext(MetricContext metricContext) {
        this.metricContext = metricContext;
        regenerateMetrics();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public List<Tag<?>> generateTags(State state) {
        return Lists.newArrayList();
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    @Nonnull
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    @Override // org.apache.gobblin.instrumented.Instrumentable
    public boolean isInstrumentationEnabled() {
        return this.instrumentationEnabled;
    }

    public State getFinalState() {
        return new State();
    }

    protected void regenerateMetrics() {
        this.recordsIn = this.metricContext.meter("gobblin.writer.records.in");
        this.recordsAttempted = this.metricContext.meter("gobblin.writer.records.attempted");
        this.recordsSuccess = this.metricContext.meter("gobblin.writer.successful.writes");
        this.recordsFailed = this.metricContext.meter("gobblin.writer.failed.writes");
        this.bytesWritten = this.metricContext.meter("gobblin.writer.bytes.written");
        if (isInstrumentationEnabled()) {
            this.dataWriterTimer = Optional.of(this.metricContext.timer("gobblin.writer.write.time"));
        } else {
            this.dataWriterTimer = Optional.absent();
        }
    }

    protected AsyncWriterManager(Config config, long j, long j2, double d, boolean z, int i, int i2, int i3, AsyncDataWriter asyncDataWriter, Optional<Logger> optional) {
        Preconditions.checkArgument(j > 0, "Commit timeout must be greater than 0");
        Preconditions.checkArgument(j2 > 0, "Commit step wait time must be greater than 0");
        Preconditions.checkArgument(j2 < j, "Commit step wait time must be less than commit timeout");
        Preconditions.checkArgument(d <= 1.0d && d >= FAILURE_ALLOWANCE_RATIO_DEFAULT, "Failure Allowance must be a ratio between 0 and 1");
        Preconditions.checkArgument(i3 > 0, "Max outstanding writes must be greater than 0");
        Preconditions.checkNotNull(asyncDataWriter, "Async Data Writer cannot be null");
        this.log = optional.isPresent() ? (Logger) optional.get() : LoggerFactory.getLogger(AsyncWriterManager.class);
        this.closer = Closer.create();
        State configToState = ConfigUtils.configToState(config);
        this.instrumentationEnabled = GobblinMetrics.isEnabled(configToState);
        this.metricContext = this.closer.register(Instrumented.getMetricContext(configToState, asyncDataWriter.getClass()));
        regenerateMetrics();
        this.commitTimeoutMillis = j;
        this.commitStepWaitTimeMillis = j2;
        this.failureAllowanceRatio = d;
        this.minRetryIntervalMillis = i2;
        if (z) {
            this.numRetries = i;
            this.retryQueue = Optional.of(new LinkedBlockingQueue());
            this.retryThreadPool = Optional.of(new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newDaemonThreadFactory(Optional.of(this.log), Optional.of("AsyncWriteManagerRetry-%d"))));
            ((ScheduledThreadPoolExecutor) this.retryThreadPool.get()).execute(new RetryRunner());
        } else {
            this.numRetries = 0;
            this.retryQueue = Optional.absent();
            this.retryThreadPool = Optional.absent();
        }
        this.maxOutstandingWrites = i3;
        this.writePermits = new Semaphore(i3);
        this.asyncDataWriter = asyncDataWriter;
        this.closer.register(asyncDataWriter);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.gobblin.writer.WatermarkAwareWriter
    public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
        write(recordEnvelope.getRecord(), recordEnvelope);
    }

    public void write(D d) throws IOException {
        write(d, Ackable.NoopAckable);
    }

    private void write(D d, Ackable ackable) throws IOException {
        maybeThrow();
        int i = 0;
        while (!this.writePermits.tryAcquire(100L, TimeUnit.MILLISECONDS)) {
            try {
                i++;
                if (i % 50 == 0) {
                    this.log.info("Spinning due to pending writes, in = " + this.recordsIn.getCount() + ", success = " + this.recordsSuccess.getCount() + ", failed = " + this.recordsFailed.getCount() + ", maxOutstandingWrites = " + this.maxOutstandingWrites);
                }
            } catch (InterruptedException e) {
                Throwables.propagate(e);
            }
        }
        this.recordsIn.mark();
        attemptWrite(new Attempt(d, ackable));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isFailureFatal() {
        return this.failureAllowanceRatio == FAILURE_ALLOWANCE_RATIO_DEFAULT;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void makeNextWriteThrow(Throwable th) {
        this.log.error("Will make next write throw", th);
        this.cachedWriteException = th;
    }

    private void maybeThrow() {
        if (this.cachedWriteException != null) {
            throw new NonTransientException("Irrecoverable failure on async write", this.cachedWriteException);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void attemptWrite(final AsyncWriterManager<D>.Attempt attempt) {
        this.recordsAttempted.mark();
        attempt.setPrevAttemptTimestampNanos(System.nanoTime());
        this.asyncDataWriter.write(((Attempt) attempt).record, new WriteCallback<Object>() { // from class: org.apache.gobblin.writer.AsyncWriterManager.1
            @Override // org.apache.gobblin.async.Callback
            public void onSuccess(WriteResponse writeResponse) {
                try {
                    attempt.ackable.ack();
                    AsyncWriterManager.this.recordsSuccess.mark();
                    if (writeResponse.bytesWritten() > 0) {
                        AsyncWriterManager.this.bytesWritten.mark(writeResponse.bytesWritten());
                    }
                    if (AsyncWriterManager.this.dataWriterTimer.isPresent()) {
                        ((Timer) AsyncWriterManager.this.dataWriterTimer.get()).update(System.nanoTime() - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
                    }
                } finally {
                    AsyncWriterManager.this.writePermits.release();
                }
            }

            @Override // org.apache.gobblin.async.Callback
            public void onFailure(Throwable th) {
                long nanoTime = System.nanoTime();
                if (AsyncWriterManager.this.dataWriterTimer.isPresent()) {
                    ((Timer) AsyncWriterManager.this.dataWriterTimer.get()).update(nanoTime - attempt.getPrevAttemptTimestampNanos(), TimeUnit.NANOSECONDS);
                }
                if (attempt.attemptNum <= AsyncWriterManager.this.numRetries) {
                    AsyncWriterManager.this.log.debug("Attempt {} had failure: {}; re-enqueueing record: {}", new Object[]{Integer.valueOf(attempt.attemptNum), th.getMessage(), attempt.getRecord().toString()});
                    attempt.incAttempt();
                    attempt.setPrevAttemptFailure(th);
                    ((LinkedBlockingQueue) AsyncWriterManager.this.retryQueue.get()).add(attempt);
                    return;
                }
                try {
                    AsyncWriterManager.this.recordsFailed.mark();
                    AsyncWriterManager.this.log.debug("Failed to write record : {}", attempt.getRecord().toString(), th);
                    if (AsyncWriterManager.this.isFailureFatal()) {
                        AsyncWriterManager.this.makeNextWriteThrow(th);
                    } else {
                        attempt.ackable.ack();
                    }
                } finally {
                    AsyncWriterManager.this.writePermits.release();
                }
            }
        });
    }

    public void cleanup() throws IOException {
    }

    public long recordsWritten() {
        return this.recordsSuccess.getCount();
    }

    public long bytesWritten() throws IOException {
        return this.bytesWritten.getCount();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.log.info("Close called");
        this.closer.close();
        if (this.retryThreadPool.isPresent()) {
            ExecutorsUtils.shutdownExecutorService((ExecutorService) this.retryThreadPool.get(), Optional.of(this.log), 1L, TimeUnit.MILLISECONDS);
        }
        this.log.info("Successfully done closing");
    }

    public void commit() throws IOException {
        this.log.info("Commit called, will wait for commitTimeout : {} ms", Long.valueOf(this.commitTimeoutMillis));
        long j = this.commitTimeoutMillis * MILLIS_TO_NANOS;
        long nanoTime = System.nanoTime();
        this.asyncDataWriter.flush();
        while (System.nanoTime() - nanoTime < j && this.recordsIn.getCount() != this.recordsSuccess.getCount() + this.recordsFailed.getCount()) {
            this.log.debug("Commit waiting... records produced: {}, written: {}, failed: {}", new Object[]{Long.valueOf(this.recordsIn.getCount()), Long.valueOf(this.recordsSuccess.getCount()), Long.valueOf(this.recordsFailed.getCount())});
            try {
                Thread.sleep(this.commitStepWaitTimeMillis);
            } catch (InterruptedException e) {
                this.log.info("Interrupted while waiting for commit to complete");
                throw new IOException("Interrupted while waiting for commit to complete", e);
            }
        }
        this.log.debug("Commit done waiting");
        long count = this.recordsIn.getCount();
        long count2 = this.recordsSuccess.getCount();
        long count3 = this.recordsFailed.getCount();
        long j2 = (count - count2) - count3;
        long j3 = j2 + count3;
        if (j2 > 0) {
            this.log.warn("Timeout waiting for all writes to be acknowledged. Missing {} responses out of {}", Long.valueOf(j2), Long.valueOf(count));
        }
        if (j3 > 0 && count > 0) {
            this.log.info("Commit failed to write {} records ({} failed, {} unacknowledged) out of {} produced", new Object[]{Long.valueOf(j3), Long.valueOf(count3), Long.valueOf(j2), Long.valueOf(count)});
            double d = j3 / count;
            if (d > this.failureAllowanceRatio) {
                this.log.error("Aborting because this is greater than the failureAllowance percentage: {}", Double.valueOf(this.failureAllowanceRatio * 100.0d));
                throw new IOException("Failed to meet failureAllowance SLA", this.cachedWriteException);
            }
            this.log.warn("Committing because the observed failure percentage {} is less than the failureAllowance percentage: {}", Double.valueOf(d * 100.0d), Double.valueOf(this.failureAllowanceRatio * 100.0d));
        }
        this.log.info("Successfully committed {} records.", Long.valueOf(count2));
    }

    public void flush() throws IOException {
        this.asyncDataWriter.flush();
    }

    public static AsyncWriterManagerBuilder builder() {
        return new AsyncWriterManagerBuilder();
    }
}
