package org.apache.hedwig.server.regions;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.client.netty.HedwigSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TopicOpQueuer;
import org.apache.hedwig.server.persistence.PersistRequest;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.subscriptions.SubscriptionEventListener;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.CallbackUtils;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/regions/RegionManager.class */
public class RegionManager implements SubscriptionEventListener {
    /** Logger shared by this class and its nested callback classes. */
    protected static final Logger LOGGER = LoggerFactory.getLogger(RegionManager.class);
    /** Subscriber id used for every cross-region subscribe/unsubscribe; the constructor builds it as "__" + the configured region. */
    private final ByteString mySubId;
    /** Persistence manager that receives the messages delivered by remote-region subscriptions. */
    private final PersistenceManager pm;
    /** Topic-keyed operation queue; the subscribe, unsubscribe and retry operations of this class are pushed through it. */
    private final TopicOpQueuer queue;
    /** Name of the local region; used as the "[region]" prefix of log messages. */
    private final String myRegion;
    /** One hub client per address returned by ServerConfiguration.getRegions(); filled in the constructor. */
    private final ArrayList<HedwigHubClient> clients = new ArrayList<>();
    /** Daemon timer (created with isDaemon = true) on which RetrySubscribeTask is scheduled. */
    private final Timer timer = new Timer(true);
    /** Topics whose remote subscribe failed, grouped by hub client; every access in this class synchronizes on the map itself. */
    private final HashMap<HedwigHubClient, Set<ByteString>> retryMap = new HashMap<>();
    /** Topics registered by onFirstLocalSubscribe and not yet removed by onLastLocalUnsubscribe; retries are skipped for topics missing here. */
    private final ConcurrentMap<ByteString, Boolean> topicStatuses = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hedwig.server.regions.RegionManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hedwig/server/regions/RegionManager$1.class */
    public class AnonymousClass1 implements Callback<Void> {
        final /* synthetic */ ByteString val$topic;
        final /* synthetic */ HedwigSubscriber val$sub;
        final /* synthetic */ Callback val$mcb;
        final /* synthetic */ boolean val$synchronous;
        final /* synthetic */ HedwigHubClient val$client;

        AnonymousClass1(ByteString byteString, HedwigSubscriber hedwigSubscriber, Callback callback, boolean z, HedwigHubClient hedwigHubClient) {
            this.val$topic = byteString;
            this.val$sub = hedwigSubscriber;
            this.val$mcb = callback;
            this.val$synchronous = z;
            this.val$client = hedwigHubClient;
        }

