import { retry } from '@lifeomic/attempt';
import { MESSAGE_TYPE } from '@protos/options';
import { Dashboards } from '@protos/v2/dashboard';
import { Order } from '@protos/v2/order';
import { OrderBookTopMessage } from '@protos/v2/orderBookTop';
import { OtcQuoteMessage, OtcQuoteOrder, OtcQuoteOrderResponse, OtcQuoteSymbol } from '@protos/v2/otcQuote';
import { ProductRiskKey, ProductRiskMessage } from '@protos/v2/productRisk';
import { Snapshot } from '@protos/v2/snapshots';
import { Ticker } from '@protos/v2/ticker';
import AsyncLock from 'async-lock';
import { logger } from './context';
import { gridUpdateService } from './GridUpdateService';
import { toastifyService } from './ToastifyService';

/**
 * Typed view of a request/response message delivered to a per-request resolver
 * (used as the resolver argument type in `StreamV2.modifyDashboard`).
 */
export interface StreamResponse {
  /** Message id; `StreamV2` looks resolvers up by the `id` of the incoming response. */
  id: string;
  /** Method name carried by the response. */
  method: string;
  /** Timestamp string — presumably ISO-8601 like the requests built in this file; confirm against the server. */
  timestamp: string;
  /** Response text; `modifyDashboard` treats any value other than `'ok'` as an error. */
  message: string;
}

/**
 * String identifiers of the stream channels.
 *
 * The values are the same channel names that `Subscription` factories and the
 * `StreamEvent.as*()` accessors in this file use as plain string literals.
 */
export enum ChannelType {
  ServerInfo = 'server_info',
  RFQ = 'rfq',
  Tickers = 'tickers',
  OrderBookTop = 'order_book_top',
  ProductRisk = 'product_risk',
  Snapshots = 'snapshots',
  Dashboards = 'dashboards',
  // Note: the member is named `Orders` but the channel string is 'internal_orders'.
  Orders = 'internal_orders',
}
/** Handler invoked with every `StreamEvent` dispatched on a channel. */
export type StreamEventCallback = (event: StreamEvent) => void;
/** Argument-less callback; used for on-connect hooks and for the unsubscribe function returned by `onEvent`. */
export type VoidCallback = () => void;
/** Payload accepted by `Subscription.rfq`. */
export type QuotePayload = { symbol: OtcQuoteSymbol; size: string; exchange: string };
/** Payload accepted by `Subscription.orderBookTop`. */
export type OrderBookTopPayload = { products: string[] };
/** Callback shape for socket-error notifications; receives the error state or `undefined`. */
export type ErrorCallback = (error: { hasError: boolean; message: string } | undefined) => void;

/**
 * One message received on a stream channel.
 *
 * Wraps the decoded `message` together with the channel it arrived on and the
 * raw `MessageEvent`. The `as*()` accessors return the message cast to the
 * type associated with a channel, or `null` when the event belongs to a
 * different channel. The casts are unchecked: no runtime validation is done.
 */
export class StreamEvent {
  channel: string;
  message: any;
  raw: MessageEvent;
  messageType = 'unknown';
  timestamp = 0;

  /**
   * @param channel Channel name the message arrived on.
   * @param message Decoded message body.
   * @param messageType Numeric key looked up in `MESSAGE_TYPE`; falls back to `'unknown'`.
   * @param event Raw websocket event; its `timeStamp` becomes `timestamp`.
   */
  constructor(channel: string, message: any, messageType: number, event: MessageEvent) {
    const { timeStamp } = event;
    this.raw = event;
    this.timestamp = timeStamp;
    this.channel = channel;
    this.message = message;
    // TODO: we do not need this in v2 any more, get rid of it
    const typeName = MESSAGE_TYPE[messageType];
    this.messageType = typeName ? typeName : 'unknown';
  }

  /** Message as an RFQ quote, or `null` when the channel is not `'rfq'`. */
  asQuotes(): OtcQuoteMessage | null {
    return this.channel === 'rfq' ? (this.message as OtcQuoteMessage) : null;
  }

  /** Message as tickers (array or string), or `null` when the channel is not `'tickers'`. */
  asTickers(): Ticker[] | string | null {
    return this.channel === 'tickers' ? (this.message as Ticker[] | string) : null;
  }

  /** Message as order-book-top entries, or `null` when the channel is not `'order_book_top'`. */
  asOrderBookTop(): OrderBookTopMessage[] | null {
    return this.channel === 'order_book_top' ? (this.message as OrderBookTopMessage[]) : null;
  }

