package won.node.service.persistence;

import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.text.MessageFormat;
import java.util.Objects;
import java.util.Optional;
import javax.persistence.EntityManager;
import org.apache.jena.atlas.lib.Chars;
import org.apache.jena.graph.TripleBoundary;
import org.apache.jena.query.Dataset;
import org.apache.jena.query.DatasetFactory;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelExtract;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.rdf.model.StatementTripleBoundary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import won.protocol.exception.ConnectionAlreadyExistsException;
import won.protocol.exception.IllegalMessageForAtomStateException;
import won.protocol.exception.IllegalMessageForConnectionStateException;
import won.protocol.exception.IncompatibleSocketsException;
import won.protocol.exception.MissingMessagePropertyException;
import won.protocol.exception.NoSuchAtomException;
import won.protocol.exception.NoSuchConnectionException;
import won.protocol.exception.SocketCapacityException;
import won.protocol.exception.WrongAddressingInformationException;
import won.protocol.message.WonMessage;
import won.protocol.message.WonMessageDirection;
import won.protocol.message.WonMessageType;
import won.protocol.model.Atom;
import won.protocol.model.AtomState;
import won.protocol.model.Connection;
import won.protocol.model.ConnectionEventType;
import won.protocol.model.ConnectionMessageContainer;
import won.protocol.model.ConnectionState;
import won.protocol.model.DatasetHolder;
import won.protocol.model.Socket;
import won.protocol.repository.AtomRepository;
import won.protocol.repository.ConnectionContainerRepository;
import won.protocol.repository.ConnectionMessageContainerRepository;
import won.protocol.repository.ConnectionRepository;
import won.protocol.repository.DatasetHolderRepository;
import won.protocol.repository.MessageEventRepository;
import won.protocol.repository.SocketRepository;
import won.protocol.service.WonNodeInformationService;
import won.protocol.util.DataAccessUtils;
import won.protocol.util.RdfUtils;
import won.protocol.vocabulary.WONCON;
import won.protocol.vocabulary.WONMSG;

@Component
/* loaded from: input_file:WEB-INF/lib/won-node-0.8.jar:won/node/service/persistence/ConnectionService.class */
public class ConnectionService {
    // SLF4J logger; MethodHandles.lookup().lookupClass() resolves to the declaring class.
    private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    // Looks up, counts and saves Connection entities.
    @Autowired
    ConnectionRepository connectionRepository;

    // Used for socket lookup, compatibility, capacity and auto-open queries.
    @Autowired
    SocketService socketService;

    // Generates the URI for newly created connections.
    @Autowired
    WonNodeInformationService wonNodeInformationService;

    // Loads the atom (for update) when a connection is created.
    @Autowired
    AtomRepository atomRepository;

    // Used to verify that an atom has a socket of a given type.
    @Autowired
    SocketRepository socketRepository;

    // Not referenced by the methods visible in this class.
    @Autowired
    ConnectionContainerRepository connectionContainerRepository;

    // Stores the message container that is created together with each new connection.
    @Autowired
    ConnectionMessageContainerRepository connectionMessageContainerRepository;

    // Stores the dataset that holds feedback attached to a connection.
    @Autowired
    DatasetHolderRepository datasetHolderRepository;

    // Not referenced by the methods visible in this class.
    @Autowired
    MessageEventRepository messageEventRepository;

    // Used to refresh entities right after they were loaded "for update".
    @Autowired
    EntityManager entityManager;

    /**
     * Looks up a connection by its connection URI.
     *
     * @param uri the connection URI to search for
     * @return the connection found by the repository, or an empty Optional
     */
    public Optional<Connection> getConnection(URI uri) {
        final Optional<Connection> found = this.connectionRepository.findOneByConnectionURI(uri);
        return found;
    }

    /**
     * Looks up a connection by its own socket and the socket it targets.
     *
     * @param uri  the socket URI of the connection
     * @param uri2 the target socket URI of the connection
     * @return the connection found by the repository, or an empty Optional
     */
    public Optional<Connection> getConnection(URI uri, URI uri2) {
        final Optional<Connection> found = this.connectionRepository.findOneBySocketURIAndTargetSocketURI(uri, uri2);
        return found;
    }

    /**
     * Looks up a connection by socket and target socket, failing if there is none.
     *
     * @param uri  the socket URI of the connection
     * @param uri2 the target socket URI of the connection
     * @return the connection found by the repository
     * @throws NoSuchConnectionException if the repository has no such connection
     */
    public Connection getConnectionRequired(URI uri, URI uri2) {
        final Optional<Connection> found = this.connectionRepository.findOneBySocketURIAndTargetSocketURI(uri, uri2);
        return found.orElseThrow(() -> new NoSuchConnectionException(uri, uri2));
    }

