package org.apache.gobblin.writer;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.typesafe.config.Config;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.gobblin.util.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/writer/SequentialBasedBatchAccumulator.class */
public class SequentialBasedBatchAccumulator<D> extends BatchAccumulator<D> {
    private static final LargeMessagePolicy DEFAULT_LARGE_MESSAGE_POLICY;
    private Deque<BytesBoundedBatch<D>> dq;
    private IncompleteRecordBatches incomplete;
    private final long batchSizeLimit;
    private final long memSizeLimit;
    private final double tolerance = 0.95d;
    private final long expireInMilliSecond;
    private final LargeMessagePolicy largeMessagePolicy;
    private static final Logger LOG;
    private final ReentrantLock dqLock;
    private final Condition notEmpty;
    private final Condition notFull;
    private final long capacity;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/gobblin/writer/SequentialBasedBatchAccumulator$IncompleteRecordBatches.class */
    private static final class IncompleteRecordBatches {
        private final Set<Batch> incomplete = new HashSet();

        public void add(Batch batch) {
            synchronized (this.incomplete) {
                this.incomplete.add(batch);
            }
        }

        public void remove(Batch batch) {
            synchronized (this.incomplete) {
                if (!this.incomplete.remove(batch)) {
                    throw new IllegalStateException("Remove from the incomplete set failed. This should be impossible.");
                }
            }
        }

        public ArrayList<Batch> all() {
            ArrayList<Batch> arrayList;
            synchronized (this.incomplete) {
                arrayList = new ArrayList<>(this.incomplete);
            }
            return arrayList;
        }
    }

    public SequentialBasedBatchAccumulator() {
        this(Batch.BATCH_SIZE_DEFAULT, 1000L, 100L);
    }

    public SequentialBasedBatchAccumulator(Properties properties) {
        this(ConfigUtils.propertiesToConfig(properties));
    }

    public SequentialBasedBatchAccumulator(Config config) {
        this(ConfigUtils.getLong(config, Batch.BATCH_SIZE, Long.valueOf(Batch.BATCH_SIZE_DEFAULT)).longValue(), ConfigUtils.getLong(config, Batch.BATCH_TTL, 1000L).longValue(), ConfigUtils.getLong(config, Batch.BATCH_QUEUE_CAPACITY, 100L).longValue());
    }

    public SequentialBasedBatchAccumulator(long j, long j2, long j3) {
        this(j, j2, j3, DEFAULT_LARGE_MESSAGE_POLICY);
    }

    public SequentialBasedBatchAccumulator(long j, long j2, long j3, LargeMessagePolicy largeMessagePolicy) {
        this.dq = new LinkedList();
        this.incomplete = new IncompleteRecordBatches();
        this.tolerance = 0.95d;
        this.dqLock = new ReentrantLock();
        this.notEmpty = this.dqLock.newCondition();
        this.notFull = this.dqLock.newCondition();
        this.batchSizeLimit = j;
        this.expireInMilliSecond = j2;
        this.capacity = j3;
        getClass();
        this.memSizeLimit = (long) (0.95d * this.batchSizeLimit);
        this.largeMessagePolicy = largeMessagePolicy;
    }

    public long getNumOfBatches() {
        this.dqLock.lock();
        try {
            return this.dq.size();
        } finally {
            this.dqLock.unlock();
        }
    }

