# Batch processing with Cloud Tasks and Ittybit

Process large media libraries at scale using Cloud Tasks for rate-limited Ittybit dispatch

You have 50,000 videos in a Cloud Storage bucket and need them all transcoded by Friday. Hitting the Ittybit API in a tight loop will burn through rate limits and bury your error handling in retry spaghetti. Cloud Tasks solves this -- it queues one task per file, enforces a dispatch rate you control, and retries failures with exponential backoff. A webhook writes each result to BigQuery so you can track progress and run analytics on the finished library.

## Architecture

```
GCS manifest (JSONL) -> Cloud Run orchestrator -> Cloud Tasks queue -> Cloud Run handler -> Ittybit API
                                                                                              |
BigQuery dataset <- Cloud Run webhook handler <- Ittybit webhook
```

1. A JSONL manifest in Cloud Storage lists every file to process
2. A Cloud Run orchestrator reads the manifest and enqueues one Cloud Task per file
3. Cloud Tasks dispatches at a controlled rate (e.g. 50/s) with automatic retry
4. Each task handler POSTs to Ittybit's `/jobs` endpoint
5. Ittybit sends a webhook on completion
6. A webhook handler inserts the result into BigQuery

## Queue configuration

Create the queue with rate limiting and retry settings. This caps dispatch at 50 tasks per second and allows up to 5 attempts per task (the first try plus 4 retries).

```bash
gcloud tasks queues create ittybit-batch \
  --max-dispatches-per-second=50 \
  --max-concurrent-dispatches=200 \
  --max-attempts=5 \
  --min-backoff=10s \
  --max-backoff=300s
```

## Manifest format

Store a JSONL file in Cloud Storage with one entry per file. Each line includes the GCS path and any processing options.

```json
{"path": "gs://my-media-bucket/videos/001.mov", "kind": "video", "options": {"width": 1920, "format": "mp4", "quality": "high"}}
{"path": "gs://my-media-bucket/videos/002.mov", "kind": "video", "options": {"width": 1920, "format": "mp4", "quality": "high"}}
{"path": "gs://my-media-bucket/videos/003.mov", "kind": "video", "options": {"width": 1280, "format": "mp4", "quality": "medium"}}
```
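
If you already have a list of object names, a manifest in this shape can be generated with a few lines of Python. This is a minimal sketch: `manifest_lines` is a hypothetical helper, and it assumes every file in the batch shares the same `kind` and `options`.

```python
import json


def manifest_lines(object_names, bucket, options, kind="video"):
    """Yield one JSON document per object, using the manifest fields shown above."""
    for name in object_names:
        entry = {"path": f"gs://{bucket}/{name}", "kind": kind, "options": options}
        yield json.dumps(entry)


names = ["videos/001.mov", "videos/002.mov"]
opts = {"width": 1920, "format": "mp4", "quality": "high"}

# JSONL: one object per line, newline-terminated
manifest = "\n".join(manifest_lines(names, "my-media-bucket", opts)) + "\n"
print(manifest, end="")
```

Write the resulting string to a `.jsonl` object in the bucket the orchestrator reads from.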

## Orchestrator

The orchestrator reads the manifest and enqueues one Cloud Task per line. Run it as a Cloud Run job or trigger it manually.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript
import { CloudTasksClient } from "@google-cloud/tasks";
import { Storage } from "@google-cloud/storage";

const tasks = new CloudTasksClient();
const storage = new Storage();

const PROJECT = process.env.GCP_PROJECT!;
const LOCATION = process.env.GCP_LOCATION!;
const QUEUE = "ittybit-batch";
const HANDLER_URL = process.env.HANDLER_URL!; // Cloud Run handler URL

async function enqueueManifest(bucket: string, manifestPath: string) {
  const [content] = await storage.bucket(bucket).file(manifestPath).download();
  // One JSON object per line; drop blank lines
  const lines = content.toString().split("\n").filter(Boolean);

  const parent = tasks.queuePath(PROJECT, LOCATION, QUEUE);

  let enqueued = 0;
  for (const line of lines) {
    const item = JSON.parse(line);

    await tasks.createTask({
      parent,
      task: {
        httpRequest: {
          httpMethod: "POST",
          url: HANDLER_URL,
          headers: { "Content-Type": "application/json" },
          body: Buffer.from(JSON.stringify(item)).toString("base64"),
          // Cloud Tasks attaches an OIDC token so the private handler accepts the call
          oidcToken: {
            serviceAccountEmail: process.env.SERVICE_ACCOUNT_EMAIL,
          },
        },
      },
    });

    enqueued++;
    if (enqueued % 1000 === 0) {
      console.log(`Enqueued ${enqueued}/${lines.length}`);
    }
  }

  console.log(`Done. Enqueued ${enqueued} tasks.`);
}

enqueueManifest("my-media-bucket", "manifests/batch-2026-04.jsonl").catch((err) => {
  console.error(err);
  process.exit(1);
});
```

```python

import json
import os

from google.cloud import tasks_v2, storage

tasks_client = tasks_v2.CloudTasksClient()
storage_client = storage.Client()

PROJECT = os.environ["GCP_PROJECT"]
LOCATION = os.environ["GCP_LOCATION"]
QUEUE = "ittybit-batch"
HANDLER_URL = os.environ["HANDLER_URL"]
SERVICE_ACCOUNT_EMAIL = os.environ["SERVICE_ACCOUNT_EMAIL"]

def enqueue_manifest(bucket_name: str, manifest_path: str):
    blob = storage_client.bucket(bucket_name).blob(manifest_path)
    content = blob.download_as_text()
    lines = [line for line in content.strip().split("\n") if line]

    parent = tasks_client.queue_path(PROJECT, LOCATION, QUEUE)

    enqueued = 0
    for line in lines:
        item = json.loads(line)

        task = tasks_v2.Task(
            http_request=tasks_v2.HttpRequest(
                http_method=tasks_v2.HttpMethod.POST,
                url=HANDLER_URL,
                headers={"Content-Type": "application/json"},
                body=json.dumps(item).encode(),
                oidc_token=tasks_v2.OidcToken(
                    service_account_email=SERVICE_ACCOUNT_EMAIL,
                ),
            ),
        )

        tasks_client.create_task(
            parent=parent,
            task=task,
        )

        enqueued += 1
        if enqueued % 1000 == 0:
            print(f"Enqueued {enqueued}/{len(lines)}")

    print(f"Done. Enqueued {enqueued} tasks.")

enqueue_manifest("my-media-bucket", "manifests/batch-2026-04.jsonl")
```

</CodeGroup>
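
If the orchestrator is interrupted and re-run, every file is enqueued again. Cloud Tasks rejects a create request whose task name matches an existing or recently completed task, so one option is to derive the task name from the file path. The sketch below shows only the naming step. `task_name_for` is a hypothetical helper, `my-project` and `us-central1` are placeholder values, and hashing the path is an assumption chosen to keep the ID within the characters Cloud Tasks allows.

```python
import hashlib


def task_name_for(queue_path: str, gcs_path: str) -> str:
    """Build a deterministic Cloud Tasks task name from a manifest path."""
    # A SHA-256 hex digest contains only [0-9a-f], which is valid in a task ID,
    # and it spreads names evenly instead of producing sequential prefixes.
    task_id = hashlib.sha256(gcs_path.encode("utf-8")).hexdigest()
    return f"{queue_path}/tasks/{task_id}"


queue = "projects/my-project/locations/us-central1/queues/ittybit-batch"
name = task_name_for(queue, "gs://my-media-bucket/videos/001.mov")
print(name)
```

Setting this value as the task's `name` when calling `create_task` should make a repeated enqueue fail with an `AlreadyExists` error, which the orchestrator can catch and skip.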

## Task handler

Each Cloud Task hits this handler, which generates a signed URL for the GCS file and POSTs a task to Ittybit. Deploy this as a Cloud Run service.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript
import express from "express";
import { Storage } from "@google-cloud/storage";

const app = express();
app.use(express.json());

const storage = new Storage();
const ITTYBIT_API_KEY = process.env.ITTYBIT_API_KEY!;
const WEBHOOK_URL = process.env.WEBHOOK_URL!;

app.post("/", async (req, res) => {
  const { path, kind, options } = req.body;

  // Parse gs:// URL into bucket and object key
  const match =
    typeof path === "string" ? path.match(/^gs:\/\/([^/]+)\/(.+)$/) : null;
  if (!match) {
    res.status(400).send("Invalid GCS path");
    return;
  }

  const [, bucket, key] = match;

  // Signed URL valid for 2 hours so Ittybit can read the private object
  const [signedUrl] = await storage
    .bucket(bucket)
    .file(key)
    .getSignedUrl({
      version: "v4",
      action: "read",
      expires: Date.now() + 2 * 60 * 60 * 1000,
    });

  const task = {
    input: signedUrl,
    kind,
    options,
    webhook_url: WEBHOOK_URL,
    metadata: {
      source_bucket: bucket,
      source_key: key,
    },
  };

  const response = await fetch("https://api.ittybit.com/jobs", {
    method: "POST",
    headers: {
      Authorization: `Bearer ${ITTYBIT_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(task),
  });

  if (!response.ok) {
    const body = await response.text();
    console.error(`Ittybit error ${response.status}: ${body}`);
    res.status(response.status >= 500 ? 500 : 400).send(body);
    return;
  }

  const data = await response.json();
  console.log(`Task created: ${data.id} for ${key}`);
  res.status(200).json({ task_id: data.id });
});

app.listen(8080, () => console.log("Handler listening on :8080"));
```

```python

import os
import re
from datetime import timedelta

import requests
from flask import Flask, request, jsonify
from google.cloud import storage

app = Flask(__name__)

storage_client = storage.Client()
ITTYBIT_API_KEY = os.environ["ITTYBIT_API_KEY"]
WEBHOOK_URL = os.environ["WEBHOOK_URL"]

@app.route("/", methods=["POST"])
def handle_task():
    body = request.get_json()
    path = body["path"]
    kind = body["kind"]
    options = body.get("options", {})

    match = re.match(r"^gs://([^/]+)/(.+)$", path)
    if not match:
        return "Invalid GCS path", 400

    bucket_name, key = match.groups()

    blob = storage_client.bucket(bucket_name).blob(key)
    signed_url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=2),
        method="GET",
    )

    task = {
        "input": signed_url,
        "kind": kind,
        "options": options,
        "webhook_url": WEBHOOK_URL,
        "metadata": {
            "source_bucket": bucket_name,
            "source_key": key,
        },
    }

    res = requests.post(
        "https://api.ittybit.com/jobs",
        headers={"Authorization": f"Bearer {ITTYBIT_API_KEY}"},
        json=task,
    )

    if not res.ok:
        print(f"Ittybit error {res.status_code}: {res.text}")
        status = 500 if res.status_code >= 500 else 400
        return res.text, status

    data = res.json()
    print(f"Task created: {data['id']} for {key}")
    return jsonify({"task_id": data["id"]})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```

</CodeGroup>

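The handler returns a 5xx for upstream server errors and a 4xx for requests that Ittybit rejects. Cloud Tasks treats every non-2xx response as a failed attempt, so both are retried with backoff until the queue's `--max-attempts` limit is reached, after which the task is dropped. Cloud Tasks has no built-in dead-letter queue; to stop retrying a request that can never succeed, log the failure and return a 2xx instead.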
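
One way to separate transient from permanent failures is sketched below in Python. It relies on Cloud Tasks retrying any non-2xx response, so a rejection that can never succeed is logged and acknowledged with a 200. `handler_status` is a hypothetical helper, and treating 429 as Ittybit's rate-limit signal is an assumption.

```python
def handler_status(upstream_status: int) -> int:
    """Map an Ittybit response status to the status the task handler returns."""
    if 200 <= upstream_status < 300:
        return 200  # job accepted
    if upstream_status == 429 or upstream_status >= 500:
        return 500  # transient: a non-2xx reply makes Cloud Tasks retry
    return 200  # permanent rejection: log it, then acknowledge to stop retries


for status in (201, 400, 429, 502):
    print(status, "->", handler_status(status))
```

If you take this approach, record permanently rejected files somewhere you can query later (for example a log-based metric or a BigQuery row), because Cloud Tasks will consider those tasks complete.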

## Webhook to BigQuery

Register this endpoint as your webhook URL. It receives task completion events and streams them into BigQuery.

<CodeGroup labels={["TypeScript", "Python"]}>
```typescript
import express from "express";
import { BigQuery } from "@google-cloud/bigquery";

const app = express();
app.use(express.json());

const bigquery = new BigQuery();
const DATASET = "media_processing";
const TABLE = "task_results";

app.post("/webhook", async (req, res) => {
  const event = req.body;

  // Only record final states; acknowledge everything else
  if (event?.type !== "job.succeeded" && event?.type !== "job.failed") {
    res.status(200).send("Ignored");
    return;
  }

  const task = event.data;

  const row = {
    task_id: task.id,
    status: task.status,
    kind: task.kind,
    source_bucket: task.metadata?.source_bucket ?? null,
    source_key: task.metadata?.source_key ?? null,
    output_url: task.output?.url ?? null,
    width: task.output?.width ?? null,
    height: task.output?.height ?? null,
    duration: task.output?.duration ?? null,
    format: task.output?.format ?? null,
    duration_ms: task.duration_ms ?? null,
    created_at: task.created_at,
    completed_at: task.completed_at ?? null,
  };

  await bigquery.dataset(DATASET).table(TABLE).insert([row]);

  console.log(`Recorded ${task.id}: ${task.status}`);
  res.status(200).send("OK");
});

app.listen(8080, () => console.log("Webhook handler listening on :8080"));
```

```python

from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)

bq = bigquery.Client()
DATASET = "media_processing"
TABLE = "task_results"

@app.route("/webhook", methods=["POST"])
def webhook():
    event = request.get_json(silent=True)

    if not event or event.get("type") not in ("job.succeeded", "job.failed"):
        return "Ignored", 200

    task = event["data"]
    output = task.get("output") or {}  # output may be missing or null on failed jobs

    row = {
        "task_id": task["id"],
        "status": task["status"],
        "kind": task["kind"],
        "source_bucket": task.get("metadata", {}).get("source_bucket"),
        "source_key": task.get("metadata", {}).get("source_key"),
        "output_url": output.get("url"),
        "width": output.get("width"),
        "height": output.get("height"),
        "duration": output.get("duration"),
        "format": output.get("format"),
        "duration_ms": task.get("duration_ms"),
        "created_at": task["created_at"],
        "completed_at": task.get("completed_at"),
    }

    table_ref = f"{bq.project}.{DATASET}.{TABLE}"
    errors = bq.insert_rows_json(table_ref, [row])

    if errors:
        print(f"BigQuery insert errors: {errors}")
        return "Insert error", 500

    print(f"Recorded {task['id']}: {task['status']}")
    return "OK", 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```

</CodeGroup>
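
Webhook senders often retry deliveries, so the same event could arrive twice; whether Ittybit does this is an assumption here. A minimal sketch of one mitigation: build the row in a pure function and derive a stable row ID, which can then be passed to `insert_rows_json` through its `row_ids` argument so BigQuery can apply best-effort de-duplication to streaming inserts. The event shape mirrors the handler above, only a subset of the columns is shown, and `row_from_event` and the sample values are hypothetical.

```python
from typing import Optional, Tuple

FINAL_EVENTS = ("job.succeeded", "job.failed")


def row_from_event(event: dict) -> Optional[Tuple[str, dict]]:
    """Return (row_id, row) for a final job event, or None for anything else."""
    if not event or event.get("type") not in FINAL_EVENTS:
        return None
    task = event["data"]
    metadata = task.get("metadata") or {}
    output = task.get("output") or {}
    row = {
        "task_id": task["id"],
        "status": task["status"],
        "kind": task["kind"],
        "source_bucket": metadata.get("source_bucket"),
        "source_key": metadata.get("source_key"),
        "output_url": output.get("url"),
        "created_at": task.get("created_at"),
        "completed_at": task.get("completed_at"),
    }
    # The same job with the same final status always yields the same row ID.
    return f"{task['id']}:{task['status']}", row


sample = {
    "type": "job.succeeded",
    "data": {
        "id": "job_123",
        "status": "succeeded",
        "kind": "video",
        "metadata": {"source_bucket": "my-media-bucket", "source_key": "videos/001.mov"},
    },
}
print(row_from_event(sample))
```

In the Flask handler this would become `bq.insert_rows_json(table_ref, [row], row_ids=[row_id])`. De-duplication on streaming inserts is best effort, so a `SELECT DISTINCT` or a `GROUP BY task_id` in reporting queries remains a sensible safeguard.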

## BigQuery schema

Create the dataset and table before running the webhook handler.

```sql
CREATE SCHEMA IF NOT EXISTS media_processing;

CREATE TABLE IF NOT EXISTS media_processing.task_results (
  task_id STRING NOT NULL,
  status STRING NOT NULL,
  kind STRING NOT NULL,
  source_bucket STRING,
  source_key STRING,
  output_url STRING,
  width INT64,
  height INT64,
  duration FLOAT64,
  format STRING,
  duration_ms INT64,
  created_at TIMESTAMP,
  completed_at TIMESTAMP
);
```

Once results are flowing in, query your batch progress:

```sql
-- Completion rate
SELECT
  status,
  COUNT(*) AS total,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS pct
FROM media_processing.task_results
GROUP BY status;

-- Average processing time by kind
-- (match the filter below to the status value Ittybit reports for finished jobs)
SELECT
  kind,
  COUNT(*) AS tasks,
  ROUND(AVG(duration_ms) / 1000, 1) AS avg_seconds
FROM media_processing.task_results
WHERE status = 'completed'
GROUP BY kind;
```

## Deploy

```bash
# Deploy the task handler
gcloud run deploy ittybit-task-handler \
  --source . \
  --region us-central1 \
  --set-env-vars ITTYBIT_API_KEY=$ITTYBIT_API_KEY,WEBHOOK_URL=$WEBHOOK_URL \
  --no-allow-unauthenticated

# Deploy the webhook handler (must be publicly reachable)
gcloud run deploy ittybit-webhook-handler \
  --source . \
  --region us-central1 \
  --allow-unauthenticated
```

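After deploying, copy the webhook handler URL, append the `/webhook` path, set the result as `WEBHOOK_URL` in the task handler's environment, and register it in the [Ittybit dashboard](https://ittybit.com/dashboard/webhooks). Because the task handler is deployed with `--no-allow-unauthenticated`, the service account named in `SERVICE_ACCOUNT_EMAIL` needs the Cloud Run Invoker role (`roles/run.invoker`) on that service.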

## See also

- [Google Cloud serverless pipeline](/guides/google-cloud-serverless-video-pipeline) -- event-driven processing with Cloud Functions
- [Process files from S3](/guides/process-files-from-s3) -- setting up storage connections
- [Build a user upload pipeline](/guides/build-a-user-upload-pipeline) -- multi-step processing patterns