  /** Message as product risk, or `null` when the channel is not `'product_risk'`. */
  asProductRisk(): ProductRiskMessage | null {
    return this.channel === 'product_risk' ? (this.message as ProductRiskMessage) : null;
  }

  /** Message as a snapshot, or `null` when the channel is not `'snapshots'`. */
  asSnapshots(): Snapshot | null {
    return this.channel === 'snapshots' ? (this.message as Snapshot) : null;
  }

  /** A new `Dashboards` built from the message and message type, or `null` when the channel is not `'dashboards'`. */
  asDashboards(): Dashboards | null {
    return this.channel === 'dashboards' ? new Dashboards(this.message, this.messageType) : null;
  }

  /** Message as an order, or `null` when the channel is not `'internal_orders'` or the message is a string. */
  asOrder(): Order | null {
    const carriesOrder = this.channel === 'internal_orders' && typeof this.message !== 'string';
    return carriesOrder ? (this.message as Order) : null;
  }
}

/**
 * A channel name paired with the payload sent when subscribing to or
 * unsubscribing from that channel. Use the static factories to build one
 * for a known channel.
 */
export class Subscription {
  /**
   * @param channel Channel name, e.g. `'tickers'`.
   * @param payload Channel-specific subscription parameters.
   */
  constructor(public channel: string, public payload: Record<string, any>) {}

  /** Returns the payload wrapped in an object keyed by the channel name. */
  getPayload(): Record<string, any> {
    const { channel, payload } = this;
    return { [channel]: payload };
  }

  /** Subscription to `'server_info'` with an empty payload. */
  static serverInfo(): Subscription {
    return new Subscription('server_info', {});
  }

  /** Subscription to `'product_risk'` for the given key. */
  static productRisk(payload: ProductRiskKey): Subscription {
    return new Subscription('product_risk', payload);
  }

  /** Subscription to `'rfq'` for the given quote request. */
  static rfq(payload: QuotePayload): Subscription {
    return new Subscription('rfq', payload);
  }

  /** Subscription to `'tickers'` for the given products. */
  static tickers(products: string[]): Subscription {
    return new Subscription('tickers', { products });
  }

  /** Subscription to `'order_book_top'` for the given payload. */
  static orderBookTop(payload: OrderBookTopPayload): Subscription {
    return new Subscription('order_book_top', payload);
  }

  /** Subscription to `'dashboards'` with an empty payload. */
  static dashboards(): Subscription {
    return new Subscription('dashboards', {});
  }

  /** Subscription to `'snapshots'` with an empty payload. */
  static snapshots(): Subscription {
    return new Subscription('snapshots', {});
  }

  /** Subscription to `'internal_orders'` with an empty payload. */
  static order(): Subscription {
    return new Subscription('internal_orders', {});
  }
}

/**
 * The set of event handlers registered for one channel id, plus the timestamp
 * of the most recent event dispatched to them.
 */
class Channel {
  id: string;
  lastUpdate = 0;
  handlers: Set<StreamEventCallback>;

  /** @param id Channel id this instance collects handlers for. */
  constructor(id: string) {
    this.id = id;
    this.handlers = new Set<StreamEventCallback>();
  }

  /**
   * Records the event timestamp and delivers the event to every handler.
   * A handler that throws is logged and does not stop delivery to the others.
   */
  dispatch(msg: StreamEvent) {
    this.lastUpdate = msg.timestamp;
    this.handlers.forEach(deliver => {
      try {
        deliver(msg);
      } catch (e) {
        logger.error('could not deliver stream event', msg, e);
      }
    });
  }

  /**
   * Unregisters `handler`; once no handlers remain, removes this channel
   * from `store.channels`.
   */
  removeHandler(store: StreamV2, handler: StreamEventCallback) {
    this.handlers.delete(handler);
    const isEmpty = this.handlers.size === 0;
    if (isEmpty) {
      store.channels.delete(this.id);
    }
  }
}

/**
 * Connection state exposed as `StreamV2.status`.
 *
 * Within this file only `Connected` and `Disconnected` are ever assigned;
 * `Reconnecting` is declared but not set here.
 */
export enum Status {
  Connected = 'connected',
  Reconnecting = 'reconnecting',
  Disconnected = 'disconnected',
}

/**
 * Client for the v2 streaming websocket.
 *
 * Owns a single `WebSocket` that is opened lazily and re-opened when it
 * closes, authenticates with a token, routes channel messages to handlers
 * registered via `onEvent`, and routes request/response messages to
 * per-request resolvers keyed by message id.
 */
