WebSocket Streaming
The Multimodal Video RAG system uses WebSockets to provide real-time updates for long-running processes, specifically video ingestion and conversational agent reasoning. This ensures a responsive user experience by eliminating the need for constant polling.
Ingestion Progress Monitoring
When a video ingestion job is initiated via the /api/v1/videos/ingest endpoint, the system returns a job_id. This ID is used to connect to a WebSocket for real-time tracking of the multi-stage pipeline (ASR, Vision LLM processing, and Vector Indexing).
Endpoint: ws://<host>:<port>/api/v1/ws/jobs/{job_id}
Progress Event Schema
The WebSocket broadcasts messages following the ProgressEvent structure:
```typescript
interface ProgressEvent {
  job_id: string;           // Unique identifier for the job
  video_id: string;         // ID of the video being processed
  stage: string;            // Current processing stage (e.g., 'transcribing')
  stage_number: number;     // Current stage index
  total_stages: number;     // Total number of stages in the pipeline
  progress_percent: number; // Completion percentage for the current stage
  message: string;          // Human-readable status update
  eta_seconds?: number;     // Estimated time remaining (if available)
  timestamp: string;        // ISO 8601 timestamp
}
```
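The per-stage fields can be combined into a single overall figure for a pipeline-wide progress bar. A minimal sketch, assuming `stage_number` is 1-based and all stages are equally weighted (neither is guaranteed by the schema):

```typescript
// Overall pipeline progress as a rounded percentage.
// Assumes stageNumber is 1-based and stages contribute equal weight.
function overallPercent(
  stageNumber: number,
  totalStages: number,
  stagePercent: number,
): number {
  const completedStages = stageNumber - 1;
  const fraction = (completedStages + stagePercent / 100) / totalStages;
  return Math.round(fraction * 100);
}

// e.g. stage 3 of 6 at 50% -> (2 + 0.5) / 6 -> 42
```

If the backend weights stages differently (e.g., transcription dominating wall-clock time), prefer displaying `progress_percent` per stage alongside `eta_seconds` instead.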
Ingestion Lifecycle Stages
The stage field will transition through the following values as the backend processes the media:
| Stage | Description |
| :--- | :--- |
| downloading | Fetching the video content from the source URL. |
| extracting | Separating audio and visual tracks for processing. |
| transcribing | Running Whisper ASR to generate text transcripts. |
| describing | Vision LLM generating descriptions for visual frames. |
| embedding | Generating vector representations of text and visual data. |
| indexing | Committing vectors and metadata to ChromaDB. |
| complete | Processing finished; video is now searchable. |
| error | Processing failed (check the message field for details). |
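The terminal stages matter for connection handling: once `complete` or `error` arrives, no further events are sent and the client can close the socket. A minimal message handler sketched in plain TypeScript (the function and callback names are illustrative; `socket` stands in for a browser WebSocket):

```typescript
// Stages after which the server sends no further events for this job.
const TERMINAL_STAGES = new Set(['complete', 'error']);

function isTerminalStage(stage: string): boolean {
  return TERMINAL_STAGES.has(stage);
}

// Parse one WebSocket message, forward it to the UI, and close the
// socket on a terminal stage. `socket` only needs a close() method,
// which the browser WebSocket provides.
function handleProgressMessage(
  raw: string,
  socket: { close(): void },
  onEvent: (e: { stage: string; message: string }) => void,
): void {
  const event = JSON.parse(raw);
  onEvent(event);
  if (isTerminalStage(event.stage)) {
    socket.close(); // no further events expected for this job
  }
}
```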
Chat Agent Heartbeats
The Conversational AI agent (powered by LangGraph) emits events through the chat interface to provide transparency into its reasoning process. These events allow the UI to display "thinking" states or indicate which tools (search, PII masking) are currently active.
Agent Event Schema
Updates from the agent follow the ChatEvent structure:
```typescript
interface ChatEvent {
  event: "status";
  data: {
    stage: 'thinking' | 'searching' | 'found' | 'generating' | 'pii_anonymized' | 'complete' | 'error';
    query?: string;            // The rewritten or optimized search query
    reason?: string;           // Internal reasoning for the current action
    count?: number;            // e.g., number of PII entities found
    message?: string;          // Error details if applicable
    videos?: VideoReference[]; // References to videos found during 'searching'
  };
  timestamp: string;
}
```
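A UI can map each `stage` to a human-readable status line. A sketch of one such mapping (the labels are illustrative, not part of the API):

```typescript
// Agent stages as defined by the ChatEvent schema above.
type ChatStage =
  | 'thinking' | 'searching' | 'found' | 'generating'
  | 'pii_anonymized' | 'complete' | 'error';

// Illustrative UI labels; the API does not prescribe display text.
function stageLabel(stage: ChatStage, count?: number): string {
  switch (stage) {
    case 'thinking':       return 'Agent is reasoning...';
    case 'searching':      return 'Searching the video index...';
    case 'found':          return 'Relevant videos found';
    case 'generating':     return 'Generating answer...';
    case 'pii_anonymized': return `Masked ${count ?? 0} PII entities`;
    case 'complete':       return 'Done';
    case 'error':          return 'Something went wrong';
  }
}
```

An exhaustive `switch` over the union type lets the compiler flag any stage added to the schema that the UI does not yet handle.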
Implementation Example (Frontend)
To consume these streams in a React/Next.js environment, use a WebSocket hook to manage the connection lifecycle and update component state.
```tsx
import { useWebSocket } from '@/lib/hooks/useWebSocket';
import { ProgressEvent } from '@/lib/types';

export function JobTracker({ jobId }: { jobId: string }) {
  const { lastMessage, readyState } = useWebSocket<ProgressEvent>(
    `/api/v1/ws/jobs/${jobId}`
  );

  if (!lastMessage) return <div>Connecting to ingestion stream...</div>;

  return (
    <div>
      <p>Status: {lastMessage.stage}</p>
      <progress value={lastMessage.progress_percent} max="100" />
      <p>{lastMessage.message}</p>
    </div>
  );
}
```
Error Handling & Reconnection
- Closed Connections: If the WebSocket closes before the `complete` or `error` stage is reached, clients should implement an exponential backoff reconnection strategy.
- Job Persistence: If a client disconnects, the ingestion job continues on the server. Upon reconnection, the server will broadcast the current state of the job.
- Validation: Ensure the `job_id` passed in the URL is a valid ULID/string to avoid immediate connection rejection.
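A backoff schedule can be as simple as doubling a base delay up to a cap. A minimal sketch (the base delay and cap values are illustrative, and production code would typically add jitter):

```typescript
// Reconnection delay in milliseconds for attempt n (0-based).
// Doubles a base delay up to a fixed cap; values are illustrative.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30000): number {
  return Math.min(baseMs * 2 ** attempt, capMs);
}

// Typical schedule: 1s, 2s, 4s, 8s, 16s, 30s, 30s, ...
```

Because the server rebroadcasts the current job state on reconnect, a client that reconnects after a gap does not need to replay missed events; rendering the latest `ProgressEvent` is sufficient.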