import {
  ContentType,
  Message,
  MessageFromClient,
  MessageToClient,
  CreateMailboxResponse,
  SendMessageRequest,
  CreateMailboxRequest,
  FirstMessage,
  RpcFrameType,
  KeyValPair,
  ChangeDataCaptureSubscription,
  MessageEnvelope,
  MailboxSubscription,
  SendReceipt,
  Ping,
  Subscription,
  NefarioSubscription,
  Pong,
} from "./proto/wsmessages";
import { memoize, undef } from "../Utils";
// Your code here

/**
 * Wire-format strategy for the client's WebSocket traffic: it names the
 * server path to connect to and knows how to serialize outgoing messages.
 */
export interface ContentTypeHandler {
  /** URL path of the WebSocket endpoint to use with this handler. */
  readonly webSocketUrlPath: string;
  /** Serializes `mfc` in this handler's format and sends it over `ws`. */
  sendMessageFromClient(mfc: MessageFromClient, ws: WebSocket): void;
}

/**
 * Handler for the JSON endpoint: every outgoing message is converted to its
 * JSON object form, stringified, and sent as a single text frame.
 */
const JsonContentTypeHandler: ContentTypeHandler = {
  webSocketUrlPath: "/api/ws/send_receive_json",

  sendMessageFromClient: (mfc: MessageFromClient, ws: WebSocket): void => {
    ws.send(JSON.stringify(MessageFromClient.toJSON(mfc)));
  },
};

/**
 * Handler for the protobuf endpoint: every outgoing message is encoded to
 * bytes and sent as a single binary frame.
 */
const ProtobufContentTypeHandler: ContentTypeHandler = {
  webSocketUrlPath: "/api/ws/send_receive_proto",

  sendMessageFromClient: (mfc: MessageFromClient, ws: WebSocket): void => {
    ws.send(MessageFromClient.encode(mfc).finish());
  },
};

/** The available wire-format handlers, exported for callers of `newHermesClient`. */
export const ContentTypeHandlerWrapper = {
  Json: JsonContentTypeHandler,
  Protobuf: ProtobufContentTypeHandler,
};

/**
 * Creates a Hermes client for `rootUrl` and wires up its RPC plumbing.
 *
 * Once the mailbox exists, the client subscribes to its own "rpc-inbox"
 * channel. Each message arriving there is handled in one of two ways:
 *  - a `ping` request is answered with a pong response;
 *  - anything else carrying a correlation id resolves (and removes) the
 *    pending RPC call registered under that id.
 *
 * A keep-alive ping is also sent every 30 seconds. The interval is never
 * cleared, so it lives as long as the page/process does.
 *
 * @param rootUrl base http(s) URL of the Hermes server
 * @param contentTypeHandler wire format used on the WebSocket
 * @returns the newly created client
 */
export function newHermesClient(
  rootUrl: string,
  contentTypeHandler: ContentTypeHandler
): HermesClient {
  const hci = new HermesClientImpl(rootUrl, contentTypeHandler);

  hci
    .mailbox()
    .then((mbox) => {
      const correlations = hci.correlations;
      hci.channelMessageSubscribe(
        {
          id: "rpc-inbox",
          state: "rpc-inbox",
          readerKey: mbox.readerKey,
          channel: "rpc-inbox",
          startSeq: "all",
        },
        // `msg` is the envelope's payload already decoded by the connection's
        // channel subscription, so it must not be decoded a second time here.
        (_me, msg) => {
          try {
            const rpcHeader = msg.header?.rpcHeader;
            const endPoint = rpcHeader?.endPoint;
            if (
              rpcHeader?.frameType === RpcFrameType.Request &&
              endPoint === "ping"
            ) {
              hci.sendPongResponse(mbox, msg, endPoint);
              return;
            }

            const correlationId = rpcHeader?.correlationId;
            if (correlationId) {
              const resolve = correlations.get(correlationId);
              if (resolve !== undefined) {
                resolve(msg);
              }
              correlations.delete(correlationId);
            }
          } catch (e) {
            console.error("error handling rpc-inbox message", e);
          }
        }
      );
    })
    .catch((e) => {
      // Without this handler a failed mailbox creation surfaces only as an
      // unhandled promise rejection.
      console.error("hermes client rpc-inbox setup failed", e);
    });

  // send ping every 30 seconds
  setInterval(() => hci.sendPing(), 30 * 1000);
  return hci;
}

/**
 * Asks the server to create a mailbox by POSTing a JSON-encoded
 * CreateMailboxRequest to `${rootUrl}/api/create_mailbox`.
 *
 * All metadata maps are sent empty and both timeouts are sent as 0.
 *
 * @param channels channel names to put in the request
 * @param rootUrl base URL of the server; the API path is appended verbatim
 * @returns the server's reply parsed as a CreateMailboxResponse
 * @throws Error when the HTTP response status is not OK
 */
async function createMailbox(
  channels: string[],
  rootUrl: string
): Promise<CreateMailboxResponse> {
  const requestBody = JSON.stringify(
    CreateMailboxRequest.toJSON({
      channels: channels,
      privateMetadata: {},
      publicMetadata: {},
      purgeTimeoutInMillis: 0,
      closeTimeoutInMillis: 0,
      extraData: {},
    })
  );

  const response = await fetch(`${rootUrl}/api/create_mailbox`, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: requestBody,
  });

  // Fail fast on any non-2xx status; the happy path follows unindented.
  if (!response.ok) {
    throw new Error(`createMailbox failed with status ${response.status}`);
  }

  const responseText = await response.text();
  return CreateMailboxResponse.fromJSON(JSON.parse(responseText));
}

