Pipelines in cognee
Cognee pipelines are the data processing system that transforms raw content into structured knowledge graphs. Cognee implements 3 default pipelines that work together to handle different aspects of data processing, from initial ingestion to knowledge graph generation.
At its core, cognee uses tasks grouped into pipelines to efficiently populate graph and vector stores. These tasks process the data and pass their results to the next task in the chain, creating a semantic layer that improves the quality of answers produced by LLMs.
You can think of tasks as Python functions and pipelines as groups of Python functions executed in order. Pipelines orchestrate the execution of multiple tasks in sequence: data flows from one task to the next, with each task's output becoming the input of the subsequent task.
Tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions.
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
    print(result)
Pipeline Execution Flow
- Sequential Processing: Tasks execute one after another
- Data Flow: Output of Task 1 → Input of Task 2 → Output of Task 2 → Input of Task 3, etc.
- Batch Handling: Results are collected and passed in batches based on the batch_size setting (default: 1)
- Async Execution: All tasks are executed asynchronously for better performance
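To make this flow concrete, here is a minimal sketch of a two-task pipeline. It calls run_tasks the same way as the snippet above; the import path for Task and run_tasks and the task_config batching option are assumptions that may differ between cognee versions.
# Minimal sketch of task chaining; import paths and task_config are assumptions.
from cognee.modules.pipelines import Task, run_tasks

async def uppercase_texts(texts):
    # Receives the pipeline input; its output feeds the next task.
    return [text.upper() for text in texts]

async def count_characters(texts):
    # Receives the output of uppercase_texts.
    return [len(text) for text in texts]

async def run_toy_pipeline():
    tasks = [
        Task(uppercase_texts),
        Task(count_characters, task_config={"batch_size": 1}),  # per-task batching, default 1
    ]
    pipeline = run_tasks(tasks, ["hello", "cognee"])
    async for result in pipeline:
        print(result)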
1- Data Ingestion Pipeline: add
- Task(resolve_data_directories): Resolves file paths and directory structures
- Task(ingest_data): Classifies data types, extracts metadata, and stores the data in the database
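You rarely call these tasks directly; the add pipeline runs whenever you call cognee.add. A short usage sketch follows; the dataset_name keyword and the file-path input are assumptions based on common usage and may differ in your cognee version.
import cognee

async def ingest_examples():
    # Plain text is ingested directly.
    await cognee.add("Natural language processing is a subfield of computer science.")

    # File or directory paths go through resolve_data_directories first.
    # The dataset_name keyword is an assumption; check your cognee version.
    await cognee.add("/path/to/documents", dataset_name="my_dataset")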
2- Main Pipeline: cognify
The default cognify pipeline demonstrates the typical processing flow:
- Task(classify_documents): Converts raw data into a specific Document type: Pdf, Audio, Image, or Text
- Task(check_permissions_on_dataset): Validates user permissions to access the dataset
- Task(extract_chunks_from_documents): Extracts text chunks based on the document type
- Task(extract_graph_from_data): Generates knowledge graphs from the document chunks
- Task(summarize_text): Extracts a summary for each chunk using an LLM
- Task(add_data_points): Creates nodes and edges from the chunks and their properties and adds them to the graph engine
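The whole chain above is triggered by a single call. A minimal sketch; the datasets keyword is an assumption, and calling cognify() with no arguments (as in the example further down) also works.
import cognee

async def build_knowledge_graph():
    # Runs classify → permissions → chunking → graph extraction → summarization → add_data_points.
    # The datasets keyword is an assumption; check your cognee version.
    await cognee.cognify(datasets=["my_dataset"])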
When to use it
Use cognify whenever you have “plain” content (PDFs, text, audio transcripts, etc.) and you want cognee to:
- Ingest the data safely,
- Index it for semantic and relational search,
- Summarize it so your LLM can answer faster and cheaper,
- Generate a knowledge graph quickly
Remember: Out of the box, the cognify pipeline gives you a populated graph store, embeddings in the vector store, and summaries ready for improved RAG, with no extra wiring required.
🧠 Cognee also lets you build your own custom pipelines when you need them.
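For example, a trimmed-down cognify-style pipeline could reuse the built-in tasks but skip summarization. This is a hedged sketch: the import paths, parameter names, and the way extra keyword arguments are passed to Task are assumptions that may not match your cognee version.
# Hedged sketch of a custom pipeline; imports and parameters are assumptions.
from cognee.modules.pipelines import Task, run_tasks
from cognee.shared.data_models import KnowledgeGraph
from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points

async def run_minimal_cognify(documents):
    tasks = [
        Task(classify_documents),
        Task(extract_chunks_from_documents, max_chunk_size=1024),   # parameter name assumed
        Task(extract_graph_from_data, graph_model=KnowledgeGraph),  # summarize_text intentionally skipped
        Task(add_data_points),
    ]
    pipeline = run_tasks(tasks, documents)
    async for status in pipeline:
        print(status)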
3- Code Graph Pipeline: codify
Specialized pipeline for analyzing code repositories.
- Task(get_repo_file_dependencies): Generates a dependency graph for Python files in the given repository path.
- Task(add_data_points): Creates nodes and edges from the chunks and their properties and adds them to the graph engine
The pipeline also includes optional tasks for handling non-code files.
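A minimal usage sketch; the codify entry point and its repository-path argument are assumptions, since the code graph pipeline is exposed differently across cognee versions.
import cognee

async def analyze_repository():
    # Builds a dependency graph for the Python files in the repository.
    # cognee.codify and its repo-path argument are assumptions; check your version.
    await cognee.codify("/path/to/your/repository")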
What Happens Behind the Scenes: simple example
Let’s trace through a complete example from the codebase:
# From examples/python/simple_example.py - Complete workflow
import asyncio

import cognee
from cognee.api.v1.search import SearchType  # import path may vary between cognee versions


async def main():
    # Step 1: Clean slate
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    # Step 2: Prepare data
    text = """
    Natural language processing (NLP) is an interdisciplinary
    subfield of computer science and information retrieval.
    """

    # Step 3: ADD PIPELINE - Data ingestion
    await cognee.add(text)

    # Step 4: COGNIFY PIPELINE - Knowledge graph creation
    await cognee.cognify()

    # Step 5: Query the knowledge
    search_results = await cognee.search(
        query_type=SearchType.INSIGHTS,
        query_text="Tell me about NLP",
    )
    print(search_results)


if __name__ == "__main__":
    asyncio.run(main())
During cognee.add(text):
# Add Pipeline executes:
1. resolve_data_directories(text)
   → Identifies text as string data
2. ingest_data(text, "main_dataset", user, None)
   → Classifies as TextData
   → Generates content hash for deduplication
   → Stores in database with metadata
   → Creates dataset if needed
   → Sets user permissions
During cognee.cognify():
# Cognify Pipeline executes:
1. classify_documents(stored_data)
   → Converts Data objects to TextDocument objects
2. check_permissions_on_dataset(dataset, user, ["write"])
   → Validates that the user can process this dataset
3. extract_chunks_from_documents(documents, max_chunk_size, chunker)
   → Yields: DocumentChunk("Natural language processing (NLP) is...")
   → Yields: DocumentChunk("...interdisciplinary subfield of...")
4. extract_graph_from_data(chunks, KnowledgeGraph)
   → Extracts entities and relationships
   → Creates: Entity("natural language processing"), Entity("computer science")
   → Creates: Relationship("is_subfield_of") connecting them
5. summarize_text(chunks)
   → Generates summaries for each chunk
6. add_data_points(entities_and_relationships)
   → Stores nodes and edges in the graph database and vector embeddings