    /**
     * Looks up a connection by its connection URI, failing if there is none.
     *
     * @param uri the connection URI to search for
     * @return the connection found
     * @throws NoSuchConnectionException if no connection with that URI is found
     */
    public Connection getConnectionRequired(URI uri) {
        final Optional<Connection> found = getConnection(uri);
        return found.orElseThrow(() -> new NoSuchConnectionException(uri));
    }

    /**
     * Asks the repository whether a connection in state {@code CONNECTED} exists for
     * the given pair of URIs.
     *
     * @param uri  first lookup key (presumably the atom URI, judging by the repository
     *             method name; confirm against the repository)
     * @param uri2 second lookup key (presumably the socket URI; confirm)
     * @return whatever the repository's existence query reports
     */
    public boolean existOpenConnections(URI uri, URI uri2) {
        final ConnectionState openState = ConnectionState.CONNECTED;
        return this.connectionRepository.existsWithtomURIAndSocketURIAndState(uri, uri2, openState);
    }

    /**
     * Handles a CONNECT message coming from the owner: extracts the addressing
     * information, checks that each socket URI belongs to its atom URI, and delegates
     * to the URI-based overload.
     *
     * @param wonMessage the CONNECT message; its type is enforced via {@code requireType}
     * @return the connection returned by the URI-based overload
     * @throws IllegalArgumentException if a socket URI does not belong to its atom URI
     */
    public Connection connectFromOwner(WonMessage wonMessage) throws NoSuchConnectionException, NoSuchAtomException, IllegalMessageForAtomStateException, ConnectionAlreadyExistsException, SocketCapacityException, IncompatibleSocketsException {
        wonMessage.getMessageType().requireType(WonMessageType.CONNECT);
        final URI senderAtom = wonMessage.getSenderAtomURIRequired();
        final URI senderNode = wonMessage.getSenderNodeURIRequired();
        final URI recipientAtom = wonMessage.getRecipientAtomURIRequired();
        final URI senderSocket = wonMessage.getSenderSocketURIRequired();
        final URI recipientSocket = wonMessage.getRecipientSocketURIRequired();
        // Both sides must name a socket that lives under the respective atom.
        failIfIsNotSocketOfAtom(Optional.of(senderSocket), Optional.of(senderAtom));
        failIfIsNotSocketOfAtom(Optional.of(recipientSocket), Optional.of(recipientAtom));
        this.logger.debug("connect from owner: processing message {}", wonMessage.getMessageURI());
        return connectFromOwner(senderAtom, senderSocket, senderNode, recipientAtom, recipientSocket);
    }

