1. How are you feeding frames to FFmpeg?
Using pipes:

- Video input: pipe:3 (stdio[3])
- Audio input: pipe:4 (stdio[4])
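For concreteness, here is a condensed sketch of that wiring (the full version lives in spawnProcess in the file further down). The extra stdio entries at indices 3 and 4 become the pipes FFmpeg reads as pipe:3 and pipe:4:

import { spawn } from 'node:child_process';
import type internal from 'node:stream';

// fds 0-2 are stdin/stdout/stderr; fds 3-4 are extra pipes exposed to the child
const ffmpeg = spawn('ffmpeg', ['-f', 'h264', '-i', 'pipe:3', '-f', 's16le', '-i', 'pipe:4' /* ... */], {
  stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'],
});
const videoStream = ffmpeg.stdio[3] as internal.Writable; // raw h264 is written here
const audioStream = ffmpeg.stdio[4] as internal.Writable; // s16le PCM is written here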
2. Are you setting explicit presentation timestamps (PTS)?
PTS is adjusted via filters (setpts/asetpts) rather than set explicitly per packet. Timestamps are tracked on ingest:
if (msg.msg_type === MessageType.MEDIA_DATA_AUDIO && msg.content?.data) {
const buffer = Buffer.from(msg.content.data, 'base64');
const timestamp = msg.content.timestamp;
// Track base timestamp for synchronization
if (this._baseAudioTimestamp === undefined) {
this._baseAudioTimestamp = timestamp;
logger.debug(`${logPrefix} First audio packet timestamp: ${timestamp}`);
}
this._lastAudioTimestamp = timestamp;
this._ffmpegHandler.processAudioPacket({ data: buffer, timestamp });
return;
}
if (msg.msg_type === MessageType.MEDIA_DATA_VIDEO && msg.content?.data) {
const buffer = Buffer.from(msg.content.data, 'base64');
const timestamp = msg.content.timestamp;
// Track base timestamp for synchronization
if (this._baseVideoTimestamp === undefined) {
this._baseVideoTimestamp = timestamp;
logger.debug(`${logPrefix} First video packet timestamp: ${timestamp}`);
}
this._lastVideoTimestamp = timestamp;
this._ffmpegHandler.processVideoPacket({ data: buffer, timestamp });
return;
}
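These base timestamps are presumably what offsetSeconds (below) is derived from. A minimal sketch, assuming both timestamps come from the same clock and are in milliseconds; the helper name is mine, not from the file:

// Hypothetical helper; assumes packet timestamps share a clock and are in ms.
// A positive result means audio arrived first, matching configureOffset's convention.
private computeOffsetSeconds(): number {
  if (this._baseAudioTimestamp === undefined || this._baseVideoTimestamp === undefined) return 0;
  return (this._baseVideoTimestamp - this._baseAudioTimestamp) / 1000;
}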
Offset calculation and PTS adjustment:
private configureOffset(offsetSeconds: number): { videoFilter: string; audioFilter: string } {
let videoFilter =
'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2,setsar=1';
let audioFilter = 'aresample=48000,aformat=channel_layouts=stereo';
/*
* offsetSeconds is the difference in seconds between the video and audio arrival times.
* If offsetSeconds is positive, the audio arrived before the video.
* If offsetSeconds is negative, the video arrived before the audio.
* If offsetSeconds is zero, the video and audio arrived at the same time.
*
* We buffer packets from whichever arrives first. When we flush the buffer,
* the early-arriving stream gets a "head start" in PTS timeline.
* We need to shift the late-arriving stream's PTS forward to catch up.
*/
if (offsetSeconds > 0) {
// Shift video PTS forward by offsetSeconds to match audio
videoFilter = `setpts=PTS+${offsetSeconds}/TB,${videoFilter}`;
audioFilter = `asetpts=PTS,${audioFilter}`;
logger.debug(`${logPrefix} Configuring offset: Video PTS shifted forward by ${offsetSeconds.toFixed(3)}s`);
} else if (offsetSeconds < 0) {
// Shift audio PTS forward by abs(offsetSeconds) to match video
videoFilter = `setpts=PTS,${videoFilter}`;
audioFilter = `asetpts=PTS+${Math.abs(offsetSeconds)}/TB,${audioFilter}`;
logger.debug(
`${logPrefix} Configuring offset: Audio PTS shifted forward by ${Math.abs(offsetSeconds).toFixed(3)}s`,
);
} else {
// Reset both to start at 0
videoFilter = `setpts=PTS-STARTPTS,${videoFilter}`;
audioFilter = `asetpts=PTS-STARTPTS,${audioFilter}`;
logger.debug(`${logPrefix} Configuring offset: No offset (streams already synchronized)`);
}
return { videoFilter, audioFilter };
}
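To make the branches concrete: a call with offsetSeconds = 0.25 (audio arrived 250 ms before video) takes the first branch and produces

videoFilter: setpts=PTS+0.25/TB,scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2,setsar=1
audioFilter: asetpts=PTS,aresample=48000,aformat=channel_layouts=stereo

where the /TB division converts the 0.25 s shift into timebase units.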
3. FFmpeg command/config
The command is assembled in configureStream:
public configureStream(offsetSeconds: number): void {
const { videoFilter, audioFilter } = this.configureOffset(offsetSeconds);
const processFlags = ['-filter_complex', `[0:v]${videoFilter}[vout];[1:a]${audioFilter}[aout]`];
const ffmpegCommand = [
...this.videoInputFlags,
...this.audioInputFlags,
...processFlags,
...this.mapFlags,
...this.videoEncodeFlags,
...this.audioEncodeFlags,
...this.syncFlags,
...this.whipOutputFlags,
];
this.spawnProcess(ffmpegCommand);
}
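For reference, with offsetSeconds = 0 the assembled command comes out as follows (shell-style rendering, token elided):

ffmpeg \
  -framerate 25 -f h264 -i pipe:3 \
  -f s16le -ar 16000 -ac 1 -i pipe:4 \
  -filter_complex "[0:v]setpts=PTS-STARTPTS,scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2,setsar=1[vout];[1:a]asetpts=PTS-STARTPTS,aresample=48000,aformat=channel_layouts=stereo[aout]" \
  -map "[vout]" -map "[aout]" \
  -c:v libx264 -profile:v baseline -preset veryfast -tune zerolatency \
  -g 50 -keyint_min 50 -sc_threshold 0 \
  -b:v 3000k -maxrate 3000k -bufsize 6000k \
  -force_key_frames "expr:gte(t-prev_forced_t,2)" \
  -c:a libopus -b:a 128k -ar 48000 -ac 2 \
  -async 1 -fps_mode cfr \
  -f whip -authorization <token> https://global.whip.live-video.net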
Sync flags:
private get syncFlags(): string[] {
return ['-async', '1', '-fps_mode', 'cfr'];
}
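One thing worth flagging here: -async has been deprecated in FFmpeg for some time in favor of the aresample filter's async option, so a more future-proof variant would fold it into the existing audio filter chain. A suggestion of mine, not from the file:

// Alternative: drop '-async 1' and let aresample pad/trim the audio instead
let audioFilter = 'aresample=48000:async=1,aformat=channel_layouts=stereo';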
Here are the full contents of my ffmpeg.ts file:
import type { ChildProcess, StdioOptions } from 'node:child_process';
import { spawn } from 'node:child_process';
import type internal from 'node:stream';
import logger from '~/lib/useConsoleLogger';
import fs from 'node:fs';
import path from 'node:path';
type BufferedPacket = {
data: Buffer;
timestamp: number;
};
const logPrefix = 'FfmpegHandler:';
const resourcesPath = path.join(import.meta.dirname, '..', '..', '..', 'resources');
const blackFramesLoopPath = path.join(resourcesPath, 'black_frames_loop.h264');
const blackFramesLoop = fs.readFileSync(blackFramesLoopPath);
export class FfmpegHandler {
private _token: string;
private _ffmpeg?: ChildProcess;
private _videoStream?: internal.Writable;
private _audioStream?: internal.Writable;
private _videoBuffer: BufferedPacket[] = [];
private _audioBuffer: BufferedPacket[] = [];
private _blackFrameInterval?: NodeJS.Timeout;
private _shouldInjectBlackFrames = true;
constructor(token: string) {
this._token = token;
}
private get videoInputFlags(): string[] {
return ['-framerate', '25', '-f', 'h264', '-i', 'pipe:3'];
}
private get videoEncodeFlags(): string[] {
return [
'-c:v',
'libx264',
'-profile:v',
'baseline',
'-preset',
'veryfast',
'-tune',
'zerolatency',
'-g',
'50', // 25 fps * 2 sec
'-keyint_min',
'50',
'-sc_threshold',
'0',
'-b:v',
'3000k', // 720p bitrate
'-maxrate',
'3000k',
'-bufsize',
'6000k',
'-force_key_frames',
'expr:gte(t-prev_forced_t,2)', // Force I-frame every 2 seconds
];
}
private get audioInputFlags(): string[] {
return ['-f', 's16le', '-ar', '16000', '-ac', '1', '-i', 'pipe:4'];
}
private get audioEncodeFlags(): string[] {
return ['-c:a', 'libopus', '-b:a', '128k', '-ar', '48000', '-ac', '2'];
}
private get mapFlags(): string[] {
return ['-map', '[vout]', '-map', '[aout]'];
}
private get syncFlags(): string[] {
return ['-async', '1', '-fps_mode', 'cfr'];
}
private get whipOutputFlags(): string[] {
return ['-f', 'whip', '-authorization', this._token, 'https://global.whip.live-video.net'];
}
private get stdioOptions(): StdioOptions {
// stdin ignored; stdout/stderr piped so the data handlers in spawnProcess
// actually receive output (with 'inherit' they would be null); fds 3/4 are
// the video and audio input pipes
return ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'];
}
private flushBuffers(): void {
logger.debug(
`${logPrefix} Flushing buffered packets (audio: ${this._audioBuffer.length}, video: ${this._videoBuffer.length})`,
);
for (const packet of this._audioBuffer) {
if (this._audioStream) {
this._audioStream.write(packet.data);
}
}
for (const packet of this._videoBuffer) {
if (this._videoStream) {
this._videoStream.write(packet.data);
}
}
this._audioBuffer = [];
this._videoBuffer = [];
logger.debug(`${logPrefix} FFmpeg started and buffers flushed successfully`);
}
private spawnProcess(command: string[]): void {
logger.debug(`${logPrefix} Command spawned: /opt/ffmpeg-whip/bin/ffmpeg ${command.join(' ')}`);
this._ffmpeg = spawn('/opt/ffmpeg-whip/bin/ffmpeg', command, { stdio: this.stdioOptions });
this._videoStream = this._ffmpeg.stdio[3] as internal.Writable;
this._audioStream = this._ffmpeg.stdio[4] as internal.Writable;
this._ffmpeg.on('spawn', () => {
logger.info(`${logPrefix} Process spawned`);
this.flushBuffers();
// Only inject black frames on initial start, not on restart
if (this._shouldInjectBlackFrames) {
this.startBlackFrameInjection();
// Clear the flag so a respawn without an intervening reset() does not re-inject
this._shouldInjectBlackFrames = false;
}
});
this._ffmpeg.on('error', (err) => logger.error(new Error(`${logPrefix} Process error: ${err.message}`)));
this._ffmpeg.on('exit', (code, signal) => {
if (code !== 0) {
logger.error(new Error(`${logPrefix} Process exited with error ${JSON.stringify({ code, signal })}`));
return;
}
logger.info(`${logPrefix} Process exited successfully ${JSON.stringify({ code, signal })}`);
});
this._ffmpeg.stdout?.on('data', (data: Buffer) => {
const message = data.toString().trim();
if (!message) return;
logger.debug(`${logPrefix} Process stdout: ${message}`);
});
this._ffmpeg.stderr?.on('data', (data: Buffer) => {
const message = data.toString().trim();
if (!message) return;
logger.debug(`${logPrefix} Process stderr: ${message}`);
});
}
private configureOffset(offsetSeconds: number): { videoFilter: string; audioFilter: string } {
let videoFilter =
'scale=1280:720:force_original_aspect_ratio=decrease,pad=1280:720:(ow-iw)/2:(oh-ih)/2,setsar=1';
let audioFilter = 'aresample=48000,aformat=channel_layouts=stereo';
/*
* offsetSeconds is the difference in seconds between the video and audio arrival times.
* If offsetSeconds is positive, the audio arrived before the video.
* If offsetSeconds is negative, the video arrived before the audio.
* If offsetSeconds is zero, the video and audio arrived at the same time.
*
* We buffer packets from whichever arrives first. When we flush the buffer,
* the early-arriving stream gets a "head start" in PTS timeline.
* We need to shift the late-arriving stream's PTS forward to catch up.
*/
if (offsetSeconds > 0) {
// Shift video PTS forward by offsetSeconds to match audio
videoFilter = `setpts=PTS+${offsetSeconds}/TB,${videoFilter}`;
audioFilter = `asetpts=PTS,${audioFilter}`;
logger.debug(`${logPrefix} Configuring offset: Video PTS shifted forward by ${offsetSeconds.toFixed(3)}s`);
} else if (offsetSeconds < 0) {
// Shift audio PTS forward by abs(offsetSeconds) to match video
videoFilter = `setpts=PTS,${videoFilter}`;
audioFilter = `asetpts=PTS+${Math.abs(offsetSeconds)}/TB,${audioFilter}`;
logger.debug(
`${logPrefix} Configuring offset: Audio PTS shifted forward by ${Math.abs(offsetSeconds).toFixed(3)}s`,
);
} else {
// Reset both to start at 0
videoFilter = `setpts=PTS-STARTPTS,${videoFilter}`;
audioFilter = `asetpts=PTS-STARTPTS,${audioFilter}`;
logger.debug(`${logPrefix} Configuring offset: No offset (streams already synchronized)`);
}
return { videoFilter, audioFilter };
}
get started(): boolean {
return this._ffmpeg !== undefined;
}
public configureStream(offsetSeconds: number): void {
const { videoFilter, audioFilter } = this.configureOffset(offsetSeconds);
const processFlags = ['-filter_complex', `[0:v]${videoFilter}[vout];[1:a]${audioFilter}[aout]`];
const ffmpegCommand = [
...this.videoInputFlags,
...this.audioInputFlags,
...processFlags,
...this.mapFlags,
...this.videoEncodeFlags,
...this.audioEncodeFlags,
...this.syncFlags,
...this.whipOutputFlags,
];
this.spawnProcess(ffmpegCommand);
}
public startBlackFrameInjection() {
if (!this._videoStream || this._blackFrameInterval) return;
logger.debug(`${logPrefix} Starting black frame injection loop`);
this._videoStream.write(blackFramesLoop);
this._blackFrameInterval = setInterval(() => {
if (!this._videoStream) return;
this._videoStream.write(blackFramesLoop);
}, 2000); // 2-second interval to match the black frame file duration
}
public stopBlackFrameInjection() {
if (!this._blackFrameInterval) return;
logger.debug(`${logPrefix} Stopped black frame injection loop`);
clearInterval(this._blackFrameInterval);
this._blackFrameInterval = undefined;
}
public reset(): void {
this.stopBlackFrameInjection();
this._ffmpeg = undefined;
this._videoStream = undefined;
this._audioStream = undefined;
this._videoBuffer = [];
this._audioBuffer = [];
this._shouldInjectBlackFrames = true; // Reset flag for next initial start
}
private killProcess(): void {
if (!this._ffmpeg) return;
this._ffmpeg.kill('SIGINT');
this._ffmpeg = undefined;
this._videoStream = undefined;
this._audioStream = undefined;
}
public close(): void {
this.killProcess();
this.reset();
}
public processAudioPacket(packet: BufferedPacket): void {
if (this.started && this._audioStream) {
this._audioStream.write(packet.data);
} else {
this._audioBuffer.push(packet);
}
}
public processVideoPacket(packet: BufferedPacket): void {
if (this.started && this._videoStream) {
this._videoStream.write(packet.data);
} else {
this._videoBuffer.push(packet);
}
}
public clearVideoBuffer(): void {
logger.debug(`${logPrefix} Clearing video buffer (${this._videoBuffer.length} packets)`);
this._videoBuffer = [];
}
}
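For completeness, this is roughly how the handler is driven from the message loop shown earlier (a sketch; the token, offset, and packet buffers are placeholders):

const handler = new FfmpegHandler('<whip-bearer-token>');

// Packets received before configureStream() runs are buffered in memory
handler.processAudioPacket({ data: Buffer.alloc(640), timestamp: Date.now() });
handler.processVideoPacket({ data: Buffer.alloc(4096), timestamp: Date.now() });

// Once the A/V arrival offset is known, spawn FFmpeg; the 'spawn' handler
// flushes the buffers and starts black-frame injection
handler.configureStream(0.25);

// Later, on shutdown
handler.close();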