export class StreamV2 {
  /** Guards socket creation so concurrent callers go through one `getSocket` attempt at a time. */
  private lock: AsyncLock;
  private wsUrl: string;
  /** Counter behind `newId()`. */
  private messageId = 0;
  /** Channels that currently have at least one handler, keyed by channel id. */
  channels: Map<string, Channel>;
  /** Set once a successful `auth` response is seen; cleared by `reset()`. */
  private isAuthenticated: boolean = false;
  /** Last token passed to `authenticate`; reused when reconnecting. */
  private token?: string = undefined;
  socket?: WebSocket = undefined;
  status: Status;
  /** Pending response callbacks keyed by request id. */
  private resolvers: Map<string, any>;
  private socketError: { hasError: boolean; message: string } | undefined = undefined;
  /** Callbacks run after every successful `authenticate`, one per channel type. */
  onConnects: Map<ChannelType, VoidCallback>;
  uid = '';
  /** Prerequisite channels already subscribed on the current connection (deduplicates `subscribe`). */
  private prerequisiteSubscriptions: Set<ChannelType.ServerInfo> = new Set();
  private errorListeners: ((newError: { hasError: boolean; message: string } | undefined) => void)[] = [];

  /** @param wsUrl URL passed to the `WebSocket` constructor on every (re)connect. */
  constructor(wsUrl: string) {
    this.wsUrl = wsUrl;
    this.channels = new Map<string, Channel>();
    this.status = Status.Disconnected;
    this.lock = new AsyncLock();
    this.onConnects = new Map<ChannelType, VoidCallback>();
    this.resolvers = new Map<string, any>();
    this.prerequisiteSubscriptions = new Set<ChannelType.ServerInfo>();
    this.errorListeners = [];
  }

  /**
   * Registers the callback to run after each successful `authenticate` for
   * `channel`. A later registration for the same channel replaces the earlier one.
   */
  onConnect(handler: VoidCallback, channel: ChannelType) {
    this.onConnects.set(channel, handler);
  }

  /** Whether a successful `auth` response has been received on the current connection. */
  get isStreamAuthenticated(): boolean {
    return this.isAuthenticated;
  }

  /** The token last passed to `authenticate`, if any. */
  get getAuthToken(): string | undefined {
    return this.token;
  }

  /** Returns a fresh request id, unique within this instance. */
  newId(): string {
    this.messageId += 1;
    return `flux-ws-v2-${this.messageId}`;
  }

  /** Returns the channel for `channelId`, creating and registering it when missing. */
  private getChannel(channelId: string): Channel {
    let channel = this.channels.get(channelId);
    if (!channel) {
      channel = new Channel(channelId);
      this.channels.set(channelId, channel);
    }
    return channel;
  }

  /**
   * Handles a non-channel message: records an error or marks the stream as
   * authenticated, then hands the message to the resolver registered for its
   * id (if any). The resolver is invoked for error responses too.
   */
  private onResponse(data: any) {
    const resolver = this.resolvers.get(data.id);
    this.resolvers.delete(data.id);
    if (data.error) {
      logger.error('Stream V2 - Response Error', data);
      // `data.error` is truthy here; publish a real boolean as the declared type promises.
      this.setSocketError({ hasError: true, message: data.message?.Message || '' });
    } else {
      logger.log('Stream V2 - Response: ', data);
      // Guard the type: a response without a string `method` must not throw here.
      if (typeof data.method === 'string' && data.method.toLocaleLowerCase() === 'auth') {
        this.isAuthenticated = true;
      }
    }
    if (resolver) resolver(data);
  }

  /** Runs every registered on-connect callback. */
  private reSubscribe() {
    for (const onConnect of this.onConnects.values()) {
      onConnect();
    }
  }

  private removeHandler(channel: string, handler: StreamEventCallback) {
    const c = this.channels.get(channel);
    if (c) c.removeHandler(this, handler);
  }

  /** Delivers an event to its channel's handlers; events with a falsy message are dropped. */
  private dispatch(msg: StreamEvent) {
    if (!msg.message) return;
    const channel = this.channels.get(msg.channel);
    if (channel) channel.dispatch(msg);
  }

  /** Forgets the current socket together with all state tied to that connection. */
  private reset() {
    this.status = Status.Disconnected;
    this.socket = undefined;
    this.isAuthenticated = false;
    // A new connection starts with no subscriptions, so the dedupe set must
    // not keep blocking `subscribe` for prerequisite channels.
    this.prerequisiteSubscriptions.clear();
  }

