package org.apache.hedwig.server.persistence;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.sql.rowset.serial.SerialBlob;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/persistence/LocalDBPersistenceManager.class */
public class LocalDBPersistenceManager implements PersistenceManagerWithRangeScan {
    static Logger logger = LoggerFactory.getLogger(LocalDBPersistenceManager.class);
    static String connectionURL;
    private static final ThreadLocal<Connection> threadLocalConnection;
    private static final ThreadLocal<MessageDigest> threadLocalDigest;
    static final String ID_FIELD_NAME = "id";
    static final String MSG_FIELD_NAME = "msg";
    static final String driver = "org.apache.derby.jdbc.EmbeddedDriver";
    static final int SCAN_CHUNK = 1000;
    private int version = 0;
    ConcurrentMap<ByteString, PubSubProtocol.MessageSeqId> currTopicSeqIds = new ConcurrentHashMap();
    static LocalDBPersistenceManager instance;

    public static LocalDBPersistenceManager instance() {
        return instance;
    }

    private LocalDBPersistenceManager() {
        try {
            Class.forName(driver).newInstance();
            logger.info("Derby Driver loaded");
        } catch (ClassNotFoundException e) {
            logger.error("Derby driver not found", e);
        } catch (IllegalAccessException e2) {
            logger.error("Could not instantiate derby driver", e2);
        } catch (InstantiationException e3) {
            logger.error("Could not instantiate derby driver", e3);
        }
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void stop() {
    }

    private PubSubProtocol.MessageSeqId ensureSeqIdExistsForTopic(ByteString byteString) {
        PubSubProtocol.MessageSeqId messageSeqId = this.currTopicSeqIds.get(byteString);
        if (messageSeqId != null) {
            return messageSeqId;
        }
        PubSubProtocol.MessageSeqId build = PubSubProtocol.MessageSeqId.newBuilder().setLocalComponent(0L).build();
        PubSubProtocol.MessageSeqId putIfAbsent = this.currTopicSeqIds.putIfAbsent(byteString, build);
        return putIfAbsent != null ? putIfAbsent : build;
    }

    private long adjustTopicSeqIdForPublish(ByteString byteString, PubSubProtocol.Message message) throws PubSubException.UnexpectedConditionException {
        PubSubProtocol.MessageSeqId ensureSeqIdExistsForTopic;
        long localComponent;
        PubSubProtocol.MessageSeqId.Builder newBuilder = PubSubProtocol.MessageSeqId.newBuilder();
        do {
            ensureSeqIdExistsForTopic = ensureSeqIdExistsForTopic(byteString);
            localComponent = ensureSeqIdExistsForTopic.getLocalComponent() + 1;
            newBuilder.setLocalComponent(localComponent);
            if (message.hasMsgId()) {
                MessageIdUtils.takeRegionMaximum(newBuilder, message.getMsgId(), ensureSeqIdExistsForTopic);
            } else {
                newBuilder.addAllRemoteComponents(ensureSeqIdExistsForTopic.getRemoteComponentsList());
            }
        } while (!this.currTopicSeqIds.replace(byteString, ensureSeqIdExistsForTopic, newBuilder.build()));
        return localComponent;
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public long getSeqIdAfterSkipping(ByteString byteString, long j, int i) {
        return j + i;
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void persistMessage(PersistRequest persistRequest) {
        boolean z;
        Connection connection = threadLocalConnection.get();
        Callback<PubSubProtocol.MessageSeqId> callback = persistRequest.getCallback();
        Object ctx = persistRequest.getCtx();
        ByteString topic = persistRequest.getTopic();
        PubSubProtocol.Message message = persistRequest.getMessage();
        if (connection == null) {
            callback.operationFailed(ctx, new PubSubException.ServiceDownException("Not connected to derby"));
            return;
        }
        try {
            long adjustTopicSeqIdForPublish = adjustTopicSeqIdForPublish(topic, message);
            boolean z2 = false;
            while (true) {
                try {
                    z = z2;
                    message.getBody();
                    PreparedStatement prepareStatement = connection.prepareStatement("INSERT INTO " + getTableNameForTopic(topic) + " VALUES(?,?)");
                    prepareStatement.setLong(1, adjustTopicSeqIdForPublish);
                    prepareStatement.setBlob(2, (Blob) new SerialBlob(message.toByteArray()));
                    int executeUpdate = prepareStatement.executeUpdate();
                    prepareStatement.close();
                    if (executeUpdate == 1) {
                        callback.operationFinished(ctx, MessageIdUtils.mergeLocalSeqId(message, adjustTopicSeqIdForPublish).getMsgId());
                        return;
                    } else {
                        logger.error("Unexpected number of affected rows from derby");
                        callback.operationFailed(ctx, new PubSubException.ServiceDownException("Unexpected response from derby"));
                        return;
                    }
                } catch (SQLException e) {
                    if (!e.getSQLState().equals("42X05") || z) {
                        logger.error("Error while executing derby insert", e);
                        callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
                    } else {
                        createTable(connection, topic);
                        z2 = true;
                    }
                }
            }
            logger.error("Error while executing derby insert", e);
            callback.operationFailed(ctx, new PubSubException.ServiceDownException(e));
        } catch (PubSubException.UnexpectedConditionException e2) {
            callback.operationFailed(ctx, e2);
        }
    }

    private void createTable(Connection connection, ByteString byteString) {
        Statement statement = null;
        try {
            try {
                statement = connection.createStatement();
                String tableNameForTopic = getTableNameForTopic(byteString);
                statement.execute("CREATE TABLE " + tableNameForTopic + " (" + ID_FIELD_NAME + " BIGINT NOT NULL CONSTRAINT ID_PK_" + tableNameForTopic + " PRIMARY KEY," + MSG_FIELD_NAME + " BLOB(2M) NOT NULL)");
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e) {
                        logger.error("Error closing statement", e);
                    }
                }
            } catch (SQLException e2) {
                logger.debug("Could not create table", e2);
                if (statement != null) {
                    try {
                        statement.close();
                    } catch (SQLException e3) {
                        logger.error("Error closing statement", e3);
                    }
                }
            }
        } catch (Throwable th) {
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e4) {
                    logger.error("Error closing statement", e4);
                    throw th;
                }
            }
            throw th;
        }
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public PubSubProtocol.MessageSeqId getCurrentSeqIdForTopic(ByteString byteString) {
        return ensureSeqIdExistsForTopic(byteString);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void scanSingleMessage(ScanRequest scanRequest) {
        scanMessagesInternal(scanRequest.getTopic(), scanRequest.getStartSeqId(), 1, Long.MAX_VALUE, scanRequest.getCallback(), scanRequest.getCtx(), 1);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan
    public void scanMessages(RangeScanRequest rangeScanRequest) {
        scanMessagesInternal(rangeScanRequest.getTopic(), rangeScanRequest.getStartSeqId(), rangeScanRequest.getMessageLimit(), rangeScanRequest.getSizeLimit(), rangeScanRequest.getCallback(), rangeScanRequest.getCtx(), SCAN_CHUNK);
    }

    private String getTableNameForTopic(ByteString byteString) {
        String str = byteString.toStringUtf8() + "_" + this.version;
        threadLocalDigest.get().reset();
        return String.format("TABLE_%032X", new BigInteger(1, threadLocalDigest.get().digest(str.getBytes())));
    }

    private void scanMessagesInternal(ByteString byteString, long j, int i, long j2, ScanCallback scanCallback, Object obj, int i2) {
        Connection connection = threadLocalConnection.get();
        if (connection == null) {
            scanCallback.scanFailed(obj, new PubSubException.ServiceDownException("Not connected to derby"));
            return;
        }
        long j3 = j;
        PreparedStatement preparedStatement = null;
        try {
            try {
                try {
                    preparedStatement = connection.prepareStatement("SELECT * FROM " + getTableNameForTopic(byteString) + " WHERE " + ID_FIELD_NAME + " >= ?  AND " + ID_FIELD_NAME + " <= ?");
                    int i3 = 0;
                    long j4 = 0;
                    while (true) {
                        preparedStatement.setLong(1, j3);
                        preparedStatement.setLong(2, j3 + i2);
                        if (!preparedStatement.execute()) {
                            logger.error("Select query did not return a result set");
                            preparedStatement.close();
                            scanCallback.scanFailed(obj, new PubSubException.ServiceDownException("Select query did not return a result set"));
                            if (preparedStatement != null) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    logger.error("Error closing statement", e);
                                    return;
                                }
                            }
                            return;
                        }
                        ResultSet resultSet = preparedStatement.getResultSet();
                        if (!resultSet.next()) {
                            preparedStatement.close();
                            scanCallback.scanFinished(obj, ScanCallback.ReasonForFinish.NO_MORE_MESSAGES);
                            if (preparedStatement != null) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e2) {
                                    logger.error("Error closing statement", e2);
                                    return;
                                }
                            }
                            return;
                        }
                        do {
                            scanCallback.messageScanned(obj, MessageIdUtils.mergeLocalSeqId(PubSubProtocol.Message.newBuilder().mergeFrom(resultSet.getBinaryStream(2)), resultSet.getLong(1)));
                            i3++;
                            j4 += r0.getBody().size();
                            if (i3 > i) {
                                preparedStatement.close();
                                scanCallback.scanFinished(obj, ScanCallback.ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
                                if (preparedStatement != null) {
                                    try {
                                        preparedStatement.close();
                                    } catch (SQLException e3) {
                                        logger.error("Error closing statement", e3);
                                        return;
                                    }
                                }
                                return;
                            }
                            if (j4 > j2) {
                                preparedStatement.close();
                                scanCallback.scanFinished(obj, ScanCallback.ReasonForFinish.SIZE_LIMIT_EXCEEDED);
                                if (preparedStatement != null) {
                                    try {
                                        preparedStatement.close();
                                    } catch (SQLException e4) {
                                        logger.error("Error closing statement", e4);
                                        return;
                                    }
                                }
                                return;
                            }
                        } while (resultSet.next());
                        j3 += 1000;
                    }
                } catch (Throwable th) {
                    if (0 != 0) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e5) {
                            logger.error("Error closing statement", e5);
                            throw th;
                        }
                    }
                    throw th;
                }
            } catch (SQLException e6) {
                if (!e6.getSQLState().equals("42X05")) {
                    throw e6;
                }
                scanCallback.scanFinished(obj, ScanCallback.ReasonForFinish.NO_MORE_MESSAGES);
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e7) {
                        logger.error("Error closing statement", e7);
                    }
                }
            }
        } catch (IOException e8) {
            logger.error("Message stored in derby is not parseable", e8);
            scanCallback.scanFailed(obj, new PubSubException.ServiceDownException(e8));
            if (0 != 0) {
                try {
                    preparedStatement.close();
                } catch (SQLException e9) {
                    logger.error("Error closing statement", e9);
                }
            }
        } catch (SQLException e10) {
            logger.error("SQL Exception", e10);
            scanCallback.scanFailed(obj, new PubSubException.ServiceDownException(e10));
            if (0 != 0) {
                try {
                    preparedStatement.close();
                } catch (SQLException e11) {
                    logger.error("Error closing statement", e11);
                }
            }
        }
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void deliveredUntil(ByteString byteString, Long l) {
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumedUntil(ByteString byteString, Long l) {
        Connection connection = threadLocalConnection.get();
        if (connection == null) {
            logger.error("Not connected to derby");
            return;
        }
        PreparedStatement preparedStatement = null;
        try {
            try {
                preparedStatement = connection.prepareStatement("DELETE FROM " + getTableNameForTopic(byteString) + " WHERE " + ID_FIELD_NAME + " <= ?");
                preparedStatement.setLong(1, l.longValue());
                int executeUpdate = preparedStatement.executeUpdate();
                if (logger.isDebugEnabled()) {
                    logger.debug("Deleted " + executeUpdate + " records for topic: " + byteString.toStringUtf8() + ", seqId: " + l);
                }
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        logger.error("Error closing statement", e);
                    }
                }
            } catch (SQLException e2) {
                if (e2.getSQLState().equals("42X05")) {
                    logger.warn("Table for topic (" + byteString + ") does not exist so no consumed messages to delete!");
                } else {
                    logger.error("Error while executing derby delete for consumed messages", e2);
                }
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e3) {
                        logger.error("Error closing statement", e3);
                    }
                }
            }
        } catch (Throwable th) {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e4) {
                    logger.error("Error closing statement", e4);
                    throw th;
                }
            }
            throw th;
        }
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void setMessageBound(ByteString byteString, Integer num) {
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void clearMessageBound(ByteString byteString) {
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumeToBound(ByteString byteString) {
    }

    protected void finalize() throws Throwable {
        if (driver.equals(driver)) {
            boolean z = false;
            try {
                DriverManager.getConnection("jdbc:derby:;shutdown=true").close();
            } catch (SQLException e) {
                if (e.getSQLState().equals("XJ015")) {
                    z = true;
                }
            }
            if (z) {
                logger.info("Database shut down normally");
            } else {
                logger.error("Database did not shut down normally");
            }
        }
        super.finalize();
    }

    public void reset() {
        this.version++;
        this.currTopicSeqIds.clear();
    }

    static {
        try {
            File createTempDirectory = FileUtils.createTempDirectory("derby", (String) null);
            if (!createTempDirectory.delete()) {
                throw new IOException("Could not delete dir: " + createTempDirectory.getAbsolutePath());
            }
            connectionURL = "jdbc:derby:" + createTempDirectory.getAbsolutePath() + ";create=true";
            threadLocalConnection = new ThreadLocal<Connection>() { // from class: org.apache.hedwig.server.persistence.LocalDBPersistenceManager.1
                /* JADX INFO: Access modifiers changed from: protected */
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.lang.ThreadLocal
                public Connection initialValue() {
                    try {
                        return DriverManager.getConnection(LocalDBPersistenceManager.connectionURL);
                    } catch (SQLException e) {
                        LocalDBPersistenceManager.logger.error("Could not connect to derby", e);
                        return null;
                    }
                }
            };
            threadLocalDigest = new ThreadLocal<MessageDigest>() { // from class: org.apache.hedwig.server.persistence.LocalDBPersistenceManager.2
                /* JADX INFO: Access modifiers changed from: protected */
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.lang.ThreadLocal
                public MessageDigest initialValue() {
                    try {
                        return MessageDigest.getInstance("MD5");
                    } catch (NoSuchAlgorithmException e) {
                        LocalDBPersistenceManager.logger.error("Could not find MD5 hash", e);
                        return null;
                    }
                }
            };
            instance = new LocalDBPersistenceManager();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
