Answering Corpus-Level Queries with WebSockets & Unified Agents¶
Overview¶
OpenContracts provides real-time, conversational AI for answering questions about entire document collections (corpora). Using WebSockets for streaming responses and unified agents for consistent behavior, users can interactively explore their documents with instant feedback.
Architecture¶
WebSocket-Based Communication¶
Corpus queries are served live over Django Channels WebSockets, providing:

- Real-time streaming of partial answers
- Interactive feedback including thought processes
- Persistent conversations with memory
- Tool approval workflows for function calling
End-to-End Flow¶
```python
# From config/websocket/consumers/corpus_conversation.py
class CorpusQueryConsumer(AsyncWebsocketConsumer):
    """Streams answers for questions about an entire corpus."""
```

- Client → Server: The React frontend opens `wss://…/ws/corpus/<globalId>/` and sends `{ "query": "…" }`
- Authentication: `CorpusQueryConsumer.connect` authenticates the user and resolves the `Corpus`
- Agent Initialization: The consumer lazily creates a `CoreAgent` via `opencontractserver.llms.agents.for_corpus(...)`
- Framework Selection: The agent uses `UnifiedVectorStoreFactory` and the framework from `settings.LLMS_DEFAULT_AGENT_FRAMEWORK`
- Streaming Response: The LLM streams the answer as incremental `ASYNC_*` messages to the UI
- Completion: A terminal frame is sent; the socket stays open for the next question with context preserved
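The dispatch step in the flow above can be sketched framework-free. `dispatch_frame` is a hypothetical helper; the real routing happens inside the consumer's Channels `receive` handler:

```python
import json

# Simplified sketch of the consumer's frame routing (hypothetical helper;
# the real logic lives in config/websocket/consumers/corpus_conversation.py).
def dispatch_frame(text_data: str) -> str:
    """Route an incoming client frame to a handler name."""
    payload = json.loads(text_data)
    if "approve_tool" in payload:
        return "handle_tool_approval"  # resume a paused tool call
    if "query" in payload:
        return "handle_query"          # start a new streamed answer
    return "reject_frame"              # unknown frame shape
```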
Message Protocol¶
Client → Server Messages¶
```json
{
  "query": "What are the key terms in these contracts?",
  "tools": ["search", "summarize"],  // Optional tool list
  "approve_tool": "tool_call_id_123" // For tool approval
}
```
Server → Client Messages¶
| Type | Description | Payload |
|---|---|---|
| `ASYNC_START` | First event with conversation IDs | `{conversation_id, message_id}` |
| `ASYNC_CONTENT` | Incremental content from the LLM | `{delta: "text..."}` |
| `ASYNC_THOUGHT` | Chain-of-thought reasoning | `{thought: "Analyzing..."}` |
| `ASYNC_SOURCES` | Sources for the last delta | `{sources: [...]}` |
| `ASYNC_APPROVAL_NEEDED` | Tool requires approval | `{tool_call: {...}}` |
| `ASYNC_ERROR` | Non-fatal error | `{error: "..."}` |
| `ASYNC_FINISH` | Final message with sources | `{content, sources, timeline}` |
| `SYNC_CONTENT` | Immediate notice/error | `{message: "..."}` |
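The frame shapes in the table can be illustrated with a small serializer. `make_frame` is an assumption for illustration; the real consumer constructs these frames internally:

```python
import json

# Hypothetical helper mirroring the server → client frame types above.
def make_frame(frame_type: str, **payload) -> str:
    """Serialize one server → client WebSocket frame as JSON."""
    allowed = {
        "ASYNC_START", "ASYNC_CONTENT", "ASYNC_THOUGHT", "ASYNC_SOURCES",
        "ASYNC_APPROVAL_NEEDED", "ASYNC_ERROR", "ASYNC_FINISH", "SYNC_CONTENT",
    }
    if frame_type not in allowed:
        raise ValueError(f"unknown frame type: {frame_type}")
    return json.dumps({"type": frame_type, **payload})
```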
Example WebSocket Session¶
```javascript
// Client-side JavaScript
const ws = new WebSocket('wss://localhost/ws/corpus/abc123/');

ws.onopen = () => {
  ws.send(JSON.stringify({
    query: "What are the payment terms across all contracts?"
  }));
};

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  switch (data.type) {
    case 'ASYNC_START':
      console.log('Starting conversation:', data.conversation_id);
      break;
    case 'ASYNC_CONTENT':
      appendToAnswer(data.delta);
      break;
    case 'ASYNC_SOURCES':
      displaySources(data.sources);
      break;
    case 'ASYNC_FINISH':
      finalizeAnswer(data.content, data.sources);
      break;
  }
};
```
Agent System¶
Unified Agent Factory¶
The `UnifiedAgentFactory` creates framework-agnostic agents:

```python
from opencontractserver.llms.agents import UnifiedAgentFactory

# Creates the appropriate agent based on configuration
agent = UnifiedAgentFactory.for_corpus(
    corpus_id=corpus_id,
    user_id=user.id,
    framework=settings.LLMS_DEFAULT_AGENT_FRAMEWORK
)
```
Core Agent Features¶
The `CoreAgent` provides:

- Vector search across corpus annotations
- Conversation memory with database persistence
- Tool calling with approval gates
- Source attribution for grounded answers
- Streaming events for real-time updates
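The streaming-events feature can be sketched as an async generator consumed into a final answer. Everything here (`StreamEvent`, `fake_stream`, `collect_answer`) is a stand-in for illustration, not the real agent API:

```python
import asyncio
from dataclasses import dataclass

# Minimal sketch of consuming a CoreAgent-style event stream; event names
# mirror the ASYNC_* protocol, but this generator is a stand-in.
@dataclass
class StreamEvent:
    type: str
    data: dict

async def fake_stream(answer: str):
    """Yield content deltas followed by a terminal frame."""
    for word in answer.split():
        yield StreamEvent("ASYNC_CONTENT", {"delta": word + " "})
    yield StreamEvent("ASYNC_FINISH", {"content": answer})

async def collect_answer(stream) -> str:
    """Accumulate deltas exactly as a client UI would."""
    parts = []
    async for event in stream:
        if event.type == "ASYNC_CONTENT":
            parts.append(event.data["delta"])
    return "".join(parts).strip()
```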
Framework Support¶
Switch between frameworks via configuration:
```python
# settings.py
LLMS_DEFAULT_AGENT_FRAMEWORK = "pydantic_ai"
```
All supported frameworks provide identical functionality through the unified interface.
Conversation Persistence¶
Memory Management¶
Conversations are persisted for authenticated users. Each conversation record stores:

- `conversation_id`: Unique identifier
- `user`: Associated user
- `corpus`: Target corpus
- `messages`: JSON array of conversation history
- `created`/`updated`: Timestamps
Context Preservation¶
The WebSocket consumer maintains conversation context:

- Previous messages remain available for follow-up questions
- Tool results are cached for reference
- Sources accumulate across queries
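The preserved context can be modeled in memory as below. `ConversationContext` is an illustrative sketch; the real consumer persists this state via the Conversation model:

```python
# In-memory model of the context preserved between queries (illustrative).
class ConversationContext:
    def __init__(self):
        self.messages = []      # prior turns, available for follow-ups
        self.tool_results = {}  # cached tool output, keyed by call id
        self.sources = []       # sources accumulated across queries

    def add_turn(self, role: str, content: str, sources=None):
        """Record one turn and accumulate any new sources."""
        self.messages.append({"role": role, "content": content})
        for src in sources or []:
            if src not in self.sources:  # avoid duplicate sources
                self.sources.append(src)
```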
Tool Integration¶
Available Tools¶
Corpus agents can use various tools:

- Search: Vector search within the corpus
- Summarize: Generate summaries of documents
- Extract: Pull specific information
- Compare: Analyze differences between documents
Tool Approval Workflow¶
For sensitive operations:

1. The agent requests tool use
2. The server sends `ASYNC_APPROVAL_NEEDED`
3. The client displays an approval UI
4. The user approves or rejects
5. The client sends an `approve_tool` message
6. The agent continues or adjusts
```json
// Server → Client
{
  "type": "ASYNC_APPROVAL_NEEDED",
  "tool_call": {
    "id": "call_123",
    "tool": "delete_document",
    "args": {"doc_id": 456}
  }
}

// Client → Server (approval)
{
  "approve_tool": "call_123"
}
```
Configuration¶
Extensibility Hooks¶
- Framework Selection: Set via `settings.LLMS_DEFAULT_AGENT_FRAMEWORK` (PydanticAI by default)
- Embedder Override: Set `Corpus.preferred_embedder` for custom models
- Tool Registration: Pass a `?tools=…` query parameter or configure defaults
- Token Limits: Configure max context and response lengths
WebSocket Settings¶
```python
# settings.py
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}

# Timeout and buffer settings
WEBSOCKET_TIMEOUT = 300  # 5 minutes
WEBSOCKET_MAX_MESSAGE_SIZE = 1048576  # 1 MB
```
Error Handling¶
Connection Errors¶
The consumer handles various failure modes:

- Authentication failure: closes with code 4001
- Corpus not found: closes with code 4004
- Agent initialization failure: sends a `SYNC_CONTENT` error
- LLM errors: sends `ASYNC_ERROR`; the conversation continues
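The failure modes above map cleanly onto application-reserved (4xxx) WebSocket close codes; `close_code_for` is an illustrative helper, not actual project code:

```python
# Hypothetical mapping of connect-time failures to WebSocket close codes
# (the 4000–4999 range is reserved for application use by RFC 6455).
def close_code_for(failure: str):
    codes = {
        "auth_failed": 4001,       # authentication failure
        "corpus_not_found": 4004,  # corpus id did not resolve
    }
    # Agent/LLM errors do not close the socket; they surface as
    # SYNC_CONTENT / ASYNC_ERROR frames instead.
    return codes.get(failure)
```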
Recovery Strategies¶
```python
import asyncio

# Automatic retry with exponential backoff
async def query_with_retry(self, query: str, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await self.agent.query(query)
        except Exception:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)
```
Performance Optimization¶
Connection Pooling¶
WebSocket connections are pooled and reused:

- Connections persist across multiple queries
- Reduced overhead for conversation continuity
- Automatic cleanup on idle timeout
Streaming Optimizations¶
- Chunked responses: Content streamed in optimal chunks
- Source deduplication: Repeated sources sent once
- Incremental rendering: UI updates without full redraws
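Source deduplication can be sketched as a per-connection "seen" set keyed on a stable source id (an assumed keying scheme for illustration):

```python
# Sketch of source deduplication: only sources not yet sent on this
# connection are included in an ASYNC_SOURCES frame.
def dedupe_sources(seen: set, sources: list) -> list:
    """Return only the sources whose id has not been sent yet."""
    fresh = []
    for src in sources:
        if src["id"] not in seen:
            seen.add(src["id"])
            fresh.append(src)
    return fresh
```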
Caching¶
Multiple levels of caching improve performance:

- Vector embeddings: cached in the database
- Agent instances: reused within the consumer
- Tool results: cached within the conversation
Frontend Integration¶
React Hook Example¶
```typescript
// useCorpusChat.ts
import { useCallback } from 'react';
import { useWebSocket } from './websocket';

export function useCorpusChat(corpusId: string) {
  const { send, messages, status } = useWebSocket(
    `/ws/corpus/${corpusId}/`
  );

  const query = useCallback((text: string) => {
    send({ query: text });
  }, [send]);

  const approveTool = useCallback((toolCallId: string) => {
    send({ approve_tool: toolCallId });
  }, [send]);

  return { query, messages, status, approveTool };
}
```
Message Handling¶
```typescript
// CorpusChatStream.tsx
function handleMessage(data: WebSocketMessage) {
  switch (data.type) {
    case 'ASYNC_CONTENT':
      setAnswer(prev => prev + data.delta);
      break;
    case 'ASYNC_SOURCES':
      setSources(data.sources);
      break;
    case 'ASYNC_APPROVAL_NEEDED':
      showApprovalDialog(data.tool_call);
      break;
  }
}
```
Monitoring and Debugging¶
Logging¶
Comprehensive logging for debugging:

```python
import logging

logger = logging.getLogger('corpus.websocket')

# Log levels for different events
logger.info(f"User {user.id} connected to corpus {corpus_id}")
logger.debug(f"Query received: {query[:100]}...")
logger.error(f"Agent error: {str(e)}")
```
Metrics¶
Track key performance indicators:

- Connection count and duration
- Query latency and token usage
- Error rates and types
- Tool usage statistics
Security Considerations¶
Authentication¶
All WebSocket connections require authentication:

- Session-based auth for web clients
- Token-based auth for API clients
- Automatic disconnection on auth failure
Authorization¶
Corpus access is controlled via permissions:

- The user must have read permission on the corpus
- Tool usage requires additional permissions
- Admin tools are restricted to staff users
Rate Limiting¶
Prevent abuse with rate limits:

- Per-user query limits
- Token usage caps
- Connection count restrictions
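A per-user query limit can be sketched as a token bucket. `QueryRateLimiter` is illustrative only; a production deployment would typically enforce this in Redis or middleware:

```python
import time

# Minimal per-user token-bucket sketch for query rate limiting (illustrative).
class QueryRateLimiter:
    def __init__(self, capacity: int = 10, refill_per_sec: float = 0.5):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.buckets = {}  # user_id -> (tokens, last_timestamp)

    def allow(self, user_id: int, now: float = None) -> bool:
        """Spend one token if available; refill based on elapsed time."""
        now = time.monotonic() if now is None else now
        tokens, last = self.buckets.get(user_id, (float(self.capacity), now))
        tokens = min(self.capacity, tokens + (now - last) * self.refill_per_sec)
        if tokens < 1:
            self.buckets[user_id] = (tokens, now)
            return False
        self.buckets[user_id] = (tokens - 1, now)
        return True
```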