package org.apache.asterix.feeds;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.feeds.FeedActivity;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedJobInfo;
import org.apache.asterix.common.feeds.FeedRuntimeId;
import org.apache.asterix.common.feeds.NodeLoadReport;
import org.apache.asterix.common.feeds.api.IFeedLoadManager;
import org.apache.asterix.common.feeds.api.IFeedRuntime;
import org.apache.asterix.common.feeds.message.FeedCongestionMessage;
import org.apache.asterix.common.feeds.message.FeedReportMessage;
import org.apache.asterix.common.feeds.message.ScaleInReportMessage;
import org.apache.asterix.common.feeds.message.ThrottlingEnabledFeedMessage;
import org.apache.asterix.file.FeedOperations;
import org.apache.asterix.metadata.feeds.FeedUtil;
import org.apache.asterix.metadata.feeds.PrepareStallMessage;
import org.apache.asterix.metadata.feeds.TerminateDataFlowMessage;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;

/* loaded from: input_file:org/apache/asterix/feeds/FeedLoadManager.class */
public class FeedLoadManager implements IFeedLoadManager {
    private static final Logger LOGGER = Logger.getLogger(FeedLoadManager.class.getName());
    private static final long MIN_MODIFICATION_INTERVAL = 180000;
    private final TreeSet<NodeLoadReport> nodeReports = new TreeSet<>();
    private final Map<FeedConnectionId, FeedActivity> feedActivities = new HashMap();
    private final Map<String, Pair<Integer, Integer>> feedMetrics = new HashMap();
    private FeedConnectionId lastModified;
    private long lastModifiedTimestamp;
    private static final int UNKNOWN = -1;

    public void submitNodeLoadReport(NodeLoadReport nodeLoadReport) {
        this.nodeReports.remove(nodeLoadReport);
        this.nodeReports.add(nodeLoadReport);
    }

