import { Ticker } from '@protos/v2/ticker';
import { ChannelType, StreamEvent, StreamV2, Subscription } from '@services/StreamV2';
/** Listener signature: receives one `Ticker` per invocation; any return value is ignored. */
export type TickerCallback = (ticker: Ticker) => void;

/**
 * Per-product fan-out handle: holds the listeners registered for one product
 * symbol and the entry points used to feed and extend that listener set.
 */
interface ProductStream {
  /** Pushes one incoming ticker to every registered callback. */
  onMessage: TickerCallback;
  /** Registers a callback so that it receives subsequent tickers. */
  subscribe: (callback: TickerCallback) => void;
  /**
   * Live set of registered callbacks. Exposed so the owning service can remove
   * listeners and check whether any remain.
   */
  callbacks: Set<TickerCallback>;
}

/**
 * Creates the fan-out state for a single product: a set of listeners plus a
 * cache of the latest ticker seen per `ticker.symbol`.
 *
 * @param product - Product symbol this stream is created for. Currently unused
 *   by the implementation; kept so existing call sites stay valid.
 * @returns An object exposing `onMessage` (feed a ticker in), `subscribe`
 *   (register a listener) and the live `callbacks` set.
 */
const productStream = (product: string): ProductStream => {
  const callbacks = new Set<TickerCallback>();

  // A Map rather than a plain object: the keys come from incoming data, and an
  // object would mishandle keys such as "__proto__" (the assignment hits the
  // inherited setter, so the ticker would never be stored or replayed).
  // NOTE(review): the cache is keyed by `ticker.symbol` while the service routes
  // by `ticker.product_symbol` — confirm both fields are meant to be used.
  const cache = new Map<string, Ticker>();

  const subscribe = (callback: TickerCallback) => {
    // Replay the latest known tickers first so a late subscriber does not have
    // to wait for the next update. Iterate over a snapshot so that a callback
    // feeding new tickers in during the replay cannot extend the replay itself.
    for (const ticker of Array.from(cache.values())) {
      callback(ticker);
    }
    callbacks.add(callback);
  };

  const onMessage = (ticker: Ticker) => {
    // Remember the most recent ticker per symbol, then fan it out.
    cache.set(ticker.symbol as string, ticker);
    for (const callback of callbacks) {
      callback(ticker);
    }
  };

  return {
    onMessage,
    subscribe,
    callbacks,
  };
};

/**
 * Builds a ticker subscription service on top of a `StreamV2` connection.
 *
 * The service keeps one `ProductStream` per product symbol, subscribes to the
 * tickers channel only for symbols that are not tracked yet, routes incoming
 * tickers to the matching product stream by `product_symbol`, and re-subscribes
 * all tracked symbols whenever the stream's connect handler fires.
 *
 * @param stream - The stream used to subscribe/unsubscribe and to receive
 *   ticker events and connect notifications.
 * @returns An object with `subscribe(productSymbols, callback)` and
 *   `unsubscribe(productSymbols, callback)`.
 */
export const streamV2ProductService = (stream: StreamV2) => {
  // Delay between the last listener leaving a product and the actual
  // channel unsubscribe for it.
  const unsubscribeDelayMs = 5000;

  // A Map rather than a plain object: symbols come from callers and from
  // incoming data, and an object lookup would return inherited members for
  // keys such as "constructor" or "toString", which then fail when used as a
  // ProductStream.
  const productStreams = new Map<string, ProductStream>();

  // Drops the given products after the delay, but only those that still have
  // no listeners when the timer fires.
  const scheduleUnsubscribe = (productSymbols: string[]) => {
    setTimeout(() => {
      const toUnsubscribe: string[] = [];

      for (const productSymbol of productSymbols) {
        // Re-check at fire time: a listener may have been added again (or the
        // product already removed by an earlier timer) during the delay.
        if (productStreams.get(productSymbol)?.callbacks.size === 0) {
          productStreams.delete(productSymbol);
          toUnsubscribe.push(productSymbol);
        }
      }

      if (toUnsubscribe.length > 0) {
        stream.unsubscribe(Subscription.tickers(toUnsubscribe));
      }
    }, unsubscribeDelayMs);
  };

  stream.onEvent(ChannelType.Tickers, (event: StreamEvent) => {
    const tickers = event.asTickers();
    // Ignore string results and empty/missing ticker lists.
    // NOTE(review): what a string result from asTickers() represents is not
    // visible here — confirm against StreamEvent.
    if (typeof tickers === 'string' || !tickers?.length) return;

    tickers.forEach(ticker => {
      // Tickers for products nobody tracks are dropped silently.
      productStreams.get(ticker.product_symbol)?.onMessage(ticker);
    });
  });

  stream.onConnect(() => {
    // Re-issue the subscription for every tracked product on connect.
    const products = Array.from(productStreams.keys());
    if (products.length > 0) {
      stream.subscribe(Subscription.tickers(products));
    }
  }, ChannelType.Tickers);

  /**
   * Registers `callback` for each of `productSymbols`, creating product streams
   * and subscribing on the underlying stream only for symbols not tracked yet.
   */
  const subscribe = (productSymbols: string[], callback: TickerCallback) => {
    const toSubscribe: string[] = [];

    for (const productSymbol of productSymbols) {
      let ps = productStreams.get(productSymbol);
      if (ps === undefined) {
        ps = productStream(productSymbol);
        productStreams.set(productSymbol, ps);
        toSubscribe.push(productSymbol);
      }
      ps.subscribe(callback);
    }

    if (toSubscribe.length > 0) {
      stream.subscribe(Subscription.tickers(toSubscribe));
    }
  };

  /**
   * Removes `callback` from each of `productSymbols`. Products left without
   * listeners are not unsubscribed immediately; they are scheduled for a
   * delayed unsubscribe.
   */
  const unsubscribe = (productSymbols: string[], callback: TickerCallback) => {
    const toUnsubscribe: string[] = [];

    for (const productSymbol of productSymbols) {
      const ps = productStreams.get(productSymbol);
      if (ps === undefined) continue;

      ps.callbacks.delete(callback);
      if (ps.callbacks.size === 0) {
        toUnsubscribe.push(productSymbol);
      }
    }

    if (toUnsubscribe.length > 0) {
      scheduleUnsubscribe(toUnsubscribe);
    }
  };

  return {
    subscribe,
    unsubscribe,
  };
};
