package won.matcher.service.nodemanager.actor;

import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.IteratorUtils;
import org.apache.jena.query.Dataset;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.duration.Duration;
import won.cryptography.service.RegistrationClient;
import won.cryptography.ssl.MessagingContext;
import won.matcher.service.common.event.AtomHintEvent;
import won.matcher.service.common.event.BulkHintEvent;
import won.matcher.service.common.event.HintEvent;
import won.matcher.service.common.event.SocketHintEvent;
import won.matcher.service.common.event.WonNodeEvent;
import won.matcher.service.common.spring.SpringExtension;
import won.matcher.service.crawler.actor.MasterCrawlerActor;
import won.matcher.service.nodemanager.config.ActiveMqWonNodeConnectionFactory;
import won.matcher.service.nodemanager.config.WonNodeControllerConfig;
import won.matcher.service.nodemanager.pojo.WonNodeConnection;
import won.matcher.service.nodemanager.service.HintDBService;
import won.matcher.service.nodemanager.service.WonNodeSparqlService;
import won.protocol.service.WonNodeInfo;
import won.protocol.util.linkeddata.LinkedDataSource;
import won.protocol.util.linkeddata.WonLinkedDataUtils;

/**
 * Actor that manages the matcher service's connections to won nodes.
 *
 * <p>It subscribes to {@link WonNodeEvent}, {@link HintEvent} and {@link BulkHintEvent} topics on
 * the distributed pub/sub mediator. Won node events asking for discovery / registration are
 * answered by registering at the won node, storing its description via the SPARQL service and
 * opening a {@link WonNodeConnection}. Won nodes that could not be connected are remembered and
 * retried on every {@code life_check_tick}. Hint events are stored in the hint database and
 * forwarded to the hint producer of the recipient won node's connection.
 *
 * <p>All mutable state is only touched from the actor's own message handling methods.
 */
@Scope("prototype")
@Component
public class WonNodeControllerActor extends UntypedActor {
    /** Message this actor schedules to itself to trigger {@link #lifeCheck()}. */
    private static final String LIFE_CHECK_TICK = "life_check_tick";

    private ActorRef pubSubMediator;
    private ActorRef crawler;
    private ActorRef saveAtomActor;

    @Autowired
    private WonNodeSparqlService sparqlService;

    @Autowired
    private WonNodeControllerConfig config;

    @Autowired
    private RegistrationClient registrationClient;

    @Autowired
    LinkedDataSource linkedDataSource;

    @Autowired
    private MessagingContext messagingContext;

    @Autowired
    private HintDBService hintDatabase;

    private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /** Connected won nodes, keyed by won node URI. */
    private final Map<String, WonNodeConnection> crawlWonNodes = new HashMap<>();

    /** Won node URIs taken from the configuration that must never be connected. */
    private final Set<String> skipWonNodeUris = new HashSet<>();

    /** Won node URIs whose last connection attempt failed; retried by {@link #lifeCheck()}. */
    private final Set<String> failedWonNodeUris = new HashSet<>();

