Batch fan-out processing with Temporal and Ittybit


You have 500 files to transcode and you need them processed in parallel, but not all at once. Unbounded concurrency hammers the API, trips rate limits, and makes failures hard to track. Temporal’s workflow model lets you build a semaphore that caps concurrent activities, retries each file automatically, and offers continueAsNew to keep the event history bounded on large batches. This guide builds a fan-out/fan-in workflow that processes files N at a time and produces a summary manifest when the batch completes.

Project setup

npm init -y
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

Project structure:

src/
  activities.ts   # Ittybit API calls
  workflows.ts    # Fan-out/fan-in orchestration
  worker.ts       # Temporal worker
  client.ts       # Kick off a batch

Set your environment variables:

ITTYBIT_API_KEY=your_ittybit_api_key
TEMPORAL_ADDRESS=localhost:7233
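The code in this guide reads process.env.ITTYBIT_API_KEY inline, so a missing key only surfaces when the first request is rejected. A minimal sketch of validating both variables once at startup; loadConfig and AppConfig are hypothetical names, not part of Temporal or Ittybit:

```typescript
// Hypothetical startup check; AppConfig and loadConfig are illustrative names.
export interface AppConfig {
  ittybitApiKey: string;
  temporalAddress: string;
}

export function loadConfig(env: Record<string, string | undefined>): AppConfig {
  const ittybitApiKey = env.ITTYBIT_API_KEY;
  if (!ittybitApiKey) {
    // Fail fast; otherwise requests would go out with "Bearer undefined"
    throw new Error('ITTYBIT_API_KEY is not set');
  }
  return {
    ittybitApiKey,
    // localhost:7233 is the default address of a local Temporal dev server
    temporalAddress: env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  };
}
```

You would call loadConfig(process.env) once in worker.ts and client.ts and pass the values where they are needed.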

Activities

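Two activities: one creates an Ittybit task and polls until it reaches a terminal state, the other writes the final manifest. Both are plain async functions that use the global fetch API, so run them on Node.js 18 or later. Temporal retries them according to their retry policy, and the heartbeat() call inside the poll loop lets Temporal detect a stalled activity through the heartbeat timeout.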

// src/activities.ts
import { heartbeat } from '@temporalio/activity';

const ITTYBIT_API = 'https://api.ittybit.com';

export interface FileItem {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}

export interface FileResult {
  input: string;
  taskId: string;
  status: 'succeeded' | 'failed';
  outputUrl?: string;
  error?: string;
}

export interface BatchManifest {
  total: number;
  succeeded: number;
  failed: number;
  results: FileResult[];
}

export async function processFile(file: FileItem): Promise<FileResult> {
  // Create the task
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      input: file.url,
      kind: file.kind,
      options: file.options,
    }),
  });

  if (!res.ok) {
    throw new Error(`Ittybit returned ${res.status}: ${await res.text()}`);
  }

  const task = (await res.json()) as {
    id: string;
    status: string;
    output?: { url: string };
  };

  // Poll until terminal state
  let status = task.status;
  let output = task.output;

  while (status === 'queued' || status === 'processing') {
    await new Promise((r) => setTimeout(r, 3_000));
    heartbeat();

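    const poll = await fetch(`${ITTYBIT_API}/tasks/${task.id}`, {
      headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
    });

    // Without this check, an error body can leave `status` undefined,
    // which exits the loop and gets reported as a success
    if (!poll.ok) {
      throw new Error(`Ittybit returned ${poll.status} while polling ${task.id}: ${await poll.text()}`);
    }

    const updated = (await poll.json()) as {
      status: string;
      output?: { url: string };
    };
    status = updated.status;
    output = updated.output;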
  }

  if (status === 'failed') {
    return {
      input: file.url,
      taskId: task.id,
      status: 'failed',
      error: `Task ${task.id} failed`,
    };
  }

  return {
    input: file.url,
    taskId: task.id,
    status: 'succeeded',
    outputUrl: output?.url,
  };
}

export async function writeManifest(manifest: BatchManifest): Promise<string> {
  const key = `batch-manifest-${Date.now()}.json`;

  // Write to your storage layer (S3, GCS, local disk, etc.)
  // This example logs it -- replace with your actual storage call
  console.log(`Writing manifest: ${manifest.succeeded}/${manifest.total} succeeded`);
  console.log(JSON.stringify(manifest, null, 2));

  return key;
}

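The activity returns a result object instead of throwing when Ittybit reports a failed task. This lets the workflow collect partial results rather than aborting the entire batch when one file fails. HTTP errors, by contrast, still throw, so Temporal’s retry policy handles transient API failures. Because each retry re-runs the activity from the top, a retried attempt creates a new Ittybit task rather than resuming the previous one.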
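To see the two failure paths side by side, here is a dependency-injected sketch of the poll loop. pollUntilTerminal, toResult, and the PollDeps shape are hypothetical: in the real activity, getTask would wrap the GET request to /tasks/{id} (throwing on a non-OK response), sleep would wrap setTimeout, and heartbeat would be heartbeat() from @temporalio/activity. Like the original, it treats any terminal status other than 'failed' as success.

```typescript
// Hypothetical, dependency-injected version of the poll loop in processFile.
export interface TaskSnapshot {
  status: string;
  output?: { url: string };
}

export interface PollDeps {
  getTask: () => Promise<TaskSnapshot>; // should throw on a non-OK HTTP response
  sleep: (ms: number) => Promise<void>;
  heartbeat: () => void;
}

// Statuses the guide treats as "still running"
const IN_PROGRESS = new Set(['queued', 'processing']);

export async function pollUntilTerminal(
  initial: TaskSnapshot,
  deps: PollDeps,
  intervalMs = 3_000,
): Promise<TaskSnapshot> {
  let snapshot = initial;
  while (IN_PROGRESS.has(snapshot.status)) {
    await deps.sleep(intervalMs);
    deps.heartbeat();
    // A thrown error propagates out of the activity, so Temporal retries it
    snapshot = await deps.getTask();
  }
  return snapshot;
}

export interface FileResult {
  input: string;
  taskId: string;
  status: 'succeeded' | 'failed';
  outputUrl?: string;
  error?: string;
}

// A task that ran and failed becomes data, so the batch keeps going
export function toResult(input: string, taskId: string, snapshot: TaskSnapshot): FileResult {
  if (snapshot.status === 'failed') {
    return { input, taskId, status: 'failed', error: `Task ${taskId} failed` };
  }
  return { input, taskId, status: 'succeeded', outputUrl: snapshot.output?.url };
}
```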

Workflow

The core of the pattern is a semaphore that controls how many activities run at the same time. The workflow fans out across all files, waits for every result, then writes a manifest. Every activity call adds events to the workflow history, so for batches larger than a few hundred files, continueAsNew hands the remaining work to a fresh execution with an empty history instead of letting a single run grow toward Temporal’s limits.

// src/workflows.ts
import { proxyActivities, continueAsNew, ActivityFailure, isCancellation } from '@temporalio/workflow';
import type * as activities from './activities';
import type { FileItem, FileResult, BatchManifest } from './activities';

const { processFile } = proxyActivities<typeof activities>({
  startToCloseTimeout: '15 minutes',
  heartbeatTimeout: '30 seconds',
  retry: {
    maximumAttempts: 3,
    initialInterval: '5s',
    backoffCoefficient: 2,
  },
});

const { writeManifest } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 minutes',
  retry: { maximumAttempts: 3 },
});

const BATCH_SIZE = 200; // Files per continueAsNew cycle

export interface BatchInput {
  files: FileItem[];
  concurrency: number;
  previousResults?: FileResult[];
}

export async function batchFanOut(input: BatchInput): Promise<string> {
  const { files, concurrency, previousResults = [] } = input;

  // If the batch is large, process a chunk and continue-as-new
  if (files.length > BATCH_SIZE) {
    const chunk = files.slice(0, BATCH_SIZE);
    const remaining = files.slice(BATCH_SIZE);

    const chunkResults = await processWithSemaphore(chunk, concurrency);

    await continueAsNew<typeof batchFanOut>({
      files: remaining,
      concurrency,
      previousResults: [...previousResults, ...chunkResults],
    });

    // unreachable, but TypeScript needs it
    return '';
  }

  // Process the final (or only) chunk
  const results = await processWithSemaphore(files, concurrency);
  const allResults = [...previousResults, ...results];

  // Build and write the manifest
  const manifest: BatchManifest = {
    total: allResults.length,
    succeeded: allResults.filter((r) => r.status === 'succeeded').length,
    failed: allResults.filter((r) => r.status === 'failed').length,
    results: allResults,
  };

  return await writeManifest(manifest);
}

async function processWithSemaphore(files: FileItem[], concurrency: number): Promise<FileResult[]> {
  // Pre-sized so results stay in input order, whatever order activities finish in
  const results = new Array<FileResult>(files.length);
  let cursor = 0;

  async function next(): Promise<void> {
    while (cursor < files.length) {
      const index = cursor++;
      try {
        results[index] = await processFile(files[index]);
      } catch (err) {
        // Once retries are exhausted the activity rejects with ActivityFailure.
        // Record the file as failed so one bad file can't reject Promise.all
        // and fail the whole batch; rethrow cancellations and anything else.
        if (!(err instanceof ActivityFailure) || isCancellation(err)) throw err;
        results[index] = {
          input: files[index].url,
          taskId: '', // no Ittybit task id is available when the activity itself failed
          status: 'failed',
          error: err.cause?.message ?? err.message,
        };
      }
    }
  }

  // Launch N workers that pull from the shared cursor (at least one, so a
  // concurrency of 0 cannot silently skip every file)
  const workers = Array.from({ length: Math.max(1, Math.min(concurrency, files.length)) }, () => next());
  await Promise.all(workers);

  return results;
}

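The processWithSemaphore function is the concurrency control. It starts N async “workers” (ordinary promises inside the workflow, not Temporal workers) that each pull the next file from a shared cursor. At any given moment, at most N processFile activities are in flight. When one finishes, that worker immediately picks up the next file, so no slot sits idle while files remain. The shared cursor is safe because workflow code runs on a single thread: each cursor++ completes before another worker can read it.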
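The same worker-pool pattern works outside Temporal, which makes the in-flight bound easy to check in isolation. A framework-free sketch; mapWithConcurrency is a hypothetical name, and inside the workflow fn would be the processFile activity proxy:

```typescript
// Framework-free worker pool: `concurrency` loops share one cursor, so at
// most `concurrency` calls to fn are pending at any moment.
export async function mapWithConcurrency<T, R>(
  items: readonly T[],
  concurrency: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let cursor = 0;

  // Each worker loops: claim the next index, await it, repeat until none remain
  async function worker(): Promise<void> {
    while (cursor < items.length) {
      const index = cursor++; // claimed synchronously, so no two workers share an index
      results[index] = await fn(items[index]);
    }
  }

  // At least one worker, at most one per item
  const poolSize = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: poolSize }, () => worker()));
  return results;
}
```

Writing to results[index] instead of pushing keeps the output in input order even though calls finish in any order.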

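continueAsNew is critical for large batches. Temporal stores every event in the workflow history, and by default a workflow whose history exceeds 51,200 events or 50 MB is terminated. Each processFile call adds at least three events (scheduled, started, completed), so chunking 200 files per run keeps each history far below that limit. Results accumulate across runs via the previousResults parameter. That parameter, along with the list of remaining files, travels in each continueAsNew input, so the payload still grows with the batch; Temporal also limits individual payloads (2 MB by default), so for very large batches consider writing results to external storage and passing a reference instead.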
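A quick way to sanity-check the chunking arithmetic is to simulate the run sequence. planCycles is a hypothetical helper that applies the same rule as batchFanOut (chunk only when more than batchSize files remain) and reports, per run, the files processed, the results inherited through previousResults, and a lower bound on activity events (three per processFile call, ignoring workflow-task events):

```typescript
// Simulates batchFanOut's continueAsNew sequence without Temporal.
export interface RunPlan {
  filesProcessed: number;
  inheritedResults: number; // length of previousResults when the run starts
  minActivityEvents: number; // scheduled + started + completed per file
}

export function planCycles(totalFiles: number, batchSize: number): RunPlan[] {
  if (batchSize < 1) throw new RangeError('batchSize must be at least 1');
  const runs: RunPlan[] = [];
  let remaining = totalFiles;
  let done = 0;
  // do/while: even an empty batch is one run (it still writes a manifest)
  do {
    // Same rule as batchFanOut: only chunk when MORE than batchSize remain
    const chunk = remaining > batchSize ? batchSize : remaining;
    runs.push({ filesProcessed: chunk, inheritedResults: done, minActivityEvents: chunk * 3 });
    done += chunk;
    remaining -= chunk;
  } while (remaining > 0);
  return runs;
}
```

For the 500-file example, planCycles(500, 200) yields three runs of 200, 200, and 100 files, inheriting 0, 200, and 400 results respectively.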

Worker

// src/worker.ts
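import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS (defaults to a local dev server)
  const connection = await NativeConnection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });

  const worker = await Worker.create({
    connection,
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'batch-processing',
    maxConcurrentActivityTaskExecutions: 50,
  });

  try {
    await worker.run();
  } finally {
    await connection.close();
  }
}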

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

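maxConcurrentActivityTaskExecutions caps how many activities one worker process executes at the same time, across every workflow on the batch-processing task queue. The workflow semaphore controls logical concurrency within a single batch. The limits are independent: you might run several workers with a limit of 50 each, and each batch workflow still respects its own concurrency parameter, but two batches started with concurrency 10 can together have 20 processFile activities in flight. When the batches ask for more than the workers have slots for, the extra activity tasks wait on the task queue until a slot frees up.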
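A back-of-the-envelope model of how the two limits combine. maxInFlight is a hypothetical helper, not a Temporal API, and it assumes processFile is the only activity running on the task queue:

```typescript
// Rough model: each batch's semaphore caps its own requests, and worker
// slots cap the task queue as a whole.
export function maxInFlight(
  batchConcurrencies: readonly number[],
  workerCount: number,
  slotsPerWorker: number,
): number {
  // What the running batches ask for, summed across their semaphores
  const requested = batchConcurrencies.reduce((sum, c) => sum + c, 0);
  // What the workers polling the queue can execute at once
  const available = workerCount * slotsPerWorker;
  return Math.min(requested, available);
}
```

For example, maxInFlight([10, 10], 1, 50) is 20 (the semaphores bind), while maxInFlight([25, 25, 25], 1, 50) is 50 (the worker slots bind and the remaining tasks queue).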

Start a batch

// src/client.ts
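import { Client, Connection } from '@temporalio/client';
import { batchFanOut } from './workflows';

async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS (defaults to a local dev server)
  const connection = await Connection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });
  const client = new Client({ connection });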

  // Build your file list -- from a database, S3 listing, CSV, etc.
  const files = Array.from({ length: 500 }, (_, i) => ({
    url: `https://storage.example.com/videos/file-${i.toString().padStart(4, '0')}.mov`,
    kind: 'video',
    options: { width: 1920, format: 'mp4', codec: 'h264', quality: 'high' },
  }));

  const handle = await client.workflow.start(batchFanOut, {
    taskQueue: 'batch-processing',
    workflowId: `batch-${Date.now()}`,
    args: [{ files, concurrency: 10 }],
    // Allow plenty of time for large batches
    workflowExecutionTimeout: '24 hours',
  });

  console.log(`Started batch: ${handle.workflowId}`);

  // Optionally wait for the result
  const manifestKey = await handle.result();
  console.log(`Batch complete. Manifest: ${manifestKey}`);
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

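Start the worker in one terminal and run the client in another. In the Temporal Web UI, each processFile call shows up as its own activity in the workflow’s event history, so you can see which files are in flight, which retried, and which failed. Each continueAsNew cycle appears as a new run under the same workflow ID.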

Tuning concurrency

The concurrency parameter in BatchInput controls how many Ittybit tasks run in parallel within a single batch. Pick a value based on your Ittybit plan limits and how much backpressure you want; the numbers below are illustrative starting points rather than plan-specific limits.

// Conservative: 5 at a time for a free-tier account
const conservative = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-conservative-${Date.now()}`,
  args: [{ files, concurrency: 5 }],
});

// Aggressive: 25 at a time for a production account
const aggressive = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-aggressive-${Date.now()}`,
  args: [{ files, concurrency: 25 }],
});

If different file types need different concurrency, split your file list and run a separate batch for each:

const videos = files.filter((f) => f.kind === 'video');
const images = files.filter((f) => f.kind === 'image');

// Videos are heavier -- lower concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-videos-${Date.now()}`,
  args: [{ files: videos, concurrency: 5 }],
});

// Images are fast -- higher concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-images-${Date.now()}`,
  args: [{ files: images, concurrency: 20 }],
});
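With more than two kinds, hand-written filters get repetitive. A sketch of a hypothetical batchesByKind helper that builds one BatchInput-shaped value per kind from a per-kind concurrency map with a fallback; FileItem and BatchInput are redeclared locally so the block stands alone:

```typescript
// Local copies of the guide's types so this block compiles on its own
export interface FileItem {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}

export interface BatchInput {
  files: FileItem[];
  concurrency: number;
}

// Hypothetical helper: group files by kind, one batch input per kind
export function batchesByKind(
  files: readonly FileItem[],
  concurrencyByKind: Record<string, number>,
  defaultConcurrency: number,
): Map<string, BatchInput> {
  const batches = new Map<string, BatchInput>();
  for (const file of files) {
    let batch = batches.get(file.kind);
    if (!batch) {
      const configured = concurrencyByKind[file.kind];
      batch = {
        files: [],
        // typeof check, so inherited keys like "constructor" fall back too
        concurrency: typeof configured === 'number' ? configured : defaultConcurrency,
      };
      batches.set(file.kind, batch);
    }
    batch.files.push(file);
  }
  return batches;
}
```

Each entry of the returned map can then be passed as args: [input] to client.workflow.start(batchFanOut, ...), with a workflow ID that includes the kind.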

See also