diff --git a/qemu/src/QmpClient.ts b/qemu/src/QmpClient.ts index eaf7c7c..29ff0ac 100644 --- a/qemu/src/QmpClient.ts +++ b/qemu/src/QmpClient.ts @@ -1,139 +1,165 @@ -import { EventEmitter } from "node:events"; +import { EventEmitter } from 'node:events'; enum QmpClientState { - Handshaking, - Connected + Handshaking, + Connected } function qmpStringify(obj: any) { - return JSON.stringify(obj) + '\r\n'; + return JSON.stringify(obj) + '\r\n'; } // this writer interface is used to poll back to a higher level // I/O layer that we want to write some data. export interface IQmpClientWriter { - writeSome(data: Buffer) : void; + writeSome(data: Buffer): void; } export type QmpClientCallback = (err: Error | null, res: any | null) => void; type QmpClientCallbackEntry = { - id: number, - callback: QmpClientCallback | null + id: number; + callback: QmpClientCallback | null; }; export enum QmpEvent { - BlockIOError = 'BLOCK_IO_ERROR', - Reset = 'RESET', - Resume = 'RESUME', - RtcChange = 'RTC_CHANGE', - Shutdown = 'SHUTDOWN', - Stop = 'STOP', - VncConnected = 'VNC_CONNECTED', - VncDisconnected = 'VNC_DISCONNECTED', - VncInitalized = 'VNC_INITALIZED', - Watchdog = 'WATCHDOG' -}; + BlockIOError = 'BLOCK_IO_ERROR', + Reset = 'RESET', + Resume = 'RESUME', + RtcChange = 'RTC_CHANGE', + Shutdown = 'SHUTDOWN', + Stop = 'STOP', + VncConnected = 'VNC_CONNECTED', + VncDisconnected = 'VNC_DISCONNECTED', + VncInitalized = 'VNC_INITALIZED', + Watchdog = 'WATCHDOG' +} +class LineStream extends EventEmitter { + // The given line seperator for the stream + lineSeperator = '\r\n'; + buffer = ''; + + constructor() { + super(); + } + + push(data: Buffer) { + this.buffer += data.toString('utf-8'); + + let lines = this.buffer.split(this.lineSeperator); + if (lines.length > 1) { + this.buffer = lines.pop()!; + lines = lines.filter((l) => !!l); + + //console.log(lines) + lines.forEach(l => this.emit('line', l)); + } + return []; + } + + reset() { + this.buffer = ''; + } +} // A QMP client export class QmpClient extends EventEmitter { - private state = QmpClientState.Handshaking; - private capabilities = ""; - private writer: IQmpClientWriter | null = null; + private state = QmpClientState.Handshaking; + private writer: IQmpClientWriter | null = null; - private lastID = 0; - private callbacks = new Array(); + private lastID = 0; + private callbacks = new Array(); - constructor() { - super(); - } + private lineStream = new LineStream(); - setWriter(writer: IQmpClientWriter) { - this.writer = writer; - } + constructor() { + super(); - feed(data: Buffer) : void { - let str = data.toString(); + let self = this; + this.lineStream.on('line', (line: string) => { + self.handleQmpLine(line); + }); + } - /* I don't think this is needed but if it is i'm keeping this for now - if(!str.endsWith('\r\n')) { - console.log("incomplete message!"); - return; - } - */ + setWriter(writer: IQmpClientWriter) { + this.writer = writer; + } - let obj = JSON.parse(str); + feed(data: Buffer): void { + // Forward to the line stream. It will generate 'line' events + // as it is able to split out lines automatically. + this.lineStream.push(data); + } - switch(this.state) { - case QmpClientState.Handshaking: - if(obj["return"] != undefined) { - this.state = QmpClientState.Connected; - this.emit('connected'); - return; - } + private handleQmpLine(line: string) { + let obj = JSON.parse(line); - let capabilities = qmpStringify({ - execute: "qmp_capabilities" - }); + switch (this.state) { + case QmpClientState.Handshaking: + if (obj['return'] != undefined) { + this.state = QmpClientState.Connected; + this.emit('connected'); + return; + } - this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); - break; + let capabilities = qmpStringify({ + execute: 'qmp_capabilities' + }); - case QmpClientState.Connected: - if(obj["return"] != undefined || obj['error'] != undefined) { - if(obj['id'] == null) - return; + this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); + break; - let cb = this.callbacks.find((v) => v.id == obj['id']); - if(cb == undefined) - return; + case QmpClientState.Connected: + if (obj['return'] != undefined || obj['error'] != undefined) { + if (obj['id'] == null) return; - let error: Error | null = obj.error ? new Error(obj.error.desc) : null; + let cb = this.callbacks.find((v) => v.id == obj['id']); + if (cb == undefined) return; - if(cb.callback) - cb.callback(error, obj.return); + let error: Error | null = obj.error ? new Error(obj.error.desc) : null; - this.callbacks.slice(this.callbacks.indexOf(cb)); - } else if (obj['event']) { - this.emit(obj.event, { - timestamp: obj.timestamp, - data: obj.data - }); - } - break; - } - } + if (cb.callback) cb.callback(error, obj.return); - executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) { - let entry = { - callback: callback, - id: ++this.lastID - }; + this.callbacks.slice(this.callbacks.indexOf(cb)); + } else if (obj['event']) { + this.emit(obj.event, { + timestamp: obj.timestamp, + data: obj.data + }); + } + break; + } + } - let qmpOut: any = { - execute: command, - id: entry.id - }; + executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) { + let entry = { + callback: callback, + id: ++this.lastID + }; - if(args !== undefined) - qmpOut['arguments'] = args; + let qmpOut: any = { + execute: command, + id: entry.id + }; - this.callbacks.push(entry); - this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8')); - } + if (args !== undefined) qmpOut['arguments'] = args; - async execute(command: string, args: any | undefined = undefined) : Promise { - return new Promise((res, rej) => { - this.executeSync(command, args, (err, result) => { - if(err) - rej(err); - res(result); - }); - }); - } + this.callbacks.push(entry); + this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8')); + } - reset() { - this.state = QmpClientState.Handshaking; - } + async execute(command: string, args: any | undefined = undefined): Promise { + return new Promise((res, rej) => { + this.executeSync(command, args, (err, result) => { + if (err) rej(err); + res(result); + }); + }); + } + + reset() { + this.lineStream.reset(); + this.state = QmpClientState.Handshaking; + } }