AI media pipeline with Inngest and Ittybit
Most AI media pipelines look simple on a whiteboard — extract audio, transcribe, generate subtitles, translate — but each step depends on the last, any step can fail, and you don’t want to re-extract an hour of audio because the translation API timed out. Inngest step functions make each stage independently retryable. Combined with Ittybit webhooks and step.waitForEvent, the pipeline sleeps between stages instead of polling, and picks up exactly where it left off after a failure.
Inngest client
// inngest/client.ts
import { Inngest } from 'inngest';
/** Shared Inngest client, constructed with the app id 'media-app'. */
export const inngest = new Inngest({ id: 'media-app' });
Webhook endpoint
Ittybit sends a job.succeeded or job.failed webhook when processing finishes. Forward these as Inngest events so step.waitForEvent can react to them.
// app/api/webhooks/ittybit/route.ts
import { inngest } from "@/inngest/client";
/**
 * Receives an Ittybit webhook and forwards it as an Inngest event named
 * `ittybit/<payload.type>`, with the whole payload as the event data.
 *
 * Responds 400 when the body is not a JSON object or carries no string
 * `type`, so a malformed request is never forwarded as `ittybit/undefined`.
 *
 * NOTE(review): the request is not authenticated here. If Ittybit signs its
 * webhooks, verify the signature before forwarding — TODO confirm.
 *
 * @param req - Incoming webhook request; its body is expected to be JSON.
 * @returns `200 ok` once the event has been sent, otherwise a 400 response.
 */
export async function POST(req: Request) {
  let payload: unknown;
  try {
    payload = await req.json();
  } catch {
    return new Response("invalid JSON", { status: 400 });
  }

  if (typeof payload !== "object" || payload === null) {
    return new Response("invalid payload", { status: 400 });
  }

  // The event name is derived from `type`, so it must be a usable string.
  const { type } = payload as { type?: unknown };
  if (typeof type !== "string" || type === "") {
    return new Response("missing event type", { status: 400 });
  }

  await inngest.send({
    name: `ittybit/${type}`,
    data: payload,
  });
  return new Response("ok");
}
// routes/webhooks.ts
import express from "express";
import { inngest } from "../inngest/client";
const router = express.Router();

/**
 * Express variant of the webhook endpoint: forwards an Ittybit webhook as an
 * Inngest event named `ittybit/<body.type>`, with the request body as data.
 *
 * Responds 400 when the body has no string `type`. `req.body` is only
 * populated when a JSON body parser (e.g. `express.json()`) runs before
 * this route.
 */
router.post("/webhooks/ittybit", async (req, res, next) => {
  const type: unknown = req.body?.type;
  if (typeof type !== "string" || type === "") {
    res.status(400).send("missing event type");
    return;
  }

  try {
    await inngest.send({
      name: `ittybit/${type}`,
      data: req.body,
    });
    res.sendStatus(200);
  } catch (err) {
    // Hand the failure to Express's error handling instead of leaving the
    // rejected promise unhandled and the request without a response.
    next(err);
  }
});

export default router;
Ittybit helper
A thin wrapper around the Tasks API. Every step that creates a task uses this, then waits for the webhook rather than polling.
// inngest/ittybit.ts
const ITTYBIT_API = 'https://api.ittybit.com';

/**
 * Creates an Ittybit task by POSTing `body` as JSON to `${ITTYBIT_API}/tasks`,
 * authenticated with the `ITTYBIT_API_KEY` environment variable.
 *
 * @param body - Task definition, sent verbatim as the JSON request body.
 * @returns The parsed JSON response, typed as the task's `id` and `status`.
 * @throws Error when `ITTYBIT_API_KEY` is not set, or when the API answers
 *   with a non-2xx status (the message carries the status and response body).
 */
