import { ProductRiskKey, ProductRiskMessage } from '@protos/v2/productRisk';
import { ChannelType, StreamEvent, StreamV2, Subscription } from '@services/StreamV2';

/**
 * Signature of a listener that receives product risk messages.
 *
 * @param productRisk - The product risk message being delivered
 */
export type ProductRiskCallback = (productRisk: ProductRiskMessage) => void;

/**
 * Represents a product risk stream: a set of listeners plus the entry points
 * used to feed messages in and to register new listeners.
 */
interface ProductRiskStream {
  /**
   * Handles an incoming product risk message by passing it on to the
   * subscribed callbacks.
   *
   * @param productRisk - The product risk message
   */
  onMessage: ProductRiskCallback;

  /**
   * Subscribes to the product risk stream and registers a callback to be
   * called when a product risk message is received. The implementation in
   * this file also replays the most recently cached message to the callback
   * at registration time.
   *
   * @param callback - The callback to be called when a product risk message is received
   */
  subscribe: (callback: ProductRiskCallback) => void;

  /**
   * Set of callbacks subscribed to the product risk stream. It is exposed so
   * the owner of the stream can remove a callback and check whether any
   * callbacks remain.
   */
  callbacks: Set<ProductRiskCallback>;
}

/**
 * Builds a product risk stream: a listener set paired with a cache of the
 * last message received.
 *
 * @param productRiskKey - Key under which the latest message is cached
 * @returns The stream's `onMessage` / `subscribe` entry points and its listener set
 */
const createProductRiskStream = (productRiskKey: string): ProductRiskStream => {
  const latestByKey: Record<string, ProductRiskMessage> = {};
  const listeners = new Set<ProductRiskCallback>();

  return {
    // Remember the message for future subscribers, then fan it out to the
    // current ones.
    onMessage: (productRisk: ProductRiskMessage) => {
      latestByKey[productRiskKey] = productRisk;
      listeners.forEach(listener => {
        listener(productRisk);
      });
    },

    // Replay whatever is cached to the new listener before registering it,
    // so it does not have to wait for the next message.
    subscribe: (callback: ProductRiskCallback) => {
      for (const cached of Object.values(latestByKey)) {
        callback(cached);
      }
      listeners.add(callback);
    },

    callbacks: listeners,
  };
};

/**
 * Creates a service to manage product risk streaming subscriptions.
 *
 * One product risk stream is kept per account/product pair. Each incoming
 * message is routed to the stream registered for the message's own account
 * and product, and every active pair is re-subscribed whenever the connection
 * is established.
 *
 * @param stream  - The WebSocket stream instance for v2 API
 * @returns Object with subscribe and unsubscribe methods
 */
export const streamV2ProductRiskService = (stream: StreamV2) => {
  // Active streams indexed by their `account_id-product_symbol` key. The
  // original subscription key is stored next to each stream so it never has
  // to be parsed back out of the string (either part may contain '-').
  const entries = new Map<string, { key: ProductRiskKey; riskStream: ProductRiskStream }>();

  // Builds the lookup key for an account/product pair.
  const toStreamKey = (
    accountId: ProductRiskKey['account_id'],
    productSymbol: ProductRiskKey['product_symbol'],
  ): string => `${accountId}-${productSymbol}`;

  // Routes each incoming product risk message to the stream it belongs to
  stream.onEvent(ChannelType.ProductRisk, (event: StreamEvent) => {
    const productRisk = event.asProductRisk();
    if (!productRisk) return;
    // NOTE(review): assumes ProductRiskMessage carries the same account_id and
    // product_symbol fields as ProductRiskKey — confirm against the proto.
    const key = toStreamKey(productRisk.account_id, productRisk.product_symbol);
    entries.get(key)?.riskStream.onMessage(productRisk);
  });

  // Subscribes to product risk streams when the connection is established
  stream.onConnect(() => {
    for (const { key } of entries.values()) {
      stream.subscribe(Subscription.productRisk(key));
    }
  }, ChannelType.ProductRisk);

  /**
   * Subscribes to a product risk stream
   * @param productRiskKey - The product risk key
   * @param callback - The callback to be called when a product risk message is received
   */
  const subscribe = (productRiskKey: ProductRiskKey, callback: ProductRiskCallback) => {
    const key = toStreamKey(productRiskKey.account_id, productRiskKey.product_symbol);
    let entry = entries.get(key);
    if (!entry) {
      // Keep a copy of the key so later mutation by the caller cannot change
      // what gets re-subscribed on reconnect.
      entry = { key: { ...productRiskKey }, riskStream: createProductRiskStream(key) };
      entries.set(key, entry);
    }
    entry.riskStream.subscribe(callback);
    stream.subscribe(Subscription.productRisk(productRiskKey));
  };

  /**
   * Unsubscribes from a product risk stream
   * @param productRiskKey - The product risk key
   * @param callback - The callback to be removed from the product risk stream, helps avoiding memory leaks
   */
  const unsubscribe = (productRiskKey: ProductRiskKey, callback: ProductRiskCallback) => {
    const key = toStreamKey(productRiskKey.account_id, productRiskKey.product_symbol);
    const entry = entries.get(key);
    if (!entry) return;

    entry.riskStream.callbacks.delete(callback);
    // Drop the stream and the upstream subscription once nobody is listening.
    if (entry.riskStream.callbacks.size === 0) {
      entries.delete(key);
      stream.unsubscribe(Subscription.productRisk(productRiskKey));
    }
  };

  return {
    subscribe,
    unsubscribe,
  };
};
