package info.bitrich.xchangestream.service.wamp;

import hu.akarnokd.rxjava.interop.RxJavaInterop;
import info.bitrich.xchangestream.service.exception.NotConnectedException;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ws.wamp.jawampa.PubSubData;
import ws.wamp.jawampa.WampClient;
import ws.wamp.jawampa.WampClientBuilder;
import ws.wamp.jawampa.transport.netty.NettyWampClientConnectorProvider;

/* loaded from: input_file:info/bitrich/xchangestream/service/wamp/WampStreamingService.class */
public class WampStreamingService {
    private static final Logger LOG = LoggerFactory.getLogger(WampStreamingService.class);
    private final String uri;
    private final String realm;
    private WampClient client;
    private WampClient.State connectedState;

    public WampStreamingService(String str, String str2) {
        this.uri = str;
        this.realm = str2;
    }

    public Completable connect() {
        return Completable.create(completableEmitter -> {
            NettyWampClientConnectorProvider nettyWampClientConnectorProvider = new NettyWampClientConnectorProvider();
            try {
                WampClientBuilder wampClientBuilder = new WampClientBuilder();
                wampClientBuilder.withConnectorProvider(nettyWampClientConnectorProvider).withUri(this.uri).withRealm(this.realm).withInfiniteReconnects().withReconnectInterval(5, TimeUnit.SECONDS);
                this.client = wampClientBuilder.build();
                this.client.statusChanged().subscribe(state -> {
                    LOG.debug("State changed: {}", state);
                    if ((state instanceof WampClient.DisconnectedState) && (this.connectedState instanceof WampClient.ConnectingState)) {
                        if (((WampClient.DisconnectedState) state).disconnectReason() != null) {
                            completableEmitter.onError(((WampClient.DisconnectedState) state).disconnectReason());
                        } else {
                            completableEmitter.onError(new IllegalStateException("Cannot connect to the exchange."));
                        }
                    }
                    this.connectedState = state;
                    if (state instanceof WampClient.ConnectedState) {
                        completableEmitter.onComplete();
                    }
                });
                this.client.open();
            } catch (Exception e) {
                completableEmitter.onError(e);
            }
        });
    }

    public Observable<PubSubData> subscribeChannel(String str) {
        return !(this.connectedState instanceof WampClient.ConnectedState) ? Observable.error(new NotConnectedException()) : RxJavaInterop.toV2Observable(this.client.makeSubscription(str));
    }

    public boolean isSocketOpen() {
        return !this.client.statusChanged().hasCompleted();
    }

    public void useCompressedMessages(boolean z) {
        throw new UnsupportedOperationException();
    }
}
