qemu: More fun refactoring
The QMP client has been refactored slightly, mostly just to clean up its edges slightly. QemuVM however has seen a big refactor, especially connecting to QMP. Flattening out this logic is something I should have done a long time ago. This seemingly has finally hammered out the bugs, although time will tell.
This commit is contained in:
@@ -113,9 +113,6 @@ export default class CollabVMServer {
|
||||
|
||||
this.OnDisplayResized(initSize);
|
||||
|
||||
// vm.GetDisplay().on('resize', (size: Size) => this.OnDisplayResized(size));
|
||||
// vm.GetDisplay().on('rect', (rect: Rect) => this.OnDisplayRectangle(rect));
|
||||
|
||||
this.VM = vm;
|
||||
|
||||
// hack but whatever (TODO: less rickity)
|
||||
@@ -123,15 +120,12 @@ export default class CollabVMServer {
|
||||
if (config.vm.type == 'qemu') {
|
||||
(vm as QemuVM).on('statechange', (newState: VMState) => {
|
||||
if(newState == VMState.Started) {
|
||||
//self.logger.Info("started!!");
|
||||
|
||||
// well aware this sucks but whatever
|
||||
self.VM.GetDisplay().on('resize', (size: Size) => self.OnDisplayResized(size));
|
||||
self.VM.GetDisplay().on('rect', (rect: Rect) => self.OnDisplayRectangle(rect));
|
||||
}
|
||||
|
||||
if (newState == VMState.Stopped) {
|
||||
//self.logger.Info('stopped ?');
|
||||
setTimeout(async () => {
|
||||
self.logger.Info('restarting VM');
|
||||
await self.VM.Start();
|
||||
@@ -397,14 +391,14 @@ export default class CollabVMServer {
|
||||
var y = parseInt(msgArr[2]);
|
||||
var mask = parseInt(msgArr[3]);
|
||||
if (x === undefined || y === undefined || mask === undefined) return;
|
||||
this.VM.GetDisplay()!.MouseEvent(x, y, mask);
|
||||
this.VM.GetDisplay()?.MouseEvent(x, y, mask);
|
||||
break;
|
||||
case 'key':
|
||||
if (this.TurnQueue.peek() !== client && client.rank !== Rank.Admin) return;
|
||||
var keysym = parseInt(msgArr[1]);
|
||||
var down = parseInt(msgArr[2]);
|
||||
if (keysym === undefined || (down !== 0 && down !== 1)) return;
|
||||
this.VM.GetDisplay()!.KeyboardEvent(keysym, down === 1 ? true : false);
|
||||
this.VM.GetDisplay()?.KeyboardEvent(keysym, down === 1 ? true : false);
|
||||
break;
|
||||
case 'vote':
|
||||
if (!this.VM.SnapshotsSupported()) return;
|
||||
@@ -503,14 +497,8 @@ export default class CollabVMServer {
|
||||
case '5':
|
||||
// QEMU Monitor
|
||||
if (client.rank !== Rank.Admin) return;
|
||||
/* Surely there could be rudimentary processing to convert some qemu monitor syntax to [XYZ hypervisor] if possible
|
||||
if (!(this.VM instanceof QEMUVM)) {
|
||||
client.sendMsg(cvm.guacEncode("admin", "2", "This is not a QEMU VM and therefore QEMU monitor commands cannot be run."));
|
||||
return;
|
||||
}
|
||||
*/
|
||||
if (msgArr.length !== 4 || msgArr[2] !== this.Config.collabvm.node) return;
|
||||
var output = await this.VM.MonitorCommand(msgArr[3]);
|
||||
let output = await this.VM.MonitorCommand(msgArr[3]);
|
||||
client.sendMsg(cvm.guacEncode('admin', '2', String(output)));
|
||||
break;
|
||||
case '8':
|
||||
|
||||
@@ -17,9 +17,9 @@ export enum VMState {
|
||||
// TODO: Add bits to this to allow usage (optionally)
|
||||
// of VNC/QMP port. This will be needed to fix up Windows support.
|
||||
export type QemuVmDefinition = {
|
||||
id: string,
|
||||
command: string,
|
||||
snapshot: boolean
|
||||
id: string;
|
||||
command: string;
|
||||
snapshot: boolean;
|
||||
};
|
||||
|
||||
/// Temporary path base (for UNIX sockets/etc.)
|
||||
@@ -35,19 +35,18 @@ class SocketWriter implements IQmpClientWriter {
|
||||
client;
|
||||
|
||||
constructor(socket: Socket, client: QmpClient) {
|
||||
this.socket = socket;
|
||||
this.client = client;
|
||||
this.socket = socket;
|
||||
this.client = client;
|
||||
|
||||
this.socket.on('data', (data) => {
|
||||
this.client.feed(data);
|
||||
});
|
||||
this.socket.on('data', (data) => {
|
||||
this.client.feed(data);
|
||||
});
|
||||
}
|
||||
|
||||
writeSome(buffer: Buffer) {
|
||||
this.socket.write(buffer);
|
||||
this.socket.write(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
export class QemuVM extends EventEmitter {
|
||||
private state = VMState.Stopped;
|
||||
@@ -72,13 +71,12 @@ export class QemuVM extends EventEmitter {
|
||||
|
||||
this.display = new QemuDisplay(this.GetVncPath());
|
||||
|
||||
|
||||
let self = this;
|
||||
|
||||
// Handle the STOP event sent when using -no-shutdown
|
||||
this.qmpInstance.on(QmpEvent.Stop, async () => {
|
||||
await self.qmpInstance.execute('system_reset');
|
||||
})
|
||||
});
|
||||
|
||||
this.qmpInstance.on(QmpEvent.Reset, async () => {
|
||||
await self.qmpInstance.execute('cont');
|
||||
@@ -100,7 +98,7 @@ export class QemuVM extends EventEmitter {
|
||||
async Start() {
|
||||
// Don't start while either trying to start or starting.
|
||||
//if (this.state == VMState.Started || this.state == VMState.Starting) return;
|
||||
if(this.qemuProcess) return;
|
||||
if (this.qemuProcess) return;
|
||||
|
||||
let cmd = this.definition.command;
|
||||
|
||||
@@ -116,43 +114,27 @@ export class QemuVM extends EventEmitter {
|
||||
await this.StartQemu(cmd);
|
||||
}
|
||||
|
||||
SnapshotsSupported() : boolean {
|
||||
SnapshotsSupported(): boolean {
|
||||
return this.definition.snapshot;
|
||||
}
|
||||
|
||||
async Reboot() : Promise<void> {
|
||||
async Reboot(): Promise<void> {
|
||||
await this.MonitorCommand('system_reset');
|
||||
}
|
||||
|
||||
async Stop() {
|
||||
// This is called in certain lifecycle places where we can't safely assert state yet
|
||||
//this.AssertState(VMState.Started, 'cannot use QemuVM#Stop on a non-started VM');
|
||||
this.AssertState(VMState.Started, 'cannot use QemuVM#Stop on a non-started VM');
|
||||
|
||||
// Indicate we're stopping, so we don't
|
||||
// erroneously start trying to restart everything
|
||||
// we're going to tear down.
|
||||
// Indicate we're stopping, so we don't erroneously start trying to restart everything we're going to tear down.
|
||||
this.SetState(VMState.Stopping);
|
||||
|
||||
// Kill the QEMU process and QMP/display connections if they are running.
|
||||
await this.DisconnectQmp();
|
||||
await this.DisconnectDisplay();
|
||||
// Stop the QEMU process, which will bring down everything else.
|
||||
await this.StopQemu();
|
||||
|
||||
}
|
||||
|
||||
async Reset() {
|
||||
//this.AssertState(VMState.Started, 'cannot use QemuVM#Reset on a non-started VM');
|
||||
|
||||
// let code know the VM is going to reset
|
||||
this.emit('reset');
|
||||
|
||||
if(this.qemuProcess !== null) {
|
||||
// Do magic.
|
||||
await this.StopQemu();
|
||||
} else {
|
||||
// N.B we always get here when addl. arguments are added
|
||||
await this.StartQemu(this.definition.command);
|
||||
}
|
||||
this.AssertState(VMState.Started, 'cannot use QemuVM#Reset on a non-started VM');
|
||||
await this.StopQemu();
|
||||
}
|
||||
|
||||
async QmpCommand(command: string, args: any | null): Promise<any> {
|
||||
@@ -161,9 +143,11 @@ export class QemuVM extends EventEmitter {
|
||||
|
||||
async MonitorCommand(command: string) {
|
||||
this.AssertState(VMState.Started, 'cannot use QemuVM#MonitorCommand on a non-started VM');
|
||||
return await this.QmpCommand('human-monitor-command', {
|
||||
let result = await this.QmpCommand('human-monitor-command', {
|
||||
'command-line': command
|
||||
});
|
||||
if (result == null) result = '';
|
||||
return result;
|
||||
}
|
||||
|
||||
async ChangeRemovableMedia(deviceName: string, imagePath: string): Promise<void> {
|
||||
@@ -201,7 +185,7 @@ export class QemuVM extends EventEmitter {
|
||||
this.emit('statechange', this.state);
|
||||
|
||||
// reset QMP fail count when the VM is (re)starting or stopped
|
||||
if(this.state == VMState.Stopped || this.state == VMState.Starting) {
|
||||
if (this.state == VMState.Stopped || this.state == VMState.Starting) {
|
||||
this.qmpFailCount = 0;
|
||||
}
|
||||
}
|
||||
@@ -225,43 +209,41 @@ export class QemuVM extends EventEmitter {
|
||||
this.qemuProcess = execaCommand(split);
|
||||
|
||||
this.qemuProcess.stderr?.on('data', (data) => {
|
||||
self.VMLog().Error("QEMU stderr: {0}", data.toString('utf8'));
|
||||
})
|
||||
self.VMLog().Error('QEMU stderr: {0}', data.toString('utf8'));
|
||||
});
|
||||
|
||||
this.qemuProcess.on('spawn', async () => {
|
||||
self.VMLog().Info("QEMU started");
|
||||
await Shared.Sleep(500);
|
||||
self.VMLog().Info('QEMU started');
|
||||
await Shared.Sleep(250);
|
||||
await self.ConnectQmp();
|
||||
});
|
||||
|
||||
this.qemuProcess.on('exit', async (code) => {
|
||||
self.VMLog().Info("QEMU process exited");
|
||||
self.VMLog().Info('QEMU process exited');
|
||||
|
||||
// this should be being done anways but it's very clearly not sometimes so
|
||||
// fuck it, let's just force it here
|
||||
// Disconnect from the display and QMP connections.
|
||||
await self.DisconnectDisplay();
|
||||
await self.DisconnectQmp();
|
||||
|
||||
// Remove the sockets for VNC and QMP.
|
||||
try {
|
||||
await unlink(this.GetVncPath());
|
||||
} catch(_) {}
|
||||
} catch (_) {}
|
||||
|
||||
try {
|
||||
await unlink(this.GetQmpPath());
|
||||
} catch(_) {}
|
||||
|
||||
// ?
|
||||
if (self.qmpConnected) {
|
||||
await self.DisconnectQmp();
|
||||
}
|
||||
|
||||
await self.DisconnectDisplay();
|
||||
} catch (_) {}
|
||||
|
||||
if (self.state != VMState.Stopping) {
|
||||
if (code == 0) {
|
||||
// Wait a bit and restart QEMU.
|
||||
await Shared.Sleep(500);
|
||||
await Shared.Sleep(250);
|
||||
await self.StartQemu(split);
|
||||
} else {
|
||||
self.VMLog().Error('QEMU exited with a non-zero exit code. This usually means an error in the command line. Stopping VM.');
|
||||
await self.Stop();
|
||||
// Note that we've already tore down everything upon entry to this event handler; therefore
|
||||
// we can simply set the state and move on.
|
||||
this.SetState(VMState.Stopped);
|
||||
}
|
||||
} else {
|
||||
// Indicate we have stopped.
|
||||
@@ -280,46 +262,42 @@ export class QemuVM extends EventEmitter {
|
||||
private async ConnectQmp() {
|
||||
let self = this;
|
||||
|
||||
if (!this.qmpConnected) {
|
||||
try {
|
||||
await Shared.Sleep(500);
|
||||
this.qmpSocket = connect(this.GetQmpPath());
|
||||
|
||||
let onQmpClose = async () => {
|
||||
if(self.qmpConnected) {
|
||||
self.qmpConnected = false;
|
||||
self.qmpSocket = null;
|
||||
|
||||
// If we aren't stopping, then we should care QMP disconnected
|
||||
if (self.state != VMState.Stopping) {
|
||||
if (self.qmpFailCount++ < kMaxFailCount) {
|
||||
self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`);
|
||||
await Shared.Sleep(500);
|
||||
await self.ConnectQmp();
|
||||
} else {
|
||||
self.VMLog().Error(`Reached max retries, giving up.`);
|
||||
await self.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
this.qmpSocket.on('close', onQmpClose);
|
||||
|
||||
this.qmpSocket.on('error', (e: Error) => {
|
||||
self.VMLog().Error("QMP Error: {0}", e.message);
|
||||
});
|
||||
|
||||
// Setup the QMP client.
|
||||
let writer = new SocketWriter(this.qmpSocket, this.qmpInstance);
|
||||
this.qmpInstance.reset();
|
||||
this.qmpInstance.setWriter(writer);
|
||||
} catch (err) {
|
||||
// just try again
|
||||
//await Shared.Sleep(500);
|
||||
//await this.ConnectQmp();
|
||||
}
|
||||
if (this.qmpConnected) {
|
||||
this.VMLog().Error('Already connected to QMP!');
|
||||
return;
|
||||
}
|
||||
|
||||
this.qmpSocket = connect(this.GetQmpPath());
|
||||
|
||||
this.qmpSocket.on('close', async () => {
|
||||
self.qmpSocket?.removeAllListeners();
|
||||
|
||||
if (self.qmpConnected) {
|
||||
await self.DisconnectQmp();
|
||||
|
||||
// If we aren't stopping, then we should care QMP disconnected
|
||||
if (self.state != VMState.Stopping) {
|
||||
if (self.qmpFailCount++ < kMaxFailCount) {
|
||||
self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`);
|
||||
await Shared.Sleep(250);
|
||||
await self.ConnectQmp();
|
||||
} else {
|
||||
self.VMLog().Error(`Reached max retries, giving up.`);
|
||||
await self.Stop();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
this.qmpSocket.on('error', (e: Error) => {
|
||||
self.VMLog().Error('QMP socket error: {0}', e.message);
|
||||
});
|
||||
|
||||
// Setup the QMP client.
|
||||
let writer = new SocketWriter(this.qmpSocket, this.qmpInstance);
|
||||
this.qmpInstance.reset();
|
||||
this.qmpInstance.setWriter(writer);
|
||||
}
|
||||
|
||||
private async DisconnectDisplay() {
|
||||
@@ -332,14 +310,11 @@ export class QemuVM extends EventEmitter {
|
||||
}
|
||||
|
||||
private async DisconnectQmp() {
|
||||
if (this.qmpConnected) return;
|
||||
if (this.qmpSocket == null) return;
|
||||
|
||||
if (!this.qmpConnected) return;
|
||||
this.qmpConnected = false;
|
||||
this.qmpSocket?.end();
|
||||
|
||||
try {
|
||||
await unlink(this.GetQmpPath());
|
||||
} catch (err) {}
|
||||
if (this.qmpSocket == null) return;
|
||||
this.qmpSocket?.end();
|
||||
this.qmpSocket = null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,11 +51,8 @@ class LineStream extends EventEmitter {
|
||||
if (lines.length > 1) {
|
||||
this.buffer = lines.pop()!;
|
||||
lines = lines.filter((l) => !!l);
|
||||
|
||||
//console.log(lines)
|
||||
lines.forEach(l => this.emit('line', l));
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
reset() {
|
||||
@@ -98,16 +95,21 @@ export class QmpClient extends EventEmitter {
|
||||
switch (this.state) {
|
||||
case QmpClientState.Handshaking:
|
||||
if (obj['return'] != undefined) {
|
||||
// Once we get a return from our handshake execution,
|
||||
// we have exited handshake state.
|
||||
this.state = QmpClientState.Connected;
|
||||
this.emit('connected');
|
||||
return;
|
||||
} else if(obj['QMP'] != undefined) {
|
||||
// Send a `qmp_capabilities` command, to exit handshake state.
|
||||
// We do not support any of the supported extended QMP capabilities currently,
|
||||
// and probably never will (due to their relative uselessness.)
|
||||
let capabilities = qmpStringify({
|
||||
execute: 'qmp_capabilities'
|
||||
});
|
||||
|
||||
this.writer?.writeSome(Buffer.from(capabilities, 'utf8'));
|
||||
}
|
||||
|
||||
let capabilities = qmpStringify({
|
||||
execute: 'qmp_capabilities'
|
||||
});
|
||||
|
||||
this.writer?.writeSome(Buffer.from(capabilities, 'utf8'));
|
||||
break;
|
||||
|
||||
case QmpClientState.Connected:
|
||||
@@ -119,7 +121,7 @@ export class QmpClient extends EventEmitter {
|
||||
|
||||
let error: Error | null = obj.error ? new Error(obj.error.desc) : null;
|
||||
|
||||
if (cb.callback) cb.callback(error, obj.return);
|
||||
if (cb.callback) cb.callback(error, obj.return || null);
|
||||
|
||||
this.callbacks.slice(this.callbacks.indexOf(cb));
|
||||
} else if (obj['event']) {
|
||||
@@ -132,7 +134,8 @@ export class QmpClient extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
executeSync(command: string, args: any | undefined, callback: QmpClientCallback | null) {
|
||||
// Executes a QMP command, using a user-provided callback for completion notification
|
||||
executeCallback(command: string, args: any | undefined, callback: QmpClientCallback | null) {
|
||||
let entry = {
|
||||
callback: callback,
|
||||
id: ++this.lastID
|
||||
@@ -149,9 +152,10 @@ export class QmpClient extends EventEmitter {
|
||||
this.writer?.writeSome(Buffer.from(qmpStringify(qmpOut), 'utf8'));
|
||||
}
|
||||
|
||||
// Executes a QMP command asynchronously.
|
||||
async execute(command: string, args: any | undefined = undefined): Promise<any> {
|
||||
return new Promise((res, rej) => {
|
||||
this.executeSync(command, args, (err, result) => {
|
||||
this.executeCallback(command, args, (err, result) => {
|
||||
if (err) rej(err);
|
||||
res(result);
|
||||
});
|
||||
@@ -159,6 +163,7 @@ export class QmpClient extends EventEmitter {
|
||||
}
|
||||
|
||||
reset() {
|
||||
// Reset the line stream so it doesn't go awry
|
||||
this.lineStream.reset();
|
||||
this.state = QmpClientState.Handshaking;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user