package com.orientechnologies.orient.server.hazelcast;

import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
import com.orientechnologies.common.parser.OSystemVariableResolver;
import com.orientechnologies.orient.core.Orient;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.metadata.schema.OType;
import com.orientechnologies.orient.core.record.impl.ODocument;
import com.orientechnologies.orient.server.OClientConnectionManager;
import com.orientechnologies.orient.server.OServer;
import com.orientechnologies.orient.server.config.OServerParameterConfiguration;
import com.orientechnologies.orient.server.distributed.ODistributedAbstractPlugin;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.ODistributedThreadLocal;
import com.orientechnologies.orient.server.distributed.OReplicationConfig;
import com.orientechnologies.orient.server.distributed.OServerOfflineException;
import com.orientechnologies.orient.server.distributed.OStorageSynchronizer;
import com.orientechnologies.orient.server.distributed.conflict.OReplicationConflictResolver;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.distributed.task.OAbstractReplicatedTask;
import com.orientechnologies.orient.server.distributed.task.OAlignRequestTask;
import com.orientechnologies.orient.server.journal.ODatabaseJournal;
import com.orientechnologies.orient.server.network.OServerNetworkListener;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;

/* loaded from: input_file:com/orientechnologies/orient/server/hazelcast/OHazelcastPlugin.class */
public class OHazelcastPlugin extends ODistributedAbstractPlugin implements MembershipListener, EntryListener<String, Object> {
    protected static final String DISTRIBUTED_EXECUTOR_NAME = "OHazelcastPlugin::Executor";
    protected static final int SEND_RETRY_MAX = 100;
    protected int nodeNumber;
    protected String localNodeId;
    protected long timeOffset;
    protected TimerTask alignmentTask;
    protected String membershipListenerRegistration;
    protected volatile HazelcastInstance hazelcastInstance;
    protected String configFile = "hazelcast.xml";
    protected Map<String, Member> remoteClusterNodes = new ConcurrentHashMap();
    protected long runId = -1;
    protected volatile String status = "starting";
    protected Map<String, Boolean> pendingAlignments = new HashMap();
    protected Map<Long, Long> executionQueue = new HashMap();
    protected Object lockQueue = new Object();

    public void config(OServer oServer, OServerParameterConfiguration[] oServerParameterConfigurationArr) {
        super.config(oServer, oServerParameterConfigurationArr);
        for (OServerParameterConfiguration oServerParameterConfiguration : oServerParameterConfigurationArr) {
            if (oServerParameterConfiguration.name.equalsIgnoreCase("configuration.hazelcast")) {
                this.configFile = OSystemVariableResolver.resolveSystemVariables(oServerParameterConfiguration.value);
            }
        }
    }

    public void startup() {
        if (this.enabled) {
            this.remoteClusterNodes.clear();
            this.synchronizers.clear();
            try {
                this.hazelcastInstance = Hazelcast.newHazelcastInstance(new FileSystemXmlConfig(this.configFile));
                this.localNodeId = getNodeId(this.hazelcastInstance.getCluster().getLocalMember());
                OServer.registerServerInstance(this.localNodeId, this.serverInstance);
                initDistributedDatabases();
                IMap<String, Object> configurationMap = getConfigurationMap();
                configurationMap.addEntryListener(this, true);
                setStatus("aligning");
                configurationMap.putIfAbsent("runId", Long.valueOf(this.hazelcastInstance.getCluster().getClusterTime()));
                this.runId = ((Long) getConfigurationMap().get("runId")).longValue();
                this.timeOffset = System.currentTimeMillis() - getHazelcastInstance().getCluster().getClusterTime();
                registerAndAlignNodes();
                super.startup();
            } catch (FileNotFoundException e) {
                throw new OConfigurationException("Error on creating Hazelcast instance", e);
            }
        }
    }

    public void sendShutdown() {
        shutdown();
    }

    public void shutdown() {
        if (this.enabled) {
            if (this.alignmentTask != null) {
                this.alignmentTask.cancel();
            }
            super.shutdown();
            this.remoteClusterNodes.clear();
            if (this.membershipListenerRegistration != null) {
                this.hazelcastInstance.getCluster().removeMembershipListener(this.membershipListenerRegistration);
            }
        }
    }

    public long incrementDistributedSerial(String str) {
        return this.hazelcastInstance.getAtomicLong("db." + str).incrementAndGet();
    }

    public long getRunId() {
        return this.runId;
    }