  /** Drops the current socket, obtains a new one and re-authenticates when a token is known. */
  private async reconnect(): Promise<WebSocket | undefined> {
    this.reset();
    const sock = await this.getSocket();
    if (this.token) await this.authenticate(this.token);
    return sock;
  }

  /**
   * Opens a new websocket and wires its event handlers.
   *
   * Resolves with the socket once it is open. Rejects with the close event
   * when the socket closes while no socket is registered on this instance
   * (e.g. the connection attempt failed). Every close also triggers `reconnect()`.
   */
  private connect(): Promise<WebSocket> {
    return new Promise((resolve, reject) => {
      const socket = new WebSocket(this.wsUrl);
      socket.onopen = () => {
        this.reset();
        this.socket = socket;
        logger.info('Stream V2 - New Websocket Connection', socket);
        this.status = Status.Connected;
        resolve(socket);
      };
      socket.onerror = () => {
        this.setSocketError({ hasError: true, message: 'Stream V2 - Websocket Connection Error' });
        this.reset();
      };
      socket.onclose = event => {
        if (this.socket) {
          logger.warn('Stream V2 - Lost Websocket Connection, Reconnecting');
        } else reject(event);
        // Fire-and-forget, but never leave the rejection unhandled.
        this.reconnect().catch((error: unknown) => {
          logger.error('Stream V2 - Reconnect failed', error);
        });
      };
      socket.onmessage = (event: MessageEvent) => {
        let data: any;
        try {
          data = JSON.parse(event.data);
        } catch (error) {
          // One malformed frame must not throw out of the event handler.
          logger.error('Stream V2 - Could not parse websocket message', event.data, error);
          return;
        }
        if (data === null || typeof data !== 'object') {
          logger.warn('Stream V2 - Ignoring unexpected websocket message', data);
          return;
        }
        if (data.channel) {
          const streamEvent = new StreamEvent(data.channel, data.message, data.message_type, event);
          this.dispatch(streamEvent);
        } else {
          this.onResponse(data);
          // `message` may be an object (see `onResponse`), so only search strings.
          if (typeof data.message === 'string' && data.message.includes('429'))
            this.setSocketError({
              hasError: true,
              message: 'Stream V2 - Authentication failed (code 429): Too many open websockets, limit is 5',
            });
        }
      };
    });
  }

  /**
   * Returns the open socket, connecting first when there is none.
   *
   * Resolves with `undefined` (without connecting) while the recorded socket
   * error mentions `429`. Failed connection attempts are retried with the
   * options below — `maxAttempts: 0` presumably means "no limit" in
   * `@lifeomic/attempt`; confirm against the library docs.
   */
  private async getSocket(): Promise<WebSocket | undefined> {
    return await this.lock.acquire(
      'websocket',
      async () =>
        await retry(
          async () => {
            if (this.socket) return this.socket;
            if (this.socketError?.hasError && this.socketError?.message.includes('429')) return;
            return await this.connect();
          },
          { delay: 200, factor: 2, maxAttempts: 0 }
        )
    );
  }

  /**
   * Registers `handler` for events on `channel`.
   * @returns A function that unregisters the handler again.
   */
  onEvent(channel: string, handler: StreamEventCallback): VoidCallback {
    this.getChannel(channel).handlers.add(handler);
    return () => this.removeHandler(channel, handler);
  }

  /**
   * Sends a request and waits for the response carrying the same id.
   *
   * The returned promise resolves for error responses as well and has no
   * timeout: it stays pending until a response with this id arrives.
   */
  async rpc(method: string, payload: Record<string, unknown>) {
    const id = this.newId();
    const timestamp = new Date().toISOString();
    const promise = new Promise(resolve => {
      this.resolvers.set(id, resolve);
    });
    try {
      await this.sendJson({
        id,
        method,
        timestamp,
        ...payload,
      });
    } catch (error) {
      // Nothing was sent, so no response can ever consume this resolver.
      this.resolvers.delete(id);
      throw error;
    }
    await promise;
  }

  /** Remembers `token`, performs the `auth` request and then runs the on-connect callbacks. */
  async authenticate(token: string) {
    this.token = token;
    await this.rpc('auth', { token });
    this.reSubscribe();
  }

  /** Sends a raw string, connecting first if needed; a no-op when no socket can be obtained. */
  async send(msg: string) {
    const socket = await this.getSocket();
    if (socket) socket.send(msg);
  }

  /** Logs and sends `payload` as JSON. */
  async sendJson(payload: Record<string, unknown>) {
    logger.info('Stream V2 - Request', payload);
    const msg = JSON.stringify(payload);
    await this.send(msg);
  }

