# AI media pipeline with Inngest and Ittybit

Chain video processing, transcription, and translation in a durable Inngest step function

Most AI media pipelines look simple on a whiteboard -- extract audio, transcribe, generate subtitles, translate -- but each step depends on the last, any step can fail, and you don't want to re-extract an hour of audio because the translation API timed out. Inngest step functions make each stage independently retryable. Combined with Ittybit webhooks and `step.waitForEvent`, the pipeline sleeps between stages instead of polling, and picks up exactly where it left off after a failure.

## Inngest client

```typescript
// inngest/client.ts
import { Inngest } from 'inngest';

export const inngest = new Inngest({ id: 'media-app' });
```

## Webhook endpoint

Ittybit sends a `job.succeeded` or `job.failed` webhook when processing finishes. Forward these as Inngest events so `step.waitForEvent` can react to them.

<CodeGroup labels={["Next.js", "Express"]}>
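```typescript
// app/api/webhooks/ittybit/route.ts
import { inngest } from "../../../../inngest/client"; // adjust to your project layout

export async function POST(req: Request) {
  const payload = await req.json();

  // Re-emit the webhook as `ittybit/job.succeeded` or `ittybit/job.failed`
  await inngest.send({
    name: `ittybit/${payload.type}`,
    data: payload,
  });

  return new Response("ok");
}
```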

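```typescript
// routes/webhooks.ts
import express from "express";
import { inngest } from "../inngest/client";

const router = express.Router();

// Assumes express.json() is registered before this router so req.body is parsed
router.post("/webhooks/ittybit", async (req, res) => {
  await inngest.send({
    name: `ittybit/${req.body.type}`,
    data: req.body,
  });

  res.sendStatus(200);
});

export default router;
```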

</CodeGroup>
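
The handlers above forward whatever body arrives. A minimal sketch of checking the payload first might look like the following. It assumes the webhook body carries a `type` of `job.succeeded` or `job.failed` and the `task_id` field that the pipeline later matches on; `IttybitWebhook` and `toInngestEvent` are hypothetical names for illustration, and the real payload may contain more fields.

```typescript
// Hypothetical shape: only `type` and `task_id` are assumed here, based on the
// fields this guide reads. Check the real payload before relying on it.
type IttybitWebhook = {
  type: 'job.succeeded' | 'job.failed';
  task_id: string;
  [key: string]: unknown;
};

type ForwardedEvent = { name: string; data: IttybitWebhook };

// Returns an event ready for inngest.send(), or null when the body is not a
// job webhook this guide knows how to handle.
function toInngestEvent(payload: unknown): ForwardedEvent | null {
  if (typeof payload !== 'object' || payload === null) return null;

  const { type, task_id } = payload as Record<string, unknown>;
  if (type !== 'job.succeeded' && type !== 'job.failed') return null;
  if (typeof task_id !== 'string' || task_id.length === 0) return null;

  return { name: `ittybit/${type}`, data: payload as IttybitWebhook };
}
```

In either handler, a `null` result could be answered with a 400 response, and any other result passed straight to `inngest.send`.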

## Ittybit helper

A thin wrapper around the Tasks API. Every step that creates a task uses this, then waits for the webhook rather than polling.

```typescript
// inngest/ittybit.ts
const ITTYBIT_API = 'https://api.ittybit.com';

export async function createTask(body: Record<string, unknown>) {
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    throw new Error(`Ittybit API error: ${res.status}`);
  }

  return res.json() as Promise<{ id: string; status: string }>;
}
```

## The pipeline function

Four processing stages followed by a save step, each built from independently retryable steps. If a translation step fails after transcription succeeded, Inngest retries only that translation -- the audio and transcript are safe.

```typescript
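// inngest/functions/ai-pipeline.ts
import { inngest } from '../client';
import { createTask } from '../ittybit';
import { db } from '../../lib/db'; // assumption: your own database client (e.g. Prisma); adjust the path
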
export const aiMediaPipeline = inngest.createFunction(
  { id: 'ai-media-pipeline', retries: 3 },
  { event: 'video/uploaded' },
  async ({ event, step }) => {
    const { videoId, sourceUrl } = event.data;

    // Step 1: Extract audio from the video
    const audio = await step.run('extract-audio', async () => {
      return createTask({
        input: sourceUrl,
        kind: 'audio',
        options: { format: 'mp3', quality: 'high' },
        metadata: { videoId },
      });
    });

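    const audioResult = await step.waitForEvent('audio-ready', {
      event: 'ittybit/job.succeeded',
      if: `async.data.task_id == '${audio.id}'`,
      timeout: '30m',
    });
    // waitForEvent resolves to null when the timeout passes without a match
    if (!audioResult) throw new Error(`Audio extraction timed out for video ${videoId}`);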

    // Step 2: Transcribe the extracted audio
    const transcription = await step.run('transcribe', async () => {
      return createTask({
        input: audioResult.data.output_url,
        kind: 'transcription',
        metadata: { videoId },
      });
    });

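    const transcriptResult = await step.waitForEvent('transcript-ready', {
      event: 'ittybit/job.succeeded',
      if: `async.data.task_id == '${transcription.id}'`,
      timeout: '30m',
    });
    // null here means no matching webhook arrived within the timeout
    if (!transcriptResult) throw new Error(`Transcription timed out for video ${videoId}`);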

    // Step 3: Generate subtitles (SRT/VTT) from the transcript
    const subtitles = await step.run('generate-subtitles', async () => {
      return createTask({
        input: transcriptResult.data.output_url,
        kind: 'subtitles',
        options: { format: 'vtt' },
        metadata: { videoId },
      });
    });

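    const subtitleResult = await step.waitForEvent('subtitles-ready', {
      event: 'ittybit/job.succeeded',
      if: `async.data.task_id == '${subtitles.id}'`,
      timeout: '15m',
    });
    // null here means no matching webhook arrived within the timeout
    if (!subtitleResult) throw new Error(`Subtitle generation timed out for video ${videoId}`);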

    // Step 4: Translate subtitles into multiple languages in parallel
    const languages = ['es', 'fr', 'de', 'ja'];

    const translations = await Promise.all(
      languages.map((lang) =>
        step.run(`translate-${lang}`, async () => {
          return createTask({
            input: subtitleResult.data.output_url,
            kind: 'translation',
            options: { language: lang, format: 'vtt' },
            metadata: { videoId, language: lang },
          });
        }),
      ),
    );

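    const translationResults = await Promise.all(
      translations.map((task, i) =>
        step.waitForEvent(`translation-${languages[i]}-ready`, {
          event: 'ittybit/job.succeeded',
          if: `async.data.task_id == '${task.id}'`,
          timeout: '15m',
        }),
      ),
    );
    // Any null entry means that language's webhook did not arrive in time
    if (translationResults.some((r) => r === null)) {
      throw new Error(`A translation timed out for video ${videoId}`);
    }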

    // Step 5: Persist results
    await step.run('save-results', async () => {
      await db.video.update({
        where: { id: videoId },
        data: {
          status: 'ready',
          audioUrl: audioResult.data.output_url,
          transcriptUrl: transcriptResult.data.output_url,
          subtitleUrl: subtitleResult.data.output_url,
          translations: translationResults.map((r, i) => ({
            language: languages[i],
            url: r.data.output_url,
          })),
        },
      });
    });

    return { videoId, languages, status: 'ready' };
  },
);
```
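
The waits in the function above all share one shape, and each can resolve to `null` if its timeout passes before a matching event arrives. One way to cut the repetition is a pair of small helpers, sketched below. `taskWait` and `requireEvent` are hypothetical names, not part of Inngest or Ittybit, and the sketch assumes the options object is accepted by `step.waitForEvent` as written in this guide.

```typescript
type WaitOptions = { event: string; if: string; timeout: string };

// Builds the options object passed to step.waitForEvent for one task.
function taskWait(taskId: string, timeout: string): WaitOptions {
  return {
    event: 'ittybit/job.succeeded',
    if: `async.data.task_id == '${taskId}'`,
    timeout,
  };
}

// Turns a wait result that may be null (timeout) into a definite event,
// throwing with a readable label otherwise.
function requireEvent<T>(result: T | null, label: string): T {
  if (result === null) {
    throw new Error(`Timed out waiting for ${label}`);
  }
  return result;
}
```

Inside the pipeline this would read roughly as `requireEvent(await step.waitForEvent('audio-ready', taskWait(audio.id, '30m')), 'audio extraction')`.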

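Each `step.waitForEvent` suspends the function without consuming compute. When the matching webhook arrives, Inngest resumes the function from that point. If the function's container is recycled in the meantime, Inngest re-invokes the function, returns the recorded results of completed steps instead of running them again, and resumes at the pending wait. Two cases need handling in production: a wait resolves to `null` if its timeout passes, and a failed task arrives as `ittybit/job.failed`, which the waits above do not match, so the function only notices the failure when the wait times out.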
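
To make the resume behaviour concrete, here is a toy model of step memoization. It is not Inngest's implementation: `makeStepRunner` and the in-memory `Map` are stand-ins for illustration. It shows only that a step whose result is already recorded returns that result without running its callback again.

```typescript
type Memo = Map<string, unknown>;

// A stand-in for `step`: results are recorded by step id.
function makeStepRunner(memo: Memo) {
  return {
    async run<T>(id: string, fn: () => Promise<T>): Promise<T> {
      if (memo.has(id)) return memo.get(id) as T; // completed earlier: skip the callback
      const result = await fn();
      memo.set(id, result);
      return result;
    },
  };
}

// Executes the same function body twice against one memo and reports how
// many times the expensive callback actually ran.
async function demo(): Promise<number> {
  const memo: Memo = new Map();
  let extractions = 0;

  const pipeline = async () => {
    const step = makeStepRunner(memo);
    const audio = await step.run('extract-audio', async () => {
      extractions += 1;
      return { id: 'task_audio' };
    });
    return audio.id;
  };

  await pipeline(); // first execution runs the callback
  await pipeline(); // re-execution reuses the recorded result
  return extractions;
}
```

This is why the body of a step function has to be safe to execute more than once: anything outside a step runs on every invocation, while work inside a completed step does not.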

## Trigger the pipeline

Fire the initial event from your upload handler. Inngest picks it up and starts the function.

<CodeGroup labels={["Next.js", "Express"]}>
```typescript
// app/api/upload/route.ts
import { inngest } from "../../../inngest/client"; // adjust to your project layout

export async function POST(req: Request) {
  const { videoId, sourceUrl } = await req.json();

  await inngest.send({
    name: "video/uploaded",
    data: { videoId, sourceUrl },
  });

  return Response.json({ status: "processing" });
}
```

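```typescript
// routes/upload.ts
import express from "express";
import { inngest } from "../inngest/client";

const router = express.Router();

// Assumes express.json() is registered before this router so req.body is parsed
router.post("/upload", async (req, res) => {
  const { videoId, sourceUrl } = req.body;

  await inngest.send({
    name: "video/uploaded",
    data: { videoId, sourceUrl },
  });

  res.json({ status: "processing" });
});

export default router;
```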

</CodeGroup>

## Serve the functions

Register the pipeline with the Inngest serve handler.

<CodeGroup labels={["Next.js", "Express"]}>
```typescript
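// app/api/inngest/route.ts
import { serve } from 'inngest/next';
import { inngest } from '../../../inngest/client';
import { aiMediaPipeline } from '../../../inngest/functions/ai-pipeline';
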
export const { GET, POST, PUT } = serve({
  client: inngest,
  functions: [aiMediaPipeline],
});
```

```typescript
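// server.ts
import express from 'express';
import { serve } from 'inngest/express';
import { inngest } from './inngest/client';
import { aiMediaPipeline } from './inngest/functions/ai-pipeline';

const app = express();

// The Inngest handler needs a parsed JSON body
app.use(express.json());
app.use('/api/inngest', serve({ client: inngest, functions: [aiMediaPipeline] }));

app.listen(3000);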
```

</CodeGroup>

## See also

- [Event-driven video processing with Inngest](/guides/event-driven-video-processing-with-inngest) -- simpler pipeline with polling
- [Build a user upload pipeline](/guides/build-a-user-upload-pipeline) -- multi-task processing for uploads
- [Extract audio from video](/guides/extract-audio-from-video) -- audio extraction options