Batch processing with SQS and Ittybit
You have 50,000 videos in S3 that need transcoding. Hitting the API in a tight loop will get you rate-limited. SQS gives you a controlled buffer: enqueue every S3 key, let Lambda drain the queue at a pace the API can handle, and track results via webhooks. Add a dead-letter queue and nothing gets lost.
Architecture
S3 bucket listing -> enqueue script -> SQS (batch queue)
|
Lambda (consumer, concurrency: 2, batchSize: 5)
|
Ittybit API (POST /jobs)
|
DynamoDB <- Lambda (webhook) <- API Gateway <- Ittybit webhook
|
+-- SQS (DLQ) <- failed messages
Prerequisites
- An S3 bucket containing the files to process
- An Ittybit connection configured for your bucket
- ITTYBIT_API_KEY stored in AWS Secrets Manager or SSM Parameter Store, and exposed to the consumer Lambda as an environment variable of the same name
- A DynamoDB table for results (partition key pk, sort key sk)
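If you don't have the results table yet, here is a minimal boto3 sketch for creating it. The pk/sk string keys come from the prerequisite above. The on-demand billing mode and the table name in the usage line are assumptions.

```python
def results_table_params(table_name: str) -> dict:
    """Keyword arguments for DynamoDB create_table: pk/sk string keys, on-demand billing."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [
            {"AttributeName": "pk", "AttributeType": "S"},
            {"AttributeName": "sk", "AttributeType": "S"},
        ],
        "KeySchema": [
            {"AttributeName": "pk", "KeyType": "HASH"},  # partition key: the S3 source key
            {"AttributeName": "sk", "KeyType": "RANGE"},  # sort key: task#<task id>
        ],
        # On-demand billing, so a burst of webhooks needs no capacity planning (assumption)
        "BillingMode": "PAY_PER_REQUEST",
    }


def create_results_table(client, table_name: str) -> None:
    """Create the table with a boto3 DynamoDB client and wait until it is ACTIVE."""
    client.create_table(**results_table_params(table_name))
    client.get_waiter("table_exists").wait(TableName=table_name)
```

Usage: `create_results_table(boto3.client("dynamodb"), "media-batch-results")`. Here "media-batch-results" is a placeholder name; pass the same name to the webhook Lambda as TABLE_NAME.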
Create the queues
Two queues: one for work, one for failures. The dead-letter queue catches messages that fail after three delivery attempts.
# Dead-letter queue (keep its URL for the next command)
DLQ_URL=$(aws sqs create-queue \
--queue-name media-batch-dlq \
--query QueueUrl --output text)
# Work queue with DLQ redrive policy
DLQ_ARN=$(aws sqs get-queue-attributes \
--queue-url "$DLQ_URL" \
--attribute-names QueueArn --query 'Attributes.QueueArn' --output text)
aws sqs create-queue \
--queue-name media-batch \
--attributes '{
"VisibilityTimeout": "120",
"RedrivePolicy": "{\"deadLetterTargetArn\":\"'$DLQ_ARN'\",\"maxReceiveCount\":\"3\"}"
}'
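The 120-second VisibilityTimeout limits how long the consumer Lambda may run. AWS recommends setting a source queue's visibility timeout to at least six times the timeout of the function it triggers. The extra headroom leaves room for retries if Lambda throttles the function. Below is a small Python sketch of that arithmetic. The per-request latency passed to batch_fits is your own estimate, not an Ittybit figure.

```python
# AWS recommends a queue VisibilityTimeout of at least six times the timeout
# of the Lambda function that the queue triggers.
VISIBILITY_MULTIPLIER = 6


def max_function_timeout(visibility_timeout_s: int) -> int:
    """Longest consumer timeout (seconds) that keeps the recommended headroom."""
    return visibility_timeout_s // VISIBILITY_MULTIPLIER


def min_visibility_timeout(function_timeout_s: int) -> int:
    """Smallest queue VisibilityTimeout (seconds) for a given consumer timeout."""
    return function_timeout_s * VISIBILITY_MULTIPLIER


def batch_fits(batch_size: int, request_latency_s: float, function_timeout_s: int) -> bool:
    """The example consumers call the API one record at a time, so a batch
    needs roughly batch_size x request latency to finish."""
    return batch_size * request_latency_s < function_timeout_s
```

With VisibilityTimeout set to 120, the consumer's timeout should be 20 seconds or less. A batch of 5 sequential requests then needs to average under 4 seconds per request.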
Enqueue S3 keys
List objects in your bucket and push each key onto the queue. This runs once to seed the backlog.
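The batching step is worth testing on its own. SendMessageBatch accepts at most 10 entries per call. A call that succeeds overall can still reject individual entries, which come back in the response's Failed list. Below is a minimal Python sketch that filters video keys, chunks them into groups of 10, and re-sends only the failed entries. The send_batch parameter is a hypothetical stand-in for sqs.send_message_batch bound to your queue URL, and three attempts is an arbitrary choice.

```python
import json
from typing import Callable, Iterable, Iterator, List

VIDEO_EXTENSIONS = (".mp4", ".mov", ".mkv", ".webm")
MAX_BATCH = 10  # SendMessageBatch accepts at most 10 entries per call


def video_keys(keys: Iterable[str]) -> Iterator[str]:
    """Keep only keys with a video extension (case-insensitive)."""
    return (k for k in keys if k.lower().endswith(VIDEO_EXTENSIONS))


def chunks(items: List[str], size: int = MAX_BATCH) -> Iterator[List[str]]:
    """Split a list into consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i : i + size]


def send_with_retry(
    bucket: str,
    keys: List[str],
    send_batch: Callable[[list], dict],
    max_attempts: int = 3,
) -> List[str]:
    """Send up to 10 keys, re-sending only entries SQS reports in Failed.

    send_batch(entries) is a hypothetical stand-in for
    sqs.send_message_batch(QueueUrl=..., Entries=entries) and must return the
    response dict. Returns the keys that still failed after max_attempts.
    """
    pending = {str(i): key for i, key in enumerate(keys)}  # entry Id -> S3 key
    for _ in range(max_attempts):
        if not pending:
            break
        entries = [
            {"Id": entry_id, "MessageBody": json.dumps({"bucket": bucket, "key": key})}
            for entry_id, key in pending.items()
        ]
        response = send_batch(entries)
        failed_ids = {f["Id"] for f in response.get("Failed", [])}
        pending = {i: k for i, k in pending.items() if i in failed_ids}
    return list(pending.values())
```

Usage: `send_with_retry(BUCKET, batch, lambda e: sqs.send_message_batch(QueueUrl=QUEUE_URL, Entries=e))` for each `batch` in `chunks(list(video_keys(keys)))`. Immediate retries only help with transient errors. Entries rejected for sender faults will fail every attempt, so log whatever this returns.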
import {
S3Client,
ListObjectsV2Command,
} from "@aws-sdk/client-s3";
import { SQSClient, SendMessageBatchCommand } from "@aws-sdk/client-sqs";
const s3 = new S3Client({});
const sqs = new SQSClient({});
const BUCKET = "my-media-bucket";
const PREFIX = "uploads/";
const QUEUE_URL = process.env.SQS_QUEUE_URL!;
async function enqueue() {
let token: string | undefined;
let total = 0;
do {
const list = await s3.send(
new ListObjectsV2Command({
Bucket: BUCKET,
Prefix: PREFIX,
ContinuationToken: token,
})
);
const keys = (list.Contents ?? [])
.map((obj) => obj.Key!)
.filter((key) => /\.(mp4|mov|mkv|webm)$/i.test(key));
// SQS accepts up to 10 messages per batch
for (let i = 0; i < keys.length; i += 10) {
const batch = keys.slice(i, i + 10);
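const result = await sqs.send(
new SendMessageBatchCommand({
QueueUrl: QUEUE_URL,
Entries: batch.map((key, idx) => ({
Id: String(idx),
MessageBody: JSON.stringify({ bucket: BUCKET, key }),
})),
})
);
// A successful call can still reject individual entries; report them
const failed = result.Failed ?? [];
for (const f of failed) {
console.error(`Failed to enqueue ${batch[Number(f.Id)]}: ${f.Code} ${f.Message ?? ""}`);
}
total += batch.length - failed.length;
}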
token = list.NextContinuationToken;
} while (token);
console.log(`Enqueued ${total} files`);
}
enqueue().catch((err) => {
console.error(err);
process.exit(1);
});
import json
import os
import boto3
BUCKET = "my-media-bucket"
PREFIX = "uploads/"
QUEUE_URL = os.environ["SQS_QUEUE_URL"]
s3 = boto3.client("s3")
sqs = boto3.client("sqs")
EXTENSIONS = {".mp4", ".mov", ".mkv", ".webm"}
def enqueue():
    paginator = s3.get_paginator("list_objects_v2")
    total = 0
    for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
        keys = [
            obj["Key"]
            for obj in page.get("Contents", [])
            if any(obj["Key"].lower().endswith(ext) for ext in EXTENSIONS)
        ]
        # SQS accepts up to 10 messages per batch
        for i in range(0, len(keys), 10):
            batch = keys[i : i + 10]
            resp = sqs.send_message_batch(
                QueueUrl=QUEUE_URL,
                Entries=[
                    {
                        "Id": str(idx),
                        "MessageBody": json.dumps({"bucket": BUCKET, "key": key}),
                    }
                    for idx, key in enumerate(batch)
                ],
            )
            # A successful call can still reject individual entries; report them
            failed = resp.get("Failed", [])
            for f in failed:
                print(f"Failed to enqueue {batch[int(f['Id'])]}: {f['Code']} {f.get('Message', '')}")
            total += len(batch) - len(failed)
    print(f"Enqueued {total} files")
enqueue()
Consumer Lambda
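This Lambda is triggered by SQS in batches of 5. Cap concurrency with MaximumConcurrency: 2 on the event source mapping rather than with ReservedConcurrentExecutions, which with SQS can throttle invocations and push messages toward the DLQ. At most 10 messages are then being processed at any time (2 invocations x 5 messages each). Because the handler sends its requests one after another, no more than 2 requests are open against Ittybit at once. Increase these numbers once you’ve confirmed your rate limits.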
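The partial batch response is what makes retries safe. The handler reports only the messageIds that failed. With ReportBatchItemFailures enabled on the event source mapping, Lambda deletes the rest of the batch, and SQS redelivers only the failed messages. Below is a minimal Python sketch with the HTTP call injected, so the logic can be tested without network access. The submit parameter is a hypothetical callable, and the job fields are copied from the consumers in this section.

```python
import json
from typing import Callable, List


def build_job_request(bucket: str, key: str, connection_id: str, webhook_url: str) -> dict:
    """POST /jobs body with the same fields the consumer Lambdas send."""
    return {
        "input": f"s3://{bucket}/{key}",
        "kind": "video",
        "options": {
            "connection_id": connection_id,
            "width": 1920,
            "format": "mp4",
            "quality": "high",
        },
        "webhook_url": webhook_url,
        "metadata": {"source_bucket": bucket, "source_key": key},
    }


def process_records(
    records: List[dict],
    submit: Callable[[dict], None],
    connection_id: str,
    webhook_url: str,
) -> dict:
    """Submit one job per SQS record and build a partial batch response.

    submit(job) is a hypothetical stand-in for the HTTP call; it should raise
    on any non-2xx response. Only failed messageIds are reported, so SQS
    deletes the rest and redelivers just these.
    """
    failures = []
    for record in records:
        try:
            msg = json.loads(record["body"])
            submit(build_job_request(msg["bucket"], msg["key"], connection_id, webhook_url))
        except Exception as exc:  # bad JSON, missing fields, or a failed request
            print(f"Error processing {record['messageId']}: {exc}")
            failures.append({"itemIdentifier": record["messageId"]})
    return {"batchItemFailures": failures}
```

Catching per record matters here. If the handler raised instead of returning, Lambda would treat the whole batch as failed and redeliver messages that had already created jobs.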
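// Type-only import: these types come from @types/aws-lambda, which has no runtime module
import type { SQSBatchResponse, SQSEvent } from "aws-lambda";
const ITTYBIT_API_KEY = process.env.ITTYBIT_API_KEY!;
const WEBHOOK_URL = process.env.WEBHOOK_URL!;
const CONNECTION_ID = process.env.ITTYBIT_CONNECTION_ID!;
export const handler = async (event: SQSEvent): Promise<SQSBatchResponse> => {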
const failures: { itemIdentifier: string }[] = [];
for (const record of event.Records) {
try {
const { bucket, key } = JSON.parse(record.body);
const input = `s3://${bucket}/${key}`;
const res = await fetch("https://api.ittybit.com/jobs", {
method: "POST",
headers: {
Authorization: `Bearer ${ITTYBIT_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
input,
kind: "video",
options: {
connection_id: CONNECTION_ID,
width: 1920,
format: "mp4",
quality: "high",
},
webhook_url: WEBHOOK_URL,
metadata: { source_bucket: bucket, source_key: key },
}),
});
if (!res.ok) {
const body = await res.text();
console.error(`Failed ${key}: ${res.status} ${body}`);
failures.push({ itemIdentifier: record.messageId });
} else {
const data = await res.json();
console.log(`Task created for ${key}: ${data.id}`);
}
} catch (err) {
console.error(`Error processing ${record.messageId}:`, err);
failures.push({ itemIdentifier: record.messageId });
}
}
// Return failed message IDs so SQS only deletes the successful ones
return { batchItemFailures: failures };
};
import json
import os
import urllib.error
import urllib.request
ITTYBIT_API_KEY = os.environ["ITTYBIT_API_KEY"]
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
CONNECTION_ID = os.environ["ITTYBIT_CONNECTION_ID"]
def handler(event, context):
    failures = []
    for record in event["Records"]:
        try:
            msg = json.loads(record["body"])
            bucket = msg["bucket"]
            key = msg["key"]
            input_url = f"s3://{bucket}/{key}"
            task = {
                "input": input_url,
                "kind": "video",
                "options": {
                    "connection_id": CONNECTION_ID,
                    "width": 1920,
                    "format": "mp4",
                    "quality": "high",
                },
                "webhook_url": WEBHOOK_URL,
                "metadata": {"source_bucket": bucket, "source_key": key},
            }
            req = urllib.request.Request(
                "https://api.ittybit.com/jobs",
                data=json.dumps(task).encode(),
                headers={
                    "Authorization": f"Bearer {ITTYBIT_API_KEY}",
                    "Content-Type": "application/json",
                },
                method="POST",
            )
            with urllib.request.urlopen(req) as res:
                data = json.loads(res.read())
            print(f"Task created for {key}: {data['id']}")
        except urllib.error.HTTPError as e:
            # urlopen raises on non-2xx responses; log the body like the TypeScript version
            print(f"Failed {key}: {e.code} {e.read().decode(errors='replace')}")
            failures.append({"itemIdentifier": record["messageId"]})
        except Exception as e:
            print(f"Error processing {record['messageId']}: {e}")
            failures.append({"itemIdentifier": record["messageId"]})
    # Return failed message IDs so SQS only deletes the successful ones
    return {"batchItemFailures": failures}
Event source mapping
Wire SQS to your Lambda. The batch size and concurrency cap are what keep you under rate limits.
aws lambda create-event-source-mapping \
--function-name media-batch-consumer \
--event-source-arn arn:aws:sqs:us-east-1:123456789:media-batch \
--batch-size 5 \
--maximum-batching-window-in-seconds 10 \
--function-response-types ReportBatchItemFailures \
--scaling-config '{"MaximumConcurrency": 2}'
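If you provision from Python instead of the CLI, the same mapping can be created with boto3's `lambda` client through `create_event_source_mapping(**params)`. The sketch below builds those arguments and checks a few limits AWS documents for standard SQS queues. Those limits are BatchSize up to 10,000, a batching window of at least 1 second for batches above 10, and MaximumConcurrency of at least 2. They are stated as of writing, so verify them against current AWS docs.

```python
def event_source_mapping_params(
    function_name: str,
    queue_arn: str,
    batch_size: int = 5,
    batching_window_s: int = 10,
    max_concurrency: int = 2,
) -> dict:
    """Keyword arguments for boto3 lambda.create_event_source_mapping,
    mirroring the CLI flags above."""
    # Limits documented by AWS for standard SQS queues (check current docs)
    if not 1 <= batch_size <= 10_000:
        raise ValueError("BatchSize must be between 1 and 10,000 for standard queues")
    if batch_size > 10 and batching_window_s < 1:
        raise ValueError("BatchSize above 10 needs a batching window of at least 1 second")
    if max_concurrency < 2:
        raise ValueError("MaximumConcurrency must be at least 2")
    return {
        "FunctionName": function_name,
        "EventSourceArn": queue_arn,
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": batching_window_s,
        # Required for the handlers' batchItemFailures responses to take effect
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "ScalingConfig": {"MaximumConcurrency": max_concurrency},
    }
```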
Webhook Lambda
Ittybit POSTs to your webhook_url when each task completes. Write the result to DynamoDB so you can query progress across the batch.
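Both handlers map the webhook payload to an item the same way. The Python sketch below pulls that mapping out as a pure function and adds a status tally for checking progress across the batch. It assumes the payload shape the handlers read: the task under "data", with the S3 key echoed back in metadata.source_key. Status values are whatever Ittybit sends; the tally doesn't interpret them.

```python
import json
from collections import Counter
from decimal import Decimal
from typing import Iterable


def to_item(body: str) -> dict:
    """Map a webhook request body to a DynamoDB item keyed by pk/sk."""
    # boto3 rejects Python floats for DynamoDB numbers, so parse them as Decimal
    task = json.loads(body, parse_float=Decimal)["data"]
    item = {
        "pk": task["metadata"]["source_key"],
        "sk": f"task#{task['id']}",
        "status": task["status"],
        "kind": task.get("kind"),
        "output_url": (task.get("output") or {}).get("url"),  # output may be null
        "duration_ms": task.get("duration_ms"),
        "created_at": task.get("created_at"),
        "completed_at": task.get("completed_at"),
    }
    # Leave absent fields out of the item rather than storing NULLs
    return {k: v for k, v in item.items() if v is not None}


def summarize(items: Iterable[dict]) -> Counter:
    """Count stored tasks by status, e.g. over the Items of a table scan."""
    return Counter(item.get("status", "unknown") for item in items)
```

In a handler this becomes `table.put_item(Item=to_item(event["body"]))`. For progress, feed `summarize` the items from a paginated scan of the table.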
import { DynamoDBClient, PutItemCommand } from "@aws-sdk/client-dynamodb";
import { marshall } from "@aws-sdk/util-dynamodb";
const db = new DynamoDBClient({});
const TABLE_NAME = process.env.TABLE_NAME!;
export const handler = async (event: { body: string }) => {
const payload = JSON.parse(event.body);
const task = payload.data;
await db.send(
new PutItemCommand({
TableName: TABLE_NAME,
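// removeUndefinedValues: fields such as output?.url may be missing (e.g. on a
// failed task), and marshall throws on undefined values by default
Item: marshall(
{
pk: task.metadata.source_key,
sk: `task#${task.id}`,
status: task.status,
kind: task.kind,
output_url: task.output?.url,
duration_ms: task.duration_ms,
created_at: task.created_at,
completed_at: task.completed_at,
},
{ removeUndefinedValues: true }
),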
})
);
console.log(`Stored result for ${task.id}: ${task.status}`);
return { statusCode: 200, body: "ok" };
};
import json
import os
from decimal import Decimal
import boto3
TABLE_NAME = os.environ["TABLE_NAME"]
table = boto3.resource("dynamodb").Table(TABLE_NAME)
def handler(event, context):
    # boto3 rejects Python floats for DynamoDB numbers, so parse them as Decimal
    payload = json.loads(event["body"], parse_float=Decimal)
    task = payload["data"]
    table.put_item(
        Item={
            "pk": task["metadata"]["source_key"],
            "sk": f"task#{task['id']}",
            "status": task["status"],
            "kind": task["kind"],
            # output may be absent or null, e.g. for a failed task
            "output_url": (task.get("output") or {}).get("url"),
            "duration_ms": task.get("duration_ms"),
            "created_at": task["created_at"],
            "completed_at": task.get("completed_at"),
        }
    )
    print(f"Stored result for {task['id']}: {task['status']}")
    return {"statusCode": 200, "body": "ok"}
Monitor the DLQ
Messages that fail three times land in the dead-letter queue. Poll it periodically or set up a CloudWatch alarm so you know when something needs attention.
# Check how many messages are sitting in the DLQ
aws sqs get-queue-attributes \
--queue-url https://sqs.us-east-1.amazonaws.com/123456789/media-batch-dlq \
--attribute-names ApproximateNumberOfMessages
# Redrive failed messages back to the work queue after fixing the issue
aws sqs start-message-move-task \
--source-arn arn:aws:sqs:us-east-1:123456789:media-batch-dlq \
--destination-arn arn:aws:sqs:us-east-1:123456789:media-batch
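For the CloudWatch alarm option, here is a Python sketch of the arguments for boto3's `cloudwatch.put_metric_alarm`. The alarm fires as soon as any message is visible in the DLQ. It watches ApproximateNumberOfMessagesVisible because messages moved into a DLQ by a redrive policy are not counted in NumberOfMessagesSent. The SNS topic ARN and the 5-minute period are assumptions; substitute your own.

```python
def dlq_alarm_params(queue_name: str, topic_arn: str, period_s: int = 300) -> dict:
    """Keyword arguments for boto3 cloudwatch.put_metric_alarm that fire as soon
    as any message is visible in the dead-letter queue."""
    return {
        "AlarmName": f"{queue_name}-not-empty",
        "Namespace": "AWS/SQS",
        # Redriven messages don't count toward NumberOfMessagesSent, so watch this instead
        "MetricName": "ApproximateNumberOfMessagesVisible",
        "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
        "Statistic": "Maximum",
        "Period": period_s,
        "EvaluationPeriods": 1,
        "Threshold": 0.0,
        "ComparisonOperator": "GreaterThanThreshold",
        # A quiet queue may report no data points; treat gaps as OK
        "TreatMissingData": "notBreaching",
        "AlarmActions": [topic_arn],  # hypothetical SNS topic for notifications
    }
```

Usage: `boto3.client("cloudwatch").put_metric_alarm(**dlq_alarm_params("media-batch-dlq", topic_arn))`.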
Tuning throughput
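The two knobs that matter are batch-size and MaximumConcurrency on the event source mapping. Their product caps how many messages are being processed at once. Because the consumers above send one request at a time, MaximumConcurrency alone caps how many Ittybit requests are open simultaneously; the actual request rate also depends on how long each request takes:
| batch-size | MaximumConcurrency | Max messages in flight |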
|---|---|---|
| 5 | 2 | 10 |
| 10 | 5 | 50 |
| 10 | 10 | 100 |
Start low, and increase these values once you’ve confirmed your Ittybit plan’s rate limits can handle the throughput. The maximum-batching-window-in-seconds setting lets Lambda wait up to that many seconds to gather a full batch before invoking your function, which reduces invocation count during low-traffic periods.
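To choose values, it helps to separate concurrency from rate. Below is a back-of-the-envelope Python sketch. It assumes the sequential consumers from this guide and ignores retries and invocation overhead. The 0.5-second request latency in the example is a made-up figure, so measure your own.

```python
def max_in_flight(batch_size: int, max_concurrency: int) -> int:
    """Messages being worked on at once: one batch per concurrent invocation."""
    return batch_size * max_concurrency


def requests_per_second(max_concurrency: int, request_latency_s: float) -> float:
    """The consumers await each request before the next, so each invocation
    has at most one request open and throughput is about concurrency / latency."""
    return max_concurrency / request_latency_s


def drain_hours(total_files: int, max_concurrency: int, request_latency_s: float) -> float:
    """Rough time to empty the queue, ignoring retries and invocation overhead."""
    return total_files / requests_per_second(max_concurrency, request_latency_s) / 3600
```

Take the starting values (batch-size 5, MaximumConcurrency 2) and an assumed 0.5 seconds per request. The consumers then make about 4 requests per second, and 50,000 files take roughly 3.5 hours. Batch size doesn't appear in the rate formula. For these sequential consumers, raising MaximumConcurrency is what increases request rate; a larger batch mainly reduces the number of invocations.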
See also
- Process files from S3 — setting up S3 connections
- Write output to S3 — writing processed files back to your bucket
- AWS event-driven processing with EventBridge — real-time processing on upload
- Build a user upload pipeline — multi-step processing patterns