Media type router in Langflow with Ittybit

Most media pipelines assume a single input type. Real workloads are messier — users upload MP4s, PNGs, and WAV files to the same endpoint. This guide builds a Langflow flow that accepts any media URL, classifies it by type, routes to the right Ittybit processing task (transcode, normalize, or thumbnail), and merges the results into a unified metadata output.

The flow

URL Input -> Media Classifier -> Conditional Router -> Ittybit Task (video | audio | image) -> Result Merger -> Output

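The flow uses four kinds of custom component: one classifier, one router, three Ittybit task nodes, and one merger. The classifier inspects the content-type header and, failing that, the URL's file extension to determine what kind of media it is. The router fans out to one of the three Ittybit task nodes. The merger reshapes the result into a consistent form regardless of which branch ran.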

Install dependencies

Make sure requests is available in your Langflow environment:

pip install requests

Create the classifier component

This component takes a URL, sends a HEAD request to read the content-type, and falls back to file extension matching if the server doesn’t return a useful header.

import requests
from langflow.custom import Component
from langflow.io import MessageTextInput, Output
from langflow.schema import Data

EXTENSION_MAP = {
    ".mp4": "video",
    ".mov": "video",
    ".webm": "video",
    ".avi": "video",
    ".mkv": "video",
    ".mp3": "audio",
    ".wav": "audio",
    ".aac": "audio",
    ".ogg": "audio",
    ".flac": "audio",
    ".opus": "audio",
    ".jpg": "image",
    ".jpeg": "image",
    ".png": "image",
    ".webp": "image",
    ".avif": "image",
    ".gif": "image",  # served as image/gif; the router has no "animation" branch
    ".tiff": "image",
}


class MediaClassifier(Component):
    display_name = "Media Classifier"
    description = "Detect whether a URL points to video, audio, or image content"
    icon = "search"

    inputs = [
        MessageTextInput(
            name="input_url",
            display_name="Media URL",
            info="URL of the media file to classify",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Classification", name="classification", method="run"),
    ]

    def run(self) -> Data:
        url = self.input_url.strip()
        media_type = self._from_content_type(url) or self._from_extension(url)

        if not media_type:
            # Neither the header nor the extension was conclusive; assume video.
            media_type = "video"

        return Data(data={"url": url, "media_type": media_type})

    def _from_content_type(self, url: str) -> str | None:
        try:
            res = requests.head(url, timeout=5, allow_redirects=True)
            ct = res.headers.get("Content-Type", "").lower()
            if ct.startswith("video/"):
                return "video"
            if ct.startswith("audio/"):
                return "audio"
            if ct.startswith("image/"):
                return "image"
        except requests.RequestException:
            pass
        return None

    def _from_extension(self, url: str) -> str | None:
        path = url.split("?")[0].lower()
        for ext, kind in EXTENSION_MAP.items():
            if path.endswith(ext):
                return kind
        return None

The classifier tries the content-type header first because some CDN URLs don’t have file extensions. If the HEAD request fails or returns something generic like application/octet-stream, it falls back to extension matching.
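
The fallback order can be sketched as a pure function that needs no network access. The name `classify` and the trimmed `EXTENSIONS` table are illustrative and not part of the component; this sketch also assumes `urlparse` is an acceptable way to drop query strings and fragments before reading the extension.

```python
import os
from urllib.parse import urlparse

# Trimmed, illustrative copy of the extension table.
EXTENSIONS = {".mp4": "video", ".wav": "audio", ".png": "image"}


def classify(content_type, url):
    """Return 'video', 'audio', 'image', or None if neither signal helps."""
    # 1. Trust a specific Content-Type header when there is one.
    ct = (content_type or "").split(";")[0].strip().lower()
    for kind in ("video", "audio", "image"):
        if ct.startswith(kind + "/"):
            return kind
    # 2. Otherwise fall back to the extension of the URL path.
    ext = os.path.splitext(urlparse(url).path)[1].lower()
    return EXTENSIONS.get(ext)


print(classify("video/mp4", "https://cdn.example.com/abc123"))                 # video
print(classify("application/octet-stream", "https://example.com/a.WAV?x=1"))   # audio
print(classify(None, "https://example.com/download"))                          # None
```

The third call returns `None`, which is the case where the component applies its default.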

Create the router component

The router reads the media type from the classifier and fans out to one of three outputs. Each output connects to a different Ittybit task component.

from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data


class MediaRouter(Component):
    display_name = "Media Router"
    description = "Route classified media to the correct processing branch"
    icon = "git-branch"

    inputs = [
        DataInput(
            name="classification",
            display_name="Classification",
            info="Output from the Media Classifier component",
        ),
    ]

    outputs = [
        Output(display_name="Video", name="video_out", method="route_video"),
        Output(display_name="Audio", name="audio_out", method="route_audio"),
        Output(display_name="Image", name="image_out", method="route_image"),
    ]

    def route_video(self) -> Data:
        data = self.classification.data
        if data.get("media_type") == "video":
            return Data(data=data)
        return Data(data={})

    def route_audio(self) -> Data:
        data = self.classification.data
        if data.get("media_type") == "audio":
            return Data(data=data)
        return Data(data={})

    def route_image(self) -> Data:
        data = self.classification.data
        if data.get("media_type") == "image":
            return Data(data=data)
        return Data(data={})

Only the matching output carries the classification. The other two emit empty Data objects; the task nodes on those branches still run, but they return early without calling the API because their input has no url.
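
The routing contract can be illustrated with plain dictionaries, outside Langflow. The helper name `fan_out` is hypothetical; the output names mirror the ones declared on the router.

```python
KINDS = ("video", "audio", "image")


def fan_out(classification):
    """Return one dict per router output; only the matching one is non-empty."""
    kind = classification.get("media_type")
    return {
        f"{k}_out": dict(classification) if k == kind else {}
        for k in KINDS
    }


outputs = fan_out({"url": "https://example.com/clip.mp4", "media_type": "video"})
for name, payload in outputs.items():
    print(name, payload)
```

Here `video_out` carries the URL and media type, while `audio_out` and `image_out` are empty dictionaries.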

Create the Ittybit task components

You need three instances of the Ittybit task component, each preconfigured for a different media type. Use the component from the custom Ittybit component guide as a base, or create a streamlined version that accepts input from the router:

import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data


class IttybitVideoTranscode(Component):
    display_name = "Ittybit Video Transcode"
    description = "Transcode video to MP4 via Ittybit"
    icon = "video"

    inputs = [
        SecretStrInput(name="api_key", display_name="API Key", required=True),
        DataInput(name="media_input", display_name="Media Input"),
    ]

    outputs = [
        Output(display_name="Result", name="result", method="run"),
    ]

    def run(self) -> Data:
        data = self.media_input.data
        if not data.get("url"):
            return Data(data={})

        return self._process(data["url"], {
            "kind": "video",
            "options": {"format": "mp4", "quality": "high"},
        })

    def _process(self, url: str, task_config: dict) -> Data:
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {"input": url, **task_config}

        res = requests.post(
            "https://api.ittybit.com/jobs",
            headers=headers,
            json=payload,
        )
        res.raise_for_status()
        task_id = res.json()["id"]

        return Data(data=self._poll(task_id, headers))

    def _poll(self, task_id: str, headers: dict) -> dict:
        deadline = time.time() + 300
        while time.time() < deadline:
            res = requests.get(
                f"https://api.ittybit.com/jobs/{task_id}",
                headers=headers,
            )
            res.raise_for_status()
            data = res.json()
            if data["status"] == "completed":
                return data
            if data["status"] == "error":
                raise RuntimeError(f"Task {task_id} failed")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out")

The audio component follows the same pattern with a different task configuration:

import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data


class IttybitAudioNormalize(Component):
    display_name = "Ittybit Audio Normalize"
    description = "Normalize audio to AAC via Ittybit"
    icon = "music"

    inputs = [
        SecretStrInput(name="api_key", display_name="API Key", required=True),
        DataInput(name="media_input", display_name="Media Input"),
    ]

    outputs = [
        Output(display_name="Result", name="result", method="run"),
    ]

    def run(self) -> Data:
        data = self.media_input.data
        if not data.get("url"):
            return Data(data={})

        return self._process(data["url"], {
            "kind": "audio",
            "options": {"format": "aac", "quality": "high"},
        })

    def _process(self, url: str, task_config: dict) -> Data:
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {"input": url, **task_config}

        res = requests.post(
            "https://api.ittybit.com/jobs",
            headers=headers,
            json=payload,
        )
        res.raise_for_status()
        task_id = res.json()["id"]

        return Data(data=self._poll(task_id, headers))

    def _poll(self, task_id: str, headers: dict) -> dict:
        deadline = time.time() + 300
        while time.time() < deadline:
            res = requests.get(
                f"https://api.ittybit.com/jobs/{task_id}",
                headers=headers,
            )
            res.raise_for_status()
            data = res.json()
            if data["status"] == "completed":
                return data
            if data["status"] == "error":
                raise RuntimeError(f"Task {task_id} failed")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out")

The image component does the same for thumbnails:

import time
import requests
from langflow.custom import Component
from langflow.io import DataInput, SecretStrInput, Output
from langflow.schema import Data


class IttybitImageThumbnail(Component):
    display_name = "Ittybit Image Thumbnail"
    description = "Generate a thumbnail via Ittybit"
    icon = "image"

    inputs = [
        SecretStrInput(name="api_key", display_name="API Key", required=True),
        DataInput(name="media_input", display_name="Media Input"),
    ]

    outputs = [
        Output(display_name="Result", name="result", method="run"),
    ]

    def run(self) -> Data:
        data = self.media_input.data
        if not data.get("url"):
            return Data(data={})

        return self._process(data["url"], {
            "kind": "image",
            "options": {"format": "webp", "width": 640, "quality": "medium"},
        })

    def _process(self, url: str, task_config: dict) -> Data:
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {"input": url, **task_config}

        res = requests.post(
            "https://api.ittybit.com/jobs",
            headers=headers,
            json=payload,
        )
        res.raise_for_status()
        task_id = res.json()["id"]

        return Data(data=self._poll(task_id, headers))

    def _poll(self, task_id: str, headers: dict) -> dict:
        deadline = time.time() + 300
        while time.time() < deadline:
            res = requests.get(
                f"https://api.ittybit.com/jobs/{task_id}",
                headers=headers,
            )
            res.raise_for_status()
            data = res.json()
            if data["status"] == "completed":
                return data
            if data["status"] == "error":
                raise RuntimeError(f"Task {task_id} failed")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out")

Each component returns an empty result without calling the API when its input has no url, so only the branch matching the classified type actually hits the Ittybit API.
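
All three components repeat the same polling loop. One possible way to factor it out is a helper that takes the status lookup as a function, which also makes the loop testable without network access. The names `poll_until_done` and `fake_fetch` and the intermediate "processing" status are assumptions for this sketch; only the "completed" and "error" statuses come from the code above.

```python
import time


def poll_until_done(fetch, task_id, timeout=300.0, interval=2.0, sleep=time.sleep):
    """Call fetch(task_id) until the task completes, fails, or times out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        task = fetch(task_id)
        if task["status"] == "completed":
            return task
        if task["status"] == "error":
            raise RuntimeError(f"Task {task_id} failed")
        sleep(interval)
    raise TimeoutError(f"Task {task_id} timed out")


# Stand-in for the GET request: reports "processing" twice, then "completed".
responses = iter(["processing", "processing", "completed"])


def fake_fetch(task_id):
    return {"id": task_id, "status": next(responses)}


result = poll_until_done(fake_fetch, "task_123", sleep=lambda _: None)
print(result)
```

In a component, `fetch` would wrap the `requests.get` call, ideally with its own `timeout` argument so a stalled connection cannot outlast the overall deadline.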

Create the result merger

The merger collects results from all three branches and produces a single unified output. Only one branch will have data; the other two will be empty.

from langflow.custom import Component
from langflow.io import DataInput, Output
from langflow.schema import Data


class ResultMerger(Component):
    display_name = "Result Merger"
    description = "Merge results from video, audio, and image branches into unified metadata"
    icon = "merge"

    inputs = [
        DataInput(name="video_result", display_name="Video Result"),
        DataInput(name="audio_result", display_name="Audio Result"),
        DataInput(name="image_result", display_name="Image Result"),
    ]

    outputs = [
        Output(display_name="Unified Result", name="result", method="run"),
    ]

    def run(self) -> Data:
        # Find the branch that produced a result
        for result in [self.video_result, self.audio_result, self.image_result]:
            data = result.data if result else {}
            if data.get("id"):
                return Data(data={
                    "task_id": data.get("id"),
                    "status": data.get("status"),
                    "kind": data.get("kind"),
                    "input": data.get("input"),
                    "output": data.get("output"),
                    "created_at": data.get("created_at"),
                    "completed_at": data.get("completed_at"),
                })

        return Data(data={"status": "no_result", "error": "No branch produced output"})

The unified output always has the same shape — task_id, status, kind, input, output, created_at, completed_at — regardless of whether it processed a video, audio file, or image. Downstream nodes never need to know which branch ran.
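
The merging rule can be tried on plain dictionaries. In this sketch the function name `merge_results` and the sample task payload are hypothetical; in particular, the shape of the `output` field is an assumption, not a documented Ittybit response.

```python
FIELDS = ("status", "kind", "input", "output", "created_at", "completed_at")


def merge_results(*branches):
    """Return the unified shape for the first branch that carries a task id."""
    for data in branches:
        if data and data.get("id"):
            merged = {"task_id": data["id"]}
            merged.update({field: data.get(field) for field in FIELDS})
            return merged
    return {"status": "no_result", "error": "No branch produced output"}


# Hypothetical audio result; the video and image branches were skipped.
audio = {
    "id": "task_123",
    "status": "completed",
    "kind": "audio",
    "input": "https://example.com/a.wav",
    "output": {"url": "https://example.com/a.aac"},
}
unified = merge_results({}, audio, {})
print(sorted(unified))
```

Fields missing from the task payload, such as the timestamps in this sample, come through as `None` rather than being dropped, which is what keeps the shape constant.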

Wire it together

In the Langflow canvas:

  1. Chat Input — the user pastes a media URL
  2. Media Classifier — connect the URL to its input
  3. Media Router — connect the classifier output to the router input
  4. Ittybit Video Transcode — connect the router’s Video output
  5. Ittybit Audio Normalize — connect the router’s Audio output
  6. Ittybit Image Thumbnail — connect the router’s Image output
  7. Result Merger — connect all three task outputs to the merger’s three inputs
  8. Chat Output — display the unified result

All three Ittybit task nodes use the same API key. One option is to store it once as a Langflow global variable and select that variable in each node's API Key field.
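
Once wired, the flow can also be triggered from code rather than the chat panel. The sketch below only builds the request and does not send it. It assumes Langflow's `/api/v1/run/{flow_id}` endpoint and a local instance on port 7860; `YOUR_FLOW_ID` is a placeholder, and some deployments additionally require an `x-api-key` header.

```python
import json
from urllib.request import Request


def build_run_request(base_url, flow_id, media_url):
    """Build (but do not send) a POST request that runs the flow once."""
    body = {
        "input_value": media_url,  # becomes the Chat Input message
        "input_type": "chat",
        "output_type": "chat",
    }
    return Request(
        f"{base_url.rstrip('/')}/api/v1/run/{flow_id}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_run_request(
    "http://localhost:7860", "YOUR_FLOW_ID", "https://example.com/clip.mp4"
)
print(req.get_method(), req.full_url)
```

Passing `req` to `urllib.request.urlopen` would execute the flow against a running instance.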

Extending the router

The classifier’s extension map is easy to grow. To handle PDFs or other document types, add entries to EXTENSION_MAP, add a fourth output to the router, and define the route_document method that the new output references:

# Module-level map used by MediaClassifier
EXTENSION_MAP = {
    # ...existing entries...
    ".pdf": "document",
    ".docx": "document",
}

# In MediaRouter - add a fourth output
outputs = [
    Output(display_name="Video", name="video_out", method="route_video"),
    Output(display_name="Audio", name="audio_out", method="route_audio"),
    Output(display_name="Image", name="image_out", method="route_image"),
    Output(display_name="Document", name="doc_out", method="route_document"),
]
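
Two pieces are implied by this change. The new output refers to a `route_document` method, which would follow the same shape as the other three. Separately, the header check in the classifier only matches the video/, audio/, and image/ prefixes, so a PDF served as application/pdf would be recognized by extension only. A minimal sketch of both, written as plain functions with hypothetical names:

```python
# Exact MIME types, since documents do not share a "document/" prefix.
DOCUMENT_TYPES = {
    "application/pdf",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}


def kind_from_content_type(content_type):
    """Map a Content-Type header value to a media kind, or None."""
    ct = content_type.split(";")[0].strip().lower()
    for kind in ("video", "audio", "image"):
        if ct.startswith(kind + "/"):
            return kind
    return "document" if ct in DOCUMENT_TYPES else None


def route_document(classification):
    """Pass the classification through only when it is a document."""
    if classification.get("media_type") == "document":
        return classification
    return {}


print(kind_from_content_type("application/pdf"))
print(route_document({"url": "https://example.com/a.pdf", "media_type": "document"}))
```

Inside the router, `route_document` would read `self.classification.data` and wrap its return value in `Data`, as the existing methods do.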

You can also swap out the task configurations. For example, replace the video transcode with an HLS adaptive streaming task by changing the kind to adaptive_video and the options to {"format": "hls"}.
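
As a sketch of that swap, the payload for the request is the input URL merged with the task configuration, mirroring what `_process` does. The helper name `build_payload` is illustrative, and the `kind` and `options` values are taken from the sentence above without being checked against the Ittybit API.

```python
def build_payload(url, task_config):
    """Combine the input URL with a task configuration, as _process does."""
    return {"input": url, **task_config}


# Values from the text; whether the API accepts them is not verified here.
HLS_TASK = {"kind": "adaptive_video", "options": {"format": "hls"}}

payload = build_payload("https://example.com/clip.mov", HLS_TASK)
print(payload)
```

Keeping each configuration in a named constant like `HLS_TASK` means swapping tasks touches one line in the component's `run` method.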

See also