export async function createTask(body: Record<string, unknown>) {
  const apiKey = process.env.ITTYBIT_API_KEY;
  if (!apiKey) {
    // Fail fast instead of sending the literal header "Bearer undefined".
    throw new Error('ITTYBIT_API_KEY is not set');
  }

  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${apiKey}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    // Include a bounded slice of the response body so the failure is diagnosable.
    const detail = (await res.text().catch(() => '')).slice(0, 500);
    throw new Error(
      `Ittybit API error: ${res.status}${detail ? ` - ${detail}` : ''}`,
    );
  }

  return res.json() as Promise<{ id: string; status: string }>;
}
The pipeline function
Four processing stages plus a final save, each its own retryable step. If the translation step fails after transcription succeeded, Inngest retries only the translation — the audio and transcript are safe.
// inngest/functions/ai-pipeline.ts
import { inngest } from '../client';
import { createTask } from '../ittybit';
/**
 * Unwraps the result of `step.waitForEvent`, which resolves to `null` when
 * the timeout elapses before a matching event arrives.
 *
 * @param received - The value `step.waitForEvent` resolved to.
 * @param waitId - Id of the wait step, used in the error message.
 * @returns The received event, guaranteed non-null.
 * @throws Error when the wait timed out. A plain `Error` keeps this snippet
 *   free of new imports; `NonRetriableError` from `inngest` would also skip
 *   the function-level retries, which cannot change a timed-out wait.
 */
function requireEvent<T>(received: T | null | undefined, waitId: string): T {
  if (received == null) {
    throw new Error(`Timed out waiting for Ittybit webhook "${waitId}"`);
  }
  return received;
}

/**
 * Media pipeline triggered by `video/uploaded`: extract audio, transcribe,
 * generate subtitles, translate them into several languages, then persist
 * the resulting URLs.
 *
 * Each task is created inside its own `step.run`, and the function then
 * suspends in `step.waitForEvent` until the matching `ittybit/job.succeeded`
 * event arrives. Only the success event is awaited, so a failed job is
 * observed here as a timeout of the corresponding wait.
 */
export const aiMediaPipeline = inngest.createFunction(
  { id: 'ai-media-pipeline', retries: 3 },
  { event: 'video/uploaded' },
  async ({ event, step }) => {
    const { videoId, sourceUrl } = event.data;

    // Step 1: Extract audio from the video
    const audio = await step.run('extract-audio', async () => {
      return createTask({
        input: sourceUrl,
        kind: 'audio',
        options: { format: 'mp3', quality: 'high' },
        metadata: { videoId },
      });
    });
    const audioResult = requireEvent(
      await step.waitForEvent('audio-ready', {
        event: 'ittybit/job.succeeded',
        if: `async.data.task_id == '${audio.id}'`,
        timeout: '30m',
      }),
      'audio-ready',
    );

    // Step 2: Transcribe the extracted audio
    const transcription = await step.run('transcribe', async () => {
      return createTask({
        input: audioResult.data.output_url,
        kind: 'transcription',
        metadata: { videoId },
      });
    });
    const transcriptResult = requireEvent(
      await step.waitForEvent('transcript-ready', {
        event: 'ittybit/job.succeeded',
        if: `async.data.task_id == '${transcription.id}'`,
        timeout: '30m',
      }),
      'transcript-ready',
    );

    // Step 3: Generate subtitles (SRT/VTT) from the transcript
    const subtitles = await step.run('generate-subtitles', async () => {
      return createTask({
        input: transcriptResult.data.output_url,
        kind: 'subtitles',
        options: { format: 'vtt' },
        metadata: { videoId },
      });
    });
    const subtitleResult = requireEvent(
      await step.waitForEvent('subtitles-ready', {
        event: 'ittybit/job.succeeded',
        if: `async.data.task_id == '${subtitles.id}'`,
        timeout: '15m',
      }),
      'subtitles-ready',
    );

    // Step 4: Translate subtitles into multiple languages in parallel
    const languages = ['es', 'fr', 'de', 'ja'];
    const translations = await Promise.all(
      languages.map((lang) =>
        step.run(`translate-${lang}`, async () => {
          return createTask({
            input: subtitleResult.data.output_url,
            kind: 'translation',
            options: { language: lang, format: 'vtt' },
            metadata: { videoId, language: lang },
          });
        }),
      ),
    );
    // Wait for every translation first, then reject if any of them timed out.
    const translationEvents = await Promise.all(
      translations.map((task, i) =>
        step.waitForEvent(`translation-${languages[i]}-ready`, {
          event: 'ittybit/job.succeeded',
          if: `async.data.task_id == '${task.id}'`,
          timeout: '15m',
        }),
      ),
    );
    const translationResults = translationEvents.map((received, i) =>
      requireEvent(received, `translation-${languages[i]}-ready`),
    );

    // Step 5: Persist results
    // NOTE(review): `db` is not imported in this snippet — presumably the
    // application's database client; import it in this file. TODO confirm.
    await step.run('save-results', async () => {
      await db.video.update({
        where: { id: videoId },
        data: {
          status: 'ready',
          audioUrl: audioResult.data.output_url,
          transcriptUrl: transcriptResult.data.output_url,
          subtitleUrl: subtitleResult.data.output_url,
          translations: translationResults.map((r, i) => ({
            language: languages[i],
            url: r.data.output_url,
          })),
        },
      });
    });

    return { videoId, languages, status: 'ready' };
  },
);
Each step.waitForEvent suspends the function without consuming compute. When the matching webhook arrives, Inngest wakes the function and continues from that exact point; if no matching event arrives before the timeout, step.waitForEvent resolves to null. If the function’s container is recycled in the meantime, Inngest replays the completed steps from its event log and resumes at the pending wait.
Trigger the pipeline
Fire the initial event from your upload handler. Inngest picks it up and starts the function.
// app/api/upload/route.ts
import { inngest } from "@/inngest/client";
/**
 * Upload handler: reads `videoId` and `sourceUrl` from the JSON request body,
 * emits a `video/uploaded` Inngest event carrying both, and answers with a
 * `processing` status.
 */
