import { ProductRisk } from '@protos/trading';
import { ChannelTypeV1, Stream, StreamEvent, Subscription } from './Stream';

/**
 * Listener signature for product-risk updates.
 *
 * The payload is typed `any`; within this file the only values handed to these
 * callbacks are the `ProductRisk` messages received by a stream's `onMessage`.
 */
export type ProductRiskCallback = (risk: any) => void;

/**
 * Per-key fan-out object produced by `createProductRiskStream`.
 */
interface ProductRiskStream {
  /** Feeds a new risk update into the stream; registered callbacks are invoked with it. */
  onMessage: ProductRiskCallback;
  /** Registers a callback; any previously cached update is replayed to it first. */
  subscribe: (callback: ProductRiskCallback) => void;
  /** Live set of registered callbacks; callers delete from it directly to unregister. */
  callbacks: Set<ProductRiskCallback>;
}

/**
 * Builds the in-memory fan-out for a single product-risk key.
 *
 * The returned object remembers the most recent update stored under
 * `productRiskKey`, replays it to each newly registered listener, and forwards
 * every subsequent update to all listeners currently in the set.
 *
 * @param productRiskKey - Key under which the latest update is cached.
 * @returns The stream's `onMessage` / `subscribe` functions and its listener set.
 */
const createProductRiskStream = (productRiskKey: string): ProductRiskStream => {
  const listeners = new Set<ProductRiskCallback>();
  const latestByKey: Record<string, any> = {};

  // Replay whatever is cached before registering, so a late subscriber
  // immediately sees the last known value.
  function replayThenRegister(listener: ProductRiskCallback): void {
    for (const cached of Object.values(latestByKey)) {
      listener(cached);
    }
    listeners.add(listener);
  }

  // Cache first, then notify, so the cache is already current while listeners run.
  function publish(update: ProductRisk): void {
    latestByKey[productRiskKey] = update;
    listeners.forEach(listener => listener(update));
  }

  return {
    onMessage: publish,
    subscribe: replayThenRegister,
    callbacks: listeners,
  };
};
/**
 * Wraps a `Stream` with per-(product, account) product-risk subscriptions.
 *
 * Each distinct `(product_symbol, account_id)` pair gets one fan-out stream
 * (see `createProductRiskStream`) shared by all callbacks registered for it.
 * Active pairs are re-subscribed on the underlying stream whenever it
 * (re)connects, and unsubscribed once their last callback is removed.
 *
 * @param stream - Underlying stream used to subscribe, unsubscribe and receive events.
 * @returns `subscribe` / `unsubscribe` functions taking
 *   `(product_symbol, account_id, callback)`.
 */
export const streamProductRiskService = (stream: Stream) => {
  // The original symbol/account pair is stored with each entry rather than
  // re-derived by splitting the key: either part may itself contain '-',
  // which would make `key.split('-')` return the wrong pieces.
  const entries = new Map<
    string,
    { product_symbol: string; account_id: string; riskStream: ProductRiskStream }
  >();

  const toKey = (product_symbol: string, account_id: string): string => `${product_symbol}-${account_id}`;

  // Drops the entry and the upstream subscription, but only once no callbacks remain.
  const productRiskUnsubscribe = (productRiskKey: string) => {
    const entry = entries.get(productRiskKey);
    if (!entry || entry.riskStream.callbacks.size > 0) return;
    entries.delete(productRiskKey);
    stream.unsubscribe(Subscription.product_risk(entry.product_symbol, entry.account_id));
  };

  stream.onEvent(ChannelTypeV1.ProductRisk, (event: StreamEvent) => {
    const productRisk = event.asProductRisk();
    if (!productRisk) return;
    // NOTE(review): every event is delivered to the FIRST registered entry,
    // regardless of which product/account it belongs to. With more than one
    // active subscription this looks wrong; the key should presumably be
    // derived from the message itself — confirm the ProductRisk field names
    // and route by them.
    const first = entries.values().next();
    if (!first.done) {
      first.value.riskStream.onMessage(productRisk);
    }
  });

  // Restore every active subscription after a (re)connect.
  stream.onConnect(() => {
    for (const { product_symbol, account_id } of entries.values()) {
      stream.subscribe(Subscription.product_risk(product_symbol, account_id));
    }
  }, ChannelTypeV1.ProductRisk);

  const subscribe = (product_symbol: string, account_id: string, callback: ProductRiskCallback) => {
    const productRiskKey = toKey(product_symbol, account_id);
    let entry = entries.get(productRiskKey);
    if (!entry) {
      entry = { product_symbol, account_id, riskStream: createProductRiskStream(productRiskKey) };
      entries.set(productRiskKey, entry);
    }
    // Replays any cached update to the new callback before registering it.
    entry.riskStream.subscribe(callback);
    stream.subscribe(Subscription.product_risk(product_symbol, account_id));
  };

  const unsubscribe = (product_symbol: string, account_id: string, callback: ProductRiskCallback) => {
    const productRiskKey = toKey(product_symbol, account_id);
    const entry = entries.get(productRiskKey);
    if (!entry) return;
    entry.riskStream.callbacks.delete(callback);
    // No-op while other callbacks are still registered for this key.
    productRiskUnsubscribe(productRiskKey);
  };

  return {
    subscribe,
    unsubscribe,
  };
};
