import {
  fillTemplate,
  testTopicTemplate,
  extractFilling,
  FillingInputValue,
} from "mqtt-helpers";
import mqtt from "mqtt";
import EventEmitter from "events";
import stringify from "fast-json-stable-stringify";
import { useState, useCallback, useEffect, useRef } from "react";

export enum TOPICS {
  ASSET_RATE = "rates/global/asset/{asset}",
  PRICE_CHANGE = "token_metrics/{asset}/{interval}/price_change",
  PAIR_RATE_BY_EXCHANGE = "rates/exchange/{exchange}/pair/{base}/{quote}",
  EXCHANGE_RATE = "realtime/market/+/{exchange}/spot/{base}/{quote}/trades",
  ASSET_PRICE_BY_EXCHANGE = "rates/exchange/{exchange}/asset/{asset}",
  ASSET_PRICE = "rates/exchange/+/asset/{asset}",
  ASSET_PRICE_CHANGE = "token_metrics/{exchange}/{base}/{quote}/{interval}/price_change",
  PERPETUAL_PRICE = "realtime/market/+/{exchange}/perpetual/{base}/{quote}/{symbol}/trades",
  FUTURES_PRICE = "realtime/market/+/{exchange}/futures/{base}/{quote}/{symbol}/trades",
}

// mqtt topics cache
interface CacheItem {
  filling: Record<string, string>;
  msg: unknown;
}

const cache = new Map<string, Map<string, CacheItem>>();

// number of subscriptions per topic with filled params
export const subscriptions = new Map<string, number>();

const client = new EventEmitter();

export default client;

export let mqttClient;

if (typeof window !== "undefined") {
  mqttClient = mqtt.connect(process.env.NEXT_PUBLIC_MQTT_URL);

  mqttClient.on("message", (topic, originalMsg: Buffer, packet) => {
    let msg: Record<string, unknown>;

    try {
      msg = JSON.parse(originalMsg.toString());
    } catch (err) {
      console.error("Couldn't parse MQTT message as JSON", msg, err);
      return;
    }

    if (typeof msg !== "object") {
      return;
    }

    client.emit("msg", topic, msg, packet);

    for (const topicTemplate of Object.values(TOPICS)) {
      if (testTopicTemplate(topic, topicTemplate) === true) {
        const filling = extractFilling(topic, topicTemplate);

        if (cache.has(topicTemplate) === false) {
          cache.set(topicTemplate, new Map());
        }

        cache.get(topicTemplate).set(topic, {
          filling,
          msg,
        });

        client.emit(topicTemplate, filling, msg);
      }
    }
  });
}

export function subscribe(
  topicTemplate: TOPICS,
  params: Record<string, FillingInputValue>,
): boolean | CacheItem {
  if (typeof window === "undefined") {
    return false;
  }

  const sub = stringify({
    ...params,
    topic: topicTemplate,
  });

  if (typeof subscriptions.get(sub) === "undefined") {
    subscriptions.set(sub, 1);
    mqttClient.subscribe(fillTemplate(topicTemplate, params));
    return true;
  } else {
    subscriptions.set(sub, (subscriptions.get(sub) as number) + 1);

    if (cache.has(topicTemplate) === true) {
      const topic = fillTemplate(topicTemplate, params);

      if (cache.get(topicTemplate).has(topic) === true) {
        return cache.get(topicTemplate).get(topic);
      }
    }

    return true;
  }
}

export function unsubscribe(
  topicTemplate: TOPICS,
  params: Record<string, FillingInputValue>,
) {
  if (typeof window === "undefined") {
    return;
  }

  const sub = stringify({
    ...params,
    topic: topicTemplate,
  });

  if (typeof subscriptions.get(sub) === "undefined") {
    return true;
  }

  subscriptions.set(sub, (subscriptions.get(sub) as number) - 1);

  if (subscriptions.get(sub) === 0) {
    subscriptions.delete(sub);
    const topic = fillTemplate(topicTemplate, params);
    mqttClient.unsubscribe(topic);

    if (cache.has(topicTemplate) === true) {
      cache.get(topicTemplate).delete(topic);
    }
  }
}

export interface IUseMqttTopicProps<TValue, TMsg> {
  topic: TOPICS;
  subs: Record<string, FillingInputValue>[];
  extractKeyValue: (
    filling: Record<string, FillingInputValue>,
    msg: TMsg,
  ) => null | { key: FillingInputValue; value: TValue };
  batchDelay?: number;
  onNewValues?: (
    oldValues: Map<string, TValue>,
    newValues: Map<string, TValue>,
    changedKeys: string[],
  ) => void;
}

