qemu: Switch to QMP over stdio

Simply a more convinent pipe. Additionally, because the pipe will only break when the process exits,
this means we can now remove QMP reconnection logic entirely. Can't exactly have problems
when the problem code is factored out ;)
This commit is contained in:
modeco80
2024-07-14 19:04:19 -04:00
parent e28bb3a9d7
commit 7413059193
2 changed files with 30 additions and 75 deletions

View File

@@ -6,6 +6,7 @@ import { unlink } from 'node:fs/promises';
import * as Shared from '@cvmts/shared'; import * as Shared from '@cvmts/shared';
import { Socket, connect } from 'net'; import { Socket, connect } from 'net';
import { Readable, Stream, Writable } from 'stream';
export enum VMState { export enum VMState {
Stopped, Stopped,
@@ -14,8 +15,6 @@ export enum VMState {
Stopping Stopping
} }
// TODO: Add bits to this to allow usage (optionally)
// of VNC/QMP port. This will be needed to fix up Windows support.
export type QemuVmDefinition = { export type QemuVmDefinition = {
id: string; id: string;
command: string; command: string;
@@ -25,26 +24,24 @@ export type QemuVmDefinition = {
/// Temporary path base (for UNIX sockets/etc.) /// Temporary path base (for UNIX sockets/etc.)
const kVmTmpPathBase = `/tmp`; const kVmTmpPathBase = `/tmp`;
/// The max amount of times QMP connection is allowed to fail before // writer implementation for process standard I/O
/// the VM is forcefully stopped. class StdioWriter implements IQmpClientWriter {
const kMaxFailCount = 5; stdout;
stdin;
// writer implementation for net.Socket
class SocketWriter implements IQmpClientWriter {
socket;
client; client;
constructor(socket: Socket, client: QmpClient) { constructor(stdout: Readable, stdin: Writable, client: QmpClient) {
this.socket = socket; this.stdout = stdout;
this.stdin = stdin;
this.client = client; this.client = client;
this.socket.on('data', (data) => { this.stdout.on('data', (data) => {
this.client.feed(data); this.client.feed(data);
}); });
} }
writeSome(buffer: Buffer) { writeSome(buffer: Buffer) {
this.socket.write(buffer); this.stdin.write(buffer);
} }
} }
@@ -53,8 +50,6 @@ export class QemuVM extends EventEmitter {
// QMP stuff. // QMP stuff.
private qmpInstance: QmpClient = new QmpClient(); private qmpInstance: QmpClient = new QmpClient();
private qmpSocket: Socket | null = null;
private qmpFailCount = 0;
private qemuProcess: ExecaChildProcess | null = null; private qemuProcess: ExecaChildProcess | null = null;
@@ -91,8 +86,7 @@ export class QemuVM extends EventEmitter {
self.SetState(VMState.Started); self.SetState(VMState.Started);
}) })
// now that we've connected to VNC, connect to the display // now that QMP has connected, connect to the display
self.qmpFailCount = 0;
self.display?.Connect(); self.display?.Connect();
}); });
} }
@@ -108,7 +102,7 @@ export class QemuVM extends EventEmitter {
if (!this.addedAdditionalArguments) { if (!this.addedAdditionalArguments) {
cmd += ' -no-shutdown'; cmd += ' -no-shutdown';
if (this.definition.snapshot) cmd += ' -snapshot'; if (this.definition.snapshot) cmd += ' -snapshot';
cmd += ` -qmp unix:${this.GetQmpPath()},server,wait -vnc unix:${this.GetVncPath()}`; cmd += ` -qmp stdio -vnc unix:${this.GetVncPath()}`;
this.definition.command = cmd; this.definition.command = cmd;
this.addedAdditionalArguments = true; this.addedAdditionalArguments = true;
} }
@@ -189,11 +183,6 @@ export class QemuVM extends EventEmitter {
private SetState(state: VMState) { private SetState(state: VMState) {
this.state = state; this.state = state;
this.emit('statechange', this.state); this.emit('statechange', this.state);
// reset QMP fail count when the VM is (re)starting or stopped
if (this.state == VMState.Stopped || this.state == VMState.Starting) {
this.qmpFailCount = 0;
}
} }
private GetQmpPath() { private GetQmpPath() {
@@ -212,7 +201,11 @@ export class QemuVM extends EventEmitter {
this.VMLog().Info(`Starting QEMU with command \"${split}\"`); this.VMLog().Info(`Starting QEMU with command \"${split}\"`);
// Start QEMU // Start QEMU
this.qemuProcess = execaCommand(split); this.qemuProcess = execaCommand(split, {
stdin: 'pipe',
stdout: 'pipe',
stderr: 'pipe'
});
this.qemuProcess.stderr?.on('data', (data) => { this.qemuProcess.stderr?.on('data', (data) => {
self.VMLog().Error('QEMU stderr: {0}', data.toString('utf8')); self.VMLog().Error('QEMU stderr: {0}', data.toString('utf8'));
@@ -220,8 +213,7 @@ export class QemuVM extends EventEmitter {
this.qemuProcess.on('spawn', async () => { this.qemuProcess.on('spawn', async () => {
self.VMLog().Info('QEMU started'); self.VMLog().Info('QEMU started');
await Shared.Sleep(500); await self.QmpStdioInit();
await self.ConnectQmp();
}); });
this.qemuProcess.on('exit', async (code) => { this.qemuProcess.on('exit', async (code) => {
@@ -230,15 +222,14 @@ export class QemuVM extends EventEmitter {
// Disconnect from the display and QMP connections. // Disconnect from the display and QMP connections.
await self.DisconnectDisplay(); await self.DisconnectDisplay();
// Remove the sockets for VNC and QMP. self.qmpInstance.reset();
self.qmpInstance.setWriter(null);
// Remove the VNC UDS socket.
try { try {
await unlink(this.GetVncPath()); await unlink(this.GetVncPath());
} catch (_) {} } catch (_) {}
try {
await unlink(this.GetQmpPath());
} catch (_) {}
if (self.state != VMState.Stopping) { if (self.state != VMState.Stopping) {
if (code == 0) { if (code == 0) {
// Wait a bit and restart QEMU. // Wait a bit and restart QEMU.
@@ -264,51 +255,15 @@ export class QemuVM extends EventEmitter {
} }
} }
private async ConnectQmp() { private async QmpStdioInit() {
let self = this; let self = this;
if (this.qmpSocket) { self.VMLog().Info("Initializing QMP over stdio");
// This isn't really a problem (since we gate it)
// but I'd like to see if i could eliminate this
this.VMLog().Warning('QemuVM.ConnectQmp(): Already connected to QMP socket!');
return;
}
await Shared.Sleep(500);
this.qmpSocket = connect(this.GetQmpPath());
this.qmpSocket.on('close', async () => {
self.qmpSocket?.removeAllListeners();
self.qmpSocket = null;
// If we aren't stopping (i.e: disconnection wasn't because we disconnected),
// then we should care QMP disconnected
if (self.state != VMState.Stopping) {
if (self.qmpFailCount++ < kMaxFailCount) {
self.VMLog().Error(`Failed to connect to QMP ${self.qmpFailCount} times.`);
await Shared.Sleep(500);
await self.ConnectQmp();
} else {
self.VMLog().Error(`Reached max retries, giving up.`);
await self.Stop();
return;
}
}
});
this.qmpSocket.on('error', (e: Error) => {
self.VMLog().Error('QMP socket error: {0}', e.message);
});
this.qmpSocket.on('connect', () => {
self.VMLog().Info("Connected to QMP socket");
// Setup the QMP client. // Setup the QMP client.
let writer = new SocketWriter(self.qmpSocket!, self.qmpInstance); let writer = new StdioWriter(this.qemuProcess?.stdout!, this.qemuProcess?.stdin!, self.qmpInstance);
self.qmpInstance.reset(); self.qmpInstance.reset();
self.qmpInstance.setWriter(writer); self.qmpInstance.setWriter(writer);
})
} }
private async DisconnectDisplay() { private async DisconnectDisplay() {

View File

@@ -31,7 +31,7 @@ export enum QmpEvent {
Stop = 'STOP', Stop = 'STOP',
VncConnected = 'VNC_CONNECTED', VncConnected = 'VNC_CONNECTED',
VncDisconnected = 'VNC_DISCONNECTED', VncDisconnected = 'VNC_DISCONNECTED',
VncInitalized = 'VNC_INITALIZED', VncInitialized = 'VNC_INITIALIZED',
Watchdog = 'WATCHDOG' Watchdog = 'WATCHDOG'
} }
@@ -79,7 +79,7 @@ export class QmpClient extends EventEmitter {
}); });
} }
setWriter(writer: IQmpClientWriter) { setWriter(writer: IQmpClientWriter|null) {
this.writer = writer; this.writer = writer;
} }