  /** Whether requests are currently sent rather than dropped by `maybeSendJson`. */
  private canSend(): boolean {
    return this.isAuthenticated && this.socket !== undefined;
  }

  /** Sends `payload` only when the stream is authenticated and has a socket; otherwise drops it silently. */
  async maybeSendJson(payload: Record<string, unknown>) {
    if (this.canSend()) {
      await this.sendJson(payload);
    }
  }

  /** Whether `subscription` targets a channel that must be subscribed at most once per connection. */
  private isPrerequisite(subscription: Subscription): boolean {
    return [ChannelType.ServerInfo].includes(subscription.channel as ChannelType.ServerInfo);
  }

  /** Builds the request body shared by `subscribe` and `unsubscribe`. */
  private channelRequest(method: 'subscribe' | 'unsubscribe', subscription: Subscription): Record<string, unknown> {
    return {
      id: this.newId(),
      timestamp: new Date().toISOString(),
      method,
      channel: subscription.getPayload(),
    };
  }

  /**
   * Subscribes to a channel. The request is dropped when the stream is not
   * authenticated. Prerequisite channels (`server_info`) are subscribed at most
   * once per connection, and are only recorded as subscribed when the request
   * is actually sent.
   */
  async subscribe(subscription: Subscription) {
    if (this.isPrerequisite(subscription)) {
      const channel = subscription.channel as ChannelType.ServerInfo;
      if (this.prerequisiteSubscriptions.has(channel)) return;
      // Recording a dropped request would block every later attempt.
      if (!this.canSend()) return;
      this.prerequisiteSubscriptions.add(channel);
    }
    await this.maybeSendJson(this.channelRequest('subscribe', subscription));
  }

  /** Unsubscribes from a channel; the request is dropped when the stream is not authenticated. */
  async unsubscribe(subscription: Subscription) {
    if (this.isPrerequisite(subscription)) {
      this.prerequisiteSubscriptions.delete(subscription.channel as ChannelType.ServerInfo);
    }
    await this.maybeSendJson(this.channelRequest('unsubscribe', subscription));
  }

  /**
   * Sends an OTC quote order. The response is shown as a trade confirmation
   * and, unless it is an `otc_error`, pushed to the grid update service.
   * Nothing is sent (and no response handler is kept) when the stream is not
   * authenticated.
   */
  async placeOtcQuoteOrder(order: OtcQuoteOrder) {
    if (!this.canSend()) {
      // The request would be dropped; registering a resolver would leak it forever.
      logger.warn('Stream V2 - Order not sent: stream is not authenticated');
      return;
    }
    const id = this.newId();
    this.resolvers.set(id, (response: OtcQuoteOrderResponse) => {
      toastifyService.showV2TradeConfirmation(response);
      if (response.method.toLocaleLowerCase() === 'otc_error') return;
      gridUpdateService.setUpdatedRow(response);
    });
    await this.maybeSendJson({
      id,
      timestamp: new Date().toISOString(),
      method: 'order',
      ...order,
    });
  }

  /** The most recently recorded socket error, if any. */
  public getSocketError() {
    return this.socketError;
  }

  /** Records `error` and notifies every error listener. */
  private setSocketError(error: { hasError: boolean; message: string }) {
    this.socketError = error;
    this.errorListeners.forEach(listener => listener(this.socketError));
  }

  /**
   * Registers a listener for socket-error changes.
   * @returns A function that removes the listener again.
   */
  public errorSubscribe(listener: (newValue: { hasError: boolean; message: string } | undefined) => void) {
    this.errorListeners.push(listener);
    return () => {
      this.errorListeners = this.errorListeners.filter(l => l !== listener);
    };
  }

  /**
   * Sends an `update_dashboard` request for dashboard `id` merged with `body`.
   * The merged dashboard is returned immediately, without waiting for the
   * response and regardless of whether the request could be sent.
   */
  async modifyDashboard(id: number, body: any) {
    const newDashboard = {
      id,
      ...body,
    };

    if (!this.canSend()) {
      // The request would be dropped; registering a resolver would leak it forever.
      logger.warn('Stream V2 - Dashboard update not sent: stream is not authenticated');
      return newDashboard;
    }

    const messageId = this.newId();
    this.resolvers.set(messageId, (response: StreamResponse) => {
      const { message } = response;
      if (message !== 'ok') {
        logger.error('stream response error', response);
      }
    });

    await this.maybeSendJson({
      id: messageId,
      method: 'update_dashboard',
      timestamp: new Date().toISOString(),
      dashboard: newDashboard,
    });
    return newDashboard;
  }
}
