import { OtcQuoteMessage, OtcQuoteOrder, OtcQuoteSymbol } from '@protos/v2/otcQuote';
import { ChannelType, StreamEvent, StreamV2, Subscription } from '@services/StreamV2';
import { getQuoteSymbolString } from '@shared/utils/symbol';

export type QuoteCallback = (otcQuote: OtcQuoteMessage) => void;

interface RFQStream {
  onMessage: QuoteCallback;
  subscribe: (callback: QuoteCallback) => void;
  callbacks: Set<QuoteCallback>;
  size: string;
}

const createRFQStream = (size: string): RFQStream => {
  const callbacks = new Set<QuoteCallback>();
  // cache of rfqs - required for callbacks that subscribe after the server snapshot
  const cache: Record<string, OtcQuoteMessage> = {};

  const subscribe = (callback: QuoteCallback) => {
    Object.values(cache).forEach(ticker => callback(ticker));
    callbacks.add(callback);
  };

  const onMessage = (ticker: OtcQuoteMessage) => {
    cache[ticker.symbol as string] = ticker;
    for (const callback of callbacks) {
      callback(ticker);
    }
  };

  return {
    onMessage,
    subscribe,
    callbacks,
    size,
  };
};

export const streamV2RFQService = (stream: StreamV2) => {
  const rfqStream: Record<string, RFQStream> = {};

  stream.onEvent(ChannelType.RFQ, (event: StreamEvent) => {
    const quote = event.asQuotes();
    if (!quote) return;

    const flatQuoteSymbolKey = getQuoteSymbolString(quote.symbol);
    rfqStream[`${flatQuoteSymbolKey}-${quote.exchange}`]?.onMessage(quote);
  });

  stream.onConnect(() => {
    const rfqs = Object.keys(rfqStream);
    if (rfqs.length > 0) {
      rfqs.forEach(rfq => {
        const symbolElements = rfq.split('-');
        if (symbolElements.length === 3) {
          const [frontSymbol, backSymbol, exchange] = symbolElements;
          const rfqSize = rfqStream[`${frontSymbol}-${backSymbol}-${exchange}`]?.size;
          stream.subscribe(Subscription.rfq({ symbol: `${frontSymbol}-${backSymbol}`, size: rfqSize, exchange }));
        } else if (symbolElements.length === 2) {
          const [symbol, exchange] = symbolElements;
          const rfqSize = rfqStream[`${symbol}-${exchange}`]?.size;
          stream.subscribe(Subscription.rfq({ symbol, size: rfqSize, exchange }));
        }
      });
    }
  }, ChannelType.RFQ);

  const subscribe = (symbol: OtcQuoteSymbol, size: string, exchange: string, callback: QuoteCallback) => {
    const flatQuoteSymbolKey = getQuoteSymbolString(symbol);

    let currentStream = rfqStream[`${flatQuoteSymbolKey}-${exchange}`];
    if (!currentStream) {
      currentStream = createRFQStream(size);
      rfqStream[`${flatQuoteSymbolKey}-${exchange}`] = currentStream;
    }
    currentStream.subscribe(callback);
    stream.subscribe(Subscription.rfq({ symbol, size, exchange }));
  };

  const unsubscribe = (symbol: OtcQuoteSymbol, exchange: string) => {
    const flatQuoteSymbolKey = getQuoteSymbolString(symbol);
    if (!flatQuoteSymbolKey) return;

    const rfqSize = rfqStream[`${flatQuoteSymbolKey}-${exchange}`]?.size;
    if (!rfqSize || !symbol || !exchange) return;

    delete rfqStream[`${flatQuoteSymbolKey}-${exchange}`];
    stream.unsubscribe(Subscription.rfq({ symbol, size: rfqSize, exchange }));
  };

  const placeOrderQuote = (order: OtcQuoteOrder) => {
    stream.placeOtcQuoteOrder(order);
  };

  return { subscribe, unsubscribe, placeOrderQuote };
};
