QMP client now buffers lines properly

This commit is contained in:
modeco80
2024-07-11 02:29:26 -04:00
parent 227a171110
commit 0b59c6d8be

View File

@@ -1,139 +1,165 @@
import { EventEmitter } from "node:events"; import { EventEmitter } from 'node:events';
enum QmpClientState { enum QmpClientState {
Handshaking, Handshaking,
Connected Connected
} }
function qmpStringify(obj: any) { function qmpStringify(obj: any) {
return JSON.stringify(obj) + '\r\n'; return JSON.stringify(obj) + '\r\n';
} }
// this writer interface is used to poll back to a higher level // this writer interface is used to poll back to a higher level
// I/O layer that we want to write some data. // I/O layer that we want to write some data.
export interface IQmpClientWriter { export interface IQmpClientWriter {
writeSome(data: Buffer) : void; writeSome(data: Buffer): void;
} }
export type QmpClientCallback = (err: Error | null, res: any | null) => void; export type QmpClientCallback = (err: Error | null, res: any | null) => void;
type QmpClientCallbackEntry = { type QmpClientCallbackEntry = {
id: number, id: number;
callback: QmpClientCallback | null callback: QmpClientCallback | null;
}; };
export enum QmpEvent { export enum QmpEvent {
BlockIOError = 'BLOCK_IO_ERROR', BlockIOError = 'BLOCK_IO_ERROR',
Reset = 'RESET', Reset = 'RESET',
Resume = 'RESUME', Resume = 'RESUME',
RtcChange = 'RTC_CHANGE', RtcChange = 'RTC_CHANGE',
Shutdown = 'SHUTDOWN', Shutdown = 'SHUTDOWN',
Stop = 'STOP', Stop = 'STOP',
VncConnected = 'VNC_CONNECTED', VncConnected = 'VNC_CONNECTED',
VncDisconnected = 'VNC_DISCONNECTED', VncDisconnected = 'VNC_DISCONNECTED',
VncInitalized = 'VNC_INITALIZED', VncInitalized = 'VNC_INITALIZED',
Watchdog = 'WATCHDOG' Watchdog = 'WATCHDOG'
}; }
class LineStream extends EventEmitter {
// The given line seperator for the stream
lineSeperator = '\r\n';
buffer = '';
constructor() {
super();
}
push(data: Buffer) {
this.buffer += data.toString('utf-8');
let lines = this.buffer.split(this.lineSeperator);
if (lines.length > 1) {
this.buffer = lines.pop()!;
lines = lines.filter((l) => !!l);
//console.log(lines)
lines.forEach(l => this.emit('line', l));
}
return [];
}
reset() {
this.buffer = '';
}
}
// A QMP client // A QMP client
export class QmpClient extends EventEmitter { export class QmpClient extends EventEmitter {
private state = QmpClientState.Handshaking; private state = QmpClientState.Handshaking;
private capabilities = ""; private writer: IQmpClientWriter | null = null;
private writer: IQmpClientWriter | null = null;
private lastID = 0; private lastID = 0;
private callbacks = new Array<QmpClientCallbackEntry>(); private callbacks = new Array<QmpClientCallbackEntry>();
constructor() { private lineStream = new LineStream();
super();
}
setWriter(writer: IQmpClientWriter) { constructor() {
this.writer = writer; super();
}
feed(data: Buffer) : void { let self = this;
let str = data.toString(); this.lineStream.on('line', (line: string) => {
self.handleQmpLine(line);
});
}
/* I don't think this is needed but if it is i'm keeping this for now setWriter(writer: IQmpClientWriter) {
if(!str.endsWith('\r\n')) { this.writer = writer;
console.log("incomplete message!"); }
return;
}
*/
let obj = JSON.parse(str); feed(data: Buffer): void {
// Forward to the line stream. It will generate 'line' events
// as it is able to split out lines automatically.
this.lineStream.push(data);
}
switch(this.state) { private handleQmpLine(line: string) {
case QmpClientState.Handshaking: let obj = JSON.parse(line);
if(obj["return"] != undefined) {
this.state = QmpClientState.Connected;
this.emit('connected');
return;
}
let capabilities = qmpStringify({ switch (this.state) {
execute: "qmp_capabilities" case QmpClientState.Handshaking:
}); if (obj['return'] != undefined) {
this.state = QmpClientState.Connected;
this.emit('connected');
return;
}
this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); let capabilities = qmpStringify({
break; execute: 'qmp_capabilities'
});
case QmpClientState.Connected: this.writer?.writeSome(Buffer.from(capabilities, 'utf8'));
if(obj["return"] != undefined || obj['error'] != undefined) { break;
if(obj['id'] == null)
return;
let cb = this.callbacks.find((v) => v.id == obj['id']); case QmpClientState.Connected:
if(cb == undefined) if (obj['return'] != undefined || obj['error'] != undefined) {
return; if (obj['id'] == null) return;
let error: Error | null = obj.error ? new Error(obj.error.desc) : null; let cb = this.callbacks.find((v) => v.id == obj['id']);
if (cb == undefined) return;
if(cb.callback) let error: Error | null = obj.error ? new Error(obj.error.desc) : null;
cb.callback(error, obj.return);
this.callbacks.slice(this.callbacks.indexOf(cb)); if (cb.callback) cb.callback(error, obj.return);
} else if (obj['event']) {
this.emit(obj.event, {
timestamp: obj.timestamp,
data: obj.data
});
}
break;
}
}
executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) { this.callbacks.slice(this.callbacks.indexOf(cb));
let entry = { } else if (obj['event']) {
callback: callback, this.emit(obj.event, {
id: ++this.lastID timestamp: obj.timestamp,
}; data: obj.data
});
}
break;
}
}
let qmpOut: any = { executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) {
execute: command, let entry = {
id: entry.id callback: callback,
}; id: ++this.lastID
};
if(args !== undefined) let qmpOut: any = {
qmpOut['arguments'] = args; execute: command,
id: entry.id
};
this.callbacks.push(entry); if (args !== undefined) qmpOut['arguments'] = args;
this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8'));
}
async execute(command: string, args: any | undefined = undefined) : Promise<any> { this.callbacks.push(entry);
return new Promise((res, rej) => { this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8'));
this.executeSync(command, args, (err, result) => { }
if(err)
rej(err);
res(result);
});
});
}
reset() { async execute(command: string, args: any | undefined = undefined): Promise<any> {
this.state = QmpClientState.Handshaking; return new Promise((res, rej) => {
} this.executeSync(command, args, (err, result) => {
if (err) rej(err);
res(result);
});
});
}
reset() {
this.lineStream.reset();
this.state = QmpClientState.Handshaking;
}
} }