package com.redis.smartcache.shaded.com.redis.smartcache.core;

import com.redis.smartcache.shaded.com.fasterxml.jackson.dataformat.javaprop.JavaPropsMapper;
import com.redis.smartcache.shaded.com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.smartcache.shaded.com.redis.lettucemod.api.sync.RedisModulesCommands;
import com.redis.smartcache.shaded.com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.smartcache.shaded.com.redis.micrometer.RediSearchRegistryConfig;
import com.redis.smartcache.shaded.io.lettuce.core.AbstractRedisClient;
import com.redis.smartcache.shaded.io.lettuce.core.Limit;
import com.redis.smartcache.shaded.io.lettuce.core.Range;
import com.redis.smartcache.shaded.io.lettuce.core.StreamMessage;
import com.redis.smartcache.shaded.io.lettuce.core.XReadArgs;
import com.redis.smartcache.shaded.org.slf4j.Marker;
import com.redis.smartcache.shaded.org.springframework.beans.BeanUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/redis/smartcache/shaded/com/redis/smartcache/core/StreamConfigManager.class */
public class StreamConfigManager<T> implements ConfigManager<T> {
    private static final Logger log = Logger.getLogger(StreamConfigManager.class.getName());
    public static final Duration DEFAULT_BLOCK = Duration.ofMillis(300);
    public static final OptionalLong DEFAULT_COUNT = OptionalLong.empty();
    private final AbstractRedisClient client;
    private final String key;
    private final JavaPropsMapper mapper;
    private final T config;
    private Duration block = DEFAULT_BLOCK;
    private OptionalLong count = DEFAULT_COUNT;
    private StreamPollRunnable reader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/smartcache/shaded/com/redis/smartcache/core/StreamConfigManager$State.class */
    public enum State {
        STARTING,
        STARTED,
        STOPPING,
        STOPPED
    }

    /* loaded from: input_file:com/redis/smartcache/shaded/com/redis/smartcache/core/StreamConfigManager$StreamPollRunnable.class */
    private static class StreamPollRunnable implements Runnable {
        private final StatefulRedisModulesConnection<String, String> connection;
        private final XReadArgs xreadArgs;
        private final XReadArgs.StreamOffset<String> offset;
        private final Consumer<StreamMessage<String, String>> consumer;
        private boolean stop;
        private State state = State.STARTING;

        public StreamPollRunnable(StatefulRedisModulesConnection<String, String> statefulRedisModulesConnection, XReadArgs xReadArgs, XReadArgs.StreamOffset<String> streamOffset, Consumer<StreamMessage<String, String>> consumer) {
            this.connection = statefulRedisModulesConnection;
            this.xreadArgs = xReadArgs;
            this.offset = streamOffset;
            this.consumer = consumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.state = State.STARTED;
            while (!this.stop) {
                this.connection.sync().xread(this.xreadArgs, this.offset).forEach(this.consumer);
            }
            this.connection.close();
            this.state = State.STOPPED;
        }

        public void stop() {
            this.stop = true;
            this.state = State.STOPPING;
        }
    }

    public StreamConfigManager(AbstractRedisClient abstractRedisClient, String str, T t, JavaPropsMapper javaPropsMapper) {
        this.client = abstractRedisClient;
        this.key = str;
        this.config = t;
        this.mapper = javaPropsMapper;
    }

    public void setBlock(Duration duration) {
        this.block = duration;
    }

    public void setCount(long j) {
        setCount(OptionalLong.of(j));
    }

    public void setCount(OptionalLong optionalLong) {
        this.count = optionalLong;
    }

    @Override // com.redis.smartcache.shaded.com.redis.smartcache.core.ConfigManager
    public void start() throws IOException {
        StatefulRedisModulesConnection<String, String> connection = RedisModulesUtils.connection(this.client);
        List<StreamMessage<String, String>> xrevrange = connection.sync().xrevrange(this.key, Range.create(RediSearchRegistryConfig.DEFAULT_INDEX_SEPARATOR, Marker.ANY_NON_NULL_MARKER), Limit.create(0L, 1L));
        if (xrevrange.isEmpty()) {
            Map<String, String> writeValueAsMap = this.mapper.writeValueAsMap(this.config);
            if (!writeValueAsMap.isEmpty()) {
                connection.sync().xadd((RedisModulesCommands<String, String>) this.key, (Map<RedisModulesCommands<String, String>, String>) writeValueAsMap);
            }
        } else {
            update(xrevrange.get(0));
        }
        this.reader = new StreamPollRunnable(connection, xreadArgs(), XReadArgs.StreamOffset.latest(this.key), this::update);
        Executors.newSingleThreadExecutor().submit(this.reader);
    }

    private void update(StreamMessage<String, String> streamMessage) {
        try {
            Object readMapAs = this.mapper.readMapAs(streamMessage.getBody(), this.config.getClass());
            if (readMapAs != null) {
                BeanUtils.copyProperties(readMapAs, this.config);
            }
        } catch (IOException e) {
            log.log(Level.SEVERE, "Could not parse config", (Throwable) e);
        }
    }

    public boolean isRunning() {
        return this.reader.state == State.STARTED;
    }

    private XReadArgs xreadArgs() {
        XReadArgs xReadArgs = new XReadArgs();
        if (!this.block.isZero()) {
            xReadArgs.block(this.block);
        }
        OptionalLong optionalLong = this.count;
        Objects.requireNonNull(xReadArgs);
        optionalLong.ifPresent(xReadArgs::count);
        return xReadArgs;
    }

    @Override // com.redis.smartcache.shaded.com.redis.smartcache.core.ConfigManager
    public T get() {
        return this.config;
    }

    @Override // com.redis.smartcache.shaded.com.redis.smartcache.core.ConfigManager
    public void stop() throws InterruptedException, ExecutionException, TimeoutException {
        this.reader.stop();
        this.reader = null;
    }
}
