Human-in-the-loop video review with Temporal and Ittybit
Not every video should go straight to production. User-generated content, compliance-sensitive material, or anything with legal exposure needs a human to sign off before it’s published. But humans are slow, and a video stuck in limbo forever is worse than no review at all. This guide builds a Temporal workflow that transcodes a video with Ittybit, pauses for human approval via a signal, and automatically escalates if nobody responds within 24 hours.
Project setup
npm init -y
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
Your project structure:
src/
activities.ts # Ittybit API calls + publish/delete/escalate
workflows.ts # Review workflow with signal and timeout
worker.ts # Temporal worker
client.ts # Start a workflow
signal.ts # Send approve/reject from your API
Activities
Each activity is a plain async function. Temporal retries them on failure. The Ittybit calls create a transcoding task and poll until it finishes. The publish, delete, and escalate activities represent whatever your system does next.
// src/activities.ts
import { heartbeat } from '@temporalio/activity';
const ITTYBIT_API = 'https://api.ittybit.com';
interface TaskResult {
id: string;
status: 'queued' | 'processing' | 'succeeded' | 'failed';
output?: { url: string };
}
async function createAndAwaitTask(body: Record<string, unknown>): Promise<TaskResult> {
const res = await fetch(`${ITTYBIT_API}/tasks`, {
method: 'POST',
headers: {
Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(body),
});
if (!res.ok) {
throw new Error(`Ittybit returned ${res.status}: ${await res.text()}`);
}
const task: TaskResult = await res.json();
while (task.status === 'queued' || task.status === 'processing') {
await new Promise((r) => setTimeout(r, 3_000));
heartbeat();
const poll = await fetch(`${ITTYBIT_API}/tasks/${task.id}`, {
headers: { Authorization: `Bearer ${process.env.ITTYBIT_API_KEY}` },
});
const updated: TaskResult = await poll.json();
task.status = updated.status;
task.output = updated.output;
}
if (task.status === 'failed') {
throw new Error(`Task ${task.id} failed`);
}
return task;
}
export async function transcodeVideo(sourceUrl: string): Promise<string> {
const task = await createAndAwaitTask({
input: sourceUrl,
kind: 'video',
options: {
width: 1920,
format: 'mp4',
codec: 'h264',
quality: 'high',
},
});
return task.output!.url;
}
export async function publishVideo(videoUrl: string): Promise<void> {
// Move the video to your public CDN, update your database, notify subscribers
console.log(`Publishing: ${videoUrl}`);
}
export async function deleteVideo(videoUrl: string): Promise<void> {
// Remove the transcoded file from storage
console.log(`Deleting rejected video: ${videoUrl}`);
}
export async function escalateVideo(videoUrl: string, workflowId: string): Promise<void> {
// Notify a supervisor -- Slack message, PagerDuty, email, etc.
console.log(`Escalating: no review after 24h. Workflow ${workflowId}, video: ${videoUrl}`);
}
Workflow
This is where the human-in-the-loop pattern lives. The workflow transcodes the video, then races a signal handler against a 24-hour timer. Three outcomes:
- Approved — the video is published.
- Rejected — the video is deleted.
- Timeout — nobody responded, so the workflow escalates.
// src/workflows.ts
import {
proxyActivities,
defineSignal,
setHandler,
condition,
workflowInfo,
} from '@temporalio/workflow';
import type * as activities from './activities';
const { transcodeVideo, publishVideo, deleteVideo, escalateVideo } = proxyActivities<
typeof activities
>({
startToCloseTimeout: '15 minutes',
heartbeatTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
initialInterval: '5s',
backoffCoefficient: 2,
},
});
// Signal definition -- external callers use this to approve or reject
export const reviewDecisionSignal =
defineSignal<[{ decision: 'approve' | 'reject'; reviewer: string }]>('reviewDecision');
export interface ReviewResult {
videoUrl: string;
outcome: 'approved' | 'rejected' | 'escalated';
reviewer?: string;
}
export async function videoReviewWorkflow(sourceUrl: string): Promise<ReviewResult> {
// Stage 1: Transcode the video
const videoUrl = await transcodeVideo(sourceUrl);
// Stage 2: Wait for human review with a 24-hour deadline
let decision: 'approve' | 'reject' | undefined;
let reviewer: string | undefined;
setHandler(reviewDecisionSignal, (signal) => {
decision = signal.decision;
reviewer = signal.reviewer;
});
// condition() returns true when the callback returns true,
// or false if the timeout elapses first
const receivedDecision = await condition(() => decision !== undefined, '24 hours');
// Stage 3: Act on the outcome
if (!receivedDecision) {
// Timeout -- no one reviewed within 24 hours
await escalateVideo(videoUrl, workflowInfo().workflowId);
return { videoUrl, outcome: 'escalated' };
}
if (decision === 'approve') {
await publishVideo(videoUrl);
return { videoUrl, outcome: 'approved', reviewer };
}
// Rejected
await deleteVideo(videoUrl);
return { videoUrl, outcome: 'rejected', reviewer };
}
The condition call is the key. It blocks the workflow until either the signal sets decision or 24 hours pass. No polling, no cron jobs, no external timers. Temporal tracks the deadline durably — if the worker restarts, the timer picks up where it left off.
Worker
// src/worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'video-review',
});
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
Start a review workflow
// src/client.ts
import { Client } from '@temporalio/client';
import { videoReviewWorkflow } from './workflows';
async function run() {
const client = new Client();
const handle = await client.workflow.start(videoReviewWorkflow, {
taskQueue: 'video-review',
workflowId: `review-${Date.now()}`,
args: ['https://example.com/uploads/user-video.mov'],
});
console.log(`Review workflow started: ${handle.workflowId}`);
// The workflow is now running -- it will transcode the video
// and wait for a signal before proceeding
}
run().catch(console.error);
Send a review decision
When a moderator clicks approve or reject in your UI, signal the running workflow.
// src/signal.ts
import { Client } from "@temporalio/client";
import { reviewDecisionSignal } from "./workflows";
async function sendDecision(
workflowId: string,
decision: "approve" | "reject",
reviewer: string,
) {
const client = new Client();
const handle = client.workflow.getHandle(workflowId);
await handle.signal(reviewDecisionSignal, {
decision,
reviewer,
});
console.log(`Sent "${decision}" to workflow ${workflowId}`);
}
// Example: approve a specific workflow
sendDecision("review-1712345678", "approve", "moderator@example.com");
import { Client } from "@temporalio/client";
import { reviewDecisionSignal } from "./workflows";
import express from "express";
const app = express();
app.use(express.json());
const temporal = new Client();
app.post("/review/:workflowId", async (req, res) => {
const { workflowId } = req.params;
const { decision, reviewer } = req.body;
const handle = temporal.workflow.getHandle(workflowId);
await handle.signal(reviewDecisionSignal, { decision, reviewer });
res.json({ status: "ok", workflowId, decision });
});
app.listen(3000); Adding a review queue UI
The workflow ID is all you need to connect your review UI to the running workflow. A typical pattern:
- When the workflow starts, store the workflow ID alongside the video metadata in your database.
- Your review dashboard queries for videos with a
pending_reviewstatus. - When a moderator approves or rejects, your API sends the signal to Temporal.
- The workflow resumes, publishes or deletes the video, and your webhook or callback updates the database status.
// Example: store workflow ID when starting the review
const handle = await client.workflow.start(videoReviewWorkflow, {
taskQueue: 'video-review',
workflowId: `review-${videoId}`,
args: [videoUrl],
});
await db.videos.update({
where: { id: videoId },
data: {
status: 'pending_review',
workflowId: handle.workflowId,
},
});
Customizing the timeout
The 24-hour deadline is a workflow constant, but you can make it configurable by passing it as an argument:
export async function videoReviewWorkflow(
sourceUrl: string,
timeoutHours: number = 24,
): Promise<ReviewResult> {
const videoUrl = await transcodeVideo(sourceUrl);
let decision: 'approve' | 'reject' | undefined;
let reviewer: string | undefined;
setHandler(reviewDecisionSignal, (signal) => {
decision = signal.decision;
reviewer = signal.reviewer;
});
const receivedDecision = await condition(() => decision !== undefined, `${timeoutHours} hours`);
// ... rest of the logic
}
For stricter environments, you might use a shorter timeout for high-risk content and a longer one for low-risk uploads.