package org.apache.flume.channel;

import com.google.common.base.Preconditions;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;

/* loaded from: input_file:org/apache/flume/channel/MemoryChannel.class */
public class MemoryChannel implements Channel, Configurable {
    private static final Integer defaultCapacity = 50;
    private static final Integer defaultKeepAlive = 3;
    private LinkedBlockingDeque<StampedEvent> queue;
    private Integer keepAlive;
    final Lock lock = new ReentrantLock();
    final Condition hasData = this.lock.newCondition();
    private AtomicInteger currentStamp = new AtomicInteger(1);
    private AtomicInteger lastCommitStamp = new AtomicInteger(0);
    private ConcurrentHashMap<Long, MemTransaction> txnMap = new ConcurrentHashMap<>();

    /* loaded from: input_file:org/apache/flume/channel/MemoryChannel$MemTransaction.class */
    public class MemTransaction implements Transaction {
        private int putStamp;
        private int takeStamp;
        private LinkedList<StampedEvent> undoTakeList;
        private LinkedList<StampedEvent> undoPutList;
        private Transaction.TransactionState txnState = Transaction.TransactionState.Closed;
        private int refCount;

        public MemTransaction() {
        }

        @Override // org.apache.flume.Transaction
        public void begin() {
            int i = this.refCount + 1;
            this.refCount = i;
            if (i > 1) {
                return;
            }
            this.undoTakeList = new LinkedList<>();
            this.undoPutList = new LinkedList<>();
            this.putStamp = 0;
            this.takeStamp = 0;
            this.txnState = Transaction.TransactionState.Started;
        }

        @Override // org.apache.flume.Transaction
        public void commit() {
            Preconditions.checkArgument(this.txnState == Transaction.TransactionState.Started, "transaction not started");
            int i = this.refCount - 1;
            this.refCount = i;
            if (i > 0) {
                return;
            }
            if (this.putStamp != 0) {
                MemoryChannel.this.lastCommitStamp.set(this.putStamp);
                MemoryChannel.this.lock.lock();
                try {
                    MemoryChannel.this.hasData.signal();
                    MemoryChannel.this.lock.unlock();
                } catch (Throwable th) {
                    MemoryChannel.this.lock.unlock();
                    throw th;
                }
            }
            this.txnState = Transaction.TransactionState.Committed;
            this.undoPutList.clear();
            this.undoTakeList.clear();
        }

        @Override // org.apache.flume.Transaction
        public void rollback() {
            Preconditions.checkArgument(this.txnState == Transaction.TransactionState.Started, "transaction not started");
            MemoryChannel.this.undoPut(this);
            MemoryChannel.this.undoTake(this);
            this.txnState = Transaction.TransactionState.RolledBack;
            this.refCount = 0;
        }

        @Override // org.apache.flume.Transaction
        public void close() {
            if (this.txnState == Transaction.TransactionState.Started) {
                rollback();
            }
            this.txnState = Transaction.TransactionState.Closed;
            MemoryChannel.this.forgetTransaction(this);
        }

        public Transaction.TransactionState getState() {
            return this.txnState;
        }

        protected int lastTakeStamp() {
            return this.takeStamp;
        }

        protected void logPut(StampedEvent stampedEvent, int i) {
            this.undoPutList.addLast(stampedEvent);
            this.putStamp = i;
        }

        protected void logTake(StampedEvent stampedEvent, int i) {
            this.undoTakeList.addLast(stampedEvent);
            this.takeStamp = i;
        }

        protected StampedEvent removePut() {
            if (this.undoPutList.isEmpty()) {
                return null;
            }
            return this.undoPutList.removeLast();
        }

