# Batch processing with SQS and Ittybit

Process large media backlogs using SQS for rate-limited dispatch to Ittybit

You have 50,000 videos in S3 that need transcoding. Hitting the API in a tight loop will get you rate-limited. SQS gives you a controlled buffer: enqueue every S3 key, let Lambda drain the queue at a pace the API can handle, and track results via webhooks. Add a dead-letter queue and nothing gets lost.

## Architecture

```
S3 bucket listing -> enqueue script -> SQS (batch queue)
                                          |
                        Lambda (consumer, concurrency: 2, batchSize: 5)
                                          |
                                    Ittybit API (POST /jobs)
                                          |
DynamoDB <- Lambda (webhook) <- API Gateway <- Ittybit webhook
     |
     +-- SQS (DLQ) <- failed messages
```

## Prerequisites

- An S3 bucket containing the files to process
- An [Ittybit connection](/guides/process-files-from-s3) configured for your bucket
- `ITTYBIT_API_KEY` stored in AWS Secrets Manager or SSM Parameter Store
- A DynamoDB table for results (partition key `pk`, sort key `sk`)

## Create the queues

Two queues: one for work, one for failures. The dead-letter queue catches messages that fail after three delivery attempts.

```bash
# Dead-letter queue
aws sqs create-queue \
  --queue-name media-batch-dlq

# Work queue with DLQ redrive policy
DLQ_ARN=$(aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789/media-batch-dlq \
  --attribute-names QueueArn --query 'Attributes.QueueArn' --output text)

aws sqs create-queue \
  --queue-name media-batch \
  --attributes '{
    "VisibilityTimeout": "120",
    "RedrivePolicy": "{\"deadLetterTargetArn\":\"'$DLQ_ARN'\",\"maxReceiveCount\":\"3\"}"
  }'
```

## Enqueue S3 keys

List objects in your bucket and push each key onto the queue. This runs once to seed the backlog.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript

  S3Client,
  ListObjectsV2Command,
} from "@aws-sdk/client-s3";

const s3 = new S3Client({});
const sqs = new SQSClient({});

const BUCKET = "my-media-bucket";
const PREFIX = "uploads/";
const QUEUE_URL = process.env.SQS_QUEUE_URL!;

async function enqueue() {
let token: string | undefined;
let total = 0;

do {
const list = await s3.send(
new ListObjectsV2Command({
Bucket: BUCKET,
Prefix: PREFIX,
ContinuationToken: token,
})
);

    const keys = (list.Contents ?? [])
      .map((obj) => obj.Key!)
      .filter((key) => /\.(mp4|mov|mkv|webm)$/i.test(key));

    // SQS accepts up to 10 messages per batch
    for (let i = 0; i < keys.length; i += 10) {
      const batch = keys.slice(i, i + 10);
      await sqs.send(
        new SendMessageBatchCommand({
          QueueUrl: QUEUE_URL,
          Entries: batch.map((key, idx) => ({
            Id: String(idx),
            MessageBody: JSON.stringify({ bucket: BUCKET, key }),
          })),
        })
      );
    }

    total += keys.length;
    token = list.NextContinuationToken;

} while (token);

console.log(`Enqueued ${total} files`);
}

enqueue();

````

```python

BUCKET = "my-media-bucket"
PREFIX = "uploads/"
QUEUE_URL = os.environ["SQS_QUEUE_URL"]

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

EXTENSIONS = {".mp4", ".mov", ".mkv", ".webm"}

def enqueue():
    paginator = s3.get_paginator("list_objects_v2")
    total = 0

    for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
        keys = [
            obj["Key"]
            for obj in page.get("Contents", [])
            if any(obj["Key"].lower().endswith(ext) for ext in EXTENSIONS)
        ]

        # SQS accepts up to 10 messages per batch
        for i in range(0, len(keys), 10):
            batch = keys[i : i + 10]
            sqs.send_message_batch(
                QueueUrl=QUEUE_URL,
                Entries=[
                    {
                        "Id": str(idx),
                        "MessageBody": json.dumps({"bucket": BUCKET, "key": key}),
                    }
                    for idx, key in enumerate(batch)
                ],
            )

        total += len(keys)

    print(f"Enqueued {total} files")

enqueue()
````

</CodeGroup>

## Consumer Lambda

This Lambda is triggered by SQS with a batch size of 5. Set `ReservedConcurrentExecutions` to 2 so at most 10 requests hit Ittybit at any time (2 invocations x 5 messages each). Increase these numbers once you've confirmed your rate limits.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript

const ITTYBIT_API_KEY = process.env.ITTYBIT_API_KEY!;
const WEBHOOK_URL = process.env.WEBHOOK_URL!;
const CONNECTION_ID = process.env.ITTYBIT_CONNECTION_ID!;

export const handler = async (event: SQSEvent) => {
  const failures: { itemIdentifier: string }[] = [];

for (const record of event.Records) {
try {
const { bucket, key } = JSON.parse(record.body);
const input = `s3://${bucket}/${key}`;

      const res = await fetch("https://api.ittybit.com/jobs", {
        method: "POST",
        headers: {
          Authorization: `Bearer ${ITTYBIT_API_KEY}`,
          "Content-Type": "application/json",
        },
        body: JSON.stringify({
          input,
          kind: "video",
          options: {
            connection_id: CONNECTION_ID,
            width: 1920,
            format: "mp4",
            quality: "high",
          },
          webhook_url: WEBHOOK_URL,
          metadata: { source_bucket: bucket, source_key: key },
        }),
      });

      if (!res.ok) {
        const body = await res.text();
        console.error(`Failed ${key}: ${res.status} ${body}`);
        failures.push({ itemIdentifier: record.messageId });
      } else {
        const data = await res.json();
        console.log(`Task created for ${key}: ${data.id}`);
      }
    } catch (err) {
      console.error(`Error processing ${record.messageId}:`, err);
      failures.push({ itemIdentifier: record.messageId });
    }

}

