Thank you. I will pull the quickstart with the latest SDK update and try to replicate my code there to generate a WAV file. In the meantime:
Here is the code I have in my current solution.
This runs on the RTMS started event:
import EventEmitter from 'node:events';
import rtms, { type Client, type JoinParams } from '@zoom/rtms';
import type { ServerContext } from '../../context.js';
import { type ClientEvent, type ClientEventHandler, RecordingState } from './types.js';
import { logger } from '../../logger.js';
/**
 * Tracks one RTMS client per meeting, keyed by meeting UUID. Each entry also
 * holds its recording state, a set of paused event names, and caller-supplied
 * data of type `T`.
 *
 * SDK callbacks and state transitions are re-emitted on an internal
 * EventEmitter as `client:<event>` with arguments `(context, id, ...args)`.
 * Consumers subscribe with {@link ClientRegistry.register}.
 */
export class ClientRegistry<T> {
  private readonly registry = new Map<string, ClientInfo<T>>();
  private readonly eventEmitter = new EventEmitter({ captureRejections: true });

  /** Number of registered clients. */
  get size() {
    return this.registry.size;
  }

  /** Whether a client is registered for the given meeting UUID. */
  has(id: string) {
    return this.registry.has(id);
  }

  /**
   * Creates an RTMS client for a meeting and prepares audio storage for it.
   * It then wires the SDK callbacks into the registry's event sink, registers
   * the client, and joins.
   *
   * @param context - server context (services + logger)
   * @param id - meeting UUID; used as the registry key
   * @param params - join parameters forwarded to `client.join`
   * @param data - caller data stored alongside the client
   * @throws Error if a client for `id` already exists
   */
  async startClient(context: ServerContext, id: string, params: JoinParams, data: T) {
    if (this.has(id)) throw new Error(`Client with meeting uuid ${id} already exists!`);
    const client = new rtms.Client();
    await context.services.audio.prepare(context, id);
    // Re-check after the await. A concurrent startClient for the same meeting
    // (e.g. a re-delivered "started" event) may have registered a client while
    // this call was suspended. The setInfo below would otherwise overwrite it
    // and orphan the first client with its callbacks still attached.
    if (this.has(id)) throw new Error(`Client with meeting uuid ${id} already exists!`);

    const audioParams = {
      channel: rtms.AudioChannel.MONO,
      sampleRate: rtms.AudioSampleRate.SR_16K,
      codec: rtms.AudioCodec.L16,
      contentType: rtms.AudioContentType.RAW_AUDIO,
      dataOpt: rtms.AudioDataOption.AUDIO_MULTI_STREAMS,
      duration: 20,
      frameSize: 320
    };
    // NOTE(review): any result of setAudioParams is ignored here. If packets
    // still arrive Opus-encoded despite codec L16, confirm two things:
    // - whether this SDK version reports rejected params (via a return value
    //   or an error);
    // - whether the requested codec is honoured together with
    //   AUDIO_MULTI_STREAMS.
    client.setAudioParams(audioParams);

    client.onActiveSpeakerEvent((event, timestamp, participants) => {
      context.log.info(`${event} ${timestamp} ${JSON.stringify(participants)}`);
    });
    client.onJoinConfirm(this.eventSinkFn(context, 'join-confirm', id));
    client.onAudioData(this.eventSinkFn(context, 'audio', id));
    client.onLeave(this.eventSinkFn(context, 'leave', id));
    client.onSessionUpdate(this.eventSinkFn(context, 'session-update', id));
    client.onUserUpdate(this.eventSinkFn(context, 'user-update', id));

    // Register without emitting a state-transition (processChanges = false).
    this.setInfo(
      id,
      {
        meetingUuid: id,
        context,
        client,
        startTime: Date.now(),
        state: RecordingState.Recording_Connecting,
        paused: new Set(),
        data,
      },
      false
    );

    try {
      // NOTE(review): join() is neither awaited nor is its return value checked.
      // If the SDK signals failure through a return value rather than by
      // throwing, the catch below never runs. Confirm against the SDK docs.
      client.join(params);
      // The state moves to Recording_Start right after join() returns, i.e.
      // before any 'join-confirm' callback has fired.
      this.setState(id, RecordingState.Recording_Start);
      context.log.debug(`Added client to meeting(${id})`);
    } catch (joinError: unknown) {
      // JSON.stringify(new Error(...)) yields "{}" because an Error's message
      // and stack are non-enumerable, so extract the message explicitly.
      const reason = joinError instanceof Error ? joinError.message : JSON.stringify(joinError);
      context.log.error(`Failed to join meeting ${id}: ${reason}`);
      this.setState(id, RecordingState.Recording_Fail);
      this.removeClient(id);
    }
  }

  /** Returns the SDK client for a meeting, or undefined if none is registered. */
  getClient(id: string) {
    return this.getInfo(id)?.client;
  }

  /** Unregisters a meeting. Returns true if an entry was removed. */
  removeClient(id: string) {
    return this.registry.delete(id);
  }