/**
 * Shape of a change-data-capture event as handed to `cdcSubscribe`
 * listeners. Instances are produced by JSON-parsing the text payload of a
 * message envelope; the parsed value is cast to this type, not validated.
 */
export interface ChangeDataCaptureEvent {
  id: string;
  // presumably the database schema/table the change belongs to — confirm with the server
  schema: string;
  table: string;
  // NOTE(review): allowed values are not visible in this file
  action: string;
  // Passed to the listener as its second, caller-typed argument.
  data: any;
  // NOTE(review): string-encoded time; exact format is not visible in this file
  commitTime: string;
}

/** Channel names shared by the RPC machinery in this file. */
class Constants {
  // Channel RPC request/response messages are sent on.
  static readonly rpcInboxChannelName = "rpc-inbox";
  // Requested alongside the inbox channel when the mailbox is created.
  static readonly rpcSentChannelName = "rpc-sent";
}

/** Callback receiving a decoded Message; used as the resolver type for pending RPC correlations. */
type MessageListener = (msg: Message) => void;
/** Callback receiving a raw MessageEnvelope. */
type MessageEnvelopeListener = (me: MessageEnvelope) => void;

/**
 * One live WebSocket session between a HermesClientImpl and the server.
 *
 * State that has to outlive a single socket (active subscriptions, pending
 * RPC correlations, sends still waiting for an ack) is kept on the owning
 * client, so this class only reads and updates it through `clientImpl`.
 */
class HermesConnection {
  readonly clientImpl: HermesClientImpl;
  readonly mailbox: CreateMailboxResponse;
  readonly webSocket: WebSocket;

  /**
   * Installs the message and close handlers on `webSocket`, then re-sends
   * every request the client is still waiting to have acknowledged.
   *
   * @param clientImpl owning client holding the cross-connection state
   * @param mailbox mailbox this connection sends from
   * @param webSocket socket to use for this session
   */
  constructor(
    clientImpl: HermesClientImpl,
    mailbox: CreateMailboxResponse,
    webSocket: WebSocket
  ) {
    this.clientImpl = clientImpl;
    this.mailbox = mailbox;
    this.webSocket = webSocket;

    webSocket.onmessage = (event) => {
      if (event.data instanceof ArrayBuffer) {
        this.onWebSocketBinaryMessage(event.data);
      } else {
        this.onWebSocketTextMessage(event.data);
      }
    };

    webSocket.onclose = (event) => {
      console.log("HermesConnection websocket closed", event);
      clientImpl.reconnect();
    };

    // resend un ack'ed messages (they are already registered, so do not
    // register them for an ack a second time)
    clientImpl.sentMessagesWaitingForAck.forEach((smr) => {
      this.sendSendMessageRequest(smr, false);
    });
  }

  /**
   * Handles a text frame: parses it as a JSON-encoded MessageToClient and
   * dispatches it. A frame that cannot be parsed is logged and dropped.
   */
  onWebSocketTextMessage(message: string): void {
    let m2c: MessageToClient;
    try {
      m2c = MessageToClient.fromJSON(JSON.parse(message));
    } catch (e) {
      console.error("hermes client could not parse text frame", e);
      return;
    }
    this.onMessageToClient(m2c);
  }

  /**
   * Handles a binary frame: decodes it as a protobuf MessageToClient and
   * dispatches it. A frame that cannot be decoded is logged and dropped.
   */
  onWebSocketBinaryMessage(message: ArrayBuffer): void {
    let m2c: MessageToClient;
    try {
      m2c = MessageToClient.decode(new Uint8Array(message));
    } catch (e) {
      console.error("hermes client could not decode binary frame", e);
      return;
    }
    this.onMessageToClient(m2c);
  }

  /**
   * Dispatches one server message according to which field is populated:
   *  - `messageEnvelope`: delivered to the matching active subscription;
   *  - `sendMessageResponse`: clears the acknowledged send;
   *  - `ping`: answered with a pong;
   *  - `notification`, `subscribeResponse`, `pong`: logged only.
   */
  onMessageToClient(m2c: MessageToClient): void {
    if (m2c.notification !== undefined) {
      console.log("hermes client received notification", m2c.notification);
    } else if (m2c.messageEnvelope !== undefined) {
      this.onMessageEnvelope(m2c.messageEnvelope);
    } else if (m2c.sendMessageResponse !== undefined) {
      const id = m2c.sendMessageResponse.idempotentId;
      if (id) {
        this.clientImpl.sentMessagesWaitingForAck.delete(id);
      }
      console.log(
        "hermes client received SendMessageResponse",
        m2c.sendMessageResponse
      );
    } else if (m2c.subscribeResponse !== undefined) {
      console.log(
        "hermes client received subscribeResponse",
        m2c.subscribeResponse
      );
    } else if (m2c.ping !== undefined) {
      // Reply through the content-type handler so the pong uses the same
      // wire format as the rest of the traffic on this socket.
      this.sendMessageFromClient({ pong: {} });
    } else if (m2c.pong !== undefined) {
      console.log("hermes client received pong");
    }
  }

