package org.apache.asterix.feeds;

import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.api.common.FeedWorkCollection;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.feeds.FeedActivity;
import org.apache.asterix.common.feeds.FeedConnectJobInfo;
import org.apache.asterix.common.feeds.FeedConnectionId;
import org.apache.asterix.common.feeds.FeedConnectionRequest;
import org.apache.asterix.common.feeds.FeedId;
import org.apache.asterix.common.feeds.FeedIntakeInfo;
import org.apache.asterix.common.feeds.FeedJobInfo;
import org.apache.asterix.common.feeds.FeedJointKey;
import org.apache.asterix.common.feeds.FeedPolicyAccessor;
import org.apache.asterix.common.feeds.api.IFeedJoint;
import org.apache.asterix.common.feeds.api.IFeedLifecycleEventSubscriber;
import org.apache.asterix.common.feeds.api.IIntakeProgressTracker;
import org.apache.asterix.common.feeds.message.StorageReportFeedMessage;
import org.apache.asterix.feeds.FeedLifecycleListener;
import org.apache.asterix.metadata.feeds.FeedCollectOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedIntakeOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedMetaOperatorDescriptor;
import org.apache.asterix.metadata.feeds.FeedWorkManager;
import org.apache.asterix.om.util.AsterixAppContextInfo;
import org.apache.asterix.result.ResultReader;
import org.apache.commons.lang3.StringUtils;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobInfo;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexInsertUpdateDeleteOperatorDescriptor;

/* loaded from: input_file:org/apache/asterix/feeds/FeedJobNotificationHandler.class */
public class FeedJobNotificationHandler implements Runnable {
    private static final Logger LOGGER = Logger.getLogger(FeedJobNotificationHandler.class.getName());

    /** Queue of job lifecycle messages consumed by {@link #run()}. */
    private final LinkedBlockingQueue<FeedLifecycleListener.Message> inbox;

    /** Every registered feed job (intake or connect), keyed by its job id. */
    private final Map<JobId, FeedJobInfo> jobInfos = new HashMap<>();

    /** Registered intake jobs, keyed by the feed they ingest. */
    private final Map<FeedId, FeedIntakeInfo> intakeJobInfos = new HashMap<>();

    /** Registered connect (collect) jobs, keyed by feed connection. */
    private final Map<FeedConnectionId, FeedConnectJobInfo> connectJobInfos = new HashMap<>();

    /** Feed joints registered per owning feed. */
    private final Map<FeedId, List<IFeedJoint>> feedPipeline = new HashMap<>();

    /** Lifecycle event subscribers per feed connection. */
    private final Map<FeedConnectionId, List<IFeedLifecycleEventSubscriber>> eventSubscribers = new HashMap<>();

    /** Progress tracker per connection, paired with the last persisted-tuple intake timestamp reported to it. */
    private final Map<FeedConnectionId, Pair<IIntakeProgressTracker, Long>> feedIntakeProgressTrackers = new HashMap<>();

    /*
     * Compiler-synthesised holder for the ordinal-to-case lookup tables that back the
     * enum switch statements in this file. Each table is indexed by an enum constant's
     * ordinal() and holds the 1-based case label used by the corresponding switch.
     * The NoSuchFieldError guards leave a slot at 0 (no matching case) when a constant
     * cannot be resolved at class-initialisation time.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.asterix.feeds.FeedJobNotificationHandler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/asterix/feeds/FeedJobNotificationHandler$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Lookup table for FeedLifecycleListener.Message.MessageKind: JOB_START -> 1, JOB_FINISH -> 2.
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$feeds$FeedLifecycleListener$Message$MessageKind;
        // Lookup table for FeedJobInfo.JobType: INTAKE -> 1, FEED_CONNECT -> 2.
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$common$feeds$FeedJobInfo$JobType;
        // Lookup table for IFeedJoint.FeedJointType: INTAKE -> 1, COMPUTE -> 2.
        static final /* synthetic */ int[] $SwitchMap$org$apache$asterix$common$feeds$api$IFeedJoint$FeedJointType = new int[IFeedJoint.FeedJointType.values().length];

