package com.orientechnologies.orient.server.hazelcast;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IMap;
import com.hazelcast.core.IQueue;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseRecordThreadLocal;
import com.orientechnologies.orient.core.db.OScenarioThreadLocal;
import com.orientechnologies.orient.core.db.document.ODatabaseDocumentTx;
import com.orientechnologies.orient.core.metadata.security.OSecurityUser;
import com.orientechnologies.orient.core.metadata.security.OUser;
import com.orientechnologies.orient.core.record.ORecord;
import com.orientechnologies.orient.server.config.OServerUserConfiguration;
import com.orientechnologies.orient.server.distributed.ODiscardedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.task.OAbstractRemoteTask;
import com.orientechnologies.orient.server.distributed.task.OCreateRecordTask;
import com.orientechnologies.orient.server.distributed.task.ODeleteRecordTask;
import com.orientechnologies.orient.server.distributed.task.OFixTxTask;
import com.orientechnologies.orient.server.distributed.task.OResurrectRecordTask;
import com.orientechnologies.orient.server.distributed.task.OSQLCommandTask;
import com.orientechnologies.orient.server.distributed.task.OTxTask;
import com.orientechnologies.orient.server.distributed.task.OUpdateRecordTask;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/orientechnologies/orient/server/hazelcast/ODistributedWorker.class */
public class ODistributedWorker extends Thread {
    /** Capacity of {@link #localQueue}, the in-memory buffer filled from the distributed request queue. */
    private static final int LOCAL_QUEUE_MAXSIZE = 1000;

    /** Distributed database this worker serves; also the source of {@link #manager} and {@link #msgService}. */
    protected final OHazelcastDistributedDatabase distributed;

    /** Cluster plugin, copied from {@code distributed.manager} by the constructor. */
    protected final OHazelcastPlugin manager;

    /** Message service, copied from {@code distributed.msgService} by the constructor. */
    protected final OHazelcastDistributedMessageService msgService;

    /** Name of the database whose requests this worker executes. */
    protected final String databaseName;

    /** Distributed (Hazelcast) queue the worker takes incoming requests from. */
    protected final IQueue<ODistributedRequest> requestQueue;

    /** Database instance, opened lazily by {@link #initDatabaseInstance()} and closed by {@link #shutdown()}. */
    protected volatile ODatabaseDocumentTx database;

    /** User resolved for the most recent request; reused by {@code onMessage} while the user name stays the same. */
    protected volatile OUser lastUser;

    /** True until {@link #run()} has looped once per message that was queued when the worker started. */
    protected boolean restoringMessages;

    /** Local buffer of requests drained from {@link #requestQueue}; bounded by {@link #LOCAL_QUEUE_MAXSIZE}. */
    protected Queue<ODistributedRequest> localQueue = new ArrayBlockingQueue<ODistributedRequest>(LOCAL_QUEUE_MAXSIZE);

    /** Main-loop flag; cleared by {@link #shutdown()} to make {@link #run()} exit. */
    protected volatile boolean running = true;

    public ODistributedWorker(OHazelcastDistributedDatabase oHazelcastDistributedDatabase, IQueue<ODistributedRequest> iQueue, String str, int i, boolean z) {
        setName("OrientDB DistributedWorker node=" + oHazelcastDistributedDatabase.getLocalNodeName() + " db=" + str + " id=" + i);
        this.distributed = oHazelcastDistributedDatabase;
        this.requestQueue = iQueue;
        this.databaseName = str;
        this.manager = this.distributed.manager;
        this.msgService = this.distributed.msgService;
        this.restoringMessages = z;
    }

