Fan-out adaptive streaming with Inngest and Ittybit

View Markdown

A single video upload often needs more than one output: an HLS package for streaming, a poster image, a GIF preview for hover states, and an audio transcription for search. These jobs are independent of each other, so running them sequentially wastes time. Inngest’s step.run lets you fan out to all four in parallel, with independent retries per branch. If the GIF fails, Inngest retries it without re-running the HLS package.

Send the upload event

When a video lands in your system, fire a video/uploaded event. This is the single trigger that kicks off the entire fan-out.

import { inngest } from './client';

await inngest.send({
  name: 'video/uploaded',
  data: {
    mediaId: 'med_abc123',
    sourceUrl: 'https://your-bucket.s3.amazonaws.com/uploads/raw.mov',
  },
});

Helper functions

Wrap the Ittybit Task API so each step stays focused on its intent.

const ITTYBIT_API = 'https://api.ittybit.com';

async function createTask(body: Record<string, unknown>) {
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(body),
  });
  if (!res.ok) throw new Error(`Ittybit API error: ${res.status}`);
  return res.json();
}

async function pollTask(taskId: string) {
  while (true) {
    const res = await fetch(`${ITTYBIT_API}/tasks/${taskId}`, {
      headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
    });
    const task = await res.json();
    if (task.status === 'completed') return task;
    if (task.status === 'failed') throw new Error(`Task ${taskId} failed`);
    await new Promise((r) => setTimeout(r, 5000));
  }
}

Define the fan-out function

All four branches run concurrently inside Promise.all. Each step.run is independently retried — if the transcription step throws, Inngest retries only that step. The HLS package, poster, and GIF results are already memoized.

import { inngest } from './client';

export const fanOutAdaptive = inngest.createFunction(
  { id: 'fanout-adaptive-streaming', retries: 3 },
  { event: 'video/uploaded' },
  async ({ event, step }) => {
    const { mediaId, sourceUrl } = event.data;

    // Fan out: all four tasks run in parallel
    const [hls, poster, gif, transcription] = await Promise.all([
      // Branch 1: HLS adaptive streaming package
      step.run('hls-package', async () => {
        const task = await createTask({
          input: sourceUrl,
          kind: 'adaptive_video',
          options: { format: 'hls' },
          metadata: { mediaId },
        });
        return pollTask(task.id);
      }),

      // Branch 2: Poster image at the 2-second mark
      step.run('poster-image', async () => {
        const task = await createTask({
          input: sourceUrl,
          kind: 'image',
          options: { start: 2, width: 1280, format: 'webp' },
          metadata: { mediaId },
        });
        return pollTask(task.id);
      }),

      // Branch 3: Short GIF preview for hover states
      step.run('gif-preview', async () => {
        const task = await createTask({
          input: sourceUrl,
          kind: 'video',
          options: { start: 1, end: 4, width: 480, format: 'gif' },
          metadata: { mediaId },
        });
        return pollTask(task.id);
      }),

      // Branch 4: Audio transcription for search
      step.run('audio-transcription', async () => {
        const task = await createTask({
          input: sourceUrl,
          kind: 'audio',
          options: { format: 'json', language: 'en' },
          metadata: { mediaId },
        });
        return pollTask(task.id);
      }),
    ]);

    // Final step: assemble all output URLs into a media record
    const record = await step.run('assemble', async () => {
      const media = {
        id: mediaId,
        status: 'ready',
        hlsUrl: hls.output.url,
        posterUrl: poster.output.url,
        gifUrl: gif.output.url,
        transcriptionUrl: transcription.output.url,
      };

      await db.media.update({
        where: { id: mediaId },
        data: media,
      });

      return media;
    });

    return record;
  },
);

The key property here is independence. Each branch creates its own Ittybit task and polls for completion. A slow HLS encode does not block the poster image from completing. A failed transcription does not invalidate the GIF. Inngest tracks each step’s result separately, so retries are surgical.

Serve the function

// app/api/inngest/route.ts
import { serve } from "inngest/next";
import { inngest } from "@/inngest/client";
import { fanOutAdaptive } from "@/inngest/functions";

export const { GET, POST, PUT } = serve({
  client: inngest,
  functions: [fanOutAdaptive],
});
// server.ts
import express from 'express';
import { serve } from 'inngest/express';
import { inngest } from './inngest/client';
import { fanOutAdaptive } from './inngest/functions';

const app = express();

app.use('/api/inngest', serve({ client: inngest, functions: [fanOutAdaptive] }));

app.listen(3000);

See also