
What pipelines are

Pipelines coordinate ordered Tasks into a reproducible workflow. Default Cognee operations like Add and Cognify run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through these operations.
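For orientation, here is a minimal sketch of triggering the built-in pipelines through those operations. It assumes the standard cognee Python package; the search call and its parameters may differ slightly between versions.
import asyncio
import cognee

async def main():
    # add() triggers the add_pipeline: ingests the text into a dataset
    await cognee.add("Cognee builds knowledge graphs from your documents.")
    # cognify() triggers the cognify_pipeline: processes the dataset into graph and embeddings
    await cognee.cognify()
    # query the processed knowledge (exact signature may vary by version)
    results = await cognee.search(query_text="What does Cognee do?")
    print(results)

asyncio.run(main())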

Prerequisites

  • Dataset: a container (name or UUID) where your data is stored and processed. Every document added to cognee belongs to a dataset.
  • User: the identity for ownership and access control. A default user is created and used if none is provided.
  • More details on users and datasets are covered in the Users and Datasets sections below

How pipelines run

Somewhat unsurprisingly, the function used to run pipelines is called run_pipeline. Cognee uses a layered execution model: a single call to run_pipeline orchestrates multi-dataset processing, running per-file pipelines that push the data through the sequence of tasks (the layers are described under Execution layers below).
  • Statuses are yielded as the pipeline runs and written to databases where appropriate
  • User access to datasets and files is carefully verified at each layer
  • Pipeline run information includes dataset IDs, completion status, and error handling
  • Background execution uses queues to manage status updates and avoid database conflicts
Every run_pipeline call takes a pipeline_name parameter (default: "custom_pipeline") and a use_pipeline_cache flag (default: False). These two values together control whether a pipeline re-processes a dataset that was already handled.
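As a sketch of consuming those yielded statuses (tasks and text stand for a task list and input data, both sketched further down this page; the exact fields on each yielded object depend on your cognee version):
# Each yield reflects a per-dataset status change: started, completed, or errored
async for run_info in run_pipeline(tasks=tasks, data=text, datasets=["docs_a", "docs_b"]):
    print(run_info)  # carries the dataset id and completion status described above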

Reserved pipeline names

Two pipeline names are used internally and carry special meaning:
  • cognify_pipeline, used by cognee.cognify(): runs with use_pipeline_cache=True and skips datasets that are already processed
  • add_pipeline, used by cognee.add(): runs with use_pipeline_cache=True and skips datasets with identical content
cognee.add() always resets the stored status for both add_pipeline and cognify_pipeline before running, so that calling add() with new data will allow cognify() to re-process the dataset on the next call.
Do not use cognify_pipeline or add_pipeline as pipeline_name values in your own run_pipeline calls. Reusing these names causes your pipeline to read and write the same status records as the built-in operations, which can lead to unexpected skipping or incorrect state resets.

How use_pipeline_cache works

When use_pipeline_cache=True, Cognee checks the relational database for the most recent run of pipeline_name on the target dataset before executing:
  • If the stored status is DATASET_PROCESSING_COMPLETED → the pipeline yields the cached result and returns immediately without re-running the tasks.
  • If the stored status is DATASET_PROCESSING_STARTED → the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs.
  • If there is no prior record (new dataset or new pipeline name) → the pipeline runs normally.
When use_pipeline_cache=False (the default for custom pipelines), the pipeline always executes its tasks regardless of any prior completion status. The prior status record is still read, but it does not block execution.
For your own pipelines, choose a unique pipeline_name that does not conflict with cognify_pipeline or add_pipeline. Using a unique name means:
  • State tracking is isolated to your pipeline; a completed run of the built-in cognify() will not affect your pipeline's cache check.
  • If you enable use_pipeline_cache=True for your custom pipeline, you must reset its status manually (via reset_dataset_pipeline_run_status) when you want to re-process a dataset.
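The example below assumes a tasks list and an input text already exist. One way they might be built is sketched here; the Task import path and the enrichment function are illustrative and may need adjusting to your cognee version.
from cognee.modules.pipelines import Task  # import path is an assumption; it may differ by version

async def extract_keywords(data):
    # placeholder transformation step; replace with your own enrichment logic
    return data

tasks = [Task(extract_keywords)]
text = "Document content to enrich."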
# Custom pipeline with a unique name — safe to use alongside add() / cognify()
async for run_info in run_pipeline(
    tasks=tasks,
    data=text,
    datasets=["my_dataset"],
    pipeline_name="my_enrichment_pipeline",  # unique name, no conflict
    use_pipeline_cache=False,                # default: always re-runs
):
    pass
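And the complementary case: enabling the cache for the same custom pipeline, a sketch continuing the example above.
# Re-run with caching enabled: if the previous run completed, the tasks are skipped
async for run_info in run_pipeline(
    tasks=tasks,
    data=text,
    datasets=["my_dataset"],
    pipeline_name="my_enrichment_pipeline",
    use_pipeline_cache=True,   # a stored completed status short-circuits the run
):
    print(run_info)  # yields the cached result instead of re-executing the tasks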

Execution layers

  • Innermost layer: individual task execution with telemetry and recursive task running in batches
  • Middle layer: per-dataset pipeline management and task orchestration
  • Outermost layer: multi-dataset orchestration and overall pipeline execution
  • Execution modes: blocking (wait for completion) or background (return immediately with “started” status)

Customizing pipelines

  • Use Cognify with custom tasks after Add
  • Modify transformation steps without touching low-level functions; avoid going below run_pipeline
  • Custom tasks let you extend or replace default behavior

Users

  • Identity: represents who owns and acts on data. If omitted, a default user is used
  • Ownership: every ingested item is tied to a user; content is deduplicated per owner
  • Permissions: enforced per dataset (read/write/delete/share) during processing and API access

Datasets

  • Container: a named or UUID-scoped collection of related data and derived knowledge
  • Scoping: Add writes into a specific dataset; Cognify processes the dataset(s) you pass
  • Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)
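A small sketch of this lifecycle, assuming the dataset_name and datasets parameters of the standard add and cognify operations; passing an existing dataset UUID in place of the name follows the same pattern, given permission.
import cognee

# A new name creates the dataset and grants the calling (default) user permissions on it
await cognee.add("contract text", dataset_name="legal_docs")

# Cognify processes only the dataset(s) you pass, by name or by UUID
await cognee.cognify(datasets=["legal_docs"])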

Tasks

Learn about the individual processing units that make up pipelines

DataPoints

Understand the structured outputs that pipelines produce

Main Operations

See how pipelines are used in Add, Cognify, and Search workflows