    /**
     * Schedules the periodic life check, subscribes to the event topics, announces all won nodes
     * known from the RDF store and from the configuration, and starts the crawler and
     * save-atom child actors.
     *
     * <p>If the SPARQL endpoint cannot be queried the whole JVM is terminated with exit code -1.
     */
    public void preStart() {
        // NOTE(review): the Cancellable returned by schedule() is discarded, so the tick is never
        // cancelled explicitly when this actor stops or is restarted - confirm this is acceptable.
        getContext().system().scheduler().schedule(this.config.getLifeCheckDuration(), this.config.getLifeCheckDuration(), getSelf(), LIFE_CHECK_TICK, getContext().dispatcher(), (ActorRef) null);

        this.pubSubMediator = DistributedPubSub.get(getContext().system()).mediator();
        this.pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(WonNodeEvent.class.getName(), getSelf()), getSelf());
        this.pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(HintEvent.class.getName(), getSelf()), getSelf());
        this.pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(BulkHintEvent.class.getName(), getSelf()), getSelf());

        this.skipWonNodeUris.addAll(this.config.getSkipWonNodes());

        Set<WonNodeInfo> knownWonNodes = new HashSet<>();
        try {
            knownWonNodes = this.sparqlService.retrieveAllWonNodeInfo();
        } catch (Exception e) {
            // Pass the exception as cause so that the stack trace ends up in the log.
            this.log.error(e, "Error querying SPARQL endpoint {}. SPARQL endpoint must be running at matcher service startup!", this.sparqlService.getSparqlEndpoint());
            this.log.info("Shut down matcher service!");
            System.exit(-1);
        }

        // Won nodes already stored in the RDF store; the configured ones are announced below.
        for (WonNodeInfo knownWonNode : knownWonNodes) {
            if (!this.config.getCrawlWonNodes().contains(knownWonNode.getWonNodeURI())) {
                publish(new WonNodeEvent(knownWonNode.getWonNodeURI(), WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED));
            }
        }
        for (String configuredUri : this.config.getCrawlWonNodes()) {
            if (!this.skipWonNodeUris.contains(configuredUri) && !this.crawlWonNodes.containsKey(configuredUri)) {
                publish(new WonNodeEvent(configuredUri, WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED));
            }
        }

        this.crawler = getContext().actorOf(((SpringExtension.SpringExt) SpringExtension.SpringExtProvider.get(getContext().system())).props(MasterCrawlerActor.class), "MasterCrawlerActor");
        this.saveAtomActor = getContext().actorOf(((SpringExtension.SpringExt) SpringExtension.SpringExtProvider.get(getContext().system())).props(SaveAtomEventActor.class), "SaveAtomEventActor");
    }

    /**
     * Dispatches incoming messages.
     *
     * <ul>
     *   <li>{@link Terminated}: a watched actor stopped, see {@link #handleConnectionErrors}.</li>
     *   <li>{@code life_check_tick}: retry failed won nodes.</li>
     *   <li>{@link WonNodeEvent} with a discovery / crawl-info / retry status: connect to the node.</li>
     *   <li>{@link HintEvent}: processed as is.</li>
     *   <li>{@link BulkHintEvent}: every contained hint is expanded to socket hints where
     *       possible and then processed.</li>
     * </ul>
     * Everything else (including won node events with any other status) is reported as unhandled.
     *
     * @param obj the received message
     */
    public void onReceive(Object obj) {
        if (obj instanceof Terminated) {
            handleConnectionErrors((Terminated) obj);
        } else if (LIFE_CHECK_TICK.equals(obj)) {
            lifeCheck();
        } else if (obj instanceof WonNodeEvent && isConnectRequest((WonNodeEvent) obj)) {
            handleConnectRequest((WonNodeEvent) obj);
        } else if (obj instanceof HintEvent) {
            processHint((HintEvent) obj);
        } else if (obj instanceof BulkHintEvent) {
            for (HintEvent hint : ((BulkHintEvent) obj).getHintEvents()) {
                expandToSocketHintsIfAppropriate(hint).forEach(this::processHint);
            }
        } else {
            unhandled(obj);
        }
    }

    /** Returns whether the event's status is one of those that ask this actor to connect to a won node. */
    private boolean isConnectRequest(WonNodeEvent event) {
        return WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED.equals(event.getStatus())
                || WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING.equals(event.getStatus())
                || WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE.equals(event.getStatus());
    }

    /**
     * Handles a won node event that asks for a connection: answers for already connected nodes,
     * publishes a skip event for nodes on the skip list, suppresses nodes that are currently
     * marked as failed and otherwise tries to connect and publishes the result.
     */
    private void handleConnectRequest(WonNodeEvent event) {
        String wonNodeUri = event.getWonNodeUri();

        if (this.crawlWonNodes.containsKey(wonNodeUri)) {
            this.log.debug("Won node uri '{}' already discovered", wonNodeUri);
            // Only a crawler asking for the node info gets an answer; other statuses are ignored.
            if (WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING.equals(event.getStatus())) {
                publish(new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE, this.crawlWonNodes.get(wonNodeUri).getWonNodeInfo()));
            }
            return;
        }

        if (this.skipWonNodeUris.contains(wonNodeUri)) {
            this.log.debug("Skip crawling won node with uri '{}'", wonNodeUri);
            publish(new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.SKIP_WON_NODE));
            return;
        }

        if (this.failedWonNodeUris.contains(wonNodeUri)) {
            this.log.debug("Suppress connection to already failed won node with uri {} , will try to connect later ...", wonNodeUri);
            return;
        }

        boolean isRetry = WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE.equals(event.getStatus());
        WonNodeConnection connection = addWonNodeForCrawling(wonNodeUri, isRetry);

        // addWonNodeForCrawling records the uri as failed whenever any of its stages fails.
        if (this.failedWonNodeUris.contains(wonNodeUri)) {
            this.log.debug("Still could not connect to won node with uri: {}, will retry later ...", wonNodeUri);
        } else if (connection == null || connection.getWonNodeInfo() == null) {
            this.log.error("Cannot retrieve won node info from won node connection!");
        } else {
            publish(new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE, connection.getWonNodeInfo()));
        }
    }

    /** Publishes the event on the pub/sub topic named after the event's runtime class. */
    private void publish(WonNodeEvent event) {
        this.pubSubMediator.tell(new DistributedPubSubMediator.Publish(event.getClass().getName(), event), getSelf());
    }

    /**
     * Saves the hint in the hint database and, unless the database reports that it might
     * already have been saved, sends it to the recipient won node.
     *
     * @param hintEvent the hint to process
     */
    private void processHint(HintEvent hintEvent) {
        boolean probablyDuplicate = this.hintDatabase.mightHintSaved(hintEvent);
        if (probablyDuplicate) {
            this.log.debug("Hint {} is filtered out by duplicate filter!", hintEvent);
        }
        // NOTE(review): the hint is saved even when it was classified as a duplicate - this
        // mirrors the existing behaviour; confirm that it is intended.
        this.hintDatabase.saveHint(hintEvent);
        if (!probablyDuplicate) {
            sendHint(hintEvent);
        }
    }

    /**
     * Forwards the hint to the hint producer of the connection to the recipient won node.
     * A warning is logged and the hint is dropped if no connection is registered for that node.
     *
     * @param hintEvent the hint to send
     */
    private void sendHint(HintEvent hintEvent) {
        WonNodeConnection connection = this.crawlWonNodes.get(hintEvent.getRecipientWonNodeUri());
        if (connection == null) {
            this.log.warning("cannot send hint to won node {}! Is registered with the won node controller?", hintEvent.getRecipientWonNodeUri());
            return;
        }
        this.log.info("Send hint {} to won node {}", hintEvent, hintEvent.getRecipientWonNodeUri());
        connection.getHintProducer().tell(hintEvent, getSelf());
    }

    /**
     * Turns an {@link AtomHintEvent} into one {@link SocketHintEvent} per compatible socket pair
     * reported by {@link WonLinkedDataUtils#getCompatibleSocketsForAtoms}. Any other hint, and an
     * atom hint without compatible sockets, is returned unchanged as a single-element list.
     *
     * @param hintEvent the hint to expand
     * @return the socket hints, or a singleton list containing {@code hintEvent}
     */
    private Collection<HintEvent> expandToSocketHintsIfAppropriate(HintEvent hintEvent) {
        if (hintEvent instanceof AtomHintEvent) {
            AtomHintEvent atomHintEvent = (AtomHintEvent) hintEvent;
            // NOTE(review): raw Set left as found; the elements expose getFirst()/getSecond()
            // returning URIs - restore the proper generic type once it is confirmed.
            Set compatibleSocketsForAtoms = WonLinkedDataUtils.getCompatibleSocketsForAtoms(this.linkedDataSource, URI.create(atomHintEvent.getRecipientAtomUri()), URI.create(atomHintEvent.getTargetAtomUri()));
            if (!compatibleSocketsForAtoms.isEmpty()) {
                return (Collection) compatibleSocketsForAtoms.stream().map(pair -> {
                    return new SocketHintEvent(((URI) pair.getFirst()).toString(), atomHintEvent.getRecipientWonNodeUri(), ((URI) pair.getSecond()).toString(), atomHintEvent.getTargetWonNodeUri(), atomHintEvent.getMatcherUri(), atomHintEvent.getScore(), atomHintEvent.getCause());
                }).collect(Collectors.toList());
            }
        }
        return Collections.singletonList(hintEvent);
    }

    /**
     * Tries to connect to a won node in three stages: register and fetch its description,
     * store the description via the SPARQL service, and subscribe for atom updates. A failure in
     * any stage marks the won node as failed.
     *
     * @param wonNodeUri URI of the won node to connect to
     * @param logRegisterWarn if {@code true}, a failure of the first stage is logged as warning,
     *        otherwise only on debug level
     * @return the connection, or {@code null} if the first or second stage failed or no
     *         connection was created in the third stage
     */
    private WonNodeConnection addWonNodeForCrawling(String wonNodeUri, boolean logRegisterWarn) {
        // Stage 1: register at the won node and load its linked data description.
        Dataset wonNodeData;
        try {
            this.registrationClient.register(wonNodeUri);
            wonNodeData = this.linkedDataSource.getDataForResource(URI.create(wonNodeUri));
        } catch (Exception e) {
            addFailedWonNode(wonNodeUri, null);
            if (logRegisterWarn) {
                this.log.warning("Error requesting won node information from {}", wonNodeUri);
                this.log.warning("Exception message: {} \nCause: {} ", e.getMessage(), e.getCause());
            } else {
                this.log.debug("Error requesting won node information from {}", wonNodeUri);
                this.log.debug("Exception message: {} \nCause: {} ", e.getMessage(), e.getCause());
            }
            return null;
        }

        // Stage 2: persist the description and extract the won node info from it.
        WonNodeInfo wonNodeInfo;
        try {
            this.sparqlService.updateNamedGraphsOfDataset(wonNodeData);
            wonNodeInfo = this.sparqlService.getWonNodeInfoFromDataset(wonNodeData);
        } catch (Exception e) {
            addFailedWonNode(wonNodeUri, null);
            this.log.error("Error saving won node information from {} into RDF store with SPARQL endpoint {}", wonNodeUri, this.sparqlService.getSparqlEndpoint());
            this.log.error("Exception message: {} \nCause: {} ", e.getMessage(), e.getCause());
            return null;
        }

        // Stage 3: open the connection and remember it.
        WonNodeConnection connection = null;
        try {
            connection = subscribeAtomUpdates(wonNodeInfo);
            this.crawlWonNodes.put(wonNodeInfo.getWonNodeURI(), connection);
            this.failedWonNodeUris.remove(wonNodeInfo.getWonNodeURI());
            this.log.info("registered won node {} and start crawling it", wonNodeInfo.getWonNodeURI());
        } catch (Exception e) {
            addFailedWonNode(wonNodeUri, connection);
            this.log.error("Error subscribing for atom updates at won node {}", wonNodeUri);
            this.log.error("Exception message: {} \nCause: {} ", e.getMessage(), e.getCause());
        }
        return connection;
    }

    /**
     * Clears the set of failed won nodes and publishes a retry event for each of them, so that a
     * new connection attempt is made when the events arrive back at this actor.
     */
    private void lifeCheck() {
        // Copy first: the set is cleared before the events are published.
        Set<String> retryUris = new HashSet<>(this.failedWonNodeUris);
        this.log.debug("retry to connect to all failed won nodes again: {}", retryUris);
        this.failedWonNodeUris.clear();
        for (String retryUri : retryUris) {
            publish(new WonNodeEvent(retryUri, WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE));
        }
    }

    /**
     * Marks a won node as failed: stops the three atom consumers of the given connection (if
     * any), removes the node from the connected nodes and adds it to the failed nodes.
     *
     * @param wonNodeUri URI of the failed won node
     * @param wonNodeConnection connection whose consumers are stopped, may be {@code null}
     */
    private void addFailedWonNode(String wonNodeUri, WonNodeConnection wonNodeConnection) {
        if (wonNodeConnection != null) {
            // NOTE(review): the hint producer of the connection is not stopped here - confirm
            // whether that is intentional.
            getContext().stop(wonNodeConnection.getAtomCreatedConsumer());
            getContext().stop(wonNodeConnection.getAtomActivatedConsumer());
            getContext().stop(wonNodeConnection.getAtomDeactivatedConsumer());
        }
        this.crawlWonNodes.remove(wonNodeUri);
        this.failedWonNodeUris.add(wonNodeUri);
    }

    /** Creates the connection to the given won node through the ActiveMQ connection factory. */
    private WonNodeConnection subscribeAtomUpdates(WonNodeInfo wonNodeInfo) {
        return ActiveMqWonNodeConnectionFactory.createWonNodeConnection(getContext(), wonNodeInfo, this.messagingContext);
    }

    /**
     * Reacts to the termination of an actor: every connection one of whose consumers or whose
     * hint producer is the terminated actor is marked as failed.
     *
     * @param terminated the termination notification
     */
    private void handleConnectionErrors(Terminated terminated) {
        // addFailedWonNode() removes entries from crawlWonNodes, so iterate over a snapshot to
        // avoid a ConcurrentModificationException from the map's fail-fast iterator.
        Map<String, WonNodeConnection> snapshot = new HashMap<>(this.crawlWonNodes);
        for (Map.Entry<String, WonNodeConnection> entry : snapshot.entrySet()) {
            WonNodeConnection connection = entry.getValue();
            if (connection == null) {
                continue;
            }
            String template;
            if (connection.getAtomCreatedConsumer().equals(terminated.getActor())) {
                template = "AtomCreatedConsumer '{}' of won '{}' has been shut down";
            } else if (connection.getAtomActivatedConsumer().equals(terminated.getActor())) {
                template = "AtomActivatedConsumer '{}' of won '{}' has been shut down";
            } else if (connection.getAtomDeactivatedConsumer().equals(terminated.getActor())) {
                template = "AtomDeactivatedConsumer '{}' of won '{}' has been shut down";
            } else if (connection.getHintProducer().equals(terminated.getActor())) {
                template = "HintProducer '{}' of won '{}' has been shut down";
            } else {
                continue;
            }
            this.log.error(template, terminated.getActor(), entry.getKey());
            addFailedWonNode(connection.getWonNodeInfo().getWonNodeURI(), connection);
        }
    }

    /**
     * Supervisor strategy for the child actors: every failure is logged as a warning and
     * escalated to this actor's own supervisor.
     */
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(0, Duration.Zero(), new Function<Throwable, SupervisorStrategy.Directive>() {
            public SupervisorStrategy.Directive apply(Throwable th) throws Exception {
                WonNodeControllerActor.this.log.warning("Actor encountered error: {}", th);
                return SupervisorStrategy.escalate();
            }
        });
    }
}