        protected StampedEvent removeTake() {
            if (this.undoTakeList.isEmpty()) {
                return null;
            }
            return this.undoTakeList.removeLast();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/channel/MemoryChannel$StampedEvent.class */
    public class StampedEvent {
        private int timeStamp;
        private Event event;

        public StampedEvent(int i, Event event) {
            this.timeStamp = i;
            this.event = event;
        }

        public int getStamp() {
            return this.timeStamp;
        }

        public Event getEvent() {
            return this.event;
        }
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        String str = (String) context.get("capacity", String.class);
        Integer valueOf = str == null ? defaultCapacity : Integer.valueOf(Integer.parseInt(str));
        String str2 = (String) context.get("keep-alive", String.class);
        if (str2 == null) {
            this.keepAlive = defaultKeepAlive;
        } else {
            this.keepAlive = Integer.valueOf(Integer.parseInt(str2));
        }
        this.queue = new LinkedBlockingDeque<>(valueOf.intValue());
    }

    @Override // org.apache.flume.Channel
    public void put(Event event) {
        Preconditions.checkState(this.queue != null, "No queue defined (Did you forget to configure me?");
        try {
            MemTransaction findTransaction = findTransaction();
            Preconditions.checkState(findTransaction != null, "Transaction not started");
            int andIncrement = this.currentStamp.getAndIncrement();
            StampedEvent stampedEvent = new StampedEvent(andIncrement, event);
            if (!this.queue.offer(stampedEvent, this.keepAlive.intValue(), TimeUnit.SECONDS)) {
                throw new ChannelException("put(" + event + ") timed out");
            }
            findTransaction.logPut(stampedEvent, andIncrement);
        } catch (InterruptedException e) {
            throw new ChannelException("Failed to put(" + event + ")", e);
        }
    }

    protected void undoPut(MemTransaction memTransaction) {
        while (true) {
            StampedEvent removePut = memTransaction.removePut();
            if (removePut == null) {
                return;
            }
            StampedEvent removeLast = this.queue.removeLast();
            Preconditions.checkNotNull(removeLast, "Rollback error");
            Preconditions.checkArgument(removeLast == removePut, "Rollback error");
        }
    }

    @Override // org.apache.flume.Channel
    public Event take() {
        StampedEvent poll;
        Preconditions.checkState(this.queue != null, "Queue not configured");
        try {
            MemTransaction findTransaction = findTransaction();
            Preconditions.checkState(findTransaction != null, "Transaction not started");
            Event event = null;
            int intValue = this.keepAlive.intValue();
            if (intValue > 0 && findTransaction.lastTakeStamp() == this.lastCommitStamp.get()) {
                this.lock.lock();
                try {
                    this.hasData.await(intValue, TimeUnit.SECONDS);
                    this.lock.unlock();
                    intValue = 0;
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            }
            if (findTransaction.lastTakeStamp() != this.lastCommitStamp.get() && (poll = this.queue.poll(intValue, TimeUnit.SECONDS)) != null) {
                findTransaction.logTake(poll, poll.getStamp());
                event = poll.getEvent();
            }
            return event;
        } catch (InterruptedException e) {
            throw new ChannelException("Failed to take()", e);
        }
    }

    protected void undoTake(MemTransaction memTransaction) {
        while (true) {
            StampedEvent removeTake = memTransaction.removeTake();
            if (removeTake == null) {
                return;
            } else {
                this.queue.addFirst(removeTake);
            }
        }
    }

    @Override // org.apache.flume.Channel
    public Transaction getTransaction() {
        MemTransaction findTransaction = findTransaction();
        if (findTransaction == null) {
            findTransaction = new MemTransaction();
            this.txnMap.put(Long.valueOf(Thread.currentThread().getId()), findTransaction);
        }
        return findTransaction;
    }

    protected void forgetTransaction(MemTransaction memTransaction) {
        Preconditions.checkArgument(memTransaction == findTransaction(), "Wrong transaction to close");
        this.txnMap.remove(Long.valueOf(Thread.currentThread().getId()));
    }

    protected MemTransaction findTransaction() {
        try {
            return this.txnMap.get(Long.valueOf(Thread.currentThread().getId()));
        } catch (NullPointerException e) {
            return null;
        }
    }

    @Override // org.apache.flume.Channel
    public void shutdown() {
    }

    @Override // org.apache.flume.Channel
    public String getName() {
        return null;
    }
}
