Extracting Structured Data from Documents¶
Status: Current implementation as of JSv4/frontend-cleanup branch
Overview¶
OpenContracts transforms any collection of documents into a spreadsheet-like data grid. You define what to extract via a Fieldset, and the system:
- Fans out work across documents and columns using Celery
- Uses our structured extraction API powered by PydanticAI agents
- Enforces constraints through intelligent prompting
- Parses results into Python primitives or Pydantic models with guaranteed type safety
Everything is orchestrated by two Celery tasks:

- run_extract – creates individual cells and queues work
- doc_extract_query_task – performs the actual extraction using our agent framework
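The orchestrator is queued like any other Celery task. A minimal sketch of kicking off a run from application code, assuming extract and user already exist (the import path below is an assumption, not the documented API):

```python
# Assumed module path – adjust to wherever run_extract lives in your tree.
from opencontractserver.tasks.extract_orchestrator_tasks import run_extract

# Signature matches run_extract(extract_id, user_id) as documented below.
run_extract.delay(extract_id=extract.id, user_id=user.id)
```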
Data Models¶
Fieldset¶
Groups related columns together. Each Fieldset represents a specific configuration of data fields to extract.
class Fieldset(BaseOCModel):
    name = models.CharField(max_length=256)
    description = models.TextField()
    # Optional: Link to corpus for metadata schemas
    corpus = models.OneToOneField(
        "corpuses.Corpus",
        related_name="metadata_schema",
        null=True,
        blank=True
    )
Key features:

- Defines the schema for extraction
- Can be linked to a corpus as its metadata schema
- Supports permissions for access control
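A minimal sketch of defining a fieldset in the Django shell, using only the fields shown above (the import path is an assumption):

```python
from opencontractserver.extracts.models import Fieldset  # assumed module path

fieldset = Fieldset.objects.create(
    name="Contract review",
    description="Commercial terms to pull from each agreement",
)
```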
Column¶
Defines individual data fields to extract. Each column specifies what to extract, criteria, and output format.
class Column(BaseOCModel):
    name = models.CharField(max_length=256)
    fieldset = models.ForeignKey('Fieldset', related_name='columns')
    # Extraction configuration
    query = models.TextField(null=True)           # The extraction prompt
    match_text = models.TextField(null=True)       # Alternative to query
    must_contain_text = models.TextField(null=True)  # Constraint
    limit_to_label = models.CharField(max_length=512, null=True)
    # Output configuration
    output_type = models.TextField()               # Python type as string
    extract_is_list = models.BooleanField(default=False)
    # Task selection
    task_name = models.CharField(
        default="opencontractserver.tasks.data_extract_tasks.doc_extract_query_task"
    )
    # Metadata fields for manual entry
    data_type = models.CharField(choices=METADATA_DATA_TYPES, null=True)
    validation_config = NullableJSONField(null=True)
Column configuration:

- query or match_text: The extraction prompt (one required)
- output_type: Python type as string (e.g., "str", "int", "list[str]")
- extract_is_list: Wraps the type in List[]
- must_contain_text: Only extract from sections containing this text
- limit_to_label: Only extract from annotations with this label
- instructions: Additional context for extraction
- data_type: For manual entry fields (STRING, INTEGER, DATE, etc.)
- validation_config: JSON configuration for field validation
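A sketch of adding columns to that fieldset, limited to the fields listed above (import path again assumed):

```python
from opencontractserver.extracts.models import Column  # assumed module path

Column.objects.create(
    fieldset=fieldset,
    name="Effective Date",
    query="What is the effective date of this agreement?",
    output_type="str",
    instructions="Extract as ISO 8601 date format",
)

Column.objects.create(
    fieldset=fieldset,
    name="Parties",
    query="List every party to the agreement.",
    output_type="str",
    extract_is_list=True,  # results are parsed as list[str]
)
```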
Extract¶
Represents an extraction job, containing metadata about the process.
class Extract(BaseOCModel):
    corpus = models.ForeignKey('Corpus', null=True)
    documents = models.ManyToManyField('Document')
    name = models.CharField(max_length=512)
    fieldset = models.ForeignKey('Fieldset')
    # Timestamps
    created = models.DateTimeField(auto_now_add=True)
    started = models.DateTimeField(null=True)
    finished = models.DateTimeField(null=True)
    error = models.TextField(null=True)
Usage:

- Groups documents to process with the fieldset defining what to extract
- Tracks extraction progress and completion status
- Stores error information if extraction fails
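A sketch of creating a job over a set of documents, where my_corpus, doc_1, and doc_2 stand in for existing objects (import path assumed):

```python
from opencontractserver.extracts.models import Extract  # assumed module path

extract = Extract.objects.create(
    name="Q3 contract review",
    corpus=my_corpus,  # optional – extracts can also run over ad-hoc document sets
    fieldset=fieldset,
)
extract.documents.add(doc_1, doc_2)
```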
Datacell¶
Stores the result of extracting a specific column from a specific document.
class Datacell(BaseOCModel):
    extract = models.ForeignKey('Extract', related_name='extracted_datacells')
    column = models.ForeignKey('Column', related_name='extracted_datacells')
    document = models.ForeignKey('Document', related_name='extracted_datacells')
    # Results
    data = NullableJSONField(null=True)
    data_definition = models.TextField()
    sources = models.ManyToManyField('Annotation')
    # Status tracking
    started = models.DateTimeField(null=True)
    completed = models.DateTimeField(null=True)
    failed = models.DateTimeField(null=True)
    stacktrace = models.TextField(null=True)
Features:

- Stores extracted data in JSON format
- Links to source annotations (when available)
- Tracks processing status and errors
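Once a run completes, the cells can be folded back into a row-per-document grid. A small sketch using the related names from the models above:

```python
# Build {document_id: {column_name: extracted_value}} from finished cells.
rows = {}
for cell in extract.extracted_datacells.select_related("column", "document"):
    rows.setdefault(cell.document_id, {})[cell.column.name] = cell.data
```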
Extraction Pipeline¶
Orchestration (run_extract)¶
 The main orchestrator task that creates and manages extraction jobs:
@shared_task
def run_extract(extract_id: Optional[str | int], user_id: str | int):
    # Creates Datacells for each document × column pair
    # Queues doc_extract_query_task for each cell
    # Uses chord to wait for completion
Key operations:

1. Creates one Datacell per document × column combination
2. Looks up the Celery task from column.task_name
3. Uses chord(group(*tasks)) to wait for all cells
4. Calls mark_extract_complete when finished
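The fan-out/fan-in shape looks roughly like this (illustrative only – the callback arguments are an assumption, not the task's actual source):

```python
from celery import chord, group

# One immutable signature per Datacell; the chord callback runs once every
# cell has finished, marking the Extract complete.
signatures = [doc_extract_query_task.si(cell_id) for cell_id in datacell_ids]
chord(group(*signatures))(mark_extract_complete.si(extract_id))
```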
Per-Cell Extraction (doc_extract_query_task)¶
 The async task that performs actual extraction using our agent framework:
@celery_task_with_async_to_sync()
async def doc_extract_query_task(
    cell_id: int,
    similarity_top_k: int = 10,
    max_token_length: int = 64000
) -> None:
    """Agent-based data extraction pipeline using PydanticAI."""
Extraction steps:

- Setup: Fetch the Datacell, mark it as started, validate corpus membership
- Type parsing: Convert the column.output_type string to a Python type
- Prompt construction: Build the extraction prompt from query or match_text
- System prompt: Add constraints from must_contain_text and limit_to_label
- Extract: Call agents.get_structured_response_from_document()
- Save results: Convert the response to the appropriate format and mark the cell complete
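The type-parsing step, for example, amounts to mapping the stored string onto a real Python type. A hedged sketch of that idea (not the actual implementation):

```python
from typing import List

_PRIMITIVES = {"str": str, "int": int, "float": float, "bool": bool, "dict": dict}

def parse_output_type(output_type: str, extract_is_list: bool):
    """Map a Column.output_type string such as "int" onto a Python type."""
    base = _PRIMITIVES.get(output_type)
    if base is None:
        raise ValueError(f"Unsupported output_type: {output_type!r}")
    return List[base] if extract_is_list else base
```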
 
Async Task Decorator Pattern¶
The extraction task uses our custom decorator to handle async functions in Celery:
@celery_task_with_async_to_sync()
async def doc_extract_query_task(...) -> None:
    # Async implementation
This decorator:

- Converts async functions to sync using asgiref.sync.async_to_sync
- Properly handles database connections
- Works seamlessly in test and production environments
- Avoids complex event loop management
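In spirit, the decorator does something like the following (a simplified sketch, not the project's actual implementation):

```python
import functools

from asgiref.sync import async_to_sync
from celery import shared_task

def celery_task_with_async_to_sync(**celery_kwargs):
    """Wrap an async callable so Celery receives an ordinary sync task."""
    def decorator(async_fn):
        @functools.wraps(async_fn)
        def sync_wrapper(*args, **kwargs):
            # Run the coroutine to completion from synchronous worker code.
            return async_to_sync(async_fn)(*args, **kwargs)
        return shared_task(**celery_kwargs)(sync_wrapper)
    return decorator
```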
Testing async tasks:
from django.test import TransactionTestCase

from opencontractserver.tasks.data_extract_tasks import doc_extract_query_task


class ExtractionTestCase(TransactionTestCase):
    def test_extraction(self):
        # Create fieldset, column, extract, document, and datacell...
        # Run the task eagerly, in-process
        doc_extract_query_task.si(datacell.id).apply()
        # Assert on datacell.data, datacell.completed, etc.
Sequence Diagram¶
```mermaid
sequenceDiagram
    participant U as User
    participant G as GraphQL/Admin
    participant R as run_extract
    participant Q as doc_extract_query_task
    participant A as Agent Framework
    participant LLM as Language Model

    U->>G: Start extraction
    G->>R: Call run_extract(extract_id)
    R->>R: Create Datacells
    R->>Q: Queue task for each cell
    Q->>A: get_structured_response()
    A->>LLM: Vector search + extraction
    LLM-->>A: Typed response
    A-->>Q: Parsed result
    Q-->>Q: Save to Datacell.data
    Q-->>R: Task complete
    R-->>G: Extract finished
    G-->>U: Results ready
```
Supported Output Types¶
The system supports extraction to various Python types:
Primitive Types¶
- str – Text strings
- int – Integers
- float – Floating point numbers
- bool – Boolean values
Collection Types¶
- list[str] – List of strings
- list[int] – List of integers
- Use extract_is_list=True to wrap any type in a list
Complex Types¶
- JSON objects via the dict type
- Custom Pydantic models (planned)
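Putting those options together on a Column (attribute names as defined earlier on this page; the column variables are placeholders):

```python
price_column.output_type = "float"     # one floating-point value per document
price_column.extract_is_list = False

parties_column.output_type = "str"     # combined with extract_is_list=True,
parties_column.extract_is_list = True  # results come back as list[str]
```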
 
Constraints and Filtering¶
Document Section Filtering¶
Use must_contain_text to limit extraction to specific sections: 
column.must_contain_text = "CONFIDENTIALITY"
# Only extracts from sections containing this text
Annotation Label Filtering¶
Use limit_to_label to extract only from specific annotation types: 
column.limit_to_label = "contract-term"
# Only processes annotations with this label
Additional Instructions¶
Provide extra context via instructions: 
column.instructions = "Extract as ISO 8601 date format"
Error Handling¶
The extraction pipeline includes comprehensive error tracking:
- Cell-level errors: Stored in Datacell.stacktrace
- Extract-level errors: Stored in Extract.error
- Automatic retry: Failed cells can be retried
- Partial completion: Successful cells are saved even if others fail
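A sketch of inspecting failures on a run, using the status fields from the Datacell model above:

```python
# Failed cells carry a timestamp in `failed` and the exception in `stacktrace`.
for cell in extract.extracted_datacells.filter(failed__isnull=False):
    print(cell.document_id, cell.column.name, cell.stacktrace)
```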
 
Performance Optimization¶
Parallel Processing¶
- Extraction tasks run in parallel across Celery workers
- Each document × column combination is independent
- Scales horizontally with additional workers
Vector Search Efficiency¶
- Uses pgvector for fast similarity search
- Caches embeddings for reuse
- Limits token context to max_token_length
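Both retrieval knobs are ordinary task arguments, so they can be tuned per call. A sketch reusing the signature shown earlier:

```python
# Fewer retrieved chunks and a smaller context window for short documents.
doc_extract_query_task.si(
    cell.id,
    similarity_top_k=5,
    max_token_length=32_000,
).apply_async()
```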
Database Optimization¶
- Batch creates Datacells
- Uses select_related/prefetch_related
- Minimizes database round trips
Configuration¶
Framework Selection¶
Set the agent framework in settings:
LLMS_DEFAULT_AGENT_FRAMEWORK = "pydantic_ai"
Custom Task Registration¶
Register custom extraction tasks:
# In your app's tasks.py
@shared_task
def custom_extract_task(cell_id: int):
    # Custom extraction logic
    pass
# In Column configuration
column.task_name = "myapp.tasks.custom_extract_task"
Next Steps¶
- Complex types: Expand output_type to support JSON schemas
- Multi-step extraction: Leverage conversation history
- Cross-document aggregation: Use corpus agents for analysis
- Custom models: Allow registration of Pydantic models