// TODO: add useMqttTopic with same batching?
export function useMqttTopic<TValue = unknown, TMsg = unknown>({
  topic,
  // initialSubs,
  subs: userSubs,
  batchDelay,
  extractKeyValue,
  onNewValues,
}: IUseMqttTopicProps<TValue, TMsg>) {
  // TODO: track subs changes
  const [values, setValues] = useState(new Map<string, TValue>());

  const subs = useRef(new Map());

  // const subs = useRef(new Map<string, object>((initialSubs ?? []).map(args => [
  //   stringify(args),
  //   args,
  // ])))

  const onMsg = useCallback(
    (filling, msg) => {
      const keyValue = extractKeyValue(filling, msg);

      if (keyValue === null) {
        return;
      }
      const { key, value } = keyValue;

      if (topic === TOPICS.PRICE_CHANGE) {
        Object.keys(value).forEach(function (key) {
          value[key] =
            new Date().getTime() - msg.dt > 14400000 ? "" : value[key];
        }, value);
      }

      if (typeof batchDelay !== "undefined") {
        // batching is enabled
        updateBatch.current.newValues.set(key, value);

        if (
          updateBatch.current.newValues.size !== 0 &&
          updateBatch.current.timeoutId === null
        ) {
          // schedule batch update
          updateBatch.current.timeoutId = setTimeout(() => {
            setValues((oldValues) => {
              const newValues = new Map(oldValues);

              for (const [key, value] of updateBatch.current.newValues) {
                newValues.set(key, value);
              }

              if (typeof onNewValues !== "undefined") {
                onNewValues(
                  oldValues,
                  newValues,
                  Array.from(updateBatch.current.newValues.keys()),
                );
              }

              return newValues;
            });

            updateBatch.current.timeoutId = null;
            updateBatch.current.newValues.clear();
          }, batchDelay);
        }
      } else {
        setValues((oldValues) => {
          const newValues = new Map(oldValues);

          newValues.set(key, value);

          if (typeof onNewValues !== "undefined") {
            onNewValues(oldValues, newValues, [key]);
          }

          return newValues;
        });
      }
    },
    [topic],
  );

  const updateBatch = useRef({
    timeoutId: null,
    newValues: new Map(),
  });

  const addSub = useCallback(
    (args) => {
      const subKey = stringify(args);

      if (subs.current.has(subKey) === true) {
        return false;
      }

      subs.current.set(subKey, args);
      const cache = subscribe(topic, args);

      if (typeof cache !== "boolean") {
        onMsg(cache.filling, cache.msg);
      }

      return true;
    },
    [topic],
  );

  const removeSub = useCallback(
    (args) => {
      const subKey = stringify(args);

      if (subs.current.has(subKey) === false) {
        return false;
      }

      subs.current.delete(subKey);
      unsubscribe(topic, args);

      return true;
    },
    [topic],
  );

  const clearSubs = useCallback(() => {
    for (const args of subs.current.values()) {
      unsubscribe(topic, args);
    }

    subs.current.clear();
  }, [topic]);

  // const setSubs = (subsArgs: object[]) => {
  //   // TODO: add elegant sub/unsub logic?
  //   clearSubs();
  //
  //   for (const args of subsArgs) {
  //     const subKey = stringify(args);
  //
  //     subs.current.set(subKey, args);
  //     subscribe(topic, args)
  //   }
  // }

  useEffect(() => {
    const userSubsMap = new Map(userSubs.map((v) => [stringify(v), v]));

    const allKeys = new Set(
      Array.from(userSubsMap.keys()).concat(Array.from(subs.current.keys())),
    );

    for (const key of allKeys) {
      if (subs.current.has(key) === false) {
        // new sub
        addSub(userSubsMap.get(key));
      }

      if (userSubsMap.has(key) === false) {
        // old sub
        removeSub(subs.current.get(key));
      }
    }

    subs.current = userSubsMap;
  }, [userSubs, topic]);

  useEffect(() => {
    // for (const args of subs.current.values()) {
    //   const cache = subscribe(topic, args)
    //
    //   if (typeof cache !== 'boolean') {
    //     onMsg(cache.filling, cache.msg);
    //   }
    // }

    client.on(topic, onMsg);

    return () => {
      clearSubs();
      client.off(topic, onMsg);
    };
  }, [topic]);

  return {
    // subs: subs.current,
    // addSub,
    // removeSub,
    // clearSubs,
    // setSubs,
    values,
  };
}