  /** Routes an envelope to the active subscription named in its server envelope. */
  private onMessageEnvelope(me: MessageEnvelope): void {
    if (me.messageBytes === undefined) {
      console.log("hermes client received empty messageEnvelope", me);
      return;
    }
    const subscriptionId = me.serverEnvelope?.subscriptionId;
    if (!subscriptionId) {
      return;
    }
    const activeSub = this.clientImpl.activeSubscriptions.get(subscriptionId);
    if (!activeSub) {
      return;
    }
    // Record the sequence of the latest message on the subscription itself.
    // The same object is referenced by `protoSubscription`, which is what a
    // reconnect sends to the server.
    const startSeq = me.serverEnvelope?.sequence;
    if (startSeq) {
      activeSub.protoRawSubscription.startSeq = String(startSeq);
    }
    activeSub.onMessageEvent(me);
  }

  /** Logs `mfc` and sends it using the client's content-type handler. */
  sendMessageFromClient(mfc: MessageFromClient): void {
    console.log("sending websocket message", mfc);
    this.clientImpl.contentTypeHandler.sendMessageFromClient(
      mfc,
      this.webSocket
    );
  }

  /**
   * Registers `activeSub` on the client.
   * @throws Error when a subscription with the same id is already registered
   */
  addActiveSubscription(activeSub: ActiveSubscription): void {
    const subscriptions = this.clientImpl.activeSubscriptions;
    if (subscriptions.has(activeSub.subscriptionId)) {
      throw Error(
        `subscriptionId ${activeSub.subscriptionId} is already subscribed`
      );
    }
    subscriptions.set(activeSub.subscriptionId, activeSub);
  }

  /**
   * Subscribes to change-data-capture events for `cdcs.tables`.
   *
   * The subscription id is derived from the table list
   * ("cdc-<database>.<table>-..."), so subscribing twice to the same tables
   * throws.
   *
   * @param cdcs tables to watch and the sequence to start from
   * @param listener called with each parsed event and its `data` cast to `A`
   * @throws Error when the derived subscription id is already registered
   */
  cdcSubscribe<A>(
    cdcs: CdcSubscription,
    listener: (cdcEvent: ChangeDataCaptureEvent, a: A) => void
  ): void {
    const subscriptionId =
      "cdc-" + cdcs.tables.map((t) => t.database + "." + t.table).join("-");

    const protoCdcs: ChangeDataCaptureSubscription = {
      id: subscriptionId,
      matchers: cdcs.tables,
      startSeq: cdcs.startSeq,
    };

    const onMessage = (msg: MessageEnvelope): void => {
      // The payload is JSON text; it is cast, not validated.
      const json = new TextDecoder().decode(msg.messageBytes);
      const cdcEvent = JSON.parse(json) as ChangeDataCaptureEvent;
      listener(cdcEvent, cdcEvent.data as A);
    };

    // Register first: a duplicate id throws here, before any request is sent.
    this.addActiveSubscription({
      subscriptionId: subscriptionId,
      protoRawSubscription: protoCdcs,
      onMessageEvent: onMessage,
      protoSubscription: { changeDataCapture: protoCdcs },
    });

    this.sendMessageFromClient({
      subscribeRequest: { subscriptions: [{ changeDataCapture: protoCdcs }] },
    });
  }

  /** Subscribes to a mailbox channel whose payloads are `Message`s. */
  channelMessageSubscribe(
    ms: MailboxSubscription,
    listener: (me: MessageEnvelope, msg: Message) => void
  ): void {
    this.rawChannelSubscribe(ms, Message.decode, listener);
  }

  /** Subscribes to a mailbox channel whose payloads are `SendReceipt`s. */
  channelSendReceiptSubscribe(
    ms: MailboxSubscription,
    listener: (me: MessageEnvelope, msg: SendReceipt) => void
  ): void {
    this.rawChannelSubscribe(ms, SendReceipt.decode, listener);
  }

  /**
   * Subscribes to a mailbox channel and decodes each payload with `decoder`.
   *
   * @param ms subscription to register; `ms.id` is used as the subscription id
   * @param decoder turns the envelope's bytes into the payload type
   * @param listener called with the envelope and the decoded payload
   * @throws Error when `ms.id` is missing or already registered
   */
  rawChannelSubscribe<A>(
    ms: MailboxSubscription,
    decoder: (bytes: Uint8Array) => A,
    listener: (me: MessageEnvelope, a: A) => void
  ): void {
    const subscriptionId = ms.id;
    if (!subscriptionId) {
      throw new Error("MailboxSubscription id is undefined");
    }

    const onMessage = (msg: MessageEnvelope): void => {
      if (msg.messageBytes === undefined) {
        console.error("MessageEnvelope.messageBytes is undefined");
        return;
      }
      listener(msg, decoder(msg.messageBytes));
    };

    // Register first: a duplicate id throws here, before any request is sent.
    this.addActiveSubscription({
      subscriptionId: subscriptionId,
      onMessageEvent: onMessage,
      protoRawSubscription: ms,
      protoSubscription: { mailbox: ms },
    });

    this.sendMessageFromClient({
      subscribeRequest: { subscriptions: [{ mailbox: ms }] },
    });
  }

  /** Sends a keep-alive ping to the server. */
  sendPing(): void {
    this.sendMessageFromClient({ ping: {} });
  }

