Giant refactoring (or at least the start)

In short:
- cvmts is now bundled/built via parcel and inside of a npm/yarn workspace with multiple nodejs projects
- cvmts now uses the crusttest QEMU management and RFB library (or a fork, if you so prefer).
- cvmts does NOT use node-canvas anymore, instead we opt for the same route crusttest took and just encode jpegs ourselves from the RFB provoded framebuffer via jpeg-turbo. this means funnily enough sharp is back for more for thumbnails, but actually seems to WORK this time
- IPData is now managed in a very similar way to the original cvm 1.2 implementation where a central manager and reference count exist. tbh it wouldn't be that hard to implement multinode either, but for now, I'm not going to take much time on doing that.

this refactor is still incomplete. please do not treat it as generally available while it's not on the default branch. if you want to use it (and report bugs or send fixes) feel free to, but while it may "just work" in certain situations it may be very broken in others.

(yes, I know windows support is partially totaled by this; it's something that can and will be fixed)
This commit is contained in:
modeco80
2024-04-23 09:57:02 -04:00
parent 28dddfc363
commit cb297e15c4
46 changed files with 5661 additions and 1011 deletions

143
qemu/src/QemuDisplay.ts Normal file
View File

@@ -0,0 +1,143 @@
import { VncClient } from '@computernewb/nodejs-rfb';
import { EventEmitter } from 'node:events';
import { BatchRects } from './QemuUtil.js';
import { Size, Rect, Clamp } from '@cvmts/shared';
const kQemuFps = 60;
export type VncRect = {
x: number;
y: number;
width: number;
height: number;
};
// events:
//
// 'resize' -> (w, h) -> done when resize occurs
// 'rect' -> (x, y, ImageData) -> framebuffer
// 'frame' -> () -> done at end of frame
export class QemuDisplay extends EventEmitter {
private displayVnc = new VncClient({
debug: false,
fps: kQemuFps,
encodings: [
VncClient.consts.encodings.raw,
//VncClient.consts.encodings.pseudoQemuAudio,
VncClient.consts.encodings.pseudoDesktopSize
// For now?
//VncClient.consts.encodings.pseudoCursor
]
});
private vncShouldReconnect: boolean = false;
private vncSocketPath: string;
constructor(socketPath: string) {
super();
this.vncSocketPath = socketPath;
this.displayVnc.on('connectTimeout', () => {
this.Reconnect();
});
this.displayVnc.on('authError', () => {
this.Reconnect();
});
this.displayVnc.on('disconnect', () => {
this.Reconnect();
});
this.displayVnc.on('closed', () => {
this.Reconnect();
});
this.displayVnc.on('firstFrameUpdate', () => {
// apparently this library is this good.
// at least it's better than the two others which exist.
this.displayVnc.changeFps(kQemuFps);
this.emit('connected');
this.emit('resize', { width: this.displayVnc.clientWidth, height: this.displayVnc.clientHeight });
//this.emit('rect', { x: 0, y: 0, width: this.displayVnc.clientWidth, height: this.displayVnc.clientHeight });
this.emit('frame');
});
this.displayVnc.on('desktopSizeChanged', (size: Size) => {
this.emit('resize', size);
});
let rects: Rect[] = [];
this.displayVnc.on('rectUpdateProcessed', (rect: Rect) => {
rects.push(rect);
});
this.displayVnc.on('frameUpdated', (fb: Buffer) => {
// use the cvmts batcher
let batched = BatchRects(this.Size(), rects);
this.emit('rect', batched);
// unbatched (watch the performace go now)
//for(let rect of rects)
// this.emit('rect', rect);
rects = [];
this.emit('frame');
});
}
private Reconnect() {
if (this.displayVnc.connected) return;
if (!this.vncShouldReconnect) return;
// TODO: this should also give up after a max tries count
// if we fail after max tries, emit a event
this.displayVnc.connect({
path: this.vncSocketPath
});
}
Connect() {
this.vncShouldReconnect = true;
this.Reconnect();
}
Disconnect() {
this.vncShouldReconnect = false;
this.displayVnc.disconnect();
}
Buffer(): Buffer {
return this.displayVnc.fb;
}
Size(): Size {
if (!this.displayVnc.connected)
return {
width: 0,
height: 0
};
return {
width: this.displayVnc.clientWidth,
height: this.displayVnc.clientHeight
};
}
MouseEvent(x: number, y: number, buttons: number) {
if (this.displayVnc.connected) this.displayVnc.sendPointerEvent(Clamp(x, 0, this.displayVnc.clientWidth), Clamp(y, 0, this.displayVnc.clientHeight), buttons);
}
KeyboardEvent(keysym: number, pressed: boolean) {
if (this.displayVnc.connected) this.displayVnc.sendKeyEvent(keysym, pressed);
}
}

41
qemu/src/QemuUtil.ts Normal file
View File

@@ -0,0 +1,41 @@
import { Size, Rect } from "@cvmts/shared";
export function BatchRects(size: Size, rects: Array<Rect>): Rect {
var mergedX = size.width;
var mergedY = size.height;
var mergedHeight = 0;
var mergedWidth = 0;
// can't batch these
if (rects.length == 0) {
return {
x: 0,
y: 0,
width: size.width,
height: size.height
};
}
if (rects.length == 1) {
if (rects[0].width == size.width && rects[0].height == size.height) {
return rects[0];
}
}
rects.forEach((r) => {
if (r.x < mergedX) mergedX = r.x;
if (r.y < mergedY) mergedY = r.y;
});
rects.forEach((r) => {
if (r.height + r.y - mergedY > mergedHeight) mergedHeight = r.height + r.y - mergedY;
if (r.width + r.x - mergedX > mergedWidth) mergedWidth = r.width + r.x - mergedX;
});
return {
x: mergedX,
y: mergedY,
width: mergedWidth,
height: mergedHeight
};
}

290
qemu/src/QemuVM.ts Normal file
View File

@@ -0,0 +1,290 @@
import { execa, execaCommand, ExecaChildProcess } from 'execa';
import { EventEmitter } from 'events';
import QmpClient from './QmpClient.js';
import { QemuDisplay } from './QemuDisplay.js';
import { unlink } from 'node:fs/promises';
import * as Shared from '@cvmts/shared';
export enum VMState {
Stopped,
Starting,
Started,
Stopping
}
// TODO: Add bits to this to allow usage (optionally)
// of VNC/QMP port. This will be needed to fix up Windows support.
export type QemuVmDefinition = {
id: string;
command: string;
};
/// Temporary path base (for UNIX sockets/etc.)
const kVmTmpPathBase = `/tmp`;
/// The max amount of times QMP connection is allowed to fail before
/// the VM is forcefully stopped.
const kMaxFailCount = 5;
// TODO: This should be added to QemuVmDefinition and the below export removed
let gVMShouldSnapshot = true;
export function setSnapshot(val: boolean) {
gVMShouldSnapshot = val;
}
export class QemuVM extends EventEmitter {
private state = VMState.Stopped;
private qmpInstance: QmpClient | null = null;
private qmpConnected = false;
private qmpFailCount = 0;
private qemuProcess: ExecaChildProcess | null = null;
private qemuRunning = false;
private display: QemuDisplay;
private definition: QemuVmDefinition;
private addedAdditionalArguments = false;
private logger: Shared.Logger;
constructor(def: QemuVmDefinition) {
super();
this.definition = def;
this.logger = new Shared.Logger(`CVMTS.QEMU.QemuVM/${this.definition.id}`);
this.display = new QemuDisplay(this.GetVncPath());
}
async Start() {
// Don't start while either trying to start or starting.
if (this.state == VMState.Started || this.state == VMState.Starting) return;
let cmd = this.definition.command;
// build additional command line statements to enable qmp/vnc over unix sockets
// FIXME: Still use TCP if on Windows.
if(!this.addedAdditionalArguments) {
cmd += ' -no-shutdown';
if(gVMShouldSnapshot)
cmd += ' -snapshot';
cmd += ` -qmp unix:${this.GetQmpPath()},server,wait -vnc unix:${this.GetVncPath()}`;
this.definition.command = cmd;
this.addedAdditionalArguments = true;
}
this.VMLog().Info(`Starting QEMU with command \"${cmd}\"`);
await this.StartQemu(cmd);
}
async Stop() {
// This is called in certain lifecycle places where we can't safely assert state yet
//this.AssertState(VMState.Started, 'cannot use QemuVM#Stop on a non-started VM');
// Start indicating we're stopping, so we don't
// erroneously start trying to restart everything
// we're going to tear down in this function call.
this.SetState(VMState.Stopping);
// Kill the QEMU process and QMP/display connections if they are running.
await this.DisconnectQmp();
this.DisconnectDisplay();
await this.StopQemu();
}
async Reset() {
this.AssertState(VMState.Started, 'cannot use QemuVM#Reset on a non-started VM');
// let code know the VM is going to reset
// N.B: In the crusttest world, a reset simply amounts to a
// mean cold reboot of the qemu process basically
this.emit('reset');
await this.Stop();
await Shared.Sleep(500);
await this.Start();
}
async QmpCommand(command: string, args: any | null): Promise<any> {
return await this.qmpInstance?.Execute(command, args);
}
async MonitorCommand(command: string) {
this.AssertState(VMState.Started, 'cannot use QemuVM#MonitorCommand on a non-started VM');
return await this.QmpCommand('human-monitor-command', {
'command-line': command
});
}
async ChangeRemovableMedia(deviceName: string, imagePath: string): Promise<void> {
this.AssertState(VMState.Started, 'cannot use QemuVM#ChangeRemovableMedia on a non-started VM');
// N.B: if this throws, the code which called this should handle the error accordingly
await this.QmpCommand('blockdev-change-medium', {
device: deviceName, // techinically deprecated, but I don't feel like figuring out QOM path just for a simple function
filename: imagePath
});
}
async EjectRemovableMedia(deviceName: string) {
this.AssertState(VMState.Started, 'cannot use QemuVM#EjectRemovableMedia on a non-started VM');
await this.QmpCommand('eject', {
device: deviceName
});
}
GetDisplay() {
return this.display;
}
/// Private fun bits :)
private VMLog() {
return this.logger;
}
private AssertState(stateShouldBe: VMState, message: string) {
if (this.state !== stateShouldBe) throw new Error(message);
}
private SetState(state: VMState) {
this.state = state;
this.emit('statechange', this.state);
}
private GetQmpPath() {
return `${kVmTmpPathBase}/cvmts-${this.definition.id}-mon`;
}
private GetVncPath() {
return `${kVmTmpPathBase}/cvmts-${this.definition.id}-vnc`;
}
private async StartQemu(split: string) {
let self = this;
this.SetState(VMState.Starting);
// Start QEMU
this.qemuProcess = execaCommand(split);
this.qemuProcess.on('spawn', async () => {
self.qemuRunning = true;
await Shared.Sleep(500);
await self.ConnectQmp();
});
this.qemuProcess.on('exit', async (code) => {
self.qemuRunning = false;
// ?
if (self.qmpConnected) {
await self.DisconnectQmp();
}
self.DisconnectDisplay();
if (self.state != VMState.Stopping) {
if (code == 0) {
await Shared.Sleep(500);
await self.StartQemu(split);
} else {
self.VMLog().Error('QEMU exited with a non-zero exit code. This usually means an error in the command line. Stopping VM.');
await self.Stop();
}
} else {
this.SetState(VMState.Stopped);
}
});
}
private async StopQemu() {
if (this.qemuRunning == true) this.qemuProcess?.kill('SIGTERM');
}
private async ConnectQmp() {
let self = this;
if (!this.qmpConnected) {
self.qmpInstance = new QmpClient();
self.qmpInstance.on('close', async () => {
self.qmpConnected = false;
// If we aren't stopping, then we do actually need to care QMP disconnected
if (self.state != VMState.Stopping) {
if (self.qmpFailCount++ < kMaxFailCount) {
this.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times`);
await Shared.Sleep(500);
await self.ConnectQmp();
} else {
this.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times, giving up`);
await self.Stop();
}
}
});
self.qmpInstance.on('event', async (ev) => {
switch (ev.event) {
// Handle the STOP event sent when using -no-shutdown
case 'STOP':
await self.qmpInstance?.Execute('system_reset');
break;
case 'RESET':
await self.qmpInstance?.Execute('cont');
break;
}
});
self.qmpInstance.on('qmp-ready', async (hadError) => {
self.VMLog().Info('QMP ready');
self.display.Connect();
// QMP has been connected so the VM is ready to be considered started
self.qmpFailCount = 0;
self.qmpConnected = true;
self.SetState(VMState.Started);
});
try {
await Shared.Sleep(500);
this.qmpInstance?.ConnectUNIX(this.GetQmpPath());
} catch (err) {
// just try again
await Shared.Sleep(500);
await this.ConnectQmp();
}
}
}
private async DisconnectDisplay() {
try {
this.display?.Disconnect();
//this.display = null; // disassociate with that display object.
await unlink(this.GetVncPath());
// qemu *should* do this on its own but it really doesn't like doing so sometimes
await unlink(this.GetQmpPath());
} catch (err) {
// oh well lol
}
}
private async DisconnectQmp() {
if (this.qmpConnected) return;
if(this.qmpInstance == null)
return;
this.qmpConnected = false;
this.qmpInstance.end();
this.qmpInstance = null;
try {
await unlink(this.GetQmpPath());
} catch(err) {
}
}
}

