A minimal guide to creating custom tasks and pipelines. You’ll build a two-step pipeline: the LLM extracts People directly, then you insert them into the knowledge graph.

Before you start:
  • Complete Quickstart to understand basic operations
  • Ensure you have LLM Providers configured
  • Have some text data to process

What Custom Tasks and Pipelines Do

  • Define custom processing steps using Task objects
  • Chain multiple operations together in a pipeline
  • Use LLMs to extract structured data from text
  • Insert structured data directly into the knowledge graph
  • Control the entire data processing workflow

Full Working Example

import asyncio
from typing import Any, Dict, List
from pydantic import BaseModel, SkipValidation

from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
from cognee.modules.pipelines import Task, run_pipeline

class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: SkipValidation[Any] = None  # Person or list[Person]
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}

class People(BaseModel):
    persons: List[Person]

async def extract_people(text: str) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "If the text says someone knows someone set `knows` accordingly. "
        "Only include facts explicitly stated."
    )
    people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
    return people.persons

async def main():
    text = "Alice knows Bob."

    tasks = [
        Task(extract_people),  # input: text -> output: list[Person]
        Task(add_data_points)  # input: list[Person] -> output: list[Person]
    ]

    async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
        pass

asyncio.run(main())

This simple example uses a two-step pipeline for demonstration. In practice, you can create complex pipelines with multiple custom tasks, data transformations, and processing steps.
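To make the chaining idea concrete, here is a minimal sketch of a longer chain in plain asyncio, where each step's output becomes the next step's input. It is an illustration only and not how Cognee's run_pipeline is implemented; `split_sentences`, `extract_names`, `to_records`, and `run_chain` are hypothetical names, and the name extraction is a naive stand-in for an LLM call.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical stand-ins for custom tasks; none of them call Cognee or an LLM.
async def split_sentences(text: str) -> List[str]:
    # Step 1: break raw text into non-empty sentences.
    return [s.strip() for s in text.split(".") if s.strip()]

async def extract_names(sentences: List[str]) -> List[str]:
    # Step 2: naive "extraction" that keeps capitalized words, without duplicates.
    names: List[str] = []
    for sentence in sentences:
        for word in sentence.split():
            if word[0].isupper() and word not in names:
                names.append(word)
    return names

async def to_records(names: List[str]) -> List[Dict[str, str]]:
    # Step 3: shape the names into records a storage step could accept.
    return [{"name": name} for name in names]

async def run_chain(steps: List[Callable[[Any], Awaitable[Any]]], data: Any) -> Any:
    # Feed the output of each step into the next one, in order.
    for step in steps:
        data = await step(data)
    return data

result = asyncio.run(
    run_chain([split_sentences, extract_names, to_records],
              "Alice knows Bob. Bob knows Carol.")
)
print(result)
```

In a real pipeline, each of these functions would be wrapped in a Task and passed to run_pipeline, as in the full example above.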

What Just Happened

Step 1: Define Your Data Models

class Person(DataPoint):
    name: str
    knows: SkipValidation[Any] = None
    metadata: Dict[str, Any] = {"index_fields": ["name"]}

class People(BaseModel):
    persons: List[Person]

Create Pydantic models for your data. Person inherits from DataPoint so it can be inserted into the graph, while People is a simple container for the LLM output. Setting metadata with index_fields is recommended: it marks which fields (here, name) should be searchable in the vector database.
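As a rough mental model of why a nested `knows` field is enough to describe a graph, the sketch below flattens nested objects into nodes and edges using only the standard library. It is not Cognee's implementation: `PersonSketch` and `to_graph` are hypothetical names, and treating `name` as the node identity is an assumption made for this illustration.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple

@dataclass
class PersonSketch:
    # Hypothetical plain-Python stand-in for the Person model above.
    name: str
    knows: List["PersonSketch"] = field(default_factory=list)

def to_graph(people: List[PersonSketch]) -> Tuple[Set[str], Set[Tuple[str, str, str]]]:
    # Walk the nested objects and collect unique nodes and (source, relation, target) edges.
    nodes: Set[str] = set()
    edges: Set[Tuple[str, str, str]] = set()
    stack = list(people)
    while stack:
        person = stack.pop()
        if person.name in nodes:
            continue  # already visited; this also guards against cycles
        nodes.add(person.name)
        for other in person.knows:
            edges.add((person.name, "knows", other.name))
            stack.append(other)
    return nodes, edges

bob = PersonSketch(name="Bob")
alice = PersonSketch(name="Alice", knows=[bob])
nodes, edges = to_graph([alice])
print(sorted(nodes), sorted(edges))
```

Note that Bob becomes a node even though only Alice was passed in, because he is reachable through her `knows` relation.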

Step 2: Create Your Custom Task

async def extract_people(text: str) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "If the text says someone knows someone set `knows` accordingly. "
        "Only include facts explicitly stated."
    )
    people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
    return people.persons

This task uses the LLM to extract structured data from text. The LLM returns a People object populated with Person instances, including relationships via the knows field.

acreate_structured_output is backend-agnostic (BAML or LiteLLM+Instructor). Choose the backend by setting STRUCTURED_OUTPUT_FRAMEWORK in .env.
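To illustrate what "structured output" buys you, the sketch below checks a JSON reply against the `persons` shape by hand, which is roughly the kind of validation the response model handles for you. It uses only the standard library; `parse_people` and the sample reply are hypothetical, and the real backends may validate differently.

```python
import json
from typing import Any, Dict, List

def parse_people(raw_reply: str) -> List[Dict[str, Any]]:
    # Parse a JSON reply and verify it has the shape {"persons": [{"name": ...}, ...]}.
    payload = json.loads(raw_reply)
    if not isinstance(payload, dict) or not isinstance(payload.get("persons"), list):
        raise ValueError("expected an object with a 'persons' list")
    cleaned: List[Dict[str, Any]] = []
    for entry in payload["persons"]:
        if not isinstance(entry, dict) or not isinstance(entry.get("name"), str):
            raise ValueError(f"invalid person entry: {entry!r}")
        # `knows` is optional; normalize a missing value to an empty list.
        cleaned.append({"name": entry["name"], "knows": entry.get("knows") or []})
    return cleaned

# Hypothetical reply for the text "Alice knows Bob."
reply = '{"persons": [{"name": "Alice", "knows": [{"name": "Bob"}]}, {"name": "Bob"}]}'
people = parse_people(reply)
print(people)
```

With a response model such as People, a malformed reply fails validation instead of silently reaching the storage step.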

Step 3: Build Your Pipeline

tasks = [
    Task(extract_people),  # input: text -> output: list[Person]
    Task(add_data_points)  # input: list[Person] -> output: list[Person]
]

async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
    pass

Chain your tasks together in a pipeline. The first task extracts people from text, the second inserts them into the knowledge graph. add_data_points automatically creates nodes and edges from the knows relationships.

Under the hood, run_pipeline(...) automatically initializes databases and checks LLM/embeddings configuration, so you don’t need to worry about setup. Once the pipeline completes, your Cognee memory with graph and embeddings is created and ready for interaction.

You can now search your data using the standard search methods:

import cognee
from cognee.api.v1.search import SearchType

# Search the processed data.
# `await` must run inside an async function, e.g. at the end of main() above.
results = await cognee.search(
    query_type=SearchType.GRAPH_COMPLETION,
    query_text="Who does Alice know?",
    datasets=["people_demo"]
)
print(results)

Use Cases

This approach is particularly useful when you need to:
  • Extract structured data from unstructured text
  • Process data through multiple custom steps
  • Control the entire data processing workflow
  • Combine LLM extraction with programmatic data insertion
  • Build complex data processing pipelines