Create a custom pipeline in cognee

You can create your own pipelines by combining custom or built-in tasks.

Why create a custom pipeline?

The default cognify pipeline covers the common case, but you’ll want a custom pipeline when you need to:
  • Add or swap tasks – e.g., run special processing that your data needs before chunking, anonymise PII, translate text, generate domain-specific embeddings, and so on (see the sketch after this list).
  • Change execution order – or perhaps fan out heavy jobs in parallel.
  • Integrate external services – call an ML model, push results to a third-party API, write audit logs, and so on.
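For instance, a PII-scrubbing step is just a plain Python function (or coroutine) wrapped in a Task. The anonymize_pii function below is a hypothetical illustration, not a built-in cognee task:
import re

from cognee.pipelines import Task


def anonymize_pii(text: str) -> str:
    # Hypothetical example task: mask email addresses before the text is processed further.
    return re.sub(r"[\w.+-]+@[\w-]+\.[\w.]+", "[EMAIL]", text)


anonymize_task = Task(anonymize_pii)  # slot this anywhere in your task list
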
A pipeline is just an ordered list of Task objects executed asynchronously by run_tasks.
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.methods import load_or_create_datasets
from cognee.pipelines import run_tasks, Task


def generate_numbers(num):
    # A plain generator task: emits the numbers 0..num-1.
    for i in range(num):
        yield i


async def add_one(num):
    # An async task: runs on the output of the previous task.
    return num + 1


tasks = [
    Task(generate_numbers),
    Task(add_one),
]

user = await get_default_user()
datasets = await load_or_create_datasets(["numbers_dataset"], [], user)

pipeline = run_tasks(
    tasks,
    dataset_id=datasets[0].id,
    data=10,
    incremental_loading=False,
)

async for info in pipeline:
    print(info)

Pipeline run information

Tasks and pipelines provide information about the run. Simply print a task or pipeline run object to see detailed execution information:
# Create and run a task
generate_numbers_task = Task(generate_numbers)

data = 10

task_run = await generate_numbers_task.run(data)

# Print task run information and status
for info in task_run:
    print(info)

# Similarly for pipelines
pipeline = run_tasks(
    [generate_numbers_task],
    dataset_id=datasets[0].id,
    data=10,
    incremental_loading=False,
)

async for pipeline_run_info in pipeline:
    print(pipeline_run_info)  # Shows pipeline status and execution details
This makes it significantly easier to monitor execution flow and troubleshoot issues during development.

Support for multiple task types

The task system now seamlessly handles different task types:
  • Regular functions: def my_task(input): return processed_input
  • Async functions: async def my_task(input): return await process(input)
  • Generators: def my_task(input): yield from process_stream(input)
  • Async generators: async def my_task(input): async for item in stream: yield item
This flexibility allows you to choose the most appropriate implementation style for each task, as the sketch below illustrates. The pipeline execution engine has been optimized for both sequential and parallel task execution, resulting in faster processing times and reduced resource usage; these updates have already improved our benchmark performance, as shown in our evaluations. You can also learn more about how the main cognify pipeline is implemented in cognee to process data in a structured way and populate the graph and vector stores.
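As a quick reference, here is a minimal sketch of the four callable shapes that Task accepts; the function names and bodies are hypothetical placeholders, and how values flow between chained tasks is handled by run_tasks:
from cognee.pipelines import Task


def to_lower(text: str) -> str:
    # Regular function
    return text.lower()


async def add_exclamation(text: str) -> str:
    # Async function (could await an external service here)
    return text + "!"


def split_words(text: str):
    # Generator
    yield from text.split()


async def numbers_stream(count: int):
    # Async generator
    for i in range(count):
        yield i


tasks = [Task(to_lower), Task(add_exclamation), Task(split_words)]  # numbers_stream could be wrapped the same way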

Building a custom pipeline

Below is an example of how to build a low-level custom pipeline with built-in tasks:
import os
import json
import asyncio
from typing import List, Any
from cognee import prune
from cognee import visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.modules.data.methods import load_or_create_datasets
from cognee.modules.users.methods import get_default_user
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points
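# Each DataPoint subclass below becomes a node in the graph built by add_data_points;
# fields that reference other DataPoints (employees, departments, is_type) become edges.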


class Person(DataPoint):
    name: str
    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
    metadata: dict = {"index_fields": ["name"]}


class Department(DataPoint):
    name: str
    employees: list[Person]
    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
    metadata: dict = {"index_fields": ["name"]}


class CompanyType(DataPoint):
    name: str = "Company"
    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
    metadata: dict = {"index_fields": ["name"]}


class Company(DataPoint):
    name: str
    departments: list[Department]
    is_type: CompanyType
    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
    metadata: dict = {"index_fields": ["name"]}


def ingest_files(data: List[Any]):
    people_data_points = {}
    departments_data_points = {}
    companies_data_points = {}

    for data_item in data:
        people = data_item["people"]
        companies = data_item["companies"]

        for person in people:
            new_person = Person(name=person["name"])
            people_data_points[person["name"]] = new_person

            if person["department"] not in departments_data_points:
                departments_data_points[person["department"]] = Department(
                    name=person["department"], employees=[new_person]
                )
            else:
                departments_data_points[person["department"]].employees.append(new_person)

        # Create a single CompanyType node so we can connect all companies to it.
        company_type = CompanyType()

        for company in companies:
            new_company = Company(name=company["name"], departments=[], is_type=company_type)
            companies_data_points[company["name"]] = new_company

            for department_name in company["departments"]:
                if department_name not in departments_data_points:
                    departments_data_points[department_name] = Department(
                        name=department_name, employees=[]
                    )

                new_company.departments.append(departments_data_points[department_name])

    return companies_data_points.values()


async def main():
    await prune.prune_data()
    await prune.prune_system(metadata=True)

    # Create relational database tables
    await setup()

    # If no user is provided, use the default user
    user = await get_default_user()

    # Create dataset object to keep track of pipeline status
    datasets = await load_or_create_datasets(["test_dataset"], [], user)

    # Prepare data for pipeline
    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
    with open(companies_file_path, "r") as companies_file:
        companies = json.load(companies_file)

    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
    with open(people_file_path, "r") as people_file:
        people = json.load(people_file)

    # run_tasks expects a list of data items, even if there is just one document
    data = [{"companies": companies, "people": people}]

    pipeline = run_tasks(
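        # ingest_files (custom) builds the DataPoint objects; add_data_points (built-in) stores them in the graph and vector databases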
        [Task(ingest_files), Task(add_data_points)],
        dataset_id=datasets[0].id,
        data=data,
        incremental_loading=False,
    )

    async for status in pipeline:
        print(status)

    # Use the built-in graph preview to inspect the result
    graph_file_path = str(
        os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
    )
    await visualize_graph(graph_file_path)


if __name__ == "__main__":
    asyncio.run(main())
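
The script reads companies.json and people.json from the same directory. A minimal, hypothetical shape that ingest_files can consume looks like this (shown here as Python literals; your real files may contain richer data):
# companies.json: each company has a name and a list of department names
companies = [
    {"name": "Acme Corp", "departments": ["Engineering", "Sales"]},
]

# people.json: each person has a name and the department they belong to
people = [
    {"name": "Alice", "department": "Engineering"},
    {"name": "Bob", "department": "Sales"},
]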

Join the Conversation!

Have questions? Join our community now to connect with professionals, share insights, and get your questions answered!