// Return failed message IDs so SQS only deletes the successful ones
return { batchItemFailures: failures };
};

````

```python

ITTYBIT_API_KEY = os.environ["ITTYBIT_API_KEY"]
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
CONNECTION_ID = os.environ["ITTYBIT_CONNECTION_ID"]

def handler(event, context):
    failures = []

    for record in event["Records"]:
        try:
            msg = json.loads(record["body"])
            bucket = msg["bucket"]
            key = msg["key"]
            input_url = f"s3://{bucket}/{key}"

            task = {
                "input": input_url,
                "kind": "video",
                "options": {
                    "connection_id": CONNECTION_ID,
                    "width": 1920,
                    "format": "mp4",
                    "quality": "high",
                },
                "webhook_url": WEBHOOK_URL,
                "metadata": {"source_bucket": bucket, "source_key": key},
            }

            req = urllib.request.Request(
                "https://api.ittybit.com/jobs",
                data=json.dumps(task).encode(),
                headers={
                    "Authorization": f"Bearer {ITTYBIT_API_KEY}",
                    "Content-Type": "application/json",
                },
                method="POST",
            )

            with urllib.request.urlopen(req) as res:
                data = json.loads(res.read())
                print(f"Task created for {key}: {data['id']}")

        except Exception as e:
            print(f"Error processing {record['messageId']}: {e}")
            failures.append({"itemIdentifier": record["messageId"]})

    return {"batchItemFailures": failures}
````

</CodeGroup>

## Event source mapping

Wire SQS to your Lambda. The batch size and concurrency cap are what keep you under rate limits.

```bash
aws lambda create-event-source-mapping \
  --function-name media-batch-consumer \
  --event-source-arn arn:aws:sqs:us-east-1:123456789:media-batch \
  --batch-size 5 \
  --maximum-batching-window-in-seconds 10 \
  --function-response-types ReportBatchItemFailures \
  --scaling-config '{"MaximumConcurrency": 2}'
```

## Webhook Lambda

Ittybit POSTs to your `webhook_url` when each task completes. Write the result to DynamoDB so you can query progress across the batch.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript

const db = new DynamoDBClient({});
const TABLE_NAME = process.env.TABLE_NAME!;

export const handler = async (event: { body: string }) => {
  const payload = JSON.parse(event.body);
  const task = payload.data;

await db.send(
new PutItemCommand({
TableName: TABLE_NAME,
Item: marshall({
pk: task.metadata.source_key,
sk: `task#${task.id}`,
status: task.status,
kind: task.kind,
output_url: task.output?.url,
duration_ms: task.duration_ms,
created_at: task.created_at,
completed_at: job.succeeded_at,
}),
})
);

console.log(`Stored result for ${task.id}: ${task.status}`);
return { statusCode: 200, body: "ok" };
};

````

```python

TABLE_NAME = os.environ["TABLE_NAME"]
table = boto3.resource("dynamodb").Table(TABLE_NAME)

def handler(event, context):
    payload = json.loads(event["body"])
    task = payload["data"]

    table.put_item(
        Item={
            "pk": task["metadata"]["source_key"],
            "sk": f"task#{task['id']}",
            "status": task["status"],
            "kind": task["kind"],
            "output_url": task.get("output", {}).get("url"),
            "duration_ms": task.get("duration_ms"),
            "created_at": task["created_at"],
            "completed_at": task.get("completed_at"),
        }
    )

    print(f"Stored result for {task['id']}: {task['status']}")
    return {"statusCode": 200, "body": "ok"}
````

</CodeGroup>

## Monitor the DLQ

Messages that fail three times land in the dead-letter queue. Poll it periodically or set up a CloudWatch alarm so you know when something needs attention.

```bash
# Check how many messages are sitting in the DLQ
aws sqs get-queue-attributes \
  --queue-url https://sqs.us-east-1.amazonaws.com/123456789/media-batch-dlq \
  --attribute-names ApproximateNumberOfMessages

# Redrive failed messages back to the work queue after fixing the issue
aws sqs start-message-move-task \
  --source-arn arn:aws:sqs:us-east-1:123456789:media-batch-dlq \
  --destination-arn arn:aws:sqs:us-east-1:123456789:media-batch
```

## Tuning throughput

The two knobs that matter are `batch-size` and `MaximumConcurrency` on the event source mapping. Together they determine peak API request rate:

| batch-size | MaximumConcurrency | Peak requests/invocation cycle |
| ---------- | ------------------ | ------------------------------ |
| 5          | 2                  | 10                             |
| 10         | 5                  | 50                             |
| 10         | 10                 | 100                            |

Start low. Increase once you've confirmed your Ittybit plan's rate limits can handle the throughput. The `maximum-batching-window-in-seconds` setting lets SQS wait up to that long to fill a batch before invoking Lambda, which reduces invocation count during low-traffic periods.

## See also

- [Process files from S3](/guides/process-files-from-s3) -- setting up S3 connections
- [Write output to S3](/guides/write-output-to-s3) -- writing processed files back to your bucket
- [AWS event-driven processing with EventBridge](/guides/aws-event-driven-media-processing) -- real-time processing on upload
- [Build a user upload pipeline](/guides/build-a-user-upload-pipeline) -- multi-step processing patterns