Pipelines in cognee

Cognee pipelines are the data processing system that transforms raw content into structured knowledge graphs. Cognee implements three default pipelines that work together to handle different aspects of data processing, from initial ingestion to knowledge graph generation.

At its core, cognee uses tasks grouped into pipelines that efficiently populate graph and vector stores. These tasks process the data and pass their results to the next task in the chain, creating a semantic layer that improves the quality of answers produced by LLMs.

You can think of tasks as Python functions and pipelines as groups of Python functions executed in a defined order. Pipelines orchestrate the execution of multiple tasks in sequence. Data flows from one task to the next, with each task’s output becoming the input for the subsequent task. Tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions.

    pipeline = run_tasks(tasks, documents)
    async for result in pipeline:
        print(result)
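
To make this concrete, here is a minimal sketch of a custom pipeline built from two toy functions. The split_into_sentences and count_words helpers are invented for this illustration, and the Task/run_tasks import paths may differ between cognee versions, so treat this as a sketch rather than a drop-in snippet.

    # Minimal custom-pipeline sketch: two toy tasks chained together.
    # Import paths for Task and run_tasks may vary across cognee versions.
    import asyncio

    from cognee.modules.pipelines import Task, run_tasks

    def split_into_sentences(text: str):
        # First task: turn the raw string into a list of sentences
        return [s.strip() for s in text.split(".") if s.strip()]

    def count_words(sentences: list[str]):
        # Second task: consume the previous task's output
        return [(sentence, len(sentence.split())) for sentence in sentences]

    async def main():
        tasks = [Task(split_into_sentences), Task(count_words)]
        pipeline = run_tasks(tasks, "Cognee builds knowledge graphs. It runs pipelines of tasks.")
        async for result in pipeline:
            print(result)

    asyncio.run(main())

Each task’s output feeds the next, exactly as described above; swapping in your own functions is how custom pipelines are assembled.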

Pipeline Execution Flow

  1. Sequential Processing: Tasks execute one after another
  2. Data Flow: Output of Task 1 → Input of Task 2 → Output of Task 2 → Input of Task 3, etc.
  3. Batch Handling: Results are collected and passed downstream in batches according to batch_size (default 1); see the sketch after this list
  4. Async Execution: All tasks are executed asynchronously for better performance
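
For example, the batch size can be tuned per task. The snippet below wraps the summarize_text task from the cognify pipeline with a batch size of 10; the task_config key and import paths are assumptions based on common cognee examples, so verify them against your version.

    # Sketch: pass results downstream in batches of 10 instead of the default 1.
    # task_config and the import paths below are version-dependent assumptions.
    from cognee.modules.pipelines import Task
    from cognee.tasks.summarization import summarize_text

    summarize = Task(summarize_text, task_config={"batch_size": 10})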

1- Data Ingestion Pipeline: add

  • Task(resolve_data_directories): Resolves file paths and directory structures

  • Task(ingest_data): Classifies data types, extracts metadata, and stores the data in the database (see the usage sketch below)
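
In practice, this pipeline runs whenever you call cognee.add. A minimal sketch, using a placeholder file path and an optional dataset name:

    import cognee

    async def ingest():
        # "docs/report.pdf" is a placeholder path; dataset_name is optional
        # and falls back to the main dataset when omitted.
        await cognee.add("docs/report.pdf", dataset_name="my_dataset")
        await cognee.add("Raw text can be added directly as well.", dataset_name="my_dataset")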

2- Main Pipeline: cognify

The default cognify pipeline demonstrates the typical processing flow (a usage sketch follows the task list):

  • Task(classify_documents): Converts raw data into a specific Document type: Pdf, Audio, Image or Text

  • Task(check_permissions_on_dataset): Validates user permissions to access the dataset

  • Task(extract_chunks_from_documents): Extracts text chunks based on the document type

  • Task(extract_graph_from_data): Generates knowledge graphs from the document chunks

  • Task(summarize_text): Extracts a summary for each chunk using an LLM

  • Task(add_data_points): Creates nodes and edges from the chunks and their properties and adds them to the graph engine
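
Once data has been added, the whole chain above runs with a single call. A minimal sketch, assuming the my_dataset name from an earlier add call (the datasets argument is optional):

    import cognee

    async def build_graph():
        # Restrict processing to one dataset; cognify() with no arguments
        # processes all datasets the user has added.
        await cognee.cognify(datasets=["my_dataset"])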

When to use it

Use cognify whenever you have “plain” content (PDFs, text, audio transcripts, etc.) and you want cognee to:

  1. Ingest the data safely,
  2. Index it for semantic and relational search,
  3. Summarize it so your LLM can answer faster and cheaper,
  4. Generate a knowledge graph quickly.

Remember: Out of the box, the cognify pipeline gives you a populated graph store, embeddings in the vector store, and summaries ready for improved RAG - no extra wiring required.

🧠 Cognee allows you to build your own custom pipelines when you need them.

3- Code Graph Pipeline: codify

Specialized pipeline for analyzing code repositories.

  • Task(get_repo_file_dependencies): Generates a dependency graph for Python files in the given repository path.
  • Task(add_data_points): Creates nodes and edges from the chunks and their properties and adds them to the graph engine

This pipeline also includes optional tasks for non-code files.
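
A sketch of invoking it, assuming a top-level codify entry point named after this pipeline and a placeholder repository path; check the exact function name and signature against your cognee version.

    import cognee

    async def analyze_repo():
        # "path/to/python/repo" is a placeholder; the codify entry point and
        # how it reports progress may differ between cognee versions.
        await cognee.codify("path/to/python/repo")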

What Happens Behind the Scenes: simple example

Let’s trace through a complete example from the codebase:

    # From examples/python/simple_example.py - Complete workflow
    import asyncio

    import cognee
    from cognee.api.v1.search import SearchType  # adjust the import path to your cognee version if needed

    async def main():
        # Step 1: Clean slate
        await cognee.prune.prune_data()
        await cognee.prune.prune_system(metadata=True)

        # Step 2: Prepare data
        text = """
        Natural language processing (NLP) is an interdisciplinary
        subfield of computer science and information retrieval.
        """

        # Step 3: ADD PIPELINE - Data ingestion
        await cognee.add(text)

        # Step 4: COGNIFY PIPELINE - Knowledge graph creation
        await cognee.cognify()

        # Step 5: Query the knowledge
        search_results = await cognee.search(
            query_type=SearchType.INSIGHTS,
            query_text="Tell me about NLP",
        )

    asyncio.run(main())

During cognee.add(text):

    # Add Pipeline executes:
    1. resolve_data_directories(text)
       → Identifies text as string data
    2. ingest_data(text, "main_dataset", user, None)
       → Classifies as TextData
       → Generates content hash for deduplication
       → Stores in database with metadata
       → Creates dataset if needed
       → Sets user permissions

During cognee.cognify():

    # Cognify Pipeline executes:
    1. classify_documents(stored_data)
       → Converts Data objects to TextDocument objects
    2. check_permissions_on_dataset(dataset, user, ["write"])
       → Validates the user can process this dataset
    3. extract_chunks_from_documents(documents, max_chunk_size, chunker)
       → Yields: DocumentChunk("Natural language processing (NLP) is...")
       → Yields: DocumentChunk("...interdisciplinary subfield of...")
    4. extract_graph_from_data(chunks, KnowledgeGraph)
       → Extracts entities and relationships
       → Creates: Entity("natural language processing"), Entity("computer science")
       → Creates: Relationship("is_subfield_of") connecting them
    5. summarize_text(chunks)
       → Generates summaries for each chunk
    6. add_data_points(entities_and_relationships)
       → Stores nodes and edges in the graph database and vector embeddings
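
The search call in the example returns a list of results; a minimal way to inspect what the INSIGHTS query produced:

    # Print each insight returned by cognee.search in the example above
    for result in search_results:
        print(result)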
