# Batch fan-out processing with Temporal and Ittybit

Process large media batches with controlled concurrency using Temporal and Ittybit

You have 500 files to transcode and you need them done in parallel -- but not all at once. Unbounded concurrency hammers the API, blows through rate limits, and makes failures hard to track. Temporal's workflow model lets you build a semaphore pattern that caps concurrent activities, retries each file automatically, and uses `continueAsNew` to keep the event history bounded on large batches. This guide builds a fan-out/fan-in workflow that processes files N-at-a-time and produces a summary manifest when the batch completes.

## Project setup

```bash
npm init -y
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
```

Project structure:

```
src/
  activities.ts   # Ittybit API calls
  workflows.ts    # Fan-out/fan-in orchestration
  worker.ts       # Temporal worker
  client.ts       # Kick off a batch
```

Set your environment variables:

```bash
ITTYBIT_API_KEY=your_ittybit_api_key
TEMPORAL_ADDRESS=localhost:7233
```

## Activities

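Two activities: one creates an Ittybit task and polls until it reaches a terminal state, the other writes the final manifest. Both are plain async functions. Temporal handles retries, and `processFile` calls `heartbeat()` on every poll so the server can detect a stalled or crashed worker within the heartbeat timeout.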

```typescript
// src/activities.ts
import { heartbeat } from '@temporalio/activity';

const ITTYBIT_API = 'https://api.ittybit.com';

export interface FileItem {
  url: string;
  kind: string;
  options: Record<string, unknown>;
}

export interface FileResult {
  input: string;
  taskId: string;
  status: 'succeeded' | 'failed';
  outputUrl?: string;
  error?: string;
}

export interface BatchManifest {
  total: number;
  succeeded: number;
  failed: number;
  results: FileResult[];
}

export async function processFile(file: FileItem): Promise<FileResult> {
  // Create the task
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      input: file.url,
      kind: file.kind,
      options: file.options,
    }),
  });

  if (!res.ok) {
    throw new Error(`Ittybit returned ${res.status}: ${await res.text()}`);
  }

  const task = (await res.json()) as {
    id: string;
    status: string;
    output?: { url: string };
  };

  // Poll until terminal state
  let status = task.status;
  let output = task.output;

  while (status === 'queued' || status === 'processing') {
    await new Promise((r) => setTimeout(r, 3_000));
    heartbeat();

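    const poll = await fetch(`${ITTYBIT_API}/tasks/${task.id}`, {
      headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
    });
    // Without this check, an error response has no `status` field,
    // the loop exits, and the file is wrongly reported as succeeded
    if (!poll.ok) {
      throw new Error(`Ittybit returned ${poll.status} while polling task ${task.id}`);
    }
    const updated = (await poll.json()) as {
      status: string;
      output?: { url: string };
    };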
    status = updated.status;
    output = updated.output;
  }

  if (status === 'failed') {
    return {
      input: file.url,
      taskId: task.id,
      status: 'failed',
      error: `Task ${task.id} failed`,
    };
  }

  return {
    input: file.url,
    taskId: task.id,
    status: 'succeeded',
    outputUrl: output?.url,
  };
}

export async function writeManifest(manifest: BatchManifest): Promise<string> {
  const key = `batch-manifest-${Date.now()}.json`;

  // Write to your storage layer (S3, GCS, local disk, etc.)
  // This example logs it -- replace with your actual storage call
  console.log(`Writing manifest: ${manifest.succeeded}/${manifest.total} succeeded`);
  console.log(JSON.stringify(manifest, null, 2));

  return key;
}
```

The activity returns a result object instead of throwing on task failure. This lets the workflow collect partial results rather than aborting the entire batch when one file fails.
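Because failures arrive as data rather than exceptions, the manifest can double as the input for a follow-up run. The sketch below pulls the failed inputs out of a manifest so they can be resubmitted. `failedInputs` is a hypothetical helper, not part of the guide's code, and the interfaces are repeated only to keep the snippet self-contained.

```typescript
// Same shapes as in src/activities.ts, repeated so this snippet stands alone
interface FileResult {
  input: string;
  taskId: string;
  status: 'succeeded' | 'failed';
  outputUrl?: string;
  error?: string;
}

interface BatchManifest {
  total: number;
  succeeded: number;
  failed: number;
  results: FileResult[];
}

// Hypothetical helper: list the input URLs whose tasks failed
function failedInputs(manifest: BatchManifest): string[] {
  return manifest.results
    .filter((r) => r.status === 'failed')
    .map((r) => r.input);
}

// Made-up sample data for illustration
const sampleManifest: BatchManifest = {
  total: 3,
  succeeded: 2,
  failed: 1,
  results: [
    { input: 'https://storage.example.com/a.mov', taskId: 't1', status: 'succeeded' },
    { input: 'https://storage.example.com/b.mov', taskId: 't2', status: 'failed', error: 'Task t2 failed' },
    { input: 'https://storage.example.com/c.mov', taskId: 't3', status: 'succeeded' },
  ],
};

const retryList = failedInputs(sampleManifest);
```

Here `retryList` contains only the URL for `b.mov`, which could be mapped back into `FileItem` objects and passed to a new batch.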

## Workflow

The core of the pattern: a semaphore controls how many activities run at the same time. The workflow fans out across all files, waits for every result, then writes a manifest. For batches larger than a few hundred files, `continueAsNew` resets the event history so Temporal doesn't choke on a massive workflow.

```typescript
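// src/workflows.ts
import { proxyActivities, continueAsNew } from '@temporalio/workflow';
// Type-only imports: workflow code must not pull in activity implementations
import type * as activities from './activities';
import type { FileItem, FileResult, BatchManifest } from './activities';
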
const { processFile } = proxyActivities<typeof activities>({
  startToCloseTimeout: '15 minutes',
  heartbeatTimeout: '30 seconds',
  retry: {
    maximumAttempts: 3,
    initialInterval: '5s',
    backoffCoefficient: 2,
  },
});

const { writeManifest } = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 minutes',
  retry: { maximumAttempts: 3 },
});

const BATCH_SIZE = 200; // Files per continueAsNew cycle

export interface BatchInput {
  files: FileItem[];
  concurrency: number;
  previousResults?: FileResult[];
}

export async function batchFanOut(input: BatchInput): Promise<string> {
  const { files, concurrency, previousResults = [] } = input;

  // If the batch is large, process a chunk and continue-as-new
  if (files.length > BATCH_SIZE) {
    const chunk = files.slice(0, BATCH_SIZE);
    const remaining = files.slice(BATCH_SIZE);

    const chunkResults = await processWithSemaphore(chunk, concurrency);

    await continueAsNew<typeof batchFanOut>({
      files: remaining,
      concurrency,
      previousResults: [...previousResults, ...chunkResults],
    });

    // unreachable, but TypeScript needs it
    return '';
  }

  // Process the final (or only) chunk
  const results = await processWithSemaphore(files, concurrency);
  const allResults = [...previousResults, ...results];

  // Build and write the manifest
  const manifest: BatchManifest = {
    total: allResults.length,
    succeeded: allResults.filter((r) => r.status === 'succeeded').length,
    failed: allResults.filter((r) => r.status === 'failed').length,
    results: allResults,
  };

  return await writeManifest(manifest);
}

async function processWithSemaphore(files: FileItem[], concurrency: number): Promise<FileResult[]> {
  const results: FileResult[] = [];
  let cursor = 0;

  async function next(): Promise<void> {
    while (cursor < files.length) {
      const index = cursor++;
      const result = await processFile(files[index]);
      results.push(result);
    }
  }

  // Launch N workers that pull from the shared cursor
  const workers = Array.from({ length: Math.min(concurrency, files.length) }, () => next());
  await Promise.all(workers);

  return results;
}
```

The `processWithSemaphore` function is the concurrency control. It spawns N async "workers" that each pull the next file from a shared cursor. At any given moment, at most N `processFile` activities are in flight. When one finishes, that worker immediately picks up the next file -- no idle slots.
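To see the cap hold outside Temporal, here is a minimal sketch of the same shared-cursor pattern in plain TypeScript. `runWithLimit`, `fakeTask`, and `demo` are hypothetical names introduced for illustration. Unlike `processWithSemaphore`, this version stores each result at its input index, so the output order matches the input order rather than completion order.

```typescript
// Generic shared-cursor limiter with no Temporal dependency
async function runWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let cursor = 0;

  async function lane(): Promise<void> {
    while (cursor < items.length) {
      // Safe without a lock: there is no await between the check and the increment
      const index = cursor++;
      results[index] = await fn(items[index]);
    }
  }

  // Launch up to `limit` lanes that all pull from the same cursor
  const lanes = Array.from({ length: Math.min(limit, items.length) }, () => lane());
  await Promise.all(lanes);
  return results;
}

// Hypothetical stand-in for an activity that records peak concurrency
let inFlight = 0;
let peak = 0;

async function fakeTask(n: number): Promise<number> {
  inFlight++;
  peak = Math.max(peak, inFlight);
  await Promise.resolve(); // yield, standing in for real I/O
  inFlight--;
  return n * 2;
}

async function demo(): Promise<{ results: number[]; peak: number }> {
  const results = await runWithLimit([1, 2, 3, 4, 5, 6, 7], 3, fakeTask);
  return { results, peak };
}
```

With seven items and a limit of 3, `peak` never rises above 3, because a lane only starts its next item after the previous one has resolved.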

`continueAsNew` is critical for large batches. Temporal records every event in the workflow history and terminates an execution whose history grows past roughly 50K events. By chunking 200 files at a time and continuing as a new execution, each cycle stays well within that limit. Results accumulate across cycles via the `previousResults` parameter. Because that array travels in the `continueAsNew` input, keep an eye on its size for very large batches.
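As a rough illustration of how the chunking plays out, the sketch below mirrors the `files.length > BATCH_SIZE` check to list the chunk each execution would handle. `planChunks` and `estimateEvents` are hypothetical helpers, and the figure of six history events per file (three activity events plus up to three workflow-task events, with no retries) is an assumption for estimation, not a measured value.

```typescript
const BATCH_SIZE = 200;

// Chunk size handled by each workflow execution, in order
function planChunks(totalFiles: number, batchSize: number = BATCH_SIZE): number[] {
  const chunks: number[] = [];
  let remaining = totalFiles;
  // Same condition as the workflow: only continue-as-new when strictly larger
  while (remaining > batchSize) {
    chunks.push(batchSize);
    remaining -= batchSize;
  }
  chunks.push(remaining); // the final (or only) execution writes the manifest
  return chunks;
}

// ASSUMPTION: about 6 history events per file when nothing retries
function estimateEvents(chunkSize: number, eventsPerFile: number = 6): number {
  return chunkSize * eventsPerFile;
}

const plan = planChunks(500);                  // [200, 200, 100]
const worstChunk = estimateEvents(BATCH_SIZE); // 1200
```

Under that assumption a 200-file chunk lands around 1,200 events, far below the limit, which leaves room to raise `BATCH_SIZE` if the number of executions matters more than history size.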

## Worker

```typescript
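// src/worker.ts
import { NativeConnection, Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS, falling back to a local dev server
  const connection = await NativeConnection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });

  const worker = await Worker.create({
    connection,
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'batch-processing',
    maxConcurrentActivityTaskExecutions: 50,
  });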

  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
```

`maxConcurrentActivityTaskExecutions` caps what the worker will accept at the infrastructure level. The workflow semaphore controls logical concurrency within a single batch. You might run multiple workers with a limit of 50 each, but each batch workflow still respects its own `concurrency` parameter.
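The two limits combine into a simple upper bound: activities in flight can never exceed what the batches request in total, nor what the workers will accept in total. The sketch below expresses that bound. `maxInFlight` is a hypothetical helper, and it assumes all workers poll the same task queue with the same limit.

```typescript
// Upper bound on concurrently running activities across all batches
function maxInFlight(
  batchConcurrencies: number[], // `concurrency` of each running batch workflow
  workerCount: number,
  perWorkerLimit: number,       // maxConcurrentActivityTaskExecutions
): number {
  const requested = batchConcurrencies.reduce((sum, c) => sum + c, 0);
  const capacity = workerCount * perWorkerLimit;
  return Math.min(requested, capacity);
}

// One batch at concurrency 10 on two workers: the batch limit is the constraint
const single = maxInFlight([10], 2, 50);

// Five batches at 25 each on two workers: worker capacity is the constraint
const crowded = maxInFlight([25, 25, 25, 25, 25], 2, 50);
```

In the second case the batches ask for 125 slots but only 100 exist, so some activities wait in the task queue until a worker slot frees up.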

## Start a batch

```typescript
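// src/client.ts
import { Client, Connection } from '@temporalio/client';
import { batchFanOut } from './workflows';

async function run() {
  // Connect to the server named in TEMPORAL_ADDRESS, falling back to a local dev server
  const connection = await Connection.connect({
    address: process.env.TEMPORAL_ADDRESS ?? 'localhost:7233',
  });
  const client = new Client({ connection });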

  // Build your file list -- from a database, S3 listing, CSV, etc.
  const files = Array.from({ length: 500 }, (_, i) => ({
    url: `https://storage.example.com/videos/file-${i.toString().padStart(4, '0')}.mov`,
    kind: 'video',
    options: { width: 1920, format: 'mp4', codec: 'h264', quality: 'high' },
  }));

  const handle = await client.workflow.start(batchFanOut, {
    taskQueue: 'batch-processing',
    workflowId: `batch-${Date.now()}`,
    args: [{ files, concurrency: 10 }],
    // Allow plenty of time for large batches
    workflowExecutionTimeout: '24 hours',
  });

  console.log(`Started batch: ${handle.workflowId}`);

  // Optionally wait for the result
  const manifestKey = await handle.result();
  console.log(`Batch complete. Manifest: ${manifestKey}`);
}

run().catch(console.error);
```

Start the worker in one terminal, run the client in another. The Temporal UI shows each activity as a separate span -- you can see which files are in flight, which retried, and which failed.

## Tuning concurrency

The `concurrency` parameter in `BatchInput` controls how many Ittybit tasks run in parallel within a single batch. Pick a value based on your Ittybit plan limits and how much backpressure you want.

```typescript
// Conservative: 5 at a time for a free-tier account
const conservativeHandle = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-conservative-${Date.now()}`,
  args: [{ files, concurrency: 5 }],
});

// Aggressive: 25 at a time for a production account
const aggressiveHandle = await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-aggressive-${Date.now()}`,
  args: [{ files, concurrency: 25 }],
});
```
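One way to choose a value is to work back from polling traffic. Each in-flight `processFile` activity polls roughly once every 3 seconds, so the steady-state request rate scales linearly with `concurrency`. The sketch below is a back-of-the-envelope calculation. Both helper names and the 300 requests-per-minute budget are hypothetical, so check your actual plan limits. It ignores the one-off `POST /tasks` request per file and request latency, which makes it a slight overestimate of polling load.

```typescript
const POLL_INTERVAL_MS = 3_000; // matches the sleep in processFile

// Approximate polling requests per minute generated by one batch
function pollRequestsPerMinute(
  concurrency: number,
  pollIntervalMs: number = POLL_INTERVAL_MS,
): number {
  return concurrency * (60_000 / pollIntervalMs);
}

// Largest concurrency that stays within a requests-per-minute budget
function maxConcurrencyForBudget(
  requestsPerMinute: number,
  pollIntervalMs: number = POLL_INTERVAL_MS,
): number {
  return Math.floor(requestsPerMinute / (60_000 / pollIntervalMs));
}

const loadAtTen = pollRequestsPerMinute(10);       // 200 requests per minute
const ceiling = maxConcurrencyForBudget(300);      // 15, for a hypothetical 300 rpm budget
```

If the resulting ceiling is lower than the throughput you need, lengthening the poll interval is usually cheaper than cutting concurrency.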

If you need different processing per file type, split your file list and run separate batches:

```typescript
const videos = files.filter((f) => f.kind === 'video');
const images = files.filter((f) => f.kind === 'image');

// Videos are heavier -- lower concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-videos-${Date.now()}`,
  args: [{ files: videos, concurrency: 5 }],
});

// Images are fast -- higher concurrency
await client.workflow.start(batchFanOut, {
  taskQueue: 'batch-processing',
  workflowId: `batch-images-${Date.now()}`,
  args: [{ files: images, concurrency: 20 }],
});
```

## See also

- [Temporal TypeScript SDK docs](https://docs.temporal.io/develop/typescript)
- [Durable media pipeline with Temporal](/guides/durable-media-pipeline-with-temporal) -- sequential multi-stage pipeline
- [Batch processing with Cloud Tasks](/guides/batch-processing-with-cloud-tasks) -- alternative batch pattern using GCP
- [Ittybit Task API reference](/reference/tasks)