# Extracting Structured Data from Documents

Status: Current implementation as of JSv4/frontend-cleanup branch

## Overview
OpenContracts transforms any collection of documents into a spreadsheet-like data grid. You define what to extract via a `Fieldset`, and the system:
- Fans out work across documents and columns using Celery
- Uses our structured extraction API powered by PydanticAI agents
- Enforces constraints through intelligent prompting
- Parses results into Python primitives or Pydantic models with guaranteed type safety
Everything is orchestrated by two Celery tasks:

- `run_extract` – creates individual cells and queues work
- `doc_extract_query_task` – performs the actual extraction using our agent framework
## Data Models

### Fieldset
Groups related columns together. Each Fieldset represents a specific configuration of data fields to extract.
```python
class Fieldset(BaseOCModel):
    name = models.CharField(max_length=256)
    description = models.TextField()

    # Optional: Link to corpus for metadata schemas
    corpus = models.OneToOneField(
        "corpuses.Corpus",
        related_name="metadata_schema",
        null=True,
        blank=True,
    )
```
Key features:

- Defines the schema for extraction
- Can be linked to a corpus as its metadata schema
- Supports permissions for access control
### Column
Defines individual data fields to extract. Each column specifies what to extract, criteria, and output format.
```python
class Column(BaseOCModel):
    name = models.CharField(max_length=256)
    fieldset = models.ForeignKey('Fieldset', related_name='columns')

    # Extraction configuration
    query = models.TextField(null=True)  # The extraction prompt
    match_text = models.TextField(null=True)  # Alternative to query
    must_contain_text = models.TextField(null=True)  # Constraint
    limit_to_label = models.CharField(max_length=512, null=True)

    # Output configuration
    output_type = models.TextField()  # Python type as string
    extract_is_list = models.BooleanField(default=False)

    # Task selection
    task_name = models.CharField(
        default="opencontractserver.tasks.data_extract_tasks.doc_extract_query_task"
    )

    # Metadata fields for manual entry
    data_type = models.CharField(choices=METADATA_DATA_TYPES, null=True)
    validation_config = NullableJSONField(null=True)
```
Column configuration:

- `query` or `match_text`: The extraction prompt (one required)
- `output_type`: Python type as string (e.g., `"str"`, `"int"`, `"list[str]"`)
- `extract_is_list`: Wraps the type in `List[]`
- `must_contain_text`: Only extract from sections containing this text
- `limit_to_label`: Only extract from annotations with this label
- `instructions`: Additional context for extraction
- `data_type`: For manual entry fields (STRING, INTEGER, DATE, etc.)
- `validation_config`: JSON configuration for field validation
### Extract
Represents an extraction job, containing metadata about the process.
```python
class Extract(BaseOCModel):
    corpus = models.ForeignKey('Corpus', null=True)
    documents = models.ManyToManyField('Document')
    name = models.CharField(max_length=512)
    fieldset = models.ForeignKey('Fieldset')

    # Timestamps
    created = models.DateTimeField(auto_now_add=True)
    started = models.DateTimeField(null=True)
    finished = models.DateTimeField(null=True)
    error = models.TextField(null=True)
```
Usage:

- Groups documents to process with the fieldset defining what to extract
- Tracks extraction progress and completion status
- Stores error information if extraction fails
### Datacell
Stores the result of extracting a specific column from a specific document.
```python
class Datacell(BaseOCModel):
    extract = models.ForeignKey('Extract', related_name='extracted_datacells')
    column = models.ForeignKey('Column', related_name='extracted_datacells')
    document = models.ForeignKey('Document', related_name='extracted_datacells')

    # Results
    data = NullableJSONField(null=True)
    data_definition = models.TextField()
    sources = models.ManyToManyField('Annotation')

    # Status tracking
    started = models.DateTimeField(null=True)
    completed = models.DateTimeField(null=True)
    failed = models.DateTimeField(null=True)
    stacktrace = models.TextField(null=True)
```
Features:

- Stores extracted data in JSON format
- Links to source annotations (when available)
- Tracks processing status and errors
## Extraction Pipeline
### Orchestration (`run_extract`)
The main orchestrator task that creates and manages extraction jobs:
```python
@shared_task
def run_extract(extract_id: Optional[str | int], user_id: str | int):
    # Creates Datacells for each document × column pair
    # Queues doc_extract_query_task for each cell
    # Uses chord to wait for completion
    ...
```
Key operations:

1. Creates one Datacell per document × column combination
2. Looks up the Celery task from `column.task_name`
3. Uses `chord(group(*tasks))` to wait for all cells
4. Calls `mark_extract_complete` when finished
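The fan-out can be sketched in plain Python (Celery and the ORM omitted). This is an illustrative sketch, not the actual implementation — `make_cells`, `documents`, and `columns` are hypothetical names:

```python
from itertools import product


def make_cells(documents: list[str], columns: list[str]) -> list[tuple[str, str]]:
    """One cell per document × column pair, mirroring run_extract's fan-out.

    In the real task each pair becomes a Datacell row and a queued
    doc_extract_query_task; here we just enumerate the pairs.
    """
    return list(product(documents, columns))


cells = make_cells(["nda.pdf", "msa.pdf"], ["effective_date", "parties"])
# 2 documents × 2 columns → 4 independent extraction tasks
```

Because every pair is independent, the resulting tasks can be grouped into a Celery `chord` and executed in parallel.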
### Per-Cell Extraction (`doc_extract_query_task`)
The async task that performs actual extraction using our agent framework:
```python
@celery_task_with_async_to_sync()
async def doc_extract_query_task(
    cell_id: int,
    similarity_top_k: int = 10,
    max_token_length: int = 64000,
) -> None:
    """Agent-based data extraction pipeline using PydanticAI."""
```
Extraction steps:

1. Setup: Fetch the Datacell, mark it as started, validate corpus membership
2. Type parsing: Convert the `column.output_type` string to a Python type
3. Prompt construction: Build the extraction prompt from `query` or `match_text`
4. System prompt: Add constraints from `must_contain_text` and `limit_to_label`
5. Extract: Call `agents.get_structured_response_from_document()`
6. Save results: Convert the response to the appropriate format and mark complete
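The type-parsing step can be illustrated with a small whitelist-based parser. This is a simplified sketch under assumptions, not the production code — `parse_output_type` and `_PRIMITIVES` are hypothetical names, and only the primitive and `list[...]` forms documented above are handled:

```python
_PRIMITIVES = {"str": str, "int": int, "float": float, "bool": bool, "dict": dict}


def parse_output_type(type_str: str, extract_is_list: bool = False):
    """Convert a Column.output_type string such as "int" or "list[str]"
    into a Python type, wrapping it in list[...] when extract_is_list is set."""
    type_str = type_str.strip()
    if type_str.startswith("list[") and type_str.endswith("]"):
        # Recurse on the inner type, e.g. "list[str]" -> list[str]
        parsed = list[parse_output_type(type_str[5:-1])]
    elif type_str in _PRIMITIVES:
        parsed = _PRIMITIVES[type_str]
    else:
        raise ValueError(f"Unsupported output_type: {type_str!r}")
    return list[parsed] if extract_is_list else parsed
```

A whitelist avoids `eval()` on user-supplied type strings, which matters because `output_type` is stored as free text on the model.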
### Async Task Decorator Pattern
The extraction task uses our custom decorator to handle async functions in Celery:
```python
@celery_task_with_async_to_sync()
async def doc_extract_query_task(...) -> None:
    # Async implementation
    ...
```
This decorator:

- Converts async functions to sync using `asgiref.sync.async_to_sync`
- Properly handles database connections
- Works seamlessly in test and production environments
- Avoids complex event loop management
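The shape of the decorator can be sketched in pure Python. This sketch uses `asyncio.run` as a stand-in for `asgiref.sync.async_to_sync` and omits the database-connection handling, so it illustrates the pattern rather than reproducing the real implementation:

```python
import asyncio
import functools


def celery_task_with_async_to_sync():
    """Simplified sketch: wrap an async function so Celery can invoke it
    synchronously. The real decorator uses asgiref.sync.async_to_sync and
    also manages Django database connections."""
    def decorator(async_fn):
        @functools.wraps(async_fn)
        def wrapper(*args, **kwargs):
            # Drive the coroutine to completion on a fresh event loop
            return asyncio.run(async_fn(*args, **kwargs))
        return wrapper
    return decorator


@celery_task_with_async_to_sync()
async def example_task(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return x * 2
```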
Testing async tasks:
```python
from django.test import TransactionTestCase

class ExtractionTestCase(TransactionTestCase):
    def test_extraction(self):
        # Create datacell...
        doc_extract_query_task.si(datacell.id).apply()
        # Assert results...
```
## Sequence Diagram
```mermaid
sequenceDiagram
    participant U as User
    participant G as GraphQL/Admin
    participant R as run_extract
    participant Q as doc_extract_query_task
    participant A as Agent Framework
    participant LLM as Language Model

    U->>G: Start extraction
    G->>R: Call run_extract(extract_id)
    R->>R: Create Datacells
    R->>Q: Queue task for each cell
    Q->>A: get_structured_response()
    A->>LLM: Vector search + extraction
    LLM-->>A: Typed response
    A-->>Q: Parsed result
    Q-->>Q: Save to Datacell.data
    Q-->>R: Task complete
    R-->>G: Extract finished
    G-->>U: Results ready
```
## Supported Output Types
The system supports extraction to various Python types:
### Primitive Types
- `str` – Text strings
- `int` – Integers
- `float` – Floating point numbers
- `bool` – Boolean values
### Collection Types
- `list[str]` – List of strings
- `list[int]` – List of integers
- Use `extract_is_list=True` to wrap any type in a list
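The list-wrapping behavior amounts to a simple transformation on the type string — a minimal sketch, with `wrap_in_list` as an illustrative name:

```python
def wrap_in_list(output_type: str, extract_is_list: bool) -> str:
    """Sketch of the type transformation implied by extract_is_list:
    any output_type string can be lifted into its list form."""
    return f"list[{output_type}]" if extract_is_list else output_type
```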
### Complex Types
- JSON objects via the `dict` type
- Custom Pydantic models (planned)
## Constraints and Filtering
### Document Section Filtering
Use `must_contain_text` to limit extraction to specific sections:

```python
column.must_contain_text = "CONFIDENTIALITY"
# Only extracts from sections containing this text
```
### Annotation Label Filtering
Use `limit_to_label` to extract only from specific annotation types:

```python
column.limit_to_label = "contract-term"
# Only processes annotations with this label
```
### Additional Instructions
Provide extra context via `instructions`:

```python
column.instructions = "Extract as ISO 8601 date format"
```
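Taken together, the two filters narrow the candidate annotations before extraction runs. A pure-Python sketch of that narrowing, using hypothetical annotation dicts and an illustrative `filter_annotations` helper:

```python
def filter_annotations(annotations, must_contain_text=None, limit_to_label=None):
    """Sketch: keep only annotations that satisfy both constraints.
    Annotation dicts here are hypothetical stand-ins for ORM objects."""
    kept = []
    for ann in annotations:
        if must_contain_text and must_contain_text not in ann["text"]:
            continue  # section does not mention the required text
        if limit_to_label and ann["label"] != limit_to_label:
            continue  # wrong annotation label
        kept.append(ann)
    return kept


annotations = [
    {"label": "contract-term", "text": "CONFIDENTIALITY obligations survive..."},
    {"label": "recital", "text": "WHEREAS the parties wish to..."},
]
```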
## Error Handling
The extraction pipeline includes comprehensive error tracking:
- Cell-level errors: Stored in `Datacell.stacktrace`
- Extract-level errors: Stored in `Extract.error`
- Automatic retry: Failed cells can be retried
- Partial completion: Successful cells are saved even if others fail
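The cell-level isolation behind partial completion can be sketched without Celery or the ORM. The names `process_cells` and `flaky_extract` are illustrative; the key point is that each cell catches its own exception and records a stacktrace, so one failure never discards the others' results:

```python
import traceback
from datetime import datetime, timezone


def process_cells(cells, extract_fn):
    """Sketch of cell-level error isolation: each cell succeeds or fails
    independently, mirroring Datacell's completed/failed/stacktrace fields."""
    for cell in cells:
        try:
            cell["data"] = extract_fn(cell)
            cell["completed"] = datetime.now(timezone.utc)
        except Exception:
            cell["failed"] = datetime.now(timezone.utc)
            cell["stacktrace"] = traceback.format_exc()
    return cells


cells = [{"id": 1}, {"id": 2}]


def flaky_extract(cell):
    if cell["id"] == 2:
        raise ValueError("LLM call failed")  # simulated extraction error
    return "extracted value"


results = process_cells(cells, flaky_extract)
```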
## Performance Optimization
### Parallel Processing
- Extraction tasks run in parallel across Celery workers
- Each document × column combination is independent
- Scales horizontally with additional workers
### Vector Search Efficiency
- Uses pgvector for fast similarity search
- Caches embeddings for reuse
- Limits token context to `max_token_length`
### Database Optimization
- Batch creates Datacells
- Uses `select_related`/`prefetch_related`
- Minimizes database round trips
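The batching idea behind bulk-creating Datacells is the usual chunking pattern — a pure-Python sketch (the real code would hand each batch to Django's `bulk_create`; `chunked` is an illustrative helper):

```python
def chunked(items, size):
    """Yield fixed-size batches so N rows become ~N/size INSERTs
    instead of N round trips."""
    for i in range(0, len(items), size):
        yield items[i:i + size]


batches = list(chunked(list(range(5)), 2))
```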
## Configuration
### Framework Selection
Set the agent framework in settings:
```python
LLMS_DEFAULT_AGENT_FRAMEWORK = "pydantic_ai"
```
### Custom Task Registration
Register custom extraction tasks:
```python
# In your app's tasks.py
@shared_task
def custom_extract_task(cell_id: int):
    # Custom extraction logic
    pass

# In Column configuration
column.task_name = "myapp.tasks.custom_extract_task"
```
## Next Steps
- Complex types: Expand `output_type` to support JSON schemas
- Multi-step extraction: Leverage conversation history
- Cross-document aggregation: Use corpus agents for analysis
- Custom models: Allow registration of Pydantic models