  /** Returns the `Date.now()` value captured at registration, if registered. */
  getStartTime(id: string) {
    return this.getInfo(id)?.startTime;
  }

  /** Returns the current recording state, if registered. */
  getState(id: string) {
    return this.getInfo(id)?.state;
  }

  /**
   * Updates the recording state. When the state actually changes, it emits
   * 'state-transition' with `(previousState, state, extraData)`.
   *
   * @throws Error if no client is registered for `id`
   */
  setState(id: string, state: RecordingState, extraData?: unknown) {
    const info = this.getInfo(id);
    if (info == null) throw new Error('Cannot set state on non-existent client');
    if (info.state !== state) {
      const previousState = info.state;
      info.state = state;
      this.eventSink(info.context, 'state-transition', id, previousState, state, extraData);
    }
  }

  /**
   * Subscribes to a client event.
   *
   * @returns a handle whose `destroy()` removes the listener
   */
  register<TArgs extends any[]>(event: ClientEvent, listener: ClientEventHandler<TArgs>) {
    logger.info(`Registered Client listener for event(${JSON.stringify(event)})`);
    const eventName = `client:${event}`;
    this.eventEmitter.addListener(eventName, listener);
    return {
      destroy: () => {
        this.eventEmitter.removeListener(eventName, listener);
      },
    };
  }

  /** Suppresses emission of `event` for this meeting until `resume` is called. */
  pause(id: string, event: ClientEvent) {
    this.getInfo(id)?.paused.add(event);
  }

  /** Re-enables emission of a previously paused `event` for this meeting. */
  resume(id: string, event: ClientEvent) {
    this.getInfo(id)?.paused.delete(event);
  }

  /** Returns the caller data stored for a meeting, if registered. */
  getData(id: string): T | undefined {
    return this.getInfo(id)?.data;
  }

  /**
   * Shallow-merges `data` into the stored data object for a meeting.
   *
   * @returns false when no client is registered for `id` (or `data` is
   *   nullish); true after a successful merge
   */
  updateData(id: string, data: Partial<T>): boolean {
    const savedData = this.getInfo(id)?.data;
    // Previously only `data` was null-checked, so a missing client merged into
    // a throwaway `{}` and still reported success.
    if (savedData == null || data == null) return false;
    Object.assign(savedData, data);
    return true;
  }

  private getInfo(id: string): ClientInfo<T> | undefined {
    return this.registry.get(id);
  }

  /**
   * Stores an entry. Unless `processChanges` is false, it emits
   * 'state-transition' when the entry is new or its state differs from the
   * previous entry's.
   */
  private setInfo(id: string, info: ClientInfo<T>, processChanges?: boolean) {
    const lastState = this.getInfo(id)?.state;
    this.registry.set(id, info);
    if ((processChanges ?? true) && (lastState == null || lastState !== info.state)) {
      this.eventSink(info.context, 'state-transition', id, lastState, info.state);
    }
  }

  /** Emits `client:<event>` unless that event is paused for the meeting. */
  private eventSink(context: ServerContext, event: ClientEvent, id: string, ...args: any[]) {
    if (this.getInfo(id)?.paused.has(event)) return;
    this.eventEmitter.emit(`client:${event}`, context, id, ...args);
  }

  /** Builds an SDK callback that forwards its arguments to `eventSink`. */
  private eventSinkFn(context: ServerContext, event: ClientEvent, id: string) {
    return (...args: any[]) => this.eventSink(context, event, id, ...args);
  }
}
/**
 * Per-meeting bookkeeping held by ClientRegistry.
 *
 * Identity fields come first and are readonly. Mutable tracking state follows.
 */
interface ClientInfo<T> {
  /** Meeting UUID; startClient stores the same value it uses as the registry key. */
  readonly meetingUuid: string;
  /** Server context captured at registration and passed to every emitted event. */
  readonly context: ServerContext;
  /** The RTMS SDK client for this meeting. */
  readonly client: Client;
  /** Caller-supplied data. The reference is fixed; its contents may be merged into. */
  readonly data: T;
  /** `Date.now()` value captured when the client was registered. */
  startTime: number;
  /** Current recording state. */
  state: RecordingState;
  /** Event names whose emission is currently suppressed for this meeting. */
  paused: Set<ClientEvent>;
}
This runs in the onAudio event handler:
import type { Metadata } from '@zoom/rtms';
import type { ServerContext } from '../../context.js';
import { type ClientEventHandler, RecordingState } from '../../services/client-registry/types.js';
// Debug: track packet count per meeting
const packetCounts = new Map<string, number>();
/**
 * Handles one RTMS audio packet for a meeting.
 *
 * Packets from users other than the selected participant are dropped, as are
 * packets with a null or -1 `userId`. The first few packets per meeting are
 * logged for format inspection. While the meeting is in Recording_Start, the
 * bytes are appended to the recording. Once the max recording time is
 * reached, recording is stopped and the stream is processed instead.
 *
 * @param context - server context providing services and logger
 * @param meetingUuid - meeting the packet belongs to
 * @param buffer - payload exactly as delivered by the SDK
 * @param _size - unused
 * @param _timestamp - unused
 * @param metadata - SDK metadata; `userId` identifies the speaking user
 */
