A minimal guide to creating custom tasks and pipelines. The updated example builds a lightweight ingestion object, extracts people with an LLM task, stores the resulting graph nodes, and then visualizes the result. Before you start:
  • Complete Quickstart to understand basic operations
  • Ensure you have LLM Providers configured
  • Have some text data to process

What Custom Tasks and Pipelines Do

  • Define custom processing steps using Task objects
  • Chain multiple operations together in a custom pipeline
  • Use LLMs to extract structured data from text
  • Insert structured data directly into the knowledge graph
  • Control the entire data processing workflow

Code in Action

Step 1: Define Your Pipeline Models

class PersonLLM(BaseModel):
    name: str
    knows: List[str] = []

class PeopleLLM(BaseModel):
    persons: List[PersonLLM]

class Person(DataPoint):
    name: str
    knows: List["Person"] = []
    metadata: Dict[str, Any] = {"index_fields": ["name"]}

class LightweightData(DataPoint):
    id: UUID
    text: str
PersonLLM and PeopleLLM describe the structured output the LLM should return. Person is the graph-ready DataPoint, and LightweightData gives the pipeline a simple ingestion object with a stable ID and text field.
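The stable ID matters because the full example derives it with uuid5, so ingesting the same text twice always maps to the same node. A minimal sketch of that determinism using only the standard library:

```python
from uuid import NAMESPACE_OID, UUID, uuid5


def stable_id(text: str) -> UUID:
    # uuid5 hashes the text under a fixed namespace, so the same input
    # always produces the same UUID -- re-ingestion stays idempotent.
    return uuid5(NAMESPACE_OID, text)


a = stable_id("Alice knows Bob.")
b = stable_id("Alice knows Bob.")
c = stable_id("Alice knows Mark.")
assert a == b  # identical text -> identical node id
assert a != c  # different text -> different id
```

This is the same pattern build_lightweight_data_object uses in the full example below.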

Step 2: Create Your Custom Task

async def extract_people(data: List[LightweightData]) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "Infer 'knows' only when there is a clear interpersonal interaction in the text."
    )
    person_map: Dict[str, Person] = {}
    ...
This task uses the LLM to extract lightweight people records, then resolves those names into graph-ready Person objects with knows relationships.
acreate_structured_output (called via LLMGateway) is backend-agnostic (BAML or LiteLLM + Instructor). Configure the backend through STRUCTURED_OUTPUT_FRAMEWORK in .env.
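The task resolves people in two passes: first it creates one Person per unique name, then it wires up knows edges, silently dropping references to names that were never extracted. That resolution logic can be sketched with plain dicts standing in for the LLM output (the sample names below are illustrative):

```python
# Stand-in for the structured LLM output: (name, [names they know]).
extracted = [
    ("Alice", ["Mark"]),
    ("Mark", ["Bob", "Alice"]),
    ("Bob", ["Mary"]),  # "Mary" is referenced but never extracted herself
]

# Pass 1: create one node per unique extracted name.
person_map = {name: {"name": name, "knows": []} for name, _ in extracted}

# Pass 2: resolve name references into node references,
# keeping only names that actually have a node.
for name, known in extracted:
    person_map[name]["knows"] = [person_map[n] for n in known if n in person_map]

assert [p["name"] for p in person_map["Mark"]["knows"]] == ["Bob", "Alice"]
assert person_map["Bob"]["knows"] == []  # "Mary" had no node, so the edge is dropped
```

The two-pass shape is what lets a person reference another person who appears later in the same document.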

Step 3: Build and Run Your Pipeline

tasks = [
    Task(extract_people),
    Task(add_data_points),
]

await cognee.run_custom_pipeline(
    tasks=tasks, data=build_lightweight_data_object(text_data), dataset="people_demo"
)

await cognee.cognify()
The custom pipeline inserts the extracted Person nodes directly into the graph. The follow-up cognify() call builds the rest of Cognee’s retrieval stack on top of that stored graph data.
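Conceptually, the pipeline awaits each task in order and feeds its output to the next. A plain-asyncio sketch of that flow (the task bodies and the run helper are illustrative stand-ins, not cognee internals):

```python
import asyncio


async def extract(text: str) -> list[str]:
    # Stand-in for extract_people: turn raw text into structured records.
    return text.split(" knows ")


async def store(records: list[str]) -> list[str]:
    # Stand-in for add_data_points: persist records and pass them through.
    return records


async def run(tasks, data):
    # Each task consumes the previous task's output, like Task chaining.
    for task in tasks:
        data = await task(data)
    return data


result = asyncio.run(run([extract, store], "Alice knows Bob"))
```

Because each task simply returns its output, storage steps like add_data_points can pass data through unchanged, which is what makes them composable at any point in the chain.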

Step 4: Visualize the Result

visualize_graph_path = os.path.join(
    os.path.dirname(__file__), ".artifacts", "custom_tasks_and_pipelines.html"
)
await visualize_graph(visualize_graph_path)
The example writes an HTML graph visualization so you can inspect the entities and inferred knows edges produced by the custom pipeline.

Use Cases

This approach is particularly useful when you need to:
  • Extract structured data from unstructured text
  • Process data through multiple custom steps
  • Control the entire data processing workflow
  • Combine LLM extraction with programmatic data insertion
  • Build complex data processing pipelines

Additional examples

Additional examples of custom tasks and pipelines are available on our GitHub.

Full Example

import asyncio
import os
from typing import Any, Dict, List
from pydantic import BaseModel

import cognee
from cognee.modules.engine.operations.setup import setup
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
from cognee.modules.pipelines import Task
from cognee.api.v1.visualize.visualize import visualize_graph
from uuid import uuid5, NAMESPACE_OID, UUID


class PersonLLM(BaseModel):
    """Lightweight Pydantic model for LLM extraction only."""

    name: str
    knows: List[str] = []  # Just names for now, we'll resolve to Person instances later


class PeopleLLM(BaseModel):
    """Lightweight Pydantic model for LLM extraction only."""

    persons: List[PersonLLM]


class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: List["Person"] = []
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}


class LightweightData(DataPoint):
    """Lightweight DataPoint model for data ingestion only."""

    id: UUID
    text: str


def build_lightweight_data_object(text_data):
    return LightweightData(id=uuid5(NAMESPACE_OID, text_data), text=text_data)


async def extract_people(data: List[LightweightData]) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "Infer 'knows' only when there is a clear interpersonal interaction in the text."
    )
    # Create a mapping of name -> Person DataPoint
    person_map: Dict[str, Person] = {}
    for data_item in data:
        people_llm = await LLMGateway.acreate_structured_output(
            data_item.text, system_prompt, PeopleLLM
        )

        for person_llm in people_llm.persons:
            person_map[person_llm.name] = Person(name=person_llm.name)

        # Resolve knows relationships
        for person_llm in people_llm.persons:
            person = person_map[person_llm.name]
            person.knows = [person_map[name] for name in person_llm.knows if name in person_map]

    return list(person_map.values())


async def main(text_data):
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await setup()

    tasks = [
        Task(extract_people),  # input: text -> output: list[Person]
        Task(add_data_points),  # input: list[Person] -> output: list[Person]
    ]

    await cognee.run_custom_pipeline(
        tasks=tasks, data=build_lightweight_data_object(text_data), dataset="people_demo"
    )

    await cognee.cognify()

    visualize_graph_path = os.path.join(
        os.path.dirname(__file__), ".artifacts", "custom_tasks_and_pipelines.html"
    )
    await visualize_graph(visualize_graph_path)


if __name__ == "__main__":
    text = "Alice knows Mark. Mark had dinner with Bob and Alice. Bob knows Mary."
    asyncio.run(main(text))
This updated example uses a lightweight ingestion object, a custom extraction task, and a visualization step. In practice, you can create larger pipelines with additional transforms and storage stages.

Custom Data Models

Learn about custom data models

Low-Level LLM

Learn about direct LLM interaction

Core Concepts

Understand knowledge graph fundamentals