# API Reference

## Django Models

### Extraction Models

#### Fieldset

Defines a collection of fields to extract from documents.

```python
class Fieldset(BaseOCModel):
    name: str                    # Fieldset name
    description: str             # Description of purpose
    corpus: Corpus | None        # Optional link for metadata schemas
```

Permissions:

- `permission_fieldset` - Base permission
- `create_fieldset` - Create new fieldsets
- `read_fieldset` - View fieldsets
- `update_fieldset` - Modify fieldsets
- `remove_fieldset` - Delete fieldsets
#### Column

Defines individual data fields within a fieldset.

```python
class Column(BaseOCModel):
    # Basic fields
    name: str                           # Column name
    fieldset: Fieldset                  # Parent fieldset

    # Extraction configuration
    query: str | None                   # Extraction prompt
    match_text: str | None              # Alternative to query
    must_contain_text: str | None       # Required text constraint
    limit_to_label: str | None          # Annotation label filter
    instructions: str | None            # Additional instructions

    # Output configuration
    output_type: str                    # Python type as string
    extract_is_list: bool = False       # Wrap in List[]

    # Task configuration
    task_name: str = "...doc_extract_query_task"  # Celery task

    # Metadata fields
    data_type: str | None               # METADATA_DATA_TYPES choice
    validation_config: dict | None      # Validation rules
```

Data Types (for manual entry):

- `STRING` - Single-line text
- `TEXT` - Multi-line text
- `BOOLEAN` - True/False
- `INTEGER` - Whole numbers
- `FLOAT` - Decimal numbers
- `DATE` - Date only
- `DATETIME` - Date and time
- `URL` - Web addresses
- `EMAIL` - Email addresses
- `CHOICE` - Single selection
- `MULTI_CHOICE` - Multiple selections
- `JSON` - JSON objects
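Putting the two models together, defining an extraction schema might look like the following sketch. The import path and the `creator` keyword follow common OpenContracts conventions but are assumptions here, not confirmed API:

```python
from opencontractserver.extracts.models import Column, Fieldset

# Define the schema container.
fieldset = Fieldset.objects.create(
    name="Contract Basics",
    description="Key terms extracted from every contract",
    creator=user,  # assumes BaseOCModel tracks a creator
)

# One Column per field to extract.
Column.objects.create(
    name="Effective Date",
    fieldset=fieldset,
    query="What is the effective date of this agreement?",
    output_type="str",
    extract_is_list=False,
)
```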
#### Extract

Represents an extraction job.

```python
class Extract(BaseOCModel):
    # Scope
    corpus: Corpus | None              # Target corpus
    documents: ManyToMany[Document]    # Documents to process

    # Configuration
    name: str                          # Extract name
    fieldset: Fieldset                 # Fields to extract

    # Status
    created: datetime                  # Creation time
    started: datetime | None           # Start time
    finished: datetime | None          # Completion time
    error: str | None                  # Error message if failed
```
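Creating a job that applies a fieldset to a set of documents might look like this sketch (using `corpus.documents` as the document source is an assumption):

```python
from opencontractserver.extracts.models import Extract

extract = Extract.objects.create(
    name="Q1 contract review",
    fieldset=fieldset,
    corpus=corpus,   # optional scope
    creator=user,
)

# Attach the documents to process via the M2M relation.
extract.documents.add(*corpus.documents.all())
```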
#### Datacell

Stores extracted data for a document/column pair.

```python
class Datacell(BaseOCModel):
    # Relations
    extract: Extract                   # Parent extract
    column: Column                     # Column definition
    document: Document                 # Source document

    # Results
    data: Any | None                   # Extracted data (JSON)
    data_definition: str               # Data type description
    sources: ManyToMany[Annotation]    # Source annotations

    # Status
    started: datetime | None           # Processing start
    completed: datetime | None         # Processing end
    failed: datetime | None            # Failure time
    stacktrace: str | None             # Error details

    # Metadata
    creator: User                      # User who created the cell
```
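After a run, results can be read back per document and column with ordinary queryset filters. A sketch using only the fields listed above:

```python
from opencontractserver.extracts.models import Datacell

cells = Datacell.objects.filter(
    extract=extract,
    failed__isnull=True,   # skip cells that errored out
).select_related("column", "document")

for cell in cells:
    print(cell.document.pk, cell.column.name, cell.data)
```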
## Celery Tasks

### Orchestration Tasks

#### run_extract

Main extraction orchestrator that creates datacells and queues processing.

```python
@shared_task
def run_extract(
    extract_id: str | int,
    user_id: str | int
) -> None:
    """
    Creates Datacells for each document × column combination
    and queues extraction tasks.

    Args:
        extract_id: ID of Extract to process
        user_id: ID of user running extraction
    """
```
#### mark_extract_complete

Marks an extract as finished after all datacells complete.

```python
@shared_task
def mark_extract_complete(
    extract_id: str | int
) -> None:
    """
    Updates Extract.finished timestamp and aggregates
    any errors from failed datacells.

    Args:
        extract_id: ID of Extract to mark complete
    """
```
### Extraction Tasks

#### doc_extract_query_task

Performs structured data extraction using the agent framework.

```python
@celery_task_with_async_to_sync()
async def doc_extract_query_task(
    cell_id: int,
    similarity_top_k: int = 10,
    max_token_length: int = 64000
) -> None:
    """
    Extracts data for a single datacell using PydanticAI agents.

    Args:
        cell_id: Datacell ID to process
        similarity_top_k: Number of similar chunks to retrieve
        max_token_length: Maximum context tokens
    """
```
## Agent System

### Factories

#### UnifiedAgentFactory

Creates framework-agnostic agents for document and corpus interactions.

```python
class UnifiedAgentFactory:
    @classmethod
    def for_corpus(
        cls,
        corpus_id: int,
        user_id: int,
        framework: str | None = None
    ) -> CoreAgent:
        """Create an agent for corpus-level queries."""

    @classmethod
    def for_document(
        cls,
        document_id: int,
        user_id: int,
        framework: str | None = None
    ) -> CoreAgent:
        """Create an agent for document-level queries."""
```
#### UnifiedVectorStoreFactory

Creates the appropriate vector store for the specified framework.

```python
class UnifiedVectorStoreFactory:
    @classmethod
    def create(
        cls,
        framework: str,
        corpus_id: int | None = None,
        user_id: int | None = None,
        **kwargs
    ) -> VectorStore:
        """
        Create a vector store for the specified framework.

        Args:
            framework: "pydantic_ai"
            corpus_id: Filter by corpus
            user_id: Filter by user
            **kwargs: Additional configuration
        """
```
### Core Classes

#### CoreAgent

Base agent class providing a unified interface.

```python
class CoreAgent:
    async def query(
        self,
        query: str,
        tools: list[str] | None = None
    ) -> AsyncIterator[Event]:
        """
        Process a query and stream events.

        Yields:
            StartEvent: Initial event with IDs
            ContentEvent: Incremental content
            SourcesEvent: Source annotations
            FinishEvent: Final results
        """

    async def approve_tool(
        self,
        tool_call_id: str
    ) -> None:
        """Approve a pending tool call."""
```
#### CoreAnnotationVectorStore

Framework-agnostic vector store implementation.

```python
class CoreAnnotationVectorStore:
    def __init__(
        self,
        corpus_id: int | None = None,
        user_id: int | None = None,
        embedder_path: str | None = None,
        embed_dim: int = 384
    ):
        """Initialize the vector store with filters."""

    def search(
        self,
        query: VectorSearchQuery
    ) -> list[VectorSearchResult]:
        """Execute a vector similarity search."""
```
## WebSocket Consumers

### CorpusQueryConsumer

Handles real-time corpus queries over WebSocket.

```python
class CorpusQueryConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        """Authenticate and initialize corpus agent."""

    async def receive(self, text_data):
        """Process incoming queries."""

    async def disconnect(self, close_code):
        """Clean up on disconnection."""
```

WebSocket URL: `/ws/corpus/<corpus_id>/`

Message Types:

Client → Server:

```json
{
    "query": "string",          // User question
    "tools": ["list"],          // Optional tools
    "approve_tool": "string"    // Tool approval
}
```

Server → Client:

```json
{
    "type": "ASYNC_START|ASYNC_CONTENT|ASYNC_SOURCES|...",
    "data": {}  // Type-specific payload
}
```
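A minimal client sketch in Python using the third-party `websockets` package; the unauthenticated localhost URL and the `ASYNC_FINISH` terminal type are assumptions about your deployment:

```python
import asyncio
import json

import websockets  # pip install websockets


async def main() -> None:
    url = "ws://localhost:8000/ws/corpus/42/"  # corpus_id = 42
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"query": "Summarize the indemnification terms."}))
        async for raw in ws:
            message = json.loads(raw)
            print(message["type"], message.get("data"))
            if message["type"] == "ASYNC_FINISH":  # terminal type is assumed
                break


asyncio.run(main())
```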
### DocumentQueryConsumer

Handles document-specific queries.

```python
class DocumentQueryConsumer(AsyncWebsocketConsumer):
    # Similar interface to CorpusQueryConsumer
    # URL: /ws/document/<document_id>/
```
## GraphQL API

### Queries

#### Extract Queries

```graphql
query GetExtracts {
    extracts {
        edges {
            node {
                id
                name
                started
                finished
                datacells {
                    edges {
                        node {
                            id
                            data
                            completed
                        }
                    }
                }
            }
        }
    }
}
```
#### Fieldset Queries

```graphql
query GetFieldsets {
    fieldsets {
        edges {
            node {
                id
                name
                description
                columns {
                    edges {
                        node {
                            id
                            name
                            outputType
                        }
                    }
                }
            }
        }
    }
}
```
### Mutations

#### Start Extract

```graphql
mutation StartExtract($extractId: ID!) {
    startExtract(extractId: $extractId) {
        ok
        message
        objId
    }
}
```

#### Create Fieldset

```graphql
mutation CreateFieldset($name: String!, $description: String!) {
    createFieldset(
        name: $name
        description: $description
    ) {
        ok
        objId
        message
    }
}
```
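Any GraphQL client can call these mutations. A sketch with `requests`; the `/graphql/` path and the bearer-token header are assumptions about your deployment:

```python
import requests

MUTATION = """
mutation StartExtract($extractId: ID!) {
    startExtract(extractId: $extractId) {
        ok
        message
        objId
    }
}
"""

response = requests.post(
    "http://localhost:8000/graphql/",              # assumed endpoint
    json={"query": MUTATION, "variables": {"extractId": extract_gid}},
    headers={"Authorization": f"Bearer {token}"},  # assumed auth scheme
)
response.raise_for_status()
print(response.json()["data"]["startExtract"])
```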
## Configuration Settings

### Agent Framework

```python
# settings.py

# Framework selection: "pydantic_ai"
LLMS_DEFAULT_AGENT_FRAMEWORK = "pydantic_ai"

# Model configuration
LLMS_DEFAULT_MODEL = "gpt-4-turbo"
LLMS_MAX_TOKENS = 4096
LLMS_TEMPERATURE = 0.7

# Embedder settings
PREFERRED_EMBEDDER = "sentence-transformers/all-MiniLM-L6-v2"
EMBED_DIMENSIONS = 384
```
### Celery Configuration

```python
# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'

# Task routing
CELERY_TASK_ROUTES = {
    'opencontractserver.tasks.extract_orchestrator_tasks.*': {
        'queue': 'extract'
    },
    'opencontractserver.tasks.data_extract_tasks.*': {
        'queue': 'ml'
    },
}
```
### WebSocket Configuration

```python
# Channel layers
CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}

# WebSocket settings
WEBSOCKET_TIMEOUT = 300  # seconds
WEBSOCKET_MAX_MESSAGE_SIZE = 1048576  # 1 MB
```
## Error Codes

### WebSocket Close Codes

| Code | Description |
|------|-------------|
| 1000 | Normal closure |
| 4001 | Authentication failed |
| 4004 | Resource not found |
| 4008 | Rate limit exceeded |
| 5000 | Internal server error |
### Extraction Error Types

| Error | Description |
|-------|-------------|
| `ExtractionTimeout` | Task exceeded time limit |
| `InvalidOutputType` | Unsupported type specified |
| `DocumentNotFound` | Document doesn't exist |
| `InsufficientPermissions` | User lacks access |
| `AgentError` | LLM processing failed |
## Utilities

### Type Parsing

```python
from opencontractserver.utils.etl import parse_model_or_primitive

# Parse a string type expression into a Python type
python_type = parse_model_or_primitive("list[str]")
```
### Embedding Generation

```python
from opencontractserver.annotations.models import generate_embeddings_from_text

# Generate embeddings for text
embeddings = generate_embeddings_from_text(
    text="Sample text",
    embedder_path="sentence-transformers/all-MiniLM-L6-v2"
)
```
### Async Decorators

```python
from opencontractserver.shared.decorators import celery_task_with_async_to_sync

@celery_task_with_async_to_sync()
async def my_async_task():
    # Async task implementation
    pass
```