# API Reference

## Django Models

### Extraction Models

#### Fieldset

Defines a collection of fields to extract from documents.

```python
class Fieldset(BaseOCModel):
    name: str              # Fieldset name
    description: str       # Description of purpose
    corpus: Corpus | None  # Optional link for metadata schemas
```

Permissions:

- `permission_fieldset` - Base permission
- `create_fieldset` - Create new fieldsets
- `read_fieldset` - View fieldsets
- `update_fieldset` - Modify fieldsets
- `remove_fieldset` - Delete fieldsets
#### Column

Defines individual data fields within a fieldset.

```python
class Column(BaseOCModel):
    # Basic fields
    name: str                      # Column name
    fieldset: Fieldset             # Parent fieldset

    # Extraction configuration
    query: str | None              # Extraction prompt
    match_text: str | None         # Alternative to query
    must_contain_text: str | None  # Required text constraint
    limit_to_label: str | None     # Annotation label filter
    instructions: str | None       # Additional instructions

    # Output configuration
    output_type: str               # Python type as string
    extract_is_list: bool = False  # Wrap in List[]

    # Task configuration
    task_name: str = "...doc_extract_query_task"  # Celery task

    # Metadata fields
    data_type: str | None          # METADATA_DATA_TYPES choice
    validation_config: dict | None # Validation rules
```

Data types (for manual entry):

- `STRING` - Single-line text
- `TEXT` - Multi-line text
- `BOOLEAN` - True/False
- `INTEGER` - Whole numbers
- `FLOAT` - Decimal numbers
- `DATE` - Date only
- `DATETIME` - Date and time
- `URL` - Web addresses
- `EMAIL` - Email addresses
- `CHOICE` - Single selection
- `MULTI_CHOICE` - Multiple selections
- `JSON` - JSON objects
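As an illustration of how values for these manual-entry data types could be checked before saving, here is a minimal sketch. The `VALIDATORS` table and `validate_value` helper are hypothetical names for this example; the real `METADATA_DATA_TYPES` and `validation_config` handling lives in the server code.

```python
from datetime import date, datetime

# Illustrative validators for a few of the manual-entry data types.
VALIDATORS = {
    "INTEGER": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "FLOAT": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "BOOLEAN": lambda v: isinstance(v, bool),
    "STRING": lambda v: isinstance(v, str) and "\n" not in v,
    "TEXT": lambda v: isinstance(v, str),
    "DATE": lambda v: isinstance(v, date) and not isinstance(v, datetime),
}

def validate_value(data_type: str, value) -> bool:
    """Return True if `value` conforms to the named data type."""
    check = VALIDATORS.get(data_type)
    if check is None:
        raise ValueError(f"Unknown data type: {data_type}")
    return check(value)
```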
#### Extract

Represents an extraction job.

```python
class Extract(BaseOCModel):
    # Scope
    corpus: Corpus | None            # Target corpus
    documents: ManyToMany[Document]  # Documents to process

    # Configuration
    name: str                        # Extract name
    fieldset: Fieldset               # Fields to extract

    # Status
    created: datetime                # Creation time
    started: datetime | None         # Start time
    finished: datetime | None        # Completion time
    error: str | None                # Error message if failed
```
#### Datacell

Stores extracted data for a document/column pair.

```python
class Datacell(BaseOCModel):
    # Relations
    extract: Extract                   # Parent extract
    column: Column                     # Column definition
    document: Document                 # Source document

    # Results
    data: Any | None                   # Extracted data (JSON)
    data_definition: str               # Data type description
    sources: ManyToMany[Annotation]    # Source annotations

    # Status
    started: datetime | None           # Processing start
    completed: datetime | None         # Processing end
    failed: datetime | None            # Failure time
    stacktrace: str | None             # Error details

    # Metadata
    creator: User                      # User who created the cell
```
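Because each datacell carries `completed` and `failed` timestamps, an extract's progress can be summarized by counting over its cells. A minimal sketch, using plain dicts as stand-ins for Datacell rows (the `extract_progress` helper is illustrative, not part of the package):

```python
def extract_progress(datacells: list[dict]) -> dict:
    """Summarize datacell status counts for a parent extract.

    Each dict stands in for a Datacell row with `completed` and
    `failed` timestamp fields (None while still pending).
    """
    completed = sum(1 for c in datacells if c["completed"] is not None)
    failed = sum(1 for c in datacells if c["failed"] is not None)
    total = len(datacells)
    return {
        "total": total,
        "completed": completed,
        "failed": failed,
        "pending": total - completed - failed,
    }
```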
## Celery Tasks

### Orchestration Tasks

#### run_extract

Main extraction orchestrator that creates datacells and queues processing.

```python
@shared_task
def run_extract(
    extract_id: str | int,
    user_id: str | int,
) -> None:
    """
    Creates Datacells for each document × column combination
    and queues extraction tasks.

    Args:
        extract_id: ID of the Extract to process
        user_id: ID of the user running the extraction
    """
```
#### mark_extract_complete

Marks an extract as finished after all datacells complete.

```python
@shared_task
def mark_extract_complete(
    extract_id: str | int,
) -> None:
    """
    Updates Extract.finished timestamp and aggregates
    any errors from failed datacells.

    Args:
        extract_id: ID of the Extract to mark complete
    """
```
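The document × column fan-out that `run_extract` performs can be sketched with `itertools.product`: one datacell is planned per pair. The `plan_datacells` helper below is illustrative only, not the actual orchestrator code.

```python
from itertools import product

def plan_datacells(document_ids: list[int], column_ids: list[int]) -> list[tuple[int, int]]:
    """One (document_id, column_id) pair per datacell to create,
    mirroring the document × column fan-out of run_extract."""
    return list(product(document_ids, column_ids))
```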
### Extraction Tasks

#### doc_extract_query_task

Performs structured data extraction using the agent framework.

```python
@celery_task_with_async_to_sync()
async def doc_extract_query_task(
    cell_id: int,
    similarity_top_k: int = 10,
    max_token_length: int = 64000,
) -> None:
    """
    Extracts data for a single datacell using PydanticAI agents.

    Args:
        cell_id: Datacell ID to process
        similarity_top_k: Number of similar chunks to retrieve
        max_token_length: Maximum context tokens
    """
```
## Agent System

### Factories

#### UnifiedAgentFactory

Creates framework-agnostic agents for document and corpus interactions.

```python
class UnifiedAgentFactory:
    @classmethod
    def for_corpus(
        cls,
        corpus_id: int,
        user_id: int,
        framework: str | None = None,
    ) -> CoreAgent:
        """Create an agent for corpus-level queries."""

    @classmethod
    def for_document(
        cls,
        document_id: int,
        user_id: int,
        framework: str | None = None,
    ) -> CoreAgent:
        """Create an agent for document-level queries."""
```
#### UnifiedVectorStoreFactory

Creates the appropriate vector store for the chosen framework.

```python
class UnifiedVectorStoreFactory:
    @classmethod
    def create(
        cls,
        framework: str,
        corpus_id: int | None = None,
        user_id: int | None = None,
        **kwargs,
    ) -> VectorStore:
        """
        Create a vector store for the specified framework.

        Args:
            framework: "pydantic_ai"
            corpus_id: Filter by corpus
            user_id: Filter by user
            **kwargs: Additional configuration
        """
```
### Core Classes

#### CoreAgent

Base agent class providing a unified interface.

```python
class CoreAgent:
    async def query(
        self,
        query: str,
        tools: list[str] | None = None,
    ) -> AsyncIterator[Event]:
        """
        Process a query and stream events.

        Yields:
            StartEvent: Initial event with IDs
            ContentEvent: Incremental content
            SourcesEvent: Source annotations
            FinishEvent: Final results
        """

    async def approve_tool(
        self,
        tool_call_id: str,
    ) -> None:
        """Approve a pending tool call."""
```
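Consuming the event stream is an `async for` over the iterator `query` returns. A self-contained sketch using stand-in event classes and a fake `query` coroutine (the real `StartEvent`/`ContentEvent`/etc. live in the agent framework; only the consumption pattern is the point here):

```python
import asyncio
from dataclasses import dataclass

# Minimal stand-ins for the event types CoreAgent.query yields.
@dataclass
class ContentEvent:
    content: str

@dataclass
class FinishEvent:
    result: str

async def fake_query(query: str):
    """Mimics CoreAgent.query by streaming incremental content."""
    for chunk in ("The answer", " is 42."):
        yield ContentEvent(chunk)
    yield FinishEvent("The answer is 42.")

async def collect(query: str) -> str:
    """Accumulate streamed content until the final event arrives."""
    parts = []
    async for event in fake_query(query):
        if isinstance(event, ContentEvent):
            parts.append(event.content)
        elif isinstance(event, FinishEvent):
            return event.result
    return "".join(parts)

answer = asyncio.run(collect("What is the answer?"))
```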
#### CoreAnnotationVectorStore

Framework-agnostic vector store implementation.

```python
class CoreAnnotationVectorStore:
    def __init__(
        self,
        corpus_id: int | None = None,
        user_id: int | None = None,
        embedder_path: str | None = None,
        embed_dim: int = 384,
    ):
        """Initialize the vector store with filters."""

    def search(
        self,
        query: VectorSearchQuery,
    ) -> list[VectorSearchResult]:
        """Execute vector similarity search."""
## WebSocket Consumers

### CorpusQueryConsumer

Handles real-time corpus queries over WebSocket.

```python
class CorpusQueryConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        """Authenticate and initialize the corpus agent."""

    async def receive(self, text_data):
        """Process incoming queries."""

    async def disconnect(self, close_code):
        """Clean up on disconnection."""
```

WebSocket URL: `/ws/corpus/<corpus_id>/`

Message types:

Client → Server:

```json
{
  "query": "string",        // User question
  "tools": ["list"],        // Optional tools
  "approve_tool": "string"  // Tool approval
}
```

Server → Client:

```json
{
  "type": "ASYNC_START|ASYNC_CONTENT|ASYNC_SOURCES|...",
  "data": {}  // Type-specific payload
}
```
### DocumentQueryConsumer

Handles document-specific queries.

```python
class DocumentQueryConsumer(AsyncWebsocketConsumer):
    # Similar interface to CorpusQueryConsumer
    # URL: /ws/document/<document_id>/
```
## GraphQL API

### Queries

#### Extract Queries

```graphql
query GetExtracts {
  extracts {
    edges {
      node {
        id
        name
        started
        finished
        datacells {
          edges {
            node {
              id
              data
              completed
            }
          }
        }
      }
    }
  }
}
```
#### Fieldset Queries

```graphql
query GetFieldsets {
  fieldsets {
    edges {
      node {
        id
        name
        description
        columns {
          edges {
            node {
              id
              name
              outputType
            }
          }
        }
      }
    }
  }
}
```
### Mutations

#### Start Extract

```graphql
mutation StartExtract($extractId: ID!) {
  startExtract(extractId: $extractId) {
    ok
    message
    objId
  }
}
```

#### Create Fieldset

```graphql
mutation CreateFieldset($name: String!, $description: String!) {
  createFieldset(
    name: $name
    description: $description
  ) {
    ok
    objId
    message
  }
}
```
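These documents are sent as a standard GraphQL POST body: JSON with `query` and `variables` keys. A sketch of assembling that body in Python (the helper and the example Relay-style ID are illustrative):

```python
import json

START_EXTRACT = """
mutation StartExtract($extractId: ID!) {
  startExtract(extractId: $extractId) {
    ok
    message
    objId
  }
}
"""

def graphql_payload(query: str, variables: dict) -> str:
    """Body for a standard GraphQL POST request."""
    return json.dumps({"query": query, "variables": variables})
```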
## Configuration Settings

### Agent Framework

```python
# settings.py

# Framework selection: "pydantic_ai"
LLMS_DEFAULT_AGENT_FRAMEWORK = "pydantic_ai"

# Model configuration
LLMS_DEFAULT_MODEL = "gpt-4-turbo"
LLMS_MAX_TOKENS = 4096
LLMS_TEMPERATURE = 0.7

# Embedder settings
PREFERRED_EMBEDDER = "sentence-transformers/all-MiniLM-L6-v2"
EMBED_DIMENSIONS = 384
```
### Celery Configuration

```python
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'

# Task routing
CELERY_TASK_ROUTES = {
    'opencontractserver.tasks.extract_orchestrator_tasks.*': {
        'queue': 'extract'
    },
    'opencontractserver.tasks.data_extract_tasks.*': {
        'queue': 'ml'
    },
}
```
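Celery resolves these glob patterns against full task names to pick a queue. The effect can be sketched with `fnmatch` (the dispatcher below is illustrative, not Celery's actual router):

```python
from fnmatch import fnmatch

# Flattened version of the CELERY_TASK_ROUTES mapping above.
TASK_ROUTES = {
    "opencontractserver.tasks.extract_orchestrator_tasks.*": "extract",
    "opencontractserver.tasks.data_extract_tasks.*": "ml",
}

def route_task(task_name: str, default: str = "celery") -> str:
    """Resolve a task name to its queue via glob-style matching."""
    for pattern, queue in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return queue
    return default
```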
### WebSocket Configuration

```python
# Channel layers
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}

# WebSocket settings
WEBSOCKET_TIMEOUT = 300               # seconds
WEBSOCKET_MAX_MESSAGE_SIZE = 1048576  # 1 MB
```
## Error Codes

### WebSocket Close Codes

| Code | Description |
|------|-------------|
| 1000 | Normal closure |
| 4001 | Authentication failed |
| 4004 | Resource not found |
| 4008 | Rate limit exceeded |
| 5000 | Internal server error |

### Extraction Error Types

| Error | Description |
|-------|-------------|
| ExtractionTimeout | Task exceeded its time limit |
| InvalidOutputType | Unsupported output type specified |
| DocumentNotFound | Document doesn't exist |
| InsufficientPermissions | User lacks access |
| AgentError | LLM processing failed |
## Utilities

### Type Parsing

```python
from opencontractserver.utils.etl import parse_model_or_primitive

# Parse a string type into a Python type
python_type = parse_model_or_primitive("list[str]")
```
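For intuition, a hedged re-implementation of what such a helper might do for simple primitive and `list[...]` strings. This `parse_type` sketch covers only the trivial cases; the real utility also handles model types, so treat the behavior below as an assumption, not its spec:

```python
# Supported primitive names for this sketch.
_PRIMITIVES = {"str": str, "int": int, "float": float, "bool": bool}

def parse_type(type_str: str):
    """Turn a type string like "int" or "list[str]" into a Python type."""
    type_str = type_str.strip()
    if type_str.startswith("list[") and type_str.endswith("]"):
        return list[parse_type(type_str[5:-1])]
    if type_str in _PRIMITIVES:
        return _PRIMITIVES[type_str]
    raise ValueError(f"Unsupported type string: {type_str}")
```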
### Embedding Generation

```python
from opencontractserver.annotations.models import generate_embeddings_from_text

# Generate embeddings for text
embeddings = generate_embeddings_from_text(
    text="Sample text",
    embedder_path="sentence-transformers/all-MiniLM-L6-v2",
)
```
### Async Decorators

```python
from opencontractserver.shared.decorators import celery_task_with_async_to_sync

@celery_task_with_async_to_sync()
async def my_async_task():
    # Async task implementation
    pass
```
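At its core, a decorator like this runs the coroutine to completion so Celery's synchronous worker can call it. A minimal sketch of that idea, assuming the simplest possible behavior (the real decorator also registers the function as a Celery task, which is omitted here):

```python
import asyncio
import functools

def async_to_sync_task(func):
    """Wrap an async function so it can be called synchronously,
    the way a sync Celery worker would need to."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return asyncio.run(func(*args, **kwargs))
    return wrapper

@async_to_sync_task
async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return a + b
```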