Fan-out adaptive streaming with Inngest and Ittybit
A single video upload often needs more than one output: an HLS package for streaming, a poster image, a GIF preview for hover states, and an audio transcription for search. These jobs are independent of each other, so running them sequentially wastes time. Inngest’s step.run lets you fan out to all four in parallel, with independent retries per branch. If the GIF fails, Inngest retries it without re-running the HLS package.
Send the upload event
When a video lands in your system, fire a video/uploaded event. This is the single trigger that kicks off the entire fan-out.
import { inngest } from './client';
await inngest.send({
name: 'video/uploaded',
data: {
mediaId: 'med_abc123',
sourceUrl: 'https://your-bucket.s3.amazonaws.com/uploads/raw.mov',
},
});
Helper functions
Wrap the Ittybit Task API so each step stays focused on its intent.
const ITTYBIT_API = 'https://api.ittybit.com';
async function createTask(body: Record<string, unknown>) {
const res = await fetch(`${ITTYBIT_API}/tasks`, {
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!res.ok) throw new Error(`Ittybit API error: ${res.status}`);
return res.json();
}
async function pollTask(taskId: string) {
while (true) {
const res = await fetch(`${ITTYBIT_API}/tasks/${taskId}`, {
headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
});
const task = await res.json();
if (task.status === 'completed') return task;
if (task.status === 'failed') throw new Error(`Task ${taskId} failed`);
await new Promise((r) => setTimeout(r, 5000));
}
}
Define the fan-out function
All four branches run concurrently inside Promise.all. Each step.run is independently retried — if the transcription step throws, Inngest retries only that step. The HLS package, poster, and GIF results are already memoized.
import { inngest } from './client';
export const fanOutAdaptive = inngest.createFunction(
{ id: 'fanout-adaptive-streaming', retries: 3 },
{ event: 'video/uploaded' },
async ({ event, step }) => {
const { mediaId, sourceUrl } = event.data;
// Fan out: all four tasks run in parallel
const [hls, poster, gif, transcription] = await Promise.all([
// Branch 1: HLS adaptive streaming package
step.run('hls-package', async () => {
const task = await createTask({
input: sourceUrl,
kind: 'adaptive_video',
options: { format: 'hls' },
metadata: { mediaId },
});
return pollTask(task.id);
}),
// Branch 2: Poster image at the 2-second mark
step.run('poster-image', async () => {
const task = await createTask({
input: sourceUrl,
kind: 'image',
options: { start: 2, width: 1280, format: 'webp' },
metadata: { mediaId },
});
return pollTask(task.id);
}),
// Branch 3: Short GIF preview for hover states
step.run('gif-preview', async () => {
const task = await createTask({
input: sourceUrl,
kind: 'video',
options: { start: 1, end: 4, width: 480, format: 'gif' },
metadata: { mediaId },
});
return pollTask(task.id);
}),
// Branch 4: Audio transcription for search
step.run('audio-transcription', async () => {
const task = await createTask({
input: sourceUrl,
kind: 'audio',
options: { format: 'json', language: 'en' },
metadata: { mediaId },
});
return pollTask(task.id);
}),
]);
// Final step: assemble all output URLs into a media record
const record = await step.run('assemble', async () => {
const media = {
id: mediaId,
status: 'ready',
hlsUrl: hls.output.url,
posterUrl: poster.output.url,
gifUrl: gif.output.url,
transcriptionUrl: transcription.output.url,
};
await db.media.update({
where: { id: mediaId },
data: media,
});
return media;
});
return record;
},
);
The key property here is independence. Each branch creates its own Ittybit task and polls for completion. A slow HLS encode does not block the poster image from completing. A failed transcription does not invalidate the GIF. Inngest tracks each step’s result separately, so retries are surgical.
Serve the function
// app/api/inngest/route.ts
import { serve } from "inngest/next";
import { inngest } from "@/inngest/client";
import { fanOutAdaptive } from "@/inngest/functions";
export const { GET, POST, PUT } = serve({
client: inngest,
functions: [fanOutAdaptive],
});// server.ts
import express from 'express';
import { serve } from 'inngest/express';
import { inngest } from './inngest/client';
import { fanOutAdaptive } from './inngest/functions';
const app = express();
app.use('/api/inngest', serve({ client: inngest, functions: [fanOutAdaptive] }));
app.listen(3000); See also
- Event-driven video processing with Inngest — sequential transcoding pipeline
- HLS streaming — adaptive bitrate output with Ittybit
- Build a user upload pipeline — multi-task processing for uploads