Google Cloud serverless video pipeline with Ittybit
When a video lands in Cloud Storage, an Eventarc trigger fires a Cloud Function that sends it to Ittybit for processing. When Ittybit finishes, it POSTs a webhook to a second Cloud Function that writes the result to Firestore.
Architecture
- User uploads a file to a Cloud Storage bucket
- An Eventarc `google.cloud.storage.object.v1.finalized` event triggers a Cloud Function
- The function POSTs the file URL to Ittybit’s `/tasks` endpoint
- Ittybit processes the video and sends a webhook on completion
- A second Cloud Function receives the webhook and writes metadata to Firestore
Trigger function
This function runs whenever a new object is created in your bucket. It generates a signed URL and sends a task to Ittybit.
import { onObjectFinalized } from "firebase-functions/v2/storage";
import { GetSignedUrlConfig, Storage } from "@google-cloud/storage";
// Module-level Cloud Storage client used to sign download URLs.
const storage = new Storage();

/**
 * Storage-triggered function for the "my-video-uploads" bucket.
 *
 * For every finalized object whose content type starts with "video/", it
 * creates a v4 read-only signed URL valid for one hour and POSTs a processing
 * request to Ittybit. The source bucket and object path are attached as
 * metadata on the request.
 *
 * @throws Error when ITTYBIT_API_KEY is not set, or when Ittybit answers with
 *   a non-2xx status.
 */
export const processUpload = onObjectFinalized(
  { bucket: "my-video-uploads" },
  async (event) => {
    const { bucket, name, contentType } = event.data;

    // Non-video uploads (or objects without a content type) are ignored.
    if (!contentType?.startsWith("video/")) {
      return;
    }

    // Fail fast with a clear message instead of sending "Bearer undefined".
    const apiKey = process.env.ITTYBIT_API_KEY;
    if (!apiKey) {
      throw new Error("ITTYBIT_API_KEY is not set");
    }

    const config: GetSignedUrlConfig = {
      version: "v4",
      action: "read",
      // One hour from now, in epoch milliseconds.
      expires: Date.now() + 60 * 60 * 1000,
    };
    const [signedUrl] = await storage
      .bucket(bucket)
      .file(name)
      .getSignedUrl(config);

    const task = {
      input: signedUrl,
      kind: "video",
      options: {
        width: 1920,
        format: "mp4",
        quality: "high",
      },
      metadata: {
        bucket,
        path: name,
      },
    };

    // NOTE(review): the guide text describes a /tasks endpoint, but this
    // request targets /jobs — confirm which path the current Ittybit API expects.
    const res = await fetch("https://api.ittybit.com/jobs", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify(task),
    });

    if (!res.ok) {
      // Include a bounded slice of the response body so the failure is diagnosable from logs.
      const detail = (await res.text().catch(() => "")).slice(0, 500);
      throw new Error(
        `Ittybit API error: ${res.status}${detail ? ` - ${detail}` : ""}`
      );
    }
  }
);
import os
import functions_framework
from google.cloud import storage
from datetime import timedelta
import requests
# Module-level Cloud Storage client used to sign download URLs.
storage_client = storage.Client()


@functions_framework.cloud_event
def process_upload(cloud_event):
    """Forward a newly finalized video object to Ittybit for processing.

    Reads the bucket, object name and content type from the event payload.
    Objects whose content type does not start with "video/" are ignored.
    For videos, a v4 signed GET URL valid for one hour is generated and
    POSTed to Ittybit together with the source bucket and path as metadata.

    Raises:
        KeyError: if the payload lacks "bucket"/"name" or ITTYBIT_API_KEY is unset.
        requests.HTTPError: if Ittybit answers with an error status.
        requests.Timeout: if Ittybit does not answer within the timeout.
    """
    data = cloud_event.data
    bucket_name = data["bucket"]
    file_name = data["name"]
    # `or ""` also covers a payload where contentType is present but null.
    content_type = data.get("contentType") or ""
    if not content_type.startswith("video/"):
        return

    blob = storage_client.bucket(bucket_name).blob(file_name)
    # NOTE(review): v4 signing needs credentials able to sign; the default
    # runtime credentials may require extra arguments — confirm in the
    # target environment.
    signed_url = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(hours=1),
        method="GET",
    )

    task = {
        "input": signed_url,
        "kind": "video",
        "options": {
            "width": 1920,
            "format": "mp4",
            "quality": "high",
        },
        "metadata": {
            "bucket": bucket_name,
            "path": file_name,
        },
    }

    # NOTE(review): the guide text describes a /tasks endpoint, but this
    # request targets /jobs — confirm which path the current Ittybit API expects.
    res = requests.post(
        "https://api.ittybit.com/jobs",
        headers={"Authorization": f"Bearer {os.environ['ITTYBIT_API_KEY']}"},
        json=task,
        # requests waits indefinitely by default; bound the call explicitly.
        timeout=30,
    )
    res.raise_for_status()


