diff --git a/.gitignore b/.gitignore index 6fbbf6a..ddd9804 100644 --- a/.gitignore +++ b/.gitignore @@ -14,4 +14,7 @@ cvm-rs/target cvm-rs/index.node # geolite shit -**/geoip/ \ No newline at end of file +**/geoip/ + +# staff audit log +audit.log diff --git a/config.example.toml b/config.example.toml index d0df6f8..2506247 100644 --- a/config.example.toml +++ b/config.example.toml @@ -57,6 +57,32 @@ qemuArgs = "qemu-system-x86_64" vncPort = 5900 snapshots = true +# Resource limits. +# Only works on Linux, with `Delegate=yes` set in your .service file. No-op on other platforms. +# +# cpuUsageMax is an optional value specifies the max CPU usage as percentage in the common top notation. +# 200% means 2 CPUs, 400% is 4 CPUs. +# +# A general reccomendation is to set this to 100*[vCPU count]. +# For example, if your QEMU command line contains -smp cores=2, then cpuUsageMax should be 200. +# For an overcomitted host, you can use lower values, +# but it *can* get noticable if you throttle CPU too low. +# +# runOnCpus is an optional array that specifies what CPUs the VM is allowed to run on. +# This option will *not* work if you do not use a system service. (effectively, it will be a loud no-op that logs an error on startup). +# +# periodMs is an optional value in milliseconds that specifies the cgroup's CPU accounting period. +# The default is 100 ms (which matches the cgroups2 defaults), and should work in pretty much all cases, but +# it's a knob provided for any addl. tuning purposes. +# +# limitProcess is an optional boolean (default false) that determines if only qemu vCPU threads are put into the cgroup, +# or the entire QEMU process (incl. all its threads). The default behaviour of only limiting vCPU threads +# is more than likely what you want, so the example configuration omits specifying this key. +# +# Commenting or removing this table from the configuration disables resource limiting entirely. +resourceLimits = { cpuUsageMax = 100, runOnCpus = [ 2, 4 ] } + + # VNC options # Only used if vm.type is set to "vncvm" [vncvm] diff --git a/cvm-rs/src/guac.rs b/cvm-rs/src/guac.rs index a83517b..67b6572 100644 --- a/cvm-rs/src/guac.rs +++ b/cvm-rs/src/guac.rs @@ -92,6 +92,12 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult { // We bound this anyways and do quite the checks, so even though it's not great, // it should be generally fine (TM). loop { + // Make sure scanning the element length does not try to + // go out of bounds. + if current_position >= chars.len() { + return Err(DecodeError::InvalidFormat); + } + let c = chars[current_position]; if c >= '0' && c <= '9' { @@ -103,6 +109,7 @@ pub fn decode_instruction(element_string: &String) -> DecodeResult { return Err(DecodeError::InvalidFormat); } + current_position += 1; } @@ -189,4 +196,11 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), vec); } + + #[test] + fn out_of_bounds_element_does_not_panic() { + let element_string = "0".into(); + let res = decode_instruction(&element_string); + assert!(res.is_err()); + } } diff --git a/cvmts/package.json b/cvmts/package.json index aaa879b..fafeb5d 100644 --- a/cvmts/package.json +++ b/cvmts/package.json @@ -13,7 +13,7 @@ "license": "GPL-3.0", "dependencies": { "@computernewb/nodejs-rfb": "^0.3.0", - "@computernewb/superqemu": "0.2.4", + "@computernewb/superqemu": "^0.3.0", "@cvmts/cvm-rs": "*", "@maxmind/geoip2-node": "^5.0.0", "execa": "^8.0.1", diff --git a/cvmts/src/AuditLog.ts b/cvmts/src/AuditLog.ts new file mode 100644 index 0000000..315e8ae --- /dev/null +++ b/cvmts/src/AuditLog.ts @@ -0,0 +1,62 @@ +import pino from 'pino'; +import { Rank, User } from './User.js'; + +// Staff audit log. +// TODO: +// - Hook this up to a db or something instead of misusing pino +export class AuditLog { + private auditLogger = pino({ + name: 'AuditLog', + transport: { + target: 'pino/file', + options: { + destination: './audit.log' + } + } + }); + + private static StaffHonorFromRank(user: User, uppercase: boolean) { + switch (user.rank) { + case Rank.Moderator: + if (uppercase) return 'Moderator'; + else return 'moderator'; + + case Rank.Admin: + if (uppercase) return 'Administrator'; + else return 'administrator'; + + default: + throw new Error("input user is not staff.. how'd you even get here?"); + } + } + + onReset(callingUser: User) { + this.auditLogger.info({ staffUsername: callingUser.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} reset the virtual machine.`); + } + + onReboot(callingUser: User) { + this.auditLogger.info({ staffUsername: callingUser.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} rebooted the virtual machine.`); + } + + onMute(callingUser: User, target: User, perm: boolean) { + this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username, perm: perm }, `${AuditLog.StaffHonorFromRank(callingUser, true)} muted user.`); + } + + onUnmute(callingUser: User, target: User) { + this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} unmuted user.`); + } + + onKick(callingUser: User, target: User) { + this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} kicked user.`); + } + + onBan(callingUser: User, target: User) { + this.auditLogger.info({ staffUsername: callingUser.username, targetUsername: target.username }, `${AuditLog.StaffHonorFromRank(callingUser, true)} banned user.`); + } + + onMonitorCommand(callingUser: User, command: string) { + this.auditLogger.info({ staffUsername: callingUser.username, commandLine: command }, `${AuditLog.StaffHonorFromRank(callingUser, true)} executed monitor command.`); + } +} + +export let TheAuditLog = new AuditLog(); diff --git a/cvmts/src/CollabVMServer.ts b/cvmts/src/CollabVMServer.ts index ab209bf..8806a62 100644 --- a/cvmts/src/CollabVMServer.ts +++ b/cvmts/src/CollabVMServer.ts @@ -128,17 +128,17 @@ export default class CollabVMServer implements IProtocolMessageHandler { if (newState == VMState.Started) { self.logger.info('VM started'); - // start the display + // start the display and add the events once if (self.VM.GetDisplay() == null) { self.VM.StartDisplay(); - } - self.VM.GetDisplay()?.on('connected', () => { - // well aware this sucks but whatever + self.logger.info('started display, adding events now'); + + // add events self.VM.GetDisplay()?.on('resize', (size: Size) => self.OnDisplayResized(size)); self.VM.GetDisplay()?.on('rect', (rect: Rect) => self.OnDisplayRectangle(rect)); self.VM.GetDisplay()?.on('frame', () => self.OnDisplayFrame()); - }); + } } if (newState == VMState.Stopped) { @@ -900,7 +900,9 @@ export default class CollabVMServer implements IProtocolMessageHandler { for (let rect of self.rectQueue) promises.push(doRect(rect)); - this.rectQueue = []; + // javascript is a very solidly designed language with no holes + // or usability traps inside of it whatsoever + this.rectQueue.length = 0; await Promise.all(promises); } diff --git a/cvmts/src/IConfig.ts b/cvmts/src/IConfig.ts index 2217718..667c509 100644 --- a/cvmts/src/IConfig.ts +++ b/cvmts/src/IConfig.ts @@ -1,3 +1,4 @@ +import { CgroupLimits } from './vm/qemu_launcher'; import VNCVMDef from './vm/vnc/VNCVMDef'; export default interface IConfig { @@ -38,6 +39,7 @@ export default interface IConfig { qemuArgs: string; vncPort: number; snapshots: boolean; + resourceLimits?: CgroupLimits }; vncvm: VNCVMDef; mysql: MySQLConfig; diff --git a/cvmts/src/index.ts b/cvmts/src/index.ts index 26d89ed..eae77f6 100644 --- a/cvmts/src/index.ts +++ b/cvmts/src/index.ts @@ -36,7 +36,7 @@ try { var configRaw = fs.readFileSync('config.toml').toString(); Config = toml.parse(configRaw); } catch (e) { - logger.error('Fatal error: Failed to read or parse the config file: {0}', (e as Error).message); + logger.error({err: e}, 'Fatal error: Failed to read or parse the config file'); process.exit(1); } @@ -84,7 +84,7 @@ async function start() { vncPort: Config.qemu.vncPort, }; - VM = new QemuVMShim(def); + VM = new QemuVMShim(def, Config.qemu.resourceLimits); break; } case 'vncvm': { diff --git a/cvmts/src/net/ws/WSClient.ts b/cvmts/src/net/ws/WSClient.ts index d0c0f04..b79a465 100644 --- a/cvmts/src/net/ws/WSClient.ts +++ b/cvmts/src/net/ws/WSClient.ts @@ -1,11 +1,13 @@ import { WebSocket } from 'ws'; import { NetworkClient } from '../NetworkClient.js'; import EventEmitter from 'events'; +import pino from 'pino'; export default class WSClient extends EventEmitter implements NetworkClient { socket: WebSocket; ip: string; enforceTextOnly = true + private logger = pino({ name: "CVMTS.WebsocketClient" }); constructor(ws: WebSocket, ip: string) { super(); @@ -22,6 +24,10 @@ export default class WSClient extends EventEmitter implements NetworkClient { this.emit('msg', buf, isBinary); }); + this.socket.on('error', (err: Error) => { + this.logger.error(err, 'WebSocket recv error'); + }) + this.socket.on('close', () => { this.emit('disconnect'); }); @@ -37,11 +43,13 @@ export default class WSClient extends EventEmitter implements NetworkClient { send(msg: string): Promise { return new Promise((res, rej) => { - if (!this.isOpen()) res(); + if (!this.isOpen()) return res(); this.socket.send(msg, (err) => { if (err) { - rej(err); + this.logger.error(err, 'WebSocket send error'); + this.close(); + res(); return; } res(); @@ -51,11 +59,13 @@ export default class WSClient extends EventEmitter implements NetworkClient { sendBinary(msg: Uint8Array): Promise { return new Promise((res, rej) => { - if (!this.isOpen()) res(); + if (!this.isOpen()) return res(); this.socket.send(msg, (err) => { if (err) { - rej(err); + this.logger.error(err, 'WebSocket send error'); + this.close(); + res(); return; } res(); diff --git a/cvmts/src/net/ws/WSServer.ts b/cvmts/src/net/ws/WSServer.ts index 7660424..8436e63 100644 --- a/cvmts/src/net/ws/WSServer.ts +++ b/cvmts/src/net/ws/WSServer.ts @@ -24,7 +24,7 @@ export default class WSServer extends EventEmitter implements NetworkServer { this.Config = config; this.clients = []; this.httpServer = http.createServer(); - this.wsServer = new WebSocketServer({ noServer: true }); + this.wsServer = new WebSocketServer({ noServer: true, perMessageDeflate: false, clientTracking: false }); this.httpServer.on('upgrade', (req: http.IncomingMessage, socket: internal.Duplex, head: Buffer) => this.httpOnUpgrade(req, socket, head)); this.httpServer.on('request', (req, res) => { res.writeHead(426); diff --git a/cvmts/src/util/cgroup.ts b/cvmts/src/util/cgroup.ts new file mode 100644 index 0000000..7c97e89 --- /dev/null +++ b/cvmts/src/util/cgroup.ts @@ -0,0 +1,116 @@ +// Cgroup management code +// this sucks, ill mess with it later + +import { appendFileSync, existsSync, mkdirSync, readFileSync, rmdirSync, writeFileSync } from 'node:fs'; +import path from 'node:path'; +import pino from 'pino'; + +let logger = pino({ name: 'CVMTS/CGroup' }); + +export class CGroupController { + private controller; + private cg: CGroup; + + constructor(controller: string, cg: CGroup) { + this.controller = controller; + this.cg = cg; + } + + WriteValue(key: string, value: string) { + try { + writeFileSync(path.join(this.cg.Path(), `${this.controller}.${key}`), value); + } catch (e) { + logger.error({ error: e, controller_name: this.controller, controller_key: `${this.controller}.${key}`, value: value }, 'Failed to set CGroup controller value'); + } + } +} + +export class CGroup { + private path; + + constructor(path: string) { + this.path = path; + } + + InitControllers(wants_cpuset: boolean) { + // Configure this "root" cgroup to provide cpu and cpuset controllers to the leaf + // QEMU cgroups. A bit iffy but whatever. + if (wants_cpuset) { + try { + writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu +cpuset'); + } catch (err) { + logger.error({ error: err }, 'Could not provide cpuset controller to subtree. runOnCpus will not function.'); + // just give up if this fails + writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu'); + } + } else { + writeFileSync(path.join(this.path, 'cgroup.subtree_control'), '+cpu'); + } + } + + GetController(controller: string) { + return new CGroupController(controller, this); + } + + Path(): string { + return this.path; + } + + HasSubgroup(name: string): boolean { + let subgroup_root = path.join(this.path, name); + if (existsSync(subgroup_root)) return true; + return false; + } + + DeleteSubgroup(name: string): void { + let subgroup_root = path.join(this.path, name); + if (!this.HasSubgroup(name)) { + throw new Error(`Subgroup ${name} does not exist`); + } + + //console.log("Deleting subgroup", name); + rmdirSync(subgroup_root); + } + + // Gets a CGroup inside of this cgroup. + GetSubgroup(name: string): CGroup { + // make the subgroup if it doesn't already exist + let subgroup_root = path.join(this.path, name); + if (!this.HasSubgroup(name)) { + mkdirSync(subgroup_root); + // We need to make the subgroup threaded before we can attach a process to it. + // It's a bit weird, but oh well. Blame linux people, not me. + writeFileSync(path.join(subgroup_root, 'cgroup.type'), 'threaded'); + } + return new CGroup(subgroup_root); + } + + // Attaches a process to this cgroup. + AttachProcess(pid: number) { + appendFileSync(path.join(this.path, 'cgroup.procs'), pid.toString()); + } + + // Attaches a thread to this cgroup. (The CGroup is a threaded one. See above) + AttachThread(tid: number) { + appendFileSync(path.join(this.path, 'cgroup.threads'), tid.toString()); + } + + // Returns a CGroup instance for the process' current cgroup, prepared for subgroup usage. + // This will only fail if you are not using systemd or elogind, + // since even logind user sessions are run inside of a user@[UID] slice. + // NOTE: This only supports cgroups2-only systems. Systemd practically enforces that so /shrug + static Self(): CGroup { + const kCgroupSelfPath = '/proc/self/cgroup'; + if (!existsSync(kCgroupSelfPath)) throw new Error('This process is not in a CGroup.'); + let res = readFileSync(kCgroupSelfPath, { encoding: 'utf-8' }); + + // Make sure the first/only line is a cgroups2 0::/path/to/cgroup entry. + // Legacy cgroups1 is not supported. + if (res[0] != '0') throw new Error('CGroup.Self() does not work with cgroups 1 systems. Please do not the cgroups 1.'); + let cg_path = res.substring(3, res.indexOf('\n')); + + let cg = new CGroup(path.join('/sys/fs/cgroup', cg_path)); + + return cg; + } +} diff --git a/cvmts/src/vm/qemu.ts b/cvmts/src/vm/qemu.ts index 38eab09..a0b4b00 100644 --- a/cvmts/src/vm/qemu.ts +++ b/cvmts/src/vm/qemu.ts @@ -4,16 +4,39 @@ import { QemuVM, QemuVmDefinition, VMState } from '@computernewb/superqemu'; import { VMDisplay } from '../display/interface.js'; import { VncDisplay } from '../display/vnc.js'; import pino from 'pino'; +import { CgroupLimits, QemuResourceLimitedLauncher } from './qemu_launcher.js'; // shim over superqemu because it diverges from the VM interface export class QemuVMShim implements VM { private vm; private display: VncDisplay | null = null; private logger; + private cg_launcher: QemuResourceLimitedLauncher | null = null; + private resource_limits: CgroupLimits | null = null; - constructor(def: QemuVmDefinition) { - this.vm = new QemuVM(def); + constructor(def: QemuVmDefinition, resourceLimits?: CgroupLimits) { this.logger = pino({ name: `CVMTS.QemuVMShim/${def.id}` }); + + if (resourceLimits) { + if (process.platform == 'linux') { + this.resource_limits = resourceLimits; + this.cg_launcher = new QemuResourceLimitedLauncher(def.id, resourceLimits); + this.vm = new QemuVM(def, this.cg_launcher); + } else { + // Just use the default Superqemu launcher on non-Linux platforms, + // .. regardless of if resource control is (somehow) enabled. + this.logger.warn({ platform: process.platform }, 'Resource control is not supported on this platform. Please remove or comment it out from your configuration.'); + this.vm = new QemuVM(def); + } + } else { + this.vm = new QemuVM(def); + } + + this.vm.on('statechange', async (newState) => { + if (newState == VMState.Started) { + await this.PlaceVCPUThreadsIntoCGroup(); + } + }); } Start(): Promise { @@ -39,6 +62,27 @@ export class QemuVMShim implements VM { return this.vm.MonitorCommand(command); } + async PlaceVCPUThreadsIntoCGroup() { + let pin_vcpu_threads = false; + if (this.cg_launcher) { + // messy as all hell but oh well + if (this.resource_limits?.limitProcess == undefined) { + pin_vcpu_threads = true; + } else { + pin_vcpu_threads = !this.resource_limits?.limitProcess; + } + + if (pin_vcpu_threads) { + // Get all vCPUs and pin them to the CGroup. + let cpu_res = await this.vm.QmpCommand('query-cpus-fast', {}); + for (let cpu of cpu_res) { + this.logger.info(`Placing vCPU thread with TID ${cpu['thread-id']} to cgroup`); + this.cg_launcher.group.AttachThread(cpu['thread-id']); + } + } + } + } + StartDisplay(): void { // boot it up let info = this.vm.GetDisplayInfo(); diff --git a/cvmts/src/vm/qemu_launcher.ts b/cvmts/src/vm/qemu_launcher.ts new file mode 100644 index 0000000..54134d2 --- /dev/null +++ b/cvmts/src/vm/qemu_launcher.ts @@ -0,0 +1,144 @@ +import EventEmitter from 'events'; +import { IProcess, IProcessLauncher, ProcessLaunchOptions } from '@computernewb/superqemu'; +import { execaCommand } from 'execa'; +import { Readable, Writable } from 'stream'; +import { CGroup } from '../util/cgroup.js'; + +export interface CgroupLimits { + cpuUsageMax?: number; + runOnCpus?: number[]; + periodMs?: number; + limitProcess?: boolean; +} + +interface CGroupValue { + controller: string; + key: string; + value: string; +} + +function MakeValuesFromLimits(limits: CgroupLimits): CGroupValue[] { + let option_array = []; + + // The default period is 100 ms, which matches cgroups2 defaults. + let periodUs = 100 * 1000; + + // Convert a user-configured period to us, since that's what cgroups2 expects. + if(limits.periodMs) + periodUs = limits.periodMs * 1000; + + if (limits.cpuUsageMax) { + // cpu.max + option_array.push({ + controller: 'cpu', + key: 'max', + value: `${(limits.cpuUsageMax / 100) * periodUs} ${periodUs}` + }); + } + + if(limits.runOnCpus) { + // Make sure a CPU is not specified more than once. Bit hacky but oh well + let unique = [...new Set(limits.runOnCpus)]; + option_array.push({ + controller: 'cpuset', + key: 'cpus', + value: `${unique.join(',')}` + }); + } + + return option_array; +} + +// A process automatically placed in a given cgroup. +class CGroupLimitedProcess extends EventEmitter implements IProcess { + private process; + stdin: Writable | null = null; + stdout: Readable | null = null; + stderr: Readable | null = null; + private root_cgroup: CGroup; + private cgroup: CGroup; + private id; + private limits; + + constructor(cgroup_root: CGroup, id: string, limits: CgroupLimits, command: string, opts?: ProcessLaunchOptions) { + super(); + this.root_cgroup = cgroup_root; + this.cgroup = cgroup_root.GetSubgroup(id); + this.id = id; + this.limits = limits; + + if(!this.limits.limitProcess) + this.limits.limitProcess = false; + + this.process = execaCommand(command, opts); + + this.stdin = this.process.stdin; + this.stdout = this.process.stdout; + this.stderr = this.process.stderr; + + let self = this; + this.process.on('spawn', () => { + self.initCgroup(); + + if(self.limits.limitProcess) { + // it should have one! + self.cgroup.AttachProcess(self.process.pid!); + } + self.emit('spawn'); + }); + + this.process.on('exit', (code) => { + self.emit('exit', code); + }); + } + + initCgroup() { + // Set cgroup keys. + for(const val of MakeValuesFromLimits(this.limits)) { + let controller = this.cgroup.GetController(val.controller); + controller.WriteValue(val.key, val.value); + } + } + + kill(signal?: number | NodeJS.Signals): boolean { + return this.process.kill(signal); + } + + dispose(): void { + this.stdin = null; + this.stdout = null; + this.stderr = null; + + this.root_cgroup.DeleteSubgroup(this.id); + this.process.removeAllListeners(); + this.removeAllListeners(); + } +} + +export class QemuResourceLimitedLauncher implements IProcessLauncher { + private limits; + private name; + private root; + public group; + + constructor(name: string, limits: CgroupLimits) { + this.root = CGroup.Self(); + + // Make sure + if(limits.runOnCpus) { + this.root.InitControllers(true); + } else { + this.root.InitControllers(false); + } + + this.name = name; + this.limits = limits; + + // XXX figure something better out + this.group = this.root.GetSubgroup(this.name); + } + + launch(command: string, opts?: ProcessLaunchOptions | undefined): IProcess { + return new CGroupLimitedProcess(this.root, this.name, this.limits, command, opts); + } +} diff --git a/yarn.lock b/yarn.lock index ded4934..1710db0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -41,13 +41,13 @@ __metadata: languageName: node linkType: hard -"@computernewb/superqemu@npm:0.2.4": - version: 0.2.4 - resolution: "@computernewb/superqemu@npm:0.2.4" +"@computernewb/superqemu@npm:^0.3.0": + version: 0.3.2 + resolution: "@computernewb/superqemu@npm:0.3.2" dependencies: execa: "npm:^8.0.1" pino: "npm:^9.3.1" - checksum: 10c0/9ed3190bd95c60a6f74fbb6d29cbd9909ff18b04d64b5a09c02dec91169304f439d7b0ac91848b69621066810cdfef4a0dbf97075938ee40b3aebd74376b4440 + checksum: 10c0/845f1732f1e92b19bbf09b4bfc75381e707d367902535b1d520f1dc323e57f97cdf56d37a2d98e79c99443222224276d488d920e34010d199d798da7c564f7d1 languageName: node linkType: hard @@ -75,7 +75,7 @@ __metadata: resolution: "@cvmts/cvmts@workspace:cvmts" dependencies: "@computernewb/nodejs-rfb": "npm:^0.3.0" - "@computernewb/superqemu": "npm:0.2.4" + "@computernewb/superqemu": "npm:^0.3.0" "@cvmts/cvm-rs": "npm:*" "@maxmind/geoip2-node": "npm:^5.0.0" "@types/node": "npm:^20.12.5" @@ -2944,12 +2944,12 @@ __metadata: linkType: hard "micromatch@npm:^4.0.5": - version: 4.0.7 - resolution: "micromatch@npm:4.0.7" + version: 4.0.8 + resolution: "micromatch@npm:4.0.8" dependencies: braces: "npm:^3.0.3" picomatch: "npm:^2.3.1" - checksum: 10c0/58fa99bc5265edec206e9163a1d2cec5fabc46a5b473c45f4a700adce88c2520456ae35f2b301e4410fb3afb27e9521fb2813f6fc96be0a48a89430e0916a772 + checksum: 10c0/166fa6eb926b9553f32ef81f5f531d27b4ce7da60e5baf8c021d043b27a388fb95e46a8038d5045877881e673f8134122b59624d5cecbd16eb50a42e7a6b5ca8 languageName: node linkType: hard