Batch processing with Cloud Tasks and Ittybit
You have 50,000 videos in a Cloud Storage bucket and need them all transcoded by Friday. Hitting the Ittybit API in a tight loop will burn through rate limits and bury your error handling in retry spaghetti. Cloud Tasks solves this — it queues one task per file, enforces a dispatch rate you control, and retries failures with exponential backoff. A webhook writes each result to BigQuery so you can track progress and run analytics on the finished library.
Architecture
GCS manifest (JSONL) -> Cloud Run orchestrator -> Cloud Tasks queue -> Cloud Run handler -> Ittybit API
Ittybit API -> Ittybit webhook -> Cloud Run webhook handler -> BigQuery dataset
- A JSONL manifest in Cloud Storage lists every file to process
- A Cloud Run orchestrator reads the manifest and enqueues one Cloud Task per file
- Cloud Tasks dispatches at a controlled rate (e.g. 50/s) with automatic retry
- Each task handler POSTs to Ittybit’s /tasks endpoint
- Ittybit sends a webhook on completion
- A webhook handler inserts the result into BigQuery
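Before committing to a deadline, it is worth checking the dispatch arithmetic. The sketch below uses a hypothetical helper, dispatch_estimate, which is not part of any Google or Ittybit library. It takes the effective dispatch rate as the lower of two limits: the queue's rate cap, and its concurrency cap divided by the handler's average response time (Little's law). It ignores retries. Only the handler's response time matters here, because the handler returns as soon as Ittybit accepts the request and the result arrives later by webhook.

```python
from typing import Tuple


def dispatch_estimate(n_tasks: int, max_per_second: float, max_concurrent: int,
                      handler_latency_s: float) -> Tuple[float, float]:
    """Return (effective tasks per second, seconds to dispatch every task).

    Hypothetical helper. Assumes each dispatch holds one concurrency slot for
    handler_latency_s seconds and that no task is retried.
    """
    # Little's law: C concurrent requests lasting L seconds each allow C / L per second.
    concurrency_rate = max_concurrent / handler_latency_s
    effective_rate = min(max_per_second, concurrency_rate)
    return effective_rate, n_tasks / effective_rate


# 50,000 files, --max-dispatches-per-second=50, --max-concurrent-dispatches=200
rate, seconds = dispatch_estimate(50_000, 50, 200, handler_latency_s=2.0)
print(f"{rate:g} tasks/s, about {seconds / 60:.1f} minutes to dispatch")
```

With a 2-second handler, the rate cap is the binding limit and 50,000 files dispatch in about 1,000 seconds. If the handler slows to 8 seconds per request, the concurrency cap (200 / 8 = 25 per second) becomes the bottleneck and dispatch takes twice as long.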
Queue configuration
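Create the queue with rate limiting and retry settings. These flags cap dispatch at 50 tasks per second with at most 200 in flight. They also allow up to 5 attempts per task (the first try plus 4 retries), with the delay between retries starting at 10 seconds and capped at 5 minutes.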
gcloud tasks queues create ittybit-batch \
--location=$GCP_LOCATION \
--max-dispatches-per-second=50 \
--max-concurrent-dispatches=200 \
--max-attempts=5 \
--min-backoff=10s \
--max-backoff=300s
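The sketch below translates those backoff flags into wall-clock terms. It models the retry schedule described for Cloud Tasks queues: the interval starts at min-backoff and doubles max-doublings times. The default for max-doublings is 16, which these settings never reach. After that the interval grows linearly, and it is always capped at max-backoff. retry_intervals is a hypothetical helper and a simplified model, so the real scheduler may not fire at exactly these offsets.

```python
from typing import List


def retry_intervals(min_backoff: float, max_backoff: float, max_attempts: int,
                    max_doublings: int = 16) -> List[float]:
    """Seconds waited before each retry under a simplified Cloud Tasks backoff model.

    Hypothetical helper, not the scheduler's actual code. max_attempts counts
    the first attempt, so a task gets max_attempts - 1 retries.
    """
    intervals = []
    interval = min_backoff
    for retry in range(max_attempts - 1):
        intervals.append(min(interval, max_backoff))
        if retry < max_doublings:
            interval *= 2  # exponential phase: double up to max_doublings times
        else:
            interval += (2 ** max_doublings) * min_backoff  # then grow linearly
    return intervals


schedule = retry_intervals(10, 300, 5)  # the ittybit-batch queue settings
print(schedule, "total:", sum(schedule), "seconds")
```

For this queue, a task that keeps failing is retried after 10, 20, 40 and 80 seconds. Cloud Tasks therefore stops retrying roughly 150 seconds (plus the time the attempts themselves take) after the first failure. If you need a longer window to ride out an upstream outage, raise --max-attempts.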
Manifest format
Store a JSONL file in Cloud Storage with one entry per file. Each line holds the file's GCS path, its kind, and any processing options.
{"path": "gs://my-media-bucket/videos/001.mov", "kind": "video", "options": {"width": 1920, "format": "mp4", "quality": "high"}}
{"path": "gs://my-media-bucket/videos/002.mov", "kind": "video", "options": {"width": 1920, "format": "mp4", "quality": "high"}}
{"path": "gs://my-media-bucket/videos/003.mov", "kind": "video", "options": {"width": 1280, "format": "mp4", "quality": "medium"}}
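At 50,000 lines, writing the manifest by hand is error-prone, and one malformed line makes the orchestrator's JSON parse throw partway through enqueueing. The sketch below uses two hypothetical helpers that rely only on the standard library. build_manifest writes entries in the format above. validate_manifest checks each line before you upload it, using the same gs:// pattern the task handler applies.

```python
import json
import re
from typing import Dict, Iterable, List, Tuple

# Same pattern the task handler uses to split gs://bucket/key.
GCS_PATH = re.compile(r"^gs://([^/]+)/(.+)$")


def build_manifest(paths: Iterable[str], kind: str, options: Dict) -> str:
    """Return JSONL text with one manifest entry per GCS path."""
    return "".join(
        json.dumps({"path": p, "kind": kind, "options": options}) + "\n" for p in paths
    )


def validate_manifest(text: str) -> List[Tuple[int, str]]:
    """Return (line number, problem) pairs; an empty list means every line is usable."""
    problems = []
    for number, line in enumerate(text.split("\n"), start=1):
        if not line:
            continue  # empty lines are skipped, as the orchestrators do
        try:
            item = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((number, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(item, dict):
            problems.append((number, "entry is not a JSON object"))
            continue
        if not GCS_PATH.match(str(item.get("path", ""))):
            problems.append((number, "path is not a gs://bucket/key URL"))
        if not item.get("kind"):
            problems.append((number, "missing kind"))
    return problems


paths = [f"gs://my-media-bucket/videos/{i:03d}.mov" for i in range(1, 4)]
manifest = build_manifest(paths, "video", {"width": 1920, "format": "mp4", "quality": "high"})
print(validate_manifest(manifest))  # []
print(validate_manifest('{"path": "videos/004.mov", "kind": "video"}\n'))
```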
Orchestrator
The orchestrator reads the manifest and enqueues one Cloud Task per line. Run it as a Cloud Run job or trigger it manually.
import { CloudTasksClient } from "@google-cloud/tasks";
import { Storage } from "@google-cloud/storage";

const tasks = new CloudTasksClient();
const storage = new Storage();

const PROJECT = process.env.GCP_PROJECT!;
const LOCATION = process.env.GCP_LOCATION!;
const QUEUE = "ittybit-batch";
const HANDLER_URL = process.env.HANDLER_URL!; // Cloud Run handler URL
const SERVICE_ACCOUNT_EMAIL = process.env.SERVICE_ACCOUNT_EMAIL!;

async function enqueueManifest(bucket: string, manifestPath: string) {
  const [content] = await storage.bucket(bucket).file(manifestPath).download();
  const lines = content.toString().split("\n").filter(Boolean);
  const parent = tasks.queuePath(PROJECT, LOCATION, QUEUE);

  let enqueued = 0;
  for (const line of lines) {
    const item = JSON.parse(line);
    await tasks.createTask({
      parent,
      task: {
        httpRequest: {
          httpMethod: "POST",
          url: HANDLER_URL,
          headers: { "Content-Type": "application/json" },
          // Request body, base64-encoded
          body: Buffer.from(JSON.stringify(item)).toString("base64"),
          // Cloud Tasks attaches an OIDC token for this account, so the handler can stay private
          oidcToken: { serviceAccountEmail: SERVICE_ACCOUNT_EMAIL },
        },
      },
    });
    enqueued++;
    if (enqueued % 1000 === 0) {
      console.log(`Enqueued ${enqueued}/${lines.length}`);
    }
  }
  console.log(`Done. Enqueued ${enqueued} tasks.`);
}

enqueueManifest("my-media-bucket", "manifests/batch-2026-04.jsonl").catch((err) => {
  console.error(err);
  process.exit(1);
});
import json
import os

from google.cloud import storage, tasks_v2

tasks_client = tasks_v2.CloudTasksClient()
storage_client = storage.Client()

PROJECT = os.environ["GCP_PROJECT"]
LOCATION = os.environ["GCP_LOCATION"]
QUEUE = "ittybit-batch"
HANDLER_URL = os.environ["HANDLER_URL"]
SERVICE_ACCOUNT_EMAIL = os.environ["SERVICE_ACCOUNT_EMAIL"]


def enqueue_manifest(bucket_name: str, manifest_path: str):
    blob = storage_client.bucket(bucket_name).blob(manifest_path)
    content = blob.download_as_text()
    lines = [line for line in content.strip().split("\n") if line]
    parent = tasks_client.queue_path(PROJECT, LOCATION, QUEUE)

    enqueued = 0
    for line in lines:
        item = json.loads(line)
        # Cloud Tasks POSTs this body to the handler with an OIDC token
        # for SERVICE_ACCOUNT_EMAIL, so the handler can stay private.
        task = tasks_v2.Task(
            http_request=tasks_v2.HttpRequest(
                http_method=tasks_v2.HttpMethod.POST,
                url=HANDLER_URL,
                headers={"Content-Type": "application/json"},
                body=json.dumps(item).encode(),
                oidc_token=tasks_v2.OidcToken(
                    service_account_email=SERVICE_ACCOUNT_EMAIL,
                ),
            ),
        )
        tasks_client.create_task(parent=parent, task=task)
        enqueued += 1
        if enqueued % 1000 == 0:
            print(f"Enqueued {enqueued}/{len(lines)}")

    print(f"Done. Enqueued {enqueued} tasks.")


enqueue_manifest("my-media-bucket", "manifests/batch-2026-04.jsonl")
Task handler
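Each Cloud Task hits this handler, which generates a V4 signed URL for the GCS file and POSTs a task to Ittybit. Deploy this as a Cloud Run service. Cloud Run has no service account key to sign URLs with, so signing goes through the IAM Credentials API. For this to work, the handler's runtime service account needs the Service Account Token Creator role on itself, and the IAM Service Account Credentials API must be enabled.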
import express from "express";
import { Storage } from "@google-cloud/storage";

const app = express();
app.use(express.json());

const storage = new Storage();
const ITTYBIT_API_KEY = process.env.ITTYBIT_API_KEY!;
const WEBHOOK_URL = process.env.WEBHOOK_URL!;

app.post("/", async (req, res) => {
  try {
    const { path, kind, options } = req.body;

    // Parse the gs://bucket/key URL. A malformed path can never succeed, and Cloud Tasks
    // retries every non-2xx response, so log it and acknowledge with a 2xx.
    const match = typeof path === "string" ? path.match(/^gs:\/\/([^/]+)\/(.+)$/) : null;
    if (!match) {
      console.error(`Invalid GCS path: ${path}`);
      res.status(200).send("Invalid GCS path; not retrying");
      return;
    }
    const [, bucket, key] = match;

    const [signedUrl] = await storage
      .bucket(bucket)
      .file(key)
      .getSignedUrl({
        version: "v4",
        action: "read",
        expires: Date.now() + 2 * 60 * 60 * 1000, // 2 hours
      });

    const task = {
      input: signedUrl,
      kind,
      options,
      webhook_url: WEBHOOK_URL,
      metadata: {
        source_bucket: bucket,
        source_key: key,
      },
    };

    const response = await fetch("https://api.ittybit.com/jobs", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${ITTYBIT_API_KEY}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(task),
    });

    if (!response.ok) {
      const body = await response.text();
      console.error(`Ittybit error ${response.status}: ${body}`);
      // Retry rate limits and server errors; acknowledge other 4xx errors
      // with a 2xx so a request that can never succeed is not retried.
      const retryable = response.status === 429 || response.status >= 500;
      res.status(retryable ? 503 : 200).send(body);
      return;
    }

    const data = (await response.json()) as { id: string };
    console.log(`Task created: ${data.id} for ${key}`);
    res.status(200).json({ task_id: data.id });
  } catch (err) {
    // Unexpected failures (signing, network) get a 5xx so Cloud Tasks retries them.
    console.error(err);
    res.status(500).send("Internal error");
  }
});

app.listen(8080, () => console.log("Handler listening on :8080"));
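import os
import re
from datetime import timedelta

import google.auth
import requests
from flask import Flask, jsonify, request
from google.auth.transport.requests import Request as AuthRequest
from google.cloud import storage

app = Flask(__name__)
storage_client = storage.Client()
# Cloud Run has no key file to sign URLs with, so signing goes through the IAM
# Credentials API using the runtime service account's email and access token.
credentials, _ = google.auth.default()
ITTYBIT_API_KEY = os.environ["ITTYBIT_API_KEY"]
WEBHOOK_URL = os.environ["WEBHOOK_URL"]


@app.route("/", methods=["POST"])
def handle_task():
    body = request.get_json()
    path = body["path"]
    kind = body["kind"]
    options = body.get("options", {})

    match = re.match(r"^gs://([^/]+)/(.+)$", path)
    if not match:
        # Cloud Tasks retries every non-2xx response, so log a path that can
        # never work and acknowledge it instead of returning 400.
        print(f"Invalid GCS path: {path}")
        return "Invalid GCS path; not retrying", 200
    bucket_name, key = match.groups()

    # On Cloud Run, refreshing also resolves the service account's real email.
    if not credentials.valid:
        credentials.refresh(AuthRequest())
    blob = storage_client.bucket(bucket_name).blob(key)
    signed_url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=2),
        method="GET",
        service_account_email=credentials.service_account_email,
        access_token=credentials.token,
    )

    task = {
        "input": signed_url,
        "kind": kind,
        "options": options,
        "webhook_url": WEBHOOK_URL,
        "metadata": {
            "source_bucket": bucket_name,
            "source_key": key,
        },
    }

    res = requests.post(
        "https://api.ittybit.com/jobs",
        headers={"Authorization": f"Bearer {ITTYBIT_API_KEY}"},
        json=task,
        timeout=30,
    )
    if not res.ok:
        print(f"Ittybit error {res.status_code}: {res.text}")
        # Retry rate limits and server errors; acknowledge other 4xx errors
        # so a request that can never succeed is not retried.
        retryable = res.status_code == 429 or res.status_code >= 500
        status = 503 if retryable else 200
        return res.text, status

    data = res.json()
    print(f"Task created: {data['id']} for {key}")
    return jsonify({"task_id": data["id"]})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Cloud Tasks treats any response outside the 2xx range as a failure and retries it according to the queue's retry settings, so returning a 4xx does not stop retries. Return a 5xx for transient problems such as upstream server errors and rate limiting. For permanent failures, such as a malformed path or a request the API rejects, log the error and acknowledge the task with a 2xx. Once --max-attempts is exhausted, Cloud Tasks drops the task. It has no built-in dead-letter queue, so log or record failures yourself if you need to reprocess them.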
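Because Cloud Tasks retries every non-2xx response, the choice of which upstream errors to retry decides whether a bad request is hammered five times or dropped quietly. That choice is worth isolating in a small function you can unit test. A minimal sketch with a hypothetical name, treating 429 and 5xx as transient and everything else as permanent:

```python
def handler_status_for(upstream_status: int) -> int:
    """Status the task handler returns when the Ittybit API call fails.

    Hypothetical helper. Cloud Tasks retries every non-2xx response, so only
    transient failures get a 5xx; other 4xx errors are acknowledged with 200
    (after logging) so a request that can never succeed is not retried.
    """
    if upstream_status == 429 or upstream_status >= 500:
        return 503  # rate limited or upstream outage: let Cloud Tasks back off and retry
    return 200


for status in (400, 404, 429, 500, 503):
    print(status, "->", handler_status_for(status))
```

Authentication errors (401, 403) fall into the acknowledged group here, so a wrong API key would silently drop every task. You may prefer to treat those as retryable, or to pause the queue with gcloud tasks queues pause ittybit-batch while you fix the key.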
Webhook to BigQuery
Register this endpoint as your webhook URL. It receives task completion events and streams them into BigQuery.
import express from "express";
import { BigQuery } from "@google-cloud/bigquery";

const app = express();
app.use(express.json());

const bigquery = new BigQuery();
const DATASET = "media_processing";
const TABLE = "task_results";

app.post("/webhook", async (req, res) => {
  const event = req.body;
  // Only terminal events become rows.
  if (event?.type !== "job.succeeded" && event?.type !== "job.failed") {
    res.status(200).send("Ignored");
    return;
  }

  const task = event.data;
  const row = {
    task_id: task.id,
    status: task.status,
    kind: task.kind,
    source_bucket: task.metadata?.source_bucket ?? null,
    source_key: task.metadata?.source_key ?? null,
    output_url: task.output?.url ?? null,
    width: task.output?.width ?? null,
    height: task.output?.height ?? null,
    duration: task.output?.duration ?? null,
    format: task.output?.format ?? null,
    duration_ms: task.duration_ms ?? null,
    created_at: task.created_at,
    completed_at: task.completed_at ?? null,
  };

  try {
    await bigquery.dataset(DATASET).table(TABLE).insert([row]);
  } catch (err) {
    // insert() rejects if any row fails; a 5xx lets the webhook be delivered again.
    console.error("BigQuery insert error:", err);
    res.status(500).send("Insert error");
    return;
  }
  console.log(`Recorded ${task.id}: ${task.status}`);
  res.status(200).send("OK");
});

app.listen(8080, () => console.log("Webhook handler listening on :8080"));
from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)
bq = bigquery.Client()

DATASET = "media_processing"
TABLE = "task_results"


@app.route("/webhook", methods=["POST"])
def webhook():
    event = request.get_json(silent=True)
    # Only terminal events become rows.
    if not event or event.get("type") not in ("job.succeeded", "job.failed"):
        return "Ignored", 200

    task = event["data"]
    # A field sent as JSON null arrives as None, so fall back to {} explicitly.
    output = task.get("output") or {}
    metadata = task.get("metadata") or {}
    row = {
        "task_id": task["id"],
        "status": task["status"],
        "kind": task["kind"],
        "source_bucket": metadata.get("source_bucket"),
        "source_key": metadata.get("source_key"),
        "output_url": output.get("url"),
        "width": output.get("width"),
        "height": output.get("height"),
        "duration": output.get("duration"),
        "format": output.get("format"),
        "duration_ms": task.get("duration_ms"),
        "created_at": task["created_at"],
        "completed_at": task.get("completed_at"),
    }

    table_ref = f"{bq.project}.{DATASET}.{TABLE}"
    errors = bq.insert_rows_json(table_ref, [row])
    if errors:
        print(f"BigQuery insert errors: {errors}")
        return "Insert error", 500

    print(f"Recorded {task['id']}: {task['status']}")
    return "OK", 200


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
BigQuery schema
Create the dataset and table before running the webhook handler.
CREATE SCHEMA IF NOT EXISTS media_processing;
CREATE TABLE IF NOT EXISTS media_processing.task_results (
task_id STRING NOT NULL,
status STRING NOT NULL,
kind STRING NOT NULL,
source_bucket STRING,
source_key STRING,
output_url STRING,
width INT64,
height INT64,
duration FLOAT64,
format STRING,
duration_ms INT64,
created_at TIMESTAMP,
completed_at TIMESTAMP
);
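The schema marks task_id, status and kind as NOT NULL, so an event missing any of them makes the streaming insert fail. Separately, a webhook delivered more than once would otherwise be recorded twice. The sketch below factors the webhook handlers' row mapping into a hypothetical event_to_row function that checks the NOT NULL columns up front and derives an insert ID. It assumes the event shape the handlers above read, and that each job emits exactly one terminal event. BigQuery's insert_rows_json accepts that ID through its row_ids argument and uses it for best-effort, short-window deduplication of streaming inserts.

```python
from typing import Any, Dict, Optional, Tuple

# Columns declared NOT NULL in media_processing.task_results.
REQUIRED_COLUMNS = ("task_id", "status", "kind")


def event_to_row(event: Dict[str, Any]) -> Optional[Tuple[str, Dict[str, Any]]]:
    """Map a webhook event to (insert_id, row), or None for events that are not recorded.

    Hypothetical helper; assumes the event shape the webhook handlers read.
    """
    if event.get("type") not in ("job.succeeded", "job.failed"):
        return None
    data = event.get("data") or {}
    output = data.get("output") or {}
    metadata = data.get("metadata") or {}
    row = {
        "task_id": data.get("id"),
        "status": data.get("status"),
        "kind": data.get("kind"),
        "source_bucket": metadata.get("source_bucket"),
        "source_key": metadata.get("source_key"),
        "output_url": output.get("url"),
        "width": output.get("width"),
        "height": output.get("height"),
        "duration": output.get("duration"),
        "format": output.get("format"),
        "duration_ms": data.get("duration_ms"),
        "created_at": data.get("created_at"),
        "completed_at": data.get("completed_at"),
    }
    missing = [col for col in REQUIRED_COLUMNS if row[col] is None]
    if missing:
        raise ValueError(f"event lacks NOT NULL columns: {missing}")
    # Assuming one terminal event per job, id + type identifies a redelivery.
    return f"{row['task_id']}:{event['type']}", row


# In the Flask webhook (bq and table_ref as defined there):
#     result = event_to_row(event)
#     if result:
#         insert_id, row = result
#         errors = bq.insert_rows_json(table_ref, [row], row_ids=[insert_id])
```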
Once results are flowing in, query your batch progress:
-- Completion rate
SELECT
status,
COUNT(*) AS total,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS pct
FROM media_processing.task_results
GROUP BY status;
-- Average processing time by kind
SELECT
kind,
COUNT(*) AS tasks,
ROUND(AVG(duration_ms) / 1000, 1) AS avg_seconds
FROM media_processing.task_results
WHERE status = 'completed'
GROUP BY kind;
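The completion-rate query divides by the rows recorded so far. Early in a run, it can therefore report 100% completed while most of the manifest is still in flight. To measure progress against the whole batch, rescale the per-status counts by the number of lines the orchestrator enqueued (it prints this when it finishes). A minimal sketch with a hypothetical helper follows. If webhooks can arrive more than once, feed it COUNT(DISTINCT task_id) per status rather than raw row counts.

```python
from typing import Dict


def batch_progress(manifest_total: int, status_counts: Dict[str, int]) -> Dict[str, float]:
    """Percentage of manifest entries in each status, plus 'pending' for files with no result yet.

    Hypothetical helper: status_counts holds per-status totals from BigQuery.
    """
    recorded = sum(status_counts.values())
    if recorded > manifest_total:
        raise ValueError("more results than manifest entries; check for duplicate rows")
    counts = dict(status_counts)
    counts["pending"] = manifest_total - recorded
    return {status: round(n * 100.0 / manifest_total, 1) for status, n in counts.items()}


# Hypothetical mid-run numbers for the 50,000-file batch.
print(batch_progress(50_000, {"completed": 12_000, "failed": 150}))
# {'completed': 24.0, 'failed': 0.3, 'pending': 75.7}
```

Note that "pending" also includes files whose Cloud Task was dropped or acknowledged without creating an Ittybit job, since those never produce a webhook.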
Deploy
# Deploy the task handler
gcloud run deploy ittybit-task-handler \
--source . \
--region us-central1 \
--set-env-vars ITTYBIT_API_KEY=$ITTYBIT_API_KEY,WEBHOOK_URL=$WEBHOOK_URL \
--no-allow-unauthenticated
# Deploy the webhook handler (must be publicly reachable)
gcloud run deploy ittybit-webhook-handler \
--source . \
--region us-central1 \
--allow-unauthenticated
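After deploying, copy the webhook handler's URL and append the /webhook path it listens on. Set the result as WEBHOOK_URL in the task handler's environment, for example with gcloud run services update ittybit-task-handler --region us-central1 --update-env-vars WEBHOOK_URL=https://<webhook-handler-url>/webhook. Then register the same URL in the Ittybit dashboard. Because the task handler is deployed with --no-allow-unauthenticated, the service account in SERVICE_ACCOUNT_EMAIL, whose OIDC token Cloud Tasks attaches to each request, needs the Cloud Run Invoker role on ittybit-task-handler. Whoever runs the orchestrator also needs permission to act as that service account.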
See also
- Google Cloud serverless pipeline — event-driven processing with Cloud Functions
- Process files from S3 — setting up storage connections
- Build a user upload pipeline — multi-step processing patterns