  /**
   * Sends `smr` to the server.
   *
   * @param smr request to send
   * @param registerForAck when true (and the request has an idempotent id),
   *   remember the request until the server acknowledges it so a later
   *   connection can re-send it
   */
  sendSendMessageRequest(
    smr: SendMessageRequest,
    registerForAck: boolean
  ): void {
    if (registerForAck && smr.idempotentId) {
      this.clientImpl.sentMessagesWaitingForAck.set(smr.idempotentId, smr);
    }
    this.sendMessageFromClient({ sendMessageRequest: smr });
  }

  /**
   * Sends an RPC request on the rpc-inbox channel and returns a promise for
   * the correlated response.
   *
   * The promise is resolved by whoever looks the correlation id up in
   * `clientImpl.correlations`. It is never rejected and has no timeout, so
   * it stays pending if no response arrives. `request.state` is not used.
   *
   * @param request destination, endpoint, optional body/headers/content type
   * @returns the response message carrying the same correlation id
   */
  rawRpcCall(request: RawRpcRequest): Promise<Message> {
    const address = this.mailbox.address ?? "";
    const correlationId = address + "-" + this.clientImpl.correlationIdCounter++;
    // Same value as before whenever the mailbox has an address; without one
    // the id no longer starts with the literal text "undefined".
    const idempotentId = address + correlationId;

    const smr: SendMessageRequest = {
      channel: Constants.rpcInboxChannelName,
      to: [request.to],
      idempotentId: idempotentId,
      message: {
        header: {
          rpcHeader: {
            correlationId: correlationId,
            endPoint: request.endPoint,
            frameType: RpcFrameType.Request,
            errorInfo: undefined,
          },
          sender: this.mailbox.address,
          contentType: request.contentType,
          extraHeaders: request.headers,
          senderSequence: 0,
        },
        serverEnvelope: undefined,
        senderEnvelope: {
          created: Date.now(),
        },
        // A request without a body is sent with zero-length data.
        data: request.body !== undefined ? request.body : new Uint8Array(0),
      },
    };

    // Register the resolver before sending so a response can never arrive
    // ahead of its correlation entry.
    const promise = new Promise<Message>((resolve) => {
      this.clientImpl.correlations.set(correlationId, resolve);
    });

    this.sendSendMessageRequest(smr, true);

    return promise;
  }

  /**
   * Observes the RPC traffic of the mailbox identified by `readerKey`.
   *
   * Subscribes to that mailbox's rpc-inbox and rpc-sent channels from the
   * first message, groups what arrives by correlation id, and calls
   * `listener` with the (mutated) group every time a new piece arrives.
   *
   * @param readerKey reader key of the mailbox to observe
   * @param listener called after each update to a correlation
   */
  rpcObserverSubscribe(
    readerKey: string,
    listener: (correlation: RpcRequestResponse) => void
  ): void {
    console.log("rpcObserverSubscribe", readerKey);

    const correlations = new Map<string, RpcRequestResponse>();

    // Get-or-create the accumulator for one correlation id.
    const correlationFor = (correlationId: string): RpcRequestResponse => {
      let correlation = correlations.get(correlationId);
      if (correlation === undefined) {
        correlation = new RpcRequestResponse(correlationId);
        correlations.set(correlationId, correlation);
      }
      return correlation;
    };

    const msInbox: MailboxSubscription = {
      id: "rpc-inbox-" + readerKey,
      readerKey: readerKey,
      channel: Constants.rpcInboxChannelName,
      startSeq: "first",
    };
    this.channelMessageSubscribe(msInbox, (me, msg) => {
      const correlationId = msg.header?.rpcHeader?.correlationId;
      if (correlationId) {
        const correlation = correlationFor(correlationId);
        correlation.inboxEnvelope = me;
        correlation.inboxMessage = msg;
        listener(correlation);
      }
    });

    const msSent: MailboxSubscription = {
      id: "rpc-sent-" + readerKey,
      readerKey: readerKey,
      channel: Constants.rpcSentChannelName,
      startSeq: "first",
    };
    this.channelSendReceiptSubscribe(msSent, (me, sr) => {
      const msg = sr.request?.message;
      const correlationId = msg?.header?.rpcHeader?.correlationId;
      if (correlationId !== undefined) {
        const correlation = correlationFor(correlationId);
        correlation.sentMessage = msg;
        correlation.sendReceiptEnvelope = me;
        correlation.sendReceipt = sr;
        listener(correlation);
      }
    });
  }
}

/** An RPC request whose body has already been serialized to bytes. */
interface RawRpcRequest {
  // Destination address; sent as the single entry of the request's `to` list.
  to: string;
  // Name of the remote endpoint, carried in the RPC header.
  endPoint: string;
  // Serialized request payload; zero-length data is sent when omitted.
  body?: Uint8Array;
  // Content type written to the message header.
  contentType?: ContentType;
  // Extra headers written to the message header.
  headers?: KeyValPair[];
  // NOTE(review): not read by rawRpcCall in this file — confirm intended use.
  state?: Uint8Array;
}

/**
 * An RPC request carrying a typed (not yet serialized) payload of type `A`.
 * NOTE(review): not referenced by any code visible in this file.
 */
interface RpcRequest<A> {
  to: string;
  endPoint: string;
  request: A;
  headers?: KeyValPair[];
  state?: Uint8Array;
}

