# Understanding Tasks in cognee
Tasks are the building blocks of cognee’s data processing pipeline. They enable you to transform, enrich, and structure data in ways that enhance LLM reasoning capabilities. By creating custom tasks, you can define exactly how your knowledge graph grows and evolves.
## Why Tasks Matter
Tasks play a crucial role in:
- Converting raw data into structured knowledge
- Establishing relationships between different pieces of information
- Enriching existing data with additional context
- Optimizing information retrieval for LLMs
## Task Types and Capabilities
cognee supports multiple types of tasks to handle different processing needs:
### Synchronous Tasks
Basic tasks that process data directly:
```python
from typing import List

def extract_entities(text: str) -> List[str]:
    """Extract key entities from text."""
    # process_text is a placeholder for your extraction logic (see below).
    entities = process_text(text)
    return entities
```
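The `process_text` helper above is only a placeholder. If you have no extraction backend wired up yet, a naive stand-in (purely illustrative, not part of cognee) might look like:

```python
import re
from typing import List

def process_text(text: str) -> List[str]:
    # Naive stand-in: treat capitalized words as candidate entities.
    # Swap in a real NER model or LLM call for production use.
    return sorted({m.group(0) for m in re.finditer(r"\b[A-Z][a-z]+\b", text)})
```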
### Asynchronous Tasks
Tasks that handle async operations like API calls:
```python
async def enrich_with_metadata(document: Document) -> Document:
    """Add metadata to document from external service."""
    metadata = await fetch_metadata(document.id)
    document.metadata = metadata
    return document
```
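Here, `fetch_metadata` stands in for whatever async client your service uses. A minimal stub that simulates the external call (the delay and return shape are assumptions) could be:

```python
import asyncio

async def fetch_metadata(document_id: str) -> dict:
    # Simulate an external API round-trip; replace with a real HTTP client.
    await asyncio.sleep(0.1)
    return {"id": document_id, "source": "external-service"}
```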
### Generator Tasks
Tasks that yield multiple results:
```python
from typing import Generator

def chunk_document(document: Document) -> Generator[TextChunk, None, None]:
    """Split document into manageable chunks."""
    for section in document.content:
        yield TextChunk(content=section)
```
### Async Generator Tasks
Tasks that combine async operations with streaming:
```python
from typing import AsyncGenerator, List

async def stream_process_chunks(chunks: List[TextChunk]) -> AsyncGenerator[ProcessedChunk, None]:
    """Process chunks with streaming results."""
    for chunk in chunks:
        processed = await process_chunk(chunk)
        yield processed
```
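Outside a pipeline, generator and async generator tasks are consumed like ordinary Python iterators. A sketch of direct usage, assuming the `Document` and chunk types from the examples above:

```python
import asyncio

async def consume(document: Document) -> None:
    # Drain the sync generator first, then stream processed chunks as they finish.
    chunks = list(chunk_document(document))
    async for processed in stream_process_chunks(chunks):
        print(processed)

# `my_document` is a hypothetical Document instance from your application.
asyncio.run(consume(my_document))
```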
## Building Task Pipelines
Tasks can be combined into powerful pipelines that transform your data:
```python
from cognee.modules.pipelines.tasks import Task, TaskConfig, run_tasks

# Define your task pipeline
tasks = [
    Task(extract_entities),
    Task(
        enrich_with_metadata,
        task_config=TaskConfig(needs=[extract_entities]),
    ),
    Task(chunk_document),
    Task(stream_process_chunks),
]

# Execute the pipeline
async def process_document(document):
    pipeline = run_tasks(tasks, document)
    async for result in pipeline:
        print(f"Processed chunk: {result}")
```
## Task Dependencies
Tasks can specify dependencies to ensure proper execution order:
```python
tasks = [
    Task(extract_base_info),
    Task(
        enrich_metadata,
        task_config=TaskConfig(needs=[extract_base_info]),
    ),
    Task(
        validate_results,
        task_config=TaskConfig(needs=[enrich_metadata]),
    ),
]
```
This creates a clear processing flow where each task builds upon the results of its dependencies.
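The three functions in this example are not defined elsewhere in this guide; a minimal sketch of what such a dependent chain could look like (names and data shapes are illustrative):

```python
def extract_base_info(raw: str) -> dict:
    """Produce the base record that downstream tasks build on."""
    return {"text": raw, "length": len(raw)}

def enrich_metadata(info: dict) -> dict:
    """Assumed to run after extract_base_info and receive its output."""
    info["word_count"] = len(info["text"].split())
    return info

def validate_results(info: dict) -> dict:
    """Fail fast if an upstream task produced a malformed record."""
    if "word_count" not in info:
        raise ValueError("enrich_metadata must run before validate_results")
    return info
```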
## Best Practices
When creating custom tasks (a sketch applying these guidelines follows the list):
- Keep tasks focused and single-purpose
- Handle errors gracefully within tasks
- Use type hints for better code clarity
- Document task inputs, outputs, and side effects
- Consider task reusability across different pipelines
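A minimal sketch that puts these guidelines into practice; the task name, inputs, and error-handling policy are illustrative:

```python
import logging
from typing import List

logger = logging.getLogger(__name__)

def normalize_titles(titles: List[str]) -> List[str]:
    """Normalize document titles to title case.

    Inputs: raw title strings. Outputs: cleaned titles.
    Side effects: logging only, so the task stays reusable across pipelines.
    """
    normalized = []
    for title in titles:
        try:
            normalized.append(title.strip().title())
        except AttributeError:
            # Skip non-string entries instead of failing the whole pipeline.
            logger.warning("Skipping non-string title: %r", title)
    return normalized
```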
## Join the Conversation!
Have questions about creating custom tasks? Join our community to discuss implementation strategies and best practices!