From 227a17111030b8c26bb407de479f765511fdde4f Mon Sep 17 00:00:00 2001 From: modeco80 Date: Wed, 10 Jul 2024 22:20:12 -0400 Subject: [PATCH] qemu: Completely rewrite QMP client from scratch It sucked. The new one is using Sans I/O principles, so it does not directly do I/O or talk to a net.Socket directly (instead, QemuVM implements the layer to do I/O). This means in the future this library could actually be tested, but for now, I'm not bothering with that. There's also some other cleanups that were bothering me. --- cvmts/src/CollabVMServer.ts | 13 ++- package.json | 2 +- qemu/package.json | 4 +- qemu/src/QemuDisplay.ts | 4 + qemu/src/QemuVM.ts | 149 ++++++++++++++----------- qemu/src/QmpClient.ts | 215 +++++++++++++++++++----------------- qemu/src/index.ts | 2 + tsconfig.json | 1 + yarn.lock | 40 +------ 9 files changed, 220 insertions(+), 210 deletions(-) diff --git a/cvmts/src/CollabVMServer.ts b/cvmts/src/CollabVMServer.ts index 0607221..e72fcd3 100644 --- a/cvmts/src/CollabVMServer.ts +++ b/cvmts/src/CollabVMServer.ts @@ -119,21 +119,22 @@ export default class CollabVMServer { this.VM = vm; // hack but whatever (TODO: less rickity) + let self = this; if (config.vm.type == 'qemu') { (vm as QemuVM).on('statechange', (newState: VMState) => { if(newState == VMState.Started) { - this.logger.Info("started!!"); + //self.logger.Info("started!!"); // well aware this sucks but whatever - this.VM.GetDisplay().on('resize', (size: Size) => this.OnDisplayResized(size)); - this.VM.GetDisplay().on('rect', (rect: Rect) => this.OnDisplayRectangle(rect)); + self.VM.GetDisplay().on('resize', (size: Size) => self.OnDisplayResized(size)); + self.VM.GetDisplay().on('rect', (rect: Rect) => self.OnDisplayRectangle(rect)); } if (newState == VMState.Stopped) { - this.logger.Info('stopped ?'); + //self.logger.Info('stopped ?'); setTimeout(async () => { - this.logger.Info('restarting VM'); - await this.VM.Start(); + self.logger.Info('restarting VM'); + await self.VM.Start(); }, kRestartTimeout); } }); diff --git a/package.json b/package.json index 2d2e53e..0cbe009 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ "@parcel/packager-ts": "2.12.0", "@parcel/transformer-sass": "2.12.0", "@parcel/transformer-typescript-types": "2.12.0", - "@types/node": "^20.12.5", + "@types/node": "^20.14.10", "just-install": "^2.0.1", "parcel": "^2.12.0", "prettier": "^3.2.5", diff --git a/qemu/package.json b/qemu/package.json index ce9d2e0..a65b021 100644 --- a/qemu/package.json +++ b/qemu/package.json @@ -21,11 +21,9 @@ "dependencies": { "@computernewb/nodejs-rfb": "*", "@cvmts/shared": "*", - "execa": "^8.0.1", - "split": "^1.0.1" + "execa": "^8.0.1" }, "devDependencies": { - "@types/split": "^1.0.5", "parcel": "^2.12.0" } } diff --git a/qemu/src/QemuDisplay.ts b/qemu/src/QemuDisplay.ts index efcfd47..8bfcbaa 100644 --- a/qemu/src/QemuDisplay.ts +++ b/qemu/src/QemuDisplay.ts @@ -114,6 +114,10 @@ export class QemuDisplay extends EventEmitter { Disconnect() { this.vncShouldReconnect = false; this.displayVnc.disconnect(); + + // bye bye! + this.displayVnc.removeAllListeners(); + this.removeAllListeners(); } Connected() { diff --git a/qemu/src/QemuVM.ts b/qemu/src/QemuVM.ts index 34dffc8..9cd0b4e 100644 --- a/qemu/src/QemuVM.ts +++ b/qemu/src/QemuVM.ts @@ -1,10 +1,11 @@ import { execa, execaCommand, ExecaChildProcess } from 'execa'; import { EventEmitter } from 'events'; -import QmpClient from './QmpClient.js'; +import { QmpClient, IQmpClientWriter, QmpEvent } from './QmpClient.js'; import { QemuDisplay } from './QemuDisplay.js'; import { unlink } from 'node:fs/promises'; import * as Shared from '@cvmts/shared'; +import { Socket, connect } from 'net'; export enum VMState { Stopped, @@ -28,10 +29,31 @@ const kVmTmpPathBase = `/tmp`; /// the VM is forcefully stopped. const kMaxFailCount = 5; +// writer implementation for net.Socket +class SocketWriter implements IQmpClientWriter { + socket; + client; + + constructor(socket: Socket, client: QmpClient) { + this.socket = socket; + this.client = client; + + this.socket.on('data', (data) => { + this.client.feed(data); + }); + } + + writeSome(buffer: Buffer) { + this.socket.write(buffer); + } + } + + export class QemuVM extends EventEmitter { private state = VMState.Stopped; - private qmpInstance: QmpClient | null = null; + private qmpInstance: QmpClient = new QmpClient(); + private qmpSocket: Socket | null = null; private qmpConnected = false; private qmpFailCount = 0; @@ -49,6 +71,30 @@ export class QemuVM extends EventEmitter { this.logger = new Shared.Logger(`CVMTS.QEMU.QemuVM/${this.definition.id}`); this.display = new QemuDisplay(this.GetVncPath()); + + + let self = this; + + // Handle the STOP event sent when using -no-shutdown + this.qmpInstance.on(QmpEvent.Stop, async () => { + await self.qmpInstance.execute('system_reset'); + }) + + this.qmpInstance.on(QmpEvent.Reset, async () => { + await self.qmpInstance.execute('cont'); + }); + + this.qmpInstance.on('connected', async () => { + self.VMLog().Info('QMP ready'); + + this.display = new QemuDisplay(this.GetVncPath()); + self.display?.Connect(); + + // QMP has been connected so the VM is ready to be considered started + self.qmpFailCount = 0; + self.qmpConnected = true; + self.SetState(VMState.Started); + }); } async Start() { @@ -110,7 +156,7 @@ export class QemuVM extends EventEmitter { } async QmpCommand(command: string, args: any | null): Promise { - return await this.qmpInstance?.Execute(command, args); + return await this.qmpInstance?.execute(command, args); } async MonitorCommand(command: string) { @@ -191,7 +237,6 @@ export class QemuVM extends EventEmitter { this.qemuProcess.on('exit', async (code) => { self.VMLog().Info("QEMU process exited"); - // this should be being done anways but it's very clearly not sometimes so // fuck it, let's just force it here try { @@ -209,7 +254,6 @@ export class QemuVM extends EventEmitter { await self.DisconnectDisplay(); - if (self.state != VMState.Stopping) { if (code == 0) { // Wait a bit and restart QEMU. @@ -237,62 +281,43 @@ export class QemuVM extends EventEmitter { let self = this; if (!this.qmpConnected) { - self.qmpInstance = new QmpClient(); - - let onQmpError = async () => { - if(self.qmpConnected) { - self.qmpConnected = false; - - // If we aren't stopping, then we should care QMP disconnected - if (self.state != VMState.Stopping) { - if (self.qmpFailCount++ < kMaxFailCount) { - self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`); - await Shared.Sleep(500); - await self.ConnectQmp(); - } else { - self.VMLog().Error(`Reached max retries, giving up.`); - await self.Stop(); - } - } - } - }; - - self.qmpInstance.on('close', onQmpError); - self.qmpInstance.on('error', (e: Error) => { - self.VMLog().Error("QMP Error: {0}", e.message); - onQmpError(); - }); - - self.qmpInstance.on('event', async (ev) => { - switch (ev.event) { - // Handle the STOP event sent when using -no-shutdown - case 'STOP': - await self.qmpInstance?.Execute('system_reset'); - break; - case 'RESET': - await self.qmpInstance?.Execute('cont'); - break; - } - }); - - self.qmpInstance.on('qmp-ready', async (hadError) => { - self.VMLog().Info('QMP ready'); - - self.display?.Connect(); - - // QMP has been connected so the VM is ready to be considered started - self.qmpFailCount = 0; - self.qmpConnected = true; - self.SetState(VMState.Started); - }); - try { await Shared.Sleep(500); - this.qmpInstance?.ConnectUNIX(this.GetQmpPath()); + this.qmpSocket = connect(this.GetQmpPath()); + + let onQmpClose = async () => { + if(self.qmpConnected) { + self.qmpConnected = false; + self.qmpSocket = null; + + // If we aren't stopping, then we should care QMP disconnected + if (self.state != VMState.Stopping) { + if (self.qmpFailCount++ < kMaxFailCount) { + self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`); + await Shared.Sleep(500); + await self.ConnectQmp(); + } else { + self.VMLog().Error(`Reached max retries, giving up.`); + await self.Stop(); + } + } + } + }; + + this.qmpSocket.on('close', onQmpClose); + + this.qmpSocket.on('error', (e: Error) => { + self.VMLog().Error("QMP Error: {0}", e.message); + }); + + // Setup the QMP client. + let writer = new SocketWriter(this.qmpSocket, this.qmpInstance); + this.qmpInstance.reset(); + this.qmpInstance.setWriter(writer); } catch (err) { // just try again - await Shared.Sleep(500); - await this.ConnectQmp(); + //await Shared.Sleep(500); + //await this.ConnectQmp(); } } } @@ -300,9 +325,7 @@ export class QemuVM extends EventEmitter { private async DisconnectDisplay() { try { this.display?.Disconnect(); - - // create a new display (and gc the old one) - this.display = new QemuDisplay(this.GetVncPath()); + this.display = null; } catch (err) { // oh well lol } @@ -310,11 +333,11 @@ export class QemuVM extends EventEmitter { private async DisconnectQmp() { if (this.qmpConnected) return; - if (this.qmpInstance == null) return; + if (this.qmpSocket == null) return; this.qmpConnected = false; - this.qmpInstance.end(); - this.qmpInstance = null; + this.qmpSocket?.end(); + try { await unlink(this.GetQmpPath()); } catch (err) {} diff --git a/qemu/src/QmpClient.ts b/qemu/src/QmpClient.ts index fe93d68..eaf7c7c 100644 --- a/qemu/src/QmpClient.ts +++ b/qemu/src/QmpClient.ts @@ -1,130 +1,139 @@ -// This was originally based off the contents of the node-qemu-qmp package, -// but I've modified it possibly to the point where it could be treated as my own creation. +import { EventEmitter } from "node:events"; -import split from 'split'; +enum QmpClientState { + Handshaking, + Connected +} -import { Socket } from 'net'; +function qmpStringify(obj: any) { + return JSON.stringify(obj) + '\r\n'; +} -export type QmpCallback = (err: Error | null, res: any | null) => void; +// this writer interface is used to poll back to a higher level +// I/O layer that we want to write some data. +export interface IQmpClientWriter { + writeSome(data: Buffer) : void; +} -type QmpCommandEntry = { - callback: QmpCallback | null; - id: number; +export type QmpClientCallback = (err: Error | null, res: any | null) => void; + +type QmpClientCallbackEntry = { + id: number, + callback: QmpClientCallback | null }; -// TODO: Instead of the client "Is-A"ing a Socket, this should instead contain/store a Socket, -// (preferrably) passed by the user, to use for QMP communications. -// The client shouldn't have to know or care about the protocol, and it effectively hackily uses the fact -// Socket extends EventEmitter. +export enum QmpEvent { + BlockIOError = 'BLOCK_IO_ERROR', + Reset = 'RESET', + Resume = 'RESUME', + RtcChange = 'RTC_CHANGE', + Shutdown = 'SHUTDOWN', + Stop = 'STOP', + VncConnected = 'VNC_CONNECTED', + VncDisconnected = 'VNC_DISCONNECTED', + VncInitalized = 'VNC_INITALIZED', + Watchdog = 'WATCHDOG' +}; -export default class QmpClient extends Socket { - public qmpHandshakeData: any; - private commandEntries: QmpCommandEntry[] = []; - private lastID = 0; - constructor() { - super(); +// A QMP client +export class QmpClient extends EventEmitter { + private state = QmpClientState.Handshaking; + private capabilities = ""; + private writer: IQmpClientWriter | null = null; - this.assignHandlers(); - } + private lastID = 0; + private callbacks = new Array(); - private ExecuteSync(command: string, args: any | null, callback: QmpCallback | null) { - let cmd: QmpCommandEntry = { - callback: callback, - id: ++this.lastID - }; + constructor() { + super(); + } - let qmpOut: any = { - execute: command, - id: cmd.id - }; + setWriter(writer: IQmpClientWriter) { + this.writer = writer; + } - if (args) qmpOut['arguments'] = args; + feed(data: Buffer) : void { + let str = data.toString(); - // Add stuff - this.commandEntries.push(cmd); - this.write(JSON.stringify(qmpOut)); - } + /* I don't think this is needed but if it is i'm keeping this for now + if(!str.endsWith('\r\n')) { + console.log("incomplete message!"); + return; + } + */ - // TODO: Make this function a bit more ergonomic? - async Execute(command: string, args: any | null = null): Promise { - return new Promise((res, rej) => { - this.ExecuteSync(command, args, (err, result) => { - if (err) rej(err); - res(result); - }); - }); - } + let obj = JSON.parse(str); - private Handshake(callback: () => void) { - this.write( - JSON.stringify({ - execute: 'qmp_capabilities' - }) - ); + switch(this.state) { + case QmpClientState.Handshaking: + if(obj["return"] != undefined) { + this.state = QmpClientState.Connected; + this.emit('connected'); + return; + } - this.once('data', (data) => { - // Once QEMU replies to us, the handshake is done. - // We do not negotiate anything special. - callback(); - }); - } + let capabilities = qmpStringify({ + execute: "qmp_capabilities" + }); - // this can probably be made async - private assignHandlers() { - let self = this; + this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); + break; - this.on('connect', () => { - // this should be more correct? - this.once('data', (data) => { - // Handshake QMP with the server. - self.qmpHandshakeData = JSON.parse(data.toString('utf8')).QMP; - self.Handshake(() => { - // Now ready to parse QMP responses/events. - self.pipe(split(JSON.parse)) - .on('data', (json: any) => { - if (json == null) return self.end(); + case QmpClientState.Connected: + if(obj["return"] != undefined || obj['error'] != undefined) { + if(obj['id'] == null) + return; - if (json.return || json.error) { - // Our handshake has a spurious return because we never assign it an ID, - // and it is gathered by this pipe for some reason I'm not quite sure about. - // So, just for safety's sake, don't process any return objects which don't have an ID attached to them. - if (json.id == null) return; + let cb = this.callbacks.find((v) => v.id == obj['id']); + if(cb == undefined) + return; - let callbackEntry = this.commandEntries.find((entry) => entry.id === json.id); - let error: Error | null = json.error ? new Error(json.error.desc) : null; + let error: Error | null = obj.error ? new Error(obj.error.desc) : null; - // we somehow didn't find a callback entry for this response. - // I don't know how. Techinically not an error..., but I guess you're not getting a reponse to whatever causes this to happen - if (callbackEntry == null) return; + if(cb.callback) + cb.callback(error, obj.return); - if (callbackEntry?.callback) callbackEntry.callback(error, json.return); + this.callbacks.slice(this.callbacks.indexOf(cb)); + } else if (obj['event']) { + this.emit(obj.event, { + timestamp: obj.timestamp, + data: obj.data + }); + } + break; + } + } - // Remove the completed callback entry. - this.commandEntries.slice(this.commandEntries.indexOf(callbackEntry)); - } else if (json.event) { - this.emit('event', json); - } - }) - .on('error', () => { - // Give up. - return self.end(); - }); - this.emit('qmp-ready'); - }); - }); - }); + executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) { + let entry = { + callback: callback, + id: ++this.lastID + }; - this.on('close', () => { - this.end(); - }); - } + let qmpOut: any = { + execute: command, + id: entry.id + }; - Connect(host: string, port: number) { - super.connect(port, host); - } + if(args !== undefined) + qmpOut['arguments'] = args; - ConnectUNIX(path: string) { - super.connect(path); - } + this.callbacks.push(entry); + this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8')); + } + + async execute(command: string, args: any | undefined = undefined) : Promise { + return new Promise((res, rej) => { + this.executeSync(command, args, (err, result) => { + if(err) + rej(err); + res(result); + }); + }); + } + + reset() { + this.state = QmpClientState.Handshaking; + } } diff --git a/qemu/src/index.ts b/qemu/src/index.ts index 274f400..a3deb21 100644 --- a/qemu/src/index.ts +++ b/qemu/src/index.ts @@ -1,3 +1,5 @@ +/// + export * from './QemuDisplay.js'; export * from './QemuUtil.js'; export * from './QemuVM.js'; diff --git a/tsconfig.json b/tsconfig.json index 1cad5cb..1579da8 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,6 +4,7 @@ "target": "ES2022", "module": "ES2022", "moduleResolution": "Node", + "types": ["node"], "allowSyntheticDefaultImports": true, "strict": true, } diff --git a/yarn.lock b/yarn.lock index 63d43bd..2dc7fb3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -92,10 +92,8 @@ __metadata: dependencies: "@computernewb/nodejs-rfb": "npm:*" "@cvmts/shared": "npm:*" - "@types/split": "npm:^1.0.5" execa: "npm:^8.0.1" parcel: "npm:^2.12.0" - split: "npm:^1.0.1" languageName: unknown linkType: soft @@ -1545,22 +1543,12 @@ __metadata: languageName: node linkType: hard -"@types/split@npm:^1.0.5": - version: 1.0.5 - resolution: "@types/split@npm:1.0.5" +"@types/node@npm:^20.14.10": + version: 20.14.10 + resolution: "@types/node@npm:20.14.10" dependencies: - "@types/node": "npm:*" - "@types/through": "npm:*" - checksum: 10c0/eb187a3b07e5064928e49bffd5c45ad1f1109135fee52344bb7623cdb55e2ebb16bd6ca009a30a0a6e2b262f7ebb7bf18030ff873819e80fafd4cbb51dba1a74 - languageName: node - linkType: hard - -"@types/through@npm:*": - version: 0.0.33 - resolution: "@types/through@npm:0.0.33" - dependencies: - "@types/node": "npm:*" - checksum: 10c0/6a8edd7f40cd7e197318e86310a40e568cddd380609dde59b30d5cc6c5f8276ddc698905eac4b3b429eb39f2e8ee326bc20dc6e95a2cdc41c4d3fc9a1ebd4929 + undici-types: "npm:~5.26.4" + checksum: 10c0/0b06cff14365c2d0085dc16cc8cbea5c40ec09cfc1fea966be9eeecf35562760bfde8f88e86de6edfaf394501236e229d9c1084fad04fb4dec472ae245d8ae69 languageName: node linkType: hard @@ -1998,7 +1986,7 @@ __metadata: "@parcel/packager-ts": "npm:2.12.0" "@parcel/transformer-sass": "npm:2.12.0" "@parcel/transformer-typescript-types": "npm:2.12.0" - "@types/node": "npm:^20.12.5" + "@types/node": "npm:^20.14.10" just-install: "npm:^2.0.1" parcel: "npm:^2.12.0" prettier: "npm:^3.2.5" @@ -3713,15 +3701,6 @@ __metadata: languageName: node linkType: hard -"split@npm:^1.0.1": - version: 1.0.1 - resolution: "split@npm:1.0.1" - dependencies: - through: "npm:2" - checksum: 10c0/7f489e7ed5ff8a2e43295f30a5197ffcb2d6202c9cf99357f9690d645b19c812bccf0be3ff336fea5054cda17ac96b91d67147d95dbfc31fbb5804c61962af85 - languageName: node - linkType: hard - "sprintf-js@npm:^1.1.3": version: 1.1.3 resolution: "sprintf-js@npm:1.1.3" @@ -3855,13 +3834,6 @@ __metadata: languageName: node linkType: hard -"through@npm:2": - version: 2.3.8 - resolution: "through@npm:2.3.8" - checksum: 10c0/4b09f3774099de0d4df26d95c5821a62faee32c7e96fb1f4ebd54a2d7c11c57fe88b0a0d49cf375de5fee5ae6bf4eb56dbbf29d07366864e2ee805349970d3cc - languageName: node - linkType: hard - "timsort@npm:^0.3.0": version: 0.3.0 resolution: "timsort@npm:0.3.0"