package org.apache.hedwig.server.handlers;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.util.VarArgs;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/handlers/SubscriptionChannelManager.class */
public class SubscriptionChannelManager implements ChannelDisconnectListener {
    static Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class);
    final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel = new ConcurrentHashMap<>();
    final ConcurrentHashMap<Channel, Set<TopicSubscriber>> channel2sub = new ConcurrentHashMap<>();
    final List<SubChannelDisconnectedListener> listeners = new ArrayList();

    /* loaded from: input_file:org/apache/hedwig/server/handlers/SubscriptionChannelManager$CloseSubscriptionListener.class */
    static class CloseSubscriptionListener implements ChannelFutureListener {
        final TopicSubscriber ts;

        CloseSubscriptionListener(TopicSubscriber topicSubscriber) {
            this.ts = topicSubscriber;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (channelFuture.isSuccess()) {
                SubscriptionChannelManager.logger.debug("Close old subscription {} succeed.", this.ts);
            } else {
                SubscriptionChannelManager.logger.warn("Failed to write response to close old subscription {}.", this.ts);
            }
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/handlers/SubscriptionChannelManager$SubChannelDisconnectedListener.class */
    public interface SubChannelDisconnectedListener {
        void onSubChannelDisconnected(TopicSubscriber topicSubscriber);
    }

    public void addSubChannelDisconnectedListener(SubChannelDisconnectedListener subChannelDisconnectedListener) {
        if (null != subChannelDisconnectedListener) {
            this.listeners.add(subChannelDisconnectedListener);
        }
    }

    @Override // org.apache.hedwig.server.handlers.ChannelDisconnectListener
    public void channelDisconnected(Channel channel) {
        Set<TopicSubscriber> remove;
        synchronized (channel) {
            remove = this.channel2sub.remove(channel);
        }
        if (remove != null) {
            for (TopicSubscriber topicSubscriber : remove) {
                logger.info("Subscription channel {} for {} is disconnected.", VarArgs.va(new Object[]{channel.getRemoteAddress(), topicSubscriber}));
                this.sub2Channel.remove(topicSubscriber, channel);
                Iterator<SubChannelDisconnectedListener> it = this.listeners.iterator();
                while (it.hasNext()) {
                    it.next().onSubChannelDisconnected(topicSubscriber);
                }
            }
        }
    }

    public int getNumSubscriptionChannels() {
        return this.channel2sub.size();
    }

    public int getNumSubscriptions() {
        return this.sub2Channel.size();
    }

    public Channel put(TopicSubscriber topicSubscriber, Channel channel, boolean z) {
        synchronized (channel) {
            Channel putIfAbsent = this.sub2Channel.putIfAbsent(topicSubscriber, channel);
            if (null != putIfAbsent && !putIfAbsent.equals(channel)) {
                boolean z2 = false;
                if (z) {
                    synchronized (putIfAbsent) {
                        Set<TopicSubscriber> set = this.channel2sub.get(putIfAbsent);
                        if (null != set) {
                            if (!set.remove(topicSubscriber)) {
                                logger.warn("Failed to remove old subscription ({}) due to it isn't on channel ({}).", VarArgs.va(new Object[]{topicSubscriber, putIfAbsent}));
                            } else if (set.isEmpty()) {
                                this.channel2sub.remove(putIfAbsent);
                            }
                        }
                    }
                    putIfAbsent.write(PubSubResponseUtils.getResponseForSubscriptionEvent(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), PubSubProtocol.SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED)).addListener(new CloseSubscriptionListener(topicSubscriber));
                    logger.info("Subscribe request for ({}) from channel ({}) closes old subscripiton on channel ({}).", VarArgs.va(new Object[]{topicSubscriber, channel, putIfAbsent}));
                    if (this.sub2Channel.replace(topicSubscriber, putIfAbsent, channel)) {
                        z2 = true;
                    } else {
                        putIfAbsent = this.sub2Channel.putIfAbsent(topicSubscriber, channel);
                        if (null == putIfAbsent) {
                            z2 = true;
                        }
                    }
                }
                if (!z2) {
                    logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).", VarArgs.va(new Object[]{topicSubscriber, channel, putIfAbsent}));
                    return putIfAbsent;
                }
            }
            Set<TopicSubscriber> set2 = this.channel2sub.get(channel);
            if (null == set2) {
                set2 = new HashSet();
                this.channel2sub.put(channel, set2);
            }
            set2.add(topicSubscriber);
            return null;
        }
    }

    public void remove(TopicSubscriber topicSubscriber, Channel channel) {
        synchronized (channel) {
            Set<TopicSubscriber> set = this.channel2sub.get(channel);
            if (null != set) {
                if (!set.remove(topicSubscriber)) {
                    logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).", VarArgs.va(new Object[]{topicSubscriber, channel}));
                } else if (set.isEmpty()) {
                    this.channel2sub.remove(channel);
                }
            }
            if (!this.sub2Channel.remove(topicSubscriber, channel)) {
                logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.", VarArgs.va(new Object[]{channel, topicSubscriber}));
            }
        }
    }
}
