package com.erudika.para.queue;

import com.erudika.para.Para;
import com.erudika.para.annotations.Locked;
import com.erudika.para.core.ParaObject;
import com.erudika.para.core.Sysprop;
import com.erudika.para.core.Thing;
import com.erudika.para.core.utils.ParaObjectUtils;
import com.erudika.para.utils.Config;
import com.erudika.para.webhooks.WebhookUtils;
import com.fasterxml.jackson.databind.ObjectReader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/erudika/para/queue/River.class */
/**
 * Base class for a background worker that repeatedly pulls JSON messages from a queue
 * and turns them into batched DAO operations (create / update / delete) or webhook
 * deliveries.
 *
 * <p>Subclasses supply the queue-specific {@link #pullMessages()} implementation; the
 * polling loop itself lives in {@link #run()}.
 */
public abstract class River implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(River.class);

    /**
     * Seconds to sleep between polls once the queue has been empty several times in a row
     * (config key {@code queue.polling_sleep_seconds}, default 60). A value of 0 or less
     * disables sleeping.
     */
    private static final int SLEEP = Config.getConfigInt("queue.polling_sleep_seconds", 60);

    /** Number of consecutive empty pulls after which the river starts sleeping between polls. */
    private static final int IDLE_POLLS_BEFORE_SLEEP = 3;

    /**
     * Polling interval in seconds (config key {@code queue.polling_interval_seconds},
     * default 20 in production and 5 otherwise). In this class it is only logged;
     * presumably implementations of {@link #pullMessages()} use it as their wait time.
     */
    public static final int POLLING_INTERVAL = Config.getConfigInt("queue.polling_interval_seconds",
            Config.IN_PRODUCTION ? 20 : 5);

    /**
     * Pulls the next batch of raw messages from the queue.
     *
     * @return the pulled messages; {@link #run()} dereferences the result directly, so
     *         implementations should return an empty list rather than {@code null}
     */
    abstract List<String> pullMessages();

    /**
     * Polls the queue until the current thread is interrupted.
     *
     * <p>Every pulled message that mentions both {@code appid} and {@code type} is parsed
     * as a JSON map and sorted into a create, update or delete batch (or delivered as a
     * webhook). The batches are flushed to the DAO at the end of each round. After
     * {@value #IDLE_POLLS_BEFORE_SLEEP} consecutive empty pulls the thread sleeps for
     * {@code SLEEP} seconds between polls, if sleeping is enabled.
     */
    @Override
    public void run() {
        List<ParaObject> createList = new LinkedList<>();
        List<ParaObject> updateList = new LinkedList<>();
        List<ParaObject> deleteList = new LinkedList<>();
        ObjectReader jsonReader = ParaObjectUtils.getJsonReader(Map.class);
        int idlePolls = 0;

        do {
            logger.debug("Waiting {}s for messages...", POLLING_INTERVAL);
            List<String> messages = pullMessages();
            logger.debug("Pulled {} messages from queue.", messages.size());
            try {
                int webhooksDelivered = 0;
                for (String message : messages) {
                    logger.debug("Message from queue: {}", message);
                    // Cheap textual pre-filter: skip payloads that cannot carry the required keys.
                    if (!StringUtils.contains(message, "appid") || !StringUtils.contains(message, "type")) {
                        continue;
                    }
                    try {
                        Map<String, Object> parsed = jsonReader.readValue(message);
                        webhooksDelivered += parseAndCategorizeMessage(parsed, createList, updateList, deleteList);
                    } catch (Exception messageFailure) {
                        // The batch has already been pulled, so one bad message must not
                        // cause the remaining messages to be dropped.
                        logger.error("Failed to process message from queue:", messageFailure);
                    }
                }

                boolean hasPendingChanges = !createList.isEmpty() || !updateList.isEmpty() || !deleteList.isEmpty();
                if (hasPendingChanges || webhooksDelivered > 0) {
                    logger.debug("River summary: {} created, {} updated, {} deleted, {} webhooks delivered.",
                            createList.size(), updateList.size(), deleteList.size(), webhooksDelivered);
                    persistChanges(createList, updateList, deleteList);
                    idlePolls = 0;
                } else if (messages.isEmpty()) {
                    idlePolls++;
                    if (SLEEP > 0 && idlePolls >= IDLE_POLLS_BEFORE_SLEEP) {
                        try {
                            logger.debug("Queue is empty. Sleeping for {}s...", SLEEP);
                            // Multiply as long so large configured values cannot overflow int.
                            Thread.sleep(SLEEP * 1000L);
                        } catch (InterruptedException interrupted) {
                            logger.warn("River interrupted: ", interrupted);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            } catch (Exception batchFailure) {
                // The exception is passed as the Throwable argument, so no "{}" placeholder is needed.
                logger.error("Batch processing operation failed:", batchFailure);
            }
            // isInterrupted() leaves the interrupt flag set on exit, matching the sleep path above.
        } while (!Thread.currentThread().isInterrupted());
    }

    /**
     * Inspects one parsed message and either delivers it as a webhook or appends a
     * matching object to one of the pending batches.
     *
     * <p>Messages are ignored when {@code appid} is blank or when {@code type} does not
     * resolve to {@link Thing} or {@link Sysprop}.
     *
     * @param parsed     the message as a key/value map
     * @param createList batch of objects to create
     * @param updateList batch of objects to update
     * @param deleteList batch of objects to delete
     * @return the value returned by {@code WebhookUtils.processWebhookPayload} for
     *         {@code webhookpayload} messages, otherwise 0
     */
    private int parseAndCategorizeMessage(Map<String, Object> parsed, List<ParaObject> createList,
            List<ParaObject> updateList, List<ParaObject> deleteList) {
        String id = parsed.containsKey("id") ? (String) parsed.get("id") : null;
        String type = (String) parsed.get("type");
        String appid = (String) parsed.get("appid");
        Class<?> clazz = ParaObjectUtils.toClass(type);
        boolean isAllowedType = clazz.equals(Thing.class) || clazz.equals(Sysprop.class);

        if (StringUtils.isBlank(appid) || !isAllowedType) {
            return 0;
        }
        if ("webhookpayload".equals(type)) {
            return WebhookUtils.processWebhookPayload(appid, id, parsed);
        }

        boolean deleteRequested = parsed.containsKey("_delete") && "true".equals(parsed.get("_delete"));
        if (deleteRequested && id != null) {
            // Only the id and appid are set on the placeholder object handed to deleteAll().
            Sysprop toDelete = new Sysprop(id);
            toDelete.setAppid(appid);
            deleteList.add(toDelete);
            return 0;
        }

        boolean createRequested = "true".equals(parsed.get("_create"));
        if (id != null && !createRequested) {
            // Update: merge the message into the stored object, filtering by the Locked annotation.
            ParaObject existing = Para.getDAO().read(appid, id);
            ParaObject updated = ParaObjectUtils.setAnnotatedFields(existing, parsed, Locked.class);
            // Same null guard as the create branch below; never queue a null entry.
            if (updated != null) {
                updateList.add(updated);
            }
            return 0;
        }

        ParaObject created = ParaObjectUtils.setAnnotatedFields(parsed);
        if (created != null) {
            createList.add(created);
        }
        return 0;
    }

    /**
     * Flushes the pending batches to the DAO in the order create, update, delete.
     *
     * <p>Each list is cleared as soon as its own DAO call returns, so a failure in a
     * later step does not cause already persisted batches to be replayed on the next
     * round. A batch whose DAO call throws stays in its list and is retried next time.
     *
     * @param createList objects to create
     * @param updateList objects to update
     * @param deleteList objects to delete
     */
    private void persistChanges(List<ParaObject> createList, List<ParaObject> updateList,
            List<ParaObject> deleteList) {
        if (!createList.isEmpty()) {
            Para.getDAO().createAll(createList);
            createList.clear();
        }
        if (!updateList.isEmpty()) {
            Para.getDAO().updateAll(updateList);
            updateList.clear();
        }
        if (!deleteList.isEmpty()) {
            Para.getDAO().deleteAll(deleteList);
            deleteList.clear();
        }
    }
}