    @Override // org.apache.gobblin.writer.BatchAccumulator
    public final Future<RecordMetadata> enqueue(D d, WriteCallback writeCallback) throws InterruptedException {
        ReentrantLock reentrantLock = this.dqLock;
        reentrantLock.lock();
        try {
            BytesBoundedBatch<D> peekLast = this.dq.peekLast();
            if (peekLast != null) {
                Future<RecordMetadata> future = null;
                try {
                    future = peekLast.tryAppend(d, writeCallback, this.largeMessagePolicy);
                } catch (RecordTooLargeException e) {
                }
                if (future != null) {
                    return future;
                }
            }
            BytesBoundedBatch<D> bytesBoundedBatch = new BytesBoundedBatch<>(this.memSizeLimit, this.expireInMilliSecond);
            LOG.debug("Batch " + bytesBoundedBatch.getId() + " is generated");
            try {
                Future<RecordMetadata> tryAppend = bytesBoundedBatch.tryAppend(d, writeCallback, this.largeMessagePolicy);
                if (tryAppend == null) {
                    if (!$assertionsDisabled && !this.largeMessagePolicy.equals(LargeMessagePolicy.DROP)) {
                        throw new AssertionError();
                    }
                    LOG.error("Batch " + bytesBoundedBatch.getId() + " is silently marked as complete, dropping a huge record: " + d);
                    ListenableFuture immediateFuture = Futures.immediateFuture(new RecordMetadata(0L));
                    writeCallback.onSuccess(WriteResponse.EMPTY);
                    reentrantLock.unlock();
                    return immediateFuture;
                }
                while (this.dq.size() >= this.capacity) {
                    LOG.debug("Accumulator size {} is greater than capacity {}, waiting", Integer.valueOf(this.dq.size()), Long.valueOf(this.capacity));
                    this.notFull.await();
                }
                this.dq.addLast(bytesBoundedBatch);
                this.incomplete.add(bytesBoundedBatch);
                this.notEmpty.signal();
                reentrantLock.unlock();
                return tryAppend;
            } catch (RecordTooLargeException e2) {
                throw new RuntimeException("Failed due to a message that was too large", e2);
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    @Override // org.apache.gobblin.writer.BatchAccumulator
    public Batch<D> getNextAvailableBatch() {
        ReentrantLock reentrantLock = this.dqLock;
        try {
            try {
                reentrantLock.lock();
                if (isClosed()) {
                    BytesBoundedBatch<D> poll = this.dq.poll();
                    reentrantLock.unlock();
                    return poll;
                }
                while (this.dq.size() == 0) {
                    LOG.debug("ready to sleep because of queue is empty");
                    this.notEmpty.await();
                    if (isClosed()) {
                        BytesBoundedBatch<D> poll2 = this.dq.poll();
                        reentrantLock.unlock();
                        return poll2;
                    }
                }
                if (this.dq.size() > 1) {
                    BytesBoundedBatch<D> poll3 = this.dq.poll();
                    this.notFull.signal();
                    LOG.debug("retrieve batch " + poll3.getId());
                    reentrantLock.unlock();
                    return poll3;
                }
                if (this.dq.size() != 1) {
                    throw new RuntimeException("Should never get to here");
                }
                if (!this.dq.peekFirst().isTTLExpire()) {
                    reentrantLock.unlock();
                    return null;
                }
                LOG.debug("Batch " + this.dq.peekFirst().getId() + " is expired");
                BytesBoundedBatch<D> poll4 = this.dq.poll();
                this.notFull.signal();
                reentrantLock.unlock();
                return poll4;
            } catch (InterruptedException e) {
                LOG.error("Wait for next batch is interrupted. " + e.toString());
                reentrantLock.unlock();
                return null;
            }
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // org.apache.gobblin.writer.BatchAccumulator, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        super.close();
        this.dqLock.lock();
        try {
            this.notEmpty.signal();
        } finally {
            this.dqLock.unlock();
        }
    }

    @Override // org.apache.gobblin.writer.BatchAccumulator
    public void flush() {
        try {
            ArrayList<Batch> all = this.incomplete.all();
            int i = 0;
            Iterator<Batch> it = all.iterator();
            while (it.hasNext()) {
                i += it.next().getRecords().size();
            }
            LOG.debug("Flush called on {} batches with {} records total", Integer.valueOf(all.size()), Integer.valueOf(i));
            Iterator<Batch> it2 = all.iterator();
            while (it2.hasNext()) {
                it2.next().await();
            }
        } catch (Exception e) {
            LOG.error("Error happened while flushing batches");
        }
    }

    @Override // org.apache.gobblin.writer.BatchAccumulator
    public void deallocate(Batch<D> batch) {
        this.incomplete.remove(batch);
    }

    static {
        $assertionsDisabled = !SequentialBasedBatchAccumulator.class.desiredAssertionStatus();
        DEFAULT_LARGE_MESSAGE_POLICY = LargeMessagePolicy.FAIL;
        LOG = LoggerFactory.getLogger(SequentialBasedBatchAccumulator.class);
    }
}