// interface RawSubscription<A> {
//   mailboxSubscription: MailboxSubscription;
//   decoder: (bytes: Uint8Array)=>A;
//   listener: (me: MessageEnvelope, a: A)=>void;
//   rawListener: MessageEnvelopeListener;
// }

/** Any of the concrete subscription messages that can be registered. */
type RawSubscription =
  | MailboxSubscription
  | ChangeDataCaptureSubscription
  | NefarioSubscription;
/** Book-keeping entry for one subscription registered on the client. */
interface ActiveSubscription {
  // Key under which the entry is stored in the client's activeSubscriptions map.
  subscriptionId: string;
  // The concrete subscription; its `startSeq` is updated as messages arrive.
  protoRawSubscription: RawSubscription;
  // Invoked with every envelope the server delivers for this subscription.
  onMessageEvent(messageEnvelope: MessageEnvelope): void;
  // The subscription in its wrapper form, as sent in subscribe requests.
  protoSubscription: Subscription;
}

/**
 * Default HermesClient implementation.
 *
 * Owns the mailbox, the current WebSocket connection, and all state that
 * must survive a reconnect: active subscriptions, pending RPC correlations
 * and sends not yet acknowledged by the server.
 */
class HermesClientImpl implements HermesClient {
  /** Promise for the connection in use; replaced by `reconnect()`. */
  currentConn: Promise<HermesConnection>;
  contentTypeHandler: ContentTypeHandler;
  /** ws(s) URL derived from `rootUrl` and the handler's endpoint path. */
  wsUrl: string;
  mailboxResponseP: Promise<CreateMailboxResponse>;
  /** Monotonic counter used to build RPC correlation ids. */
  correlationIdCounter: number = 0;
  /** Resolvers of RPC calls awaiting a response, keyed by correlation id. */
  correlations: Map<string, MessageListener> = new Map();
  /** Registered subscriptions, keyed by subscription id. */
  activeSubscriptions: Map<string, ActiveSubscription> = new Map();
  /** Requests sent but not yet acknowledged, keyed by idempotent id. */
  sentMessagesWaitingForAck: Map<string, SendMessageRequest> = new Map();
  rootUrl: string;
  /** Delay before retrying a socket that closed without ever opening. */
  reconnectDelayInMillis: number = 1000;

  /**
   * Starts mailbox creation and opens the first connection.
   *
   * @param rootUrl base http(s) URL of the server
   * @param contentTypeHandler wire format used on the WebSocket
   */
  constructor(rootUrl: string, contentTypeHandler: ContentTypeHandler) {
    this.rootUrl = rootUrl;
    this.contentTypeHandler = contentTypeHandler;

    this.mailboxResponseP = createMailbox(
      [Constants.rpcInboxChannelName, Constants.rpcSentChannelName],
      rootUrl
    );

    // http -> ws and https -> wss; the path is replaced by the handler's
    // endpoint path.
    const wsUrl = new URL(rootUrl);
    wsUrl.protocol = wsUrl.protocol.replace("http", "ws");
    wsUrl.pathname = contentTypeHandler.webSocketUrlPath;
    this.wsUrl = wsUrl.toString();

    this.currentConn = this.newHermesConnection();
  }

  /**
   * Answers an RPC ping with a pong that echoes the ping's payload, using
   * the same content type as the ping. Pings without a correlation id or a
   * sender are logged and ignored.
   *
   * @param mbox mailbox the pong is sent from
   * @param pingMsg the received ping request
   * @param endPoint endpoint name to put in the response header
   */
  sendPongResponse(
    mbox: CreateMailboxResponse,
    pingMsg: Message,
    endPoint: string
  ): void {
    const header = pingMsg.header;
    const correlationId = header?.rpcHeader?.correlationId;
    const sender = header?.sender;
    if (correlationId === undefined || sender === undefined) {
      console.log("ignoring ping no correlation id", pingMsg);
      return;
    }

    const isJson =
      (header?.contentType ?? ContentType.UnspecifiedCT) === ContentType.Json;

    let ping: Ping = {};
    const body = pingMsg.data;
    if (body !== undefined) {
      if (isJson) {
        // The body is UTF-8 JSON text, so it has to be decoded and parsed
        // before fromJSON can read it. A zero-length body means "no ping".
        if (body.length > 0) {
          ping = Ping.fromJSON(JSON.parse(new TextDecoder().decode(body)));
        }
      } else {
        ping = Ping.decode(body);
      }
    }

    const pong: Pong = { payload: ping.payload };
    const data: Uint8Array = isJson
      ? new TextEncoder().encode(JSON.stringify(Pong.toJSON(pong)))
      : Pong.encode(pong).finish();

    const smr: SendMessageRequest = {
      channel: Constants.rpcInboxChannelName,
      to: [sender],
      idempotentId: mbox.address + correlationId,
      message: {
        header: {
          rpcHeader: {
            correlationId: correlationId,
            endPoint: endPoint,
            frameType: RpcFrameType.SuccessResponse,
            errorInfo: undefined,
          },
          sender: mbox.address,
          contentType: header?.contentType,
        },
        serverEnvelope: undefined,
        senderEnvelope: {
          created: Date.now(),
        },
        data: data,
      },
    };

    this.runWithConn("sendPongResponse", (conn) => {
      conn.sendSendMessageRequest(smr, true);
    });
  }

