

What pipelines are

Pipelines coordinate ordered Tasks into a reproducible workflow. Default Cognee operations like Remember run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through the higher-level operations unless you need staged control.

Prerequisites

  • Dataset: a container (name or UUID) where your data is stored and processed. Every document remembered by Cognee belongs to a dataset.
  • User: the identity for ownership and access control. A default user is created and used if none is provided.
  • Both are described in more detail further down this page

How pipelines run

Pipelines are run with the run_pipeline function. Cognee uses a layered execution model: a single call to run_pipeline orchestrates multi-dataset processing by running per-file pipelines through the sequence of tasks.
  • Statuses are yielded as the pipeline runs and written to databases where appropriate
  • User access to datasets and files is verified at each layer
  • Pipeline run information includes dataset IDs, completion status, and error handling
  • Background execution uses queues to manage status updates and avoid database conflicts
Every run_pipeline call takes a pipeline_name parameter (default: "custom_pipeline") and a use_pipeline_cache flag (default: False). These two values together control whether a pipeline re-processes a dataset that was already handled.
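The way statuses are yielded while a pipeline runs can be illustrated without Cognee installed. The sketch below is a stand-in, not the library's implementation: fake_run_pipeline and collect_statuses are hypothetical names, and the plain dicts it yields are an assumption about shape made only for illustration. The status strings are the ones named on this page.

```python
import asyncio
from typing import AsyncIterator

# Status names as they appear on this page.
STARTED = "DATASET_PROCESSING_STARTED"
COMPLETED = "DATASET_PROCESSING_COMPLETED"


async def fake_run_pipeline(
    items: list, pipeline_name: str = "custom_pipeline"
) -> AsyncIterator[dict]:
    """Hypothetical stand-in for run_pipeline: yields a status before and after the work."""
    yield {"pipeline_name": pipeline_name, "status": STARTED}
    for _ in items:
        await asyncio.sleep(0)  # placeholder for real task work on one item
    yield {"pipeline_name": pipeline_name, "status": COMPLETED}


async def collect_statuses(items: list) -> list:
    """Consume the generator the same way the examples on this page do: with async for."""
    return [
        info["status"]
        async for info in fake_run_pipeline(items, "my_enrichment_pipeline")
    ]


if __name__ == "__main__":
    print(asyncio.run(collect_statuses(["doc one", "doc two"])))
```

The point of the async generator shape is that a caller can react to each status as it arrives instead of waiting for the whole run to finish.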

Reserved pipeline names

Two pipeline names are used internally and carry special meaning:
Name              Used by           Behavior
cognify_pipeline  cognee.cognify()  Runs with use_pipeline_cache=True; skips datasets that are already processed
add_pipeline      cognee.add()      Runs with use_pipeline_cache=True; skips datasets with identical content
The lower-level cognee.add() step, which is also used inside remember(), always resets the stored status for both add_pipeline and cognify_pipeline before running, so that new data can be re-processed by the downstream cognify() step on the next call.
Do not use cognify_pipeline or add_pipeline as pipeline_name values in your own run_pipeline calls. Reusing these names causes your pipeline to read and write the same status records as the built-in operations, which can lead to unexpected skipping or incorrect state resets.
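One way to avoid reusing a reserved name by accident is a small guard in your own code, called before run_pipeline. This is a minimal sketch: check_pipeline_name and RESERVED_PIPELINE_NAMES are hypothetical helpers, not part of Cognee, and the only facts taken from this page are the two reserved names.

```python
# The two names this page lists as reserved for built-in operations.
RESERVED_PIPELINE_NAMES = frozenset({"cognify_pipeline", "add_pipeline"})


def check_pipeline_name(pipeline_name: str) -> str:
    """Return the name unchanged, or raise if it collides with a built-in pipeline."""
    if pipeline_name in RESERVED_PIPELINE_NAMES:
        raise ValueError(
            f"{pipeline_name!r} is used internally; choose a unique pipeline_name"
        )
    return pipeline_name


if __name__ == "__main__":
    print(check_pipeline_name("my_enrichment_pipeline"))
```

You would pass the checked value on, for example pipeline_name=check_pipeline_name("my_enrichment_pipeline").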

How use_pipeline_cache works

When use_pipeline_cache=True, Cognee checks the relational database for the most recent run of pipeline_name on the target dataset before executing:
  • If the stored status is DATASET_PROCESSING_COMPLETED → the pipeline yields the cached result and returns immediately without re-running the tasks.
  • If the stored status is DATASET_PROCESSING_STARTED → the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs.
  • If there is no prior record (new dataset or new pipeline name) → the pipeline runs normally.
When use_pipeline_cache=False (the default for custom pipelines), the pipeline always executes its tasks regardless of any prior completion status. The prior status record is still read, but it does not block execution.
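The rules above can be summarised as a small decision function. This is a simplified model of the behavior described on this page, not Cognee's actual code; cache_decision and its return values are hypothetical names chosen for the sketch.

```python
from typing import Optional

COMPLETED = "DATASET_PROCESSING_COMPLETED"
STARTED = "DATASET_PROCESSING_STARTED"


def cache_decision(latest_status: Optional[str], use_pipeline_cache: bool) -> str:
    """Model of the cache check: decide what happens given the most recent stored status.

    latest_status is None when there is no prior record for this
    pipeline_name on the dataset.
    """
    if not use_pipeline_cache:
        # The prior status may still be read, but it never blocks execution.
        return "run"
    if latest_status == COMPLETED:
        return "return_cached_result"
    if latest_status == STARTED:
        return "return_in_progress"
    # No prior record, or a status that is neither completed nor in progress.
    return "run"


if __name__ == "__main__":
    for status in (None, STARTED, COMPLETED):
        print(status, "->", cache_decision(status, use_pipeline_cache=True))
```

Any status other than the two blocking ones falls through to a normal run in this model.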
For your own pipelines, choose a unique pipeline_name that does not conflict with cognify_pipeline or add_pipeline. Using a unique name means:
  • State tracking is isolated to your pipeline — a completed run of the built-in cognify() will not affect your pipeline’s qualification check.
  • If you enable use_pipeline_cache=True for your custom pipeline, you must reset its status manually (via reset_dataset_pipeline_run_status) when you want to re-process a dataset.
# Custom pipeline with a unique name: safe to use alongside the built-in memory workflows
from cognee.modules.pipelines import run_pipeline

# `tasks` and `text` are assumed to be defined earlier (a list of Task objects and the input data)
async for run_info in run_pipeline(
    tasks=tasks,
    data=text,
    datasets=["my_dataset"],
    pipeline_name="my_enrichment_pipeline",  # unique name, no conflict
    use_pipeline_cache=False,                # default: always re-runs
):
    pass
When a server crashes or is killed mid-cognify, the pipeline run record in the relational database is left with a DATASET_PROCESSING_STARTED status. On the next call to cognify(), the cache check sees that status and returns immediately without re-running — preventing what it assumes is a duplicate concurrent run.
Pipeline run status after crash  What cognify() sees on retry           Outcome
DATASET_PROCESSING_STARTED       Another run is already in progress     Returns immediately, does not reprocess
DATASET_PROCESSING_ERRORED       No completed run; cache allows re-run  Reprocesses the dataset normally
To unblock a stuck pipeline, call reset_dataset_pipeline_run_status. It writes a new DATASET_PROCESSING_INITIATED record, which clears the stuck status and lets the next cognify() call proceed.
from uuid import UUID
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)

# Reset all pipelines on a dataset
await reset_dataset_pipeline_run_status(dataset_id=my_dataset.id, user=current_user)

# Or reset only specific pipelines by name
await reset_dataset_pipeline_run_status(
    dataset_id=my_dataset.id,
    user=current_user,
    pipeline_names=["cognify_pipeline"],
)
Parameters
Parameter       Type              Required  Description
dataset_id      UUID              Yes       The ID of the dataset whose pipeline runs should be reset
user            User              Yes       The user object used for ownership lookup
pipeline_names  list[str] | None  No        If provided, only runs for these pipeline names are reset; omit to reset all
Once reset, calling cognify() again is safe:
  • Documents that fully completed before the crash (their per-document pipeline_status entry is DATA_ITEM_PROCESSING_COMPLETED) are skipped — no duplicate graph nodes or embeddings are written.
  • Documents that were mid-processing when the crash occurred will be reprocessed from the beginning. These items will be re-chunked, re-extracted, and re-embedded.
reset_dataset_pipeline_run_status resets the dataset-level run status only. It does not clear per-document status. Documents that completed before the crash remain marked as completed and are not reprocessed.
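The difference between the dataset-level run status and the per-document status can be modelled in a few lines. This is an illustrative sketch only: DatasetState, reset_run_status, and documents_to_reprocess are hypothetical names, and representing an unfinished document as None is an assumption made to avoid guessing at status names not given on this page.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

ITEM_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"


@dataclass
class DatasetState:
    """Toy model: one list of dataset-level run records, one status per document."""
    run_history: List[str] = field(default_factory=list)  # newest record last
    document_status: Dict[str, Optional[str]] = field(default_factory=dict)


def reset_run_status(state: DatasetState) -> None:
    """Model of the reset: append a new INITIATED record, leave documents untouched."""
    state.run_history.append("DATASET_PROCESSING_INITIATED")


def documents_to_reprocess(state: DatasetState) -> List[str]:
    """Only documents that did not complete are processed again on the next run."""
    return [
        doc for doc, status in state.document_status.items()
        if status != ITEM_COMPLETED
    ]


# Simulated crash: the run is stuck in STARTED, one document finished, one did not.
state = DatasetState(
    run_history=["DATASET_PROCESSING_STARTED"],
    document_status={"doc_a": ITEM_COMPLETED, "doc_b": None},
)
reset_run_status(state)
print(state.run_history[-1], documents_to_reprocess(state))
```

In this model the reset changes only the latest dataset-level record, which is why the completed document is not picked up again.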
PipelineContext is the runtime context object that Cognee automatically builds and injects into any task that declares a ctx parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an extras dict for custom state.
Field          Type            Description
user           Any             The user that triggered the pipeline. Used for access control and provenance.
dataset        Any             The resolved dataset object for the current run.
data_item      Any             The individual data item being processed in this pipeline execution.
pipeline_name  Optional[str]   The name passed to run_pipeline or run_tasks.
extras         Dict[str, Any]  Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to {}.
The framework inspects each task function’s signature. If it finds a parameter named ctx, it passes the current PipelineContext when the task runs. Matching is by parameter name, not by type annotation. Tasks that do not declare ctx simply receive no context and are unaffected.
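Name-based injection of this kind can be sketched with the standard inspect module. The code below shows the general technique under simplifying assumptions (synchronous tasks, a string standing in for the context object). call_task and the three example tasks are hypothetical and are not Cognee's implementation.

```python
import inspect
from typing import Any, Callable


def call_task(task: Callable[..., Any], data: Any, ctx: Any) -> Any:
    """Pass ctx only if the task declares a parameter literally named 'ctx'."""
    if "ctx" in inspect.signature(task).parameters:
        return task(data, ctx=ctx)
    return task(data)


def with_ctx(data, ctx=None):
    # Declares ctx, so it receives the context.
    return (data, ctx)


def without_ctx(data):
    # Declares no ctx parameter, so it is called with data only.
    return (data, None)


def differently_named(data, context: dict = None):
    # The name is 'context', not 'ctx', so nothing is injected.
    return (data, context)


if __name__ == "__main__":
    for task in (with_ctx, without_ctx, differently_named):
        print(task.__name__, call_task(task, 1, "CTX"))
```

Because the check looks only at the parameter name, the annotation on the parameter plays no part in whether the context is passed.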
Pass a dict as the context argument to run_pipeline, or as the extras argument to run_tasks. Every task in the pipeline can then read those values from ctx.extras.
from cognee.modules.pipelines import Task, run_pipeline
from cognee.modules.pipelines.models.PipelineContext import PipelineContext

async def score_items(data, ctx: PipelineContext = None):
    multiplier = ctx.extras.get("score_multiplier", 1) if ctx else 1
    return [item * multiplier for item in data]

async for _ in run_pipeline(
    tasks=[Task(score_items)],
    data=[1, 2, 3],
    datasets=["my_dataset"],
    pipeline_name="scoring_pipeline",
    context={"score_multiplier": 10},
):
    pass
ctx.extras is always a plain dict, never None.
The user and dataset fields are most useful when you need to write provenance records or apply per-tenant logic:
async def store_result(data_points, ctx: PipelineContext = None):
    # Fall back to None so the task still works when called without a pipeline
    user = ctx.user if ctx else None
    dataset = ctx.dataset if ctx else None

    for dp in data_points:
        # write_to_store is a placeholder for your own storage function
        await write_to_store(
            dp,
            user_id=user.id if user else None,
            dataset_id=dataset.id if dataset else None,
        )
    return data_points
The built-in add_data_points task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
Always default ctx to None so the task can also be called directly in tests or scripts without a running pipeline:
async def my_task(data, ctx: PipelineContext = None):
    name = ctx.pipeline_name if ctx else "standalone"
    ...
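Defaulting ctx to None makes a task easy to exercise directly. The sketch below calls a task with and without a context. StubContext is a hypothetical test double that mirrors only the fields listed for PipelineContext on this page; it is not the real class, and the task body is invented for the example.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class StubContext:
    """Test double with the same field names as those listed for PipelineContext."""
    user: Any = None
    dataset: Any = None
    data_item: Any = None
    pipeline_name: Optional[str] = None
    extras: Dict[str, Any] = field(default_factory=dict)


async def my_task(data, ctx=None):
    # Without a pipeline there is no context, so fall back to a fixed label.
    name = ctx.pipeline_name if ctx else "standalone"
    return [f"{name}:{item}" for item in data]


if __name__ == "__main__":
    # Direct call, no pipeline involved.
    print(asyncio.run(my_task(["a", "b"])))
    # Same task with a stub context, as a unit test might do.
    print(asyncio.run(my_task(["a"], ctx=StubContext(pipeline_name="scoring_pipeline"))))
```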

Execution layers

  • Innermost layer: individual task execution with telemetry and recursive task running in batches
  • Middle layer: per-dataset pipeline management and task orchestration
  • Outermost layer: multi-dataset orchestration and overall pipeline execution
  • Execution modes: blocking (wait for completion) or background (return immediately with “started” status)

Customization

  • Use Remember for the default ingestion path
  • Modify transformation steps without touching low-level functions; avoid going below run_pipeline
  • Custom tasks let you extend or replace default behavior

Users

  • Identity: represents who owns and acts on data. If omitted, a default user is used
  • Ownership: every ingested item is tied to a user; content is deduplicated per owner
  • Permissions: enforced per dataset (read/write/delete/share) during processing and API access

Datasets

  • Container: a named or UUID-scoped collection of related data and derived knowledge
  • Scoping: remember() writes into a specific dataset, and dataset-scoped pipelines process the dataset(s) you pass
  • Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)

Tasks

Learn about the individual processing units that make up pipelines

DataPoints

Understand the structured outputs that pipelines produce

Main Operations

See how pipelines are used in Remember and lower-level ingestion workflows