Create custom pipeline in cognee
You can create your own pipelines by combining custom or built-in tasks.
Why create a custom pipeline?
The default cognify pipeline covers the common case, but you’ll want a custom pipeline when you need to:
- Add or swap tasks – e.g., run special processing your data needs before chunking, anonymise PII, translate text, generate domain-specific embeddings, and so on (see the sketch after this list).
- Change execution order – or perhaps fan-out heavy jobs in parallel.
- Integrate external services – call an ML model, push results to a third-party API, write audit logs, and so on.
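As a rough illustration of the first point, a custom task can simply take its place in the ordered task list. The sketch below is hypothetical: `anonymise_pii` and `count_words` are made-up tasks, and the redaction logic is only an example of "special processing before anything else runs".

```python
import re

from cognee.pipelines import run_tasks, Task

# Hypothetical custom task: redact e-mail addresses before further processing.
def anonymise_pii(documents):
    for document in documents:
        yield re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "[REDACTED]", document)

# Hypothetical follow-up task: count words in each redacted document.
async def count_words(document):
    return len(document.split())

pipeline = run_tasks(
    [Task(anonymise_pii), Task(count_words)],
    data=["Contact alice@example.com for details."],
)
```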
A pipeline is just an ordered list of `Task` objects executed asynchronously by `run_tasks`.
```python
from cognee.pipelines import run_tasks, Task

def generate_numbers(num):
    for i in range(num):
        yield i

async def add_one(num):
    return num + 1

tasks = [
    Task(generate_numbers),
    Task(add_one),
]

pipeline_run = run_tasks(tasks, data=10)
```
Pipeline run information
Tasks and pipelines provide information about the run. Simply print a task or pipeline run object to see detailed execution information:
```python
# Create and run a task
generate_numbers_task = Task(generate_numbers)
data = 10
task_run = await generate_numbers_task.run(data)

# Print task run information and status
for info in task_run:
    print(info)

# Similarly for pipelines
pipeline_run = run_tasks(
    [generate_numbers_task],
    data=10,
)

async for pipeline_run_info in pipeline_run:
    print(pipeline_run_info)  # Shows pipeline status and execution details
```
This makes it significantly easier to monitor execution flow and troubleshoot issues during development.
Support for multiple task types
The task system now seamlessly handles different task types:
- Regular functions: `def my_task(input): return processed_input`
- Async functions: `async def my_task(input): return await process(input)`
- Generators: `def my_task(input): yield from process_stream(input)`
- Async generators: `async def my_task(input): async for item in stream: yield item`
This flexibility allows you to choose the most appropriate implementation style for each specific task.
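As a rough illustration, all four styles can coexist in a single pipeline. The task bodies below are hypothetical and only demonstrate the signatures involved:

```python
from cognee.pipelines import run_tasks, Task

def load_numbers(limit):      # regular generator task
    yield from range(limit)

async def double(value):      # async function task
    return value * 2

def describe(value):          # regular function task
    return f"value={value}"

async def annotate(text):     # async generator task
    for suffix in ("checked", "stored"):
        yield f"{text} ({suffix})"

pipeline = run_tasks(
    [Task(load_numbers), Task(double), Task(describe), Task(annotate)],
    data=3,
)
```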
Our pipeline execution engine has been optimized for both sequential and parallel task execution, resulting in faster processing times and reduced resource usage.
These updates have already improved our benchmark performance, as shown in our evaluations.
You can learn more about how the main pipeline is currently implemented in cognee to process data in a structured way and populate the graph and vector stores.
Building a Custom Pipeline
Below is an example of how to build a low-level custom pipeline using built-in tasks:
```python
import os
import json
import asyncio
from cognee import prune
from cognee import visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points


class Person(DataPoint):
    name: str

class Department(DataPoint):
    name: str
    employees: list[Person]

class CompanyType(DataPoint):
    name: str = "Company"

class Company(DataPoint):
    name: str
    departments: list[Department]
    is_type: CompanyType


def ingest_files():
    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
    companies = json.loads(open(companies_file_path, "r").read())

    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
    people = json.loads(open(people_file_path, "r").read())

    people_data_points = {}
    departments_data_points = {}

    for person in people:
        new_person = Person(name=person["name"])
        people_data_points[person["name"]] = new_person

        if person["department"] not in departments_data_points:
            departments_data_points[person["department"]] = Department(
                name=person["department"], employees=[new_person]
            )
        else:
            departments_data_points[person["department"]].employees.append(new_person)

    companies_data_points = {}

    # Create a single CompanyType node, so we connect all companies to it.
    companyType = CompanyType()

    for company in companies:
        new_company = Company(name=company["name"], departments=[], is_type=companyType)
        companies_data_points[company["name"]] = new_company

        for department_name in company["departments"]:
            if department_name not in departments_data_points:
                departments_data_points[department_name] = Department(
                    name=department_name, employees=[]
                )

            new_company.departments.append(departments_data_points[department_name])

    return companies_data_points.values()


async def main():
    await prune.prune_data()
    await prune.prune_system(metadata=True)

    await setup()

    pipeline = run_tasks([Task(ingest_files), Task(add_data_points)])

    async for status in pipeline:
        print(status)

    # Visualize graph
    graph_file_path = str(
        os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
    )
    await visualize_graph(graph_file_path)


if __name__ == "__main__":
    asyncio.run(main())
```
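The script reads companies.json and people.json from its own directory. Their exact contents aren't shown above, but based on the keys ingest_files accesses, data shaped roughly like the following (invented sample values) would work:

```python
# Hypothetical sample data matching the keys used in ingest_files above.
import json

people = [
    {"name": "Alice", "department": "Engineering"},
    {"name": "Bob", "department": "Sales"},
]
companies = [
    {"name": "Acme Corp", "departments": ["Engineering", "Sales"]},
]

with open("people.json", "w") as f:
    json.dump(people, f)
with open("companies.json", "w") as f:
    json.dump(companies, f)
```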