package com.orientechnologies.orient.server.distributed.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.orientechnologies.common.concur.OTimeoutException;
import com.orientechnologies.common.concur.lock.OModificationOperationProhibitedException;
import com.orientechnologies.common.log.OLogManager;
import com.orientechnologies.orient.core.config.OGlobalConfiguration;
import com.orientechnologies.orient.core.db.ODatabaseDocumentInternal;
import com.orientechnologies.orient.core.exception.OConfigurationException;
import com.orientechnologies.orient.core.exception.OStorageException;
import com.orientechnologies.orient.core.metadata.security.OSecurityUser;
import com.orientechnologies.orient.core.metadata.security.OUser;
import com.orientechnologies.orient.server.distributed.ODistributedException;
import com.orientechnologies.orient.server.distributed.ODistributedRequest;
import com.orientechnologies.orient.server.distributed.ODistributedRequestId;
import com.orientechnologies.orient.server.distributed.ODistributedResponse;
import com.orientechnologies.orient.server.distributed.ODistributedServerLog;
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.ORemoteServerController;
import com.orientechnologies.orient.server.distributed.task.ODistributedOperationException;
import com.orientechnologies.orient.server.distributed.task.ORemoteTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:com/orientechnologies/orient/server/distributed/impl/ODistributedWorker.class */
public class ODistributedWorker extends Thread {
    protected final ODistributedDatabaseImpl distributed;
    protected final ODistributedServerManager manager;
    protected final ODistributedMessageServiceImpl msgService;
    protected final String localNodeName;
    protected final String databaseName;
    protected final ArrayBlockingQueue<ODistributedRequest> localQueue;
    protected final int id;
    protected volatile ODatabaseDocumentInternal database;
    protected volatile OUser lastUser;
    protected volatile boolean running = true;
    private AtomicLong processedRequests = new AtomicLong(0);
    private AtomicBoolean waitingForNextRequest = new AtomicBoolean(true);
    private static final long MAX_SHUTDOWN_TIMEOUT = 5000;

    public ODistributedWorker(ODistributedDatabaseImpl oDistributedDatabaseImpl, String str, int i) {
        this.id = i;
        setName("OrientDB DistributedWorker node=" + oDistributedDatabaseImpl.getLocalNodeName() + " db=" + str + " id=" + i);
        this.distributed = oDistributedDatabaseImpl;
        this.localQueue = new ArrayBlockingQueue<>(this.distributed.getManager().getServerInstance().getContextConfiguration().getValueAsInteger(OGlobalConfiguration.DISTRIBUTED_LOCAL_QUEUESIZE));
        this.databaseName = str;
        this.manager = this.distributed.getManager();
        this.msgService = this.distributed.msgService;
        this.localNodeName = this.manager.getLocalNodeName();
    }

