package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.util.Timestamps;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.UnmodifiableIterator;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader.class */
class PubsubLiteUnboundedReader extends UnboundedSource.UnboundedReader<SequencedMessage> implements OffsetFinalizer {
    /** Source handed to the constructor; returned unchanged by the current-source accessor. */
    private final UnboundedSource<SequencedMessage, ?> source;

    /** Immutable copy of the partition-to-state map handed to the constructor. */
    @GuardedBy("monitor.monitor")
    private final ImmutableMap<Partition, SubscriberState> subscriberMap;

    /** Service aggregating the committer of every partition state; started by the constructor. */
    private final CommitterProxy committerProxy;

    /** Monitor entered by every method that touches the guarded fields of this reader. */
    private final CloseableMonitor monitor = new CloseableMonitor();

    /** Pulled messages not yet consumed; the head of the queue is the reader's current element. */
    @GuardedBy("monitor.monitor")
    private final Queue<PartitionedSequencedMessage> messages = new ArrayDeque<>();

    /** First permanent error recorded, if any; once present, {@code advance()} rethrows it. */
    @GuardedBy("monitor.monitor")
    private Optional<StatusException> permanentError = Optional.empty();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader$CommitterProxy.class */
    private static class CommitterProxy extends ProxyService {
        /** Callback that receives every error passed to {@link #handlePermanentError}. */
        private final Consumer<StatusException> permanentErrorSetter;

        /**
         * Registers the committer of each given subscriber state with this proxy.
         *
         * @param collection subscriber states whose {@code committer} fields are registered
         * @param consumer callback invoked with each error given to {@link #handlePermanentError}
         * @throws StatusException declared for the {@code addServices} call
         */
        CommitterProxy(Collection<SubscriberState> collection, Consumer<StatusException> consumer) throws StatusException {
            this.permanentErrorSetter = consumer;
            Collection<Committer> committers = new ArrayList<>();
            for (SubscriberState state : collection) {
                committers.add(state.committer);
            }
            // The raw cast is kept so that addServices resolves exactly as it did before.
            addServices((Collection) committers);
        }

        /** Nothing to do on start. */
        protected void start() {
        }

        /** Nothing to do on stop. */
        protected void stop() {
        }

        /** Forwards the error to the callback supplied at construction. */
        protected void handlePermanentError(StatusException statusException) {
            this.permanentErrorSetter.accept(statusException);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Value type pairing a {@link SequencedMessage} with a {@link Partition}.
     *
     * <p>The class is annotated with {@code @AutoValue}; {@link #of} instantiates the
     * {@code AutoValue_PubsubLiteUnboundedReader_PartitionedSequencedMessage} implementation.
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader$PartitionedSequencedMessage.class */
    public static abstract class PartitionedSequencedMessage {
        /** Returns the partition stored in this pair. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract Partition partition();

        /** Returns the message stored in this pair. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract SequencedMessage sequencedMessage();

        /**
         * Creates a pair from the given partition and message.
         *
         * @param partition partition to store
         * @param sequencedMessage message to store
         * @return a new instance holding both arguments
         */
        /* JADX INFO: Access modifiers changed from: private */
        public static PartitionedSequencedMessage of(Partition partition, SequencedMessage sequencedMessage) {
            return new AutoValue_PubsubLiteUnboundedReader_PartitionedSequencedMessage(partition, sequencedMessage);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteUnboundedReader$SubscriberState.class */
    /** Mutable per-partition state: delivery progress plus the subscriber and committer in use. */
    public static class SubscriberState {
        // Publish time of the last message made current for this partition; starts at the
        // minimum timestamp and is read by getWatermark().
        Instant lastDeliveredPublishTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
        // Offset of the last message made current for this partition; empty until one has been.
        Optional<Offset> lastDelivered = Optional.empty();
        // Pulled from by the reader when its message queue runs dry; closed by the reader's close().
        PullSubscriber subscriber;
        // Receives the offsets passed to finalizeOffsets() for this partition.
        Committer committer;
    }

    /**
     * Creates a reader over the given per-partition subscriber states and starts the committer proxy.
     *
     * <p>The state map is copied into an immutable map. The committers of all states are wrapped
     * in a {@code CommitterProxy} whose error callback records the first permanent error it is
     * given; later errors do not overwrite it. The constructor returns only after
     * {@code startAsync().awaitRunning()} on the proxy has returned.
     *
     * @param unboundedSource source later returned by the current-source accessor
     * @param map per-partition subscriber state; its values supply the committers for the proxy
     * @throws StatusException if constructing the committer proxy fails
     */
    public PubsubLiteUnboundedReader(UnboundedSource<SequencedMessage, ?> unboundedSource, Map<Partition, SubscriberState> map) throws StatusException {
        this.source = unboundedSource;
        this.subscriberMap = ImmutableMap.copyOf(map);
        this.committerProxy = new CommitterProxy(map.values(), statusException -> {
            // try-with-resources closes the hold exactly once, whether or not the body throws.
            try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
                // Keep the first error: an already recorded one is never replaced.
                if (!this.permanentError.isPresent()) {
                    this.permanentError = Optional.of(statusException);
                }
            }
        });
        this.committerProxy.startAsync().awaitRunning();
    }

    /**
     * Commits the given offset for each partition and records the first commit failure as this
     * reader's permanent error.
     *
     * <p>The commits are issued while the monitor is held. The failure handlers are attached after
     * the hold has been closed; each handler re-enters the monitor to record its error, and never
     * overwrites an error that is already recorded.
     *
     * @param map offset to commit, keyed by partition
     * @throws StatusException with status {@code INVALID_ARGUMENT} if a partition in {@code map} is
     *     not one of this reader's partitions; commits issued for earlier entries are not undone
     */
    @Override // org.apache.beam.sdk.io.gcp.pubsublite.OffsetFinalizer
    public void finalizeOffsets(Map<Partition, Offset> map) throws StatusException {
        // Left raw: the element type is whatever Committer.commitOffset returns, and that type is
        // not imported by this file.
        ArrayList commitFutures = new ArrayList();
        // try-with-resources closes the hold exactly once, on both the normal and the failing path.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            for (Map.Entry<Partition, Offset> entry : map.entrySet()) {
                Partition partition = entry.getKey();
                // ImmutableMap holds no null values, so null means the partition is unknown.
                SubscriberState state = this.subscriberMap.get(partition);
                if (state == null) {
                    throw Status.INVALID_ARGUMENT.withDescription(String.format("Asked to finalize an offset for partition %s which was not managed by this reader.", partition)).asException();
                }
                commitFutures.add(state.committer.commitOffset(entry.getValue()));
            }
        }
        commitFutures.forEach(apiFuture -> {
            ExtractStatus.addFailureHandler(apiFuture, statusException -> {
                try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
                    if (!this.permanentError.isPresent()) {
                        this.permanentError = Optional.of(statusException);
                    }
                }
            });
        });
    }

