cognee.run_custom_pipeline()

async def run_custom_pipeline(
    tasks: Union[List[Task], List[str]] = None,
    data: Any = None,
    dataset: Union[str, UUID] = 'main_dataset',
    user: User = None,
    vector_db_config: Optional[dict] = None,
    graph_db_config: Optional[dict] = None,
    use_pipeline_cache: bool = False,
    incremental_loading: bool = False,
    data_per_batch: int = 20,
    run_in_background: bool = False,
    pipeline_name: str = 'custom_pipeline',
)

Description

Runs a custom pipeline in Cognee; it can also operate on already-built graphs. You provide the data to process along with the tasks that process it, and Cognee arranges them into a pipeline run that creates or enriches the knowledge graph. This is the core processing step in Cognee: it converts raw text and documents into an intelligent knowledge graph by analyzing content, extracting entities and relationships, and creating semantic connections for enhanced search and reasoning.

Args:
    tasks: List of Cognee Tasks to execute.
    data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used; it is forwarded as input to the first extraction task in the pipeline.
    dataset: Dataset name or dataset UUID to process.
    user: User context for authentication and data access. Uses the default user if None.
    vector_db_config: Custom vector database configuration for embeddings storage.
    graph_db_config: Custom graph database configuration for relationship storage.
    use_pipeline_cache: If True, data won't be processed again by pipelines with the same ID that are currently executing or have already completed. The pipeline ID is created by the generate_pipeline_id function; pipeline status can be manually reset with the reset_dataset_pipeline_run_status function.
    incremental_loading: If True, only new or modified data is processed, avoiding duplication (only works when data uses the Cognee Python Data model). The incremental system stores content hashes of processed data in the Data model and skips data whose hash matches.
    data_per_batch: Number of data items to process in parallel.
    run_in_background: If True, starts processing asynchronously and returns immediately; if False, waits for completion before returning. Background mode is recommended for large datasets (>100MB). Use pipeline_run_id from the return value to monitor progress.
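Conceptually, each task consumes its input and yields results that are forwarded to the next task in the chain. The sketch below is a hypothetical, library-free illustration of that flow (it does not use cognee internals; `run_tasks`, `extract`, and `enrich` are made-up names):

```python
import asyncio
from typing import Any, AsyncIterator, Callable

# Hypothetical sketch of pipeline task chaining (not cognee's actual code):
# each task is an async generator; every item a task yields becomes an
# input item for the next task.
async def run_tasks(
    tasks: list[Callable[[Any], AsyncIterator[Any]]],
    data: Any,
) -> list[Any]:
    items = [data]
    for task in tasks:
        next_items = []
        for item in items:
            async for result in task(item):
                next_items.append(result)
        items = next_items
    return items

async def extract(text: str):
    # toy "extraction": split text into tokens
    for token in text.split():
        yield token

async def enrich(token: str):
    # toy "enrichment": annotate each token
    yield {"entity": token, "source": "demo"}

async def main():
    return await run_tasks([extract, enrich], "knowledge graph")

print(asyncio.run(main()))
# → [{'entity': 'knowledge', 'source': 'demo'}, {'entity': 'graph', 'source': 'demo'}]
```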

Parameters

tasks
Union[List[Task], List[str]]
default:"None"
List of Task objects or task names defining the pipeline steps.
data
Any
default:"None"
Input data for the pipeline.
dataset
Union[str, UUID]
default:"'main_dataset'"
Dataset name or UUID.
user
User
default:"None"
User performing the operation.
vector_db_config
Optional[dict]
default:"None"
Override vector database configuration.
graph_db_config
Optional[dict]
default:"None"
Override graph database configuration.
use_pipeline_cache
bool
default:"False"
Skip re-running pipelines with the same pipeline ID that are executing or already completed.
incremental_loading
bool
default:"False"
Skip already-processed data.
data_per_batch
int
default:"20"
Number of data items per batch.
run_in_background
bool
default:"False"
If true, return immediately and process in background.
pipeline_name
str
default:"'custom_pipeline'"
Name identifier for the pipeline run.
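To give a feel for what `data_per_batch` controls, here is a rough, self-contained illustration of batched concurrency with `asyncio.gather` (an assumption about the general pattern, not cognee's internal implementation; `process_in_batches` and `double` are made-up names):

```python
import asyncio
from typing import Any, Awaitable, Callable

# Illustrative only: a data_per_batch-style limit bounds how many items
# are processed concurrently at once.
async def process_in_batches(
    items: list[Any],
    worker: Callable[[Any], Awaitable[Any]],
    data_per_batch: int = 20,
) -> list[Any]:
    results = []
    for start in range(0, len(items), data_per_batch):
        batch = items[start:start + data_per_batch]
        # items within a batch run concurrently; batches run sequentially
        results.extend(await asyncio.gather(*(worker(i) for i in batch)))
    return results

async def double(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async work
    return x * 2

print(asyncio.run(process_in_batches(list(range(5)), double, data_per_batch=2)))
# → [0, 2, 4, 6, 8]
```

A larger `data_per_batch` increases throughput at the cost of memory and load on the vector/graph databases.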

Examples

import cognee
from cognee.modules.pipelines import Task

# Define custom tasks as async generators; each task's output is
# forwarded as input to the next task in the pipeline
async def my_extractor(data):
    # Custom extraction logic goes here
    extracted_data = data.upper()
    yield extracted_data

async def my_enricher(data):
    # Custom enrichment logic goes here
    enriched_data = {"content": data}
    yield enriched_data

# Run a custom pipeline
await cognee.run_custom_pipeline(
    tasks=[
        Task(my_extractor),
        Task(my_enricher),
    ],
    data="Input data for the pipeline",
    dataset="my_dataset",
    pipeline_name="my_custom_pipeline",
)
See Custom Tasks & Pipelines for a full guide.