    public void processRequest(ODistributedRequest oDistributedRequest) {
        try {
            this.localQueue.put(oDistributedRequest);
        } catch (InterruptedException e) {
            ODistributedServerLog.warn(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Received interruption signal, closing distributed worker thread (worker=%d)", new Object[]{Integer.valueOf(this.id)});
            shutdown();
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        long j = 0;
        while (true) {
            long j2 = j;
            if (!this.running) {
                break;
            }
            ODistributedRequestId oDistributedRequestId = null;
            ODistributedRequest oDistributedRequest = null;
            try {
                oDistributedRequest = readRequest();
                if (oDistributedRequest != null) {
                    oDistributedRequest.getId();
                    oDistributedRequest.getId();
                    onMessage(oDistributedRequest);
                }
            } catch (DistributedObjectDestroyedException e) {
                Thread.currentThread().interrupt();
            } catch (HazelcastInstanceNotActiveException e2) {
                Thread.currentThread().interrupt();
            } catch (InterruptedException e3) {
                Thread.currentThread().interrupt();
            } catch (Throwable th) {
                try {
                    if (th.getCause() instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    } else {
                        String str = this.localNodeName;
                        String nodeNameById = 0 != 0 ? this.manager.getNodeNameById(oDistributedRequestId.getNodeId()) : "?";
                        ODistributedServerLog.DIRECTION direction = ODistributedServerLog.DIRECTION.IN;
                        Object[] objArr = new Object[3];
                        objArr[0] = oDistributedRequest != null ? oDistributedRequest.getId() : -1;
                        objArr[1] = oDistributedRequest != null ? oDistributedRequest.getTask() : "-";
                        objArr[2] = Integer.valueOf(this.id);
                        ODistributedServerLog.error(this, str, nodeNameById, direction, "Error on executing distributed request %s: (%s) worker=%d", th, objArr);
                    }
                } catch (Throwable th2) {
                    String str2 = this.localNodeName;
                    ODistributedServerLog.DIRECTION direction2 = ODistributedServerLog.DIRECTION.IN;
                    Object[] objArr2 = new Object[3];
                    objArr2[0] = oDistributedRequest != null ? oDistributedRequest.getId() : -1;
                    objArr2[1] = oDistributedRequest != null ? oDistributedRequest.getTask() : "-";
                    objArr2[2] = Integer.valueOf(this.id);
                    ODistributedServerLog.error(this, str2, "?", direction2, "Error on executing distributed request %s: (%s) worker=%d", th, objArr2);
                }
            }
            j = j2 + 1;
        }
        ODistributedServerLog.debug(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "End of reading requests for database %s", new Object[]{this.databaseName});
    }

    public void initDatabaseInstance() {
        if (this.database != null) {
            if (this.database.isClosed()) {
                this.database.close();
                this.database = this.distributed.getDatabaseInstance();
                return;
            }
            return;
        }
        for (int i = 0; i < 100; i++) {
            try {
                this.database = this.distributed.getDatabaseInstance();
                break;
            } catch (OConfigurationException e) {
                if (!dbNotAvailable(i)) {
                    return;
                }
            } catch (OStorageException e2) {
                if (!dbNotAvailable(i)) {
                    return;
                }
            }
        }
        if (this.database == null) {
            ODistributedServerLog.info(this, this.manager.getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "Database '%s' not present, shutting down database manager", new Object[]{this.databaseName});
            this.distributed.shutdown();
            throw new ODistributedException("Cannot open database '" + this.databaseName + "'");
        }
    }

    protected boolean dbNotAvailable(int i) {
        try {
            ODistributedServerLog.info(this, this.manager.getLocalNodeName(), (String) null, ODistributedServerLog.DIRECTION.NONE, "Database '%s' not present, waiting for it (retry=%d/%d)...", new Object[]{this.databaseName, Integer.valueOf(i), 100});
            Thread.sleep(300L);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public void shutdown() {
        this.running = false;
        int size = this.localQueue.size();
        if (size > 0) {
            ODistributedServerLog.info(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Received shutdown signal, waiting for distributed worker queue is empty (pending msgs=%d)...", new Object[]{Integer.valueOf(size)});
        }
        interrupt();
        try {
            if (size > 0) {
                try {
                    join(MAX_SHUTDOWN_TIMEOUT);
                } catch (Exception e) {
                    ODistributedServerLog.debug(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Interrupted shutdown of distributed worker thread", new Object[0]);
                }
            }
            ODistributedServerLog.debug(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Shutdown distributed worker '%s' completed", new Object[]{getName()});
            this.localQueue.clear();
            if (this.database != null) {
                this.database.activateOnCurrentThread();
                this.database.close();
            }
        } catch (Exception e2) {
            ODistributedServerLog.warn(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Error on shutting down distributed worker '%s'", e2, new Object[]{getName()});
        }
    }

    public ODatabaseDocumentInternal getDatabase() {
        return this.database;
    }

    protected ODistributedRequest readRequest() throws InterruptedException {
        ODistributedRequest nextMessage = nextMessage();
        if (this.manager.isOffline()) {
            waitNodeIsOnline();
        }
        if (ODistributedServerLog.isDebugEnabled()) {
            String nodeNameById = this.manager.getNodeNameById(nextMessage.getId().getNodeId());
            ODistributedServerLog.debug(this, this.localNodeName, nodeNameById, ODistributedServerLog.DIRECTION.IN, "Processing request=(%s) sourceNode=%s worker=%d", new Object[]{nextMessage, nodeNameById, Integer.valueOf(this.id)});
        }
        return nextMessage;
    }

    public boolean isWaitingForNextRequest() {
        return this.waitingForNextRequest.get();
    }

    protected ODistributedRequest nextMessage() throws InterruptedException {
        this.waitingForNextRequest.set(true);
        ODistributedRequest take = this.localQueue.take();
        this.waitingForNextRequest.set(false);
        this.processedRequests.incrementAndGet();
        return take;
    }

    protected void onMessage(ODistributedRequest oDistributedRequest) {
        String str = null;
        for (int i = 0; i < 10; i++) {
            str = this.manager.getNodeNameById(oDistributedRequest.getId().getNodeId());
            if (str != null) {
                break;
            }
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ODistributedException("Execution has been interrupted");
            }
        }
        if (str == null) {
            ODistributedServerLog.warn(this, this.localNodeName, str, ODistributedServerLog.DIRECTION.IN, "Sender server id %d is not registered in the cluster configuration, discard the request: (%s) (worker=%d)", new Object[]{Integer.valueOf(oDistributedRequest.getId().getNodeId()), oDistributedRequest, Integer.valueOf(this.id)});
            sendResponseBack(oDistributedRequest, new ODistributedException("Sender server id " + oDistributedRequest.getId().getNodeId() + " is not registered in the cluster configuration, discard the request"));
            return;
        }
        ORemoteTask task = oDistributedRequest.getTask();
        if (ODistributedServerLog.isDebugEnabled()) {
            ODistributedServerLog.debug(this, this.localNodeName, str, ODistributedServerLog.DIRECTION.IN, "Received request: (%s) (worker=%d)", new Object[]{oDistributedRequest, Integer.valueOf(this.id)});
        }
        Object obj = null;
        OSecurityUser oSecurityUser = null;
        try {
            try {
                waitNodeIsOnline();
                this.distributed.waitIsReady(task);
                if (task.isUsingDatabase()) {
                    initDatabaseInstance();
                    if (this.database == null) {
                        throw new ODistributedOperationException("Error on executing remote request because the database '" + this.databaseName + "' is not available");
                    }
                }
                if (this.database != null) {
                    this.database.activateOnCurrentThread();
                    oSecurityUser = this.database.getUser();
                    try {
                        if (oDistributedRequest.getUserRID() == null || !oDistributedRequest.getUserRID().isValid() || (this.lastUser != null && this.lastUser.getIdentity().equals(oDistributedRequest.getUserRID()))) {
                            oSecurityUser = null;
                        } else {
                            this.lastUser = this.database.getMetadata().getSecurity().getUser(oDistributedRequest.getUserRID());
                            this.database.setUser(this.lastUser);
                        }
                    } catch (Throwable th) {
                        OLogManager.instance().error(this, "Failed on user switching database. " + th.getMessage(), new Object[0]);
                    }
                }
                int i2 = 1;
                while (true) {
                    if (!this.running) {
                        break;
                    }
                    obj = this.manager.executeOnLocalNode(oDistributedRequest.getId(), oDistributedRequest.getTask(), this.database);
                    if (obj instanceof OModificationOperationProhibitedException) {
                        try {
                            ODistributedServerLog.info(this, this.localNodeName, str, ODistributedServerLog.DIRECTION.IN, "Database is frozen, waiting and retrying. Request %s (retry=%d, worker=%d)", new Object[]{oDistributedRequest, Integer.valueOf(i2), Integer.valueOf(this.id)});
                            Thread.sleep(1000L);
                        } catch (InterruptedException e2) {
                        }
                        i2++;
                    } else if (i2 > 1) {
                        ODistributedServerLog.info(this, this.localNodeName, str, ODistributedServerLog.DIRECTION.IN, "Request %s succeed after retry=%d", new Object[]{oDistributedRequest, Integer.valueOf(i2)});
                    }
                }
                if (this.database != null && !this.database.isClosed()) {
                    this.database.activateOnCurrentThread();
                    if (!this.database.isClosed()) {
                        this.database.rollback();
                        this.database.getLocalCache().clear();
                        if (oSecurityUser != null) {
                            this.database.setUser(oSecurityUser);
                        }
                    }
                }
                sendResponseBack(oDistributedRequest, obj);
            } catch (Throwable th2) {
                if (this.database != null && !this.database.isClosed()) {
                    this.database.activateOnCurrentThread();
                    if (!this.database.isClosed()) {
                        this.database.rollback();
                        this.database.getLocalCache().clear();
                        if (0 != 0) {
                            this.database.setUser((OSecurityUser) null);
                        }
                    }
                }
                throw th2;
            }
        } catch (RuntimeException e3) {
            sendResponseBack(oDistributedRequest, e3);
            throw e3;
        }
    }

    protected String getLocalNodeName() {
        return this.localNodeName;
    }

    private void sendResponseBack(ODistributedRequest oDistributedRequest, Object obj) {
        sendResponseBack(this, this.manager, oDistributedRequest, obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void sendResponseBack(Object obj, ODistributedServerManager oDistributedServerManager, ODistributedRequest oDistributedRequest, Object obj2) {
        if (oDistributedRequest.getId().getMessageId() < 0) {
            return;
        }
        String localNodeName = oDistributedServerManager.getLocalNodeName();
        String nodeNameById = oDistributedServerManager.getNodeNameById(oDistributedRequest.getId().getNodeId());
        ODistributedResponse oDistributedResponse = new ODistributedResponse(oDistributedRequest.getId(), localNodeName, nodeNameById, obj2);
        try {
            ORemoteServerController remoteServer = oDistributedServerManager.getRemoteServer(nodeNameById);
            ODistributedServerLog.debug(obj, localNodeName, nodeNameById, ODistributedServerLog.DIRECTION.OUT, "Sending response %s back (reqId=%s)", new Object[]{oDistributedResponse, oDistributedRequest});
            remoteServer.sendResponse(oDistributedResponse);
        } catch (Exception e) {
            ODistributedServerLog.debug(obj, localNodeName, nodeNameById, ODistributedServerLog.DIRECTION.OUT, "Error on sending response '%s' back (reqId=%s err=%s)", new Object[]{oDistributedResponse, oDistributedRequest.getId(), e.toString()});
        }
    }

    private void waitNodeIsOnline() throws OTimeoutException {
        ODistributedServerManager distributedManager = this.manager.getServerInstance().getDistributedManager();
        if (distributedManager != null && distributedManager.isEnabled() && distributedManager.isOffline()) {
            int i = 0;
            while (this.running && distributedManager != null && distributedManager.isOffline()) {
                ODistributedServerLog.info(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Node is not online yet (status=%s), blocking the command until it is online (retry=%d, queue=%d worker=%d)", new Object[]{distributedManager.getNodeStatus(), Integer.valueOf(i + 1), Integer.valueOf(this.localQueue.size()), Integer.valueOf(this.id)});
                if (this.localQueue.size() >= this.manager.getServerInstance().getContextConfiguration().getValueAsInteger(OGlobalConfiguration.DISTRIBUTED_LOCAL_QUEUESIZE)) {
                    ODistributedServerLog.warn(this, this.localNodeName, (String) null, ODistributedServerLog.DIRECTION.NONE, "Replication queue is full (retry=%d, queue=%d worker=%d), replication could be delayed", new Object[]{Integer.valueOf(i + 1), Integer.valueOf(this.localQueue.size()), Integer.valueOf(this.id)});
                }
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e) {
                }
                i++;
            }
        }
    }

    public long getProcessedRequests() {
        return this.processedRequests.get();
    }

    public void reset() {
        this.localQueue.clear();
        if (this.database != null) {
            this.database.activateOnCurrentThread();
            this.database.close();
            this.database = null;
        }
    }

    public void sendShutdown() {
        this.running = false;
        interrupt();
    }
}
