Batch fan-out processing with Temporal and Ittybit
You have 500 files to transcode and you want them processed in parallel, but not all at once. Unbounded concurrency hammers the API, blows through rate limits, and makes failures hard to track. Temporal's workflow model lets you cap concurrent activities with a simple semaphore pattern. It also retries each file automatically, and continueAsNew keeps the event history from outgrowing Temporal's limits on large batches. This guide builds a fan-out/fan-in workflow that processes files N at a time and writes a summary manifest when the batch completes.
Project setup
npm init -y
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
Project structure:
src/
  activities.ts   # Ittybit API calls
  workflows.ts    # Fan-out/fan-in orchestration
  worker.ts       # Temporal worker
  client.ts       # Kick off a batch
Set your environment variables:
ITTYBIT_API_KEY=your_ittybit_api_key
TEMPORAL_ADDRESS=localhost:7233
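A missing key would otherwise only show up later, as a rejected `Bearer undefined` request to Ittybit. To fail fast instead, you could read and validate both variables once at startup. The sketch below is minimal. `loadConfig`, `AppConfig`, and the `localhost:7233` fallback are assumptions of this example and are not part of the guide's files.

```typescript
// Hypothetical src/config.ts: read and validate the environment once at startup.
interface AppConfig {
  ittybitApiKey: string;
  temporalAddress: string;
}

function loadConfig(env: Record<string, string | undefined>): AppConfig {
  const ittybitApiKey = env.ITTYBIT_API_KEY?.trim();
  if (!ittybitApiKey) {
    // Without this check, requests would carry "Bearer undefined" and be rejected by the API.
    throw new Error('ITTYBIT_API_KEY is not set');
  }
  return {
    ittybitApiKey,
    // Assumed fallback: the default address of a local Temporal dev server.
    temporalAddress: env.TEMPORAL_ADDRESS?.trim() || 'localhost:7233',
  };
}

// Usage: const config = loadConfig(process.env);
```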
Activities
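There are two activities. One creates an Ittybit task and polls until it reaches a terminal state; the other writes the final manifest. Both are plain async functions. Temporal retries them when they throw, and the polling loop calls heartbeat() so Temporal can tell a long-running transcode from a stalled worker. A retry reruns processFile from the start, so it creates a new Ittybit task rather than resuming the previous one.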
// src/activities.ts
import { heartbeat } from '@temporalio/activity';
const ITTYBIT_API = 'https://api.ittybit.com';
export interface FileItem {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}
export interface FileResult {
  input: string;
  taskId: string;
  status: 'succeeded' | 'failed';
  outputUrl?: string;
  error?: string;
}
export interface BatchManifest {
  total: number;
  succeeded: number;
  failed: number;
  results: FileResult[];
}
export async function processFile(file: FileItem): Promise<FileResult> {
  // Create the task
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      input: file.url,
      kind: file.kind,
      options: file.options,
    }),
  });
  if (!res.ok) {
    // Thrown errors are retried by Temporal according to the workflow's retry policy
    throw new Error(`Ittybit returned ${res.status}: ${await res.text()}`);
  }
  const task = (await res.json()) as {
    id: string;
    status: string;
    output?: { url: string };
  };
  // Poll until terminal state
  let status = task.status;
  let output = task.output;
  while (status === 'queued' || status === 'processing') {
    await new Promise((r) => setTimeout(r, 3_000));
    // Report liveness; this must happen more often than the workflow's heartbeatTimeout
    heartbeat();
    const poll = await fetch(`${ITTYBIT_API}/tasks/${task.id}`, {
      headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
    });
    if (!poll.ok) {
      // Without this check, an error body would likely have no status field, the loop
      // would exit, and the file would be reported as succeeded
      throw new Error(`Ittybit returned ${poll.status} while polling task ${task.id}`);
    }
    const updated = (await poll.json()) as {
      status: string;
      output?: { url: string };
    };
    status = updated.status;
    output = updated.output;
  }
  if (status === 'failed') {
    return {
      input: file.url,
      taskId: task.id,
      status: 'failed',
      error: `Task ${task.id} failed`,
    };
  }
  return {
    input: file.url,
    taskId: task.id,
    status: 'succeeded',
    outputUrl: output?.url,
  };
}
export async function writeManifest(manifest: BatchManifest): Promise<string> {
  const key = `batch-manifest-${Date.now()}.json`;
  // Write to your storage layer (S3, GCS, local disk, etc.)
  // This example logs it -- replace with your actual storage call
  console.log(`Writing manifest: ${manifest.succeeded}/${manifest.total} succeeded`);
  console.log(JSON.stringify(manifest, null, 2));
  return key;
}
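The polling loop waits a fixed 3 seconds between status checks. A 10-minute transcode therefore costs about 200 GET requests per file. If that is more traffic than you want, one option is a capped exponential delay. `pollDelayMs` and its constants are illustrative assumptions, not Ittybit guidance. The cap is kept well under the workflow's 30-second `heartbeatTimeout`, because the activity heartbeats only once per loop iteration.

```typescript
// Capped exponential backoff for the Ittybit status poll (illustrative values).
const BASE_DELAY_MS = 3_000;
// Stay well under the 30s heartbeatTimeout: the loop heartbeats once per iteration,
// so sleep time plus request time must fit inside that window.
const MAX_DELAY_MS = 15_000;

function pollDelayMs(attempt: number): number {
  // attempt 0 -> 3s, 1 -> 6s, 2 -> 12s, then capped at 15s
  return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

// Inside processFile's loop (sketch):
//   let attempt = 0;
//   while (status === 'queued' || status === 'processing') {
//     await new Promise((r) => setTimeout(r, pollDelayMs(attempt++)));
//     heartbeat();
//     ...
//   }
```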
The activity returns a result object instead of throwing when Ittybit reports the task as failed. That way one bad file shows up as a failed entry in the results instead of aborting the whole batch. HTTP errors still throw, and that is what triggers Temporal's retries.
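Temporal retries every thrown error up to `maximumAttempts`. That includes errors that can never succeed, such as a 401 from a bad API key. The sketch below shows one way to separate transient from permanent HTTP failures. Which statuses count as retryable is an assumption here, so check it against Ittybit's documented error responses. For the permanent cases, the activity could throw `ApplicationFailure.nonRetryable(...)` from the Temporal TypeScript SDK. `isRetryableStatus` is a hypothetical helper.

```typescript
// Decide whether a failed Ittybit HTTP call is worth retrying.
// Assumption: 408, 429 and 5xx are transient; every other 4xx is permanent.
function isRetryableStatus(status: number): boolean {
  if (status === 408 || status === 429) return true; // request timeout / rate limited
  return status >= 500 && status <= 599; // server-side errors
}

// Sketch of how processFile could use it:
//   if (!res.ok) {
//     const message = `Ittybit returned ${res.status}: ${await res.text()}`;
//     if (!isRetryableStatus(res.status)) {
//       throw ApplicationFailure.nonRetryable(message);
//     }
//     throw new Error(message);
//   }
```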
Workflow
The core of the pattern is a semaphore that limits how many activities run at the same time. The workflow fans out across all files, waits for every result, then writes a manifest. For large batches it processes 200 files per run, then calls continueAsNew to start a fresh run with an empty event history. The remaining files and the results so far carry forward into that run.
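With `BATCH_SIZE = 200`, the workflow below handles 200 files per run and hands the rest to the next run. `planRuns` is a hypothetical helper, not part of the workflow. It mirrors that rule so you can see how a batch splits into runs.

```typescript
// How many files each workflow run handles when chunking by batchSize.
// Mirrors the workflow's rule: while more than batchSize files remain,
// process batchSize of them and continue-as-new with the rest.
function planRuns(totalFiles: number, batchSize: number): number[] {
  const runs: number[] = [];
  let remaining = totalFiles;
  while (remaining > batchSize) {
    runs.push(batchSize);
    remaining -= batchSize;
  }
  runs.push(remaining); // final (or only) run
  return runs;
}

console.log(planRuns(500, 200)); // [ 200, 200, 100 ]
```

The 500-file example in this guide therefore runs as three executions under one workflow ID.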
// src/workflows.ts
import { proxyActivities, continueAsNew, isCancellation, ActivityFailure } from '@temporalio/workflow';
import type * as activities from './activities';
import type { FileItem, FileResult, BatchManifest } from './activities';
const { processFile } = proxyActivities<typeof activities>({
  startToCloseTimeout: '15 minutes',
  heartbeatTimeout: '30 seconds',
  retry: {
    maximumAttempts: 3,
    initialInterval: '5s',
    backoffCoefficient: 2,
  },
});
const { writeManifest } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 minutes',
  retry: { maximumAttempts: 3 },
});
const BATCH_SIZE = 200; // Files per continueAsNew cycle
export interface BatchInput {
  files: FileItem[];
  concurrency: number;
  previousResults?: FileResult[];
}
export async function batchFanOut(input: BatchInput): Promise<string> {
  const { files, concurrency, previousResults = [] } = input;
  // If the batch is large, process a chunk and continue-as-new
  if (files.length > BATCH_SIZE) {
    const chunk = files.slice(0, BATCH_SIZE);
    const remaining = files.slice(BATCH_SIZE);
    const chunkResults = await processWithSemaphore(chunk, concurrency);
    // continueAsNew never returns normally: it ends this run and starts a new one
    // with a fresh event history, so no placeholder return value is needed
    return continueAsNew<typeof batchFanOut>({
      files: remaining,
      concurrency,
      previousResults: [...previousResults, ...chunkResults],
    });
  }
  // Process the final (or only) chunk
  const results = await processWithSemaphore(files, concurrency);
  const allResults = [...previousResults, ...results];
  // Build and write the manifest
  const manifest: BatchManifest = {
    total: allResults.length,
    succeeded: allResults.filter((r) => r.status === 'succeeded').length,
    failed: allResults.filter((r) => r.status === 'failed').length,
    results: allResults,
  };
  return await writeManifest(manifest);
}
async function processWithSemaphore(files: FileItem[], concurrency: number): Promise<FileResult[]> {
  // Pre-sized so results keep input order, whatever order the files finish in
  const results: FileResult[] = new Array(files.length);
  let cursor = 0;
  async function next(): Promise<void> {
    while (cursor < files.length) {
      // Claimed synchronously; workflow code is single-threaded, so no two loops share an index
      const index = cursor++;
      try {
        results[index] = await processFile(files[index]);
      } catch (err) {
        // Let workflow cancellation propagate instead of recording it as a file failure
        if (isCancellation(err)) throw err;
        // Retries exhausted (HTTP errors, heartbeat timeouts): record the failure so
        // one bad file does not reject Promise.all and abort the whole batch
        const detail = err instanceof ActivityFailure && err.cause ? err.cause : err;
        results[index] = {
          input: files[index].url,
          taskId: '', // unknown here: the Ittybit task may never have been created
          status: 'failed',
          error: detail instanceof Error ? detail.message : String(detail),
        };
      }
    }
  }
  // Launch N loops that pull from the shared cursor
  const workers = Array.from({ length: Math.min(concurrency, files.length) }, () => next());
  await Promise.all(workers);
  return results;
}
The processWithSemaphore function is the concurrency control. It starts N async loops (plain promises inside the workflow, not Temporal workers) that each pull the next file from a shared cursor. At any moment at most N processFile activities are in flight. When one finishes, its loop immediately picks up the next file, so no slot sits idle.
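The same cursor-based pool works outside Temporal, which makes it easy to check the concurrency bound on its own. In this sketch, `mapWithConcurrency` and the simulated `fakeTask` are hypothetical. The pool writes each result at its input index, so output order matches input order even though tasks finish out of order.

```typescript
// Cursor-based pool: at most `limit` calls to `fn` are in flight at once.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let cursor = 0;

  async function worker(): Promise<void> {
    while (cursor < items.length) {
      const index = cursor++; // claimed synchronously, so no two workers share an index
      results[index] = await fn(items[index]);
    }
  }

  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers);
  return results;
}

// Simulated task that records how many calls are running at the same time.
let inFlight = 0;
let peak = 0;
async function fakeTask(n: number): Promise<number> {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await new Promise<void>((resolve) => setTimeout(resolve, (n % 3) * 10)); // uneven durations
  inFlight--;
  return n * 2;
}

const demo = mapWithConcurrency([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3, fakeTask).then((out) => {
  console.log(out, 'peak in flight:', peak);
  return { out, peak };
});
```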
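continueAsNew is critical for large batches. Temporal records every event in the workflow history. By default it starts logging warnings at 10,240 events and terminates a workflow whose history exceeds 51,200 events or 50 MB. Every file adds several events: activity scheduled, started, and completed, plus the workflow tasks that react to them. Processing 200 files per run and then continuing as a new execution keeps each run far below those limits. Results accumulate across runs via the previousResults parameter. That array travels as the new run's input along with the remaining file list, and Temporal rejects payloads over 2 MB by default. For very large batches, store results externally and pass a reference instead.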
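To sanity-check a different `BATCH_SIZE`, you can estimate events per run. This is a rough worst-case sketch, and its constants are assumptions. It counts three events per activity (scheduled, started, completed), plus three workflow-task events per activity completion in the worst case where every completion wakes the workflow separately. It also adds a small fixed overhead for the start event, the first workflow task, and the manifest activity.

```typescript
// Rough, worst-case history size for one run of batchFanOut (approximate constants).
const EVENTS_PER_FILE = 6; // 3 activity events + up to 3 workflow-task events
const FIXED_OVERHEAD = 20; // start event, first workflow task, manifest activity, etc.
const WARN_AT = 10_240; // Temporal's default history-length warning threshold

function estimateEvents(filesPerRun: number): number {
  return filesPerRun * EVENTS_PER_FILE + FIXED_OVERHEAD;
}

// Largest chunk whose estimated history stays within the given event budget.
function largestSafeChunk(budget: number = WARN_AT): number {
  return Math.floor((budget - FIXED_OVERHEAD) / EVENTS_PER_FILE);
}

console.log(estimateEvents(200)); // 1220
console.log(largestSafeChunk()); // 1703
```

By this estimate, 200 files per run uses roughly an eighth of the warning threshold. That leaves headroom for heavier workflows without tuning.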
Worker
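// src/worker.ts
import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS, falling back to a local dev server
  const connection = await NativeConnection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });
  try {
    const worker = await Worker.create({
      connection,
      workflowsPath: require.resolve('./workflows'),
      activities,
      taskQueue: 'batch-processing',
      // At most 50 activities (across all workflows) run in this process at once
      maxConcurrentActivityTaskExecutions: 50,
    });
    await worker.run();
  } finally {
    await connection.close();
  }
}
run().catch((err) => {
  console.error(err);
  process.exit(1);
});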
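maxConcurrentActivityTaskExecutions caps how many activities a single worker process executes at once, across every workflow it serves. The workflow's concurrency parameter caps how many files one batch has in flight. Because processFile holds its slot until the Ittybit task finishes, a worker with 50 slots runs at most 50 transcodes at a time, for example five batches at concurrency 10. Running more workers on the same task queue raises that ceiling, and each batch still respects its own concurrency parameter.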
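The two limits combine as a simple minimum. The sketch below assumes every worker polls the same task queue with the same slot count. `effectiveConcurrency` is a hypothetical helper. When the cap binds, the excess activities wait in the task queue until a slot frees up.

```typescript
// Upper bound on Ittybit tasks in flight across all running batches.
// Each batch contributes at most its own `concurrency`; the worker fleet can
// run at most workers * slotsPerWorker activities at once.
function effectiveConcurrency(
  batchConcurrencies: number[],
  workers: number,
  slotsPerWorker: number,
): number {
  const requested = batchConcurrencies.reduce((sum, c) => sum + c, 0);
  return Math.min(requested, workers * slotsPerWorker);
}

// Six batches at concurrency 10 on one worker with 50 slots: capped at 50.
console.log(effectiveConcurrency([10, 10, 10, 10, 10, 10], 1, 50)); // 50
// The same six batches on two workers: all 60 can run.
console.log(effectiveConcurrency([10, 10, 10, 10, 10, 10], 2, 50)); // 60
```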
Start a batch
// src/client.ts
import { Client, Connection } from '@temporalio/client';
import { batchFanOut } from './workflows';
async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS, falling back to a local dev server
  const connection = await Connection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });
  const client = new Client({ connection });
  // Build your file list -- from a database, S3 listing, CSV, etc.
  const files = Array.from({ length: 500 }, (_, i) => ({
    url: `https://storage.example.com/videos/file-${i.toString().padStart(4, '0')}.mov`,
    kind: 'video',
    options: { width: 1920, format: 'mp4', codec: 'h264', quality: 'high' },
  }));
  const handle = await client.workflow.start(batchFanOut, {
    taskQueue: 'batch-processing',
    workflowId: `batch-${Date.now()}`,
    args: [{ files, concurrency: 10 }],
    // Allow plenty of time for large batches; this timeout spans every continueAsNew run
    workflowExecutionTimeout: '24 hours',
  });
  console.log(`Started batch: ${handle.workflowId}`);
  // Optionally wait for the result (follows continueAsNew runs to the final one)
  const manifestKey = await handle.result();
  console.log(`Batch complete. Manifest: ${manifestKey}`);
}
run().catch((err) => {
  console.error(err);
  process.exit(1);
});
Start the worker in one terminal and run the client in another. The Temporal UI lists each activity in the workflow's event history, so you can see which files are in flight, which retried, and which failed. Each continueAsNew cycle appears as a separate run under the same workflow ID.
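`batch-${Date.now()}` gives every client invocation a new workflow ID. Running the script twice by accident therefore starts the same transcodes twice. One option is to derive the ID from the batch contents. Temporal will not start a second execution with the ID of one that is still running, and `client.workflow.start` then throws `WorkflowExecutionAlreadyStartedError`. The sketch below uses Node's `crypto` module. `batchWorkflowId` is hypothetical, and it assumes each file's `options` object is always built with the same key order.

```typescript
import { createHash } from 'node:crypto';

interface BatchFile {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}

// Deterministic workflow ID: the same set of files (in any order) yields the same ID.
// Assumes options objects are built with a consistent key order, since
// JSON.stringify preserves insertion order.
function batchWorkflowId(files: BatchFile[], prefix = 'batch'): string {
  const canonical = files.map((f) => JSON.stringify([f.url, f.kind, f.options])).sort();
  const digest = createHash('sha256')
    .update(JSON.stringify(canonical))
    .digest('hex')
    .slice(0, 16);
  return `${prefix}-${digest}`;
}
```

You would then pass `workflowId: batchWorkflowId(files)` to `client.workflow.start`. On `WorkflowExecutionAlreadyStartedError`, `client.workflow.getHandle(id)` returns a handle to the batch already in progress. Once a batch has completed, Temporal's default ID reuse policy allows the same ID again. This approach therefore guards against overlapping runs, not deliberate repeats.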
Tuning concurrency
The concurrency parameter in BatchInput controls how many Ittybit tasks run in parallel within a single batch. Pick a value based on your Ittybit plan limits and how much backpressure you want.
// Conservative: 5 at a time for a free-tier account
const conservative = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-conservative-${Date.now()}`,
  args: [{ files, concurrency: 5 }],
});
// Aggressive: 25 at a time for a production account
const aggressive = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-aggressive-${Date.now()}`,
  args: [{ files, concurrency: 25 }],
});
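Your plan limit may be a request rate rather than a cap on concurrent tasks. In that case, steady-state arithmetic in the spirit of Little's law (in-flight = rate × duration) gives a starting value. With the activity as written, each in-flight task sends one POST per task and one GET per 3-second poll. The sketch assumes both count against the same limit. The numbers in the test are placeholders, not Ittybit's actual limits. `maxConcurrencyForRate` is hypothetical.

```typescript
// Largest concurrency whose steady-state request rate stays under a limit.
// With N tasks in flight: creates arrive at N / avgTaskSeconds per second and
// polls at N / pollSeconds per second, so total rate = N * (1/avgTaskSeconds + 1/pollSeconds).
function maxConcurrencyForRate(
  requestsPerMinute: number,
  avgTaskSeconds: number,
  pollSeconds = 3,
): number {
  const perTaskRequestsPerSecond = 1 / avgTaskSeconds + 1 / pollSeconds;
  const limitPerSecond = requestsPerMinute / 60;
  // Never go below 1, or the batch would make no progress at all.
  return Math.max(1, Math.floor(limitPerSecond / perTaskRequestsPerSecond));
}
```

Polling dominates here. At a 3-second interval, each in-flight task issues about 20 GETs per minute, so a longer poll interval raises the safe concurrency far more than shorter tasks do.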
If you need different processing per file type, split your file list and run separate batches:
const videos = files.filter((f) => f.kind === 'video');
const images = files.filter((f) => f.kind === 'image');
// Videos are heavier -- lower concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-videos-${Date.now()}`,
  args: [{ files: videos, concurrency: 5 }],
});
// Images are fast -- higher concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-images-${Date.now()}`,
  args: [{ files: images, concurrency: 20 }],
});
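With more than two kinds, a lookup table keeps the per-kind settings in one place. This is a sketch. `planBatchesByKind`, the example table, and the default of 5 are illustrative assumptions.

```typescript
interface PlannedFile {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}

interface PlannedBatch {
  kind: string;
  files: PlannedFile[];
  concurrency: number;
}

// Group files by kind and attach a per-kind concurrency, with a fallback for unlisted kinds.
function planBatchesByKind(
  files: PlannedFile[],
  concurrencyByKind: Partial<Record<string, number>>,
  defaultConcurrency = 5,
): PlannedBatch[] {
  const groups = new Map<string, PlannedFile[]>();
  for (const file of files) {
    const group = groups.get(file.kind) ?? [];
    group.push(file);
    groups.set(file.kind, group);
  }
  // Map iteration follows insertion order, so batches appear in first-seen kind order.
  return Array.from(groups.entries(), ([kind, group]) => ({
    kind,
    files: group,
    concurrency: concurrencyByKind[kind] ?? defaultConcurrency,
  }));
}

// Usage (sketch): one workflow per kind.
// for (const batch of planBatchesByKind(files, { video: 5, image: 20 })) {
//   await client.workflow.start(batchFanOut, {
//     taskQueue: 'batch-processing',
//     workflowId: `batch-${batch.kind}-${Date.now()}`,
//     args: [{ files: batch.files, concurrency: batch.concurrency }],
//   });
// }
```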
See also
- Temporal TypeScript SDK docs
- Durable media pipeline with Temporal — sequential multi-stage pipeline
- Batch processing with Cloud Tasks — alternative batch pattern using GCP
- Ittybit Task API reference