  /** Replaces the current connection with a freshly opened one. */
  reconnect(): void {
    this.currentConn = this.newHermesConnection();
  }

  /**
   * Opens a WebSocket once the mailbox is available, sends the first
   * message (sender info plus every active subscription, so the server can
   * resume them), and resolves with a HermesConnection wrapping the socket.
   *
   * If the socket closes before it ever opened, another attempt is made
   * after `reconnectDelayInMillis`. If mailbox creation failed, the returned
   * promise rejects and the failure is logged.
   */
  newHermesConnection(): Promise<HermesConnection> {
    const connP = this.mailboxResponseP.then(
      (mbox) =>
        new Promise<HermesConnection>((resolve) => {
          const open = (): void => {
            const webSocket = new WebSocket(this.wsUrl);
            webSocket.binaryType = "arraybuffer";

            webSocket.onopen = () => {
              console.log(
                "hermes client websocket opened, sending first message"
              );

              // activeSubscriptions is a Map, so its entries must be read
              // through values(); Object.values() on a Map is always empty.
              const resubscriptions = Array.from(
                this.activeSubscriptions.values(),
                (sub) => sub.protoSubscription
              );

              // send first message
              const firstMessage: FirstMessage = {
                senderInfo: {
                  readerKey: mbox.readerKey,
                  address: mbox.address,
                },
                subscriptions: resubscriptions,
                mailboxTimeoutInMs: 2 * 60 * 1000, // 2 minutes
              };

              console.log("sending first message");
              this.contentTypeHandler.sendMessageFromClient(
                { firstMessage: firstMessage },
                webSocket
              );

              console.log("resolving promise");
              // The HermesConnection constructor installs its own onclose
              // handler, replacing the pre-open one below.
              resolve(new HermesConnection(this, mbox, webSocket));
            };

            // Only reached while no HermesConnection owns the socket, i.e.
            // the socket failed before opening. Without a retry the
            // connection promise would stay pending forever.
            webSocket.onclose = (event) => {
              console.log(
                "hermes client websocket closed before opening, retrying",
                event
              );
              setTimeout(open, this.reconnectDelayInMillis);
            };
          };

          open();
        })
    );

    // Log a failed setup once here so it is never an unhandled rejection;
    // callers chaining on the returned promise still observe the rejection.
    connP.catch((e) => {
      console.error("hermes client could not establish a connection", e);
    });
    return connP;
  }

  /** Promise for the mailbox created for this client. */
  mailbox(): Promise<CreateMailboxResponse> {
    return this.mailboxResponseP;
  }

  /** Runs `fn` with the current connection once it is available. */
  async withConn<T>(fn: (conn: HermesConnection) => T): Promise<T> {
    const conn = await this.currentConn;
    return fn(conn);
  }

  /** Like `withConn`, for callbacks that themselves return a promise. */
  async withConnP<T>(fn: (conn: HermesConnection) => Promise<T>): Promise<T> {
    const conn = await this.currentConn;
    return fn(conn);
  }

  /**
   * Fire-and-forget variant of `withConn` for the void-returning API: any
   * failure (no connection, or `fn` throwing) is logged instead of becoming
   * an unhandled promise rejection.
   */
  private runWithConn(
    label: string,
    fn: (conn: HermesConnection) => void
  ): void {
    this.withConn(fn).catch((e) => {
      console.error(`hermes client ${label} failed`, e);
    });
  }

  /** Sends an RPC request over the current connection; see HermesConnection.rawRpcCall. */
  rawRpcCall(request: RawRpcRequest): Promise<Message> {
    return this.withConnP((conn) => conn.rawRpcCall(request));
  }

  /** Subscribes to change-data-capture events over the current connection. */
  cdcSubscribe<A>(
    cdcs: CdcSubscription,
    listener: (cdcEvent: ChangeDataCaptureEvent, a: A) => void
  ): void {
    this.runWithConn("cdcSubscribe", (conn) => {
      conn.cdcSubscribe(cdcs, listener);
    });
  }

  /** Observes the RPC traffic of the mailbox identified by `readerKey`. */
  rpcObserverSubscribe(
    readerKey: string,
    listener: (correlation: RpcRequestResponse) => void
  ): void {
    console.log("outer rpcObserverSubscribe", readerKey);
    this.runWithConn("rpcObserverSubscribe", (conn) => {
      console.log("inner rpcObserverSubscribe", readerKey);
      conn.rpcObserverSubscribe(readerKey, listener);
    });
  }

  /** Subscribes to a mailbox channel carrying `Message` payloads. */
  channelMessageSubscribe(
    ms: MailboxSubscription,
    listener: (me: MessageEnvelope, msg: Message) => void
  ): void {
    this.runWithConn("channelMessageSubscribe", (conn) => {
      conn.channelMessageSubscribe(ms, listener);
    });
  }

  /** Subscribes to a mailbox channel carrying `SendReceipt` payloads. */
  channelSendReceiptSubscribe(
    ms: MailboxSubscription,
    listener: (me: MessageEnvelope, msg: SendReceipt) => void
  ): void {
    this.runWithConn("channelSendReceiptSubscribe", (conn) => {
      conn.channelSendReceiptSubscribe(ms, listener);
    });
  }

  /** Sends a keep-alive ping over the current connection. */
  sendPing(): void {
    this.runWithConn("sendPing", (conn) => {
      conn.sendPing();
    });
  }
}

