package com.erudika.para.queue;

import com.erudika.para.Para;
import com.erudika.para.annotations.Locked;
import com.erudika.para.core.App;
import com.erudika.para.core.ParaObject;
import com.erudika.para.core.Sysprop;
import com.erudika.para.core.utils.ParaObjectUtils;
import com.erudika.para.utils.Config;
import com.erudika.para.utils.HealthUtils;
import com.erudika.para.utils.Pager;
import com.fasterxml.jackson.databind.ObjectReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.hibernate.validator.internal.metadata.core.ConstraintHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/para-server-1.42.1.jar:com/erudika/para/queue/River.class */
public abstract class River implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) River.class);
    private static final int SLEEP = Config.getConfigInt("queue.polling_sleep_seconds", 60);
    private static final int MAX_FAILED_WEBHOOK_ATTEMPTS = Config.getConfigInt("max_failed_webhook_attempts", 10);
    private static final int MAX_INDEXING_RETRIES = Config.getConfigInt("river.max_indexing_retries", 5);
    private static final CloseableHttpClient HTTP = HttpClientBuilder.create().setConnectionReuseStrategy((httpRequest, httpResponse, httpContext) -> {
        return false;
    }).setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(10, TimeUnit.SECONDS).setConnectionRequestTimeout(10, TimeUnit.SECONDS).build()).build();
    private static ConcurrentHashMap<String, Integer> pendingIds;
    public static final int POLLING_INTERVAL;

    abstract List<String> pullMessages();

    @Override // java.lang.Runnable
    public void run() {
        LinkedList linkedList = new LinkedList();
        LinkedList linkedList2 = new LinkedList();
        LinkedList linkedList3 = new LinkedList();
        ObjectReader jsonReader = ParaObjectUtils.getJsonReader(Map.class);
        int i = 0;
        while (!Thread.interrupted()) {
            try {
                logger.debug("Waiting {}s for messages...", Integer.valueOf(POLLING_INTERVAL));
                int i2 = 0;
                List<String> emptyList = Collections.emptyList();
                if (HealthUtils.getInstance().isHealthy()) {
                    try {
                        emptyList = pullMessages();
                        logger.debug("Pulled {} messages from queue.", Integer.valueOf(emptyList.size()));
                        for (String str : emptyList) {
                            logger.debug("Message from queue: {}", str);
                            if (StringUtils.contains(str, Config._APPID) && StringUtils.contains(str, "type")) {
                                i2 += parseAndCategorizeMessage((Map) jsonReader.readValue(str), linkedList, linkedList2, linkedList3);
                            }
                        }
                    } catch (Exception e) {
                        logger.error("Batch processing operation failed:", (Throwable) e);
                    }
                }
                if (!linkedList.isEmpty() || !linkedList2.isEmpty() || !linkedList3.isEmpty() || i2 > 0) {
                    logger.debug("River summary: {} created, {} updated, {} deleted, {} webhooks delivered.", Integer.valueOf(linkedList.size()), Integer.valueOf(linkedList2.size()), Integer.valueOf(linkedList3.size()), Integer.valueOf(i2));
                    persistChanges(linkedList, linkedList2, linkedList3);
                    i = 0;
                } else if (emptyList.isEmpty()) {
                    i++;
                    if (SLEEP > 0 && i >= 3) {
                        logger.debug("Queue is empty. Sleeping for {}s...", Integer.valueOf(SLEEP));
                        Thread.sleep(SLEEP * 1000);
                    }
                }
            } catch (InterruptedException e2) {
                logger.info("River interrupted: {}", e2.getMessage());
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    private int parseAndCategorizeMessage(Map<String, Object> map, List<ParaObject> list, List<ParaObject> list2, List<ParaObject> list3) {
        String str = map.containsKey("id") ? (String) map.get("id") : null;
        String str2 = (String) map.get("type");
        String str3 = (String) map.get(Config._APPID);
        boolean equals = ParaObjectUtils.toClass(str2).equals(Sysprop.class);
        if (StringUtils.isBlank(str3) || !equals) {
            return 0;
        }
        if ("webhookpayload".equals(str2)) {
            return processWebhookPayload(str3, str, map);
        }
        if ("indexpayload".equals(str2)) {
            return processIndexPayload(str3, str, map);
        }
        if (map.containsKey("_delete") && "true".equals(map.get("_delete")) && str != null) {
            Sysprop sysprop = new Sysprop(str);
            sysprop.setAppid(str3);
            list3.add(sysprop);
            return 0;
        }
        if (str != null && !"true".equals(map.get("_create"))) {
            list2.add(ParaObjectUtils.setAnnotatedFields(Para.getDAO().read(str3, str), map, Locked.class));
            return 0;
        }
        ParaObject annotatedFields = ParaObjectUtils.setAnnotatedFields(map);
        if (annotatedFields == null) {
            return 0;
        }
        list.add(annotatedFields);
        return 0;
    }

    /* JADX WARN: Removed duplicated region for block: B:24:0x014f A[Catch: Exception -> 0x0225, all -> 0x02f6, Exception -> 0x03ad, TryCatch #1 {all -> 0x02f6, blocks: (B:18:0x00e6, B:53:0x00f5, B:55:0x0106, B:24:0x014f, B:21:0x013a, B:48:0x015e, B:46:0x0171, B:51:0x0168, B:72:0x0227), top: B:17:0x00e6, outer: #0 }] */
    /* JADX WARN: Removed duplicated region for block: B:27:0x0177 A[Catch: Exception -> 0x03ad, DONT_GENERATE, TryCatch #0 {Exception -> 0x03ad, blocks: (B:10:0x0023, B:13:0x007a, B:15:0x00a6, B:18:0x00e6, B:53:0x00f5, B:55:0x0106, B:24:0x014f, B:27:0x0177, B:29:0x01a7, B:30:0x01ad, B:32:0x01ba, B:34:0x01ce, B:39:0x020a, B:21:0x013a, B:48:0x015e, B:46:0x0171, B:51:0x0168, B:72:0x0227, B:74:0x0248, B:76:0x0278, B:77:0x027e, B:79:0x028b, B:81:0x029f, B:85:0x02db, B:59:0x02fd, B:61:0x032d, B:62:0x0333, B:64:0x0340, B:66:0x0354, B:68:0x0390, B:70:0x03aa, B:87:0x00c8), top: B:9:0x0023, inners: #1, #2 }] */
    /* JADX WARN: Removed duplicated region for block: B:40:0x03ab A[ORIG_RETURN, RETURN] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected int processWebhookPayload(java.lang.String r8, java.lang.String r9, java.util.Map<java.lang.String, java.lang.Object> r10) {
        /*
            Method dump skipped, instructions count: 957
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.erudika.para.queue.River.processWebhookPayload(java.lang.String, java.lang.String, java.util.Map):int");
    }

    protected int processIndexPayload(String str, String str2, Map<String, Object> map) {
        if (!Config.isSearchEnabled() || StringUtils.isBlank(str2) || map.isEmpty()) {
            return 0;
        }
        Object obj = map.get(ConstraintHelper.PAYLOAD);
        try {
            boolean z = -1;
            switch (str2.hashCode()) {
                case -1021108635:
                    if (str2.equals("unindex_all_op")) {
                        z = true;
                        break;
                    }
                    break;
                case -1018717054:
                    if (str2.equals("delete_index_op")) {
                        z = 4;
                        break;
                    }
                    break;
                case -797884398:
                    if (str2.equals("rebuild_index_op")) {
                        z = 2;
                        break;
                    }
                    break;
                case 209268044:
                    if (str2.equals("index_all_op")) {
                        z = false;
                        break;
                    }
                    break;
                case 1028893777:
                    if (str2.equals("create_index_op")) {
                        z = 3;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    indexAllWithRetry(str, obj);
                    return 1;
                case true:
                    Para.getSearch().unindexAll(str, getPayloadObjects(str, obj));
                    return 1;
                case true:
                    Para.getSearch().rebuildIndex(Para.getDAO(), (App) ParaObjectUtils.setAnnotatedFields((Map) obj), "", new Pager[0]);
                    return 1;
                case true:
                    Para.getSearch().createIndex((App) ParaObjectUtils.setAnnotatedFields((Map) obj));
                    return 1;
                case true:
                    Para.getSearch().deleteIndex((App) ParaObjectUtils.setAnnotatedFields((Map) obj));
                    return 1;
                default:
                    return 1;
            }
        } catch (Exception e) {
            logger.error("Indexing operation " + str2 + " failed for app '" + str + "'!", (Throwable) e);
            return 0;
        }
    }

    private void persistChanges(List<ParaObject> list, List<ParaObject> list2, List<ParaObject> list3) {
        if (!list.isEmpty()) {
            Para.getDAO().createAll(list);
        }
        if (!list2.isEmpty()) {
            Para.getDAO().updateAll(list2);
        }
        if (!list3.isEmpty()) {
            Para.getDAO().deleteAll(list3);
        }
        list.clear();
        list2.clear();
        list3.clear();
    }

    private void indexAllWithRetry(String str, Object obj) {
        List<String> list = (List) Optional.ofNullable(obj).orElse(Collections.emptyList());
        Para.getCache().removeAll(str, list);
        Map readAll = Para.getDAO().readAll(str, list, true);
        Para.getSearch().indexAll(str, (List) readAll.values().stream().filter(paraObject -> {
            return paraObject != null;
        }).collect(Collectors.toList()));
        if (readAll.containsValue(null)) {
            if (pendingIds == null) {
                pendingIds = new ConcurrentHashMap<>();
            }
            readAll.entrySet().stream().filter(entry -> {
                return entry.getValue() == null;
            }).forEachOrdered(entry2 -> {
                pendingIds.putIfAbsent((String) entry2.getKey(), 1);
            });
            logger.debug("Some objects are missing from local database while performing 'index_all_op': {}", pendingIds);
            Para.asyncExecute(() -> {
                try {
                    for (int i = 0; i < MAX_INDEXING_RETRIES; i++) {
                        try {
                            Thread.sleep(1000 * (i + 1));
                            Map readAll2 = Para.getDAO().readAll(str, new ArrayList(pendingIds.keySet()), true);
                            int size = pendingIds.size();
                            readAll2.entrySet().stream().filter(entry3 -> {
                                return entry3.getValue() != null;
                            }).forEachOrdered(entry4 -> {
                                pendingIds.remove(entry4.getKey());
                            });
                            if (size != pendingIds.size()) {
                                Para.getSearch().indexAll(str, (List) readAll2.values().stream().collect(Collectors.toList()));
                            }
                            if (pendingIds.isEmpty()) {
                                break;
                            }
                        } catch (InterruptedException e) {
                            logger.info("Retry indexing operation interrupted: {}", e.getMessage());
                            Thread.currentThread().interrupt();
                            if (pendingIds.isEmpty()) {
                                return;
                            }
                            logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                            pendingIds.clear();
                            return;
                        }
                    }
                    if (pendingIds.isEmpty()) {
                        return;
                    }
                    logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                    pendingIds.clear();
                } catch (Throwable th) {
                    if (!pendingIds.isEmpty()) {
                        logger.warn("Indexing operation 'index_all_op' failed for objects {} as they were not found in the database for app '{}'. This may cause the index to become out of sync or corrupted.", pendingIds, str);
                        pendingIds.clear();
                    }
                    throw th;
                }
            });
        }
    }

    private List<ParaObject> getPayloadObjects(String str, Object obj) {
        List<String> list = (List) Optional.ofNullable(obj).orElse(Collections.emptyList());
        Para.getCache().removeAll(str, list);
        return (List) list.stream().map(str2 -> {
            Sysprop sysprop = new Sysprop(str2);
            sysprop.setAppid(str);
            return sysprop;
        }).collect(Collectors.toList());
    }

    static {
        POLLING_INTERVAL = Config.getConfigInt("queue.polling_interval_seconds", Config.IN_PRODUCTION ? 20 : 5);
    }
}