export const onAudio: ClientEventHandler = async (
  context: ServerContext,
  meetingUuid: string,
  buffer: Buffer,
  _size: number,
  _timestamp: number,
  metadata: Metadata
) => {
  try {
    const selectedParticipantId = context.services.clients.getData(meetingUuid)?.participant?.participantId;
    if (
      metadata.userId == null ||
      !meetingUuid ||
      metadata.userId === -1 ||
      (selectedParticipantId != null && selectedParticipantId !== metadata.userId.toString())
    ) {
      return;
    }

    // Debug: log the first 5 packets per meeting to inspect the payload format.
    // The counter stops advancing once the limit is hit.
    // NOTE(review): with the L16 / 16 kHz / mono / 20 ms params requested when
    // the client is started, raw PCM frames should be 16000 * 0.02 * 2 = 640
    // bytes each. Small, variable sizes would suggest a compressed codec (e.g.
    // Opus) is still being delivered. Confirm against the SDK docs.
    const seen = packetCounts.get(meetingUuid) ?? 0;
    if (seen < 5) {
      const count = seen + 1;
      packetCounts.set(meetingUuid, count);
      const firstBytes = buffer.subarray(0, Math.min(20, buffer.length));
      context.log.info(`[DEBUG] Audio packet #${count}: size=${buffer.length}, first20bytes=${firstBytes.toString('hex')}`);
    }

    const state = context.services.clients.getState(meetingUuid);
    if (state !== RecordingState.Recording_Start) return;

    if (context.services.audio.hasReachRecordingMaxTime(context, meetingUuid)) {
      // Flow: setState runs synchronously, before any await, so packets
      // delivered while processStream is pending see Recording_Stop and are
      // dropped by the state check above. pause() also makes the registry
      // stop emitting 'audio' events for this meeting. Whether the RTMS
      // session itself is left depends on a 'state-transition' listener not
      // shown here.
      context.services.clients.setState(meetingUuid, RecordingState.Recording_Stop);
      context.log.info(`Max recording time reached for meeting ${meetingUuid}, stopping RTMS.`);
      context.services.clients.pause(meetingUuid, 'audio');
      await context.services.audio.processStream(context, meetingUuid);
      return;
    }

    // pushBytes is async. Await it so that a failure (e.g. during stream
    // creation) is caught below instead of becoming an unhandled rejection.
    await context.services.audio.pushBytes(context, meetingUuid, buffer, metadata.userId);
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : String(error);
    context.log.error(`Error handling audio data for ${meetingUuid}:`, message);
  }
};
And this writes the audio chunks to a file:
/**
 * Appends an audio chunk to the per-user output stream for a meeting.
 *
 * The stream is created lazily on first use: AudioStream.ambient when the
 * meeting's data has `ambient` set, otherwise AudioStream.standard. Write
 * failures are logged, not rethrown.
 *
 * @param context - server context providing services and logger
 * @param meetingUuid - meeting the chunk belongs to
 * @param buffer - audio bytes to append
 * @param userId - speaking user; selects the per-user stream and file path
 */
async pushBytes(context: ServerContext, meetingUuid: string, buffer: Buffer, userId: number) {
  // Get or create the meeting-specific map of per-user streams.
  let meetingStreams = this.writeStreams.get(meetingUuid);
  if (!meetingStreams) {
    meetingStreams = new Map();
    this.writeStreams.set(meetingUuid, meetingStreams);
  }

  const filePath = this.getFilePathForUser(meetingUuid, userId);
  const userKey = userId.toString();
  let stream = meetingStreams.get(userKey);
  if (!stream) {
    const ambient = context.services.clients.getData(meetingUuid)?.ambient ?? false;
    stream = ambient ? AudioStream.ambient(context, filePath) : AudioStream.standard(context, filePath);
    meetingStreams.set(userKey, stream);
  }

  try {
    await stream.write(buffer);
    // Sampled (~1%) debug output. The seconds figures assume 16-bit mono PCM
    // at 16 kHz (2 bytes * 16000 samples per second). They are meaningless if
    // the payload is actually compressed (e.g. Opus).
    if (Math.random() < 0.01) {
      const bytesPerSecond = 2 * 16000;
      context.log.debug(
        `Audio received for user(${userId}); Processed Seconds(${stream.bytesIn / bytesPerSecond}); Saved Seconds(${stream.bytesOut / bytesPerSecond})`
      );
    }
  } catch (e: unknown) {
    // JSON.stringify on an Error yields "{}" (message/stack are non-enumerable).
    const reason = e instanceof Error ? e.message : JSON.stringify(e);
    context.log.error(`Failed to write to stream, ${reason}`);
  }
}
From what I can tell, that file is still Opus-encoded, even though the audio params request the L16 codec (raw 16-bit PCM) at 16 kHz.