Understanding Tasks in cognee

Tasks are the building blocks of cognee’s data processing pipeline. They enable you to transform, enrich, and structure data in ways that enhance LLM reasoning capabilities. By creating custom tasks, you can define exactly how your knowledge graph grows and evolves.

Why Tasks Matter

Tasks play a crucial role in:

  • Converting raw data into structured knowledge
  • Establishing relationships between different pieces of information
  • Enriching existing data with additional context
  • Optimizing information retrieval for LLMs

Task Types and Capabilities

cognee supports multiple types of tasks to handle different processing needs:

Synchronous Tasks

Basic tasks that process data directly:

from typing import List

def extract_entities(text: str) -> List[str]:
    """Extract key entities from text."""
    entities = process_text(text)
    return entities

Asynchronous Tasks

Tasks that handle async operations like API calls:

async def enrich_with_metadata(document: Document) -> Document:
    """Add metadata to document from external service."""
    metadata = await fetch_metadata(document.id)
    document.metadata = metadata
    return document

Generator Tasks

Tasks that yield multiple results:

from typing import Generator

def chunk_document(document: Document) -> Generator[TextChunk, None, None]:
    """Split document into manageable chunks."""
    for section in document.content:
        yield TextChunk(content=section)

Async Generator Tasks

Tasks that combine async operations with streaming:

from typing import AsyncGenerator, List

async def stream_process_chunks(chunks: List[TextChunk]) -> AsyncGenerator[ProcessedChunk, None]:
    """Process chunks with streaming results."""
    for chunk in chunks:
        processed = await process_chunk(chunk)
        yield processed
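Consumers read the streamed results with async for, so each processed chunk is available as soon as it is ready rather than after the whole batch finishes. Below is a minimal, self-contained sketch of that pattern; the TextChunk type and process_chunk helper here are hypothetical stand-ins for illustration, not cognee APIs:

import asyncio
from dataclasses import dataclass

@dataclass
class TextChunk:
    # Hypothetical chunk type for illustration.
    content: str

async def process_chunk(chunk: TextChunk) -> TextChunk:
    # Hypothetical per-chunk work; a real task might call a model here.
    await asyncio.sleep(0)
    return TextChunk(content=chunk.content.upper())

async def stream_process_chunks(chunks):
    # Same shape as the task above, repeated so this sketch runs standalone.
    for chunk in chunks:
        yield await process_chunk(chunk)

async def main():
    chunks = [TextChunk("first section"), TextChunk("second section")]
    # Each result arrives as soon as it is processed, not as one batch.
    async for processed in stream_process_chunks(chunks):
        print(processed.content)

asyncio.run(main())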

Building Task Pipelines

Tasks can be combined into powerful pipelines that transform your data:

from cognee.modules.pipelines.tasks import Task, TaskConfig, run_tasks

# Define your task pipeline
tasks = [
    Task(extract_entities),
    Task(enrich_with_metadata, task_config=TaskConfig(needs=[extract_entities])),
    Task(chunk_document),
    Task(stream_process_chunks),
]

# Execute the pipeline
async def process_document(document):
    pipeline = run_tasks(tasks, document)
    async for result in pipeline:
        print(f"Processed chunk: {result}")
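Since process_document is a coroutine, it needs an event loop to run. A minimal sketch of the entry point, assuming document is whatever input your first task expects:

import asyncio

# Kick off the pipeline from synchronous code; the async for loop inside
# process_document drains the pipeline's results as they are produced.
asyncio.run(process_document(document))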

Task Dependencies

Tasks can specify dependencies to ensure proper execution order:

tasks = [
    Task(extract_base_info),
    Task(enrich_metadata, task_config=TaskConfig(needs=[extract_base_info])),
    Task(validate_results, task_config=TaskConfig(needs=[enrich_metadata])),
]

This creates a clear processing flow where each task builds upon the results of its dependencies.

Best Practices

When creating custom tasks:

  • Keep tasks focused and single-purpose
  • Handle errors gracefully within tasks (see the sketch after this list)
  • Use type hints for better code clarity
  • Document task inputs, outputs, and side effects
  • Consider task reusability across different pipelines
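For example, a task that wraps a flaky external call can stay focused, typed, and documented while still failing safely. Here is a hedged sketch of one way to apply these guidelines; fetch_summary and the fail-soft fallback are illustrative assumptions, not cognee APIs:

import logging
from typing import Optional

logger = logging.getLogger(__name__)

async def summarize_text(text: str) -> Optional[str]:
    """Summarize text via an external service.

    Input: raw text. Output: a summary string, or None when the
    service fails. Side effect: one logged warning per failure.
    """
    try:
        # fetch_summary is a hypothetical external call.
        return await fetch_summary(text)
    except Exception:
        # Fail soft so one bad document does not abort the whole pipeline.
        logger.warning("Summarization failed; skipping document")
        return None

Because the task is single-purpose and its contract is documented, it can be dropped into any pipeline that produces text, and downstream tasks can decide how to treat the None fallback.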

Join the Conversation!

Have questions about creating custom tasks? Join our community to discuss implementation strategies and best practices!