# Webhook function
Register this function’s URL as your webhook endpoint in the Ittybit dashboard. It receives task completion events and writes the output metadata to Firestore.
import { onRequest } from "firebase-functions/v2/https";
import { getFirestore } from "firebase-admin/firestore";
import { initializeApp } from "firebase-admin/app";
// Initialise the Admin SDK once at module load and share the Firestore handle.
initializeApp();
const db = getFirestore();

/**
 * HTTP endpoint that receives Ittybit webhook events.
 *
 * - Non-POST requests get 405.
 * - Events whose type is not "job.succeeded" are acknowledged with 200 and ignored.
 * - Events without a usable `data.id` / `data.output` get 400.
 * - Otherwise the output details are written to `processed_videos/{id}` and
 *   the request is answered with 200.
 *
 * NOTE(review): nothing here verifies that the request really came from
 * Ittybit, and the deploy commands expose the endpoint unauthenticated —
 * consider verifying a webhook signature if Ittybit provides one.
 */
export const ittybitWebhook = onRequest(async (req, res) => {
  if (req.method !== "POST") {
    res.status(405).send("Method not allowed");
    return;
  }

  const event = req.body;
  if (event?.type !== "job.succeeded") {
    res.status(200).send("Ignored");
    return;
  }

  const { id, output, metadata } = event.data ?? {};
  // Reject malformed payloads explicitly instead of crashing on property access.
  if (typeof id !== "string" || id === "" || output == null) {
    res.status(400).send("Malformed event");
    return;
  }

  // Firestore rejects `undefined` field values by default, so every optional
  // field is normalised to null before writing.
  await db.collection("processed_videos").doc(id).set({
    task_id: id,
    output_url: output.url ?? null,
    width: output.width ?? null,
    height: output.height ?? null,
    duration: output.duration ?? null,
    format: output.format ?? null,
    original_bucket: metadata?.bucket ?? null,
    original_path: metadata?.path ?? null,
    processed_at: new Date().toISOString(),
  });

  res.status(200).send("OK");
});
import functions_framework
from google.cloud import firestore
# Module-level Firestore client shared by the webhook handler.
db = firestore.Client()


@functions_framework.http
def ittybit_webhook(request):
    """Receive an Ittybit webhook and store the processed output in Firestore.

    Returns a (body, status) tuple:
      * 405 for non-POST requests;
      * 200 "Ignored" when the body is not a JSON object or its type is not
        "job.succeeded";
      * 400 when the event lacks a usable data.id / data.output;
      * 200 "OK" after writing processed_videos/{id}.

    NOTE(review): nothing here verifies that the request really came from
    Ittybit, and the deploy commands expose the endpoint unauthenticated —
    consider verifying a webhook signature if Ittybit provides one.
    """
    if request.method != "POST":
        return "Method not allowed", 405

    event = request.get_json(silent=True)
    # A JSON array or scalar has no .get(); treat it like any other non-event.
    if not isinstance(event, dict) or event.get("type") != "job.succeeded":
        return "Ignored", 200

    data = event.get("data")
    if not isinstance(data, dict):
        return "Malformed event", 400

    task_id = data.get("id")
    output = data.get("output")
    if not isinstance(task_id, str) or not task_id or not isinstance(output, dict):
        return "Malformed event", 400

    # metadata may be missing or explicitly null; fall back to an empty dict.
    metadata = data.get("metadata")
    if not isinstance(metadata, dict):
        metadata = {}

    db.collection("processed_videos").document(task_id).set({
        "task_id": task_id,
        "output_url": output.get("url"),
        "width": output.get("width"),
        "height": output.get("height"),
        "duration": output.get("duration"),
        "format": output.get("format"),
        "original_bucket": metadata.get("bucket"),
        "original_path": metadata.get("path"),
        "processed_at": firestore.SERVER_TIMESTAMP,
    })
    return "OK", 200


# Deploy
Deploy both functions with the gcloud CLI.
# Deploy the trigger function.
# --entry-point names the exported handler explicitly, because the function ID
# (process-upload) differs from the export name (processUpload).
gcloud functions deploy process-upload \
  --gen2 \
  --runtime nodejs20 \
  --entry-point processUpload \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=my-video-uploads" \
  --set-env-vars "ITTYBIT_API_KEY=${ITTYBIT_API_KEY}" \
  --source .

# Deploy the webhook function.
# The export is named ittybitWebhook, so it is passed as the entry point.
gcloud functions deploy ittybit-webhook \
  --gen2 \
  --runtime nodejs20 \
  --entry-point ittybitWebhook \
  --trigger-http \
  --allow-unauthenticated \
  --source .
# Deploy the trigger function.
# The key is quoted so that unusual characters in it survive shell expansion.
gcloud functions deploy process-upload \
  --gen2 \
  --runtime python312 \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=my-video-uploads" \
  --set-env-vars "ITTYBIT_API_KEY=${ITTYBIT_API_KEY}" \
  --entry-point process_upload \
  --source .

# Deploy the webhook function.
gcloud functions deploy ittybit-webhook \
  --gen2 \
  --runtime python312 \
  --trigger-http \
  --allow-unauthenticated \
  --entry-point ittybit_webhook \
  --source .

# After deploying, copy the webhook function URL and add it in the Ittybit dashboard under webhook endpoints.