Scheduled batch processing with Trigger.dev and Ittybit

Your media library grows every day, but not everything gets processed on upload. Maybe files arrive via bulk import, maybe you defer expensive work to off-peak hours, or maybe you want a nightly sweep that catches anything that slipped through. Trigger.dev’s cron triggers let you schedule a task that queries your database for unprocessed media, fans out Ittybit processing jobs with concurrency controls, and marks each record as complete — all without managing infrastructure or writing retry logic.

Install dependencies

npm install @trigger.dev/sdk

Set your environment variables:

TRIGGER_SECRET_KEY=tr_dev_...
ITTYBIT_API_KEY=your_ittybit_api_key
DATABASE_URL=postgresql://...

Ittybit helper functions

These wrap the Ittybit Task API. createIttybitTask kicks off a processing job, and pollForCompletion waits for it to finish.

const ITTYBIT_API = 'https://api.ittybit.com';

async function createIttybitTask(body: {
  input: string;
  kind: string;
  options: Record<string, unknown>;
}) {
  const res = await fetch(`${ITTYBIT_API}/tasks`, {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify(body),
  });

  if (!res.ok) {
    throw new Error(`Ittybit API error: ${res.status}`);
  }

  return res.json() as Promise<{
    id: string;
    status: string;
    output_url?: string;
  }>;
}

async function pollForCompletion(taskId: string, intervalMs = 5000, maxAttempts = 120) {
  for (let i = 0; i < maxAttempts; i++) {
    const res = await fetch(`${ITTYBIT_API}/tasks/${taskId}`, {
      headers: {
        Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
      },
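    });

    // Surface HTTP errors instead of trying to parse an error body as a task
    if (!res.ok) {
      throw new Error(`Ittybit API error while polling task ${taskId}: ${res.status}`);
    }

    const task = (await res.json()) as {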
      id: string;
      status: string;
      output_url?: string;
    };

    if (task.status === 'completed') return task;
    if (task.status === 'failed') throw new Error(`Task ${taskId} failed`);

    await new Promise((r) => setTimeout(r, intervalMs));
  }

  throw new Error(`Task ${taskId} timed out after polling`);
}
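
As a rough check on the defaults above, the sketch below works out how long pollForCompletion can spend sleeping before it gives up. The helper name pollingBudgetMs is hypothetical and not part of either SDK. The figure ignores request latency, so treat it as a lower bound on the real ceiling.

```typescript
// Upper bound on the time pollForCompletion spends sleeping between checks.
// Request latency is not included, so the real ceiling is slightly higher.
// pollingBudgetMs is a hypothetical helper used only for this illustration.
function pollingBudgetMs(intervalMs = 5000, maxAttempts = 120): number {
  return intervalMs * maxAttempts;
}

const budgetMinutes = pollingBudgetMs() / 60_000;
console.log(`Default polling budget: ${budgetMinutes} minutes`);
// prints "Default polling budget: 10 minutes"
```

If your media regularly takes longer than that to process, raise maxAttempts or intervalMs rather than letting the per-item task fail and retry.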

Define the per-item task

Each media item gets its own Trigger.dev task. This keeps retries isolated — if one file fails, it doesn’t block the rest of the batch. The concurrencyLimit prevents you from overwhelming the Ittybit API with too many simultaneous requests.

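import { task } from '@trigger.dev/sdk/v3';
// Assumed to exist in your project: `db` is your own database client (the calls
// below follow a Prisma-style API), and createIttybitTask / pollForCompletion are
// the helpers defined above. Import them from wherever your app defines them.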

export const processMediaItem = task({
  id: 'process-media-item',
  retry: { maxAttempts: 3 },
  queue: {
    name: 'ittybit-batch',
    concurrencyLimit: 10,
  },
  run: async (payload: {
    id: string;
    sourceUrl: string;
    kind: string;
    options: Record<string, unknown>;
  }) => {
    const { id, sourceUrl, kind, options } = payload;

    // Create the Ittybit task
    const ittybitTask = await createIttybitTask({
      input: sourceUrl,
      kind,
      options,
    });

    // Poll until done
    const completed = await pollForCompletion(ittybitTask.id);

    // Update the record in your database
    await db.media.update({
      where: { id },
      data: {
        status: 'processed',
        outputUrl: completed.output_url,
        processedAt: new Date(),
      },
    });

    return { id, outputUrl: completed.output_url };
  },
});

The concurrencyLimit: 10 means at most 10 of these tasks run at the same time across your entire Trigger.dev deployment. Adjust this based on your Ittybit plan limits.
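
To get a feel for how the limit affects the length of a nightly run, a back-of-the-envelope estimate can help. The sketch below assumes every item takes roughly the same time, which real media rarely does, and the two-minute average is a made-up figure. estimateDrainMinutes is a hypothetical helper, not a Trigger.dev API.

```typescript
// Rough wall-clock estimate for draining a batch through a queue with a
// fixed concurrency limit. avgMinutesPerItem is a hypothetical figure;
// measure your own workload before relying on it.
function estimateDrainMinutes(
  items: number,
  concurrencyLimit: number,
  avgMinutesPerItem: number,
): number {
  if (concurrencyLimit <= 0) {
    throw new Error('concurrencyLimit must be positive');
  }
  // Items run in "waves" of at most concurrencyLimit at a time
  const waves = Math.ceil(items / concurrencyLimit);
  return waves * avgMinutesPerItem;
}

console.log(estimateDrainMinutes(500, 10, 2)); // 100
```

Under those assumptions, a full batch of 500 items at a limit of 10 takes about 100 minutes, so a 2 AM start would finish well before morning traffic.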

Define the scheduled task

The scheduled task runs at 2 AM every night, queries for unprocessed records, and fans out to the per-item task using batchTrigger.

import { schedules } from '@trigger.dev/sdk/v3';
// `db` is assumed to be your own database client (Prisma-style API); import it from your app.
import { processMediaItem } from './process-media-item';

export const nightlyBatchProcess = schedules.task({
  id: 'nightly-batch-process',
  cron: '0 2 * * *',
  run: async () => {
    // Query for unprocessed media
    const pending = await db.media.findMany({
      where: { status: 'pending' },
      orderBy: { createdAt: 'asc' },
      take: 500,
    });

    if (pending.length === 0) {
      console.log('No unprocessed media found.');
      return { processed: 0 };
    }

    console.log(`Found ${pending.length} unprocessed items.`);

    // Mark them as queued so the next run doesn't pick them up again
    const ids = pending.map((item) => item.id);
    await db.media.updateMany({
      where: { id: { in: ids } },
      data: { status: 'queued' },
    });

    // Fan out -- one sub-task per item
    const handle = await processMediaItem.batchTrigger(
      pending.map((item) => ({
        payload: {
          id: item.id,
          sourceUrl: item.sourceUrl,
          kind: item.kind ?? 'video',
          options: {
            width: 1920,
            format: 'mp4',
            quality: 'high',
          },
        },
      })),
    );

    return {
      processed: pending.length,
      batchId: handle.batchId,
    };
  },
});
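
The cron value '0 2 * * *' uses the standard five-field format. As a reading aid, the sketch below labels each field. labelCronFields is a hypothetical helper written for this guide and does not validate field ranges. It is also worth confirming which timezone your schedule is evaluated in, since 2 AM in the scheduler's timezone may not be 2 AM for your users.

```typescript
// Label the five fields of a standard cron expression.
// This only splits and names the fields; it does not validate their values.
const CRON_FIELDS = ['minute', 'hour', 'dayOfMonth', 'month', 'dayOfWeek'] as const;
type CronField = (typeof CRON_FIELDS)[number];

function labelCronFields(expression: string): Record<CronField, string> {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== CRON_FIELDS.length) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const labelled = {} as Record<CronField, string>;
  CRON_FIELDS.forEach((name, i) => {
    labelled[name] = parts[i] as string;
  });
  return labelled;
}

console.log(labelCronFields('0 2 * * *'));
// { minute: '0', hour: '2', dayOfMonth: '*', month: '*', dayOfWeek: '*' }
```

Minute 0 of hour 2, on every day of every month, gives the nightly 2 AM run described above.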

The take: 500 limits each run to 500 items. If your backlog is larger, the next night’s run picks up the rest. This prevents a single run from consuming unbounded resources.
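
If the SDK version you run accepts fewer items per batchTrigger call than your take value, one option is to split the pending list into smaller chunks and trigger each chunk in turn. The sketch below shows only the splitting step. The chunk size of 100 is a placeholder rather than a documented limit, so check the batch size your SDK version supports.

```typescript
// Split a list into consecutive chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error('size must be a positive integer');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Hypothetical data: 250 media ids split into chunks of 100 (placeholder size)
const ids = Array.from({ length: 250 }, (_, i) => `media_${i}`);
const batches = chunk(ids, 100);
console.log(batches.map((b) => b.length)); // [ 100, 100, 50 ]

// In the scheduled task you would then loop over the chunks, for example:
// for (const group of chunk(pending, 100)) {
//   await processMediaItem.batchTrigger(group.map((item) => ({ payload: { ... } })));
// }
```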

Handle multiple processing profiles

If different media types need different processing options, map them before fanning out.

function getProcessingOptions(item: { kind: string; width?: number }) {
  switch (item.kind) {
    case 'video':
      return {
        kind: 'video',
        options: {
          width: 1920,
          format: 'mp4',
          codec: 'h264',
          quality: 'high',
        },
      };
    case 'image':
      return {
        kind: 'image',
        options: {
          width: item.width ?? 2048,
          format: 'webp',
          quality: 'high',
        },
      };
    case 'audio':
      return {
        kind: 'audio',
        options: {
          format: 'mp3',
          bitrate: '192k',
        },
      };
    default:
      return {
        kind: item.kind,
        options: { format: 'mp4' },
      };
  }
}

// In the scheduled task, replace the static options:
const handle = await processMediaItem.batchTrigger(
  pending.map((item) => {
    const profile = getProcessingOptions(item);
    return {
      payload: {
        id: item.id,
        sourceUrl: item.sourceUrl,
        kind: profile.kind,
        options: profile.options,
      },
    };
  }),
);
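
As the number of media types grows, a lookup table can be easier to maintain than a switch. The following is an alternative sketch of the same mapping, not a replacement required by either SDK. The names Profile, PROFILES, and profileFor are hypothetical, and the option values are copied from the switch version above.

```typescript
type Profile = { kind: string; options: Record<string, unknown> };

// One entry per media kind; adding a kind is a one-line change.
// A Map avoids accidental matches on inherited object keys such as "toString".
const PROFILES = new Map<string, Profile>([
  ['video', { kind: 'video', options: { width: 1920, format: 'mp4', codec: 'h264', quality: 'high' } }],
  ['image', { kind: 'image', options: { width: 2048, format: 'webp', quality: 'high' } }],
  ['audio', { kind: 'audio', options: { format: 'mp3', bitrate: '192k' } }],
]);

function profileFor(item: { kind: string; width?: number }): Profile {
  const base = PROFILES.get(item.kind);
  if (!base) {
    // Same fallback as the default branch of the switch version
    return { kind: item.kind, options: { format: 'mp4' } };
  }
  if (item.kind === 'image') {
    // Only images honour a per-item width override, as in the switch version
    return { ...base, options: { ...base.options, width: item.width ?? base.options.width } };
  }
  return base;
}

console.log(profileFor({ kind: 'image', width: 800 }).options.width); // 800
console.log(profileFor({ kind: 'audio' }).options.format); // mp3
```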

Trigger the task from your API

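The schedule is declared on the task itself through its cron property. You can also expose an endpoint that triggers the batch manually, which is useful for backfills or testing. The two variants below show a Next.js route handler and an Express route.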

import { schedules } from "@trigger.dev/sdk/v3";
import { nightlyBatchProcess } from "./trigger/nightly-batch";

// Manual trigger from a Next.js API route
export async function POST(req: Request) {
  const handle = await schedules.trigger(
    "nightly-batch-process",
  );

  return Response.json({
    message: "Batch processing started",
    id: handle.id,
  });
}

import { schedules } from "@trigger.dev/sdk/v3";
import { nightlyBatchProcess } from "./trigger/nightly-batch";

// Manual trigger from an Express route
app.post("/admin/batch-process", async (req, res) => {
  const handle = await schedules.trigger(
    "nightly-batch-process",
  );

  res.json({
    message: "Batch processing started",
    id: handle.id,
  });
});

Monitor batch progress

Query your database to check how the batch is progressing. The per-item task updates each record’s status as it completes.

app.get('/admin/batch-status', async (req, res) => {
  const counts = await db.media.groupBy({
    by: ['status'],
    _count: { id: true },
  });

  const summary = Object.fromEntries(counts.map((c) => [c.status, c._count.id]));

  res.json(summary);
  // { pending: 142, queued: 350, processed: 508, failed: 3 }
});
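
The raw counts are easier to read on a dashboard once they are reduced to a single progress figure. The sketch below takes a summary shaped like the example response and computes totals and a percentage. summarizeProgress is a hypothetical helper. It treats both processed and failed as finished, which assumes your own code writes a failed status somewhere, since the per-item task in this guide only writes processed.

```typescript
type StatusCounts = Record<string, number>;

// Reduce per-status counts to a progress summary.
// "processed" and "failed" are both counted as finished (an assumption).
function summarizeProgress(counts: StatusCounts) {
  const total = Object.values(counts).reduce((sum, n) => sum + n, 0);
  const finished = (counts['processed'] ?? 0) + (counts['failed'] ?? 0);
  // Round to one decimal place; an empty table counts as fully done
  const percentFinished = total === 0 ? 100 : Math.round((finished / total) * 1000) / 10;
  return { total, finished, remaining: total - finished, percentFinished };
}

console.log(summarizeProgress({ pending: 142, queued: 350, processed: 508, failed: 3 }));
// { total: 1003, finished: 511, remaining: 492, percentFinished: 50.9 }
```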

See also