    /**
     * Main loop of the worker: reads one request at a time and executes it through
     * {@link #onMessage(ODistributedRequest)} until {@code running} is cleared.
     *
     * <p>The number of messages found in the distributed queue at start-up is remembered; once
     * the loop has iterated that many times, {@code restoringMessages} is switched off.
     *
     * <p>Exception handling, most specific first:
     * <ul>
     *   <li>{@link InterruptedException}: the interrupt flag is cleared and the loop re-checks
     *       {@code running} (this is how {@link #shutdown()} stops the worker);</li>
     *   <li>{@link DistributedObjectDestroyedException} and
     *       {@link HazelcastInstanceNotActiveException}: the queue or the Hazelcast instance is
     *       gone, so further reads would fail immediately on every iteration; the loop is left
     *       instead of busy-spinning;</li>
     *   <li>any other failure is logged and the loop goes on with the next request.</li>
     * </ul>
     */
    @Override
    public void run() {
        final int queuedAtStartup = this.requestQueue.size();
        long lastRequestId = -1;

        for (long iterations = 0; this.running; iterations++) {
            if (this.restoringMessages && iterations >= queuedAtStartup) {
                ODistributedServerLog.debug(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "executed all pending tasks in queue (%d), set restoringMessages=false and database '%s' as online. Last req=%d", new Object[]{Integer.valueOf(queuedAtStartup), this.databaseName, Long.valueOf(lastRequestId)});
                this.restoringMessages = false;
            }

            ODistributedRequest request = null;
            try {
                request = readRequest();
                if (request != null) {
                    lastRequestId = request.getId();
                    onMessage(request);
                }
            } catch (InterruptedException e) {
                // Clear the flag on purpose: the loop condition decides whether to stop.
                Thread.interrupted();
            } catch (DistributedObjectDestroyedException e) {
                // Subclass of HazelcastException in Hazelcast 3.x, hence it must be caught first.
                Thread.interrupted();
                break;
            } catch (HazelcastInstanceNotActiveException e) {
                Thread.interrupted();
                break;
            } catch (HazelcastException e) {
                if (e.getCause() instanceof InterruptedException) {
                    Thread.interrupted();
                } else {
                    logExecutionError(request, e);
                }
            } catch (Throwable t) {
                logExecutionError(request, t);
            }
        }

        ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "end of reading requests for database %s", new Object[]{this.databaseName});
    }

    /** Logs a failed request execution; tolerates a request that could not even be read. */
    private void logExecutionError(ODistributedRequest request, Throwable error) {
        final Object[] args = new Object[2];
        args[0] = Long.valueOf(request != null ? request.getId() : -1L);
        args[1] = request != null ? request.getTask() : "-";
        ODistributedServerLog.error(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.IN, "error on executing distributed request %d: %s", error, args);
    }

    /**
     * Makes sure {@code database} refers to an open database.
     *
     * <p>If no instance exists yet it is opened through the server; if an instance exists but is
     * closed it is reopened. Both paths use the credentials of the server user named
     * {@code "replicator"}. An already open instance is left untouched.
     */
    public void initDatabaseInstance() {
        if (this.database != null && !this.database.isClosed()) {
            // Already open: nothing to do.
            return;
        }

        final OServerUserConfiguration replicator = this.manager.getServerInstance().getUser("replicator");
        if (this.database == null) {
            this.database = this.manager.getServerInstance().openDatabase("document", this.databaseName, replicator.name, replicator.password);
        } else {
            this.database.open(replicator.name, replicator.password);
        }
    }

    /**
     * Stops the worker: clears {@code running}, interrupts the thread and, when requests were
     * still buffered locally at the time of the call, waits for the thread to end. Afterwards
     * the local queue is emptied and the database (if any) is closed.
     *
     * <p>Any failure is logged as a warning and not propagated. If the calling thread is
     * interrupted while waiting, its interrupt status is restored so the caller can still
     * observe it; in that case the remaining cleanup is skipped, as for any other failure.
     */
    public void shutdown() {
        final int pending = this.localQueue.size();
        if (pending > 0) {
            ODistributedServerLog.warn(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "Received shutdown signal, waiting for distributed worker queue is empty (pending msgs=%d)...", new Object[]{Integer.valueOf(pending)});
        }
        try {
            this.running = false;
            interrupt();
            if (pending > 0) {
                join();
            }
            ODistributedServerLog.warn(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "Shutdown distributed worker completed", new Object[0]);
            this.localQueue.clear();
            if (this.database != null) {
                this.database.close();
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // join() was interrupted: do not hide the interruption from the caller.
                Thread.currentThread().interrupt();
            }
            ODistributedServerLog.warn(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "Error on shutting down distributed worker", e, new Object[0]);
        }
    }

    /**
     * Returns the database instance currently held by this worker.
     *
     * @return the database, or {@code null} if none has been opened yet
     */
    public ODatabaseDocumentTx getDatabase() {
        return database;
    }

    /**
     * Reads the next request to execute.
     *
     * <p>While {@code distributed.waitForMessageId} holds a value greater than -1, every request
     * with a smaller id is answered with an {@link ODiscardedResponse} and skipped. The first
     * request whose id reaches the awaited one resets {@code waitForMessageId} to -1 and is
     * returned.
     *
     * @return the request to process, or {@code null} if {@link #nextMessage()} produced none
     *         (the original loop would spin forever in that case while an id was awaited)
     * @throws InterruptedException if interrupted while waiting for a message
     */
    protected ODistributedRequest readRequest() throws InterruptedException {
        ODistributedRequest request = nextMessage();

        while (request != null) {
            final long awaitedId = this.distributed.waitForMessageId.get();
            if (awaitedId <= -1) {
                // Nothing is awaited: every request is eligible.
                break;
            }
            if (request.getId() >= awaitedId) {
                ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), request.getSenderNodeName(), ODistributedServerLog.DIRECTION.IN, "reached waited request %d on request=%s sourceNode=%s", new Object[]{Long.valueOf(awaitedId), request, request.getSenderNodeName()});
                this.distributed.waitForMessageId.set(-1L);
                break;
            }

            // Older than the awaited request: tell the sender it was discarded and move on.
            ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), request.getSenderNodeName(), ODistributedServerLog.DIRECTION.IN, "discarded request %d because waiting for %d request=%s sourceNode=%s", new Object[]{Long.valueOf(request.getId()), Long.valueOf(awaitedId), request, request.getSenderNodeName()});
            sendResponseBack(request, request.getTask(), new ODiscardedResponse());
            request = nextMessage();
        }

        if (request != null && ODistributedServerLog.isDebugEnabled()) {
            ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), request.getSenderNodeName(), ODistributedServerLog.DIRECTION.IN, "processing request=%s sourceNode=%s", new Object[]{request, request.getSenderNodeName()});
        }
        return request;
    }

    /**
     * Returns the next buffered request, refilling the local buffer from the distributed queue
     * when it is empty.
     *
     * <p>A refill blocks until at least one request is available, then drains whatever else is
     * immediately available. The drain limit is derived from {@code LOCAL_QUEUE_MAXSIZE} (one
     * slot is already used by the element just taken) so that the bounded local queue can never
     * be overfilled if the capacity constant changes.
     *
     * @return the head of the local queue
     * @throws InterruptedException if interrupted while waiting on the distributed queue
     */
    protected ODistributedRequest nextMessage() throws InterruptedException {
        while (this.localQueue.isEmpty()) {
            final ODistributedRequest first = this.requestQueue.take();
            this.localQueue.offer(first);
            this.requestQueue.drainTo(this.localQueue, LOCAL_QUEUE_MAXSIZE - 1);
        }
        return this.localQueue.poll();
    }

    /**
     * Executes one distributed request on the local node and sends the result back to the
     * sender.
     *
     * <p>Steps:
     * <ol>
     *   <li>the thread is flagged as {@code RUNNING_DISTRIBUTED} for the duration of the call;</li>
     *   <li>the database is opened if the task requires it, and bound to the current thread;</li>
     *   <li>when a database is available, the request's user is applied to it (the last resolved
     *       user is reused while the user name does not change); a failure here is only logged;</li>
     *   <li>the task is executed through the manager;</li>
     *   <li>in all cases the database is rolled back, its local cache cleared and the previous
     *       user restored, exactly once;</li>
     *   <li>the response is dispatched and the run mode reset to {@code DEFAULT}.</li>
     * </ol>
     *
     * @param oDistributedRequest request to execute
     */
    protected void onMessage(ODistributedRequest oDistributedRequest) {
        OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.RUNNING_DISTRIBUTED);
        try {
            final OAbstractRemoteTask task = oDistributedRequest.getTask();
            if (ODistributedServerLog.isDebugEnabled()) {
                ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), oDistributedRequest.getSenderNodeName(), ODistributedServerLog.DIRECTION.OUT, "received request: %s", new Object[]{oDistributedRequest});
            }

            final Serializable responsePayload;
            OSecurityUser previousUser = null;
            try {
                if (task.isRequiredOpenDatabase()) {
                    initDatabaseInstance();
                }
                ODatabaseRecordThreadLocal.INSTANCE.set(this.database);
                task.setNodeSource(oDistributedRequest.getSenderNodeName());

                if (this.database != null) {
                    previousUser = this.database.getUser();
                    try {
                        if (this.lastUser == null || !this.lastUser.getName().equals(oDistributedRequest.getUserName())) {
                            this.lastUser = this.database.getMetadata().getSecurity().getUser(oDistributedRequest.getUserName());
                        }
                        this.database.setUser(this.lastUser);
                    } catch (Throwable th) {
                        // Best effort: the request is still executed with the user already set.
                        OLogManager.instance().error(this, "failed to convert to OUser " + th.getMessage(), new Object[0]);
                    }
                }

                responsePayload = this.manager.executeOnLocalNode(oDistributedRequest, this.database);
            } finally {
                // Leave the shared database instance clean for the next request.
                if (this.database != null) {
                    this.database.rollback();
                    this.database.getLocalCache().clear();
                    this.database.setUser(previousUser);
                }
            }

            sendResponseBack(oDistributedRequest, task, responsePayload);
        } finally {
            OScenarioThreadLocal.INSTANCE.set(OScenarioThreadLocal.RUN_MODE.DEFAULT);
        }
    }

    /**
     * Builds the name of the distributed map that stores this node's pending requests:
     * {@code "orientdb.node."}, followed by the local node name, followed by
     * {@code OHazelcastDistributedDatabase.NODE_QUEUE_PENDING_POSTFIX}.
     *
     * @return the pending-request map name for the local node
     */
    protected String getPendingRequestMapName() {
        return "orientdb.node." + this.manager.getLocalNodeName() + OHazelcastDistributedDatabase.NODE_QUEUE_PENDING_POSTFIX;
    }

    /**
     * Returns the name of the local node, as reported by the manager.
     *
     * @return local node name
     */
    protected String getLocalNodeName() {
        return manager.getLocalNodeName();
    }

    /**
     * Looks up the map of pending requests of the local node and, optionally, replays the
     * request that was pending for this database.
     *
     * <p>When {@code z} is true, the entry stored under the database name is removed from the
     * map. If one was present, the database is opened and the request is re-executed only when
     * {@link #checkIfOperationHasBeenExecuted} returns {@code true}. Any error raised during the
     * replay is logged and not propagated.
     *
     * @param z whether the pending request of this database must be removed and replayed
     * @return the pending-request map of the local node
     */
    public IMap<String, Object> restoreMessagesBeforeFailure(boolean z) {
        final IMap<String, Object> pendingRequests = this.manager.getHazelcastInstance().getMap(getPendingRequestMapName());
        if (!z) {
            return pendingRequests;
        }

        final ODistributedRequest lastPending = (ODistributedRequest) pendingRequests.remove(this.databaseName);
        if (lastPending == null) {
            return pendingRequests;
        }

        ODistributedServerLog.warn(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "restore last replication message before the crash for database '%s': %s...", new Object[]{this.databaseName, lastPending});
        try {
            initDatabaseInstance();
            final boolean mustReplay = checkIfOperationHasBeenExecuted(lastPending, lastPending.getTask());
            if (mustReplay) {
                onMessage(lastPending);
            }
        } catch (Throwable failure) {
            ODistributedServerLog.error(this, getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "error on executing restored message for database %s", failure, new Object[]{this.databaseName});
        }
        return pendingRequests;
    }

    /**
     * Logs a warning explaining why hot alignment cannot proceed and aborts by throwing.
     *
     * @param oDistributedRequest request being checked; its sender is reported in the log
     * @param str                 {@link String#format(String, Object...)} pattern of the reason
     * @param objArr              arguments for the pattern
     * @throws OHotAlignmentNotPossibleExeption always, carrying the formatted reason
     */
    protected void hotAlignmentError(ODistributedRequest oDistributedRequest, String str, Object... objArr) throws OHotAlignmentNotPossibleExeption {
        final String reason = String.format(str, objArr);
        final String logLine = "- " + reason;
        ODistributedServerLog.warn(this, getLocalNodeName(), oDistributedRequest.getSenderNodeName(), ODistributedServerLog.DIRECTION.IN, logLine, new Object[0]);
        throw new OHotAlignmentNotPossibleExeption(reason);
    }

    /**
     * Inspects the current database state to decide the outcome for a restored task.
     *
     * <p>Result per task type, checked in this order:
     * <ul>
     *   <li>delete: {@code true} if the record can still be loaded;</li>
     *   <li>update: {@code true} if the record exists and its version differs from the task's
     *       version; a missing record is logged and yields {@code false};</li>
     *   <li>create: {@code true} if the record cannot be loaded;</li>
     *   <li>SQL command: {@code false} when idempotent, otherwise a hot-alignment error;</li>
     *   <li>resurrect: {@code false} when the record exists, otherwise a hot-alignment error;</li>
     *   <li>tx / fix-tx: {@code true} as soon as one of the contained tasks yields {@code true};</li>
     *   <li>anything else: a hot-alignment error.</li>
     * </ul>
     *
     * <p>NOTE(review): despite the method name, the only visible caller re-executes the request
     * when this returns {@code true}.
     *
     * @param oDistributedRequest request the task belongs to (used for logging)
     * @param oAbstractRemoteTask task to inspect
     * @return the outcome described above
     */
    protected boolean checkIfOperationHasBeenExecuted(ODistributedRequest oDistributedRequest, OAbstractRemoteTask oAbstractRemoteTask) {
        if (oAbstractRemoteTask instanceof ODeleteRecordTask) {
            return ((ODeleteRecordTask) oAbstractRemoteTask).getRid().getRecord() != null;
        }

        if (oAbstractRemoteTask instanceof OUpdateRecordTask) {
            final OUpdateRecordTask updateTask = (OUpdateRecordTask) oAbstractRemoteTask;
            final ORecord stored = updateTask.getRid().getRecord();
            if (stored != null) {
                return !stored.getRecordVersion().equals(updateTask.getVersion());
            }
            ODistributedServerLog.warn(this, getLocalNodeName(), oDistributedRequest.getSenderNodeName(), ODistributedServerLog.DIRECTION.IN, "- cannot update deleted record %s, database could be not aligned", new Object[]{updateTask.getRid()});
            return false;
        }

        if (oAbstractRemoteTask instanceof OCreateRecordTask) {
            return ((OCreateRecordTask) oAbstractRemoteTask).getRid().getRecord() == null;
        }

        if (oAbstractRemoteTask instanceof OSQLCommandTask) {
            if (!oAbstractRemoteTask.isIdempotent()) {
                hotAlignmentError(oDistributedRequest, "Not able to assure last command has been completed before last crash. Command='%s'", ((OSQLCommandTask) oAbstractRemoteTask).getPayload());
            }
            return false;
        }

        if (oAbstractRemoteTask instanceof OResurrectRecordTask) {
            if (((OResurrectRecordTask) oAbstractRemoteTask).getRid().getRecord() == null) {
                hotAlignmentError(oDistributedRequest, "Not able to resurrect deleted record '%s'", ((OResurrectRecordTask) oAbstractRemoteTask).getRid());
            }
            return false;
        }

        if (oAbstractRemoteTask instanceof OTxTask) {
            // Check each item of the transaction; the first positive one decides.
            for (Object item : ((OTxTask) oAbstractRemoteTask).getTasks()) {
                if (checkIfOperationHasBeenExecuted(oDistributedRequest, (OAbstractRemoteTask) item)) {
                    return true;
                }
            }
            return false;
        }

        if (oAbstractRemoteTask instanceof OFixTxTask) {
            for (Object item : ((OFixTxTask) oAbstractRemoteTask).getTasks()) {
                if (checkIfOperationHasBeenExecuted(oDistributedRequest, (OAbstractRemoteTask) item)) {
                    return true;
                }
            }
            return false;
        }

        hotAlignmentError(oDistributedRequest, "Not able to assure last operation has been completed before last crash. Task='%s'", oAbstractRemoteTask);
        return false;
    }

    /**
     * Dispatches the result of a request to the response queue of the node that sent it.
     *
     * <p>The offer waits at most {@code OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT}
     * milliseconds.
     *
     * @param oDistributedRequest request being answered
     * @param oAbstractRemoteTask task of the request (used for logging only)
     * @param serializable        response payload
     * @throws ODistributedException with a "Timeout on dispatching…" message when the queue did
     *                               not accept the response in time, or with a "Cannot
     *                               dispatch…" message wrapping the cause for any other failure
     */
    private void sendResponseBack(ODistributedRequest oDistributedRequest, OAbstractRemoteTask oAbstractRemoteTask, Serializable serializable) {
        ODistributedServerLog.debug(this, this.manager.getLocalNodeName(), oDistributedRequest.getSenderNodeName(), ODistributedServerLog.DIRECTION.OUT, "sending back response '%s' to request %d (%s)", new Object[]{serializable, Long.valueOf(oDistributedRequest.getId()), oAbstractRemoteTask});

        final boolean accepted;
        try {
            accepted = this.msgService.getQueue(OHazelcastDistributedMessageService.getResponseQueueName(oDistributedRequest.getSenderNodeName())).offer(new OHazelcastDistributedResponse(oDistributedRequest.getId(), this.manager.getLocalNodeName(), oDistributedRequest.getSenderNodeName(), serializable), OGlobalConfiguration.DISTRIBUTED_QUEUE_TIMEOUT.getValueAsLong(), TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interruption visible to the code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new ODistributedException("Cannot dispatch response to the thread queue " + oDistributedRequest.getSenderNodeName(), e);
        }

        // Raised outside the try so the timeout is not re-wrapped as a generic dispatch failure.
        if (!accepted) {
            throw new ODistributedException("Timeout on dispatching response to the thread queue " + oDistributedRequest.getSenderNodeName());
        }
    }

    /**
     * Creates the given server user inside the database, with the {@code "admin"} role, unless a
     * user with that name already exists there.
     *
     * @param oDatabaseDocumentTx      database whose security metadata is consulted and updated
     * @param oServerUserConfiguration server user supplying the name and password
     */
    private void createReplicatorUser(ODatabaseDocumentTx oDatabaseDocumentTx, OServerUserConfiguration oServerUserConfiguration) {
        final boolean alreadyDefined = oDatabaseDocumentTx.getMetadata().getSecurity().getUser(oServerUserConfiguration.name) != null;
        if (alreadyDefined) {
            return;
        }
        final String[] roles = new String[]{"admin"};
        oDatabaseDocumentTx.getMetadata().getSecurity().createUser(oServerUserConfiguration.name, oServerUserConfiguration.password, roles);
    }
}
