package org.apache.asterix.feeds;

import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedTupleCommitAckMessage;
import org.apache.asterix.common.feeds.FeedTupleCommitResponseMessage;
import org.apache.asterix.common.feeds.api.IFeedTrackingManager;
import org.apache.asterix.file.FeedOperations;

/* loaded from: input_file:org/apache/asterix/feeds/FeedTrackingManager.class */
public class FeedTrackingManager implements IFeedTrackingManager {
    private static final Logger LOGGER = Logger.getLogger(FeedTrackingManager.class.getName());
    private final BitSet allOnes;
    private Map<FeedConnectionId, Map<AckId, BitSet>> ackHistory;
    private Map<FeedConnectionId, Map<AckId, Integer>> maxBaseAcked;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/asterix/feeds/FeedTrackingManager$AckId.class */
    public static class AckId {
        private FeedConnectionId connectionId;
        private int intakePartition;
        private int base;

        public AckId(FeedConnectionId feedConnectionId, int i, int i2) {
            this.connectionId = feedConnectionId;
            this.intakePartition = i;
            this.base = i2;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof AckId)) {
                return false;
            }
            AckId ackId = (AckId) obj;
            return ackId.getConnectionId().equals(this.connectionId) && ackId.getIntakePartition() == this.intakePartition && ackId.getBase() == this.base;
        }

        public String toString() {
            return this.connectionId + "[" + this.intakePartition + "](" + this.base + ")";
        }

        public int hashCode() {
            return toString().hashCode();
        }

        public FeedConnectionId getConnectionId() {
            return this.connectionId;
        }

        public int getIntakePartition() {
            return this.intakePartition;
        }

        public int getBase() {
            return this.base;
        }
    }

    public FeedTrackingManager() {
        byte[] bArr = new byte[128];
        Arrays.fill(bArr, (byte) -1);
        this.allOnes = BitSet.valueOf(bArr);
        this.ackHistory = new HashMap();
        this.maxBaseAcked = new HashMap();
    }

    public synchronized void submitAckReport(FeedTupleCommitAckMessage feedTupleCommitAckMessage) {
        AckId ackId = getAckId(feedTupleCommitAckMessage);
        Map<AckId, BitSet> map = this.ackHistory.get(feedTupleCommitAckMessage.getConnectionId());
        if (map == null) {
            map = new HashMap();
            map.put(ackId, BitSet.valueOf(feedTupleCommitAckMessage.getCommitAcks()));
            this.ackHistory.put(feedTupleCommitAckMessage.getConnectionId(), map);
        }
        BitSet bitSet = map.get(ackId);
        if (bitSet == null) {
            bitSet = BitSet.valueOf(feedTupleCommitAckMessage.getCommitAcks());
            map.put(ackId, bitSet);
        } else {
            bitSet.or(BitSet.valueOf(feedTupleCommitAckMessage.getCommitAcks()));
        }
        if (!Arrays.equals(bitSet.toByteArray(), this.allOnes.toByteArray())) {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("AckId " + ackId + " pending number of acks " + (1024 - bitSet.cardinality()));
                return;
            }
            return;
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info(feedTupleCommitAckMessage.getIntakePartition() + " (" + feedTupleCommitAckMessage.getBase() + ") is convered");
        }
        Map<AckId, Integer> map2 = this.maxBaseAcked.get(feedTupleCommitAckMessage.getConnectionId());
        if (map2 == null) {
            map2 = new HashMap();
            this.maxBaseAcked.put(feedTupleCommitAckMessage.getConnectionId(), map2);
        }
        Integer num = map2.get(ackId);
        if (num == null) {
            Integer.valueOf(feedTupleCommitAckMessage.getBase());
            map2.put(ackId, Integer.valueOf(feedTupleCommitAckMessage.getBase()));
            sendCommitResponseMessage(feedTupleCommitAckMessage.getConnectionId(), feedTupleCommitAckMessage.getIntakePartition(), feedTupleCommitAckMessage.getBase());
        } else if (feedTupleCommitAckMessage.getBase() == num.intValue() + 1) {
            map2.put(ackId, Integer.valueOf(feedTupleCommitAckMessage.getBase()));
            sendCommitResponseMessage(feedTupleCommitAckMessage.getConnectionId(), feedTupleCommitAckMessage.getIntakePartition(), feedTupleCommitAckMessage.getBase());
        } else if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Ignoring discountiuous acked base " + feedTupleCommitAckMessage.getBase() + " for " + ackId);
        }
    }

    public synchronized void disableTracking(FeedConnectionId feedConnectionId) {
        this.ackHistory.remove(feedConnectionId);
        this.maxBaseAcked.remove(feedConnectionId);
    }

    private void sendCommitResponseMessage(FeedConnectionId feedConnectionId, int i, int i2) {
        FeedTupleCommitResponseMessage feedTupleCommitResponseMessage = new FeedTupleCommitResponseMessage(feedConnectionId, i, i2);
        List<String> storeLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(feedConnectionId);
        String str = FeedLifecycleListener.INSTANCE.getCollectLocations(feedConnectionId).get(i);
        HashSet hashSet = new HashSet();
        hashSet.add(str);
        hashSet.addAll(storeLocations);
        try {
            CentralFeedManager.runJob(FeedOperations.buildCommitAckResponseJob(feedTupleCommitResponseMessage, hashSet), false);
        } catch (Exception e) {
            e.printStackTrace();
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("Unable to send commit response message " + feedTupleCommitResponseMessage + " exception " + e.getMessage());
            }
        }
    }

    private static AckId getAckId(FeedTupleCommitAckMessage feedTupleCommitAckMessage) {
        return new AckId(feedTupleCommitAckMessage.getConnectionId(), feedTupleCommitAckMessage.getIntakePartition(), feedTupleCommitAckMessage.getBase());
    }

    public void disableAcking(FeedConnectionId feedConnectionId) {
        this.ackHistory.remove(feedConnectionId);
        this.maxBaseAcked.remove(feedConnectionId);
        if (LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("Acking disabled for " + feedConnectionId);
        }
    }
}
