Scheduled batch processing with Trigger.dev and Ittybit
Your media library grows every day, but not everything gets processed on upload. Maybe files arrive via bulk import, maybe you defer expensive work to off-peak hours, or maybe you want a nightly sweep that catches anything that slipped through. Trigger.dev’s cron triggers let you schedule a task that queries your database for unprocessed media, fans out Ittybit processing jobs with concurrency controls, and marks each record as complete — all without managing infrastructure or writing retry logic.
Install dependencies
npm install @trigger.dev/sdk
Set your environment variables:
TRIGGER_SECRET_KEY=tr_dev_...
ITTYBIT_API_KEY=your_ittybit_api_key
DATABASE_URL=postgresql://...
Ittybit helper functions
These wrap the Ittybit Task API. createIttybitTask kicks off a processing job, and pollForCompletion waits for it to finish.
const ITTYBIT_API = 'https://api.ittybit.com';
async function createIttybitTask(body: {
input: string;
kind: string;
options: Record<string, unknown>;
}) {
const res = await fetch(`${ITTYBIT_API}/tasks`, {
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!res.ok) {
throw new Error(`Ittybit API error: ${res.status}`);
}
return res.json() as Promise<{
id: string;
status: string;
output_url?: string;
}>;
}
async function pollForCompletion(taskId: string, intervalMs = 5000, maxAttempts = 120) {
for (let i = 0; i < maxAttempts; i++) {
const res = await fetch(`${ITTYBIT_API}/tasks/${taskId}`, {
headers: {
Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
},
});
const task = (await res.json()) as {
id: string;
status: string;
output_url?: string;
};
if (task.status === 'completed') return task;
if (task.status === 'failed') throw new Error(`Task ${taskId} failed`);
await new Promise((r) => setTimeout(r, intervalMs));
}
throw new Error(`Task ${taskId} timed out after polling`);
}
Define the per-item task
Each media item gets its own Trigger.dev task. This keeps retries isolated — if one file fails, it doesn’t block the rest of the batch. The concurrencyLimit prevents you from overwhelming the Ittybit API with too many simultaneous requests.
import { task } from '@trigger.dev/sdk/v3';
export const processMediaItem = task({
id: 'process-media-item',
retry: { maxAttempts: 3 },
queue: {
name: 'ittybit-batch',
concurrencyLimit: 10,
},
run: async (payload: {
id: string;
sourceUrl: string;
kind: string;
options: Record<string, unknown>;
}) => {
const { id, sourceUrl, kind, options } = payload;
// Create the Ittybit task
const ittybitTask = await createIttybitTask({
input: sourceUrl,
kind,
options,
});
// Poll until done
const completed = await pollForCompletion(ittybitTask.id);
// Update the record in your database
await db.media.update({
where: { id },
data: {
status: 'processed',
outputUrl: completed.output_url,
processedAt: new Date(),
},
});
return { id, outputUrl: completed.output_url };
},
});
The concurrencyLimit: 10 means at most 10 of these tasks run at the same time across your entire Trigger.dev deployment. Adjust this based on your Ittybit plan limits.
Define the scheduled task
The scheduled task runs at 2 AM every night, queries for unprocessed records, and fans out to the per-item task using batchTrigger.
import { schedules, task } from '@trigger.dev/sdk/v3';
import { processMediaItem } from './process-media-item';
export const nightlyBatchProcess = schedules.task({
id: 'nightly-batch-process',
cron: '0 2 * * *',
run: async () => {
// Query for unprocessed media
const pending = await db.media.findMany({
where: { status: 'pending' },
orderBy: { createdAt: 'asc' },
take: 500,
});
if (pending.length === 0) {
console.log('No unprocessed media found.');
return { processed: 0 };
}
console.log(`Found ${pending.length} unprocessed items.`);
// Mark them as queued so the next run doesn't pick them up again
const ids = pending.map((item) => item.id);
await db.media.updateMany({
where: { id: { in: ids } },
data: { status: 'queued' },
});
// Fan out -- one sub-task per item
const handle = await processMediaItem.batchTrigger(
pending.map((item) => ({
payload: {
id: item.id,
sourceUrl: item.sourceUrl,
kind: item.kind ?? 'video',
options: {
width: 1920,
format: 'mp4',
quality: 'high',
},
},
})),
);
return {
processed: pending.length,
batchId: handle.batchId,
};
},
});
The take: 500 limits each run to 500 items. If your backlog is larger, the next night’s run picks up the rest. This prevents a single run from consuming unbounded resources.
Handle multiple processing profiles
If different media types need different processing options, map them before fanning out.
function getProcessingOptions(item: { kind: string; width?: number }) {
switch (item.kind) {
case 'video':
return {
kind: 'video',
options: {
width: 1920,
format: 'mp4',
codec: 'h264',
quality: 'high',
},
};
case 'image':
return {
kind: 'image',
options: {
width: item.width ?? 2048,
format: 'webp',
quality: 'high',
},
};
case 'audio':
return {
kind: 'audio',
options: {
format: 'mp3',
bitrate: '192k',
},
};
default:
return {
kind: item.kind,
options: { format: 'mp4' },
};
}
}
// In the scheduled task, replace the static options:
const handle = await processMediaItem.batchTrigger(
pending.map((item) => {
const profile = getProcessingOptions(item);
return {
payload: {
id: item.id,
sourceUrl: item.sourceUrl,
kind: profile.kind,
options: profile.options,
},
};
}),
);
Trigger the task from your API
Register the schedule and optionally expose an endpoint that triggers the batch manually — useful for backfills or testing.
import { schedules } from "@trigger.dev/sdk/v3";
import { nightlyBatchProcess } from "./trigger/nightly-batch";
// Manual trigger from a Next.js API route
export async function POST(req: Request) {
const handle = await schedules.trigger(
"nightly-batch-process",
);
return Response.json({
message: "Batch processing started",
id: handle.id,
});
}
import { schedules } from "@trigger.dev/sdk/v3";
import { nightlyBatchProcess } from "./trigger/nightly-batch";
app.post("/admin/batch-process", async (req, res) => {
const handle = await schedules.trigger(
"nightly-batch-process",
);
res.json({
message: "Batch processing started",
id: handle.id,
});
}); Monitor batch progress
Query your database to check how the batch is progressing. The per-item task updates each record’s status as it completes.
app.get('/admin/batch-status', async (req, res) => {
const counts = await db.media.groupBy({
by: ['status'],
_count: { id: true },
});
const summary = Object.fromEntries(counts.map((c) => [c.status, c._count.id]));
res.json(summary);
// { pending: 142, queued: 350, processed: 508, failed: 3 }
});
See also
- Trigger.dev docs
- Video processing pipeline with Trigger.dev — single-upload event-driven pattern
- Ittybit Task API reference
- Batch processing with Cloud Tasks — GCP-native alternative