package com.cinchapi.concourse.server.plugin;

import com.cinchapi.common.reflect.Reflection;
import com.cinchapi.concourse.annotate.PackagePrivate;
import com.cinchapi.concourse.server.plugin.RemoteMessage;
import com.cinchapi.concourse.server.plugin.io.MessageQueue;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@PackagePrivate
/* loaded from: input_file:com/cinchapi/concourse/server/plugin/RealTimePlugin.class */
abstract class RealTimePlugin extends Plugin {
    static final String STREAM_ATTRIBUTE = "stream";
    private final ExecutorService workers;

    public RealTimePlugin(String str, String str2) {
        super(str, str2);
        this.workers = Executors.newCachedThreadPool();
    }

    @Override // com.cinchapi.concourse.server.plugin.Plugin
    public final void run() {
        Reflection.call(this, "setReadyState", new Object[0]);
        RemoteMessage remoteMessage = (RemoteMessage) this.serializer.deserialize(this.fromServer.read());
        if (remoteMessage.type() != RemoteMessage.Type.ATTRIBUTE) {
            throw new IllegalStateException();
        }
        RemoteAttributeExchange remoteAttributeExchange = (RemoteAttributeExchange) remoteMessage;
        if (!remoteAttributeExchange.key().equalsIgnoreCase(STREAM_ATTRIBUTE)) {
            throw new IllegalStateException("Unsupported attribute " + remoteAttributeExchange);
        }
        this.log.debug("Listening for streamed packets at {}", new Object[]{remoteAttributeExchange.value()});
        MessageQueue messageQueue = new MessageQueue(remoteAttributeExchange.value());
        Thread thread = new Thread(() -> {
            while (true) {
                ByteBuffer read = messageQueue.read();
                if (read == null) {
                    return;
                }
                Packet packet = (Packet) this.serializer.deserialize(read);
                this.workers.execute(() -> {
                    this.log.debug("Received packet from Concourse Server: {}", new Object[]{packet});
                    handlePacket(packet);
                });
            }
        });
        thread.setDaemon(true);
        thread.start();
        super.run();
    }

    protected abstract void handlePacket(Packet packet);
}
