Merge branch 'master' into dev/proto_capability_rework

This commit is contained in:
Elijah R
2025-03-21 19:03:55 -04:00
14 changed files with 448 additions and 25 deletions

View File

@@ -13,7 +13,7 @@
"license": "GPL-3.0",
"dependencies": {
"@computernewb/nodejs-rfb": "^0.3.0",
"@computernewb/superqemu": "0.2.4",
"@computernewb/superqemu": "^0.3.0",
"@cvmts/cvm-rs": "*",
"@maxmind/geoip2-node": "^5.0.0",
"execa": "^8.0.1",

62
cvmts/src/AuditLog.ts Normal file
View File

@@ -0,0 +1,62 @@
import pino from 'pino';
import { Rank, User } from './User.js';
// Staff audit log.
// TODO:
// - Hook this up to a db or something instead of misusing pino
export class AuditLog {
private auditLogger = pino({
name: 'AuditLog',
transport: {
target: 'pino/file',
options: {
destination: './audit.log'
}
}
});
private static StaffHonorFromRank(user: User, uppercase: boolean) {
switch (user.rank) {
case Rank.Moderator:
if (uppercase) return 'Moderator';
else return 'moderator';
case Rank.Admin:
if (uppercase) return 'Administrator';
else return 'administrator';
default:
throw new Error("input user is not staff.. how'd you even get here?");
}
}
onReset(callingUser: User) {
this.auditLogger.info({ staffUsername: callingUser.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} reset the virtual machine.`);
}
onReboot(callingUser: User) {
this.auditLogger.info({ staffUsername: callingUser.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} rebooted the virtual machine.`);
}
onMute(callingUser: User, target: User, perm: boolean) {
this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username, perm: perm }, `${AuditLog.StaffHonorFromRank(callingUser, true)} muted user.`);
}
onUnmute(callingUser: User, target: User) {
this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} unmuted user.`);
}
onKick(callingUser: User, target: User) {
this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} kicked user.`);
}
onBan(callingUser: User, target: User) {
this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} banned user.`);
}
onMonitorCommand(callingUser: User, command: string) {
this.auditLogger.info({ staffUsername: callingUser.username, commandLine: command }, `${AuditLog.StaffHonorFromRank(callingUser, true)} executed monitor command.`);
}
}
export let TheAuditLog = new AuditLog();

View File

@@ -128,17 +128,17 @@ export default class CollabVMServer implements IProtocolMessageHandler {
if (newState == VMState.Started) {
self.logger.info('VM started');
// start the display
// start the display and add the events once
if (self.VM.GetDisplay() == null) {
self.VM.StartDisplay();
}
self.VM.GetDisplay()?.on('connected', () => {
// well aware this sucks but whatever
self.logger.info('started display, adding events now');
// add events
self.VM.GetDisplay()?.on('resize', (size: Size) => self.OnDisplayResized(size));
self.VM.GetDisplay()?.on('rect', (rect: Rect) => self.OnDisplayRectangle(rect));
self.VM.GetDisplay()?.on('frame', () => self.OnDisplayFrame());
});
}
}
if (newState == VMState.Stopped) {
@@ -900,7 +900,9 @@ export default class CollabVMServer implements IProtocolMessageHandler {
for (let rect of self.rectQueue) promises.push(doRect(rect));
this.rectQueue = [];
// javascript is a very solidly designed language with no holes
// or usability traps inside of it whatsoever
this.rectQueue.length = 0;
await Promise.all(promises);
}

View File

@@ -1,3 +1,4 @@
import { CgroupLimits } from './vm/qemu_launcher';
import VNCVMDef from './vm/vnc/VNCVMDef';
export default interface IConfig {
@@ -38,6 +39,7 @@ export default interface IConfig {
qemuArgs: string;
vncPort: number;
snapshots: boolean;
resourceLimits?: CgroupLimits
};
vncvm: VNCVMDef;
mysql: MySQLConfig;

View File

@@ -36,7 +36,7 @@ try {
var configRaw = fs.readFileSync('config.toml').toString();
Config = toml.parse(configRaw);
} catch (e) {
logger.error('Fatal error: Failed to read or parse the config file: {0}', (e as Error).message);
logger.error({err: e}, 'Fatal error: Failed to read or parse the config file');
process.exit(1);
}
@@ -84,7 +84,7 @@ async function start() {
vncPort: Config.qemu.vncPort,
};
VM = new QemuVMShim(def);
VM = new QemuVMShim(def, Config.qemu.resourceLimits);
break;
}
case 'vncvm': {

View File

@@ -1,11 +1,13 @@
import { WebSocket } from 'ws';
import { NetworkClient } from '../NetworkClient.js';
import EventEmitter from 'events';
import pino from 'pino';
export default class WSClient extends EventEmitter implements NetworkClient {
socket: WebSocket;
ip: string;
enforceTextOnly = true
private logger = pino({ name: "CVMTS.WebsocketClient" });
constructor(ws: WebSocket, ip: string) {
super();
@@ -22,6 +24,10 @@ export default class WSClient extends EventEmitter implements NetworkClient {
this.emit('msg', buf, isBinary);
});
this.socket.on('error', (err: Error) => {
this.logger.error(err, 'WebSocket recv error');
})
this.socket.on('close', () => {
this.emit('disconnect');
});
@@ -37,11 +43,13 @@ export default class WSClient extends EventEmitter implements NetworkClient {
send(msg: string): Promise<void> {
return new Promise((res, rej) => {
if (!this.isOpen()) res();
if (!this.isOpen()) return res();
this.socket.send(msg, (err) => {
if (err) {
rej(err);
this.logger.error(err, 'WebSocket send error');
this.close();
res();
return;
}
res();
@@ -51,11 +59,13 @@ export default class WSClient extends EventEmitter implements NetworkClient {
sendBinary(msg: Uint8Array): Promise<void> {
return new Promise((res, rej) => {
if (!this.isOpen()) res();
if (!this.isOpen()) return res();
this.socket.send(msg, (err) => {
if (err) {
rej(err);
this.logger.error(err, 'WebSocket send error');
this.close();
res();
return;
}
res();

View File

@@ -24,7 +24,7 @@ export default class WSServer extends EventEmitter implements NetworkServer {
this.Config = config;
this.clients = [];
this.httpServer = http.createServer();
this.wsServer = new WebSocketServer({ noServer: true });
this.wsServer = new WebSocketServer({ noServer: true, perMessageDeflate: false, clientTracking: false });
this.httpServer.on('upgrade', (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => this.httpOnUpgrade(req, socket, head));
this.httpServer.on('request', (req, res) => {
res.writeHead(426);

116
cvmts/src/util/cgroup.ts Normal file
View File

@@ -0,0 +1,116 @@
// Cgroup management code
// this sucks, ill mess with it later
import { appendFileSync, existsSync, mkdirSync, readFileSync, rmdirSync, writeFileSync } from 'node:fs';
import path from 'node:path';
import pino from 'pino';
let logger = pino({ name: 'CVMTS/CGroup' });
export class CGroupController {
private controller;
private cg: CGroup;
constructor(controller: string, cg: CGroup) {
this.controller = controller;
this.cg = cg;
}
WriteValue(key: string, value: string) {
try {
writeFileSync(path.join(this.cg.Path(), `${this.controller}.${key}`), value);
} catch (e) {
logger.error({ error: e, controller_name: this.controller, controller_key: `${this.controller}.${key}`, value: value }, 'Failed to set CGroup controller value');
}
}
}
export class CGroup {
private path;
constructor(path: string) {
this.path = path;
}
InitControllers(wants_cpuset: boolean) {
// Configure this "root" cgroup to provide cpu and cpuset controllers to the leaf
// QEMU cgroups. A bit iffy but whatever.
if (wants_cpuset) {
try {
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu +cpuset');
} catch (err) {
logger.error({ error: err }, 'Could not provide cpuset controller to subtree. runOnCpus will not function.');
// just give up if this fails
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu');
}
} else {
writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu');
}
}
GetController(controller: string) {
return new CGroupController(controller, this);
}
Path(): string {
return this.path;
}
HasSubgroup(name: string): boolean {
let subgroup_root = path.join(this.path, name);
if (existsSync(subgroup_root)) return true;
return false;
}
DeleteSubgroup(name: string): void {
let subgroup_root = path.join(this.path, name);
if (!this.HasSubgroup(name)) {
throw new Error(`Subgroup ${name} does not exist`);
}
//console.log("Deleting subgroup", name);
rmdirSync(subgroup_root);
}
// Gets a CGroup inside of this cgroup.
GetSubgroup(name: string): CGroup {
// make the subgroup if it doesn't already exist
let subgroup_root = path.join(this.path, name);
if (!this.HasSubgroup(name)) {
mkdirSync(subgroup_root);
// We need to make the subgroup threaded before we can attach a process to it.
// It's a bit weird, but oh well. Blame linux people, not me.
writeFileSync(path.join(subgroup_root, 'cgroup.type'), 'threaded');
}
return new CGroup(subgroup_root);
}
// Attaches a process to this cgroup.
AttachProcess(pid: number) {
appendFileSync(path.join(this.path, 'cgroup.procs'), pid.toString());
}
// Attaches a thread to this cgroup. (The CGroup is a threaded one. See above)
AttachThread(tid: number) {
appendFileSync(path.join(this.path, 'cgroup.threads'), tid.toString());
}
// Returns a CGroup instance for the process' current cgroup, prepared for subgroup usage.
// This will only fail if you are not using systemd or elogind,
// since even logind user sessions are run inside of a user@[UID] slice.
// NOTE: This only supports cgroups2-only systems. Systemd practically enforces that so /shrug
static Self(): CGroup {
const kCgroupSelfPath = '/proc/self/cgroup';
if (!existsSync(kCgroupSelfPath)) throw new Error('This process is not in a CGroup.');
let res = readFileSync(kCgroupSelfPath, { encoding: 'utf-8' });
// Make sure the first/only line is a cgroups2 0::/path/to/cgroup entry.
// Legacy cgroups1 is not supported.
if (res[0] != '0') throw new Error('CGroup.Self() does not work with cgroups 1 systems. Please do not the cgroups 1.');
let cg_path = res.substring(3, res.indexOf('\n'));
let cg = new CGroup(path.join('/sys/fs/cgroup', cg_path));
return cg;
}
}

View File

@@ -4,16 +4,39 @@ import { QemuVM, QemuVmDefinition, VMState } from '@computernewb/superqemu';
import { VMDisplay } from '../display/interface.js';
import { VncDisplay } from '../display/vnc.js';
import pino from 'pino';
import { CgroupLimits, QemuResourceLimitedLauncher } from './qemu_launcher.js';
// shim over superqemu because it diverges from the VM interface
export class QemuVMShim implements VM {
private vm;
private display: VncDisplay | null = null;
private logger;
private cg_launcher: QemuResourceLimitedLauncher | null = null;
private resource_limits: CgroupLimits | null = null;
constructor(def: QemuVmDefinition) {
this.vm = new QemuVM(def);
constructor(def: QemuVmDefinition, resourceLimits?: CgroupLimits) {
this.logger = pino({ name: `CVMTS.QemuVMShim/${def.id}` });
if (resourceLimits) {
if (process.platform == 'linux') {
this.resource_limits = resourceLimits;
this.cg_launcher = new QemuResourceLimitedLauncher(def.id, resourceLimits);
this.vm = new QemuVM(def, this.cg_launcher);
} else {
// Just use the default Superqemu launcher on non-Linux platforms,
// .. regardless of if resource control is (somehow) enabled.
this.logger.warn({ platform: process.platform }, 'Resource control is not supported on this platform. Please remove or comment it out from your configuration.');
this.vm = new QemuVM(def);
}
} else {
this.vm = new QemuVM(def);
}
this.vm.on('statechange', async (newState) => {
if (newState == VMState.Started) {
await this.PlaceVCPUThreadsIntoCGroup();
}
});
}
Start(): Promise<void> {
@@ -39,6 +62,27 @@ export class QemuVMShim implements VM {
return this.vm.MonitorCommand(command);
}
async PlaceVCPUThreadsIntoCGroup() {
let pin_vcpu_threads = false;
if (this.cg_launcher) {
// messy as all hell but oh well
if (this.resource_limits?.limitProcess == undefined) {
pin_vcpu_threads = true;
} else {
pin_vcpu_threads = !this.resource_limits?.limitProcess;
}
if (pin_vcpu_threads) {
// Get all vCPUs and pin them to the CGroup.
let cpu_res = await this.vm.QmpCommand('query-cpus-fast', {});
for (let cpu of cpu_res) {
this.logger.info(`Placing vCPU thread with TID ${cpu['thread-id']} to cgroup`);
this.cg_launcher.group.AttachThread(cpu['thread-id']);
}
}
}
}
StartDisplay(): void {
// boot it up
let info = this.vm.GetDisplayInfo();

View File

@@ -0,0 +1,144 @@
import EventEmitter from 'events';
import { IProcess, IProcessLauncher, ProcessLaunchOptions } from '@computernewb/superqemu';
import { execaCommand } from 'execa';
import { Readable, Writable } from 'stream';
import { CGroup } from '../util/cgroup.js';
export interface CgroupLimits {
cpuUsageMax?: number;
runOnCpus?: number[];
periodMs?: number;
limitProcess?: boolean;
}
interface CGroupValue {
controller: string;
key: string;
value: string;
}
function MakeValuesFromLimits(limits: CgroupLimits): CGroupValue[] {
let option_array = [];
// The default period is 100 ms, which matches cgroups2 defaults.
let periodUs = 100 * 1000;
// Convert a user-configured period to us, since that's what cgroups2 expects.
if(limits.periodMs)
periodUs = limits.periodMs * 1000;
if (limits.cpuUsageMax) {
// cpu.max
option_array.push({
controller: 'cpu',
key: 'max',
value: `${(limits.cpuUsageMax / 100) * periodUs} ${periodUs}`
});
}
if(limits.runOnCpus) {
// Make sure a CPU is not specified more than once. Bit hacky but oh well
let unique = [...new Set(limits.runOnCpus)];
option_array.push({
controller: 'cpuset',
key: 'cpus',
value: `${unique.join(',')}`
});
}
return option_array;
}
// A process automatically placed in a given cgroup.
class CGroupLimitedProcess extends EventEmitter implements IProcess {
private process;
stdin: Writable | null = null;
stdout: Readable | null = null;
stderr: Readable | null = null;
private root_cgroup: CGroup;
private cgroup: CGroup;
private id;
private limits;
constructor(cgroup_root: CGroup, id: string, limits: CgroupLimits, command: string, opts?: ProcessLaunchOptions) {
super();
this.root_cgroup = cgroup_root;
this.cgroup = cgroup_root.GetSubgroup(id);
this.id = id;
this.limits = limits;
if(!this.limits.limitProcess)
this.limits.limitProcess = false;
this.process = execaCommand(command, opts);
this.stdin = this.process.stdin;
this.stdout = this.process.stdout;
this.stderr = this.process.stderr;
let self = this;
this.process.on('spawn', () => {
self.initCgroup();
if(self.limits.limitProcess) {
// it should have one!
self.cgroup.AttachProcess(self.process.pid!);
}
self.emit('spawn');
});
this.process.on('exit', (code) => {
self.emit('exit', code);
});
}
initCgroup() {
// Set cgroup keys.
for(const val of MakeValuesFromLimits(this.limits)) {
let controller = this.cgroup.GetController(val.controller);
controller.WriteValue(val.key, val.value);
}
}
kill(signal?: number | NodeJS.Signals): boolean {
return this.process.kill(signal);
}
dispose(): void {
this.stdin = null;
this.stdout = null;
this.stderr = null;
this.root_cgroup.DeleteSubgroup(this.id);
this.process.removeAllListeners();
this.removeAllListeners();
}
}
export class QemuResourceLimitedLauncher implements IProcessLauncher {
private limits;
private name;
private root;
public group;
constructor(name: string, limits: CgroupLimits) {
this.root = CGroup.Self();
// Make sure
if(limits.runOnCpus) {
this.root.InitControllers(true);
} else {
this.root.InitControllers(false);
}
this.name = name;
this.limits = limits;
// XXX figure something better out
this.group = this.root.GetSubgroup(this.name);
}
launch(command: string, opts?: ProcessLaunchOptions | undefined): IProcess {
return new CGroupLimitedProcess(this.root, this.name, this.limits, command, opts);
}
}