package fi.evolver.ai.spring.provider.openai;

import fi.evolver.ai.spring.ApiResponseException;
import fi.evolver.ai.spring.ContentSubscriber;
import fi.evolver.ai.spring.assistant.AssistantResponse;
import fi.evolver.ai.spring.chat.prompt.Message;
import fi.evolver.ai.spring.provider.openai.response.threads.OMessageDelta;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:fi/evolver/ai/spring/provider/openai/OpenAiStreamingAssistantResponse.class */
public class OpenAiStreamingAssistantResponse implements AssistantResponse {
    private final Logger LOG = LoggerFactory.getLogger(OpenAiStreamingAssistantResponse.class);
    private final Deque<OMessageDelta> results = new ConcurrentLinkedDeque();
    private List<ContentSubscriber> subscribers = new ArrayList();
    private Optional<Message> content = Optional.empty();
    private final CountDownLatch responseCompleteLatch = new CountDownLatch(1);
    private final CountDownLatch contentLatch = new CountDownLatch(1);
    private volatile Throwable responseException;

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void addResult(OMessageDelta oMessageDelta) {
        this.results.add(oMessageDelta);
        Optional<String> content = getContent(oMessageDelta);
        for (ContentSubscriber contentSubscriber : this.subscribers) {
            try {
                Objects.requireNonNull(contentSubscriber);
                content.ifPresent(contentSubscriber::onContent);
            } catch (RuntimeException e) {
                this.LOG.error("Subscriber failed handling content update", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handleError(Throwable th) {
        this.responseException = th;
        this.contentLatch.countDown();
        this.responseCompleteLatch.countDown();
        Iterator<ContentSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            try {
                it.next().onError(th);
            } catch (RuntimeException e) {
                this.LOG.error("Subscriber failed handling stream error ({})", th.toString(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void handleStreamEnd() {
        if (this.contentLatch.getCount() > 0) {
            this.content = Optional.of(Message.assistant((String) this.results.stream().map(OpenAiStreamingAssistantResponse::getContent).filter((v0) -> {
                return v0.isPresent();
            }).map((v0) -> {
                return v0.get();
            }).collect(Collectors.joining())));
            this.contentLatch.countDown();
        }
        this.responseCompleteLatch.countDown();
        Iterator<ContentSubscriber> it = this.subscribers.iterator();
        while (it.hasNext()) {
            try {
                it.next().onComplete("stop");
            } catch (RuntimeException e) {
                this.LOG.error("Subscriber failed handling stream completion", e);
            }
        }
    }

    private static Optional<String> getContent(OMessageDelta oMessageDelta) {
        return Optional.of((String) oMessageDelta.delta().content().stream().map((v0) -> {
            return v0.text();
        }).map((v0) -> {
            return v0.value();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.joining())).filter(str -> {
            return !str.isEmpty();
        });
    }

    @Override // fi.evolver.ai.spring.assistant.AssistantResponse
    public void addSubscriber(ContentSubscriber contentSubscriber) {
        this.subscribers.add(contentSubscriber);
        Iterator<OMessageDelta> it = this.results.iterator();
        while (it.hasNext()) {
            Optional<String> content = getContent(it.next());
            Objects.requireNonNull(contentSubscriber);
            content.ifPresent(contentSubscriber::onContent);
        }
        if (this.responseException != null) {
            contentSubscriber.onError(this.responseException);
        } else {
            contentSubscriber.onComplete("stop");
        }
    }

    @Override // fi.evolver.ai.spring.assistant.AssistantResponse
    public String getResultState() {
        try {
            getMessage();
            return "stop";
        } catch (RuntimeException e) {
            return "error";
        }
    }

    @Override // fi.evolver.ai.spring.assistant.AssistantResponse
    public Optional<Message> getMessage() {
        try {
            this.contentLatch.await();
            if (this.responseException != null) {
                throw new ApiResponseException(this.responseException, "Reading message failed unexpectedly", new Object[0]);
            }
            return this.content;
        } catch (InterruptedException e) {
            throw new ApiResponseException(e, "Interrupted while waiting for response", new Object[0]);
        }
    }

    @Override // fi.evolver.ai.spring.assistant.AssistantResponse
    public boolean isSuccess() {
        return OpenAiService.FINISH_REASONS_OK.contains(getResultState());
    }
}