export async function POST(req: Request) {
  const body = await req.json();

  await inngest.send({
    name: "video/uploaded",
    data: { videoId: body.videoId, sourceUrl: body.sourceUrl },
  });

  return Response.json({ status: "processing" });
}
// routes/upload.ts
import { inngest } from "../inngest/client";
// Express variant of the upload handler: emits `video/uploaded` with the
// `videoId` and `sourceUrl` taken from the request body.
// NOTE(review): `app` is neither defined nor imported in this snippet —
// presumably the Express app created in server.ts. TODO confirm.
app.post("/upload", async (req, res) => {
  const body = req.body;

  await inngest.send({
    name: "video/uploaded",
    data: { videoId: body.videoId, sourceUrl: body.sourceUrl },
  });

  res.json({ status: "processing" });
});
Serve the functions
Register the pipeline with the Inngest serve handler.
// app/api/inngest/route.ts
import { serve } from "inngest/next";
import { inngest } from "@/inngest/client";
import { aiMediaPipeline } from "@/inngest/functions/ai-pipeline";
// Build the Inngest handler set for the pipeline once, then expose its
// HTTP verb handlers as the route's named exports.
const handlers = serve({
  functions: [aiMediaPipeline],
  client: inngest,
});

export const { GET, POST, PUT } = handlers;
// server.ts
import express from 'express';
import { serve } from 'inngest/express';
import { inngest } from './inngest/client';
import { aiMediaPipeline } from './inngest/functions/ai-pipeline';
const app = express();

// Inngest's Express handler needs the parsed JSON request body, so the JSON
// body parser has to be registered before the serve handler.
app.use(express.json());
app.use('/api/inngest', serve({ client: inngest, functions: [aiMediaPipeline] }));

app.listen(3000);
See also
- Event-driven video processing with Inngest — simpler pipeline with polling
- Build a user upload pipeline — multi-task processing for uploads
- Extract audio from video — audio extraction options