From 25b32b23b7bbdc0b85c5c7779263fee05f8f2bf3 Mon Sep 17 00:00:00 2001 From: modeco80 Date: Thu, 11 Jul 2024 03:24:22 -0400 Subject: [PATCH] qemu: More fun refactoring The QMP client has been refactored slightly, mostly just to clean up its edges slightly. QemuVM however has seen a big refactor, especially connecting to QMP. Flattening out this logic is something I should have done a long time ago. This seemingly has finally hammered out the bugs, although time will tell. --- cvmts/src/CollabVMServer.ts | 18 +--- qemu/src/QemuVM.ts | 189 ++++++++++++++++-------------------- qemu/src/QmpClient.ts | 29 +++--- 3 files changed, 102 insertions(+), 134 deletions(-) diff --git a/cvmts/src/CollabVMServer.ts b/cvmts/src/CollabVMServer.ts index e72fcd3..d9a2aa2 100644 --- a/cvmts/src/CollabVMServer.ts +++ b/cvmts/src/CollabVMServer.ts @@ -113,9 +113,6 @@ export default class CollabVMServer { this.OnDisplayResized(initSize); -// vm.GetDisplay().on('resize', (size: Size) => this.OnDisplayResized(size)); -// vm.GetDisplay().on('rect', (rect: Rect) => this.OnDisplayRectangle(rect)); - this.VM = vm; // hack but whatever (TODO: less rickity) @@ -123,15 +120,12 @@ export default class CollabVMServer { if (config.vm.type == 'qemu') { (vm as QemuVM).on('statechange', (newState: VMState) => { if(newState == VMState.Started) { - //self.logger.Info("started!!"); - // well aware this sucks but whatever self.VM.GetDisplay().on('resize', (size: Size) => self.OnDisplayResized(size)); self.VM.GetDisplay().on('rect', (rect: Rect) => self.OnDisplayRectangle(rect)); } if (newState == VMState.Stopped) { - //self.logger.Info('stopped ?'); setTimeout(async () => { self.logger.Info('restarting VM'); await self.VM.Start(); @@ -397,14 +391,14 @@ export default class CollabVMServer { var y = parseInt(msgArr[2]); var mask = parseInt(msgArr[3]); if (x === undefined || y === undefined || mask === undefined) return; - this.VM.GetDisplay()!.MouseEvent(x, y, mask); + this.VM.GetDisplay()?.MouseEvent(x, y, mask); break; case 'key': if (this.TurnQueue.peek() !== client && client.rank !== Rank.Admin) return; var keysym = parseInt(msgArr[1]); var down = parseInt(msgArr[2]); if (keysym === undefined || (down !== 0 && down !== 1)) return; - this.VM.GetDisplay()!.KeyboardEvent(keysym, down === 1 ? true : false); + this.VM.GetDisplay()?.KeyboardEvent(keysym, down === 1 ? true : false); break; case 'vote': if (!this.VM.SnapshotsSupported()) return; @@ -503,14 +497,8 @@ export default class CollabVMServer { case '5': // QEMU Monitor if (client.rank !== Rank.Admin) return; - /* Surely there could be rudimentary processing to convert some qemu monitor syntax to [XYZ hypervisor] if possible - if (!(this.VM instanceof QEMUVM)) { - client.sendMsg(cvm.guacEncode("admin", "2", "This is not a QEMU VM and therefore QEMU monitor commands cannot be run.")); - return; - } -*/ if (msgArr.length !== 4 || msgArr[2] !== this.Config.collabvm.node) return; - var output = await this.VM.MonitorCommand(msgArr[3]); + let output = await this.VM.MonitorCommand(msgArr[3]); client.sendMsg(cvm.guacEncode('admin', '2', String(output))); break; case '8': diff --git a/qemu/src/QemuVM.ts b/qemu/src/QemuVM.ts index 9cd0b4e..bcdf79e 100644 --- a/qemu/src/QemuVM.ts +++ b/qemu/src/QemuVM.ts @@ -17,9 +17,9 @@ export enum VMState { // TODO: Add bits to this to allow usage (optionally) // of VNC/QMP port. This will be needed to fix up Windows support. export type QemuVmDefinition = { - id: string, - command: string, - snapshot: boolean + id: string; + command: string; + snapshot: boolean; }; /// Temporary path base (for UNIX sockets/etc.) @@ -33,21 +33,20 @@ const kMaxFailCount = 5; class SocketWriter implements IQmpClientWriter { socket; client; - + constructor(socket: Socket, client: QmpClient) { - this.socket = socket; - this.client = client; - - this.socket.on('data', (data) => { - this.client.feed(data); - }); + this.socket = socket; + this.client = client; + + this.socket.on('data', (data) => { + this.client.feed(data); + }); } - + writeSome(buffer: Buffer) { - this.socket.write(buffer); + this.socket.write(buffer); } - } - +} export class QemuVM extends EventEmitter { private state = VMState.Stopped; @@ -72,13 +71,12 @@ export class QemuVM extends EventEmitter { this.display = new QemuDisplay(this.GetVncPath()); - let self = this; // Handle the STOP event sent when using -no-shutdown this.qmpInstance.on(QmpEvent.Stop, async () => { await self.qmpInstance.execute('system_reset'); - }) + }); this.qmpInstance.on(QmpEvent.Reset, async () => { await self.qmpInstance.execute('cont'); @@ -100,7 +98,7 @@ export class QemuVM extends EventEmitter { async Start() { // Don't start while either trying to start or starting. //if (this.state == VMState.Started || this.state == VMState.Starting) return; - if(this.qemuProcess) return; + if (this.qemuProcess) return; let cmd = this.definition.command; @@ -116,43 +114,27 @@ export class QemuVM extends EventEmitter { await this.StartQemu(cmd); } - SnapshotsSupported() : boolean { + SnapshotsSupported(): boolean { return this.definition.snapshot; } - async Reboot() : Promise { + async Reboot(): Promise { await this.MonitorCommand('system_reset'); } async Stop() { - // This is called in certain lifecycle places where we can't safely assert state yet - //this.AssertState(VMState.Started, 'cannot use QemuVM#Stop on a non-started VM'); - - // Indicate we're stopping, so we don't - // erroneously start trying to restart everything - // we're going to tear down. + this.AssertState(VMState.Started, 'cannot use QemuVM#Stop on a non-started VM'); + + // Indicate we're stopping, so we don't erroneously start trying to restart everything we're going to tear down. this.SetState(VMState.Stopping); - // Kill the QEMU process and QMP/display connections if they are running. - await this.DisconnectQmp(); - await this.DisconnectDisplay(); + // Stop the QEMU process, which will bring down everything else. await this.StopQemu(); - } async Reset() { - //this.AssertState(VMState.Started, 'cannot use QemuVM#Reset on a non-started VM'); - - // let code know the VM is going to reset - this.emit('reset'); - - if(this.qemuProcess !== null) { - // Do magic. - await this.StopQemu(); - } else { - // N.B we always get here when addl. arguments are added - await this.StartQemu(this.definition.command); - } + this.AssertState(VMState.Started, 'cannot use QemuVM#Reset on a non-started VM'); + await this.StopQemu(); } async QmpCommand(command: string, args: any | null): Promise { @@ -161,9 +143,11 @@ export class QemuVM extends EventEmitter { async MonitorCommand(command: string) { this.AssertState(VMState.Started, 'cannot use QemuVM#MonitorCommand on a non-started VM'); - return await this.QmpCommand('human-monitor-command', { + let result = await this.QmpCommand('human-monitor-command', { 'command-line': command }); + if (result == null) result = ''; + return result; } async ChangeRemovableMedia(deviceName: string, imagePath: string): Promise { @@ -201,7 +185,7 @@ export class QemuVM extends EventEmitter { this.emit('statechange', this.state); // reset QMP fail count when the VM is (re)starting or stopped - if(this.state == VMState.Stopped || this.state == VMState.Starting) { + if (this.state == VMState.Stopped || this.state == VMState.Starting) { this.qmpFailCount = 0; } } @@ -225,43 +209,41 @@ export class QemuVM extends EventEmitter { this.qemuProcess = execaCommand(split); this.qemuProcess.stderr?.on('data', (data) => { - self.VMLog().Error("QEMU stderr: {0}", data.toString('utf8')); - }) + self.VMLog().Error('QEMU stderr: {0}', data.toString('utf8')); + }); this.qemuProcess.on('spawn', async () => { - self.VMLog().Info("QEMU started"); - await Shared.Sleep(500); + self.VMLog().Info('QEMU started'); + await Shared.Sleep(250); await self.ConnectQmp(); }); this.qemuProcess.on('exit', async (code) => { - self.VMLog().Info("QEMU process exited"); + self.VMLog().Info('QEMU process exited'); - // this should be being done anways but it's very clearly not sometimes so - // fuck it, let's just force it here + // Disconnect from the display and QMP connections. + await self.DisconnectDisplay(); + await self.DisconnectQmp(); + + // Remove the sockets for VNC and QMP. try { await unlink(this.GetVncPath()); - } catch(_) {} + } catch (_) {} try { await unlink(this.GetQmpPath()); - } catch(_) {} - - // ? - if (self.qmpConnected) { - await self.DisconnectQmp(); - } - - await self.DisconnectDisplay(); + } catch (_) {} if (self.state != VMState.Stopping) { if (code == 0) { // Wait a bit and restart QEMU. - await Shared.Sleep(500); + await Shared.Sleep(250); await self.StartQemu(split); } else { self.VMLog().Error('QEMU exited with a non-zero exit code. This usually means an error in the command line. Stopping VM.'); - await self.Stop(); + // Note that we've already tore down everything upon entry to this event handler; therefore + // we can simply set the state and move on. + this.SetState(VMState.Stopped); } } else { // Indicate we have stopped. @@ -280,46 +262,42 @@ export class QemuVM extends EventEmitter { private async ConnectQmp() { let self = this; - if (!this.qmpConnected) { - try { - await Shared.Sleep(500); - this.qmpSocket = connect(this.GetQmpPath()); - - let onQmpClose = async () => { - if(self.qmpConnected) { - self.qmpConnected = false; - self.qmpSocket = null; - - // If we aren't stopping, then we should care QMP disconnected - if (self.state != VMState.Stopping) { - if (self.qmpFailCount++ < kMaxFailCount) { - self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`); - await Shared.Sleep(500); - await self.ConnectQmp(); - } else { - self.VMLog().Error(`Reached max retries, giving up.`); - await self.Stop(); - } - } - } - }; - - this.qmpSocket.on('close', onQmpClose); - - this.qmpSocket.on('error', (e: Error) => { - self.VMLog().Error("QMP Error: {0}", e.message); - }); - - // Setup the QMP client. - let writer = new SocketWriter(this.qmpSocket, this.qmpInstance); - this.qmpInstance.reset(); - this.qmpInstance.setWriter(writer); - } catch (err) { - // just try again - //await Shared.Sleep(500); - //await this.ConnectQmp(); - } + if (this.qmpConnected) { + this.VMLog().Error('Already connected to QMP!'); + return; } + + this.qmpSocket = connect(this.GetQmpPath()); + + this.qmpSocket.on('close', async () => { + self.qmpSocket?.removeAllListeners(); + + if (self.qmpConnected) { + await self.DisconnectQmp(); + + // If we aren't stopping, then we should care QMP disconnected + if (self.state != VMState.Stopping) { + if (self.qmpFailCount++ < kMaxFailCount) { + self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`); + await Shared.Sleep(250); + await self.ConnectQmp(); + } else { + self.VMLog().Error(`Reached max retries, giving up.`); + await self.Stop(); + return; + } + } + } + }); + + this.qmpSocket.on('error', (e: Error) => { + self.VMLog().Error('QMP socket error: {0}', e.message); + }); + + // Setup the QMP client. + let writer = new SocketWriter(this.qmpSocket, this.qmpInstance); + this.qmpInstance.reset(); + this.qmpInstance.setWriter(writer); } private async DisconnectDisplay() { @@ -332,14 +310,11 @@ export class QemuVM extends EventEmitter { } private async DisconnectQmp() { - if (this.qmpConnected) return; - if (this.qmpSocket == null) return; - + if (!this.qmpConnected) return; this.qmpConnected = false; - this.qmpSocket?.end(); - try { - await unlink(this.GetQmpPath()); - } catch (err) {} + if (this.qmpSocket == null) return; + this.qmpSocket?.end(); + this.qmpSocket = null; } } diff --git a/qemu/src/QmpClient.ts b/qemu/src/QmpClient.ts index 29ff0ac..68d7a5f 100644 --- a/qemu/src/QmpClient.ts +++ b/qemu/src/QmpClient.ts @@ -51,11 +51,8 @@ class LineStream extends EventEmitter { if (lines.length > 1) { this.buffer = lines.pop()!; lines = lines.filter((l) => !!l); - - //console.log(lines) lines.forEach(l => this.emit('line', l)); } - return []; } reset() { @@ -98,16 +95,21 @@ export class QmpClient extends EventEmitter { switch (this.state) { case QmpClientState.Handshaking: if (obj['return'] != undefined) { + // Once we get a return from our handshake execution, + // we have exited handshake state. this.state = QmpClientState.Connected; this.emit('connected'); return; + } else if(obj['QMP'] != undefined) { + // Send a `qmp_capabilities` command, to exit handshake state. + // We do not support any of the supported extended QMP capabilities currently, + // and probably never will (due to their relative uselessness.) + let capabilities = qmpStringify({ + execute: 'qmp_capabilities' + }); + + this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); } - - let capabilities = qmpStringify({ - execute: 'qmp_capabilities' - }); - - this.writer?.writeSome(Buffer.from(capabilities, 'utf8')); break; case QmpClientState.Connected: @@ -119,7 +121,7 @@ export class QmpClient extends EventEmitter { let error: Error | null = obj.error ? new Error(obj.error.desc) : null; - if (cb.callback) cb.callback(error, obj.return); + if (cb.callback) cb.callback(error, obj.return || null); this.callbacks.slice(this.callbacks.indexOf(cb)); } else if (obj['event']) { @@ -132,7 +134,8 @@ export class QmpClient extends EventEmitter { } } - executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) { + // Executes a QMP command, using a user-provided callback for completion notification + executeCallback(command: string, args: any | undefined, callback: QmpClientCallback | null) { let entry = { callback: callback, id: ++this.lastID @@ -149,9 +152,10 @@ export class QmpClient extends EventEmitter { this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8')); } + // Executes a QMP command asynchronously. async execute(command: string, args: any | undefined = undefined): Promise { return new Promise((res, rej) => { - this.executeSync(command, args, (err, result) => { + this.executeCallback(command, args, (err, result) => { if (err) rej(err); res(result); }); @@ -159,6 +163,7 @@ export class QmpClient extends EventEmitter { } reset() { + // Reset the line stream so it doesn't go awry this.lineStream.reset(); this.state = QmpClientState.Handshaking; }