    public void reportCongestion(FeedCongestionMessage feedCongestionMessage) throws AsterixException {
        FeedRuntimeId runtimeId = feedCongestionMessage.getRuntimeId();
        FeedJobInfo.FeedJobState feedJobState = FeedLifecycleListener.INSTANCE.getFeedJobState(feedCongestionMessage.getConnectionId());
        if (feedJobState == null || feedJobState.equals(FeedJobInfo.FeedJobState.UNDER_RECOVERY) || (feedCongestionMessage.getConnectionId().equals(this.lastModified) && System.currentTimeMillis() - this.lastModifiedTimestamp < MIN_MODIFICATION_INTERVAL)) {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Ignoring congestion report from " + runtimeId);
                return;
            }
            return;
        }
        try {
            FeedLifecycleListener.INSTANCE.setJobState(feedCongestionMessage.getConnectionId(), FeedJobInfo.FeedJobState.UNDER_RECOVERY);
            int inflowRate = feedCongestionMessage.getInflowRate();
            int outflowRate = feedCongestionMessage.getOutflowRate();
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(feedCongestionMessage.getConnectionId().getFeedId()));
            int size = arrayList.size();
            int ceil = ((int) Math.ceil((size * inflowRate) / outflowRate)) + 5;
            int i = ceil - size;
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("INCREASING COMPUTE CARDINALITY from " + size + " by " + i);
            }
            List<String> nodeForSubstitution = getNodeForSubstitution(i);
            JobSpecification collectJobSpecification = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(feedCongestionMessage.getConnectionId());
            nodeForSubstitution.addAll(arrayList);
            ArrayList arrayList2 = new ArrayList();
            arrayList2.addAll(arrayList);
            arrayList2.addAll(nodeForSubstitution);
            FeedUtil.increaseCardinality(collectJobSpecification, IFeedRuntime.FeedRuntimeType.COMPUTE, ceil, arrayList2);
            gracefullyTerminateDataFlow(feedCongestionMessage.getConnectionId(), Integer.MAX_VALUE);
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("New Job after adjusting to the workload " + collectJobSpecification);
            }
            Thread.sleep(10000L);
            runJob(collectJobSpecification, false);
            this.lastModified = feedCongestionMessage.getConnectionId();
            this.lastModifiedTimestamp = System.currentTimeMillis();
        } catch (Exception e) {
            e.printStackTrace();
            if (LOGGER.isLoggable(Level.SEVERE)) {
                LOGGER.severe("Unable to form the required job for scaling in/out" + e.getMessage());
            }
            throw new AsterixException(e);
        }
    }

    public void submitScaleInPossibleReport(ScaleInReportMessage scaleInReportMessage) throws Exception {
        FeedJobInfo.FeedJobState feedJobState = FeedLifecycleListener.INSTANCE.getFeedJobState(scaleInReportMessage.getConnectionId());
        if (feedJobState == null || feedJobState.equals(FeedJobInfo.FeedJobState.UNDER_RECOVERY)) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("JobState information for job [" + scaleInReportMessage.getConnectionId() + "] not found ");
                return;
            }
            return;
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Processing scale-in message " + scaleInReportMessage);
        }
        FeedLifecycleListener.INSTANCE.setJobState(scaleInReportMessage.getConnectionId(), FeedJobInfo.FeedJobState.UNDER_RECOVERY);
        JobSpecification collectJobSpecification = FeedLifecycleListener.INSTANCE.getCollectJobSpecification(scaleInReportMessage.getConnectionId());
        int reducedCardinaliy = scaleInReportMessage.getReducedCardinaliy();
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(FeedLifecycleListener.INSTANCE.getComputeLocations(scaleInReportMessage.getConnectionId().getFeedId()));
        FeedUtil.decreaseComputeCardinality(collectJobSpecification, IFeedRuntime.FeedRuntimeType.COMPUTE, reducedCardinaliy, arrayList);
        gracefullyTerminateDataFlow(scaleInReportMessage.getConnectionId(), reducedCardinaliy - 1);
        Thread.sleep(3000L);
        JobId runJob = runJob(collectJobSpecification, false);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Launch modified job[" + runJob + "]for scale-in \n" + collectJobSpecification);
        }
    }

    private void gracefullyTerminateDataFlow(FeedConnectionId feedConnectionId, int i) throws Exception {
        PrepareStallMessage prepareStallMessage = new PrepareStallMessage(feedConnectionId, i);
        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(feedConnectionId);
        List<String> computeLocations = FeedLifecycleListener.INSTANCE.getComputeLocations(feedConnectionId.getFeedId());
        List<String> storeLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(feedConnectionId);
        HashSet hashSet = new HashSet();
        hashSet.addAll(collectLocations);
        hashSet.addAll(computeLocations);
        hashSet.addAll(storeLocations);
        runJob(FeedOperations.buildPrepareStallMessageJob(prepareStallMessage, hashSet), true);
        runJob(FeedOperations.buildTerminateFlowMessageJob(new TerminateDataFlowMessage(feedConnectionId), collectLocations), true);
    }

    public static JobId runJob(JobSpecification jobSpecification, boolean z) throws Exception {
        IHyracksClientConnection hcc = AsterixAppContextInfo.getInstance().getHcc();
        JobId startJob = hcc.startJob(jobSpecification);
        if (z) {
            hcc.waitForCompletion(startJob);
        }
        return startJob;
    }

    public void submitFeedRuntimeReport(FeedReportMessage feedReportMessage) {
        String str = "" + feedReportMessage.getConnectionId() + ":" + feedReportMessage.getRuntimeId().getFeedRuntimeType();
        Pair<Integer, Integer> pair = this.feedMetrics.get(str);
        if (pair == null) {
            this.feedMetrics.put(str, new Pair<>(Integer.valueOf(feedReportMessage.getValue()), 1));
        } else {
            pair.first = Integer.valueOf(((Integer) pair.first).intValue() + feedReportMessage.getValue());
            pair.second = Integer.valueOf(((Integer) pair.second).intValue() + 1);
        }
    }

    public int getOutflowRate(FeedConnectionId feedConnectionId, IFeedRuntime.FeedRuntimeType feedRuntimeType) {
        String str = "" + feedConnectionId + ":" + feedRuntimeType;
        this.feedMetrics.get(str);
        Pair<Integer, Integer> pair = this.feedMetrics.get(str);
        return pair == null ? UNKNOWN : ((Integer) pair.first).intValue() / ((Integer) pair.second).intValue();
    }

    private List<String> getNodeForSubstitution(int i) {
        ArrayList arrayList = new ArrayList();
        int i2 = 0;
        while (i2 < i) {
            Iterator<NodeLoadReport> it = this.nodeReports.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().getNodeId());
                i2++;
            }
        }
        return arrayList;
    }

    public synchronized List<String> getNodes(int i) {
        ArrayList arrayList = new ArrayList();
        while (arrayList.size() < i) {
            Iterator<NodeLoadReport> it = this.nodeReports.iterator();
            while (it.hasNext() && arrayList.size() < i) {
                arrayList.add(it.next().getNodeId());
            }
        }
        return arrayList;
    }

    public void reportThrottlingEnabled(ThrottlingEnabledFeedMessage throttlingEnabledFeedMessage) throws AsterixException, Exception {
        System.out.println("Throttling Enabled for " + throttlingEnabledFeedMessage.getConnectionId() + " " + throttlingEnabledFeedMessage.getFeedRuntimeId());
        FeedConnectionId connectionId = throttlingEnabledFeedMessage.getConnectionId();
        ArrayList arrayList = new ArrayList();
        List<String> storeLocations = FeedLifecycleListener.INSTANCE.getStoreLocations(connectionId);
        List<String> collectLocations = FeedLifecycleListener.INSTANCE.getCollectLocations(connectionId);
        arrayList.addAll(storeLocations);
        arrayList.addAll(collectLocations);
        runJob(FeedOperations.buildNotifyThrottlingEnabledMessageJob(throttlingEnabledFeedMessage, arrayList), true);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.warning("Acking disabled for " + throttlingEnabledFeedMessage.getConnectionId() + " in view of activated throttling");
        }
        CentralFeedManager.getInstance().getFeedTrackingManager().disableAcking(connectionId);
    }

    public void reportFeedActivity(FeedConnectionId feedConnectionId, FeedActivity feedActivity) {
        this.feedActivities.put(feedConnectionId, feedActivity);
    }

    public FeedActivity getFeedActivity(FeedConnectionId feedConnectionId) {
        return this.feedActivities.get(feedConnectionId);
    }

    public Collection<FeedActivity> getFeedActivities() {
        return this.feedActivities.values();
    }

    public void removeFeedActivity(FeedConnectionId feedConnectionId) {
        this.feedActivities.remove(feedConnectionId);
    }
}
