What pipelines are
Pipelines coordinate ordered Tasks into a reproducible workflow. Default Cognee operations like Remember run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through the higher-level operations unless you need staged control.
Prerequisites
- Dataset: a container (name or UUID) where your data is stored and processed. Every document remembered by Cognee belongs to a dataset.
- User: the identity for ownership and access control. A default user is created and used if none is provided.
- More details are available below
How pipelines run
Somewhat unsurprisingly, the function used to run pipelines is called run_pipeline.
Cognee uses a layered execution model: a single call to run_pipeline orchestrates multi-dataset processing by running per-file pipelines through the sequence of tasks.
- Statuses are yielded as the pipeline runs and written to databases where appropriate
- User access to datasets and files is carefully verified at each layer
- Pipeline run information includes dataset IDs, completion status, and error handling
- Background execution uses queues to manage status updates and avoid database conflicts
Pipeline Names and Caching
Every run_pipeline call takes a pipeline_name parameter (default: "custom_pipeline") and a use_pipeline_cache flag (default: False). These two values together control whether a pipeline re-processes a dataset that was already handled.
Reserved pipeline names
Two pipeline names are used internally and carry special meaning:

| Name | Used by | Behavior |
|---|---|---|
| cognify_pipeline | cognee.cognify() | Runs with use_pipeline_cache=False; starts a new dataset-level run instead of skipping because of a prior dataset run |
| add_pipeline | cognee.add() | Runs with use_pipeline_cache=False; starts a new dataset-level run instead of skipping because of a prior dataset run |

Both built-in operations run with use_pipeline_cache=False, so they do not short-circuit based on a dataset-level DATASET_PROCESSING_COMPLETED or DATASET_PROCESSING_STARTED record. They start a new dataset-level run, while per-document pipeline status can still skip data items that already completed for that pipeline. Concurrent runs on the same dataset are kept safe by a per-dataset lock (see the “Per-dataset serialization” section below) rather than by the cache check.
The lower-level cognee.add() step, which is also used inside remember(), always resets the stored status for both add_pipeline and cognify_pipeline before running, so that new data can be re-processed by the downstream cognify() step on the next call.
How use_pipeline_cache works
When use_pipeline_cache=True, Cognee checks the relational database for the most recent run of pipeline_name on the target dataset before executing:
- If the stored status is DATASET_PROCESSING_COMPLETED → the pipeline yields the cached result and returns immediately without re-running the tasks.
- If the stored status is DATASET_PROCESSING_STARTED → the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs.
- If there is no prior record (new dataset or new pipeline name) → the pipeline runs normally.
When use_pipeline_cache=False (the default for custom pipelines, and the mode used by cognee.add() and cognee.cognify()), the dataset-level qualification check is skipped entirely (the prior dataset run status is not read) and the pipeline starts a new dataset-level run regardless of any prior dataset completion status. Per-document pipeline status is checked later during task execution, so individual data items that already completed for that pipeline can still be skipped. Safety against concurrent runs on the same dataset is provided by the per-dataset lock described below instead of by this check.
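The dataset-level rules above can be condensed into one small decision function. This is a simplified sketch rather than Cognee's implementation: the function name dataset_run_decision and its return strings are hypothetical, and only the two status names come from the text.

```python
from typing import Optional

COMPLETED = "DATASET_PROCESSING_COMPLETED"
STARTED = "DATASET_PROCESSING_STARTED"


def dataset_run_decision(use_pipeline_cache: bool, last_status: Optional[str]) -> str:
    """Describe what a run does at the dataset level (hypothetical helper)."""
    if not use_pipeline_cache:
        # The prior dataset-level status is not read at all.
        return "run"
    if last_status == COMPLETED:
        return "yield cached result"
    if last_status == STARTED:
        return "yield in-progress status"
    # No prior record, or a status the text does not list as skipping
    # (assumed here to run normally).
    return "run"
```

Per-document skipping is a separate, later check and is deliberately not modeled here.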
Per-dataset serialization
Pipeline runs are serialized per dataset. Before a run starts, run_pipeline_per_dataset acquires a lock keyed on the dataset ID, so two runs that target the same dataset execute one after another (the second waits until the first finishes), while runs on different datasets still proceed in parallel. This protects each dataset from concurrent writers (for example, two cognify() calls on the same dataset) without globally serializing all pipeline activity.
Nested (re-entrant) runs
A pipeline task may legitimately start another pipeline on the same dataset, for example a session-driven run calling add() or cognify() on the dataset it is already processing. Because the per-dataset lock is not re-entrant, re-acquiring it from within the same execution would self-deadlock. Cognee detects that the current execution already holds the dataset’s lock and lets the nested run proceed without re-locking; external runs on that dataset stay queued behind the lock the ancestor run holds.
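One way to picture the locking behavior is a dictionary of asyncio locks keyed by dataset ID, plus a context variable that records which locks the current execution already holds. The sketch below is an illustration under those assumptions, not Cognee's internal code; run_locked, _dataset_locks and _held are made-up names.

```python
import asyncio
import contextvars
from collections import defaultdict

# Hypothetical state: one lock per dataset ID, created on first use.
_dataset_locks = defaultdict(asyncio.Lock)
# Dataset IDs whose lock is held by the current execution (per asyncio task).
_held = contextvars.ContextVar("held_dataset_ids", default=frozenset())


async def run_locked(dataset_id, work):
    """Await work() while holding the lock for dataset_id."""
    if dataset_id in _held.get():
        # Nested run: an ancestor in this execution already holds the lock,
        # so re-acquiring the non-re-entrant lock would deadlock.
        return await work()
    async with _dataset_locks[dataset_id]:
        token = _held.set(_held.get() | {dataset_id})
        try:
            return await work()
        finally:
            _held.reset(token)


async def demo():
    events = []

    async def nested():
        events.append("nested")

    async def outer(name):
        events.append(f"{name} start")
        await run_locked("ds1", nested)  # same dataset, same execution
        await asyncio.sleep(0.01)
        events.append(f"{name} end")

    # Two external runs on the same dataset execute one after another.
    await asyncio.gather(
        run_locked("ds1", lambda: outer("a")),
        run_locked("ds1", lambda: outer("b")),
    )
    return events
```

Because each asyncio task gets its own copy of the context, the second external run does not see the first run's held set and therefore waits on the lock.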
Custom pipeline naming
For your own pipelines, choose a unique pipeline_name that does not conflict with cognify_pipeline or add_pipeline. Using a unique name means:
- State tracking is isolated to your pipeline: a completed run of the built-in cognify() will not affect your pipeline’s qualification check.
- If you enable use_pipeline_cache=True for your custom pipeline, you must reset its status manually (via reset_dataset_pipeline_run_status) when you want to re-process a dataset.
Crash recovery and stuck pipelines
When a server crashes or is killed mid-cognify, the pipeline run record in the relational database is left with a DATASET_PROCESSING_STARTED status. Because cognify() and add() now run with use_pipeline_cache=False, they no longer consult that dataset-level status: the next call starts a new dataset-level run, serialized by the per-dataset lock. A stuck DATASET_PROCESSING_STARTED record therefore no longer blocks the built-in operations, although completed data items can still be skipped by their per-document pipeline status.

| Pipeline run status after crash | What cognify() does on retry | Outcome |
|---|---|---|
| DATASET_PROCESSING_STARTED | Dataset-level cache check skipped; starts a new run under the per-dataset lock | Runs normally; completed data items may still be skipped |
| DATASET_PROCESSING_ERRORED | Dataset-level cache check skipped; starts a new run under the per-dataset lock | Runs normally; completed data items may still be skipped |

The reset_dataset_pipeline_run_status helper below is still useful for custom pipelines that opt into use_pipeline_cache=True, where a stuck DATASET_PROCESSING_STARTED record would otherwise cause the cache check to skip a re-run.
Unblock a stuck pipeline
To unblock a stuck pipeline that uses use_pipeline_cache=True, call reset_dataset_pipeline_run_status. It writes a new DATASET_PROCESSING_INITIATED record, which clears the stuck status so the next cached run is no longer skipped.
Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| dataset_id | UUID | Yes | The ID of the dataset whose pipeline runs should be reset |
| user | User | Yes | The user object used for ownership lookup |
| pipeline_names | list[str] \| None | No | If provided, only runs for these pipeline names are reset; omit to reset all |
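To see why writing a new record unblocks a cached run, it can help to model the run history as an append-only log in which the most recent record per pipeline wins. The class below is a hypothetical in-memory stand-in, not the real helper or its storage; only the status names and the pipeline_names filtering behavior are taken from the text.

```python
from typing import List, Optional, Tuple

STARTED = "DATASET_PROCESSING_STARTED"
INITIATED = "DATASET_PROCESSING_INITIATED"


class InMemoryRunLog:
    """Hypothetical stand-in for the pipeline run records of one dataset."""

    def __init__(self) -> None:
        # (pipeline_name, status) pairs, oldest first.
        self.records: List[Tuple[str, str]] = []

    def write(self, pipeline_name: str, status: str) -> None:
        self.records.append((pipeline_name, status))

    def latest_status(self, pipeline_name: str) -> Optional[str]:
        for name, status in reversed(self.records):
            if name == pipeline_name:
                return status
        return None

    def reset(self, pipeline_names: Optional[List[str]] = None) -> None:
        """Append an INITIATED record for the chosen pipelines (all if None)."""
        known = list(dict.fromkeys(name for name, _ in self.records))
        for name in known:
            if pipeline_names is None or name in pipeline_names:
                self.write(name, INITIATED)


log = InMemoryRunLog()
log.write("my_pipeline", STARTED)      # the run crashed after this record
log.write("other_pipeline", STARTED)
log.reset(pipeline_names=["my_pipeline"])
```

After the reset, the latest status for my_pipeline is no longer STARTED, so a cache check that reads the most recent record would let the next run proceed, while other_pipeline is left untouched.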
What happens after reset
Once reset, calling cognify() again is safe:
- Documents that fully completed before the crash (their per-document pipeline_status entry is DATA_ITEM_PROCESSING_COMPLETED) are skipped, so no duplicate graph nodes or embeddings are written.
- Documents that were mid-processing when the crash occurred will be reprocessed from the beginning. These items will be re-chunked, re-extracted, and re-embedded.
reset_dataset_pipeline_run_status resets the dataset-level run status only. It does not clear per-document status. Documents that completed before the crash remain marked as completed and are not reprocessed.
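The per-document skip amounts to a filter over item statuses. The following sketch assumes, purely for illustration, that statuses are available as a plain dict from item ID to status string; items_to_process is a made-up helper and says nothing about how Cognee stores pipeline_status.

```python
from typing import Dict, List

ITEM_COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"


def items_to_process(item_ids: List[str], pipeline_status: Dict[str, str]) -> List[str]:
    """Keep only items whose per-document status is not completed."""
    return [i for i in item_ids if pipeline_status.get(i) != ITEM_COMPLETED]


# doc-1 finished before the crash; doc-2 and doc-3 have no completed entry.
status_after_crash = {"doc-1": ITEM_COMPLETED}
todo = items_to_process(["doc-1", "doc-2", "doc-3"], status_after_crash)
```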
What gets stored in a pipeline run record
Each pipeline run is persisted as a row in the relational pipeline_runs table. Alongside the status, IDs, and pipeline name, the record keeps a run_info column with an audit-only preview of the input the run was started with. This preview is bounded so a single run cannot grow the table without limit:
- If the input is a list of Cognee Data records, only their IDs are stored.
- Any other input is stringified and, if longer than 512 characters, truncated to a preview that ends with ... [truncated, <N> chars total].
- Empty or missing input is recorded as "None".
run_info is intended for inspection and debugging only; Cognee never reads it back during processing. If you need the full input payload (for example, large raw text passed to add() or cognify()), persist it yourself in object storage or a linked record rather than relying on run_info to retain it verbatim.
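The three preview rules can be sketched as a single function. This is an approximation for illustration only: preview_run_input and the Data stand-in are hypothetical, and the sketch assumes the preview keeps the first 512 characters before the suffix, which the text does not state explicitly.

```python
from dataclasses import dataclass
from typing import Any
from uuid import UUID, uuid4

MAX_PREVIEW_CHARS = 512


@dataclass
class Data:
    """Stand-in for a Cognee Data record; only the id matters here."""
    id: UUID


def preview_run_input(data: Any) -> Any:
    """Build a bounded, audit-only preview of a run's input (hypothetical)."""
    if data is None or (isinstance(data, (str, list, tuple, dict)) and len(data) == 0):
        return "None"
    if isinstance(data, list) and all(isinstance(d, Data) for d in data):
        # Store IDs only, never the records themselves.
        return [str(d.id) for d in data]
    text = str(data)
    if len(text) > MAX_PREVIEW_CHARS:
        # Assumption: the first 512 characters are kept before the suffix.
        return f"{text[:MAX_PREVIEW_CHARS]}... [truncated, {len(text)} chars total]"
    return text
```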
PipelineContext and ctx injection
PipelineContext is the runtime context object that Cognee automatically builds and injects into any task that declares a ctx parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an extras dict for custom state.

| Field | Type | Description |
|---|---|---|
| user | Any | The user that triggered the pipeline. Used for access control and provenance. |
| dataset | Any | The resolved dataset object for the current run. |
| data_item | Any | The individual data item being processed in this pipeline execution. |
| pipeline_name | Optional[str] | The name passed to run_pipeline or run_tasks. |
| extras | Dict[str, Any] | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to {}. |

When a task declares a parameter named ctx, Cognee passes it the current PipelineContext when the task runs. Matching is by parameter name, not by type annotation.
Tasks that do not declare ctx simply receive no context and are unaffected.
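Name-based injection of this kind can be illustrated with inspect.signature. The sketch below is not Cognee's code: the PipelineContext dataclass is a stand-in that mirrors the documented fields, and call_task is a hypothetical helper showing the general idea.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class PipelineContext:
    """Stand-in mirroring the documented fields."""
    user: Any = None
    dataset: Any = None
    data_item: Any = None
    pipeline_name: Optional[str] = None
    extras: Dict[str, Any] = field(default_factory=dict)


def call_task(task: Callable, data: Any, ctx: PipelineContext) -> Any:
    """Pass ctx only when the task has a parameter literally named 'ctx'."""
    if "ctx" in inspect.signature(task).parameters:
        return task(data, ctx=ctx)
    return task(data)


def with_ctx(data, ctx=None):  # no annotation needed: the name is what matches
    return f"{data} via {ctx.pipeline_name}"


def without_ctx(data):  # never receives a context
    return data.upper()


ctx = PipelineContext(pipeline_name="my_pipeline")
```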
Using extras for custom pipeline state
Pass a dict as the context argument to run_pipeline or extras to run_tasks. Every task in the pipeline can read those values from ctx.extras.
ctx.extras is always a plain dict, never None.
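A task that reads custom state might look like the sketch below. The task name, the min_chars key and the SimpleNamespace stand-in for the injected context are all made up for illustration; in a real run Cognee would build the context from the dict you pass in.

```python
from types import SimpleNamespace


def filter_short_chunks(chunks, ctx=None):
    """Drop chunks shorter than a threshold read from ctx.extras."""
    extras = ctx.extras if ctx is not None else {}
    min_chars = extras.get("min_chars", 0)  # "min_chars" is a made-up key
    return [c for c in chunks if len(c) >= min_chars]


# Stand-in for the context a pipeline run would inject.
ctx = SimpleNamespace(extras={"min_chars": 5})
kept = filter_short_chunks(["hi", "hello world"], ctx=ctx)
```

Reading with extras.get and a default keeps the task usable when a caller does not supply that key.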
Accessing user and dataset in a task
The user and dataset fields are most useful when you need to write provenance records or apply per-tenant logic.
The built-in add_data_points task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
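As a rough illustration of a custom storage task that records provenance, consider the sketch below. It assumes the user and dataset objects expose an id attribute, which the text does not confirm; build_provenance and the SimpleNamespace stand-ins are hypothetical.

```python
from types import SimpleNamespace


def build_provenance(records, ctx=None):
    """Attach owner and dataset IDs to each record (hypothetical storage task)."""
    # Assumption: user and dataset objects carry an `id` attribute.
    owner_id = getattr(ctx.user, "id", None) if ctx is not None else None
    dataset_id = getattr(ctx.dataset, "id", None) if ctx is not None else None
    return [
        {"payload": record, "owner_id": owner_id, "dataset_id": dataset_id}
        for record in records
    ]


# Stand-ins for the objects a pipeline run would place on the context.
ctx = SimpleNamespace(
    user=SimpleNamespace(id="user-1"),
    dataset=SimpleNamespace(id="ds-1"),
)
rows = build_provenance(["fact"], ctx=ctx)
```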
Making ctx optional
Always default ctx to None so the task can also be called directly in tests or scripts without a running pipeline.
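A minimal sketch of the pattern follows; count_words is a made-up task, and the only point is the ctx=None default together with the guard before any attribute access.

```python
from types import SimpleNamespace


def count_words(texts, ctx=None):
    """Works with or without a pipeline-provided context."""
    source = ctx.pipeline_name if ctx is not None else "direct call"
    return {"source": source, "words": sum(len(t.split()) for t in texts)}


# Called directly, for example from a unit test: no context is needed.
direct = count_words(["one two", "three"])

# Called with a stand-in context, as a pipeline run would do.
in_pipeline = count_words(["one two"], ctx=SimpleNamespace(pipeline_name="my_pipeline"))
```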
Error handling and exception propagation
When a task raises while processing a data item, the pipeline logs the error, yields a PipelineRunErrored status, and then re-raises the original exception to the caller. A failing data item therefore both surfaces a PipelineRunErrored event and propagates the underlying exception out of the pipeline run, rather than being silently collapsed into an error status only.
Because of this, wrap pipeline runs in try/except when you iterate them, so you can react to the propagated exception.
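The yield-then-raise behavior can be reproduced with a small async generator. Everything here is a stand-in: fake_pipeline_run is not Cognee's API and the statuses are plain strings, but the consumer shows the try/except shape described above.

```python
import asyncio


def process(item):
    if item == "bad":
        raise ValueError(f"cannot process {item!r}")
    return f"processed {item}"


async def fake_pipeline_run(items):
    """Stand-in for iterating a pipeline run; not Cognee's API."""
    for item in items:
        try:
            result = process(item)
        except ValueError as error:
            yield "PipelineRunErrored"  # the error status is surfaced first
            raise error                 # then the original exception propagates
        yield result


async def consume(items):
    seen = []
    try:
        async for status in fake_pipeline_run(items):
            seen.append(status)
    except ValueError as error:
        seen.append(f"caught: {error}")
    return seen
```

Note that items after the failing one are never reached, because the exception ends the iteration.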
Layered execution
- Innermost layer: individual task execution with telemetry and recursive task running in batches
- Middle layer: per-dataset pipeline management and task orchestration
- Outermost layer: multi-dataset orchestration and overall pipeline execution
- Execution modes: blocking (wait for completion) or background (return immediately with “started” status)
- In background mode with no datasets passed, the run resolves to every dataset the run’s user has write access to (the user supplied in the run’s params, or the default user when none is given)
Customization approaches and tips
- Use Remember for the default ingestion path
- Modify transformation steps without touching low-level functions; avoid going below run_pipeline
- Custom tasks let you extend or replace default behavior
Users
- Identity: represents who owns and acts on data. If omitted, a default user is used
- Ownership: every ingested item is tied to a user; content is deduplicated per owner
- Permissions: enforced per dataset (read/write/delete/share) during processing and API access
Datasets
- Container: a named or UUID-scoped collection of related data and derived knowledge
- Scoping: remember() writes into a specific dataset, and dataset-scoped pipelines process the dataset(s) you pass
- Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)
Tasks
Learn about the individual processing units that make up pipelines
DataPoints
Understand the structured outputs that pipelines produce
Main Operations
See how pipelines are used in Remember and lower-level ingestion workflows