/**
 * Listener notified of messages received from the server.
 * NOTE(review): not referenced by any code visible in this file.
 */
interface HermesClientListener {
  onMessageReceived(smr: MessageToClient): void;
}

/**
 * Everything observed so far about one RPC exchange, keyed by correlation id.
 *
 * An observer fills in the "sent" side (send receipt) and the "inbox" side
 * (received message) as they arrive. Which side is the request depends on
 * the role of the observed mailbox: a client sends the request and receives
 * the response, a server does the opposite.
 */
export class RpcRequestResponse {
  correlationId: string;

  sendReceiptEnvelope: MessageEnvelope | undefined;
  sendReceipt: SendReceipt | undefined;
  sentMessage: Message | undefined;

  inboxEnvelope: MessageEnvelope | undefined;
  inboxMessage: Message | undefined;

  constructor(correlationId: string) {
    this.correlationId = correlationId;
  }

  /** "client" or "server", or undefined while the role cannot be determined. */
  role(): string | undefined {
    const ic = this.isClient();
    if (ic === undefined) {
      return undefined;
    }
    return ic ? "client" : "server";
  }

  /** Content type of the request, else of the response, else UnspecifiedCT. */
  contentType(): ContentType {
    return (
      this.requestMessage()?.header?.contentType ??
      this.responseMessage()?.header?.contentType ??
      ContentType.UnspecifiedCT
    );
  }

  isProtobuf(): boolean {
    return this.contentType() === ContentType.Protobuf;
  }

  isJson(): boolean {
    return this.contentType() === ContentType.Json;
  }

  /**
   * True when the observed mailbox sent the request, false when it received
   * the request, undefined when no request frame has been seen yet.
   */
  isClient(): boolean | undefined {
    const sentFrameType = this.sentMessage?.header?.rpcHeader?.frameType;
    if (sentFrameType === RpcFrameType.Request) {
      return true;
    }
    const inboxFrameType = this.inboxMessage?.header?.rpcHeader?.frameType;
    if (inboxFrameType === RpcFrameType.Request) {
      return false;
    }
    return undefined;
  }

  /** True once both the sent side and the inbox side have been observed. */
  hasRequestAndResponse(): boolean {
    return this.sendReceiptEnvelope && this.inboxEnvelope ? true : false;
  }

  /** Server-side creation time of the request as a Date, if known. */
  timeStarted(): Date | undefined {
    const time = this.timeStartedL();
    // Truthiness on purpose: a value of 0 is treated the same as "not set".
    return time ? new Date(time) : undefined;
  }

  /** Server-side creation time of the request as the raw number, if known. */
  timeStartedL(): number | undefined {
    return this.requestEnvelope()?.serverEnvelope?.created;
  }

  /** Server-side creation time of the response as a Date, if known. */
  timeCompleted(): Date | undefined {
    const time = this.responseEnvelope()?.serverEnvelope?.created;
    // Truthiness on purpose: a value of 0 is treated the same as "not set".
    return time ? new Date(time) : undefined;
  }

  /** Completion time minus start time, when both are known. */
  durationInMillis(): number | undefined {
    const ts = this.timeStarted()?.getTime();
    const tc = this.timeCompleted()?.getTime();
    if (ts && tc) {
      return tc - ts;
    }
    return undefined;
  }

  /** Endpoint name taken from the request's RPC header. */
  endPoint(): string | undefined {
    return this.requestMessage()?.header?.rpcHeader?.endPoint;
  }

  /** The request message: the sent one for a client, the received one for a server. */
  requestMessage(): Message | undefined {
    const ic = this.isClient();
    if (ic === undefined) {
      return undefined;
    }
    return ic ? this.sentMessage : this.inboxMessage;
  }

  /** Envelope that carried the request (see `requestMessage`). */
  requestEnvelope(): MessageEnvelope | undefined {
    const ic = this.isClient();
    if (ic === undefined) {
      return undefined;
    }
    return ic ? this.sendReceiptEnvelope : this.inboxEnvelope;
  }

  /** The response message: the received one for a client, the sent one for a server. */
  responseMessage(): Message | undefined {
    const ic = this.isClient();
    if (ic === undefined) {
      return undefined;
    }
    return ic ? this.inboxMessage : this.sentMessage;
  }

  /** Envelope that carried the response (see `responseMessage`). */
  responseEnvelope(): MessageEnvelope | undefined {
    const ic = this.isClient();
    if (ic === undefined) {
      return undefined;
    }
    return ic ? this.inboxEnvelope : this.sendReceiptEnvelope;
  }

  /**
   * "success" or "error" according to the response's frame type, "" when
   * there is no response frame type yet, or a diagnostic string for any
   * other frame type.
   */
  status(): string {
    const frameType = this.responseMessage()?.header?.rpcHeader?.frameType;
    if (!frameType) {
      return "";
    }
    switch (frameType) {
      case RpcFrameType.ErrorResponse:
        return "error";
      case RpcFrameType.SuccessResponse:
        return "success";
      default:
        return `Unexpected frame types ${frameType}`;
    }
  }