        public void operationFinished(Object obj, Void r9) {
            if (RegionManager.LOGGER.isDebugEnabled()) {
                RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] cross-region subscription done for topic " + this.val$topic.toStringUtf8());
            }
            try {
                this.val$sub.startDelivery(this.val$topic, RegionManager.this.mySubId, new MessageHandler() { // from class: org.apache.hedwig.server.regions.RegionManager.1.1
                    public void deliver(final ByteString byteString, ByteString byteString2, PubSubProtocol.Message message, final Callback<Void> callback, final Object obj2) {
                        if (message.hasSrcRegion()) {
                            PubSubProtocol.Message.newBuilder(message).setMsgId(PubSubProtocol.MessageSeqId.newBuilder(message.getMsgId()).addRemoteComponents(PubSubProtocol.RegionSpecificSeqId.newBuilder().setRegion(message.getSrcRegion()).setSeqId(message.getMsgId().getLocalComponent())));
                        }
                        RegionManager.this.pm.persistMessage(new PersistRequest(byteString, message, new Callback<PubSubProtocol.MessageSeqId>() { // from class: org.apache.hedwig.server.regions.RegionManager.1.1.1
                            public void operationFinished(Object obj3, PubSubProtocol.MessageSeqId messageSeqId) {
                                if (RegionManager.LOGGER.isDebugEnabled()) {
                                    RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] cross-region recv-fwd succeeded for topic " + byteString.toStringUtf8());
                                }
                                callback.operationFinished(obj2, (Object) null);
                            }

                            public void operationFailed(Object obj3, PubSubException pubSubException) {
                                if (RegionManager.LOGGER.isDebugEnabled()) {
                                    RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] cross-region recv-fwd failed for topic " + byteString.toStringUtf8(), pubSubException);
                                }
                                callback.operationFailed(obj2, pubSubException);
                            }
                        }, null));
                    }
                });
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] cross-region start-delivery succeeded for topic " + this.val$topic.toStringUtf8());
                }
                this.val$mcb.operationFinished(obj, (Object) null);
            } catch (AlreadyStartDeliveryException e) {
                RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] cross-region start-delivery failed for topic " + this.val$topic.toStringUtf8(), e);
                this.val$mcb.operationFailed(obj, new PubSubException.UnexpectedConditionException("cross-region start-delivery failed : " + e.getMessage()));
            } catch (PubSubException e2) {
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] cross-region start-delivery failed for topic " + this.val$topic.toStringUtf8(), e2);
                }
                this.val$mcb.operationFailed(obj, e2);
            }
        }

        public void operationFailed(Object obj, PubSubException pubSubException) {
            if (RegionManager.LOGGER.isDebugEnabled()) {
                RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] cross-region subscribe failed for topic " + this.val$topic.toStringUtf8(), pubSubException);
            }
            if (!this.val$synchronous) {
                RegionManager.this.putTopicInRetryMap(this.val$client, this.val$topic);
            }
            this.val$mcb.operationFailed(obj, pubSubException);
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/regions/RegionManager$RetrySubscribeTask.class */
    class RetrySubscribeTask extends TimerTask {
        RetrySubscribeTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            Set<ByteString> set;
            HashSet<HedwigHubClient> hashSet = new HashSet();
            synchronized (RegionManager.this.retryMap) {
                hashSet.addAll(RegionManager.this.retryMap.keySet());
            }
            if (hashSet.isEmpty()) {
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] There is no hub client needs to retry subscriptions.");
                    return;
                }
                return;
            }
            for (HedwigHubClient hedwigHubClient : hashSet) {
                synchronized (RegionManager.this.retryMap) {
                    set = (Set) RegionManager.this.retryMap.remove(hedwigHubClient);
                }
                if (null != set && !set.isEmpty()) {
                    final CountDownLatch countDownLatch = new CountDownLatch(1);
                    Callback multiCallback = CallbackUtils.multiCallback(set.size(), new Callback<Void>() { // from class: org.apache.hedwig.server.regions.RegionManager.RetrySubscribeTask.1
                        public void operationFinished(Object obj, Void r4) {
                            finish();
                        }

                        public void operationFailed(Object obj, PubSubException pubSubException) {
                            finish();
                        }

                        void finish() {
                            countDownLatch.countDown();
                        }
                    }, (Object) null);
                    for (ByteString byteString : set) {
                        if (null == ((Boolean) RegionManager.this.topicStatuses.get(byteString))) {
                            multiCallback.operationFinished((Object) null, (Object) null);
                        } else {
                            RegionManager.this.retrySubscribe(hedwigHubClient, byteString, multiCallback);
                        }
                    }
                    try {
                        countDownLatch.await();
                    } catch (InterruptedException e) {
                        RegionManager.LOGGER.warn("Exception during retrying remote subscriptions : ", e);
                    }
                }
            }
        }
    }

    /**
     * Creates the region manager: builds one hub client per configured region
     * address and, when the configured retry interval is positive, schedules the
     * periodic retry task on the internal timer.
     *
     * @param persistenceManager       receives messages delivered from remote regions
     * @param serverConfiguration      source of the local region name, the region addresses and the retry interval
     * @param zooKeeper                not used by this constructor
     * @param scheduledExecutorService executor handed to the topic operation queue
     * @param hedwigHubClientFactory   factory used to create a hub client for each region address
     */
    public RegionManager(PersistenceManager persistenceManager, ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, ScheduledExecutorService scheduledExecutorService, HedwigHubClientFactory hedwigHubClientFactory) {
        this.pm = persistenceManager;
        this.mySubId = ByteString.copyFromUtf8("__" + serverConfiguration.getMyRegion());
        this.queue = new TopicOpQueuer(scheduledExecutorService);
        for (String regionAddress : serverConfiguration.getRegions()) {
            HedwigSocketAddress hubAddress = new HedwigSocketAddress(regionAddress);
            this.clients.add(hedwigHubClientFactory.create(hubAddress));
        }
        this.myRegion = serverConfiguration.getMyRegionByteString().toStringUtf8();

        // A non-positive interval leaves the retry task unscheduled.
        final long retryPeriod = serverConfiguration.getRetryRemoteSubscribeThreadRunInterval();
        if (retryPeriod > 0) {
            this.timer.schedule(new RetrySubscribeTask(), 0L, retryPeriod);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a topic whose remote subscribe on the given hub client should be
     * retried later. The retry map is updated while holding its monitor.
     *
     * @param hedwigHubClient hub client the failed subscribe was issued on
     * @param byteString      topic to retry
     */
    public void putTopicInRetryMap(HedwigHubClient hedwigHubClient, ByteString byteString) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[" + this.myRegion + "] Put topic in retry map : " + byteString.toStringUtf8());
        }
        synchronized (this.retryMap) {
            Set<ByteString> pendingTopics = this.retryMap.get(hedwigHubClient);
            if (pendingTopics != null) {
                pendingTopics.add(byteString);
                return;
            }
            // First failure for this client: start its pending set with this topic.
            Set<ByteString> firstEntry = new HashSet<ByteString>();
            firstEntry.add(byteString);
            this.retryMap.put(hedwigHubClient, firstEntry);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Subscribes to a topic on one remote hub, unless the subscription already exists.
     *
     * <p>If the client's subscriber already holds the subscription, the callback
     * is completed immediately. Otherwise an asynchronous CREATE_OR_ATTACH
     * subscribe is issued and its outcome is handled by {@code AnonymousClass1}.
     *
     * @param hedwigHubClient hub client of the remote region
     * @param byteString      topic to subscribe to
     * @param z               whether the subscribe is synchronous; forwarded to the subscribe callback
     * @param callback        notified when the operation completes or fails
     * @param obj             context passed to the callback when the subscription check fails
     */
    public void doRemoteSubscribe(HedwigHubClient hedwigHubClient, ByteString byteString, boolean z, Callback<Void> callback, Object obj) {
        final HedwigSubscriber remoteSubscriber = hedwigHubClient.getSubscriber();
        try {
            if (remoteSubscriber.hasSubscription(byteString, this.mySubId)) {
                // Already subscribed on this hub: report success right away.
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("[" + this.myRegion + "] cross-region subscription for topic " + byteString.toStringUtf8() + " has existed before.");
                }
                callback.operationFinished(null, null);
                return;
            }
            Callback<Void> onSubscribed = new AnonymousClass1(byteString, remoteSubscriber, callback, z, hedwigHubClient);
            remoteSubscriber.asyncSubscribe(byteString, this.mySubId, PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH, onSubscribed, (Object) null);
        } catch (PubSubException e) {
            LOGGER.error("[" + this.myRegion + "] checking cross-region subscription for topic " + byteString.toStringUtf8() + " failed (this is should not happen): ", e);
            callback.operationFailed(obj, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Queues a retry of the remote subscribe for one topic on one hub client.
     *
     * <p>The retry runs as an operation on the topic queue. When it runs, the
     * subscribe is re-issued only if the topic is still present in
     * {@code topicStatuses}; otherwise the operation completes without
     * contacting the remote hub.
     *
     * @param hedwigHubClient hub client to retry the subscribe on
     * @param byteString      topic to re-subscribe to
     * @param callback        notified when the queued operation completes or fails
     */
    public void retrySubscribe(final HedwigHubClient hedwigHubClient, ByteString byteString, Callback<Void> callback) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("[" + this.myRegion + "] Retry remote subscribe topic : " + byteString.toStringUtf8());
        }
        // AsynchronousOp is an inner class of TopicOpQueuer, so it is created through the queue instance.
        this.queue.pushAndMaybeRun(byteString, this.queue.new AsynchronousOp<Void>(byteString, callback, null) {
            @Override
            public void run() {
                // topic, cb and ctx are inherited from AsynchronousOp.
                if (!RegionManager.this.topicStatuses.containsKey(topic)) {
                    // The topic was unregistered while the retry was pending.
                    cb.operationFinished(ctx, null);
                } else {
                    RegionManager.this.doRemoteSubscribe(hedwigHubClient, topic, false, cb, ctx);
                }
            }
        });
    }

    /**
     * Registers the topic and subscribes to it on every configured remote hub.
     *
     * <p>The work runs as an operation on the topic queue. In synchronous mode
     * the supplied callback is completed by the combined result of all remote
     * subscribes. In asynchronous mode the combined result is only logged, and
     * the callback is completed as soon as the subscribes have been issued.
     *
     * @param byteString topic that gained its first local subscriber
     * @param z          true to report the outcome of the remote subscribes through the callback
     * @param callback   notified as described above
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionEventListener
    public void onFirstLocalSubscribe(ByteString byteString, final boolean z, Callback<Void> callback) {
        this.topicStatuses.put(byteString, true);
        // AsynchronousOp is an inner class of TopicOpQueuer, so it is created through the queue instance.
        this.queue.pushAndMaybeRun(byteString, this.queue.new AsynchronousOp<Void>(byteString, callback, null) {
            @Override
            public void run() {
                // topic, cb and ctx are inherited from AsynchronousOp.
                Callback<Void> onAllSubscribed = z
                        ? cb
                        : CallbackUtils.logger(RegionManager.LOGGER,
                                "[" + RegionManager.this.myRegion + "] all cross-region subscriptions succeeded",
                                "[" + RegionManager.this.myRegion + "] at least one cross-region subscription failed");
                Callback<Void> perClient = CallbackUtils.multiCallback(RegionManager.this.clients.size(), onAllSubscribed, ctx);
                for (HedwigHubClient client : RegionManager.this.clients) {
                    RegionManager.this.doRemoteSubscribe(client, topic, z, perClient, ctx);
                }
                if (!z) {
                    // Asynchronous mode: do not wait for the remote hubs.
                    cb.operationFinished(null, null);
                }
            }
        });
    }

    /**
     * Unregisters the topic and unsubscribes from it on every configured remote hub.
     *
     * <p>The work runs as an operation on the topic queue. For each hub client
     * an asynchronous unsubscribe is issued if its subscriber holds the
     * subscription; hubs without the subscription are counted as done. The
     * combined outcome is only logged.
     *
     * @param byteString topic that lost its last local subscriber
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionEventListener
    public void onLastLocalUnsubscribe(final ByteString byteString) {
        this.topicStatuses.remove(byteString);
        final Callback<Void> resultLogger = new Callback<Void>() {
            @Override
            public void operationFinished(Object obj, Void r6) {
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] cross-region unsubscribes succeeded for topic " + byteString.toStringUtf8());
                }
            }

            @Override
            public void operationFailed(Object obj, PubSubException pubSubException) {
                // Errors must be visible regardless of the debug setting.
                RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] cross-region unsubscribes failed for topic " + byteString.toStringUtf8(), pubSubException);
            }
        };
        // AsynchronousOp is an inner class of TopicOpQueuer, so it is created through the queue instance.
        this.queue.pushAndMaybeRun(byteString, this.queue.new AsynchronousOp<Void>(byteString, resultLogger, null) {
            @Override
            public void run() {
                // topic, cb and ctx are inherited from AsynchronousOp.
                Callback<Void> perClient = CallbackUtils.multiCallback(RegionManager.this.clients.size(), cb, ctx);
                for (HedwigHubClient client : RegionManager.this.clients) {
                    HedwigSubscriber subscriber = client.getSubscriber();
                    try {
                        if (subscriber.hasSubscription(topic, RegionManager.this.mySubId)) {
                            subscriber.asyncUnsubscribe(topic, RegionManager.this.mySubId, perClient, (Object) null);
                        } else {
                            if (RegionManager.LOGGER.isDebugEnabled()) {
                                RegionManager.LOGGER.debug("[" + RegionManager.this.myRegion + "] cross-region subscription for topic " + topic.toStringUtf8() + " does not exist, nothing to unsubscribe.");
                            }
                            perClient.operationFinished(null, null);
                        }
                    } catch (PubSubException e) {
                        RegionManager.LOGGER.error("[" + RegionManager.this.myRegion + "] checking cross-region subscription for topic " + topic.toStringUtf8() + " failed (this is should not happen): ", e);
                        perClient.operationFailed(ctx, e);
                    }
                }
            }
        });
    }

    /**
     * Shuts the region manager down: cancels the retry timer, then closes every
     * hub client in the order they were created.
     */
    public void stop() {
        this.timer.cancel();
        for (HedwigHubClient client : this.clients) {
            client.close();
        }
    }
}
