Media type router in Langflow with Ittybit
Most media pipelines assume a single input type. Real workloads are messier — users upload MP4s, PNGs, and WAV files to the same endpoint. This guide builds a Langflow flow that accepts any media URL, classifies it by type, routes to the right Ittybit processing task (transcode, normalize, or thumbnail), and merges the results into a unified metadata output.
The flow
URL Input -> Media Classifier -> Conditional Router -> Ittybit Task (video | audio | image) -> Result Merger -> Output
Five components total. The classifier inspects the URL and content-type header to determine what kind of media it is. The router fans out to one of three Ittybit task nodes. The merger collects the result into a consistent shape regardless of which branch ran.
Install dependencies
Make sure requests is available in your Langflow environment:
pip install requests
Create the classifier component
This component takes a URL, sends a HEAD request to read the content-type, and falls back to file extension matching if the server doesn’t return a useful header.
import requests
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data
EXTENSION_MAP = {
".mp4": "video",
".mov": "video",
".webm": "video",
".avi": "video",
".mkv": "video",
".mp3": "audio",
".wav": "audio",
".aac": "audio",
".ogg": "audio",
".flac": "audio",
".opus": "audio",
".jpg": "image",
".jpeg": "image",
".png": "image",
".webp": "image",
".avif": "image",
".gif": "animation",
".tiff": "image",
}
class MediaClassifier(Component):
display_name = "Media Classifier"
description = "Detect whether a URL points to video, audio, or image content"
icon = "search"
inputs = [
MessageTextInput(
name="input_url",
display_name="Media URL",
info="URL of the media file to classify",
required=True,
),
]
outputs = [
Output(display_name="Classification", name="classification", method="run"),
]
def run(self) -> Data:
url = self.input_url.strip()
media_type = self._from_content_type(url) or self._from_extension(url)
if not media_type:
media_type = "video" # safe default
return Data(data={"url": url, "media_type": media_type})
def _from_content_type(self, url: str) -> str | None:
try:
res = requests.head(url, timeout=5, allow_redirects=True)
ct = res.headers.get("Content-Type", "").lower()
if ct.startswith("video/"):
return "video"
if ct.startswith("audio/"):
return "audio"
if ct.startswith("image/"):
return "image"
except requests.RequestException:
pass
return None
def _from_extension(self, url: str) -> str | None:
path = url.split("?")[0].lower()
for ext, kind in EXTENSION_MAP.items():
if path.endswith(ext):
return kind
return None
The classifier tries the content-type header first because some CDN URLs don’t have file extensions. If the HEAD request fails or returns something generic like application/octet-stream, it falls back to extension matching.
Create the router component
The router reads the media type from the classifier and fans out to one of three outputs. Each output connects to a different Ittybit task component.
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
class MediaRouter(Component):
display_name = "Media Router"
description = "Route classified media to the correct processing branch"
icon = "git-branch"
inputs = [
DataInput(
name="classification",
display_name="Classification",
info="Output from the Media Classifier component",
),
]
outputs = [
Output(display_name="Video", name="video_out", method="route_video"),
Output(display_name="Audio", name="audio_out", method="route_audio"),
Output(display_name="Image", name="image_out", method="route_image"),
]
def route_video(self) -> Data:
data = self.classification.data
if data.get("media_type") == "video":
return Data(data=data)
return Data(data={})
def route_audio(self) -> Data:
data = self.classification.data
if data.get("media_type") == "audio":
return Data(data=data)
return Data(data={})
def route_image(self) -> Data:
data = self.classification.data
if data.get("media_type") == "image":
return Data(data=data)
return Data(data={})
Only the matching output produces data. The other two emit empty objects, which prevents downstream nodes on those branches from executing.
Create the Ittybit task components
You need three instances of the Ittybit task component, each preconfigured for a different media type. Use the component from the custom Ittybit component guide as a base, or create a streamlined version that accepts input from the router:
import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data
class IttybitVideoTranscode(Component):
display_name = "Ittybit Video Transcode"
description = "Transcode video to MP4 via Ittybit"
icon = "video"
inputs = [
SecretStrInput(name="api_key", display_name="API Key", required=True),
DataInput(name="media_input", display_name="Media Input"),
]
outputs = [
Output(display_name="Result", name="result", method="run"),
]
def run(self) -> Data:
data = self.media_input.data
if not data.get("url"):
return Data(data={})
return self._process(data["url"], {
"kind": "video",
"options": {"format": "mp4", "quality": "high"},
})
def _process(self, url: str, task_config: dict) -> Data:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {"input": url, **task_config}
res = requests.post(
"https://api.ittybit.com/jobs",
headers=headers,
json=payload,
)
res.raise_for_status()
task_id = res.json()["id"]
return Data(data=self._poll(task_id, headers))
def _poll(self, task_id: str, headers: dict) -> dict:
deadline = time.time() + 300
while time.time() < deadline:
res = requests.get(
f"https://api.ittybit.com/jobs/{task_id}",
headers=headers,
)
res.raise_for_status()
data = res.json()
if data["status"] == "completed":
return data
if data["status"] == "error":
raise RuntimeError(f"Task {task_id} failed")
time.sleep(2)
raise TimeoutError(f"Task {task_id} timed out")
import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data
class IttybitAudioNormalize(Component):
display_name = "Ittybit Audio Normalize"
description = "Normalize audio to AAC via Ittybit"
icon = "music"
inputs = [
SecretStrInput(name="api_key", display_name="API Key", required=True),
DataInput(name="media_input", display_name="Media Input"),
]
outputs = [
Output(display_name="Result", name="result", method="run"),
]
def run(self) -> Data:
data = self.media_input.data
if not data.get("url"):
return Data(data={})
return self._process(data["url"], {
"kind": "audio",
"options": {"format": "aac", "quality": "high"},
})
def _process(self, url: str, task_config: dict) -> Data:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {"input": url, **task_config}
res = requests.post(
"https://api.ittybit.com/jobs",
headers=headers,
json=payload,
)
res.raise_for_status()
task_id = res.json()["id"]
return Data(data=self._poll(task_id, headers))
def _poll(self, task_id: str, headers: dict) -> dict:
deadline = time.time() + 300
while time.time() < deadline:
res = requests.get(
f"https://api.ittybit.com/jobs/{task_id}",
headers=headers,
)
res.raise_for_status()
data = res.json()
if data["status"] == "completed":
return data
if data["status"] == "error":
raise RuntimeError(f"Task {task_id} failed")
time.sleep(2)
raise TimeoutError(f"Task {task_id} timed out")import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data
class IttybitImageThumbnail(Component):
display_name = "Ittybit Image Thumbnail"
description = "Generate a thumbnail via Ittybit"
icon = "image"
inputs = [
SecretStrInput(name="api_key", display_name="API Key", required=True),
DataInput(name="media_input", display_name="Media Input"),
]
outputs = [
Output(display_name="Result", name="result", method="run"),
]
def run(self) -> Data:
data = self.media_input.data
if not data.get("url"):
return Data(data={})
return self._process(data["url"], {
"kind": "image",
"options": {"format": "webp", "width": 640, "quality": "medium"},
})
def _process(self, url: str, task_config: dict) -> Data:
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {"input": url, **task_config}
res = requests.post(
"https://api.ittybit.com/jobs",
headers=headers,
json=payload,
)
res.raise_for_status()
task_id = res.json()["id"]
return Data(data=self._poll(task_id, headers))
def _poll(self, task_id: str, headers: dict) -> dict:
deadline = time.time() + 300
while time.time() < deadline:
res = requests.get(
f"https://api.ittybit.com/jobs/{task_id}",
headers=headers,
)
res.raise_for_status()
data = res.json()
if data["status"] == "completed":
return data
if data["status"] == "error":
raise RuntimeError(f"Task {task_id} failed")
time.sleep(2)
raise TimeoutError(f"Task {task_id} timed out") Each component skips execution when it receives an empty url, so only the branch matching the classified type actually hits the Ittybit API.
Create the result merger
The merger collects results from all three branches and produces a single unified output. Only one branch will have data; the other two will be empty.
from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data
class ResultMerger(Component):
display_name = "Result Merger"
description = "Merge results from video, audio, and image branches into unified metadata"
icon = "merge"
inputs = [
DataInput(name="video_result", display_name="Video Result"),
DataInput(name="audio_result", display_name="Audio Result"),
DataInput(name="image_result", display_name="Image Result"),
]
outputs = [
Output(display_name="Unified Result", name="result", method="run"),
]
def run(self) -> Data:
# Find the branch that produced a result
for result in [self.video_result, self.audio_result, self.image_result]:
data = result.data if result else {}
if data.get("id"):
return Data(data={
"task_id": data.get("id"),
"status": data.get("status"),
"kind": data.get("kind"),
"input": data.get("input"),
"output": data.get("output"),
"created_at": data.get("created_at"),
"completed_at": data.get("completed_at"),
})
return Data(data={"status": "no_result", "error": "No branch produced output"})
The unified output always has the same shape — task_id, status, kind, input, output, created_at, completed_at — regardless of whether it processed a video, audio file, or image. Downstream nodes never need to know which branch ran.
Wire it together
In the Langflow canvas:
- Chat Input — the user pastes a media URL
- Media Classifier — connect the URL to its input
- Media Router — connect the classifier output to the router input
- Ittybit Video Transcode — connect the router’s Video output
- Ittybit Audio Normalize — connect the router’s Audio output
- Ittybit Image Thumbnail — connect the router’s Image output
- Result Merger — connect all three task outputs to the merger’s three inputs
- Chat Output — display the unified result
All three Ittybit task nodes share the same API key. Set it once and reference it in each.
Extending the router
The classifier’s extension map is easy to grow. To handle PDFs or other document types, add entries to EXTENSION_MAP and a fourth output to the router:
# In MediaClassifier
EXTENSION_MAP = {
# ...existing entries...
".pdf": "document",
".docx": "document",
}
# In MediaRouter - add a fourth output
outputs = [
Output(display_name="Video", name="video_out", method="route_video"),
Output(display_name="Audio", name="audio_out", method="route_audio"),
Output(display_name="Image", name="image_out", method="route_image"),
Output(display_name="Document", name="doc_out", method="route_document"),
]
You can also swap out the task configurations. For example, replace the video transcode with an HLS adaptive streaming task by changing the kind to adaptive_video and the options to {"format": "hls"}.
See also
- Custom Ittybit component for Langflow — the base component this guide builds on
- Ittybit Task API reference
- Build a user upload pipeline — multi-task processing for uploads
- Langflow custom components docs