    public Map<String, Object> propagate(Set<String> set, OAbstractRemoteTask<? extends Object> oAbstractRemoteTask) throws ODistributedException {
        HashMap hashMap = new HashMap();
        ODistributedServerLog.debug(this, oAbstractRemoteTask.getNodeSource(), set.toString(), ODistributedServerLog.DIRECTION.OUT, "propagate %s oper=%d.%d", new Object[]{oAbstractRemoteTask.getName().toUpperCase(), Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial())});
        for (String str : set) {
            if (this.remoteClusterNodes.get(str) == null) {
                ODistributedServerLog.warn(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.OUT, "cannot propagate operation on remote member because is disconnected", new Object[0]);
            } else {
                hashMap.put(str, sendOperation2Node(str, oAbstractRemoteTask));
            }
        }
        return hashMap;
    }

    public Object sendOperation2Node(final String str, final OAbstractRemoteTask<? extends Object> oAbstractRemoteTask) {
        oAbstractRemoteTask.setNodeDestination(str);
        Member member = this.remoteClusterNodes.get(str);
        if (member == null) {
            Iterator it = this.hazelcastInstance.getCluster().getMembers().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Member member2 = (Member) it.next();
                if (getNodeId(member2).equals(str)) {
                    member = member2;
                    break;
                }
            }
            if (member == null) {
                throw new ODistributedException("Remote node '" + str + "' is not configured");
            }
        }
        Member member3 = member;
        ExecutionCallback<Object> executionCallback = oAbstractRemoteTask.getMode() == ODistributedServerManager.EXECUTION_MODE.ASYNCHRONOUS ? new ExecutionCallback<Object>() { // from class: com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin.1
            public void onResponse(Object obj) {
            }

            public void onFailure(Throwable th) {
                ODistributedServerLog.error(this, OHazelcastPlugin.this.getLocalNodeId(), str, ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d in ASYNCH mode", th, new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial())});
            }
        } : null;
        for (int i = 0; i < SEND_RETRY_MAX; i++) {
            try {
                return executeOperation((Callable<Object>) oAbstractRemoteTask, member3, oAbstractRemoteTask.getMode(), executionCallback);
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof OServerOfflineException)) {
                    ODistributedServerLog.error(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d in %s mode", e, new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial()), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
                    throw new ODistributedException("Error on executing remote operation " + oAbstractRemoteTask.getRunId() + "." + oAbstractRemoteTask.getOperationSerial() + " in " + oAbstractRemoteTask.getMode() + " mode against node: " + member, e);
                }
                OServerOfflineException cause = e.getCause();
                ODistributedServerLog.warn(this, getLocalNodeId(), cause.getNodeId(), ODistributedServerLog.DIRECTION.OUT, "remote node %s is not online (status=%s), retrying %d...", new Object[]{cause.getNodeStatus(), Integer.valueOf(i + 1)});
                try {
                    Thread.sleep(200 + (i * 50));
                } catch (InterruptedException e2) {
                    Thread.interrupted();
                }
            } catch (Exception e3) {
                ODistributedServerLog.error(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d in %s mode", e3, new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial()), oAbstractRemoteTask.getMode()});
                throw new ODistributedException("Error on executing remote operation " + oAbstractRemoteTask.getRunId() + "." + oAbstractRemoteTask.getOperationSerial() + " in " + oAbstractRemoteTask.getMode() + " mode against node: " + member, e3);
            }
        }
        throw new ODistributedException("Cannot complete the operation because the cluster is offline");
    }

    public Object execute(String str, Object obj, OAbstractRemoteTask<? extends Object> oAbstractRemoteTask, OReplicationConfig oReplicationConfig) throws ExecutionException {
        try {
            if (oReplicationConfig == null) {
                ODistributedThreadLocal.INSTANCE.set(oAbstractRemoteTask.getNodeSource());
                try {
                    Object executeOnLocalNode = oAbstractRemoteTask.executeOnLocalNode();
                    ODistributedThreadLocal.INSTANCE.set((String) null);
                    return executeOnLocalNode;
                } catch (Throwable th) {
                    ODistributedThreadLocal.INSTANCE.set((String) null);
                    throw th;
                }
            }
            if (!checkOperationSequence(oAbstractRemoteTask)) {
                return null;
            }
            String databaseName = oAbstractRemoteTask.getDatabaseName();
            if (oReplicationConfig != null) {
                oAbstractRemoteTask.setNodeDestination(oReplicationConfig.masterNode);
                oReplicationConfig.masterNode = waitUntilMasterNodeIsOnline(str, obj, databaseName, oReplicationConfig.masterNode);
                String str2 = oReplicationConfig.masterNode;
            }
            return getLocalNodeId().equals(oReplicationConfig.masterNode) ? executeLocallyAndPropagate((OAbstractReplicatedTask) oAbstractRemoteTask) : executeRemotelyAndApplyLocally(str, obj, (OAbstractReplicatedTask) oAbstractRemoteTask, databaseName, oReplicationConfig);
        } catch (InterruptedException e) {
            Thread.interrupted();
            return null;
        } catch (Exception e2) {
            ODistributedServerLog.error(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.OUT, "error on execution %d.%d of operation in %s mode", e2, new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial()), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
            throw new ExecutionException("error on execution of operation " + oAbstractRemoteTask.getRunId() + "." + oAbstractRemoteTask.getOperationSerial() + " in " + ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS + " mode against node " + ((String) null), e2);
        }
    }

    protected boolean checkOperationSequence(OAbstractRemoteTask<? extends Object> oAbstractRemoteTask) {
        long[] lastExecutedOperationId = getDatabaseSynchronizer(oAbstractRemoteTask.getDatabaseName()).getLog().getLastExecutedOperationId();
        ODistributedServerLog.debug(this, getLocalNodeId(), oAbstractRemoteTask.getNodeSource(), ODistributedServerLog.DIRECTION.IN, "checking operation if %d.%d > last %d.%d", new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial()), Long.valueOf(lastExecutedOperationId[0]), Long.valueOf(lastExecutedOperationId[1])});
        if (oAbstractRemoteTask.getRunId() != lastExecutedOperationId[0] || oAbstractRemoteTask.getOperationSerial() > lastExecutedOperationId[1]) {
            return true;
        }
        ODistributedServerLog.warn(this, getLocalNodeId(), oAbstractRemoteTask.getNodeSource(), ODistributedServerLog.DIRECTION.IN, "received operation %d.%d but it has already been executed: probably it's from an alignment? Ignore it.", new Object[]{Long.valueOf(oAbstractRemoteTask.getRunId()), Long.valueOf(oAbstractRemoteTask.getOperationSerial())});
        return false;
    }

    protected Object executeRemotelyAndApplyLocally(String str, Object obj, OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask, String str2, OReplicationConfig oReplicationConfig) throws InterruptedException, Exception, ExecutionException {
        Object obj2;
        for (int i = 0; i < SEND_RETRY_MAX; i++) {
            ODistributedServerLog.debug(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "routing %s against db=%s in %s mode...", new Object[]{oAbstractReplicatedTask.getName().toUpperCase(), str2, ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
            try {
                ODistributedServerLog.debug(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "remote execution %s db=%s mode=%s oper=%d.%d...", new Object[]{oAbstractReplicatedTask.getName().toUpperCase(), str2, oAbstractReplicatedTask.getMode(), Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial())});
                Object executeOperation = executeOperation((Callable<Object>) oAbstractReplicatedTask, obj, ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS, (ExecutionCallback<Object>) null);
                if (oAbstractReplicatedTask instanceof OAbstractReplicatedTask) {
                    ODistributedServerLog.debug(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.IN, "local execution %s against db=%s mode=%s oper=%d.%d...", new Object[]{oAbstractReplicatedTask.getName().toUpperCase(), str2, oAbstractReplicatedTask.getMode(), Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial())});
                    obj2 = enqueueLocalExecution(oAbstractReplicatedTask);
                    if (executeOperation != null && obj2 != null && !executeOperation.equals(obj2)) {
                        ODistributedServerLog.warn(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.IN, "detected conflict on %s mode=%s db=%s oper=%d.%d: remote={%s} != local={%s}", new Object[]{oAbstractReplicatedTask.getName().toUpperCase(), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS, str2, Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial()), executeOperation, obj2});
                        oAbstractReplicatedTask.handleConflict(oAbstractReplicatedTask.getNodeDestination(), obj2, executeOperation);
                    }
                } else {
                    obj2 = executeOperation;
                }
                return obj2;
            } catch (MemberLeftException e) {
                ODistributedServerLog.warn(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d in %s mode, because node left. Re-route it in transparent way", e, new Object[]{Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial()), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
                return execute(str, obj, oAbstractReplicatedTask, oReplicationConfig);
            } catch (ExecutionException e2) {
                if (!(e2.getCause() instanceof OServerOfflineException)) {
                    ODistributedServerLog.error(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d in %s mode", e2, new Object[]{Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial()), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
                    throw e2;
                }
                ODistributedServerLog.warn(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "remote node is not online, retrying %d...", new Object[]{Integer.valueOf(i + 1)});
                try {
                    Thread.sleep(200 + (i * 50));
                } catch (InterruptedException e3) {
                    Thread.interrupted();
                }
            }
        }
        ODistributedServerLog.error(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.OUT, "error on execution of operation %d.%d type=%s in %s mode", new Object[]{Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial()), oAbstractReplicatedTask.getName(), ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS});
        throw new ODistributedException("Error on execution " + oAbstractReplicatedTask.getName() + " in " + ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS + " mode");
    }

    private Object executeLocallyAndPropagate(OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask) throws Exception {
        Object enqueueLocalExecution = enqueueLocalExecution(oAbstractReplicatedTask);
        Set<String> onlineRemoteNodeIdsBut = getOnlineRemoteNodeIdsBut(oAbstractReplicatedTask.getNodeSource(), oAbstractReplicatedTask.getNodeDestination());
        if (!onlineRemoteNodeIdsBut.isEmpty()) {
            oAbstractReplicatedTask.setNodeSource(getLocalNodeId());
            for (Map.Entry<String, Object> entry : propagate(onlineRemoteNodeIdsBut, oAbstractReplicatedTask).entrySet()) {
                String key = entry.getKey();
                Object value = entry.getValue();
                if ((enqueueLocalExecution == null && value != null) || ((enqueueLocalExecution != null && value == null) || (enqueueLocalExecution != null && !enqueueLocalExecution.equals(value)))) {
                    oAbstractReplicatedTask.handleConflict(key, enqueueLocalExecution, value);
                }
            }
        }
        return enqueueLocalExecution;
    }

    public boolean isLocalNodeMaster(Object obj) {
        Member owner = this.hazelcastInstance.getPartitionService().getPartition(obj).getOwner();
        boolean equals = owner.equals(this.hazelcastInstance.getCluster().getLocalMember());
        ODistributedServerLog.debug(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "network partition: check for local master: key '%s' is assigned to %s (local=%s)", new Object[]{obj, getNodeId(owner), Boolean.valueOf(equals)});
        return equals;
    }

    public OReplicationConfig getReplicationData(String str, String str2, Object obj, String str3, String str4) {
        ODocument databaseClusterConfiguration = getDatabaseClusterConfiguration(str, str2);
        Boolean bool = (Boolean) databaseClusterConfiguration.field("synchronization");
        if (bool == null || !bool.booleanValue()) {
            return null;
        }
        OReplicationConfig oReplicationConfig = new OReplicationConfig();
        oReplicationConfig.masterNode = (String) databaseClusterConfiguration.field("master");
        if (oReplicationConfig.masterNode == null) {
            ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "network partition: found wrong configuration for database '%s': cannot find the 'master' field for the cluster '%s'. '$auto' will be used", new Object[]{str, str2});
            oReplicationConfig.masterNode = "$auto";
        }
        if (oReplicationConfig.masterNode.startsWith("$")) {
            oReplicationConfig.masterNode = getReplicationStrategy(oReplicationConfig.masterNode).getNode(this, str2, obj);
        }
        if (oReplicationConfig.masterNode == null) {
            throw new ODistributedException("Cannot find a master node for the key '" + obj + "'");
        }
        boolean equals = oReplicationConfig.masterNode.equals(getLocalNodeId());
        String localNodeId = getLocalNodeId();
        ODistributedServerLog.DIRECTION direction = ODistributedServerLog.DIRECTION.OUT;
        Object[] objArr = new Object[5];
        objArr[0] = str2 != null ? "cluster=" + str2 + " " : "";
        objArr[1] = obj != null ? "key=" + obj : "";
        objArr[2] = (str2 == null && obj == null) ? "default operation" : "";
        objArr[3] = oReplicationConfig.masterNode;
        objArr[4] = Boolean.valueOf(equals);
        ODistributedServerLog.debug(this, localNodeId, "?", direction, "master node for %s%s%s -> %s (local=%s)", objArr);
        Set<String> onlineRemoteNodeIdsBut = getOnlineRemoteNodeIdsBut(str3, str4);
        if (!onlineRemoteNodeIdsBut.isEmpty()) {
            oReplicationConfig.synchReplicas = (String[]) onlineRemoteNodeIdsBut.toArray(new String[onlineRemoteNodeIdsBut.size()]);
        }
        return oReplicationConfig;
    }

    public ODocument getDatabaseConfiguration(String str) {
        IMap<String, Object> configurationMap = getConfigurationMap();
        ODocument oDocument = (ODocument) configurationMap.get("db." + str);
        if (oDocument == null) {
            oDocument = super.getDatabaseConfiguration(str);
            configurationMap.put("db." + str, oDocument);
        } else {
            saveDatabaseConfiguration(str, oDocument);
        }
        return oDocument;
    }

    public ODocument getDatabaseStatus(String str) {
        ODocument oDocument = new ODocument();
        oDocument.field("configuration", getDatabaseConfiguration(str), OType.EMBEDDED);
        oDocument.field("cluster", getClusterConfiguration(), OType.EMBEDDED);
        return oDocument;
    }

    public ODocument getClusterConfiguration() {
        if (!this.enabled) {
            return null;
        }
        ODocument oDocument = new ODocument();
        HazelcastInstance hazelcastInstance = getHazelcastInstance();
        oDocument.field("name", hazelcastInstance.getName());
        oDocument.field("local", getNodeId(hazelcastInstance.getCluster().getLocalMember()));
        ArrayList arrayList = new ArrayList();
        oDocument.field("members", arrayList, OType.EMBEDDEDLIST);
        arrayList.add(getLocalNodeConfiguration());
        Iterator<Member> it = this.remoteClusterNodes.values().iterator();
        while (it.hasNext()) {
            arrayList.add(getNodeConfiguration(getNodeId(it.next())));
        }
        return oDocument;
    }

    public ODocument getNodeConfiguration(String str) {
        return (ODocument) getConfigurationMap().get("node." + str);
    }

    public ODocument getLocalNodeConfiguration() {
        ODocument oDocument = new ODocument();
        oDocument.field("alias", getLocalNodeAlias());
        oDocument.field("id", getLocalNodeId());
        oDocument.field("status", getStatus());
        ArrayList arrayList = new ArrayList();
        oDocument.field("listeners", arrayList, OType.EMBEDDEDLIST);
        for (OServerNetworkListener oServerNetworkListener : this.serverInstance.getNetworkListeners()) {
            HashMap hashMap = new HashMap();
            arrayList.add(hashMap);
            hashMap.put("protocol", oServerNetworkListener.getProtocolType().getSimpleName());
            hashMap.put("listen", oServerNetworkListener.getListeningAddress());
        }
        return oDocument;
    }

    public String getStatus() {
        return this.status;
    }

    public boolean checkStatus(String str) {
        return this.status.equals(str);
    }

    public void setStatus(String str) {
        if (this.status.equals(str)) {
            return;
        }
        this.status = str;
        getConfigurationMap().put("node." + getLocalNodeId(), getLocalNodeConfiguration());
        ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "updated node status to '%s'", new Object[]{this.status});
    }

    public void registerAndAlignNodes() {
        this.membershipListenerRegistration = this.hazelcastInstance.getCluster().addMembershipListener(this);
        for (Member member : this.hazelcastInstance.getCluster().getMembers()) {
            String nodeId = getNodeId(member);
            if (!getLocalNodeId().equals(nodeId)) {
                this.remoteClusterNodes.put(nodeId, member);
            }
        }
        if (this.remoteClusterNodes.isEmpty()) {
            ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "no node running has been detected", new Object[0]);
        } else {
            ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "detected %d running nodes %s", new Object[]{Integer.valueOf(this.remoteClusterNodes.size()), this.remoteClusterNodes.keySet()});
        }
        if (!this.alignmentStartup) {
            setStatus("online");
        } else if (this.remoteClusterNodes.isEmpty()) {
            setStatus("online");
        } else {
            alignNodes();
        }
        if (this.alignmentTimer > 0) {
            this.alignmentTask = new TimerTask() { // from class: com.orientechnologies.orient.server.hazelcast.OHazelcastPlugin.2
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    OHazelcastPlugin.this.alignNodes();
                }
            };
            Orient.instance().getTimer().schedule(this.alignmentTask, this.alignmentTimer, this.alignmentTimer);
        }
    }

    protected void alignNodes() {
        long[] lastJournaledOperationId;
        if (this.remoteClusterNodes.isEmpty()) {
            return;
        }
        setStatus("aligning");
        synchronized (this.synchronizers) {
            for (Map.Entry entry : this.synchronizers.entrySet()) {
                String str = (String) entry.getKey();
                try {
                    lastJournaledOperationId = ((OStorageSynchronizer) entry.getValue()).getLog().getLastJournaledOperationId(ODatabaseJournal.OPERATION_STATUS.COMMITTED);
                } catch (IOException e) {
                    ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.OUT, "error on retrieve last operation id from the log for db=%s", new Object[]{str});
                }
                if (lastJournaledOperationId[0] != -1 || lastJournaledOperationId[1] != -1) {
                    ODistributedServerLog.warn(this, getLocalNodeId(), this.remoteClusterNodes.keySet().toString(), ODistributedServerLog.DIRECTION.OUT, "sending align request in broadcast for database %s from %d:%d", new Object[]{str, Long.valueOf(lastJournaledOperationId[0]), Long.valueOf(lastJournaledOperationId[1])});
                    synchronized (this.pendingAlignments) {
                        for (String str2 : this.remoteClusterNodes.keySet()) {
                            this.pendingAlignments.put(str2 + "/" + str, Boolean.FALSE);
                            ODistributedServerLog.info(this, getLocalNodeId(), str2, ODistributedServerLog.DIRECTION.NONE, "setting node in alignment state for db=%s", new Object[]{str});
                        }
                    }
                    propagate(this.remoteClusterNodes.keySet(), new OAlignRequestTask(this.serverInstance, this, str, ODistributedServerManager.EXECUTION_MODE.ASYNCHRONOUS, lastJournaledOperationId[0], lastJournaledOperationId[1]));
                }
            }
            if (this.pendingAlignments.isEmpty()) {
                setStatus("online");
            }
        }
    }

    public void endAlignment(String str, String str2) {
        synchronized (this.pendingAlignments) {
            if (this.pendingAlignments.remove(str + "/" + str2) == null) {
                ODistributedServerLog.error(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.OUT, "received response for an alignment against an unknown node %s database %s", new Object[]{str2});
            }
            if (this.pendingAlignments.isEmpty()) {
                setStatus("online");
            } else {
                for (Map.Entry<String, Boolean> entry : this.pendingAlignments.entrySet()) {
                    String[] split = entry.getKey().split("/");
                    String str3 = split[0];
                    String str4 = split[1];
                    if (entry.getValue().booleanValue()) {
                        try {
                            long[] lastJournaledOperationId = ((OStorageSynchronizer) this.synchronizers.get(str4)).getLog().getLastJournaledOperationId(ODatabaseJournal.OPERATION_STATUS.COMMITTED);
                            ODistributedServerLog.info(this, getLocalNodeId(), str3, ODistributedServerLog.DIRECTION.OUT, "resend alignment request db=%s from %d:%d", new Object[]{str4, Long.valueOf(lastJournaledOperationId[0]), Long.valueOf(lastJournaledOperationId[1])});
                            sendOperation2Node(str3, new OAlignRequestTask(this.serverInstance, this, str4, ODistributedServerManager.EXECUTION_MODE.ASYNCHRONOUS, lastJournaledOperationId[0], lastJournaledOperationId[1]));
                        } catch (IOException e) {
                            ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.OUT, "error on retrieve last operation id from the log for db=%s", new Object[]{str4});
                        }
                    } else {
                        ODistributedServerLog.info(this, getLocalNodeId(), str3, ODistributedServerLog.DIRECTION.NONE, "db=%s is in alignment status yet, the node is not online yet", new Object[]{str4});
                    }
                }
            }
        }
    }

    public void postponeAlignment(String str, String str2) {
        synchronized (this.pendingAlignments) {
            String str3 = str + "/" + str2;
            if (!this.pendingAlignments.containsKey(str3)) {
                ODistributedServerLog.error(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.IN, "received response to postpone an alignment against an unknown node", new Object[]{str2});
            }
            this.pendingAlignments.put(str3, Boolean.TRUE);
        }
    }

    public long getTimeOffset() {
        return this.timeOffset;
    }

    public String getLocalNodeId() {
        return this.localNodeId;
    }

    public String getLocalNodeAlias() {
        return this.alias != null ? this.alias : getLocalNodeId();
    }

    public String getNodeId(Member member) {
        return member.getInetSocketAddress().toString().substring(1);
    }

    public Set<String> getRemoteNodeIds() {
        return this.remoteClusterNodes.keySet();
    }

    public Set<String> getOnlineRemoteNodeIdsBut(String... strArr) {
        HashSet hashSet = new HashSet(this.remoteClusterNodes.keySet().size());
        for (String str : this.remoteClusterNodes.keySet()) {
            if (!isOfflineNode(str)) {
                boolean z = true;
                int length = strArr.length;
                int i = 0;
                while (true) {
                    if (i >= length) {
                        break;
                    }
                    if (str.equals(strArr[i])) {
                        z = false;
                        break;
                    }
                    i++;
                }
                if (z) {
                    hashSet.add(str);
                }
            }
        }
        return hashSet;
    }

    public void memberAdded(MembershipEvent membershipEvent) {
    }

    public void memberRemoved(MembershipEvent membershipEvent) {
        String nodeId = getNodeId(membershipEvent.getMember());
        getConfigurationMap().remove("node." + nodeId);
        this.remoteClusterNodes.remove(nodeId);
    }

    public void entryAdded(EntryEvent<String, Object> entryEvent) {
        if (((String) entryEvent.getKey()).startsWith("node.")) {
            String str = (String) ((ODocument) entryEvent.getValue()).field("id");
            if (!getLocalNodeId().equals(str)) {
                this.remoteClusterNodes.put(str, entryEvent.getMember());
            }
            OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
        }
    }

    public void entryRemoved(EntryEvent<String, Object> entryEvent) {
        if (((String) entryEvent.getKey()).startsWith("node.")) {
            String str = (String) ((ODocument) entryEvent.getValue()).field("id");
            ODistributedServerLog.warn(this, getLocalNodeId(), str, ODistributedServerLog.DIRECTION.NONE, "tracked remote node has been disconnected from the cluster", new Object[0]);
            this.remoteClusterNodes.remove(str);
            OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
        }
    }

    public void entryUpdated(EntryEvent<String, Object> entryEvent) {
        if (((String) entryEvent.getKey()).startsWith("node.")) {
            ODistributedServerLog.debug(this, getLocalNodeId(), (String) ((ODocument) entryEvent.getValue()).field("id"), ODistributedServerLog.DIRECTION.NONE, "received notification about update in the cluster: %s", new Object[]{entryEvent});
            OClientConnectionManager.instance().pushDistribCfg2Clients(getClusterConfiguration());
        }
    }

    public void entryEvicted(EntryEvent<String, Object> entryEvent) {
    }

    public String getRemoteNodeStatus(String str) {
        ODocument nodeConfiguration = getNodeConfiguration(str);
        return (String) (nodeConfiguration != null ? nodeConfiguration.field("status") : null);
    }

    public boolean isOfflineNode(String str) {
        synchronized (this.pendingAlignments) {
            if (this.pendingAlignments.containsKey(str)) {
                return true;
            }
            ODocument nodeConfiguration = getNodeConfiguration(str);
            return nodeConfiguration == null || !nodeConfiguration.field("status").equals("online");
        }
    }

    public int getNodeNumber() {
        return this.nodeNumber;
    }

    public HazelcastInstance getHazelcastInstance() {
        while (this.hazelcastInstance == null) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return this.hazelcastInstance;
    }

    protected IMap<String, Object> getConfigurationMap() {
        return getHazelcastInstance().getMap("orientdb");
    }

    public Lock getLock(String str) {
        return getHazelcastInstance().getLock(str);
    }

    public Class<? extends OReplicationConflictResolver> getConfictResolverClass() {
        return this.confictResolverClass;
    }

    protected Object executeOperation(Callable<Object> callable, Object obj, ODistributedServerManager.EXECUTION_MODE execution_mode, ExecutionCallback<Object> executionCallback) throws ExecutionException, InterruptedException {
        return executeOperation(callable, this.hazelcastInstance.getPartitionService().getPartition(obj).getOwner(), execution_mode, executionCallback);
    }

    protected Object executeOperation(Callable<Object> callable, Member member, ODistributedServerManager.EXECUTION_MODE execution_mode, ExecutionCallback<Object> executionCallback) throws ExecutionException, InterruptedException {
        if (execution_mode == ODistributedServerManager.EXECUTION_MODE.ASYNCHRONOUS && executionCallback != null) {
            this.hazelcastInstance.getExecutorService(DISTRIBUTED_EXECUTOR_NAME).submitToMember(callable, member, executionCallback);
            return null;
        }
        Future submitToMember = this.hazelcastInstance.getExecutorService(DISTRIBUTED_EXECUTOR_NAME).submitToMember(callable, member);
        if (execution_mode == ODistributedServerManager.EXECUTION_MODE.SYNCHRONOUS) {
            return submitToMember.get();
        }
        return null;
    }

    protected void initDistributedDatabases() {
        for (Map.Entry entry : this.serverInstance.getAvailableStorageNames().entrySet()) {
            ODistributedServerLog.warn(this, getLocalNodeId(), (String) null, ODistributedServerLog.DIRECTION.NONE, "opening database '%s'...", new Object[]{entry.getKey()});
            getDatabaseSynchronizer((String) entry.getKey());
        }
    }

    protected String waitUntilMasterNodeIsOnline(String str, Object obj, String str2, String str3) {
        if (!str3.equals(this.localNodeId) && isOfflineNode(str3)) {
            ODistributedServerLog.warn(this, getLocalNodeId(), str3, ODistributedServerLog.DIRECTION.OUT, "node is offline (status=%s). Waiting for completition...", new Object[]{getRemoteNodeStatus(str3)});
            while (isOfflineNode(str3)) {
                try {
                    Thread.sleep(200L);
                } catch (InterruptedException e) {
                    Thread.interrupted();
                }
                OReplicationConfig replicationData = getReplicationData(str2, str, obj, null, null);
                if (!replicationData.masterNode.equals(str3)) {
                    ODistributedServerLog.warn(this, getLocalNodeId(), str3, ODistributedServerLog.DIRECTION.OUT, "node %s is the new owner of the requested key set", new Object[]{getRemoteNodeStatus(str3)});
                    str3 = replicationData.masterNode;
                }
            }
            ODistributedServerLog.warn(this, getLocalNodeId(), str3, ODistributedServerLog.DIRECTION.OUT, "node aligned, flushing pending operations...", new Object[0]);
        }
        return str3;
    }

    public Object enqueueLocalExecution(OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask) throws Exception {
        if (!checkOperationSequence(oAbstractReplicatedTask)) {
            return null;
        }
        OStorageSynchronizer databaseSynchronizer = oAbstractReplicatedTask.getDatabaseSynchronizer();
        waitForMyTurnInQueue(oAbstractReplicatedTask);
        try {
            ODistributedServerLog.debug(this, oAbstractReplicatedTask.getNodeSource(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.IN, "pop operation=%d:%d", new Object[]{Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial())});
            long logOperation2Journal = logOperation2Journal(databaseSynchronizer, oAbstractReplicatedTask);
            try {
                Object executeOnLocalNode = oAbstractReplicatedTask.executeOnLocalNode();
                updateJournal(oAbstractReplicatedTask, databaseSynchronizer, logOperation2Journal, true);
                updateQueue(oAbstractReplicatedTask);
                return executeOnLocalNode;
            } catch (Exception e) {
                updateJournal(oAbstractReplicatedTask, databaseSynchronizer, logOperation2Journal, false);
                throw e;
            }
        } catch (Throwable th) {
            updateQueue(oAbstractReplicatedTask);
            throw th;
        }
    }

    public String toString() {
        return getLocalNodeAlias();
    }

    private void updateQueue(OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask) {
        ODistributedThreadLocal.INSTANCE.set((String) null);
        synchronized (this.lockQueue) {
            ODistributedServerLog.debug(this, oAbstractReplicatedTask.getNodeSource(), oAbstractReplicatedTask.getNodeDestination(), ODistributedServerLog.DIRECTION.IN, "completed operation=%d:%d", new Object[]{Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial())});
            this.executionQueue.put(Long.valueOf(oAbstractReplicatedTask.getRunId()), Long.valueOf(oAbstractReplicatedTask.getOperationSerial()));
            this.lockQueue.notifyAll();
        }
    }

    private void updateJournal(OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask, OStorageSynchronizer oStorageSynchronizer, long j, boolean z) {
        try {
            if (z) {
                oAbstractReplicatedTask.setAsCommitted(oStorageSynchronizer, j);
            } else {
                oAbstractReplicatedTask.setAsCanceled(oStorageSynchronizer, j);
            }
        } catch (IOException e) {
            ODistributedServerLog.error(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeSource(), ODistributedServerLog.DIRECTION.IN, "error on changing the log status for %s db=%s %s", e, new Object[]{getName(), oAbstractReplicatedTask.getDatabaseName(), oAbstractReplicatedTask.getPayload()});
            throw new ODistributedException("Error on changing the log status", e);
        }
    }

    private long logOperation2Journal(OStorageSynchronizer oStorageSynchronizer, OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask) {
        try {
            return oStorageSynchronizer.getLog().append(oAbstractReplicatedTask);
        } catch (IOException e) {
            ODistributedServerLog.error(this, oAbstractReplicatedTask.getDistributedServerManager().getLocalNodeId(), oAbstractReplicatedTask.getNodeSource(), ODistributedServerLog.DIRECTION.IN, "error on logging operation %s db=%s %s", e, new Object[]{oAbstractReplicatedTask.getName(), oAbstractReplicatedTask.getDatabaseName(), oAbstractReplicatedTask.getPayload()});
            throw new ODistributedException("Error on logging operation", e);
        }
    }

    public void resetOperationQueue(long j, long j2) {
        synchronized (this.lockQueue) {
            Long l = this.executionQueue.get(Long.valueOf(j));
            if (l == null || l.longValue() != j2) {
                this.executionQueue.put(Long.valueOf(j), Long.valueOf(j2));
                this.lockQueue.notifyAll();
            }
        }
    }

    private void waitForMyTurnInQueue(OAbstractReplicatedTask<? extends Object> oAbstractReplicatedTask) {
        while (true) {
            synchronized (this.lockQueue) {
                Long l = this.executionQueue.get(Long.valueOf(oAbstractReplicatedTask.getRunId()));
                if (l == null) {
                    this.executionQueue.put(Long.valueOf(oAbstractReplicatedTask.getRunId()), 0L);
                } else if (l.longValue() != oAbstractReplicatedTask.getOperationSerial() - 1) {
                    try {
                        ODistributedServerLog.debug(this, getLocalNodeId(), oAbstractReplicatedTask.getNodeSource(), ODistributedServerLog.DIRECTION.NONE, "waiting for %d tasks in queue %s. current=%d my=%d", new Object[]{Long.valueOf((oAbstractReplicatedTask.getOperationSerial() - l.longValue()) - 1), Long.valueOf(oAbstractReplicatedTask.getRunId()), l, Long.valueOf(oAbstractReplicatedTask.getOperationSerial())});
                        this.lockQueue.wait(OGlobalConfiguration.STORAGE_LOCK_TIMEOUT.getValueAsLong());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
        ODistributedThreadLocal.INSTANCE.set(oAbstractReplicatedTask.getNodeSource());
    }
}