  /**
   * Turns a request or response payload into a plain object for display.
   *
   * JSON payloads are decoded and parsed locally; protobuf payloads are
   * converted by the server, using the endpoint name as the schema name.
   * A missing payload yields `{}` for both content types, as does a
   * zero-length JSON payload.
   *
   * @param reqOrResp which side of the exchange `data` belongs to
   * @param data raw payload bytes
   * @returns the parsed payload, `{}` when there is none, or
   *   `{ error: "no endpoint" }` for a protobuf payload whose endpoint is unknown
   */
  async processSchema(reqOrResp: "request" | "response", data?: Uint8Array) {
    if (this.isJson()) {
      // JSON.parse("") throws, so an absent or empty body is reported as {}.
      if (data === undefined || data.length === 0) {
        return {};
      }
      return JSON.parse(new TextDecoder().decode(data));
    }

    const endPoint = this.endPoint();
    if (endPoint === undefined) {
      return {
        error: "no endpoint",
      };
    }
    if (data === undefined) {
      return {};
    }
    return protobufToJson(endPoint, reqOrResp, data);
  }

  /** The response payload as a plain object; see `processSchema`. */
  async responseObj() {
    return this.processSchema("response", this.responseMessage()?.data);
  }

  /** The request payload as a plain object; see `processSchema`. */
  async requestObj() {
    return this.processSchema("request", this.requestMessage()?.data);
  }
}

/**
 * Accessor for a shared client pointed at the hard-coded Hermes server,
 * using the JSON wire format.
 * NOTE(review): `memoize` comes from ../Utils; presumably it caches the first
 * result so that every call returns the same client — confirm.
 */
const GlobalClient = {
  get: memoize(() =>
    newHermesClient("https://hermes-go.ahsrcm.com", JsonContentTypeHandler)
  ),
};

export default GlobalClient;

/** Intentionally empty: calling it does nothing. */
export function runHermesClientTest() {}

/**
 * Manual smoke test: creates a JSON client against the hard-coded server
 * and, once its mailbox exists, logs every change-data-capture event for the
 * `nefario.service` table, starting from new events only.
 */
export function runHermesClientTest2() {
  // const hc = newHermesClient("https://hermes-go.ahsrcm.com", ContentType.Protobuf);
  const client = newHermesClient(
    "https://hermes-go.ahsrcm.com",
    JsonContentTypeHandler
  );

  const subscription: CdcSubscription = {
    tables: [{ database: "nefario", table: "service" }],
    startSeq: "new",
  };

  const logEvent = (cdcEvent: ChangeDataCaptureEvent): void => {
    console.log("cdcEvent", cdcEvent);
  };

  // The mailbox value itself is not needed; only subscribe after it resolves.
  client.mailbox().then(() => {
    client.cdcSubscribe(subscription, logEvent);
  });

  // hc.correlatedRpcReader("rrb07167144dc644a0be22a85301afea7e" , (correlation) => {
  //   console.log("correlation", correlation);
  // });
}

/** Parameters of a change-data-capture subscription. */
export interface CdcSubscription {
  // Tables to watch; also used to derive the subscription id.
  tables: CdcTable[];
  // Forwarded unchanged as the subscription's `startSeq`
  // (this file passes "new" — other accepted values are not visible here).
  startSeq?: string;
}

/** Identifies one table to watch for change-data-capture events. */
export interface CdcTable {
  database: string;
  table: string;
}

/**
 * Asks the server to convert protobuf-encoded bytes to JSON.
 *
 * POSTs `bytes` to `/api/proto_to_json` on the global client's root URL,
 * with the schema name and frame type as URL-encoded query parameters.
 * Note that reading the root URL goes through `GlobalClient.get()`.
 *
 * @param schemaName schema to decode with (callers pass the RPC endpoint name)
 * @param frametype whether `bytes` is the request or the response of that schema
 * @param bytes the protobuf-encoded payload
 * @returns the parsed JSON value returned by the server
 * @throws Error when the HTTP response status is not OK
 */
async function protobufToJson(
  schemaName: string,
  frametype: "request" | "response",
  bytes: Uint8Array
): Promise<unknown> {
  const rootUrl = GlobalClient.get().rootUrl;
  // URLSearchParams escapes characters (such as "&", "=", "#" or spaces) that
  // would otherwise corrupt the query string.
  const query = new URLSearchParams({
    schema: schemaName,
    frametype: frametype,
  });

  const response = await fetch(
    `${rootUrl}/api/proto_to_json?${query.toString()}`,
    {
      method: "POST",
      body: bytes,
    }
  );

  if (!response.ok) {
    throw new Error(`proto_to_json failed with status ${response.status}`);
  }

  const jsonStr = await response.text();
  return JSON.parse(jsonStr);
}

/** Public surface of a Hermes client as returned by `newHermesClient`. */
export interface HermesClient {
  /** Base URL of the server this client talks to. */
  readonly rootUrl: string;
  /** Resolves with the mailbox created for this client. */
  mailbox(): Promise<CreateMailboxResponse>;
  /** Sends an RPC request and resolves with the correlated response message. */
  rawRpcCall(request: RawRpcRequest): Promise<Message>;
  /** Calls `listener` for every change-data-capture event on the given tables. */
  cdcSubscribe<A>(
    cdcs: CdcSubscription,
    listener: (cdcEvent: ChangeDataCaptureEvent, a: A) => void
  ): void;
  /** Calls `listener` with the RPC exchanges observed on the mailbox identified by `readerKey`. */
  rpcObserverSubscribe(
    readerKey: string,
    listener: (correlation: RpcRequestResponse) => void
  ): void;
}
