Durable media pipeline with Temporal and Ittybit
Media pipelines break. Network calls time out, services restart, deployments happen mid-job. Temporal makes each step in your pipeline a retryable, observable activity so a transient failure in thumbnail generation doesn’t force you to re-ingest a 4 GB file. This guide wires up a four-stage Ittybit pipeline — ingest, thumbnails, audio extraction, transcoding — as a single Temporal workflow.
Project setup
npm init -y
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
Your project structure:
src/
activities.ts # Ittybit API calls
workflows.ts # Pipeline orchestration
worker.ts # Temporal worker
client.ts # Kick off a workflow
Activities
Each activity posts a task to Ittybit, then polls until it completes. They’re plain async functions — Temporal handles retry, timeout, and heartbeating.
// src/activities.ts
import { heartbeat } from '@temporalio/activity';
const ITTYBIT_API = 'https://api.ittybit.com';
interface TaskResult {
id: string;
status: 'queued' | 'processing' | 'succeeded' | 'failed';
output?: { url: string };
}
async function createAndAwaitTask(body: Record<string, unknown>): Promise<TaskResult> {
const res = await fetch(`${ITTYBIT_API}/tasks`, {
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!res.ok) {
throw new Error(`Ittybit returned ${res.status}: ${await res.text()}`);
}
const task: TaskResult = await res.json();
// Poll until terminal state
while (task.status === 'queued' || task.status === 'processing') {
await new Promise((r) => setTimeout(r, 3_000));
heartbeat(); // let Temporal know we're still alive
const poll = await fetch(`${ITTYBIT_API}/tasks/${task.id}`, {
headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
});
const updated: TaskResult = await poll.json();
task.status = updated.status;
task.output = updated.output;
}
if (task.status === 'failed') {
throw new Error(`Task ${task.id} failed`);
}
return task;
}
export async function ingest(sourceUrl: string): Promise<string> {
const task = await createAndAwaitTask({
input: sourceUrl,
kind: 'video',
options: { format: 'mp4', quality: 'high' },
});
return task.output!.url;
}
export async function generateThumbnails(videoUrl: string): Promise<string> {
const task = await createAndAwaitTask({
input: videoUrl,
kind: 'image',
options: { width: 640, format: 'webp', start: 2 },
});
return task.output!.url;
}
export async function extractAudio(videoUrl: string): Promise<string> {
const task = await createAndAwaitTask({
input: videoUrl,
kind: 'audio',
options: { format: 'mp3', quality: 'high' },
});
return task.output!.url;
}
export async function transcode(videoUrl: string): Promise<string> {
const task = await createAndAwaitTask({
input: videoUrl,
kind: 'video',
options: { width: 1280, format: 'mp4', codec: 'h264', quality: 'medium' },
});
return task.output!.url;
}
Workflow
The workflow function chains the activities sequentially. Each step gets its own retry policy — a thumbnail failure retries three times before the workflow gives up, without re-running ingestion.
// src/workflows.ts
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';
const { ingest, generateThumbnails, extractAudio, transcode } = proxyActivities<typeof activities>({
startToCloseTimeout: '10 minutes',
heartbeatTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
initialInterval: '5s',
backoffCoefficient: 2,
},
});
export interface PipelineResult {
ingestedUrl: string;
thumbnailUrl: string;
audioUrl: string;
transcodedUrl: string;
}
export async function mediaPipeline(sourceUrl: string): Promise<PipelineResult> {
// Stage 1: ingest the source file
const ingestedUrl = await ingest(sourceUrl);
// Stages 2 & 3 run in parallel -- they both read from the ingested file
const [thumbnailUrl, audioUrl] = await Promise.all([
generateThumbnails(ingestedUrl),
extractAudio(ingestedUrl),
]);
// Stage 4: transcode to web-ready format
const transcodedUrl = await transcode(ingestedUrl);
return { ingestedUrl, thumbnailUrl, audioUrl, transcodedUrl };
}
Thumbnails and audio extraction are independent of each other, so they run in parallel. The transcode step waits for ingestion but could also run in parallel with stages 2 and 3 if you prefer throughput over simplicity.
Worker
// src/worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'media-pipeline',
});
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
Start a workflow
// src/client.ts
import { Client } from '@temporalio/client';
import { mediaPipeline } from './workflows';
async function run() {
const client = new Client();
const result = await client.workflow.execute(mediaPipeline, {
taskQueue: 'media-pipeline',
workflowId: `media-${Date.now()}`,
args: ['https://example.com/raw/interview.mov'],
});
console.log('Pipeline complete:', result);
}
run().catch(console.error);
Start the worker in one terminal, run the client in another. Each activity appears as a separate span in the Temporal UI — you can see exactly which stage is running, which retried, and what each one returned.
Customizing retry per stage
If some stages are flakier than others, override the activity options inline:
// src/workflows.ts
import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';
const defaultOpts = {
startToCloseTimeout: '10 minutes',
heartbeatTimeout: '30 seconds',
retry: { maximumAttempts: 3, initialInterval: '5s', backoffCoefficient: 2 },
} as const;
const { ingest, transcode } = proxyActivities<typeof activities>(defaultOpts);
// Thumbnails are cheap -- retry more aggressively
const { generateThumbnails } = proxyActivities<typeof activities>({
...defaultOpts,
startToCloseTimeout: '2 minutes',
retry: { maximumAttempts: 5, initialInterval: '2s', backoffCoefficient: 2 },
});
// Audio extraction can take longer for big files
const { extractAudio } = proxyActivities<typeof activities>({
...defaultOpts,
startToCloseTimeout: '15 minutes',
});