# Scheduled batch processing with Trigger.dev and Ittybit

Run nightly media processing jobs with Trigger.dev cron triggers and Ittybit fan-out

Your media library grows every day, but not everything gets processed on upload. Maybe files arrive via bulk import, maybe you defer expensive work to off-peak hours, or maybe you want a nightly sweep that catches anything that slipped through. Trigger.dev's cron triggers let you schedule a task that queries your database for unprocessed media, fans out Ittybit processing jobs with concurrency controls, and marks each record as complete -- all without managing infrastructure or writing retry logic.

## Install dependencies

```bash
npm install @trigger.dev/sdk
```

Set your environment variables:

```bash
TRIGGER_SECRET_KEY=tr_dev_...
ITTYBIT_API_KEY=your_ittybit_api_key
DATABASE_URL=postgresql://...
```
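A missing variable tends to surface as a confusing 401 or connection error in the middle of a run, so it can help to check for all three up front. The sketch below is one way to do that; `missingEnvVars` and `REQUIRED_ENV` are hypothetical names, not part of Trigger.dev or Ittybit.

```typescript
// Hypothetical helper: returns the names that are unset or empty in `env`.
function missingEnvVars(
  names: string[],
  env: Record<string, string | undefined>,
): string[] {
  return names.filter((name) => !env[name]);
}

const REQUIRED_ENV = ['TRIGGER_SECRET_KEY', 'ITTYBIT_API_KEY', 'DATABASE_URL'];

// Stand-in environment for illustration; in your app you would pass process.env.
const missing = missingEnvVars(REQUIRED_ENV, {
  TRIGGER_SECRET_KEY: 'tr_dev_example',
  DATABASE_URL: '',
});

console.log(missing); // ITTYBIT_API_KEY and DATABASE_URL are reported
```

In a real entry point you might throw when the returned list is non-empty, so the process fails fast instead of partway through a batch.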

## Ittybit helper functions

These wrap the Ittybit Task API. `createIttybitTask` kicks off a processing job, and `pollForCompletion` waits for it to finish.

```typescript
const ITTYBIT_API = 'https://api.ittybit.com';

async function createIttybitTask(body: {
  input: string;
  kind: string;
  options: Record<string, unknown>;
}) {
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    throw new Error(`Ittybit API error: ${res.status}`);
  }

  return res.json() as Promise<{
    id: string;
    status: string;
    output_url?: string;
  }>;
}

async function pollForCompletion(taskId: string, intervalMs = 5000, maxAttempts = 120) {
  for (let i = 0; i < maxAttempts; i++) {
    const res = await fetch(`${ITTYBIT_API}/tasks/${taskId}`, {
      headers: {
        Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      },
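    });

    // Fail loudly on HTTP errors instead of parsing an error body as a task
    if (!res.ok) {
      throw new Error(`Ittybit API error while polling task ${taskId}: ${res.status}`);
    }

    const task = (await res.json()) as {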
      id: string;
      status: string;
      output_url?: string;
    };

    if (task.status === 'completed') return task;
    if (task.status === 'failed') throw new Error(`Task ${taskId} failed`);

    await new Promise((r) => setTimeout(r, intervalMs));
  }

  throw new Error(`Task ${taskId} timed out after polling`);
}
```
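The defaults in `pollForCompletion` put a ceiling on how long a single item can wait. A quick way to reason about that ceiling, as a sketch (`maxPollWaitMs` is a hypothetical helper, and the figure ignores the time spent on each HTTP request):

```typescript
// Upper bound on time spent sleeping between polls, in milliseconds.
// Defaults mirror pollForCompletion: 5000 ms interval, 120 attempts.
function maxPollWaitMs(intervalMs = 5000, maxAttempts = 120): number {
  return intervalMs * maxAttempts;
}

const defaultMinutes = maxPollWaitMs() / 60000; // 10 minutes
const longJobMinutes = maxPollWaitMs(10000, 180) / 60000; // 30 minutes

console.log({ defaultMinutes, longJobMinutes });
```

If your longest jobs regularly run past the default ceiling, raise `maxAttempts` or `intervalMs` rather than letting the task time out and retry.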

## Define the per-item task

Each media item gets its own Trigger.dev task. This keeps retries isolated -- if one file fails, it doesn't block the rest of the batch. The `concurrencyLimit` prevents you from overwhelming the Ittybit API with too many simultaneous requests.

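```typescript
import { task } from '@trigger.dev/sdk'; // on SDK v3, import from '@trigger.dev/sdk/v3'
// `db` is your own Prisma-style database client; the path below is a placeholder.
import { db } from './db';

export const processMediaItem = task({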
  id: 'process-media-item',
  retry: { maxAttempts: 3 },
  queue: {
    name: 'ittybit-batch',
    concurrencyLimit: 10,
  },
  run: async (payload: {
    id: string;
    sourceUrl: string;
    kind: string;
    options: Record<string, unknown>;
  }) => {
    const { id, sourceUrl, kind, options } = payload;

    // Create the Ittybit task
    const ittybitTask = await createIttybitTask({
      input: sourceUrl,
      kind,
      options,
    });

    // Poll until done
    const completed = await pollForCompletion(ittybitTask.id);

    // Update the record in your database
    await db.media.update({
      where: { id },
      data: {
        status: 'processed',
        outputUrl: completed.output_url,
        processedAt: new Date(),
      },
    });

    return { id, outputUrl: completed.output_url };
  },
});
```

The `concurrencyLimit: 10` means at most 10 of these tasks run at the same time across your entire Trigger.dev deployment. Adjust this based on your Ittybit plan limits.
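When choosing a limit, a rough estimate of how long a full batch takes can be useful. The sketch below assumes every item takes about the same time and that the queue stays full, which real workloads only approximate. `estimateBatchMinutes` is a hypothetical helper and the 2-minute average is an invented figure.

```typescript
// Rough wall-clock estimate: items run in "waves" of at most `concurrencyLimit`.
function estimateBatchMinutes(
  itemCount: number,
  concurrencyLimit: number,
  avgMinutesPerItem: number,
): number {
  if (concurrencyLimit < 1) {
    throw new RangeError('concurrencyLimit must be at least 1');
  }
  const waves = Math.ceil(itemCount / concurrencyLimit);
  return waves * avgMinutesPerItem;
}

// 500 items at 2 minutes each (assumed average):
const atLimit10 = estimateBatchMinutes(500, 10, 2); // 50 waves -> 100 minutes
const atLimit25 = estimateBatchMinutes(500, 25, 2); // 20 waves -> 40 minutes

console.log({ atLimit10, atLimit25 });
```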

## Define the scheduled task

The scheduled task runs at 2 AM every night (UTC unless you configure a timezone), queries for unprocessed records, and fans out to the per-item task using `batchTrigger`.

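```typescript
import { schedules } from '@trigger.dev/sdk'; // on SDK v3, import from '@trigger.dev/sdk/v3'
// `db` is your own Prisma-style database client; the path below is a placeholder.
import { db } from './db';

export const nightlyBatchProcess = schedules.task({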
  id: 'nightly-batch-process',
  cron: '0 2 * * *',
  run: async () => {
    // Query for unprocessed media
    const pending = await db.media.findMany({
      where: { status: 'pending' },
      orderBy: { createdAt: 'asc' },
      take: 500,
    });

    if (pending.length === 0) {
      console.log('No unprocessed media found.');
      return { processed: 0 };
    }

    console.log(`Found ${pending.length} unprocessed items.`);

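    // Mark them as queued so the next run doesn't pick them up again.
    // If the batchTrigger call below throws, these rows stay 'queued',
    // so reset or alert on rows that remain 'queued' for too long.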
    const ids = pending.map((item) => item.id);
    await db.media.updateMany({
      where: { id: { in: ids } },
      data: { status: 'queued' },
    });

    // Fan out -- one sub-task per item
    const handle = await processMediaItem.batchTrigger(
      pending.map((item) => ({
        payload: {
          id: item.id,
          sourceUrl: item.sourceUrl,
          kind: item.kind ?? 'video',
          options: {
            width: 1920,
            format: 'mp4',
            quality: 'high',
          },
        },
      })),
    );

    return {
      processed: pending.length,
      batchId: handle.batchId,
    };
  },
});
```
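The `cron` value in the task above uses the standard five-field format: minute, hour, day of month, month, day of week. As an illustration of how `'0 2 * * *'` reads, here is a small sketch that labels the fields. `splitCron` is a hypothetical helper for explanation only. It does not validate field values and is not how Trigger.dev parses schedules.

```typescript
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

// Splits a five-field cron expression into named parts.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute = '', hour = '', dayOfMonth = '', month = '', dayOfWeek = ''] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const nightly = splitCron('0 2 * * *');
// minute "0", hour "2", and "*" (every value) for the remaining three fields
console.log(nightly);
```

Changing the expression to `'30 3 * * 1'` would, by the same reading, run at 03:30 on Mondays only.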

The `take: 500` limits each run to 500 items. If your backlog is larger, the next night's run picks up the rest. This prevents a single run from consuming unbounded resources.

## Handle multiple processing profiles

If different media types need different processing options, map them before fanning out.

```typescript
function getProcessingOptions(item: { kind: string; width?: number }) {
  switch (item.kind) {
    case 'video':
      return {
        kind: 'video',
        options: {
          width: 1920,
          format: 'mp4',
          codec: 'h264',
          quality: 'high',
        },
      };
    case 'image':
      return {
        kind: 'image',
        options: {
          width: item.width ?? 2048,
          format: 'webp',
          quality: 'high',
        },
      };
    case 'audio':
      return {
        kind: 'audio',
        options: {
          format: 'mp3',
          bitrate: '192k',
        },
      };
    default:
      return {
        kind: item.kind,
        options: { format: 'mp4' },
      };
  }
}

// In the scheduled task, replace the static options:
const handle = await processMediaItem.batchTrigger(
  pending.map((item) => {
    const profile = getProcessingOptions(item);
    return {
      payload: {
        id: item.id,
        sourceUrl: item.sourceUrl,
        kind: profile.kind,
        options: profile.options,
      },
    };
  }),
);
```
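As the number of profiles grows, a lookup table can be easier to maintain than a `switch`. The following is a sketch of the same mapping in table form, using the option values from the function above. `PROFILES` and `lookupProfile` are hypothetical names, and the option keys are carried over from this guide rather than checked against the Ittybit API.

```typescript
type Profile = { kind: string; options: Record<string, unknown> };

// A Map avoids accidental hits on Object.prototype keys such as "constructor".
const PROFILES = new Map<string, Profile>([
  ['video', { kind: 'video', options: { width: 1920, format: 'mp4', codec: 'h264', quality: 'high' } }],
  ['image', { kind: 'image', options: { width: 2048, format: 'webp', quality: 'high' } }],
  ['audio', { kind: 'audio', options: { format: 'mp3', bitrate: '192k' } }],
]);

function lookupProfile(item: { kind: string; width?: number }): Profile {
  const base = PROFILES.get(item.kind);
  // Unknown kinds fall back to the same default as the switch version.
  if (!base) {
    return { kind: item.kind, options: { format: 'mp4' } };
  }
  // Images keep their own width when one is recorded, otherwise the table default.
  if (item.kind === 'image') {
    return {
      ...base,
      options: { ...base.options, width: item.width ?? base.options['width'] },
    };
  }
  return base;
}

console.log(lookupProfile({ kind: 'image', width: 800 }));
```

Adding a new media type then means adding one entry to `PROFILES` instead of a new `case` branch.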

## Trigger the task from your API

Register the schedule and optionally expose an endpoint that triggers the batch manually -- useful for backfills or testing.

<CodeGroup labels={["Next.js", "Express"]}>
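```typescript
import { tasks } from "@trigger.dev/sdk"; // on SDK v3, import from "@trigger.dev/sdk/v3"

// Manual trigger from a Next.js API route
export async function POST(req: Request) {
  // Trigger the scheduled task by id; its run function ignores the payload
  const handle = await tasks.trigger("nightly-batch-process", {});

  return Response.json({
    message: "Batch processing started",
    id: handle.id,
  });
}
```

```typescript
import { tasks } from "@trigger.dev/sdk"; // on SDK v3, import from "@trigger.dev/sdk/v3"

// Manual trigger from an Express route (assumes an existing `app`)
app.post("/admin/batch-process", async (req, res) => {
  // Trigger the scheduled task by id; its run function ignores the payload
  const handle = await tasks.trigger("nightly-batch-process", {});

  res.json({
    message: "Batch processing started",
    id: handle.id,
  });
});
```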

</CodeGroup>

## Monitor batch progress

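Query your database to check how the batch is progressing. The per-item task updates each record's status as it completes. The task shown earlier only writes `processed`, so a `failed` count like the one in the sample output appears only if you also record failures yourself, for example from the task's `onFailure` hook.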

```typescript
app.get('/admin/batch-status', async (req, res) => {
  const counts = await db.media.groupBy({
    by: ['status'],
    _count: { id: true },
  });

  const summary = Object.fromEntries(counts.map((c) => [c.status, c._count.id]));

  res.json(summary);
  // { pending: 142, queued: 350, processed: 508, failed: 3 }
});
```
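The raw counts are easier to act on once reduced to a progress figure. The sketch below does that using the sample numbers from the comment above. `batchProgress` is a hypothetical helper, and it assumes the four status names used in this guide, so any other status is ignored.

```typescript
type StatusCounts = Record<string, number>;

// Collapses per-status counts into totals and a rounded completion percentage.
function batchProgress(summary: StatusCounts) {
  const count = (status: string): number => summary[status] ?? 0;
  const done = count('processed') + count('failed');
  const remaining = count('pending') + count('queued');
  const total = done + remaining;
  const percentDone = total === 0 ? 100 : Math.round((done / total) * 100);
  return { total, done, remaining, percentDone };
}

const progress = batchProgress({ pending: 142, queued: 350, processed: 508, failed: 3 });
// total 1003, done 511, remaining 492, percentDone 51
console.log(progress);
```

Counting `failed` as done reflects that those items will not be retried automatically; if you re-queue failures, count them under `remaining` instead.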

## See also

- [Trigger.dev docs](https://trigger.dev/docs)
- [Video processing pipeline with Trigger.dev](/guides/video-processing-pipeline-with-trigger-dev) -- single-upload event-driven pattern
- [Ittybit Task API reference](/reference/tasks)
- [Batch processing with Cloud Tasks](/guides/batch-processing-with-cloud-tasks) -- GCP-native alternative