package info.bitrich.xchangestream.binance;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.binance.dto.BinanceRawTrade;
import info.bitrich.xchangestream.binance.dto.BinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.DepthBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.TickerBinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.TradeBinanceWebsocketTransaction;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook;
import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.ExchangeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.class */
public class BinanceStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger(BinanceStreamingMarketDataService.class);
    private static final JavaType TICKER_TYPE = StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<TickerBinanceWebsocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.1
    });
    private static final JavaType TRADE_TYPE = StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<TradeBinanceWebsocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.2
    });
    private static final JavaType DEPTH_TYPE = StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.3
    });
    private final BinanceStreamingService service;
    private final Map<CurrencyPair, OrderbookSubscription> orderbooks = new HashMap();
    private final Map<CurrencyPair, Observable<BinanceTicker24h>> tickerSubscriptions = new HashMap();
    private final Map<CurrencyPair, Observable<OrderBook>> orderbookSubscriptions = new HashMap();
    private final Map<CurrencyPair, Observable<BinanceRawTrade>> tradeSubscriptions = new HashMap();
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    private final BinanceMarketDataService marketDataService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService$OrderbookSubscription.class */
    public final class OrderbookSubscription {
        long snapshotlastUpdateId;
        AtomicLong lastUpdateId;
        OrderBook orderBook;
        Observable<BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction>> stream;
        AtomicLong lastSyncTime;

        private OrderbookSubscription() {
            this.lastUpdateId = new AtomicLong(0L);
            this.lastSyncTime = new AtomicLong(0L);
        }

        void invalidateSnapshot() {
            this.snapshotlastUpdateId = 0L;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void initSnapshotIfInvalid(CurrencyPair currencyPair) {
            if (this.snapshotlastUpdateId != 0) {
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastSyncTime.get() < 3000) {
                return;
            }
            try {
                BinanceStreamingMarketDataService.LOG.info("Fetching initial orderbook snapshot for {} ", currencyPair);
                BinanceOrderbook binanceOrderbook = BinanceStreamingMarketDataService.this.marketDataService.getBinanceOrderbook(currencyPair, 1000);
                this.snapshotlastUpdateId = binanceOrderbook.lastUpdateId;
                this.lastUpdateId.set(binanceOrderbook.lastUpdateId);
                this.orderBook = BinanceMarketDataService.convertOrderBook(binanceOrderbook, currencyPair);
            } catch (Throwable th) {
                BinanceStreamingMarketDataService.LOG.error("Failed to fetch initial order book for " + currencyPair, th);
                this.snapshotlastUpdateId = 0L;
                this.lastUpdateId.set(0L);
                this.orderBook = null;
            }
            this.lastSyncTime.set(currentTimeMillis);
        }
    }

    public BinanceStreamingMarketDataService(BinanceStreamingService binanceStreamingService, BinanceMarketDataService binanceMarketDataService) {
        this.service = binanceStreamingService;
        this.marketDataService = binanceMarketDataService;
    }

    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        if (this.service.getProductSubscription().getOrderBook().contains(currencyPair)) {
            return this.orderbookSubscriptions.get(currencyPair);
        }
        throw new UnsupportedOperationException("Binance exchange only supports up front subscriptions - subscribe at connect time");
    }

    public Observable<BinanceTicker24h> getRawTicker(CurrencyPair currencyPair, Object... objArr) {
        if (this.service.getProductSubscription().getTicker().contains(currencyPair)) {
            return this.tickerSubscriptions.get(currencyPair);
        }
        throw new UnsupportedOperationException("Binance exchange only supports up front subscriptions - subscribe at connect time");
    }

    public Observable<BinanceRawTrade> getRawTrades(CurrencyPair currencyPair, Object... objArr) {
        if (this.service.getProductSubscription().getTrades().contains(currencyPair)) {
            return this.tradeSubscriptions.get(currencyPair);
        }
        throw new UnsupportedOperationException("Binance exchange only supports up front subscriptions - subscribe at connect time");
    }

    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        return getRawTicker(currencyPair, new Object[0]).map((v0) -> {
            return v0.toTicker();
        });
    }

    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return getRawTrades(currencyPair, objArr).map(binanceRawTrade -> {
            return new Trade(BinanceAdapters.convertType(binanceRawTrade.isBuyerMarketMaker()), binanceRawTrade.getQuantity(), currencyPair, binanceRawTrade.getPrice(), new Date(binanceRawTrade.getTimestamp()), String.valueOf(binanceRawTrade.getTradeId()));
        });
    }

    private static String channelFromCurrency(CurrencyPair currencyPair, String str) {
        return String.join("", currencyPair.toString().split("/")).toLowerCase() + "@" + str;
    }

    public void openSubscriptions(ProductSubscription productSubscription) {
        productSubscription.getTicker().forEach(currencyPair -> {
            this.tickerSubscriptions.put(currencyPair, triggerObservableBody(rawTickerStream(currencyPair).share()));
        });
        productSubscription.getOrderBook().forEach(currencyPair2 -> {
            this.orderbookSubscriptions.put(currencyPair2, triggerObservableBody(orderBookStream(currencyPair2).share()));
        });
        productSubscription.getTrades().forEach(currencyPair3 -> {
            this.tradeSubscriptions.put(currencyPair3, triggerObservableBody(rawTradeStream(currencyPair3).share()));
        });
    }

    private Observable<BinanceTicker24h> rawTickerStream(CurrencyPair currencyPair) {
        return this.service.subscribeChannel(channelFromCurrency(currencyPair, "ticker"), new Object[0]).map(this::tickerTransaction).filter(binanceWebsocketTransaction -> {
            return ((TickerBinanceWebsocketTransaction) binanceWebsocketTransaction.getData()).getCurrencyPair().equals(currencyPair);
        }).map(binanceWebsocketTransaction2 -> {
            return ((TickerBinanceWebsocketTransaction) binanceWebsocketTransaction2.getData()).getTicker();
        });
    }

    private OrderbookSubscription connectOrderBook(CurrencyPair currencyPair) {
        OrderbookSubscription orderbookSubscription = new OrderbookSubscription();
        orderbookSubscription.stream = this.service.subscribeChannel(channelFromCurrency(currencyPair, "depth"), new Object[0]).map(this::depthTransaction).filter(binanceWebsocketTransaction -> {
            return ((DepthBinanceWebSocketTransaction) binanceWebsocketTransaction.getData()).getCurrencyPair().equals(currencyPair);
        });
        return orderbookSubscription;
    }

    private Observable<OrderBook> orderBookStream(CurrencyPair currencyPair) {
        OrderbookSubscription computeIfAbsent = this.orderbooks.computeIfAbsent(currencyPair, currencyPair2 -> {
            return connectOrderBook(currencyPair2);
        });
        return computeIfAbsent.stream.doOnNext(binanceWebsocketTransaction -> {
            computeIfAbsent.initSnapshotIfInvalid(currencyPair);
        }).filter(binanceWebsocketTransaction2 -> {
            return computeIfAbsent.snapshotlastUpdateId > 0;
        }).map((v0) -> {
            return v0.getData();
        }).filter(depthBinanceWebSocketTransaction -> {
            return depthBinanceWebSocketTransaction.getLastUpdateId() > computeIfAbsent.snapshotlastUpdateId;
        }).filter(depthBinanceWebSocketTransaction2 -> {
            long j = computeIfAbsent.lastUpdateId.get();
            if (j == 0) {
                return depthBinanceWebSocketTransaction2.getFirstUpdateId() <= j + 1 && depthBinanceWebSocketTransaction2.getLastUpdateId() >= j + 1;
            }
            return true;
        }).filter(depthBinanceWebSocketTransaction3 -> {
            boolean z;
            long j = computeIfAbsent.lastUpdateId.get();
            if (j == 0) {
                z = true;
            } else {
                z = depthBinanceWebSocketTransaction3.getFirstUpdateId() == j + 1;
            }
            if (z) {
                computeIfAbsent.lastUpdateId.set(depthBinanceWebSocketTransaction3.getLastUpdateId());
            } else {
                LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", new Object[]{currencyPair, Long.valueOf(j), Long.valueOf(depthBinanceWebSocketTransaction3.getFirstUpdateId()), Long.valueOf(depthBinanceWebSocketTransaction3.getLastUpdateId())});
                computeIfAbsent.invalidateSnapshot();
            }
            return z;
        }).map(depthBinanceWebSocketTransaction4 -> {
            BinanceOrderbook orderBook = depthBinanceWebSocketTransaction4.getOrderBook();
            orderBook.bids.forEach((bigDecimal, bigDecimal2) -> {
                computeIfAbsent.orderBook.update(new OrderBookUpdate(Order.OrderType.BID, (BigDecimal) null, currencyPair, bigDecimal, depthBinanceWebSocketTransaction4.getEventTime(), bigDecimal2));
            });
            orderBook.asks.forEach((bigDecimal3, bigDecimal4) -> {
                computeIfAbsent.orderBook.update(new OrderBookUpdate(Order.OrderType.ASK, (BigDecimal) null, currencyPair, bigDecimal3, depthBinanceWebSocketTransaction4.getEventTime(), bigDecimal4));
            });
            return computeIfAbsent.orderBook;
        });
    }

    private Observable<BinanceRawTrade> rawTradeStream(CurrencyPair currencyPair) {
        return this.service.subscribeChannel(channelFromCurrency(currencyPair, "trade"), new Object[0]).map(this::tradeTransaction).filter(binanceWebsocketTransaction -> {
            return ((TradeBinanceWebsocketTransaction) binanceWebsocketTransaction.getData()).getCurrencyPair().equals(currencyPair);
        }).map(binanceWebsocketTransaction2 -> {
            return ((TradeBinanceWebsocketTransaction) binanceWebsocketTransaction2.getData()).getRawTrade();
        });
    }

    private <T> Observable<T> triggerObservableBody(Observable<T> observable) {
        observable.subscribe(obj -> {
        });
        return observable;
    }

    private BinanceWebsocketTransaction<TickerBinanceWebsocketTransaction> tickerTransaction(JsonNode jsonNode) {
        try {
            return (BinanceWebsocketTransaction) this.mapper.readValue(this.mapper.treeAsTokens(jsonNode), TICKER_TYPE);
        } catch (IOException e) {
            throw new ExchangeException("Unable to parse ticker transaction", e);
        }
    }

    private BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction> depthTransaction(JsonNode jsonNode) {
        try {
            return (BinanceWebsocketTransaction) this.mapper.readValue(this.mapper.treeAsTokens(jsonNode), DEPTH_TYPE);
        } catch (IOException e) {
            throw new ExchangeException("Unable to parse order book transaction", e);
        }
    }

    private BinanceWebsocketTransaction<TradeBinanceWebsocketTransaction> tradeTransaction(JsonNode jsonNode) {
        try {
            return (BinanceWebsocketTransaction) this.mapper.readValue(this.mapper.treeAsTokens(jsonNode), TRADE_TYPE);
        } catch (IOException e) {
            throw new ExchangeException("Unable to parse trade transaction", e);
        }
    }
}