    /**
     * Starts reading by delegating to {@link #advance()}.
     *
     * @return the value returned by {@link #advance()}
     * @throws IOException if {@link #advance()} throws it
     */
    public boolean start() throws IOException {
        return advance();
    }

    /**
     * Moves the reader to the next message, pulling from the subscribers when the queue runs dry.
     *
     * <p>The element that was current (the queue head, if any) is discarded. If the queue is then
     * empty, every subscriber is pulled once. When a message is available afterwards it becomes
     * the current element and its partition's last-delivered offset and publish time are updated.
     *
     * @return {@code true} if a current element is available, {@code false} if no subscriber
     *     produced a message
     * @throws IOException wrapping the recorded permanent error, or any {@link StatusException}
     *     raised while pulling
     */
    public boolean advance() throws IOException {
        // try-with-resources closes the hold exactly once on every exit path. The expanded form
        // closed it explicitly before "return true" and then a second time in its finally block.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            if (this.permanentError.isPresent()) {
                throw this.permanentError.get();
            }
            // Discard the previous current element; poll() is a no-op on an empty queue.
            this.messages.poll();
            if (this.messages.isEmpty()) {
                pullFromSubscribers();
            }
            if (this.messages.isEmpty()) {
                return false;
            }
            setLastDelivered(this.messages.peek());
            return true;
        } catch (StatusException e) {
            throw new IOException(e);
        }
    }

    /**
     * Records the given message as the last one delivered for its partition.
     *
     * <p>Updates that partition's {@code lastDelivered} offset from the message cursor, then its
     * {@code lastDeliveredPublishTimestamp} from the message publish time.
     *
     * @param partitionedSequencedMessage message that has just become the current element
     */
    @GuardedBy("monitor.monitor")
    private void setLastDelivered(PartitionedSequencedMessage partitionedSequencedMessage) {
        SubscriberState state = this.subscriberMap.get(partitionedSequencedMessage.partition());
        SequencedMessage delivered = partitionedSequencedMessage.sequencedMessage();

        Offset deliveredOffset = Offset.of(delivered.getCursor().getOffset());
        state.lastDelivered = Optional.of(deliveredOffset);

        Instant publishTime = new Instant(Timestamps.toMillis(delivered.getPublishTime()));
        state.lastDeliveredPublishTimestamp = publishTime;
    }

    /**
     * Pulls once from every partition's subscriber and appends the results to the message queue,
     * each tagged with the partition it was pulled for.
     *
     * @throws StatusException if a subscriber's {@code pull()} throws it
     */
    @GuardedBy("monitor.monitor")
    private void pullFromSubscribers() throws StatusException {
        for (Map.Entry<Partition, SubscriberState> entry : this.subscriberMap.entrySet()) {
            Partition partition = entry.getKey();
            Iterator<SequencedMessage> pulled = entry.getValue().subscriber.pull().iterator();
            pulled.forEachRemaining(
                    message -> this.messages.add(PartitionedSequencedMessage.of(partition, message)));
        }
    }

    /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the current element, which is the message at the head of the queue.
     *
     * @return the message at the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public SequencedMessage m134getCurrent() throws NoSuchElementException {
        // try-with-resources closes the hold exactly once, also when the exception is thrown.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            if (this.messages.isEmpty()) {
                throw new NoSuchElementException();
            }
            return this.messages.peek().sequencedMessage();
        }
    }

    /**
     * Returns the publish time of the current element as a millisecond-precision {@link Instant}.
     *
     * @return publish time of the message at the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public Instant getCurrentTimestamp() throws NoSuchElementException {
        // try-with-resources closes the hold exactly once, also when the exception is thrown.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            if (this.messages.isEmpty()) {
                throw new NoSuchElementException();
            }
            SequencedMessage current = this.messages.peek().sequencedMessage();
            return new Instant(Timestamps.toMillis(current.getPublishTime()));
        }
    }

    /**
     * Closes every partition's subscriber and stops the committer proxy.
     *
     * <p>All subscribers are closed and the proxy is stopped even if some of them fail. The first
     * subscriber failure is rethrown as an {@link IllegalStateException}, with any later failures
     * attached as suppressed exceptions.
     *
     * @throws IllegalStateException wrapping the first exception thrown by a subscriber's
     *     {@code close()}
     */
    public void close() {
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            IllegalStateException failure = null;
            for (SubscriberState state : this.subscriberMap.values()) {
                try {
                    state.subscriber.close();
                } catch (Exception e) {
                    // Keep going so one failing subscriber does not leave the others open.
                    if (failure == null) {
                        failure = new IllegalStateException(e);
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            try {
                this.committerProxy.stopAsync().awaitTerminated();
            } catch (RuntimeException e) {
                if (failure == null) {
                    throw e;
                }
                failure.addSuppressed(e);
            }
            if (failure != null) {
                throw failure;
            }
        }
    }

    /**
     * Returns the smallest {@code lastDeliveredPublishTimestamp} across all partition states.
     *
     * <p>A partition that has not delivered anything still carries its initial timestamp
     * ({@code BoundedWindow.TIMESTAMP_MIN_VALUE}), so the result stays at that value until every
     * partition has delivered at least one message.
     *
     * @return the minimum last-delivered publish time over all partitions
     * @throws java.util.NoSuchElementException if this reader has no partitions
     */
    public Instant getWatermark() {
        // try-with-resources closes the hold exactly once on every exit path.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            return this.subscriberMap.values().stream()
                    .map(state -> state.lastDeliveredPublishTimestamp)
                    .min((left, right) -> left.compareTo(right))
                    .get();
        }
    }

    /**
     * Builds a checkpoint mark from the last delivered offset of every partition that has one.
     *
     * <p>Partitions whose {@code lastDelivered} is still empty are left out of the mark. This
     * reader is passed to the mark as its first constructor argument.
     *
     * @return a new {@code OffsetCheckpointMark} over the current last-delivered offsets
     */
    public UnboundedSource.CheckpointMark getCheckpointMark() {
        // try-with-resources closes the hold exactly once on every exit path.
        try (CloseableMonitor.Hold ignored = this.monitor.enter()) {
            ImmutableMap.Builder<Partition, Offset> delivered = ImmutableMap.builder();
            this.subscriberMap.forEach((partition, state) -> {
                state.lastDelivered.ifPresent(offset -> {
                    delivered.put(partition, offset);
                });
            });
            return new OffsetCheckpointMark(this, delivered.build());
        }
    }

    /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the source that was passed to this reader's constructor.
     *
     * @return the {@code source} field, unchanged
     */
    public UnboundedSource<SequencedMessage, ?> m133getCurrentSource() {
        return this.source;
    }

    /**
     * Closes {@code autoCloseable}, taking into account an exception that may already be in flight.
     *
     * <p>When {@code th} is non-null, anything thrown by {@code close()} is attached to {@code th}
     * as a suppressed exception instead of propagating. When {@code th} is null, {@code close()}
     * is called directly and whatever it throws propagates to the caller.
     *
     * @param th exception already being propagated, or {@code null} if there is none
     * @param autoCloseable resource to close
     */
    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th != null) {
            try {
                autoCloseable.close();
            } catch (Throwable closeFailure) {
                th.addSuppressed(closeFailure);
            }
            return;
        }
        autoCloseable.close();
    }
}