135
qemu/src/QmpClient.ts Normal file
View File

@@ -0,0 +1,135 @@
// This was originally based off the contents of the node-qemu-qmp package,
// but I've modified it possibly to the point where it could be treated as my own creation.
import split from 'split';
import { Socket } from 'net';
export type QmpCallback = (err: Error | null, res: any | null) => void;
type QmpCommandEntry = {
callback: QmpCallback | null;
id: number;
};
// TODO: Instead of the client "Is-A"ing a Socket, this should instead contain/store a Socket,
// (preferrably) passed by the user, to use for QMP communications.
// The client shouldn't have to know or care about the protocol, and it effectively hackily uses the fact
// Socket extends EventEmitter.
export default class QmpClient extends Socket {
public qmpHandshakeData: any;
private commandEntries: QmpCommandEntry[] = [];
private lastID = 0;
private ExecuteSync(command: string, args: any | null, callback: QmpCallback | null) {
let cmd: QmpCommandEntry = {
callback: callback,
id: ++this.lastID
};
let qmpOut: any = {
execute: command,
id: cmd.id
};
if (args) qmpOut['arguments'] = args;
// Add stuff
this.commandEntries.push(cmd);
this.write(JSON.stringify(qmpOut));
}
// TODO: Make this function a bit more ergonomic?
async Execute(command: string, args: any | null = null): Promise<any> {
return new Promise((res, rej) => {
this.ExecuteSync(command, args, (err, result) => {
if (err) rej(err);
res(result);
});
});
}
private Handshake(callback: () => void) {
this.write(
JSON.stringify({
execute: 'qmp_capabilities'
})
);
this.once('data', (data) => {
// Once QEMU replies to us, the handshake is done.
// We do not negotiate anything special.
callback();
});
}
// this can probably be made async
private ConnectImpl() {
let self = this;
this.once('connect', () => {
this.removeAllListeners('error');
});
this.once('error', (err) => {
// just rethrow lol
//throw err;
console.log("you have pants: rules,", err);
});
this.once('data', (data) => {
// Handshake QMP with the server.
self.qmpHandshakeData = JSON.parse(data.toString('utf8')).QMP;
self.Handshake(() => {
// Now ready to parse QMP responses/events.
self.pipe(split(JSON.parse))
.on('data', (json: any) => {
if (json == null) return self.end();
if (json.return || json.error) {
// Our handshake has a spurious return because we never assign it an ID,
// and it is gathered by this pipe for some reason I'm not quite sure about.
// So, just for safety's sake, don't process any return objects which don't have an ID attached to them.
if (json.id == null) return;
let callbackEntry = this.commandEntries.find((entry) => entry.id === json.id);
let error: Error | null = json.error ? new Error(json.error.desc) : null;
// we somehow didn't find a callback entry for this response.
// I don't know how. Techinically not an error..., but I guess you're not getting a reponse to whatever causes this to happen
if (callbackEntry == null) return;
if (callbackEntry?.callback) callbackEntry.callback(error, json.return);
// Remove the completed callback entry.
this.commandEntries.slice(this.commandEntries.indexOf(callbackEntry));
} else if (json.event) {
this.emit('event', json);
}
})
.on('error', () => {
// Give up.
return self.end();
});
this.emit('qmp-ready');
});
});
this.once('close', () => {
this.end();
this.removeAllListeners('data'); // wow. good job bud. cool memory leak
});
}
Connect(host: string, port: number) {
super.connect(port, host);
this.ConnectImpl();
}
ConnectUNIX(path: string) {
super.connect(path);
this.ConnectImpl();
}
}

3
qemu/src/index.ts Normal file
View File

@@ -0,0 +1,3 @@
export * from './QemuDisplay.js';
export * from './QemuUtil.js';
export * from './QemuVM.js';