
Create a custom pipeline in cognee

You can create your own pipelines by combining custom or built-in tasks.

Why create a custom pipeline?

The default cognify pipeline covers the common case, but you’ll want a custom pipeline when you need to:

  • Add or swap tasks – e.g., run special processing that your data needs before chunking, anonymise PII, translate text, generate domain-specific embeddings, etc. (a sketch follows this list).
  • Change execution order – or fan out heavy jobs in parallel.
  • Integrate external services – call an ML model, push results to a third-party API, write audit logs, and so on.
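
As a sketch of the first bullet: a custom task is just a function wrapped in Task, using the Task and run_tasks primitives introduced below. The anonymize_pii function and its regex are illustrative placeholders, not built-in cognee tasks:

import re

from cognee.pipelines import run_tasks, Task

def anonymize_pii(text: str) -> str:
    # Redact email addresses before any downstream task sees the text.
    return re.sub(r"[\w.+-]+@[\w-]+\.\w+", "[REDACTED]", text)

# The custom task goes first, followed by your usual chunking/embedding tasks.
tasks = [Task(anonymize_pii)]
pipeline_run = run_tasks(tasks, data="Contact alice@example.com for access.")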

A pipeline is just an ordered list of Task objects executed asynchronously by run_tasks.

from cognee.pipelines import run_tasks, Task

def generate_numbers(num):
    for i in range(num):
        yield i

async def add_one(num):
    return num + 1

tasks = [Task(generate_numbers), Task(add_one)]

# run_tasks returns an async generator; iterate it to execute the tasks
# (see the next section).
pipeline_run = run_tasks(tasks, data=10)

Pipeline run information

Tasks and pipelines provide information about the run. Simply print a task or pipeline run object to see detailed execution information:

# Create and run a task
generate_numbers_task = Task(generate_numbers)
data = 10
task_run = await generate_numbers_task.run(data)

# Print task run information and status
for info in task_run:
    print(info)

# Similarly for pipelines
pipeline_run = run_tasks([Task(generate_numbers)], data=10)

async for pipeline_run_info in pipeline_run:
    print(pipeline_run_info)  # Shows pipeline status and execution details

This makes it significantly easier to monitor execution flow and troubleshoot issues during development.

Support for multiple task types

The task system seamlessly handles different task types:

  • Regular functions: def my_task(input): return processed_input
  • Async functions: async def my_task(input): return await process(input)
  • Generators: def my_task(input): yield from process_stream(input)
  • Async generators: async def my_task(input): async for item in stream: yield item

This flexibility allows you to choose the most appropriate implementation style for each task; the sketch below wires all four styles into one pipeline.
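
A minimal sketch, assuming the same Task and run_tasks API used throughout this page; the four toy functions (double, add_one, spread, tag) are illustrative:

import asyncio

from cognee.pipelines import run_tasks, Task

def double(num):  # regular function
    return num * 2

async def add_one(num):  # async function
    return num + 1

def spread(num):  # generator
    for i in range(num):
        yield i

async def tag(num):  # async generator
    yield f"value: {num}"

async def main():
    pipeline = run_tasks(
        [Task(double), Task(add_one), Task(spread), Task(tag)],
        data=3,
    )
    async for run_info in pipeline:
        print(run_info)

asyncio.run(main())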

Our pipeline execution engine has been optimized for both sequential and parallel task execution, resulting in faster processing times and reduced resource usage.

These updates have already improved our benchmark performance, as shown in our evaluations.

To go further, you can look at how the main pipeline is currently implemented in cognee: it processes data in a structured way and populates the graph and vector stores.

Building a Custom Pipeline

Below is an example of how to build a low-level custom pipeline with built-in tasks:

import os
import json
import asyncio

from cognee import prune, visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points


class Person(DataPoint):
    name: str


class Department(DataPoint):
    name: str
    employees: list[Person]


class CompanyType(DataPoint):
    name: str = "Company"


class Company(DataPoint):
    name: str
    departments: list[Department]
    is_type: CompanyType


def ingest_files():
    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
    with open(companies_file_path, "r") as file:
        companies = json.load(file)

    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
    with open(people_file_path, "r") as file:
        people = json.load(file)

    people_data_points = {}
    departments_data_points = {}

    for person in people:
        new_person = Person(name=person["name"])
        people_data_points[person["name"]] = new_person

        if person["department"] not in departments_data_points:
            departments_data_points[person["department"]] = Department(
                name=person["department"], employees=[new_person]
            )
        else:
            departments_data_points[person["department"]].employees.append(new_person)

    companies_data_points = {}

    # Create a single CompanyType node, so we can connect all companies to it.
    company_type = CompanyType()

    for company in companies:
        new_company = Company(name=company["name"], departments=[], is_type=company_type)
        companies_data_points[company["name"]] = new_company

        for department_name in company["departments"]:
            if department_name not in departments_data_points:
                departments_data_points[department_name] = Department(
                    name=department_name, employees=[]
                )
            new_company.departments.append(departments_data_points[department_name])

    return companies_data_points.values()


async def main():
    # Reset data and system state so the example starts from a clean slate.
    await prune.prune_data()
    await prune.prune_system(metadata=True)
    await setup()

    # Run the custom pipeline: ingest the JSON files, then store the data points.
    pipeline = run_tasks([Task(ingest_files), Task(add_data_points)])

    async for status in pipeline:
        print(status)

    # Visualize the resulting graph
    graph_file_path = str(
        os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
    )
    await visualize_graph(graph_file_path)


if __name__ == "__main__":
    asyncio.run(main())
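
The example expects companies.json and people.json to sit next to the script. Their exact contents aren't shown in this guide; the fixture below is a hypothetical shape inferred from the fields ingest_files reads, with made-up values:

import json

# Hypothetical fixture data; shape inferred from ingest_files above.
people = [
    {"name": "Alice", "department": "Engineering"},
    {"name": "Bob", "department": "Sales"},
]
companies = [
    {"name": "Acme Corp", "departments": ["Engineering", "Sales"]},
]

# Write the files next to the pipeline script before running it.
with open("people.json", "w") as file:
    json.dump(people, file)
with open("companies.json", "w") as file:
    json.dump(companies, file)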