    /**
     * Creates or re-uses the connection between the owner's socket and the target
     * socket, applies the {@code OWNER_CONNECT} event to it and saves it.
     *
     * <p>The existing connection, if any, is loaded "for update" and refreshed. If none
     * exists, a new one is created in state {@code REQUEST_SENT}. Capacity and socket
     * compatibility are checked before the state transition is applied.
     *
     * @param uri  URI of the sending (local) atom
     * @param uri2 URI of the sending atom's socket
     * @param uri3 URI of the sending node (only checked for non-null here)
     * @param uri4 URI of the recipient atom
     * @param uri5 URI of the recipient atom's socket
     * @return the saved connection
     * @throws NullPointerException         if any argument is null
     * @throws IllegalArgumentException     if {@code uri5} is not a socket of {@code uri4}
     * @throws SocketCapacityException      if the socket's capacity is already reached
     * @throws IncompatibleSocketsException if the two sockets are not compatible
     */
    public Connection connectFromOwner(URI uri, URI uri2, URI uri3, URI uri4, URI uri5) throws NoSuchConnectionException, NoSuchAtomException, IllegalMessageForAtomStateException, ConnectionAlreadyExistsException, SocketCapacityException, IncompatibleSocketsException {
        Objects.requireNonNull(uri);
        Objects.requireNonNull(uri3);
        Objects.requireNonNull(uri4);
        Objects.requireNonNull(uri2);
        Objects.requireNonNull(uri5);
        Socket socket = this.socketService.getSocket(uri, Optional.of(uri2));
        // The original wrapped this value in Optional.of(...), so a null socket URI fails here.
        URI socketURI = Objects.requireNonNull(socket.getSocketURI());
        failIfIsNotSocketOfAtom(Optional.of(uri5), Optional.of(uri4));
        this.logger.debug("connect from owner: loading connection {} - {}", socketURI, uri5);
        Optional<Connection> existing = this.connectionRepository.findOneBySocketURIAndTargetSocketURIForUpdate(socketURI, uri5);
        Connection connection;
        if (existing.isPresent()) {
            connection = existing.get();
            this.entityManager.refresh(connection);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("connect from owner: found existing connection {} in state {}", connection.getConnectionURI(), connection.getState());
            }
        } else {
            this.logger.debug("connect from owner: did not find an existing connection");
            connection = createConnection(uri, uri4, socket.getSocketURI(), socket.getTypeURI(), uri5, Optional.empty(), ConnectionState.REQUEST_SENT, ConnectionEventType.OWNER_CONNECT);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("connect from owner: created new connection {}", connection.getConnectionURI());
            }
        }
        failForExceededCapacity(connection.getSocketURI());
        failForIncompatibleSockets(connection.getSocketURI(), connection.getTargetSocketURI());
        connection.changeStateTo(connection.getState().transit(ConnectionEventType.OWNER_CONNECT));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("connect from owner: set connection {} state to: {}", connection.getConnectionURI(), connection.getState());
        }
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        return this.connectionRepository.save(connection);
    }

    /**
     * Handles a CONNECT message arriving from another node: extracts the addressing
     * information and delegates to the URI-based overload. The recipient is the local
     * side, the sender the remote side.
     *
     * @param wonMessage the CONNECT message; its type is enforced via {@code requireType}
     * @return the connection returned by the URI-based overload
     */
    public Connection connectFromNode(WonMessage wonMessage) {
        wonMessage.getMessageTypeRequired().requireType(WonMessageType.CONNECT);
        final URI localAtom = wonMessage.getRecipientAtomURIRequired();
        final URI localSocket = wonMessage.getRecipientSocketURIRequired();
        final URI localNode = wonMessage.getRecipientNodeURIRequired();
        final URI remoteAtom = wonMessage.getSenderAtomURIRequired();
        final URI remoteSocket = wonMessage.getSenderSocketURIRequired();
        this.logger.debug("connect from node: processing message {}", wonMessage.getMessageURI());
        return connectFromNode(localAtom, localSocket, localNode, remoteAtom, remoteSocket);
    }

    /**
     * Creates or re-uses the connection between the local (recipient) socket and the
     * remote (sender) socket, applies the {@code PARTNER_CONNECT} event and saves it.
     *
     * <p>Capacity and compatibility are checked before a new connection is created.
     * A new connection starts in state {@code REQUEST_RECEIVED}.
     *
     * @param uri  URI of the local (recipient) atom
     * @param uri2 URI of the local atom's socket
     * @param uri3 URI of the local node (only checked for non-null here)
     * @param uri4 URI of the remote (sender) atom
     * @param uri5 URI of the remote atom's socket
     * @return the saved connection
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if a socket URI does not belong to its atom URI
     */
    private Connection connectFromNode(URI uri, URI uri2, URI uri3, URI uri4, URI uri5) {
        Objects.requireNonNull(uri);
        Objects.requireNonNull(uri2);
        Objects.requireNonNull(uri3);
        Objects.requireNonNull(uri4);
        Objects.requireNonNull(uri5);
        failIfIsNotSocketOfAtom(Optional.of(uri2), Optional.of(uri));
        Socket socket = this.socketService.getSocket(uri, Optional.of(uri2));
        failIfIsNotSocketOfAtom(Optional.of(uri5), Optional.of(uri4));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("connect from node: loading connection {} - {}", socket.getSocketURI(), uri5);
        }
        Optional<Connection> existing = this.connectionRepository.findOneBySocketURIAndTargetSocketURIForUpdate(socket.getSocketURI(), uri5);
        if (existing.isPresent()) {
            this.entityManager.refresh(existing.get());
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("connect from node: found existing connection {} in state {}", existing.get().getConnectionURI(), existing.get().getState());
            }
        } else {
            this.logger.debug("connect from node: did not find an existing connection");
        }
        // Validate before creating anything, so a rejected request leaves no new connection.
        failForExceededCapacity(socket.getSocketURI());
        failForIncompatibleSockets(socket.getSocketURI(), uri5);
        Connection connection;
        if (existing.isPresent()) {
            connection = existing.get();
        } else {
            connection = createConnection(uri, uri4, socket.getSocketURI(), socket.getTypeURI(), uri5, Optional.empty(), ConnectionState.REQUEST_RECEIVED, ConnectionEventType.PARTNER_CONNECT);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("connect from node: created new connection {}", connection.getConnectionURI());
            }
        }
        connection.changeStateTo(connection.getState().transit(ConnectionEventType.PARTNER_CONNECT));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("connect from node: set connection {} state to: {}", connection.getConnectionURI(), connection.getState());
        }
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        return this.connectionRepository.save(connection);
    }

    /**
     * Processes a socket hint: validates the message and returns the connection
     * between the recipient socket and the hinted target socket, creating it in state
     * {@code SUGGESTED} if it does not exist yet.
     *
     * @param wonMessage the hint message
     * @return the existing (refreshed) connection, or the newly created one
     * @throws MissingMessagePropertyException     if the hint target socket, the hint
     *                                             score or the recipient socket is missing
     * @throws WrongAddressingInformationException if a hint target atom is present or
     *                                             the score is outside [0,1]
     * @throws IncompatibleSocketsException        if the two sockets are not compatible
     */
    public Connection socketHint(WonMessage wonMessage) {
        URI recipientAtomURI = wonMessage.getRecipientAtomURIRequired();
        // Return value unused; the call is kept as in the original (presumably validates presence).
        wonMessage.getRecipientNodeURIRequired();
        URI hintTargetSocketURI = wonMessage.getHintTargetSocketURIRequired();
        URI recipientSocketURI = wonMessage.getRecipientSocketURIRequired();
        Double hintScore = wonMessage.getHintScore();
        if (hintTargetSocketURI == null) {
            throw new MissingMessagePropertyException(WONMSG.hintTargetSocket);
        }
        if (wonMessage.getHintTargetAtomURI() != null) {
            throw new WrongAddressingInformationException("A SocketHintMessage must not have a msg:hintTargetAtom property", wonMessage.getMessageURI(), WONMSG.hintTargetAtom);
        }
        // The score is a boxed Double: report a missing score explicitly instead of
        // failing with a NullPointerException while unboxing it.
        if (hintScore == null) {
            throw new MissingMessagePropertyException(WONMSG.hintScore);
        }
        if (hintScore.doubleValue() < 0.0d || hintScore.doubleValue() > 1.0d) {
            throw new WrongAddressingInformationException("score is not in [0,1]", wonMessage.getMessageURI(), WONMSG.hintScore);
        }
        if (recipientSocketURI == null) {
            throw new MissingMessagePropertyException(WONMSG.recipientSocket);
        }
        if (!this.socketService.isCompatible(recipientSocketURI, hintTargetSocketURI)) {
            throw new IncompatibleSocketsException(recipientSocketURI, hintTargetSocketURI);
        }
        Socket socket = this.socketService.getSocket(recipientAtomURI, Optional.ofNullable(recipientSocketURI));
        URI targetAtomURI = this.socketService.getAtomOfSocketRequired(hintTargetSocketURI);
        Optional<Connection> existing = this.connectionRepository.findOneBySocketURIAndTargetSocketURIForUpdate(socket.getSocketURI(), hintTargetSocketURI);
        if (existing.isPresent()) {
            Connection connection = existing.get();
            this.entityManager.refresh(connection);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("socket hint: connection {} - {} already exists", connection.getSocketURI(), connection.getTargetSocketURI());
            }
            return connection;
        }
        Connection created = createConnection(recipientAtomURI, targetAtomURI, recipientSocketURI, socket.getTypeURI(), hintTargetSocketURI, Optional.empty(), ConnectionState.SUGGESTED, ConnectionEventType.MATCHER_HINT);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("socket hint: created connection {}", created.getConnectionURI());
        }
        return created;
    }

    /**
     * Creates and saves a new connection, together with its message container, for
     * an active atom.
     *
     * @param uri                 URI of the local atom (must be in state {@code ACTIVE})
     * @param uri2                URI of the other atom; must differ from {@code uri}
     * @param uri3                URI of the local socket
     * @param uri4                type URI of the local socket; the atom must have a socket of this type
     * @param uri5                URI of the target socket, or {@code null} to leave it unset
     * @param optional            target connection URI, set on the connection when present
     * @param connectionState     initial state of the new connection
     * @param connectionEventType event that triggered the creation (used for error reporting)
     * @return the saved connection
     * @throws IllegalArgumentException            if a required URI is null or both atom URIs are equal
     * @throws NoSuchAtomException                 if no atom with URI {@code uri} is found
     * @throws IllegalMessageForAtomStateException if the atom is not {@code ACTIVE}
     * @throws ConnectionAlreadyExistsException    if saving fails (assumed to be a unique key violation)
     */
    public Connection createConnection(URI uri, URI uri2, URI uri3, URI uri4, URI uri5, Optional<URI> optional, ConnectionState connectionState, ConnectionEventType connectionEventType) throws NoSuchAtomException, IllegalMessageForAtomStateException, ConnectionAlreadyExistsException {
        if (uri == null) {
            throw new IllegalArgumentException("atomURI is not set");
        }
        if (uri2 == null) {
            throw new IllegalArgumentException("otherAtomURI is not set");
        }
        if (uri.equals(uri2)) {
            throw new IllegalArgumentException("atomURI and otherAtomURI are the same");
        }
        if (uri3 == null) {
            throw new IllegalArgumentException("socketURI is not set");
        }
        if (uri4 == null) {
            throw new IllegalArgumentException("socketTypeURI is not set");
        }
        // A missing atom is reported as NoSuchAtomException (as declared) rather than
        // the NoSuchElementException an unchecked Optional.get() would raise.
        Atom atom = this.atomRepository.findOneByAtomURIForUpdate(uri).orElseThrow(() -> new NoSuchAtomException(uri));
        this.entityManager.refresh(atom);
        if (atom.getState() != AtomState.ACTIVE) {
            throw new IllegalMessageForAtomStateException(uri, connectionEventType.name(), atom.getState());
        }
        if (this.socketRepository.findByAtomURIAndTypeURI(uri, uri4).isEmpty()) {
            throw new RuntimeException("Socket '" + uri4 + "' is not supported by Atom: '" + uri + Chars.S_QUOTE1);
        }
        URI connectionURI = this.wonNodeInformationService.generateConnectionURI(uri);
        Connection connection = new Connection();
        connection.setConnectionURI(connectionURI);
        if (optional.isPresent()) {
            connection.setTargetConnectionURI(optional.get());
        }
        connection.setAtomURI(uri);
        connection.setState(connectionState);
        connection.setTargetAtomURI(uri2);
        connection.setTypeURI(uri4);
        connection.setSocketURI(uri3);
        if (uri5 != null) {
            connection.setTargetSocketURI(uri5);
        }
        ConnectionMessageContainer messageContainer = new ConnectionMessageContainer(connection, connectionURI);
        try {
            // Pass the entities themselves; the former casts to the repository types
            // were decompiler artifacts.
            Connection saved = this.connectionRepository.save(connection);
            this.connectionMessageContainerRepository.save(messageContainer);
            return saved;
        } catch (Exception e) {
            this.logger.warn("caught exception, assuming unique key constraint on atomURI, targetAtomURI, typeURI was violated. Throwing a ConnectionAlreadyExistsException. TODO: think about handling this exception separately", e);
            throw new ConnectionAlreadyExistsException(connectionURI, uri, uri2);
        }
    }

    /**
     * Reports whether the recipient socket of the given message is an auto-open socket,
     * as determined by the socket service.
     *
     * @param wonMessage the message whose recipient socket is inspected
     * @return the socket service's {@code isAutoOpen} answer for that socket
     */
    public boolean shouldSendAutoOpenForConnect(WonMessage wonMessage) {
        final URI recipientSocket = wonMessage.getRecipientSocketURIRequired();
        return this.socketService.isAutoOpen(recipientSocket);
    }

    /**
     * Copies the connection URI carried by a remote success response into the
     * {@code targetConnectionURI} of the matching local connection and saves it.
     * Does nothing if no matching connection exists or the message carries no
     * connection URI.
     *
     * @param wonMessage the SUCCESS_RESPONSE message; its type is enforced via {@code requireType}
     */
    public void grabRemoteConnectionURIFromRemoteResponse(WonMessage wonMessage) {
        wonMessage.getMessageType().requireType(WonMessageType.SUCCESS_RESPONSE);
        // Return value unused; the call is kept as in the original.
        wonMessage.getRespondingToMessageType();
        URI remoteConnectionURI = wonMessage.getConnectionURIRequired();
        // The local connection is keyed by (our socket = recipient, their socket = sender).
        Optional<Connection> found = this.connectionRepository.findOneBySocketURIAndTargetSocketURI(wonMessage.getRecipientSocketURIRequired(), wonMessage.getSenderSocketURIRequired());
        if (!found.isPresent() || remoteConnectionURI == null) {
            return;
        }
        Connection connection = found.get();
        connection.setTargetConnectionURI(remoteConnectionURI);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Grabbed targetURI {} for connectionURI {} from success response {}", remoteConnectionURI, connection.getConnectionURI(), wonMessage.getMessageURI());
        }
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        this.connectionRepository.save(connection);
    }

    /**
     * Closes the connection addressed by a CLOSE message sent by the owner; the
     * sender socket is the local side.
     *
     * @param wonMessage the CLOSE message; its type is enforced via {@code requireType}
     * @return the connection after the close transition was applied and saved
     */
    public Connection closeFromOwner(WonMessage wonMessage) {
        wonMessage.getMessageType().requireType(WonMessageType.CLOSE);
        final URI localSocket = wonMessage.getSenderSocketURIRequired();
        final URI remoteSocket = wonMessage.getRecipientSocketURIRequired();
        return close(localSocket, remoteSocket);
    }

    /**
     * Closes the connection addressed by a system-generated CLOSE message; the
     * sender socket is the local side, exactly as for an owner-initiated close.
     *
     * @param wonMessage the CLOSE message; its type is enforced via {@code requireType}
     * @return the connection after the close transition was applied and saved
     */
    public Connection closeFromSystem(WonMessage wonMessage) {
        wonMessage.getMessageType().requireType(WonMessageType.CLOSE);
        final URI localSocket = wonMessage.getSenderSocketURIRequired();
        final URI remoteSocket = wonMessage.getRecipientSocketURIRequired();
        return close(localSocket, remoteSocket);
    }

    /**
     * Loads the connection for the given socket pair and applies the
     * {@code OWNER_CLOSE} event to it.
     *
     * @param uri  socket URI of the connection
     * @param uri2 target socket URI of the connection
     * @return the saved connection in its new state
     * @throws NoSuchConnectionException if no such connection exists
     */
    private Connection close(URI uri, URI uri2) {
        final Connection connection = getConnectionRequired(uri, uri2);
        return nextConnectionState(connection, ConnectionEventType.OWNER_CLOSE);
    }

    /**
     * Loads the connection with the given URI, applies the event's state transition
     * and saves the connection.
     *
     * @param uri                 the connection URI; must not be null
     * @param connectionEventType the event to apply
     * @return the saved connection in its new state
     * @throws IllegalArgumentException                  if {@code uri} is null
     * @throws IllegalMessageForConnectionStateException if the event is not allowed in the current state
     */
    private Connection nextConnectionState(URI uri, ConnectionEventType connectionEventType) throws NoSuchConnectionException, IllegalMessageForConnectionStateException {
        if (uri == null) {
            throw new IllegalArgumentException("connectionURI is not set");
        }
        Connection connection = DataAccessUtils.loadConnection(this.connectionRepository, uri);
        connection.changeStateTo(performStateTransit(connection, connectionEventType));
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        return this.connectionRepository.save(connection);
    }

    /**
     * Applies the event's state transition to the given connection and saves it.
     *
     * @param connection          the connection to update
     * @param connectionEventType the event to apply
     * @return the saved connection in its new state
     * @throws IllegalMessageForConnectionStateException if the event is not allowed in the current state
     */
    public Connection nextConnectionState(Connection connection, ConnectionEventType connectionEventType) throws IllegalMessageForConnectionStateException {
        ConnectionState nextState = performStateTransit(connection, connectionEventType);
        connection.changeStateTo(nextState);
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        return this.connectionRepository.save(connection);
    }

    /**
     * Closes the connection addressed by a CLOSE message arriving from another node;
     * here the recipient socket is the local side and the sender socket the remote one.
     *
     * <p>NOTE(review): the shared {@code close} helper applies {@code OWNER_CLOSE} for
     * node-originated closes as well. Confirm that this is intended.
     *
     * @param wonMessage the CLOSE message; its type is enforced via {@code requireType}
     * @return the connection after the close transition was applied and saved
     */
    public Connection closeFromNode(WonMessage wonMessage) {
        wonMessage.getMessageType().requireType(WonMessageType.CLOSE);
        final URI localSocket = wonMessage.getRecipientSocketURIRequired();
        final URI remoteSocket = wonMessage.getSenderSocketURIRequired();
        return close(localSocket, remoteSocket);
    }

    /**
     * Processes a hint feedback message from the owner: for every model of the message
     * content in which the message resource carries {@code WONCON.feedback}, the
     * feedback is attached to the message's connection.
     *
     * @param wonMessage the HINT_FEEDBACK_MESSAGE; its type is enforced via {@code requireType}
     * @throws NoSuchConnectionException if the connection named by the message does not exist
     */
    public void hintFeedbackFromOwner(WonMessage wonMessage) {
        wonMessage.getMessageType().requireType(WonMessageType.HINT_FEEDBACK_MESSAGE);
        final Connection connection = getConnectionRequired(wonMessage.getConnectionURIRequired());
        final URI messageURI = wonMessage.getMessageURI();
        RdfUtils.visit(wonMessage.getMessageContent(), model -> {
            final Resource messageResource = model.getResource(messageURI.toString());
            if (messageResource.hasProperty(WONCON.feedback)) {
                processFeedback(connection, messageResource);
            }
            // The visitor's result is not used.
            return null;
        });
    }

    /**
     * Attaches the given feedback node to the connection, logging a warning if the
     * node is not a resource or if adding the feedback fails.
     *
     * @param connection the connection the feedback refers to
     * @param rDFNode    the feedback node; only resources can be processed
     */
    public void processFeedback(Connection connection, RDFNode rDFNode) {
        if (!rDFNode.isResource()) {
            this.logger.warn("feedback node is not a resource, cannot process feedback for {}", connection.getConnectionURI());
            return;
        }
        final boolean added = addFeedback(connection, (Resource) rDFNode);
        if (!added) {
            this.logger.warn("failed to add feedback to resource {}", connection.getConnectionURI());
        }
    }

    /**
     * Adds a feedback resource to the connection's dataset and saves both the dataset
     * holder and the connection.
     *
     * <p>If the connection has no dataset holder yet, one is created with an empty
     * dataset. In the default model, the connection resource gets a
     * {@code WONCON.feedbackEvent} property pointing at the feedback resource, and the
     * statements extracted from the feedback resource's own model are copied in.
     *
     * @param connection the connection to attach the feedback to
     * @param resource   the feedback resource
     * @return {@code true} if the feedback was stored, {@code false} if the connection
     *         resource could not be obtained from the model
     */
    public boolean addFeedback(Connection connection, Resource resource) {
        this.logger.debug("adding feedback to resource {}", connection);
        DatasetHolder datasetHolder = connection.getDatasetHolder();
        Dataset dataset;
        if (datasetHolder == null) {
            dataset = DatasetFactory.create();
            datasetHolder = new DatasetHolder(connection.getConnectionURI(), dataset);
            connection.setDatasetHolder(datasetHolder);
        } else {
            dataset = datasetHolder.getDataset();
        }
        Model defaultModel = dataset.getDefaultModel();
        Resource connectionResource = defaultModel.getResource(connection.getConnectionURI().toString());
        if (connectionResource == null) {
            this.logger.debug("could not add feedback to resource {}: resource not found/created in model", connection.getConnectionURI());
            return false;
        }
        connectionResource.addProperty(WONCON.feedbackEvent, resource);
        // Copy the statements extracted from the feedback resource (boundary: stopNowhere).
        ModelExtract extractor = new ModelExtract(new StatementTripleBoundary(TripleBoundary.stopNowhere));
        defaultModel.add(extractor.extract(resource, resource.getModel()));
        dataset.setDefaultModel(defaultModel);
        datasetHolder.setDataset(dataset);
        // Pass the entities themselves; the former casts to the repository types were
        // decompiler artifacts.
        this.datasetHolderRepository.save(datasetHolder);
        this.connectionRepository.save(connection);
        this.logger.debug("done adding feedback for resource {}", connection);
        return true;
    }

    /**
     * Sets the target (remote) connection URI on the connection and saves it.
     *
     * @param connection the connection to update
     * @param uri        the new target connection URI
     */
    public void updateTargetConnectionURI(Connection connection, URI uri) {
        this.logger.debug("updating remote connection URI of con {} to {}", connection, uri);
        connection.setTargetConnectionURI(uri);
        // Pass the entity itself; the former cast to the repository type was a decompiler artifact.
        this.connectionRepository.save(connection);
    }

    /**
     * Fails if the socket has a capacity and the number of its connections in state
     * {@code CONNECTED} has already reached it. Sockets without a capacity never fail.
     *
     * @param uri the socket URI to check
     * @throws SocketCapacityException if the capacity is reached
     */
    public void failForExceededCapacity(URI uri) throws SocketCapacityException {
        final Optional<Integer> limit = this.socketService.getCapacity(uri);
        if (!limit.isPresent()) {
            return;
        }
        if (this.connectionRepository.countBySocketURIAndState(uri, ConnectionState.CONNECTED) >= limit.get().intValue()) {
            throw new SocketCapacityException("Connect would exceed socket " + uri + " capacity of " + limit.get());
        }
    }

    /**
     * Fails unless the socket service reports the two sockets as compatible.
     *
     * @param uri  the first socket URI
     * @param uri2 the second socket URI
     * @throws IncompatibleSocketsException if the sockets are not compatible
     */
    public void failForIncompatibleSockets(URI uri, URI uri2) throws IncompatibleSocketsException {
        final boolean compatible = this.socketService.isCompatible(uri, uri2);
        if (compatible) {
            return;
        }
        throw new IncompatibleSocketsException(uri, uri2);
    }

    /**
     * Fails if both URIs are given and the socket URI does not belong to the atom URI.
     *
     * <p>A socket belongs to an atom when its URI equals the atom URI or extends it at
     * a URI boundary: the atom URI itself ends with {@code '/'} or {@code '#'}, or the
     * character following the atom URI is one of {@code '#'}, {@code '/'} or
     * {@code '?'}. A bare string-prefix test is not enough, because it would accept a
     * socket of atom {@code .../abcdef} as a socket of atom {@code .../abc}.
     *
     * @param optional  the socket URI; the check is skipped when empty
     * @param optional2 the atom URI; the check is skipped when empty
     * @throws IllegalArgumentException if the socket URI does not belong to the atom URI
     */
    public void failIfIsNotSocketOfAtom(Optional<URI> optional, Optional<URI> optional2) {
        if (!optional.isPresent() || !optional2.isPresent()) {
            return;
        }
        String socketUri = optional.get().toString();
        String atomUri = optional2.get().toString();
        boolean belongsToAtom = false;
        if (socketUri.startsWith(atomUri)) {
            if (socketUri.length() == atomUri.length() || atomUri.endsWith("/") || atomUri.endsWith("#")) {
                belongsToAtom = true;
            } else {
                // The atom URI must end exactly where a new URI component begins.
                char next = socketUri.charAt(atomUri.length());
                belongsToAtom = next == '#' || next == '/' || next == '?';
            }
        }
        if (!belongsToAtom) {
            throw new IllegalArgumentException("User-defined socket " + optional.get() + " is not a socket of atom " + optional2.get());
        }
    }

    /**
     * Computes the state the connection moves to when the given event is applied,
     * without modifying the connection.
     *
     * @param connection          the connection whose current state is the starting point
     * @param connectionEventType the event to apply
     * @return the state resulting from the transition
     * @throws IllegalMessageForConnectionStateException if the event is not allowed in the current state
     */
    private ConnectionState performStateTransit(Connection connection, ConnectionEventType connectionEventType) throws IllegalMessageForConnectionStateException {
        if (!connectionEventType.isMessageAllowed(connection.getState())) {
            throw new IllegalMessageForConnectionStateException(connection.getConnectionURI(), connectionEventType.name(), connection.getState());
        }
        return connection.getState().transit(connectionEventType);
    }

    /**
     * Finds the connection a message belongs to, failing if there is none.
     *
     * @param wonMessage          the message
     * @param wonMessageDirection the direction in which the message is being processed
     * @return the connection found
     * @throws NoSuchConnectionException if no connection is found for the message
     */
    public Connection getConnectionForMessageRequired(WonMessage wonMessage, WonMessageDirection wonMessageDirection) {
        final Optional<Connection> found = getConnectionForMessage(wonMessage, wonMessageDirection);
        return found.orElseThrow(() -> new NoSuchConnectionException(MessageFormat.format("Did not find connection for message {0}, direction {1}", wonMessage.getMessageURI(), wonMessageDirection)));
    }

    /**
     * Finds the connection a message belongs to. For messages from external, the
     * recipient socket is the connection's own socket and the sender socket its target;
     * otherwise it is the other way round.
     *
     * @param wonMessage          the message
     * @param wonMessageDirection the direction in which the message is being processed
     * @return the connection found by the repository, or an empty Optional if either
     *         socket URI is missing or no connection matches
     */
    public Optional<Connection> getConnectionForMessage(WonMessage wonMessage, WonMessageDirection wonMessageDirection) {
        final boolean fromExternal = wonMessageDirection.isFromExternal();
        final URI ownSocket = fromExternal ? wonMessage.getRecipientSocketURI() : wonMessage.getSenderSocketURI();
        final URI targetSocket = fromExternal ? wonMessage.getSenderSocketURI() : wonMessage.getRecipientSocketURI();
        if (targetSocket == null || ownSocket == null) {
            return Optional.empty();
        }
        return this.connectionRepository.findOneBySocketURIAndTargetSocketURI(ownSocket, targetSocket);
    }
}
