package org.apache.pekko.stream.connectors.pravega.impl;

import io.pravega.client.KeyValueTableFactory;
import io.pravega.client.tables.KeyValueTable;
import io.pravega.client.tables.KeyValueTableClientConfiguration;
import io.pravega.client.tables.TableEntry;
import io.pravega.client.tables.TableKey;
import java.util.concurrent.CompletableFuture;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.FlowShape;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.connectors.pravega.TableSettings;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.util.FutureConverters$;
import org.apache.pekko.util.FutureConverters$CompletionStageOps$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: PravegaTableReadFlow.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/pravega/impl/PravegaTableReadFlowStageLogic.class */
public final class PravegaTableReadFlowStageLogic<K, V> extends GraphStageLogic implements StageLogging {
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private final FlowShape shape;
    private final String scope;
    private final String tableName;
    public final TableSettings<K, V> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings;
    private KeyValueTableFactory keyValueTableFactory;
    public KeyValueTable org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$table;
    public volatile int org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight;
    public volatile boolean org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded;
    private final AsyncCallback<Try<TableEntry>> asyncMessageSendCallback;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public PravegaTableReadFlowStageLogic(FlowShape<K, Option<V>> flowShape, String str, String str2, TableSettings<K, V> tableSettings) {
        super(flowShape);
        this.shape = flowShape;
        this.scope = str;
        this.tableName = str2;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings = tableSettings;
        StageLogging.$init$(this);
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight = 0;
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded = false;
        this.asyncMessageSendCallback = getAsyncCallback(r8 -> {
            if (r8 instanceof Failure) {
                log().error(((Failure) r8).exception(), "Failed to send message {}");
            } else {
                if (!(r8 instanceof Success)) {
                    throw new MatchError(r8);
                }
                TableEntry tableEntry = (TableEntry) ((Success) r8).value();
                if (tableEntry != null) {
                    push(out(), Some$.MODULE$.apply(tableSettings.valueSerializer().deserialize(tableEntry.getValue())));
                } else {
                    push(out(), None$.MODULE$);
                }
            }
            this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight--;
            if (this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight == 0 && this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded) {
                log().info("Stage completed after upstream finish");
                completeStage();
            }
        });
        setHandler(org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$in(), new InHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableReadFlowStageLogic$$anon$1
            private final /* synthetic */ PravegaTableReadFlowStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            public void onPush() {
                Object protected$grab = this.$outer.protected$grab(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$in());
                this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight++;
                this.$outer.handleSentEvent(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$table.get((TableKey) this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings.tableKey().apply(protected$grab)));
            }

            public void onUpstreamFinish() {
                this.$outer.log().debug("Upstream finished");
                if (this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$inFlight == 0) {
                    this.$outer.completeStage();
                }
                this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$upstreamEnded = true;
            }
        });
        setHandler(out(), new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.pravega.impl.PravegaTableReadFlowStageLogic$$anon$2
            private final /* synthetic */ PravegaTableReadFlowStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
                OutHandler.onDownstreamFinish$(this, th);
            }

            public void onPull() {
                this.$outer.protected$pull(this.$outer.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$in());
            }
        });
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public FlowShape<K, Option<V>> shape() {
        return this.shape;
    }

    public String scope() {
        return this.scope;
    }

    public Inlet<K> org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$in() {
        return shape().in();
    }

    private Outlet<Option<V>> out() {
        return shape().out();
    }

    public void preStart() {
        try {
            KeyValueTableClientConfiguration build = KeyValueTableClientConfiguration.builder().build();
            this.keyValueTableFactory = KeyValueTableFactory.withScope(scope(), this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$tableSettings.clientConfig());
            this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$table = this.keyValueTableFactory.forKeyValueTable(this.tableName, build);
            log().debug("Open table {}", this.tableName);
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    failStage((Throwable) unapply.get());
                    return;
                }
            }
            throw th;
        }
    }

    public void handleSentEvent(CompletableFuture<TableEntry> completableFuture) {
        FutureConverters$CompletionStageOps$.MODULE$.asScala$extension(FutureConverters$.MODULE$.CompletionStageOps(completableFuture)).onComplete(r4 -> {
            return this.asyncMessageSendCallback.invokeWithFeedback(r4);
        }, ExecutionContext$Implicits$.MODULE$.global());
    }

    public void postStop() {
        log().debug("Closing table {}", this.tableName);
        this.org$apache$pekko$stream$connectors$pravega$impl$PravegaTableReadFlowStageLogic$$table.close();
        this.keyValueTableFactory.close();
    }

    public <T> T protected$grab(Inlet<T> inlet) {
        return (T) grab(inlet);
    }

    public <T> void protected$pull(Inlet<T> inlet) {
        pull(inlet);
    }
}