        static {
            // Populate the FeedJointType table.
            try {
                $SwitchMap$org$apache$asterix$common$feeds$api$IFeedJoint$FeedJointType[IFeedJoint.FeedJointType.INTAKE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$asterix$common$feeds$api$IFeedJoint$FeedJointType[IFeedJoint.FeedJointType.COMPUTE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            // Allocate and populate the JobType table.
            $SwitchMap$org$apache$asterix$common$feeds$FeedJobInfo$JobType = new int[FeedJobInfo.JobType.values().length];
            try {
                $SwitchMap$org$apache$asterix$common$feeds$FeedJobInfo$JobType[FeedJobInfo.JobType.INTAKE.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$asterix$common$feeds$FeedJobInfo$JobType[FeedJobInfo.JobType.FEED_CONNECT.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            // Allocate and populate the MessageKind table.
            $SwitchMap$org$apache$asterix$feeds$FeedLifecycleListener$Message$MessageKind = new int[FeedLifecycleListener.Message.MessageKind.values().length];
            try {
                $SwitchMap$org$apache$asterix$feeds$FeedLifecycleListener$Message$MessageKind[FeedLifecycleListener.Message.MessageKind.JOB_START.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$asterix$feeds$FeedLifecycleListener$Message$MessageKind[FeedLifecycleListener.Message.MessageKind.JOB_FINISH.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /**
     * Creates a handler that consumes lifecycle messages from the given queue.
     *
     * @param linkedBlockingQueue the queue {@link #run()} takes messages from
     */
    public FeedJobNotificationHandler(LinkedBlockingQueue<FeedLifecycleListener.Message> linkedBlockingQueue) {
        inbox = linkedBlockingQueue;
    }

    /**
     * Consumes lifecycle messages from the inbox and dispatches each one to the handler
     * matching its kind: {@code JOB_START} to {@code handleJobStartMessage},
     * {@code JOB_FINISH} to {@code handleJobFinishMessage}.
     *
     * <p>A failure while handling one message is logged and the loop continues with the
     * next message. If the thread is interrupted, the interrupt flag is restored and the
     * loop exits.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                FeedLifecycleListener.Message message = inbox.take();
                // Each case must break: a start notification must never fall through
                // into the finish handler.
                switch (message.messageKind) {
                    case JOB_START:
                        handleJobStartMessage(message);
                        break;
                    case JOB_FINISH:
                        handleJobFinishMessage(message);
                        break;
                    default:
                        break;
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop consuming.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Failed to process feed job notification", e);
            }
        }
    }

    /**
     * Registers a progress tracker for a feed connection, starting from timestamp 0.
     *
     * @param feedConnectionId the connection being tracked
     * @param iIntakeProgressTracker the tracker to notify as persisted timestamps advance
     * @throws IllegalStateException if a tracker is already registered for the connection
     */
    public void registerFeedIntakeProgressTracker(FeedConnectionId feedConnectionId, IIntakeProgressTracker iIntakeProgressTracker) {
        if (feedIntakeProgressTrackers.get(feedConnectionId) != null) {
            throw new IllegalStateException(" Progress tracker for connection " + feedConnectionId + " is already registered");
        }
        feedIntakeProgressTrackers.put(feedConnectionId, new Pair<>(iIntakeProgressTracker, 0L));
    }

    /**
     * Drops the progress tracker registered for the given connection, if there is one.
     *
     * @param feedConnectionId the connection whose tracker is removed
     */
    public void deregisterFeedIntakeProgressTracker(FeedConnectionId feedConnectionId) {
        feedIntakeProgressTrackers.remove(feedConnectionId);
    }

    /**
     * Forwards a storage report to the connection's progress tracker when it carries a
     * newer persisted-tuple intake timestamp than the one last recorded. Reports for
     * connections without a tracker, or with a timestamp that is not newer, are ignored.
     *
     * @param storageReportFeedMessage the storage report to apply
     */
    public void updateTrackingInformation(StorageReportFeedMessage storageReportFeedMessage) {
        Pair<IIntakeProgressTracker, Long> tracked = feedIntakeProgressTrackers.get(storageReportFeedMessage.getConnectionId());
        if (tracked != null && tracked.second < storageReportFeedMessage.getLastPersistedTupleIntakeTimestamp()) {
            // Record the new high-water mark before notifying the tracker with it.
            tracked.second = storageReportFeedMessage.getLastPersistedTupleIntakeTimestamp();
            tracked.first.notifyIngestedTupleTimestamp(tracked.second);
        }
    }

    /**
     * @return the registered intake job infos, as the live value view of the backing map
     */
    public Collection<FeedIntakeInfo> getFeedIntakeInfos() {
        return intakeJobInfos.values();
    }

    /**
     * @return the registered connect job infos, as the live value view of the backing map
     */
    public Collection<FeedConnectJobInfo> getFeedConnectInfos() {
        return connectJobInfos.values();
    }

    /**
     * Adds a feed joint to the pipeline of the feed that owns it, creating the
     * pipeline entry on first use.
     *
     * @param iFeedJoint the joint to register
     * @throws IllegalArgumentException if the joint is already present in its feed's pipeline
     */
    public void registerFeedJoint(IFeedJoint iFeedJoint) {
        List<IFeedJoint> joints = feedPipeline.get(iFeedJoint.getOwnerFeedId());
        if (joints == null) {
            joints = new ArrayList<>();
            feedPipeline.put(iFeedJoint.getOwnerFeedId(), joints);
        } else if (joints.contains(iFeedJoint)) {
            throw new IllegalArgumentException("Feed joint " + iFeedJoint + " already registered");
        }
        joints.add(iFeedJoint);
    }

    /**
     * Registers an intake job for a feed in the {@code CREATED} state, binding it to the
     * feed's INTAKE joint.
     *
     * @param feedId the feed being ingested
     * @param jobId the id of the intake job
     * @param jobSpecification the specification of the intake job
     * @throws IllegalStateException if a job with this id is already registered
     * @throws HyracksDataException if the feed has no registered INTAKE joint (including
     *         the case where no joint at all has been registered for the feed)
     */
    public void registerFeedIntakeJob(FeedId feedId, JobId jobId, JobSpecification jobSpecification) throws HyracksDataException {
        if (jobInfos.get(jobId) != null) {
            throw new IllegalStateException("Feed job already registered");
        }
        IFeedJoint intakeJoint = null;
        List<IFeedJoint> joints = feedPipeline.get(feedId);
        // A feed without any registered joint is reported through the same
        // HyracksDataException below rather than a NullPointerException.
        if (joints != null) {
            for (IFeedJoint joint : joints) {
                if (joint.getType().equals(IFeedJoint.FeedJointType.INTAKE)) {
                    intakeJoint = joint;
                    break;
                }
            }
        }
        if (intakeJoint == null) {
            throw new HyracksDataException("Could not register feed intake job [" + jobId + "] for feed  " + feedId);
        }
        FeedIntakeInfo intakeInfo = new FeedIntakeInfo(jobId, FeedJobInfo.FeedJobState.CREATED, FeedJobInfo.JobType.INTAKE, feedId, intakeJoint, jobSpecification);
        intakeJobInfos.put(feedId, intakeInfo);
        jobInfos.put(jobId, intakeInfo);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Registered feed intake [" + jobId + "] for feed " + feedId);
        }
    }

    /**
     * Registers a connect (collect) job in the {@code CREATED} state, using as its source
     * the first joint of the feed's pipeline that has the connection as a receiver.
     * When no such joint exists (including when the feed has no pipeline at all) the job
     * is not registered and a warning is logged.
     *
     * @param feedId the feed whose pipeline is searched for the source joint
     * @param feedConnectionId the connection served by the job
     * @param jobId the id of the collect job
     * @param jobSpecification the specification of the collect job
     * @param map the feed policy properties attached to the job
     * @throws IllegalStateException if a job with this id is already registered
     */
    public void registerFeedCollectionJob(FeedId feedId, FeedConnectionId feedConnectionId, JobId jobId, JobSpecification jobSpecification, Map<String, String> map) {
        if (jobInfos.get(jobId) != null) {
            throw new IllegalStateException("Feed job already registered");
        }
        IFeedJoint sourceJoint = null;
        List<IFeedJoint> joints = feedPipeline.get(feedId);
        // No pipeline for the feed means no source joint; fall through to the warning.
        if (joints != null) {
            for (IFeedJoint joint : joints) {
                if (joint.getReceiver(feedConnectionId) != null) {
                    sourceJoint = joint;
                    break;
                }
            }
        }
        if (sourceJoint == null) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("Could not register feed collection job [" + jobId + "] for feed connection " + feedConnectionId);
            }
            return;
        }
        // The compute joint is not known yet; it is attached when the job starts.
        FeedConnectJobInfo connectInfo = new FeedConnectJobInfo(jobId, FeedJobInfo.FeedJobState.CREATED, feedConnectionId, sourceJoint, (IFeedJoint) null, jobSpecification, map);
        jobInfos.put(jobId, connectInfo);
        connectJobInfos.put(feedConnectionId, connectInfo);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Registered feed connection [" + jobId + "] for feed " + feedConnectionId);
        }
    }

    /**
     * Removes a registered intake job. Unless the job is in the {@code UNDER_RECOVERY}
     * state, its intake joint is also removed from the feed's pipeline.
     *
     * @param jobId the id of the intake job to remove
     * @throws IllegalStateException if no job is registered under {@code jobId}
     * @throws ClassCastException if the registered job is not an intake job
     */
    public void deregisterFeedIntakeJob(JobId jobId) {
        FeedJobInfo registered = jobInfos.get(jobId);
        if (registered == null) {
            throw new IllegalStateException(" Feed Intake job not registered ");
        }
        FeedIntakeInfo intakeInfo = (FeedIntakeInfo) registered;
        jobInfos.remove(jobId);
        intakeJobInfos.remove(intakeInfo.getFeedId());
        if (intakeInfo.getState().equals(FeedJobInfo.FeedJobState.UNDER_RECOVERY)) {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Not removing feed joint as intake job is in " + FeedJobInfo.FeedJobState.UNDER_RECOVERY + " state.");
            }
            return;
        }
        List<IFeedJoint> joints = feedPipeline.get(intakeInfo.getFeedId());
        // The pipeline entry may already be gone; there is then nothing left to remove.
        if (joints != null) {
            joints.remove(intakeInfo.getIntakeFeedJoint());
        }
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Deregistered feed intake job [" + jobId + "]");
        }
    }

    /**
     * Routes a job-start notification to the intake or collect start handler, based on
     * the type of the job registered under the message's job id. Notifications for jobs
     * that are not registered are logged and ignored.
     *
     * @param message the job-start message
     * @throws Exception if the type-specific handler fails
     */
    private void handleJobStartMessage(FeedLifecycleListener.Message message) throws Exception {
        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
        if (jobInfo == null) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("Ignoring start notification for unregistered job [" + message.jobId + "]");
            }
            return;
        }
        switch (jobInfo.getJobType()) {
            case INTAKE:
                handleIntakeJobStartMessage((FeedIntakeInfo) jobInfo);
                break;
            case FEED_CONNECT:
                handleCollectJobStartMessage((FeedConnectJobInfo) jobInfo);
                break;
            default:
                break;
        }
    }

    /**
     * Routes a job-finish notification to the intake or collect finish handler, based on
     * the type of the job registered under the message's job id. Notifications for jobs
     * that are not registered are logged and ignored.
     *
     * @param message the job-finish message
     * @throws Exception if the type-specific handler fails
     */
    private void handleJobFinishMessage(FeedLifecycleListener.Message message) throws Exception {
        FeedJobInfo jobInfo = jobInfos.get(message.jobId);
        if (jobInfo == null) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.warning("Ignoring finish notification for unregistered job [" + message.jobId + "]");
            }
            return;
        }
        switch (jobInfo.getJobType()) {
            case INTAKE:
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("Intake Job finished for feed intake " + jobInfo.getJobId());
                }
                handleFeedIntakeJobFinishMessage((FeedIntakeInfo) jobInfo, message);
                break;
            case FEED_CONNECT:
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("Collect Job finished for  " + jobInfo);
                }
                handleFeedCollectJobFinishMessage((FeedConnectJobInfo) jobInfo);
                break;
            default:
                break;
        }
    }

    /**
     * Handles the start of an intake job: collects the locations of every
     * {@link FeedIntakeOperatorDescriptor} instance in the job, stores them on the job
     * info, marks both the intake joint and the job ACTIVE, and notifies subscribers
     * with {@code FEED_INTAKE_STARTED}.
     *
     * @param feedIntakeInfo the intake job that started
     * @throws Exception if the job info cannot be fetched from the cluster controller
     */
    private synchronized void handleIntakeJobStartMessage(FeedIntakeInfo feedIntakeInfo) throws Exception {
        List<OperatorDescriptorId> intakeOperatorIds = new ArrayList<>();
        for (IOperatorDescriptor operator : feedIntakeInfo.getSpec().getOperatorMap().values()) {
            if (operator instanceof FeedIntakeOperatorDescriptor) {
                intakeOperatorIds.add(operator.getOperatorId());
            }
        }

        JobInfo runningJob = AsterixAppContextInfo.getInstance().getHcc().getJobInfo(feedIntakeInfo.getJobId());
        List<String> intakeLocations = new ArrayList<>();
        for (OperatorDescriptorId operatorId : intakeOperatorIds) {
            Map<Integer, String> partitionToLocation = runningJob.getOperatorLocations().get(operatorId);
            // Partitions are looked up by index 0..size-1 so locations are added in partition order.
            int partitionCount = partitionToLocation.size();
            for (int partition = 0; partition < partitionCount; partition++) {
                intakeLocations.add(partitionToLocation.get(partition));
            }
        }

        feedIntakeInfo.setIntakeLocation(intakeLocations);
        feedIntakeInfo.getIntakeFeedJoint().setState(IFeedJoint.State.ACTIVE);
        feedIntakeInfo.setState(FeedJobInfo.FeedJobState.ACTIVE);
        notifyFeedEventSubscribers(feedIntakeInfo, IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_INTAKE_STARTED);
    }

    /**
     * Handles the start of a collect job: resolves its operator locations, activates
     * every joint of the feed's pipeline that the connection provides (remembering the
     * COMPUTE joint on the job info), marks the job ACTIVE, reports the feed activity
     * and notifies subscribers with {@code FEED_COLLECT_STARTED}.
     *
     * @param feedConnectJobInfo the collect job that started
     */
    private void handleCollectJobStartMessage(FeedConnectJobInfo feedConnectJobInfo) throws RemoteException, ACIDException {
        setLocations(feedConnectJobInfo);
        List<IFeedJoint> joints = feedPipeline.get(feedConnectJobInfo.getConnectionId().getFeedId());
        for (IFeedJoint joint : joints) {
            if (!joint.getProvider().equals(feedConnectJobInfo.getConnectionId())) {
                continue;
            }
            joint.setState(IFeedJoint.State.ACTIVE);
            if (joint.getType().equals(IFeedJoint.FeedJointType.COMPUTE)) {
                feedConnectJobInfo.setComputeFeedJoint(joint);
            }
        }
        feedConnectJobInfo.setState(FeedJobInfo.FeedJobState.ACTIVE);
        registerFeedActivity(feedConnectJobInfo);
        notifyFeedEventSubscribers(feedConnectJobInfo, IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_COLLECT_STARTED);
    }

    /**
     * Delivers a lifecycle event to the subscribers affected by a job. For an intake job
     * these are the subscribers of every connection on the job's feed; for any other job
     * they are the subscribers of the job's own connection.
     *
     * @param feedJobInfo the job the event relates to
     * @param feedLifecycleEvent the event to deliver
     */
    private void notifyFeedEventSubscribers(FeedJobInfo feedJobInfo, IFeedLifecycleEventSubscriber.FeedLifecycleEvent feedLifecycleEvent) {
        FeedJobInfo.JobType jobType = feedJobInfo.getJobType();
        List<FeedConnectionId> affectedConnections = new ArrayList<>();
        if (jobType.equals(FeedJobInfo.JobType.INTAKE)) {
            FeedId intakeFeedId = ((FeedIntakeInfo) feedJobInfo).getFeedId();
            for (FeedConnectionId subscribedConnection : eventSubscribers.keySet()) {
                if (subscribedConnection.getFeedId().equals(intakeFeedId)) {
                    affectedConnections.add(subscribedConnection);
                }
            }
        } else {
            affectedConnections.add(((FeedConnectJobInfo) feedJobInfo).getConnectionId());
        }

        for (FeedConnectionId connection : affectedConnections) {
            List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(connection);
            if (subscribers == null || subscribers.isEmpty()) {
                continue;
            }
            for (IFeedLifecycleEventSubscriber subscriber : subscribers) {
                subscriber.handleFeedEvent(feedLifecycleEvent);
            }
        }
    }

    /**
     * Submits a subscription request against a feed joint. The work is targeted at the
     * intake locations of the owning feed for an INTAKE joint, or at the compute
     * locations of the providing connection for a COMPUTE joint.
     *
     * @param iFeedJoint the joint to subscribe to
     * @param feedConnectionRequest the subscription request
     * @throws IllegalStateException if no locations are known for the joint (unsupported
     *         joint type, job not registered, or locations not yet resolved)
     * @throws Exception if submitting the work fails
     */
    public synchronized void submitFeedConnectionRequest(IFeedJoint iFeedJoint, FeedConnectionRequest feedConnectionRequest) throws Exception {
        List<String> locations = null;
        switch (iFeedJoint.getType()) {
            case INTAKE: {
                FeedIntakeInfo intakeInfo = intakeJobInfos.get(iFeedJoint.getOwnerFeedId());
                if (intakeInfo != null) {
                    locations = intakeInfo.getIntakeLocation();
                }
                break;
            }
            case COMPUTE: {
                FeedConnectJobInfo connectInfo = connectJobInfos.get(iFeedJoint.getProvider());
                if (connectInfo != null) {
                    locations = connectInfo.getComputeLocations();
                }
                break;
            }
            default:
                break;
        }
        if (locations == null) {
            // Fail with context instead of a bare NullPointerException further down.
            throw new IllegalStateException("No locations available for feed joint " + iFeedJoint + "; cannot submit " + feedConnectionRequest);
        }
        FeedWorkCollection.SubscribeFeedWork work = new FeedWorkCollection.SubscribeFeedWork(locations.toArray(new String[0]), feedConnectionRequest);
        FeedWorkManager.INSTANCE.submitWork(work, new FeedWorkCollection.SubscribeFeedWork.FeedSubscribeWorkEventListener());
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the source joint of the connection's registered connect job, or
     *         {@code null} when no connect job is registered for the connection
     */
    public IFeedJoint getSourceFeedJoint(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo == null ? null : connectInfo.getSourceFeedJoint();
    }

    /**
     * @return a new set holding the connection id of every registered connect job whose
     *         state is ACTIVE
     */
    public Set<FeedConnectionId> getActiveFeedConnections() {
        Set<FeedConnectionId> activeConnections = new HashSet<>();
        for (FeedConnectJobInfo connectInfo : connectJobInfos.values()) {
            if (!connectInfo.getState().equals(FeedJobInfo.FeedJobState.ACTIVE)) {
                continue;
            }
            activeConnections.add(connectInfo.getConnectionId());
        }
        return activeConnections;
    }

    /**
     * @param feedConnectionId the connection to check
     * @return {@code true} when a connect job is registered for the connection and its
     *         state is ACTIVE; {@code false} otherwise
     */
    public boolean isFeedConnectionActive(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo != null && connectInfo.getState().equals(FeedJobInfo.FeedJobState.ACTIVE);
    }

    /**
     * Sets the state of the connect job registered for a connection.
     *
     * @param feedConnectionId the connection whose job is updated
     * @param feedJobState the new state
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public void setJobState(FeedConnectionId feedConnectionId, FeedJobInfo.FeedJobState feedJobState) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        connectInfo.setState(feedJobState);
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the state of the connect job registered for the connection
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public FeedJobInfo.FeedJobState getFeedJobState(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo.getState();
    }

    /**
     * Handles the end of an intake job: deregisters the job and notifies subscribers
     * with {@code FEED_INTAKE_FAILURE} when the job's reported status is FAILURE, or
     * {@code FEED_ENDED} otherwise (including when no status is reported).
     *
     * @param feedIntakeInfo the intake job that finished
     * @param message the job-finish message carrying the job id
     * @throws Exception if the job info cannot be fetched from the cluster controller
     */
    private void handleFeedIntakeJobFinishMessage(FeedIntakeInfo feedIntakeInfo, FeedLifecycleListener.Message message) throws Exception {
        JobStatus status = AsterixAppContextInfo.getInstance().getHcc().getJobInfo(message.jobId).getStatus();
        // Constant-first comparison tolerates a null status, matching the collect-job handler.
        IFeedLifecycleEventSubscriber.FeedLifecycleEvent event = JobStatus.FAILURE.equals(status)
                ? IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_INTAKE_FAILURE
                : IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_ENDED;
        deregisterFeedIntakeJob(message.jobId);
        notifyFeedEventSubscribers(feedIntakeInfo, event);
    }

    /**
     * Handles the end of a collect job.
     *
     * <p>The subscription is retained when the job is UNDER_RECOVERY, or when it failed
     * and the feed policy asks to continue on hardware failure; otherwise the connection
     * is removed from its source joint and unused joints are pruned. The job's
     * bookkeeping entries are removed only when the job did not fail. The feed activity
     * is always deregistered, and subscribers are notified with
     * {@code FEED_COLLECT_FAILURE} or {@code FEED_ENDED}.
     *
     * @param feedConnectJobInfo the collect job that finished
     * @throws Exception if the job info cannot be fetched from the cluster controller
     */
    private void handleFeedCollectJobFinishMessage(FeedConnectJobInfo feedConnectJobInfo) throws Exception {
        FeedConnectionId connectionId = feedConnectJobInfo.getConnectionId();
        JobInfo finishedJob = AsterixAppContextInfo.getInstance().getHcc().getJobInfo(feedConnectJobInfo.getJobId());
        JobStatus status = finishedJob.getStatus();
        boolean failed = status != null && status.equals(JobStatus.FAILURE);
        FeedPolicyAccessor policy = new FeedPolicyAccessor(feedConnectJobInfo.getFeedPolicy());

        boolean retainSubscription = feedConnectJobInfo.getState().equals(FeedJobInfo.FeedJobState.UNDER_RECOVERY)
                || (failed && policy.continueOnHardwareFailure());
        if (!retainSubscription) {
            IFeedJoint sourceJoint = feedConnectJobInfo.getSourceFeedJoint();
            sourceJoint.removeReceiver(connectionId);
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Subscription " + feedConnectJobInfo.getConnectionId() + " completed successfully. Removed subscription");
            }
            removeFeedJointsPostPipelineTermination(feedConnectJobInfo.getConnectionId());
        }

        // Job history is kept for failed jobs and dropped for all others.
        if (!failed) {
            connectJobInfos.remove(connectionId);
            jobInfos.remove(feedConnectJobInfo.getJobId());
            feedIntakeProgressTrackers.remove(feedConnectJobInfo.getConnectionId());
        }
        deregisterFeedActivity(feedConnectJobInfo);

        IFeedLifecycleEventSubscriber.FeedLifecycleEvent event;
        if (failed) {
            event = IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_COLLECT_FAILURE;
        } else {
            event = IFeedLifecycleEventSubscriber.FeedLifecycleEvent.FEED_ENDED;
        }
        notifyFeedEventSubscribers(feedConnectJobInfo, event);
    }

    /**
     * Reports a feed activity for a connect job to the feed load manager. The activity
     * details carry the job's collect, compute and storage locations (each only when
     * known), the feed policy name and the current time. Failures to report are logged
     * and otherwise ignored.
     *
     * @param feedConnectJobInfo the connect job to report
     */
    private void registerFeedActivity(FeedConnectJobInfo feedConnectJobInfo) {
        Map<String, String> activityDetails = new HashMap<>();
        if (feedConnectJobInfo.getCollectLocations() != null) {
            // The collect locations are reported under the "intake-locations" key.
            activityDetails.put("intake-locations", StringUtils.join(feedConnectJobInfo.getCollectLocations().iterator(), ','));
        }
        if (feedConnectJobInfo.getComputeLocations() != null) {
            activityDetails.put("compute-locations", StringUtils.join(feedConnectJobInfo.getComputeLocations().iterator(), ','));
        }
        if (feedConnectJobInfo.getStorageLocations() != null) {
            activityDetails.put("storage-locations", StringUtils.join(feedConnectJobInfo.getStorageLocations().iterator(), ','));
        }
        activityDetails.put("feed-policy-name", (String) feedConnectJobInfo.getFeedPolicy().get("policy"));
        activityDetails.put("feed-connect-timestamp", new Date().toString());
        try {
            FeedConnectionId connectionId = feedConnectJobInfo.getConnectionId();
            FeedActivity activity = new FeedActivity(connectionId.getFeedId().getDataverse(), connectionId.getFeedId().getFeedName(), connectionId.getDatasetName(), activityDetails);
            CentralFeedManager.getInstance().getFeedLoadManager().reportFeedActivity(connectionId, activity);
        } catch (Exception e) {
            // Best effort: keep going, but log through the logger with the full cause.
            if (LOGGER.isLoggable(Level.WARNING)) {
                LOGGER.log(Level.WARNING, "Unable to register feed activity for " + feedConnectJobInfo + " " + e.getMessage(), e);
            }
        }
    }

    /**
     * Removes the feed activity recorded for a connect job's connection from the feed
     * load manager. Failures are logged, with their cause, and otherwise ignored.
     *
     * @param feedConnectJobInfo the connect job whose activity is removed
     */
    public void deregisterFeedActivity(FeedConnectJobInfo feedConnectJobInfo) {
        try {
            CentralFeedManager.getInstance().getFeedLoadManager().removeFeedActivity(feedConnectJobInfo.getConnectionId());
        } catch (Exception e) {
            if (LOGGER.isLoggable(Level.WARNING)) {
                // Pass the throwable so the stack trace is not lost.
                LOGGER.log(Level.WARNING, "Unable to deregister feed activity for " + feedConnectJobInfo + " " + e.getMessage(), e);
            }
        }
    }

    /**
     * Prunes the joints of a terminated connection from its feed's pipeline. The
     * connection's source joint, and its compute joint when it has one, are each removed
     * when they have fewer than two receivers.
     *
     * @param feedConnectionId the connection whose pipeline terminated
     * @throws NullPointerException if no connect job or no pipeline is registered for
     *         the connection
     */
    public void removeFeedJointsPostPipelineTermination(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        List<IFeedJoint> joints = feedPipeline.get(feedConnectionId.getFeedId());

        IFeedJoint sourceJoint = connectInfo.getSourceFeedJoint();
        if (sourceJoint.getReceivers().size() <= 1) {
            joints.remove(sourceJoint);
        }

        IFeedJoint computeJoint = connectInfo.getComputeFeedJoint();
        if (computeJoint != null && computeJoint.getReceivers().size() <= 1) {
            joints.remove(computeJoint);
        }
    }

    /**
     * @param jobId the job id to check
     * @return {@code true} when a feed job is registered under {@code jobId}
     */
    public boolean isRegisteredFeedJob(JobId jobId) {
        FeedJobInfo registered = jobInfos.get(jobId);
        return registered != null;
    }

    /**
     * Returns the compute locations of the connect job providing the first joint in the
     * feed's pipeline whose joint key belongs to the feed.
     *
     * @param feedId the feed to look up
     * @return the compute locations, or {@code null} when the feed has no pipeline or no
     *         joint in it matches the feed
     * @throws NullPointerException if the matching joint's provider has no registered
     *         connect job
     */
    public List<String> getFeedComputeLocations(FeedId feedId) {
        List<IFeedJoint> joints = feedPipeline.get(feedId);
        if (joints == null) {
            // No pipeline is the same "not found" outcome as no matching joint.
            return null;
        }
        for (IFeedJoint joint : joints) {
            if (joint.getFeedJointKey().getFeedId().equals(feedId)) {
                return connectJobInfos.get(joint.getProvider()).getComputeLocations();
            }
        }
        return null;
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the storage locations recorded on the connection's connect job
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public List<String> getFeedStorageLocations(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo.getStorageLocations();
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the collect locations recorded on the connection's connect job
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public List<String> getFeedCollectLocations(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo.getCollectLocations();
    }

    /**
     * @param feedId the feed to look up
     * @return the intake locations recorded on the feed's intake job
     * @throws NullPointerException if no intake job is registered for the feed
     */
    public List<String> getFeedIntakeLocations(FeedId feedId) {
        FeedIntakeInfo intakeInfo = intakeJobInfos.get(feedId);
        return intakeInfo.getIntakeLocation();
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the id of the connect job registered for the connection
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public JobId getFeedCollectJobId(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo.getJobId();
    }

    /**
     * Adds a lifecycle event subscriber for a connection, creating the connection's
     * subscriber list on first use.
     *
     * @param feedConnectionId the connection to subscribe to
     * @param iFeedLifecycleEventSubscriber the subscriber to add
     */
    public void registerFeedEventSubscriber(FeedConnectionId feedConnectionId, IFeedLifecycleEventSubscriber iFeedLifecycleEventSubscriber) {
        List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(feedConnectionId);
        if (subscribers != null) {
            subscribers.add(iFeedLifecycleEventSubscriber);
            return;
        }
        List<IFeedLifecycleEventSubscriber> created = new ArrayList<>();
        created.add(iFeedLifecycleEventSubscriber);
        eventSubscribers.put(feedConnectionId, created);
    }

    /**
     * Removes a lifecycle event subscriber from a connection. Does nothing when the
     * connection has no subscriber list.
     *
     * @param feedConnectionId the connection to unsubscribe from
     * @param iFeedLifecycleEventSubscriber the subscriber to remove
     */
    public void deregisterFeedEventSubscriber(FeedConnectionId feedConnectionId, IFeedLifecycleEventSubscriber iFeedLifecycleEventSubscriber) {
        List<IFeedLifecycleEventSubscriber> subscribers = eventSubscribers.get(feedConnectionId);
        if (subscribers == null) {
            return;
        }
        subscribers.remove(iFeedLifecycleEventSubscriber);
    }

    /**
     * @param feedJointKey the joint key to look for
     * @return {@code true} when the pipeline of the key's feed contains a joint with
     *         exactly this key
     */
    public boolean isFeedPointAvailable(FeedJointKey feedJointKey) {
        List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
        if (joints != null && !joints.isEmpty()) {
            for (IFeedJoint joint : joints) {
                if (joint.getFeedJointKey().equals(feedJointKey)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * @return a new collection holding the intake joint of every registered intake job
     */
    public Collection<IFeedJoint> getFeedIntakeJoints() {
        List<IFeedJoint> intakeJoints = new ArrayList<>();
        for (FeedIntakeInfo intakeInfo : intakeJobInfos.values()) {
            intakeJoints.add(intakeInfo.getIntakeFeedJoint());
        }
        return intakeJoints;
    }

    /**
     * @param feedJointKey the joint key to look for
     * @return the first joint in the pipeline of the key's feed whose key equals
     *         {@code feedJointKey}, or {@code null} when there is none
     */
    public IFeedJoint getFeedJoint(FeedJointKey feedJointKey) {
        List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
        if (joints != null && !joints.isEmpty()) {
            for (IFeedJoint joint : joints) {
                if (joint.getFeedJointKey().equals(feedJointKey)) {
                    return joint;
                }
            }
        }
        return null;
    }

    /**
     * Finds the joint that best serves a requested joint key. An exact match wins;
     * otherwise the candidates are the joints of the key's feed whose key string is
     * contained in the requested key string, and among those a later candidate replaces
     * the current choice when its key string contains the current choice's key string.
     *
     * @param feedJointKey the requested joint key
     * @return the selected joint, or {@code null} when no joint qualifies
     */
    public IFeedJoint getAvailableFeedJoint(FeedJointKey feedJointKey) {
        IFeedJoint exactMatch = getFeedJoint(feedJointKey);
        if (exactMatch != null) {
            return exactMatch;
        }
        String requestedRep = feedJointKey.getStringRep();
        List<IFeedJoint> joints = feedPipeline.get(feedJointKey.getFeedId());
        if (joints == null) {
            return null;
        }
        IFeedJoint best = null;
        for (IFeedJoint candidate : joints) {
            if (!requestedRep.contains(candidate.getFeedJointKey().getStringRep())) {
                continue;
            }
            if (best == null || candidate.getFeedJointKey().getStringRep().contains(best.getFeedJointKey().getStringRep())) {
                best = candidate;
            }
        }
        return best;
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the job specification of the connection's connect job
     * @throws NullPointerException if no connect job is registered for the connection
     */
    public JobSpecification getCollectJobSpecification(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo.getSpec();
    }

    /**
     * @param feedId the feed whose pipeline is searched
     * @param feedJointType the joint type to look for
     * @return the first joint of the given type in the feed's pipeline, or {@code null}
     *         when the feed has no pipeline or no joint of that type
     */
    public IFeedJoint getFeedPoint(FeedId feedId, IFeedJoint.FeedJointType feedJointType) {
        List<IFeedJoint> joints = feedPipeline.get(feedId);
        if (joints == null) {
            // Same "not found" result the other joint lookups give for an unknown feed.
            return null;
        }
        for (IFeedJoint joint : joints) {
            if (joint.getType().equals(feedJointType)) {
                return joint;
            }
        }
        return null;
    }

    /**
     * @param feedConnectionId the connection to look up
     * @return the connect job registered for the connection, or {@code null} when there
     *         is none
     */
    public FeedConnectJobInfo getFeedConnectJobInfo(FeedConnectionId feedConnectionId) {
        FeedConnectJobInfo connectInfo = connectJobInfos.get(feedConnectionId);
        return connectInfo;
    }

    /**
     * Resolves where the collect, compute and storage operators of a connect job run and
     * stores the three location lists on the job info.
     *
     * <p>Operators are classified by their core operator (unwrapping
     * {@link FeedMetaOperatorDescriptor}): a {@link FeedCollectOperatorDescriptor} is a
     * collect operator, an {@link LSMTreeIndexInsertUpdateDeleteOperatorDescriptor} is a
     * storage operator, and an {@link AlgebricksMetaOperatorDescriptor} is a compute
     * operator when {@code isComputeOperator} says so.
     *
     * <p>If resolving the locations fails, the failure is logged and the job info's
     * locations are left unchanged.
     *
     * @param feedConnectJobInfo the connect job to update
     */
    private void setLocations(FeedConnectJobInfo feedConnectJobInfo) {
        JobSpecification spec = feedConnectJobInfo.getSpec();
        List<OperatorDescriptorId> collectOperatorIds = new ArrayList<>();
        List<OperatorDescriptorId> computeOperatorIds = new ArrayList<>();
        List<OperatorDescriptorId> storageOperatorIds = new ArrayList<>();
        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> entry : spec.getOperatorMap().entrySet()) {
            IOperatorDescriptor operator = entry.getValue();
            IOperatorDescriptor coreOperator = operator instanceof FeedMetaOperatorDescriptor
                    ? ((FeedMetaOperatorDescriptor) operator).getCoreOperator()
                    : operator;
            if (coreOperator instanceof AlgebricksMetaOperatorDescriptor) {
                if (isComputeOperator(spec, (AlgebricksMetaOperatorDescriptor) coreOperator)) {
                    computeOperatorIds.add(entry.getKey());
                }
            } else if (coreOperator instanceof LSMTreeIndexInsertUpdateDeleteOperatorDescriptor) {
                storageOperatorIds.add(entry.getKey());
            } else if (coreOperator instanceof FeedCollectOperatorDescriptor) {
                collectOperatorIds.add(entry.getKey());
            }
        }

        try {
            JobInfo jobInfo = AsterixAppContextInfo.getInstance().getHcc().getJobInfo(feedConnectJobInfo.getJobId());

            List<String> collectLocations = new ArrayList<>();
            for (OperatorDescriptorId operatorId : collectOperatorIds) {
                // Collect operators are expected to have locations; a missing entry
                // raises here and is handled by the catch below.
                addPartitionLocations(jobInfo.getOperatorLocations().get(operatorId), collectLocations);
            }

            List<String> computeLocations = new ArrayList<>();
            for (OperatorDescriptorId operatorId : computeOperatorIds) {
                Map<Integer, String> partitionToLocation = jobInfo.getOperatorLocations().get(operatorId);
                if (partitionToLocation != null) {
                    addPartitionLocations(partitionToLocation, computeLocations);
                } else {
                    // No locations for this compute operator: fall back to the collect locations.
                    computeLocations.clear();
                    computeLocations.addAll(collectLocations);
                }
            }

            List<String> storageLocations = new ArrayList<>();
            for (OperatorDescriptorId operatorId : storageOperatorIds) {
                Map<Integer, String> partitionToLocation = jobInfo.getOperatorLocations().get(operatorId);
                if (partitionToLocation != null) {
                    addPartitionLocations(partitionToLocation, storageLocations);
                }
            }

            feedConnectJobInfo.setCollectLocations(collectLocations);
            feedConnectJobInfo.setComputeLocations(computeLocations);
            feedConnectJobInfo.setStorageLocations(storageLocations);
        } catch (Exception e) {
            LOGGER.log(Level.WARNING, "Unable to determine operator locations for " + feedConnectJobInfo, e);
        }
    }

    /**
     * Tells whether an Algebricks meta operator is the compute stage of a connect job:
     * its pipeline contains an {@link AssignRuntimeFactory} and the operator feeding its
     * first input is a {@link FeedCollectOperatorDescriptor}.
     */
    private static boolean isComputeOperator(JobSpecification spec, AlgebricksMetaOperatorDescriptor metaOperator) {
        for (IPushRuntimeFactory runtimeFactory : metaOperator.getPipeline().getRuntimeFactories()) {
            if (runtimeFactory instanceof AssignRuntimeFactory) {
                IConnectorDescriptor inputConnector = spec.getOperatorInputMap().get(metaOperator.getOperatorId()).get(0);
                IOperatorDescriptor sourceOperator = spec.getConnectorOperatorMap().get(inputConnector.getConnectorId()).getLeft().getLeft();
                if (sourceOperator instanceof FeedCollectOperatorDescriptor) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Appends the location of each partition, looked up by index 0..size-1, to {@code target}. */
    private static void addPartitionLocations(Map<Integer, String> partitionToLocation, List<String> target) {
        int partitionCount = partitionToLocation.size();
        for (int partition = 0; partition < partitionCount; partition++) {
            target.add(partitionToLocation.get(partition));
        }
    }
}
