qemu: Completely rewrite QMP client from scratch
It sucked. The new one is using Sans I/O principles, so it does not directly do I/O or talk to a net.Socket directly (instead, QemuVM implements the layer to do I/O). This means in the future this library could actually be tested, but for now, I'm not bothering with that. There's also some other cleanups that were bothering me.
This commit is contained in:
@@ -1,130 +1,139 @@
|
||||
// This was originally based off the contents of the node-qemu-qmp package,
|
||||
// but I've modified it possibly to the point where it could be treated as my own creation.
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import split from 'split';
|
||||
enum QmpClientState {
|
||||
Handshaking,
|
||||
Connected
|
||||
}
|
||||
|
||||
import { Socket } from 'net';
|
||||
function qmpStringify(obj: any) {
|
||||
return JSON.stringify(obj) + '\r\n';
|
||||
}
|
||||
|
||||
export type QmpCallback = (err: Error | null, res: any | null) => void;
|
||||
// this writer interface is used to poll back to a higher level
|
||||
// I/O layer that we want to write some data.
|
||||
export interface IQmpClientWriter {
|
||||
writeSome(data: Buffer) : void;
|
||||
}
|
||||
|
||||
type QmpCommandEntry = {
|
||||
callback: QmpCallback | null;
|
||||
id: number;
|
||||
export type QmpClientCallback = (err: Error | null, res: any | null) => void;
|
||||
|
||||
type QmpClientCallbackEntry = {
|
||||
id: number,
|
||||
callback: QmpClientCallback | null
|
||||
};
|
||||
|
||||
// TODO: Instead of the client "Is-A"ing a Socket, this should instead contain/store a Socket,
|
||||
// (preferrably) passed by the user, to use for QMP communications.
|
||||
// The client shouldn't have to know or care about the protocol, and it effectively hackily uses the fact
|
||||
// Socket extends EventEmitter.
|
||||
export enum QmpEvent {
|
||||
BlockIOError = 'BLOCK_IO_ERROR',
|
||||
Reset = 'RESET',
|
||||
Resume = 'RESUME',
|
||||
RtcChange = 'RTC_CHANGE',
|
||||
Shutdown = 'SHUTDOWN',
|
||||
Stop = 'STOP',
|
||||
VncConnected = 'VNC_CONNECTED',
|
||||
VncDisconnected = 'VNC_DISCONNECTED',
|
||||
VncInitalized = 'VNC_INITALIZED',
|
||||
Watchdog = 'WATCHDOG'
|
||||
};
|
||||
|
||||
export default class QmpClient extends Socket {
|
||||
public qmpHandshakeData: any;
|
||||
private commandEntries: QmpCommandEntry[] = [];
|
||||
private lastID = 0;
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
// A QMP client
|
||||
export class QmpClient extends EventEmitter {
|
||||
private state = QmpClientState.Handshaking;
|
||||
private capabilities = "";
|
||||
private writer: IQmpClientWriter | null = null;
|
||||
|
||||
this.assignHandlers();
|
||||
}
|
||||
private lastID = 0;
|
||||
private callbacks = new Array<QmpClientCallbackEntry>();
|
||||
|
||||
private ExecuteSync(command: string, args: any | null, callback: QmpCallback | null) {
|
||||
let cmd: QmpCommandEntry = {
|
||||
callback: callback,
|
||||
id: ++this.lastID
|
||||
};
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
let qmpOut: any = {
|
||||
execute: command,
|
||||
id: cmd.id
|
||||
};
|
||||
setWriter(writer: IQmpClientWriter) {
|
||||
this.writer = writer;
|
||||
}
|
||||
|
||||
if (args) qmpOut['arguments'] = args;
|
||||
feed(data: Buffer) : void {
|
||||
let str = data.toString();
|
||||
|
||||
// Add stuff
|
||||
this.commandEntries.push(cmd);
|
||||
this.write(JSON.stringify(qmpOut));
|
||||
}
|
||||
/* I don't think this is needed but if it is i'm keeping this for now
|
||||
if(!str.endsWith('\r\n')) {
|
||||
console.log("incomplete message!");
|
||||
return;
|
||||
}
|
||||
*/
|
||||
|
||||
// TODO: Make this function a bit more ergonomic?
|
||||
async Execute(command: string, args: any | null = null): Promise<any> {
|
||||
return new Promise((res, rej) => {
|
||||
this.ExecuteSync(command, args, (err, result) => {
|
||||
if (err) rej(err);
|
||||
res(result);
|
||||
});
|
||||
});
|
||||
}
|
||||
let obj = JSON.parse(str);
|
||||
|
||||
private Handshake(callback: () => void) {
|
||||
this.write(
|
||||
JSON.stringify({
|
||||
execute: 'qmp_capabilities'
|
||||
})
|
||||
);
|
||||
switch(this.state) {
|
||||
case QmpClientState.Handshaking:
|
||||
if(obj["return"] != undefined) {
|
||||
this.state = QmpClientState.Connected;
|
||||
this.emit('connected');
|
||||
return;
|
||||
}
|
||||
|
||||
this.once('data', (data) => {
|
||||
// Once QEMU replies to us, the handshake is done.
|
||||
// We do not negotiate anything special.
|
||||
callback();
|
||||
});
|
||||
}
|
||||
let capabilities = qmpStringify({
|
||||
execute: "qmp_capabilities"
|
||||
});
|
||||
|
||||
// this can probably be made async
|
||||
private assignHandlers() {
|
||||
let self = this;
|
||||
this.writer?.writeSome(Buffer.from(capabilities, 'utf8'));
|
||||
break;
|
||||
|
||||
this.on('connect', () => {
|
||||
// this should be more correct?
|
||||
this.once('data', (data) => {
|
||||
// Handshake QMP with the server.
|
||||
self.qmpHandshakeData = JSON.parse(data.toString('utf8')).QMP;
|
||||
self.Handshake(() => {
|
||||
// Now ready to parse QMP responses/events.
|
||||
self.pipe(split(JSON.parse))
|
||||
.on('data', (json: any) => {
|
||||
if (json == null) return self.end();
|
||||
case QmpClientState.Connected:
|
||||
if(obj["return"] != undefined || obj['error'] != undefined) {
|
||||
if(obj['id'] == null)
|
||||
return;
|
||||
|
||||
if (json.return || json.error) {
|
||||
// Our handshake has a spurious return because we never assign it an ID,
|
||||
// and it is gathered by this pipe for some reason I'm not quite sure about.
|
||||
// So, just for safety's sake, don't process any return objects which don't have an ID attached to them.
|
||||
if (json.id == null) return;
|
||||
let cb = this.callbacks.find((v) => v.id == obj['id']);
|
||||
if(cb == undefined)
|
||||
return;
|
||||
|
||||
let callbackEntry = this.commandEntries.find((entry) => entry.id === json.id);
|
||||
let error: Error | null = json.error ? new Error(json.error.desc) : null;
|
||||
let error: Error | null = obj.error ? new Error(obj.error.desc) : null;
|
||||
|
||||
// we somehow didn't find a callback entry for this response.
|
||||
// I don't know how. Techinically not an error..., but I guess you're not getting a reponse to whatever causes this to happen
|
||||
if (callbackEntry == null) return;
|
||||
if(cb.callback)
|
||||
cb.callback(error, obj.return);
|
||||
|
||||
if (callbackEntry?.callback) callbackEntry.callback(error, json.return);
|
||||
this.callbacks.slice(this.callbacks.indexOf(cb));
|
||||
} else if (obj['event']) {
|
||||
this.emit(obj.event, {
|
||||
timestamp: obj.timestamp,
|
||||
data: obj.data
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the completed callback entry.
|
||||
this.commandEntries.slice(this.commandEntries.indexOf(callbackEntry));
|
||||
} else if (json.event) {
|
||||
this.emit('event', json);
|
||||
}
|
||||
})
|
||||
.on('error', () => {
|
||||
// Give up.
|
||||
return self.end();
|
||||
});
|
||||
this.emit('qmp-ready');
|
||||
});
|
||||
});
|
||||
});
|
||||
executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) {
|
||||
let entry = {
|
||||
callback: callback,
|
||||
id: ++this.lastID
|
||||
};
|
||||
|
||||
this.on('close', () => {
|
||||
this.end();
|
||||
});
|
||||
}
|
||||
let qmpOut: any = {
|
||||
execute: command,
|
||||
id: entry.id
|
||||
};
|
||||
|
||||
Connect(host: string, port: number) {
|
||||
super.connect(port, host);
|
||||
}
|
||||
if(args !== undefined)
|
||||
qmpOut['arguments'] = args;
|
||||
|
||||
ConnectUNIX(path: string) {
|
||||
super.connect(path);
|
||||
}
|
||||
this.callbacks.push(entry);
|
||||
this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8'));
|
||||
}
|
||||
|
||||
async execute(command: string, args: any | undefined = undefined) : Promise<any> {
|
||||
return new Promise((res, rej) => {
|
||||
this.executeSync(command, args, (err, result) => {
|
||||
if(err)
|
||||
rej(err);
|
||||
res(result);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
reset() {
|
||||
this.state = QmpClientState.Handshaking;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user