# Pipelines

> Orchestrate tasks into coordinated data processing workflows.

## What pipelines are

Pipelines coordinate ordered [Tasks](../building-blocks/tasks) into a reproducible workflow. Default Cognee operations like [Remember](../main-operations/remember) run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through the higher-level operations unless you need staged control.

## Prerequisites

* **Dataset**: a container (name or UUID) where your data is stored and processed. Every document remembered by Cognee belongs to a dataset.
* **User**: the identity for ownership and access control. A default user is created and used if none is provided.
* See the **Users** and **Datasets** sections below for details on both.

## How pipelines run

Pipelines are executed with the `run_pipeline` function.

Cognee uses a **layered execution model**: a single call to `run_pipeline` orchestrates **multi-dataset processing** by running **per-file pipelines** through the sequence of tasks.

* **Statuses** are yielded as the pipeline runs and persisted to the relational database where appropriate
* **User access** to datasets and files is verified at each layer
* **Pipeline run information** includes dataset IDs, completion status, and error details
* **Background execution** uses queues to manage status updates and avoid database conflicts
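
The layering can be pictured with a small pure-Python model. This is a simplified sketch, not Cognee's implementation: `run_pipeline_model`, `run_dataset`, `run_file`, and the `STARTED`/`COMPLETED` strings are hypothetical stand-ins, and access checks, persistence, and queues are left out. It only mirrors the shape described above: one call fans out over datasets, each dataset pushes every data item through the task sequence, and statuses are yielded along the way.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Dict, List

# Hypothetical task type: an async function that transforms one data item.
Task = Callable[[str], Awaitable[str]]


async def run_file(item: str, tasks: List[Task]) -> str:
    # Innermost layer: push one data item through every task in order.
    result = item
    for task in tasks:
        result = await task(result)
    return result


async def run_dataset(
    dataset: str, items: List[str], tasks: List[Task]
) -> AsyncIterator[Dict[str, object]]:
    # Middle layer: run the per-file pipeline for every item in one dataset.
    yield {"dataset": dataset, "status": "STARTED"}
    results = [await run_file(item, tasks) for item in items]
    yield {"dataset": dataset, "status": "COMPLETED", "results": results}


async def run_pipeline_model(
    data: Dict[str, List[str]], tasks: List[Task]
) -> AsyncIterator[Dict[str, object]]:
    # Outermost layer: a single call fans out over every dataset it was given.
    for dataset, items in data.items():
        async for status in run_dataset(dataset, items, tasks):
            yield status


async def upper(text: str) -> str:
    return text.upper()


async def exclaim(text: str) -> str:
    return text + "!"


async def main() -> List[Dict[str, object]]:
    data = {"ds_a": ["hello"], "ds_b": ["bye", "ok"]}
    return [status async for status in run_pipeline_model(data, [upper, exclaim])]


statuses = asyncio.run(main())
for status in statuses:
    print(status)
```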

<Accordion title="Pipeline Names and Caching">
  Every `run_pipeline` call takes a `pipeline_name` parameter (default: `"custom_pipeline"`) and a `use_pipeline_cache` flag (default: `False`). These two values together control whether a pipeline re-processes a dataset that was already handled.

  ### Reserved pipeline names

  Two pipeline names are used internally and carry special meaning:

  | Name               | Used by            | Behavior                                                                                                                |
  | ------------------ | ------------------ | ----------------------------------------------------------------------------------------------------------------------- |
  | `cognify_pipeline` | `cognee.cognify()` | Runs with `use_pipeline_cache=False`; starts a new dataset-level run instead of skipping because of a prior dataset run |
  | `add_pipeline`     | `cognee.add()`     | Runs with `use_pipeline_cache=False`; starts a new dataset-level run instead of skipping because of a prior dataset run |

  Both built-in operations run with `use_pipeline_cache=False`, so they do **not** short-circuit based on a dataset-level `DATASET_PROCESSING_COMPLETED` or `DATASET_PROCESSING_STARTED` record. They start a new dataset-level run, while per-document pipeline status can still skip data items that already completed for that pipeline. Concurrent runs on the **same** dataset are kept safe by a per-dataset lock (see the "Per-dataset serialization" section below) rather than by the cache check.

  The lower-level `cognee.add()` step, which is also used inside `remember()`, always resets the stored status for **both** `add_pipeline` and `cognify_pipeline` before running, so that new data can be re-processed by the downstream `cognify()` step on the next call.

  <Warning>
    Do not use `cognify_pipeline` or `add_pipeline` as `pipeline_name` values in your own `run_pipeline` calls. Reusing these names causes your pipeline to read and write the same status records as the built-in operations, which can lead to unexpected skipping or incorrect state resets.
  </Warning>

  ### How `use_pipeline_cache` works

  When `use_pipeline_cache=True`, Cognee checks the relational database for the most recent run of `pipeline_name` on the target dataset before executing:

  * If the stored status is **`DATASET_PROCESSING_COMPLETED`** → the pipeline yields the cached result and returns immediately without re-running the tasks.
  * If the stored status is **`DATASET_PROCESSING_STARTED`** → the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs.
  * If there is **no prior record** (new dataset or new pipeline name) → the pipeline runs normally.

  When `use_pipeline_cache=False` (the default for custom pipelines, and the mode used by `cognee.add()` and `cognee.cognify()`), the dataset-level qualification check is skipped entirely — the prior dataset run status is not read — and the pipeline starts a new dataset-level run regardless of any prior dataset completion status. Per-document pipeline status is checked later during task execution, so individual data items that already completed for that pipeline can still be skipped. Safety against concurrent runs on the same dataset is provided by the per-dataset lock described below instead of by this check.
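
The dataset-level decision described above can be summarized as a small function. This is an illustrative sketch only: `qualify_run` and its `("run", ...)`/`("skip", ...)` return values are hypothetical, and treating statuses other than `COMPLETED`/`STARTED` (for example a freshly reset `INITIATED` record) as "run normally" is an assumption. The status names come from this page.

```python
from typing import Optional, Tuple

COMPLETED = "DATASET_PROCESSING_COMPLETED"
STARTED = "DATASET_PROCESSING_STARTED"


def qualify_run(use_pipeline_cache: bool, last_status: Optional[str]) -> Tuple[str, Optional[str]]:
    """Return ("run", None) or ("skip", status_to_yield) for one dataset."""
    if not use_pipeline_cache:
        # Cache disabled: the prior dataset-level status is never read.
        return ("run", None)
    if last_status == COMPLETED:
        # Yield the cached result and return without re-running the tasks.
        return ("skip", COMPLETED)
    if last_status == STARTED:
        # Yield the in-progress status to avoid a duplicate concurrent run.
        return ("skip", STARTED)
    # No prior record (or, by assumption, any other status): run normally.
    return ("run", None)


print(qualify_run(False, COMPLETED))  # ('run', None)
print(qualify_run(True, COMPLETED))   # ('skip', 'DATASET_PROCESSING_COMPLETED')
print(qualify_run(True, None))        # ('run', None)
```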
</Accordion>

<Accordion title="Per-dataset serialization">
  Pipeline runs are serialized **per dataset**. Before a run starts, `run_pipeline_per_dataset` acquires a lock keyed on the dataset ID, so two runs that target the **same** dataset execute one after another — the second waits until the first finishes — while runs on **different** datasets still proceed in parallel. This protects each dataset from concurrent writers (for example, two `cognify()` calls on the same dataset) without globally serializing all pipeline activity.

  <Warning>
    The lock is **process-local** — it is an in-memory `asyncio.Lock`. It only serializes runs within a single process/event loop and does **not** guard against multiple processes or workers running pipelines on the same dataset at once. If you run Cognee across several workers, coordinate same-dataset access at a higher level.
  </Warning>
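
The serialization behavior can be reproduced with one `asyncio.Lock` per dataset ID. This is a minimal model, not Cognee's code: `_locks`, `events`, and `run_on_dataset` are hypothetical names, and the sleep stands in for task execution. Like the real lock, it only coordinates coroutines inside one process.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# One in-memory lock per dataset ID, created lazily on first use.
_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
events: List[Tuple[str, str]] = []


async def run_on_dataset(run_name: str, dataset_id: str) -> None:
    async with _locks[dataset_id]:
        events.append(("start", run_name))
        await asyncio.sleep(0.01)  # stands in for running the tasks
        events.append(("end", run_name))


async def main() -> None:
    await asyncio.gather(
        run_on_dataset("a1", "dataset-A"),
        run_on_dataset("a2", "dataset-A"),  # same dataset: waits for a1
        run_on_dataset("b1", "dataset-B"),  # different dataset: runs alongside a1
    )


asyncio.run(main())
print(events)
```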

  ### Nested (re-entrant) runs

  A pipeline task may legitimately start another pipeline on the same dataset — for example, a session-driven run calling `add()` or `cognify()` on the dataset it is already processing. Because the per-dataset lock is not re-entrant, re-acquiring it from within the same execution would self-deadlock. Cognee detects that the current execution already holds the dataset's lock and lets the nested run proceed **without re-locking**; external runs on that dataset stay queued behind the lock the ancestor run holds.
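
One way to express "skip re-locking when the current execution already holds the lock" is a `contextvars.ContextVar` that records which dataset IDs the current call chain has locked. This is an assumption about the mechanism, shown only to illustrate the behavior; `held_datasets`, `dataset_lock`, and `run_pipeline_model` are hypothetical names.

```python
import asyncio
import contextvars
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import AsyncIterator, DefaultDict, List, Tuple

_locks: DefaultDict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
# Dataset IDs whose lock the current execution already holds; nested awaits see it.
held_datasets: contextvars.ContextVar = contextvars.ContextVar(
    "held_datasets", default=frozenset()
)


@asynccontextmanager
async def dataset_lock(dataset_id: str) -> AsyncIterator[bool]:
    if dataset_id in held_datasets.get():
        # Re-entrant call: this execution already holds the lock, so skip it.
        yield False
        return
    async with _locks[dataset_id]:
        token = held_datasets.set(held_datasets.get() | {dataset_id})
        try:
            yield True
        finally:
            held_datasets.reset(token)


async def run_pipeline_model(dataset_id: str, depth: int = 0) -> List[Tuple[int, bool]]:
    async with dataset_lock(dataset_id) as acquired:
        trace = [(depth, acquired)]
        if depth == 0:
            # A task inside this run starts another run on the same dataset.
            trace += await run_pipeline_model(dataset_id, depth + 1)
        return trace


# wait_for turns a self-deadlock into a TimeoutError instead of a hang.
trace = asyncio.run(asyncio.wait_for(run_pipeline_model("dataset-A"), timeout=1))
print(trace)  # [(0, True), (1, False)]
```

A non-re-entrant `asyncio.Lock` alone would make the nested call wait forever on the lock its own ancestor holds; the context variable is what lets the inner run recognize that case.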
</Accordion>

<Accordion title="Custom pipeline naming">
  For your own pipelines, choose a unique `pipeline_name` that does not conflict with `cognify_pipeline` or `add_pipeline`. Using a unique name means:

  * State tracking is isolated to your pipeline — a completed run of the built-in `cognify()` will not affect your pipeline's qualification check.
  * If you enable `use_pipeline_cache=True` for your custom pipeline, you must reset its status manually (via `reset_dataset_pipeline_run_status`) when you want to re-process a dataset.

  ```python theme={null}
  from cognee.modules.pipelines import run_pipeline

  # Custom pipeline with a unique name: safe to use alongside the built-in memory workflows.
  # `tasks` is your list of Task objects and `text` is the input data.
  async for run_info in run_pipeline(
      tasks=tasks,
      data=text,
      datasets=["my_dataset"],
      pipeline_name="my_enrichment_pipeline",  # unique name, no conflict
      use_pipeline_cache=False,                 # default: no dataset-level cache check
  ):
      pass
  ```
</Accordion>

<Accordion title="Crash recovery and stuck pipelines">
  When a server crashes or is killed mid-cognify, the pipeline run record in the relational database is left with a `DATASET_PROCESSING_STARTED` status. Because `cognify()` and `add()` now run with `use_pipeline_cache=False`, they no longer consult that dataset-level status — the next call starts a new dataset-level run, serialized by the per-dataset lock. A stuck `DATASET_PROCESSING_STARTED` record therefore no longer blocks the built-in operations, although completed data items can still be skipped by their per-document pipeline status.

  | Pipeline run status after crash | What `cognify()` does on retry                                                 | Outcome                                                  |
  | ------------------------------- | ------------------------------------------------------------------------------ | -------------------------------------------------------- |
  | `DATASET_PROCESSING_STARTED`    | Dataset-level cache check skipped; starts a new run under the per-dataset lock | Runs normally; completed data items may still be skipped |
  | `DATASET_PROCESSING_ERRORED`    | Dataset-level cache check skipped; starts a new run under the per-dataset lock | Runs normally; completed data items may still be skipped |

  The `reset_dataset_pipeline_run_status` helper below is still useful for **custom pipelines that opt into `use_pipeline_cache=True`**, where a stuck `DATASET_PROCESSING_STARTED` record would otherwise cause the cache check to skip a re-run.

  <AccordionGroup>
    <Accordion title="Unblock a stuck pipeline">
      To unblock a stuck pipeline that uses `use_pipeline_cache=True`, call `reset_dataset_pipeline_run_status`. It writes a new `DATASET_PROCESSING_INITIATED` record, which clears the stuck status so the next cached run is no longer skipped.

      ```python theme={null}
      from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
          reset_dataset_pipeline_run_status,
      )

      # `my_dataset` (a dataset object with an `id`) and `current_user` (a User)
      # come from your own code; run this inside an async function.

      # Reset all pipelines on a dataset
      await reset_dataset_pipeline_run_status(dataset_id=my_dataset.id, user=current_user)

      # Or reset only specific pipelines by name
      await reset_dataset_pipeline_run_status(
          dataset_id=my_dataset.id,
          user=current_user,
          pipeline_names=["cognify_pipeline"],
      )
      ```

      **Parameters**

      | Parameter        | Type                | Required | Description                                                                  |
      | ---------------- | ------------------- | -------- | ---------------------------------------------------------------------------- |
      | `dataset_id`     | `UUID`              | Yes      | The ID of the dataset whose pipeline runs should be reset                    |
      | `user`           | `User`              | Yes      | The user object used for ownership lookup                                    |
      | `pipeline_names` | `list[str] \| None` | No       | If provided, only runs for these pipeline names are reset; omit to reset all |
    </Accordion>

    <Accordion title="What happens after reset">
      Once reset, calling `cognify()` again is safe:

      * Documents that **fully completed** before the crash (their per-document `pipeline_status` entry is `DATA_ITEM_PROCESSING_COMPLETED`) are skipped — no duplicate graph nodes or embeddings are written.
      * Documents that were **mid-processing** when the crash occurred will be reprocessed from the beginning. These items will be re-chunked, re-extracted, and re-embedded.
    </Accordion>
  </AccordionGroup>

  <Note>
    `reset_dataset_pipeline_run_status` resets the dataset-level run status only. It does not clear per-document status. Documents that completed before the crash remain marked as completed and are not reprocessed.
  </Note>
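
The split between dataset-level and per-document status can be modeled with two plain dictionaries. This is a hypothetical in-memory model, not Cognee's storage layer: `dataset_status`, `item_status`, `reset_dataset_status`, and `rerun` are illustrative names; only the status strings come from this page.

```python
from typing import Dict, List

# Dataset-level record left as STARTED by a crash mid-run (hypothetical store).
dataset_status: Dict[str, str] = {"my_dataset": "DATASET_PROCESSING_STARTED"}
# Per-document records: doc-1 finished before the crash, doc-2 did not.
item_status: Dict[str, str] = {"doc-1": "DATA_ITEM_PROCESSING_COMPLETED"}


def reset_dataset_status(dataset: str) -> None:
    # Writes a new dataset-level status; item_status is deliberately untouched.
    dataset_status[dataset] = "DATASET_PROCESSING_INITIATED"


def rerun(dataset: str, docs: List[str]) -> List[str]:
    processed = []
    for doc in docs:
        if item_status.get(doc) == "DATA_ITEM_PROCESSING_COMPLETED":
            continue  # completed before the crash: skipped, nothing written twice
        processed.append(doc)  # reprocessed from the beginning
        item_status[doc] = "DATA_ITEM_PROCESSING_COMPLETED"
    dataset_status[dataset] = "DATASET_PROCESSING_COMPLETED"
    return processed


reset_dataset_status("my_dataset")
processed_docs = rerun("my_dataset", ["doc-1", "doc-2"])
print(processed_docs)  # ['doc-2']
```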
</Accordion>

<Accordion title="What gets stored in a pipeline run record">
  Each pipeline run is persisted as a row in the relational `pipeline_runs` table. Alongside the status, IDs, and pipeline name, the record keeps a `run_info` column with an **audit-only preview** of the input the run was started with. This preview is bounded so a single run cannot grow the table without limit:

  * If the input is a list of Cognee `Data` records, only their IDs are stored.
  * Any other input is stringified and, if longer than **512 characters**, truncated to a preview that ends with `... [truncated, <N> chars total]`.
  * Empty or missing input is recorded as `"None"`.
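
The bounding rules can be sketched as a pure function. Assumptions: `preview_run_info` and the stand-in `DataRecord` class are hypothetical; the preview is assumed to keep the first 512 characters before the suffix, IDs are assumed to be stored as a stringified list, and "empty" is taken to mean any falsy input. Only the 512-character limit, the suffix format, and the `"None"` value come from this page.

```python
from dataclasses import dataclass
from typing import Any
from uuid import UUID, uuid4

MAX_PREVIEW_CHARS = 512


@dataclass
class DataRecord:
    # Hypothetical stand-in for a Cognee `Data` record; only `id` matters here.
    id: UUID


def preview_run_info(data: Any) -> str:
    if not data:
        return "None"  # empty or missing input
    if isinstance(data, list) and all(isinstance(d, DataRecord) for d in data):
        return str([str(d.id) for d in data])  # keep only the record IDs
    text = str(data)
    if len(text) <= MAX_PREVIEW_CHARS:
        return text
    return f"{text[:MAX_PREVIEW_CHARS]}... [truncated, {len(text)} chars total]"


record = DataRecord(id=uuid4())
print(preview_run_info([record]))          # only the record ID is kept
print(preview_run_info("x" * 1000)[-33:])  # ... [truncated, 1000 chars total]
print(preview_run_info(None))              # None
```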

  `run_info` is intended for inspection and debugging only — Cognee never reads it back during processing. If you need the full input payload (for example, large raw text passed to `add()` or `cognify()`), persist it yourself in object storage or a linked record rather than relying on `run_info` to retain it verbatim.
</Accordion>

<Accordion title="PipelineContext and ctx injection">
  `PipelineContext` is the runtime context object that Cognee automatically builds and injects into any task that declares a `ctx` parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an `extras` dict for custom state.

  | Field           | Type             | Description                                                                                      |
  | --------------- | ---------------- | ------------------------------------------------------------------------------------------------ |
  | `user`          | `Any`            | The user that triggered the pipeline. Used for access control and provenance.                    |
  | `dataset`       | `Any`            | The resolved dataset object for the current run.                                                 |
  | `data_item`     | `Any`            | The individual data item being processed in this pipeline execution.                             |
  | `pipeline_name` | `Optional[str]`  | The name passed to `run_pipeline` or `run_tasks`.                                                |
  | `extras`        | `Dict[str, Any]` | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to `{}`. |

  The framework inspects each task function's signature. If it finds a parameter named `ctx`, it passes the current `PipelineContext` when the task runs. Matching is by parameter name, not by type annotation.

  Tasks that do not declare `ctx` simply receive no context and are unaffected.
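
The name-based matching can be illustrated with `inspect.signature`. This is a sketch of the described behavior, not Cognee's internal code; `call_task` and the minimal `PipelineContext` dataclass below are hypothetical stand-ins (the real class is imported from `cognee.modules.pipelines.models.PipelineContext`).

```python
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class PipelineContext:
    # Minimal stand-in mirroring the fields in the table above.
    user: Any = None
    dataset: Any = None
    data_item: Any = None
    pipeline_name: Optional[str] = None
    extras: Dict[str, Any] = field(default_factory=dict)


async def call_task(task: Callable, data: Any, ctx: PipelineContext) -> Any:
    # Match on the parameter *name* "ctx"; the type annotation is ignored.
    if "ctx" in inspect.signature(task).parameters:
        return await task(data, ctx=ctx)
    return await task(data)


async def with_ctx(data, ctx=None):  # no annotation, still receives ctx
    return f"{data} via {ctx.pipeline_name}"


async def without_ctx(data):
    return f"{data} alone"


ctx = PipelineContext(pipeline_name="demo_pipeline")
print(asyncio.run(call_task(with_ctx, "item", ctx)))     # item via demo_pipeline
print(asyncio.run(call_task(without_ctx, "item", ctx)))  # item alone
```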

  <AccordionGroup>
    <Accordion title="Using extras for custom pipeline state">
      Pass a dict as the `context` argument to `run_pipeline` or `extras` to `run_tasks`. Every task in the pipeline can read those values from `ctx.extras`.

      ```python theme={null}
      from cognee.modules.pipelines import Task, run_pipeline
      from cognee.modules.pipelines.models.PipelineContext import PipelineContext

      async def score_items(data, ctx: PipelineContext = None):
          multiplier = ctx.extras.get("score_multiplier", 1) if ctx else 1
          return [item * multiplier for item in data]

      async for _ in run_pipeline(
          tasks=[Task(score_items)],
          data=[1, 2, 3],
          datasets=["my_dataset"],
          pipeline_name="scoring_pipeline",
          context={"score_multiplier": 10},
      ):
          pass
      ```

      `ctx.extras` is always a plain dict, never `None`.
    </Accordion>

    <Accordion title="Accessing user and dataset in a task">
      The `user` and `dataset` fields are most useful when you need to write provenance records or apply per-tenant logic:

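      ```python theme={null}
      from cognee.modules.pipelines.models.PipelineContext import PipelineContext

      async def store_result(data_points, ctx: PipelineContext = None):
          user = ctx.user if ctx else None
          dataset = ctx.dataset if ctx else None
          data_item = ctx.data_item if ctx else None  # available for per-item logic

          for dp in data_points:
              # `write_to_store` is a placeholder for your own storage function
              await write_to_store(
                  dp,
                  user_id=user.id if user else None,
                  dataset_id=dataset.id if dataset else None,
              )
          return data_points
      ```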

      The built-in `add_data_points` task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
    </Accordion>

    <Accordion title="Making ctx optional">
      Always default `ctx` to `None` so the task can also be called directly in tests or scripts without a running pipeline:

      ```python theme={null}
      from cognee.modules.pipelines.models.PipelineContext import PipelineContext

      async def my_task(data, ctx: PipelineContext = None):
          name = ctx.pipeline_name if ctx else "standalone"
          ...
      ```
    </Accordion>
  </AccordionGroup>
</Accordion>

<Accordion title="Error handling and exception propagation">
  When a task raises while processing a data item, the pipeline logs the error, yields a `PipelineRunErrored` status, and then re-raises the original exception to the caller. A failing data item therefore both surfaces a `PipelineRunErrored` event **and** propagates the underlying exception out of the pipeline run, rather than being silently collapsed into an error status only.

  Because of this, wrap pipeline runs in `try`/`except` when you iterate them, so you can react to the propagated exception:

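  ```python theme={null}
  import logging

  from cognee.modules.pipelines import run_pipeline

  logger = logging.getLogger(__name__)

  try:
      async for run_info in run_pipeline(
          tasks=tasks,  # your list of Task objects
          data=text,
          datasets=["my_dataset"],
          pipeline_name="my_pipeline",
      ):
          ...
  except Exception as error:
      # the original task exception propagates here after the
      # PipelineRunErrored status has been yielded
      logger.exception("Pipeline run failed: %s", error)
  ```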
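
The ordering (yield the error status first, then re-raise) can be seen in a small async-generator model. `run_items`, `fail_on_two`, and the `"completed"` label are hypothetical; only the `PipelineRunErrored` name comes from this page, and here it is just a string, not Cognee's status class.

```python
import asyncio
import logging
from typing import AsyncIterator, List, Tuple

logger = logging.getLogger(__name__)


async def fail_on_two(item: int) -> int:
    # Hypothetical task that fails on one specific data item.
    if item == 2:
        raise ValueError(f"bad item: {item}")
    return item * 10


async def run_items(items: List[int]) -> AsyncIterator[Tuple[str, int]]:
    for item in items:
        try:
            result = await fail_on_two(item)
        except Exception as error:
            logger.error("Task failed: %s", error)
            yield ("PipelineRunErrored", item)  # the error status is yielded first...
            raise  # ...then the original exception is re-raised to the caller
        yield ("completed", result)


async def main() -> Tuple[List[Tuple[str, int]], str]:
    seen = []
    try:
        async for status in run_items([1, 2, 3]):
            seen.append(status)
    except ValueError as error:
        return seen, str(error)
    return seen, ""


seen, message = asyncio.run(main())
print(seen)     # [('completed', 10), ('PipelineRunErrored', 2)]
print(message)  # bad item: 2
```

The consumer receives the errored status before the exception arrives, and item `3` is never processed.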
</Accordion>

<Accordion title="Layered execution">
  * Innermost layer: individual task execution with telemetry and recursive task running in batches
  * Middle layer: per-dataset pipeline management and task orchestration
  * Outermost layer: multi-dataset orchestration and overall pipeline execution
  * Execution modes: blocking (wait for completion) or background (return immediately with "started" status)
  * In background mode with no `datasets` passed, the run resolves to every dataset the run's user has write access to — the user supplied in the run's params, or the default user when none is given
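
The difference between the two execution modes can be modeled with `asyncio.create_task`. This is a hypothetical sketch: `process`, `run_blocking`, `run_in_background`, and the status strings are illustrative, not Cognee's API, and it leaves out the queue-based status updates.

```python
import asyncio
from typing import Dict, List, Tuple

log: List[str] = []


async def process(dataset: str) -> str:
    await asyncio.sleep(0.01)  # stands in for running the pipeline
    log.append(f"done:{dataset}")
    return "completed"


async def run_blocking(dataset: str) -> str:
    # Blocking: the caller waits until processing finishes.
    return await process(dataset)


def run_in_background(dataset: str) -> Tuple[str, "asyncio.Task[str]"]:
    # Background: schedule the work and return a "started" status immediately.
    task = asyncio.create_task(process(dataset))
    return "started", task


async def main() -> Dict[str, str]:
    status, task = run_in_background("ds_bg")
    log.append(f"returned:{status}")  # recorded before the background work ends
    blocking_status = await run_blocking("ds_block")
    await task  # keep a reference and await it so the work is not dropped
    return {"background": status, "blocking": blocking_status}


result = asyncio.run(main())
print(result)
print(log)
```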
</Accordion>

<Accordion title="Customization approaches and tips">
  * Use [Remember](../main-operations/remember) for the default ingestion path
  * Modify transformation steps through `run_pipeline` and custom tasks rather than calling the lower-level functions beneath it
  * Custom tasks let you extend or replace default behavior
</Accordion>

<Accordion title="Users">
  * Identity: represents who owns and acts on data. If omitted, a default user is used
  * Ownership: every ingested item is tied to a user; content is deduplicated per owner
  * Permissions: enforced per dataset (read/write/delete/share) during processing and API access
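
Per-owner deduplication can be pictured as keying stored content by owner plus a content hash. This is an assumption made for illustration (this page does not say how Cognee computes content identity); `store`, `ingest`, and the SHA-256 key are hypothetical.

```python
import hashlib
from typing import Dict, Tuple

# Hypothetical store keyed by (owner_id, content hash).
store: Dict[Tuple[str, str], str] = {}


def ingest(owner_id: str, content: str) -> bool:
    """Store content for an owner; return False if that owner already has it."""
    key = (owner_id, hashlib.sha256(content.encode("utf-8")).hexdigest())
    if key in store:
        return False  # same owner, same content: deduplicated
    store[key] = content
    return True


print(ingest("alice", "Cognee builds memory."))  # True
print(ingest("alice", "Cognee builds memory."))  # False: duplicate for alice
print(ingest("bob", "Cognee builds memory."))    # True: different owner
```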
</Accordion>

<Accordion title="Datasets">
  * Container: a named or UUID-scoped collection of related data and derived knowledge
  * Scoping: `remember()` writes into a specific dataset, and dataset-scoped pipelines process the dataset(s) you pass
  * Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)
</Accordion>

<Columns cols={3}>
  <Card title="Tasks" icon="square-check" href="/core-concepts/building-blocks/tasks">
    Learn about the individual processing units that make up pipelines
  </Card>

  <Card title="DataPoints" icon="circle" href="/core-concepts/building-blocks/datapoints">
    Understand the structured outputs that pipelines produce
  </Card>

  <Card title="Main Operations" icon="play" href="/core-concepts/main-operations/remember">
    See how pipelines are used in Remember and lower-level ingestion workflows
  </Card>
</Columns>
