Step-by-step guide to creating custom tasks and pipelines
A minimal guide to creating custom tasks and pipelines. The updated example builds a lightweight ingestion object, extracts people with an LLM task, stores the resulting graph nodes, and then visualizes the result.

Before you start:
Complete Quickstart to understand basic operations
PersonLLM and PeopleLLM describe the structured output the LLM should return. Person is the graph-ready DataPoint, and LightweightData gives the pipeline a simple ingestion object with a stable ID and text field.
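To make the four roles concrete, here is a dependency-free sketch of their shapes, using `dataclasses` as stand-ins for `pydantic.BaseModel` and Cognee's `DataPoint` (an assumption for illustration; the real base classes appear in the full listing):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List
from uuid import UUID, uuid5, NAMESPACE_OID

# Stand-in sketch: dataclasses in place of pydantic.BaseModel / DataPoint.
# Field shapes mirror the full listing; only the base classes differ.

@dataclass
class PersonLLM:  # structured output the LLM returns
    name: str
    knows: List[str] = field(default_factory=list)  # names only, resolved later

@dataclass
class PeopleLLM:  # top-level wrapper for the LLM response
    persons: List[PersonLLM] = field(default_factory=list)

@dataclass
class Person:  # graph-ready record
    name: str
    knows: List["Person"] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=lambda: {"index_fields": ["name"]})

@dataclass
class LightweightData:  # ingestion object with a stable, content-derived ID
    id: UUID
    text: str

# uuid5 over the text gives the same ID for the same input every run
doc = LightweightData(id=uuid5(NAMESPACE_OID, "Alice knows Bob."), text="Alice knows Bob.")
```

The content-derived `uuid5` ID is what makes re-ingestion of the same text idempotent.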
```python
async def extract_people(data: LightweightData) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "Infer 'knows' only when there is a clear interpersonal interaction in the text."
    )
    person_map: Dict[str, Person] = {}
    ...
```
This task uses the LLM to extract lightweight people records, then resolves those names into graph-ready Person objects with knows relationships.
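The resolution step is a two-pass pattern: first register every extracted name, then wire `knows` edges only to names that were actually extracted. A minimal, dependency-free sketch of that pattern (plain dicts stand in for the `Person` DataPoints):

```python
from typing import Dict, List

def resolve_people(extracted: List[dict]) -> Dict[str, dict]:
    # Each extracted item looks like {"name": "Alice", "knows": ["Mark"]}
    person_map: Dict[str, dict] = {}
    for item in extracted:  # pass 1: register one record per unique name
        person_map[item["name"]] = {"name": item["name"], "knows": []}
    for item in extracted:  # pass 2: resolve relations to registered records
        person_map[item["name"]]["knows"] = [
            person_map[name] for name in item.get("knows", []) if name in person_map
        ]
    return person_map

people = resolve_people([
    {"name": "Alice", "knows": ["Mark"]},
    {"name": "Mark", "knows": ["Unknown"]},  # "Unknown" was never extracted, so it is dropped
])
```

The two passes matter because a `knows` target may be extracted after the person that references it; resolving relations only after all names are registered avoids dangling references.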
acreate_structured_output is backend-agnostic (BAML or LiteLLM+Instructor). Configure it through STRUCTURED_OUTPUT_FRAMEWORK in .env.
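For example, a `.env` entry might look like the following (the exact accepted values are an assumption here; check the configuration reference for your Cognee version):

```
# Structured-output backend selection (illustrative value)
STRUCTURED_OUTPUT_FRAMEWORK=BAML
```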
The custom pipeline inserts the extracted Person nodes directly into the graph. The follow-up cognify() call builds the rest of Cognee’s retrieval stack on top of that stored graph data.
```python
import asyncio
import os
from typing import Any, Dict, List
from uuid import NAMESPACE_OID, UUID, uuid5

from pydantic import BaseModel

import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.engine.operations.setup import setup
from cognee.modules.pipelines import Task
from cognee.tasks.storage import add_data_points


class PersonLLM(BaseModel):
    """Lightweight Pydantic model for LLM extraction only."""

    name: str
    knows: List[str] = []  # Just names for now, we'll resolve to Person instances later


class PeopleLLM(BaseModel):
    """Lightweight Pydantic model for LLM extraction only."""

    persons: List[PersonLLM]


class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: List["Person"] = []
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}


class LightweightData(DataPoint):
    """Lightweight DataPoint model for data ingestion only."""

    id: UUID
    text: str


def build_lightweight_data_object(text_data):
    return LightweightData(id=uuid5(NAMESPACE_OID, text_data), text=text_data)


async def extract_people(data: LightweightData) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "Infer 'knows' only when there is a clear interpersonal interaction in the text."
    )

    # Create a mapping of name -> Person DataPoint
    person_map: Dict[str, Person] = {}

    for data_item in data:
        people_llm = await LLMGateway.acreate_structured_output(
            data_item.text, system_prompt, PeopleLLM
        )

        for person_llm in people_llm.persons:
            person_map[person_llm.name] = Person(name=person_llm.name)

        # Resolve knows relationships
        for person_llm in people_llm.persons:
            person = person_map[person_llm.name]
            person.knows = [person_map[name] for name in person_llm.knows if name in person_map]

    return list(person_map.values())


async def main(text_data):
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await setup()

    tasks = [
        Task(extract_people),  # input: text -> output: list[Person]
        Task(add_data_points),  # input: list[Person] -> output: list[Person]
    ]

    await cognee.run_custom_pipeline(
        tasks=tasks, data=build_lightweight_data_object(text_data), dataset="people_demo"
    )

    await cognee.cognify()

    visualize_graph_path = os.path.join(
        os.path.dirname(__file__), ".artifacts", "custom_tasks_and_pipelines.html"
    )
    await visualize_graph(visualize_graph_path)


if __name__ == "__main__":
    text = "Alice knows Mark. Mark had dinner with Bob and Alice. Bob knows Mary."
    asyncio.run(main(text))
```
Legacy guide
```python
import asyncio
from typing import Any, Dict, List

from pydantic import BaseModel, SkipValidation

import cognee
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.modules.engine.operations.setup import setup
from cognee.modules.pipelines import Task, run_pipeline
from cognee.tasks.storage import add_data_points


class Person(DataPoint):
    name: str
    # Optional relationships (we'll let the LLM populate this)
    knows: List["Person"] = []
    # Make names searchable in the vector store
    metadata: Dict[str, Any] = {"index_fields": ["name"]}


class People(BaseModel):
    persons: List[Person]


async def extract_people(text: str) -> List[Person]:
    system_prompt = (
        "Extract people mentioned in the text. "
        "Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
        "If the text says someone knows someone set `knows` accordingly. "
        "Only include facts explicitly stated."
    )
    people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
    return people.persons


async def main():
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    await setup()

    text = "Alice knows Bob."

    tasks = [
        Task(extract_people),  # input: text -> output: list[Person]
        Task(add_data_points),  # input: list[Person] -> output: list[Person]
    ]

    async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
        pass


if __name__ == "__main__":
    asyncio.run(main())
```
This updated example uses a lightweight ingestion object, a custom extraction task, and a visualization step. In practice, you can create larger pipelines with additional transforms and storage stages.
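The core composition idea is that each task's output becomes the next task's input. A toy, dependency-free sketch of that sequential chaining (the runner and both tasks are illustrative, not part of Cognee):

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# Toy sequential runner illustrating cognee-style pipelines:
# each task's output feeds the next task's input.
async def run_tasks(tasks: List[Callable[[Any], Awaitable[Any]]], data: Any) -> Any:
    for task in tasks:
        data = await task(data)
    return data

async def extract(text: str) -> list:
    # Crude name extraction: capitalized words, trailing periods stripped
    return [word.strip(".") for word in text.split() if word[0].isupper()]

async def dedupe(names: list) -> list:
    # A follow-up transform stage: deduplicate and sort
    return sorted(set(names))

result = asyncio.run(run_tasks([extract, dedupe], "Alice knows Mark. Mark knows Alice."))
# result == ["Alice", "Mark"]
```

Additional transform or storage stages slot in the same way: append another async callable to the list.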