package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.binance.BinanceUserDataChannel;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.service.BaseExchangeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import si.mazi.rescu.Interceptor;
import si.mazi.rescu.RestProxyFactory;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingExchange.class */
/**
 * Streaming variant of {@link BinanceExchange}.
 *
 * <p>A single call to {@link #connect(ProductSubscription...)} creates one market-data web socket
 * for the requested product subscription and, when an API key is configured on the exchange
 * specification, one additional authenticated web socket addressed by a listen key obtained from
 * a {@link BinanceUserDataChannel}.
 *
 * <p>Only one connection is handled at a time: {@link #disconnect()} must be called before
 * {@code connect} may be called again.
 */
public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {
    private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingExchange.class);
    private static final String API_BASE_URI = "wss://stream.binance.com:9443/";
    private BinanceStreamingService streamingService;
    private BinanceUserDataStreamingService userDataStreamingService;
    private BinanceStreamingMarketDataService streamingMarketDataService;
    private BinanceStreamingAccountService streamingAccountService;
    private BinanceStreamingTradeService streamingTradeService;
    private BinanceUserDataChannel userDataChannel;

    /**
     * Prepares the web socket connection(s) and the streaming services built on top of them.
     *
     * <p>Only the first element of {@code productSubscriptionArr} is used. The market-data socket
     * is only connected when that subscription is not empty. The authenticated socket is only
     * created when the exchange specification carries an API key.
     *
     * @param productSubscriptionArr the subscriptions to open; must contain at least one element
     * @return a {@link Completable} that connects the sockets in sequence and then opens the
     *     subscriptions of the market data, account and trade services
     * @throws IllegalArgumentException if no subscription is supplied
     * @throws UnsupportedOperationException if a connection has already been set up
     * @throws IllegalStateException if no listen key is available for the user data channel
     */
    public Completable connect(ProductSubscription... productSubscriptionArr) {
        if (productSubscriptionArr == null || productSubscriptionArr.length == 0) {
            throw new IllegalArgumentException("Subscriptions must be made at connection time");
        }
        if (this.streamingService != null) {
            throw new UnsupportedOperationException("Exchange only handles a single connection - disconnect the current connection.");
        }
        ProductSubscription productSubscription = productSubscriptionArr[0];
        this.streamingService = createStreamingService(productSubscription);
        List<Completable> connections = new ArrayList<>();
        if (!productSubscription.isEmpty()) {
            connections.add(this.streamingService.connect());
        }
        String apiKey = this.exchangeSpecification.getApiKey();
        if (apiKey != null) {
            LOG.info("Connecting to authenticated web socket");
            this.userDataChannel = new BinanceUserDataChannel((BinanceAuthenticated) RestProxyFactory.createProxy(BinanceAuthenticated.class, getExchangeSpecification().getSslUri(), new BaseExchangeService<BinanceExchange>(this) {
            }.getClientConfig(), new Interceptor[0]), apiKey);
            try {
                connections.add(createAndConnectUserDataService(this.userDataChannel.getListenKey()));
            } catch (BinanceUserDataChannel.NoActiveChannelException e) {
                // Roll back the partially initialised state; otherwise every later connect()
                // attempt would be rejected by the "single connection" guard above.
                this.userDataChannel.close();
                this.userDataChannel = null;
                this.userDataStreamingService = null;
                this.streamingService = null;
                throw new IllegalStateException("Failed to establish user data channel", e);
            }
        }
        this.streamingMarketDataService = new BinanceStreamingMarketDataService(this.streamingService, this.marketDataService);
        this.streamingAccountService = new BinanceStreamingAccountService(this.userDataStreamingService);
        this.streamingTradeService = new BinanceStreamingTradeService(this.userDataStreamingService);
        return Completable.concat(connections).doOnComplete(() -> {
            // Fields are read at completion time, exactly as the subscriptions are opened in order.
            this.streamingMarketDataService.openSubscriptions(productSubscription);
            this.streamingAccountService.openSubscriptions();
            this.streamingTradeService.openSubscriptions();
        });
    }

    /**
     * Creates the authenticated streaming service for the given listen key and returns the
     * {@link Completable} that connects it.
     *
     * <p>Once connected, a listener is registered on the user data channel: whenever the listen
     * key changes, the current authenticated socket is disconnected, a new one is created for the
     * new key, and the account and trade services are pointed at the new socket.
     *
     * @param str the listen key identifying the user data stream
     * @return the connection {@link Completable} of the newly created service
     */
    private Completable createAndConnectUserDataService(String str) {
        this.userDataStreamingService = BinanceUserDataStreamingService.create(str);
        return this.userDataStreamingService.connect().doOnComplete(() -> {
            LOG.info("Connected to authenticated web socket");
            this.userDataChannel.onChangeListenKey(newListenKey -> {
                // Completables are lazy: the chain must be subscribed to, otherwise neither the
                // disconnect nor the reconnect is ever executed. defer() postpones replacing
                // userDataStreamingService until the old socket has finished disconnecting.
                this.userDataStreamingService.disconnect()
                        .andThen(Completable.defer(() -> createAndConnectUserDataService(newListenKey)))
                        .subscribe(() -> {
                            this.streamingAccountService.setUserDataStreamingService(this.userDataStreamingService);
                            this.streamingTradeService.setUserDataStreamingService(this.userDataStreamingService);
                        }, error -> LOG.error("Failed to reconnect authenticated web socket after listen key change", error));
            });
        });
    }

    /**
     * Tears down whatever {@link #connect(ProductSubscription...)} set up.
     *
     * <p>The socket references and the market data service are cleared immediately; the user data
     * channel is closed immediately. The account and trade service references are left in place.
     * Calling this method when nothing is connected returns a {@link Completable} with nothing to
     * disconnect.
     *
     * @return a {@link Completable} that disconnects the open sockets in sequence
     */
    public Completable disconnect() {
        List<Completable> disconnections = new ArrayList<>();
        if (this.streamingService != null) {
            disconnections.add(this.streamingService.disconnect());
            this.streamingService = null;
        }
        if (this.userDataStreamingService != null) {
            disconnections.add(this.userDataStreamingService.disconnect());
            this.userDataStreamingService = null;
        }
        if (this.userDataChannel != null) {
            this.userDataChannel.close();
            this.userDataChannel = null;
        }
        this.streamingMarketDataService = null;
        return Completable.concat(disconnections);
    }

    /**
     * @return {@code true} when a market-data streaming service exists and reports its socket as
     *     open; the authenticated socket is not consulted
     */
    public boolean isAlive() {
        return this.streamingService != null && this.streamingService.isSocketOpen();
    }

    /**
     * Exposes reconnect failures of the market-data socket. Requires a prior successful call to
     * {@link #connect(ProductSubscription...)}, since the streaming service is created there.
     */
    public Observable<Throwable> reconnectFailure() {
        return this.streamingService.subscribeReconnectFailure();
    }

    /**
     * Exposes connection-success events of the market-data socket. Requires a prior successful
     * call to {@link #connect(ProductSubscription...)}, since the streaming service is created
     * there.
     */
    public Observable<Object> connectionSuccess() {
        return this.streamingService.subscribeConnectionSuccess();
    }

    /**
     * Returns the market data service created by {@code connect}, or {@code null} before
     * connecting and after disconnecting.
     */
    /* renamed from: getStreamingMarketDataService, reason: merged with bridge method [inline-methods] */
    public BinanceStreamingMarketDataService m3getStreamingMarketDataService() {
        return this.streamingMarketDataService;
    }

    /** Returns the account service created by {@code connect}, or {@code null} before connecting. */
    /* renamed from: getStreamingAccountService, reason: merged with bridge method [inline-methods] */
    public BinanceStreamingAccountService m2getStreamingAccountService() {
        return this.streamingAccountService;
    }

    /** Returns the trade service created by {@code connect}, or {@code null} before connecting. */
    /* renamed from: getStreamingTradeService, reason: merged with bridge method [inline-methods] */
    public BinanceStreamingTradeService m1getStreamingTradeService() {
        return this.streamingTradeService;
    }

    /** Builds the market-data streaming service for the combined-stream URL of the subscription. */
    private BinanceStreamingService createStreamingService(ProductSubscription productSubscription) {
        String streamsUri = API_BASE_URI + "stream?streams=" + buildSubscriptionStreams(productSubscription);
        return new BinanceStreamingService(streamsUri, productSubscription);
    }

    /**
     * Builds the {@code streams} query value for a subscription.
     *
     * <p>Every ticker, order book and trade pair becomes {@code <symbol>@ticker},
     * {@code <symbol>@depth} or {@code <symbol>@trade} respectively; all entries are joined with
     * {@code /}. Categories without pairs contribute nothing.
     *
     * @param productSubscription the subscription whose pairs are to be encoded
     * @return the slash-separated stream names, or an empty string for an empty subscription
     */
    public static String buildSubscriptionStreams(ProductSubscription productSubscription) {
        return Stream.of(
                        buildSubscriptionStrings(productSubscription.getTicker(), "ticker"),
                        buildSubscriptionStrings(productSubscription.getOrderBook(), "depth"),
                        buildSubscriptionStrings(productSubscription.getTrades(), "trade"))
                .filter(streams -> !streams.isEmpty())
                .collect(Collectors.joining("/"));
    }

    /** Joins {@code <symbol>@<str>} for every pair in {@code list}, separated by {@code /}. */
    private static String buildSubscriptionStrings(List<CurrencyPair> list, String str) {
        return subscriptionStrings(list)
                .map(symbol -> symbol + "@" + str)
                .collect(Collectors.joining("/"));
    }

    /**
     * Maps each currency pair to its stream symbol: the pair's string form with every {@code /}
     * removed, lower-cased. {@link Locale#ROOT} keeps the result independent of the JVM's default
     * locale (e.g. Turkish dotless i).
     */
    private static Stream<String> subscriptionStrings(List<CurrencyPair> list) {
        return list.stream()
                .map(currencyPair -> currencyPair.toString().replace("/", "").toLowerCase(Locale.ROOT));
    }

    /**
     * Forwards the compression flag to the market-data streaming service. Requires a prior
     * successful call to {@link #connect(ProductSubscription...)}, since the streaming service is
     * created there.
     */
    public void useCompressedMessages(boolean z) {
        this.streamingService.useCompressedMessages(z);
    }
}
