Documentation Index
Fetch the complete documentation index at: https://docs.cognee.ai/llms.txt
Use this file to discover all available pages before exploring further.
What pipelines are
Pipelines coordinate ordered Tasks into a reproducible workflow. Default Cognee operations like Remember run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through the higher-level operations unless you need staged control.Prerequisites
- Dataset: a container (name or UUID) where your data is stored and processed. Every document remembered by Cognee belongs to a dataset.
- User: the identity for ownership and access control. A default user is created and used if none is provided.
- More details are available below
How pipelines run
Somewhat unsurprisingly, the function used to run pipelines is calledrun_pipeline.
Cognee uses a layered execution model: a single call to run_pipeline orchestrates multi-dataset processing by running per-file pipelines through the sequence of tasks.
- Statuses are yielded as the pipeline runs and written to databases where appropriate
- User access to datasets and files is carefully verified at each layer
- Pipeline run information includes dataset IDs, completion status, and error handling
- Background execution uses queues to manage status updates and avoid database conflicts
Pipeline Names and Caching
Pipeline Names and Caching
Every
The lower-level How
When
run_pipeline call takes a pipeline_name parameter (default: "custom_pipeline") and a use_pipeline_cache flag (default: False). These two values together control whether a pipeline re-processes a dataset that was already handled.Reserved pipeline names
Two pipeline names are used internally and carry special meaning:| Name | Used by | Behavior |
|---|---|---|
cognify_pipeline | cognee.cognify() | Runs with use_pipeline_cache=True; skips datasets that are already processed |
add_pipeline | cognee.add() | Runs with use_pipeline_cache=True; skips datasets with identical content |
cognee.add() step, which is also used inside remember(), always resets the stored status for both add_pipeline and cognify_pipeline before running, so that new data can be re-processed by the downstream cognify() step on the next call.How use_pipeline_cache works
When use_pipeline_cache=True, Cognee checks the relational database for the most recent run of pipeline_name on the target dataset before executing:- If the stored status is
DATASET_PROCESSING_COMPLETED→ the pipeline yields the cached result and returns immediately without re-running the tasks. - If the stored status is
DATASET_PROCESSING_STARTED→ the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs. - If there is no prior record (new dataset or new pipeline name) → the pipeline runs normally.
use_pipeline_cache=False (the default for custom pipelines), the pipeline always executes its tasks regardless of any prior completion status. The prior status record is still read, but it does not block execution.Custom pipeline naming
Custom pipeline naming
For your own pipelines, choose a unique
pipeline_name that does not conflict with cognify_pipeline or add_pipeline. Using a unique name means:- State tracking is isolated to your pipeline — a completed run of the built-in
cognify()will not affect your pipeline’s qualification check. - If you enable
use_pipeline_cache=Truefor your custom pipeline, you must reset its status manually (viareset_dataset_pipeline_run_status) when you want to re-process a dataset.
Crash recovery and stuck pipelines
Crash recovery and stuck pipelines
When a server crashes or is killed mid-cognify, the pipeline run record in the relational database is left with a
DATASET_PROCESSING_STARTED status. On the next call to cognify(), the cache check sees that status and returns immediately without re-running — preventing what it assumes is a duplicate concurrent run.| Pipeline run status after crash | What cognify() sees on retry | Outcome |
|---|---|---|
DATASET_PROCESSING_STARTED | Another run is already in progress | Returns immediately, does not reprocess |
DATASET_PROCESSING_ERRORED | No completed run; cache allows re-run | Reprocesses the dataset normally |
Unblock a stuck pipeline
Unblock a stuck pipeline
To unblock a stuck pipeline, call Parameters
reset_dataset_pipeline_run_status. It writes a new DATASET_PROCESSING_INITIATED record, which clears the stuck status and lets the next cognify() call proceed.| Parameter | Type | Required | Description |
|---|---|---|---|
dataset_id | UUID | Yes | The ID of the dataset whose pipeline runs should be reset |
user | User | Yes | The user object used for ownership lookup |
pipeline_names | list[str] | None | No | If provided, only runs for these pipeline names are reset; omit to reset all |
What happens after reset
What happens after reset
Once reset, calling
cognify() again is safe:- Documents that fully completed before the crash (their per-document
pipeline_statusentry isDATA_ITEM_PROCESSING_COMPLETED) are skipped — no duplicate graph nodes or embeddings are written. - Documents that were mid-processing when the crash occurred will be reprocessed from the beginning. These items will be re-chunked, re-extracted, and re-embedded.
reset_dataset_pipeline_run_status resets the dataset-level run status only. It does not clear per-document status. Documents that completed before the crash remain marked as completed and are not reprocessed.PipelineContext and ctx injection
PipelineContext and ctx injection
PipelineContext is the runtime context object that Cognee automatically builds and injects into any task that declares a ctx parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an extras dict for custom state.| Field | Type | Description |
|---|---|---|
user | Any | The user that triggered the pipeline. Used for access control and provenance. |
dataset | Any | The resolved dataset object for the current run. |
data_item | Any | The individual data item being processed in this pipeline execution. |
pipeline_name | Optional[str] | The name passed to run_pipeline or run_tasks. |
extras | Dict[str, Any] | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to {}. |
ctx, it passes the current PipelineContext when the task runs. Matching is by parameter name, not by type annotation.Tasks that do not declare ctx simply receive no context and are unaffected.Using extras for custom pipeline state
Using extras for custom pipeline state
Pass a dict as the
context argument to run_pipeline or extras to run_tasks. Every task in the pipeline can read those values from ctx.extras.ctx.extras is always a plain dict, never None.Accessing user and dataset in a task
Accessing user and dataset in a task
The The built-in
user and dataset fields are most useful when you need to write provenance records or apply per-tenant logic:add_data_points task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.Making ctx optional
Making ctx optional
Always default
ctx to None so the task can also be called directly in tests or scripts without a running pipeline:Layered execution
Layered execution
- Innermost layer: individual task execution with telemetry and recursive task running in batches
- Middle layer: per-dataset pipeline management and task orchestration
- Outermost layer: multi-dataset orchestration and overall pipeline execution
- Execution modes: blocking (wait for completion) or background (return immediately with “started” status)
Customization approaches and tips
Customization approaches and tips
- Use Remember for the default ingestion path
- Modify transformation steps without touching low-level functions, avoid going below
run_pipeline - Custom tasks let you extend or replace default behavior
Users
Users
- Identity: represents who owns and acts on data. If omitted, a default user is used
- Ownership: every ingested item is tied to a user; content is deduplicated per owner
- Permissions: enforced per dataset (read/write/delete/share) during processing and API access
Datasets
Datasets
- Container: a named or UUID-scoped collection of related data and derived knowledge
- Scoping:
remember()writes into a specific dataset, and dataset-scoped pipelines process the dataset(s) you pass - Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)
Tasks
Learn about the individual processing units that make up pipelines
DataPoints
Understand the structured outputs that pipelines produce
Main Operations
See how pipelines are used in Remember and lower-level ingestion workflows