
# Pipelines

> Orchestrating tasks into coordinated workflows for data processing

## What pipelines are

Pipelines coordinate ordered [Tasks](../building-blocks/tasks) into a reproducible workflow. Default Cognee operations like [Remember](../main-operations/remember) run on top of the same execution layer. You typically do not call low-level functions directly; you trigger pipelines through the higher-level operations unless you need staged control.

## Prerequisites

* **Dataset**: a container (name or UUID) where your data is stored and processed. Every document remembered by Cognee belongs to a dataset.
* **User**: the identity for ownership and access control. A default user is created and used if none is provided.
* See the **Users** and **Datasets** sections below for details on both.

## How pipelines run

Pipelines are executed with the `run_pipeline` function.

Cognee uses a **layered execution model**: a single call to `run_pipeline` orchestrates **multi-dataset processing** by running **per-file pipelines** through the sequence of tasks.

* **Statuses** are yielded as the pipeline runs and, where appropriate, persisted to the relational database
* **User access** to datasets and files is verified at each layer
* **Pipeline run information** includes dataset IDs, completion status, and error handling
* **Background execution** uses queues to manage status updates and avoid database conflicts
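The benefit of queue-based status updates can be illustrated with a small, self-contained asyncio model. This is a hypothetical sketch, not Cognee's implementation: `pipeline_worker`, `status_writer`, `run_in_background`, and the short status strings are illustrative names. It shows why routing every status update through a single queue consumer means only one coroutine ever writes to the (here simulated) status table.

```python theme={null}
import asyncio


async def pipeline_worker(dataset: str, queue: asyncio.Queue) -> None:
    # Hypothetical per-dataset pipeline: it reports statuses instead of writing them itself
    await queue.put((dataset, "STARTED"))
    await asyncio.sleep(0)  # stand-in for running the dataset's tasks
    await queue.put((dataset, "COMPLETED"))


async def status_writer(queue: asyncio.Queue, store: dict) -> None:
    # Single consumer: the only coroutine that writes to the simulated status table
    while True:
        item = await queue.get()
        if item is None:  # sentinel: all workers are done
            break
        dataset, status = item
        store.setdefault(dataset, []).append(status)


async def run_in_background(datasets: list) -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    store: dict = {}
    writer = asyncio.create_task(status_writer(queue, store))
    await asyncio.gather(*(pipeline_worker(d, queue) for d in datasets))
    await queue.put(None)  # tell the writer to stop after draining the queue
    await writer
    return store


store = asyncio.run(run_in_background(["docs", "emails"]))
print(store)  # {'docs': ['STARTED', 'COMPLETED'], 'emails': ['STARTED', 'COMPLETED']}
```

Because workers never touch the store directly, concurrent pipelines cannot race on the same status rows; the trade-off is that status writes are serialized through one consumer.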

<Accordion title="Pipeline Names and Caching">
  Every `run_pipeline` call takes a `pipeline_name` parameter (default: `"custom_pipeline"`) and a `use_pipeline_cache` flag (default: `False`). These two values together control whether a pipeline re-processes a dataset that was already handled.

  ### Reserved pipeline names

  Two pipeline names are used internally and carry special meaning:

  | Name               | Used by            | Behavior                                                                       |
  | ------------------ | ------------------ | ------------------------------------------------------------------------------ |
  | `cognify_pipeline` | `cognee.cognify()` | Runs with `use_pipeline_cache=True`; skips datasets that are already processed |
  | `add_pipeline`     | `cognee.add()`     | Runs with `use_pipeline_cache=True`; skips datasets with identical content     |

  The lower-level `cognee.add()` step, which is also used inside `remember()`, always resets the stored status for **both** `add_pipeline` and `cognify_pipeline` before running, so that new data can be re-processed by the downstream `cognify()` step on the next call.
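  The effect of that reset can be modelled with a plain dictionary of latest statuses. This is a hypothetical sketch, not Cognee's code: `simulate_add` and `cognify_would_skip` are illustrative names, and the assumption that the reset writes `DATASET_PROCESSING_INITIATED` is borrowed from how `reset_dataset_pipeline_run_status` is described on this page.

  ```python theme={null}
  from typing import Dict

  # Hypothetical latest-status table for one dataset, keyed by pipeline name
  statuses: Dict[str, str] = {
      "add_pipeline": "DATASET_PROCESSING_COMPLETED",
      "cognify_pipeline": "DATASET_PROCESSING_COMPLETED",
  }


  def simulate_add(statuses: Dict[str, str]) -> None:
      # Assumption: the reset writes an INITIATED record for both built-in pipelines
      for name in ("add_pipeline", "cognify_pipeline"):
          statuses[name] = "DATASET_PROCESSING_INITIATED"
      # Ingestion would run here; afterwards add_pipeline records its own completion
      statuses["add_pipeline"] = "DATASET_PROCESSING_COMPLETED"


  def cognify_would_skip(statuses: Dict[str, str]) -> bool:
      # cognify_pipeline runs with the cache enabled, so a completed record means "skip"
      return statuses.get("cognify_pipeline") == "DATASET_PROCESSING_COMPLETED"


  print(cognify_would_skip(statuses))  # True: nothing new since the last cognify
  simulate_add(statuses)
  print(cognify_would_skip(statuses))  # False: cognify will process the new data
  ```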

  <Warning>
    Do not use `cognify_pipeline` or `add_pipeline` as `pipeline_name` values in your own `run_pipeline` calls. Reusing these names causes your pipeline to read and write the same status records as the built-in operations, which can lead to unexpected skipping or incorrect state resets.
  </Warning>
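  If you wrap `run_pipeline` in your own helpers, a small guard can reject the reserved names before they ever reach the status table. The helper below is hypothetical and not part of Cognee; the two reserved names come from the table above.

  ```python theme={null}
  RESERVED_PIPELINE_NAMES = frozenset({"cognify_pipeline", "add_pipeline"})


  def validate_pipeline_name(pipeline_name: str) -> str:
      """Hypothetical guard: reject names that share status records with built-in operations."""
      if pipeline_name in RESERVED_PIPELINE_NAMES:
          raise ValueError(
              f"'{pipeline_name}' is reserved by Cognee; choose a unique pipeline_name"
          )
      return pipeline_name


  validate_pipeline_name("my_enrichment_pipeline")  # returns the name unchanged
  ```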

  ### How `use_pipeline_cache` works

  When `use_pipeline_cache=True`, Cognee checks the relational database for the most recent run of `pipeline_name` on the target dataset before executing:

  * If the stored status is **`DATASET_PROCESSING_COMPLETED`** → the pipeline yields the cached result and returns immediately without re-running the tasks.
  * If the stored status is **`DATASET_PROCESSING_STARTED`** → the pipeline yields the in-progress status and returns, preventing duplicate concurrent runs.
  * If there is **no prior record** (new dataset or new pipeline name) → the pipeline runs normally.

  When `use_pipeline_cache=False` (the default for custom pipelines), the pipeline always executes its tasks regardless of any prior completion status. The prior status record is still read, but it does not block execution.
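  The decision rules above can be summarized as a small pure function. This is a simplified model under the assumption that the statuses behave exactly as described on this page; `RunStatus` and `should_execute` are hypothetical names, while Cognee's real check reads the most recent run record from the relational database.

  ```python theme={null}
  from enum import Enum
  from typing import Optional


  class RunStatus(Enum):
      # Dataset-level status names as they appear on this page
      DATASET_PROCESSING_INITIATED = "DATASET_PROCESSING_INITIATED"
      DATASET_PROCESSING_STARTED = "DATASET_PROCESSING_STARTED"
      DATASET_PROCESSING_COMPLETED = "DATASET_PROCESSING_COMPLETED"
      DATASET_PROCESSING_ERRORED = "DATASET_PROCESSING_ERRORED"


  def should_execute(last_status: Optional[RunStatus], use_pipeline_cache: bool) -> bool:
      """Return True if the tasks run, False if the stored status is yielded instead."""
      if not use_pipeline_cache:
          # Without the cache, the prior record is read but never blocks execution
          return True
      if last_status in (RunStatus.DATASET_PROCESSING_COMPLETED, RunStatus.DATASET_PROCESSING_STARTED):
          # Completed: reuse the cached result. Started: avoid a duplicate concurrent run.
          return False
      # No prior record, an errored run, or a reset (INITIATED) record: run normally
      return True


  print(should_execute(None, use_pipeline_cache=True))  # True
  print(should_execute(RunStatus.DATASET_PROCESSING_COMPLETED, use_pipeline_cache=True))  # False
  ```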
</Accordion>

<Accordion title="Custom pipeline naming">
  For your own pipelines, choose a unique `pipeline_name` that does not conflict with `cognify_pipeline` or `add_pipeline`. Using a unique name means:

  * State tracking is isolated to your pipeline — a completed run of the built-in `cognify()` will not affect your pipeline's qualification check.
  * If you enable `use_pipeline_cache=True` for your custom pipeline, you must reset its status manually (via `reset_dataset_pipeline_run_status`) when you want to re-process a dataset.

  ```python theme={null}
  from cognee.modules.pipelines import run_pipeline

  # `tasks` (a list of Task objects) and `text` (the input data) are assumed to be defined.
  # Custom pipeline with a unique name, safe to use alongside the built-in memory workflows.
  async for run_info in run_pipeline(
      tasks=tasks,
      data=text,
      datasets=["my_dataset"],
      pipeline_name="my_enrichment_pipeline",  # unique name, no conflict
      use_pipeline_cache=False,                # default: always re-runs
  ):
      pass
  ```
</Accordion>

<Accordion title="Crash recovery and stuck pipelines">
  When a server crashes or is killed mid-cognify, the pipeline run record in the relational database is left with a `DATASET_PROCESSING_STARTED` status. On the next call to `cognify()`, the cache check sees that status and returns immediately without re-running — preventing what it assumes is a duplicate concurrent run.

  | Pipeline run status after crash | What `cognify()` sees on retry        | Outcome                                 |
  | ------------------------------- | ------------------------------------- | --------------------------------------- |
  | `DATASET_PROCESSING_STARTED`    | Another run is already in progress    | Returns immediately, does not reprocess |
  | `DATASET_PROCESSING_ERRORED`    | No completed run; cache allows re-run | Reprocesses the dataset normally        |

  <AccordionGroup>
    <Accordion title="Unblock a stuck pipeline">
      To unblock a stuck pipeline, call `reset_dataset_pipeline_run_status`. It writes a new `DATASET_PROCESSING_INITIATED` record, which clears the stuck status and lets the next `cognify()` call proceed.

      ```python theme={null}
      from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
          reset_dataset_pipeline_run_status,
      )

      # Run inside an async function. `my_dataset` and `current_user` are assumed to be
      # a Dataset and a User you have already loaded.

      # Reset all pipelines on a dataset
      await reset_dataset_pipeline_run_status(dataset_id=my_dataset.id, user=current_user)

      # Or reset only specific pipelines by name
      await reset_dataset_pipeline_run_status(
          dataset_id=my_dataset.id,
          user=current_user,
          pipeline_names=["cognify_pipeline"],
      )
      ```

      **Parameters**

      | Parameter        | Type                | Required | Description                                                                  |
      | ---------------- | ------------------- | -------- | ---------------------------------------------------------------------------- |
      | `dataset_id`     | `UUID`              | Yes      | The ID of the dataset whose pipeline runs should be reset                    |
      | `user`           | `User`              | Yes      | The user object used for ownership lookup                                    |
      | `pipeline_names` | `list[str] \| None` | No       | If provided, only runs for these pipeline names are reset; omit to reset all |
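
      To see why appending a new record unblocks the cache check, here is an in-memory sketch of a run log. It is hypothetical: `latest_status` and `reset_statuses` are illustrative names, the list of `(pipeline_name, status)` tuples stands in for the relational table, and "reset all" is assumed to mean every pipeline name that already has a record on the dataset.

      ```python theme={null}
      from typing import List, Optional, Tuple

      # Hypothetical stand-in for the pipeline-run table of one dataset
      RunLog = List[Tuple[str, str]]


      def latest_status(log: RunLog, pipeline_name: str) -> Optional[str]:
          # The most recent record for a pipeline name is what the cache check sees
          for name, status in reversed(log):
              if name == pipeline_name:
                  return status
          return None


      def reset_statuses(log: RunLog, pipeline_names: Optional[List[str]] = None) -> None:
          # Append an INITIATED record instead of deleting history; None means "all known pipelines"
          names = pipeline_names if pipeline_names is not None else sorted({n for n, _ in log})
          for name in names:
              log.append((name, "DATASET_PROCESSING_INITIATED"))


      log: RunLog = [("cognify_pipeline", "DATASET_PROCESSING_STARTED")]  # left behind by a crash
      reset_statuses(log, pipeline_names=["cognify_pipeline"])
      print(latest_status(log, "cognify_pipeline"))  # DATASET_PROCESSING_INITIATED
      ```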
    </Accordion>

    <Accordion title="What happens after reset">
      Once reset, calling `cognify()` again is safe:

      * Documents that **fully completed** before the crash (their per-document `pipeline_status` entry is `DATA_ITEM_PROCESSING_COMPLETED`) are skipped — no duplicate graph nodes or embeddings are written.
      * Documents that were **mid-processing** when the crash occurred will be reprocessed from the beginning. These items will be re-chunked, re-extracted, and re-embedded.
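
      The per-document skip rule amounts to a filter on item status. This is a hypothetical sketch: `items_to_process` is an illustrative name, the flat dict stands in for the per-document `pipeline_status` entries, and `"IN_PROGRESS"` is a placeholder value, not a real Cognee status.

      ```python theme={null}
      from typing import Dict, List

      COMPLETED = "DATA_ITEM_PROCESSING_COMPLETED"


      def items_to_process(pipeline_status: Dict[str, str], data_ids: List[str]) -> List[str]:
          # Skip items that fully completed; everything else is reprocessed from the start
          return [data_id for data_id in data_ids if pipeline_status.get(data_id) != COMPLETED]


      status = {
          "doc-1": COMPLETED,      # finished before the crash: skipped
          "doc-2": "IN_PROGRESS",  # hypothetical placeholder for an interrupted item
      }
      print(items_to_process(status, ["doc-1", "doc-2", "doc-3"]))  # ['doc-2', 'doc-3']
      ```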
    </Accordion>
  </AccordionGroup>

  <Note>
    `reset_dataset_pipeline_run_status` resets the dataset-level run status only. It does not clear per-document status. Documents that completed before the crash remain marked as completed and are not reprocessed.
  </Note>
</Accordion>

<Accordion title="PipelineContext and ctx injection">
  `PipelineContext` is the runtime context object that Cognee automatically builds and injects into any task that declares a `ctx` parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an `extras` dict for custom state.

  | Field           | Type             | Description                                                                                      |
  | --------------- | ---------------- | ------------------------------------------------------------------------------------------------ |
  | `user`          | `Any`            | The user that triggered the pipeline. Used for access control and provenance.                    |
  | `dataset`       | `Any`            | The resolved dataset object for the current run.                                                 |
  | `data_item`     | `Any`            | The individual data item being processed in this pipeline execution.                             |
  | `pipeline_name` | `Optional[str]`  | The name passed to `run_pipeline` or `run_tasks`.                                                |
  | `extras`        | `Dict[str, Any]` | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to `{}`. |

  The framework inspects each task function's signature. If it finds a parameter named `ctx`, it passes the current `PipelineContext` when the task runs. Matching is by parameter name, not by type annotation.

  Tasks that do not declare `ctx` simply receive no context and are unaffected.
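  Name-based injection can be sketched with Python's `inspect` module. This is an illustrative model, not Cognee's source: `call_task` is hypothetical, and the local `PipelineContext` dataclass only mirrors the fields from the table above (the real class lives in `cognee.modules.pipelines.models.PipelineContext`). Using `default_factory=dict` is one way to guarantee `extras` is never `None`.

  ```python theme={null}
  import asyncio
  import inspect
  from dataclasses import dataclass, field
  from typing import Any, Dict, Optional


  @dataclass
  class PipelineContext:
      # Simplified stand-in with the fields listed in the table above
      user: Any = None
      dataset: Any = None
      data_item: Any = None
      pipeline_name: Optional[str] = None
      extras: Dict[str, Any] = field(default_factory=dict)  # always a dict, never None


  async def call_task(task, data, ctx: PipelineContext):
      # Inject the context only when the task declares a parameter literally named `ctx`
      if "ctx" in inspect.signature(task).parameters:
          return await task(data, ctx=ctx)
      return await task(data)


  async def with_ctx(data, ctx=None):  # no annotation needed: matching is by name
      return f"{ctx.pipeline_name}:{data}"


  async def without_ctx(data):
      return data.upper()


  ctx = PipelineContext(pipeline_name="demo_pipeline")
  print(asyncio.run(call_task(with_ctx, "a", ctx)))     # demo_pipeline:a
  print(asyncio.run(call_task(without_ctx, "a", ctx)))  # A
  ```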

  <AccordionGroup>
    <Accordion title="Using extras for custom pipeline state">
      Pass a dict as the `context` argument to `run_pipeline` or `extras` to `run_tasks`. Every task in the pipeline can read those values from `ctx.extras`.

      ```python theme={null}
      from cognee.modules.pipelines import Task, run_pipeline
      from cognee.modules.pipelines.models.PipelineContext import PipelineContext

      async def score_items(data, ctx: PipelineContext = None):
          multiplier = ctx.extras.get("score_multiplier", 1) if ctx else 1
          return [item * multiplier for item in data]

      async for _ in run_pipeline(
          tasks=[Task(score_items)],
          data=[1, 2, 3],
          datasets=["my_dataset"],
          pipeline_name="scoring_pipeline",
          context={"score_multiplier": 10},
      ):
          pass
      ```

      `ctx.extras` is always a plain dict, never `None`.
    </Accordion>

    <Accordion title="Accessing user and dataset in a task">
      The `user` and `dataset` fields are most useful when you need to write provenance records or apply per-tenant logic:

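      ```python theme={null}
      from cognee.modules.pipelines.models.PipelineContext import PipelineContext

      async def store_result(data_points, ctx: PipelineContext = None):
          # Fall back to None when the task runs outside a pipeline (no ctx injected)
          user_id = ctx.user.id if ctx else None
          dataset_id = ctx.dataset.id if ctx else None

          for dp in data_points:
              # `write_to_store` is a placeholder for your own storage function
              await write_to_store(dp, user_id=user_id, dataset_id=dataset_id)
          return data_points
      ```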

      The built-in `add_data_points` task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
    </Accordion>

    <Accordion title="Making ctx optional">
      Always default `ctx` to `None` so the task can also be called directly in tests or scripts without a running pipeline:

      ```python theme={null}
      async def my_task(data, ctx: PipelineContext = None):
          name = ctx.pipeline_name if ctx else "standalone"
          ...
      ```
    </Accordion>
  </AccordionGroup>
</Accordion>

<Accordion title="Layered execution">
  * Innermost layer: individual task execution with telemetry and recursive task running in batches
  * Middle layer: per-dataset pipeline management and task orchestration
  * Outermost layer: multi-dataset orchestration and overall pipeline execution
  * Execution modes: blocking (wait for completion) or background (return immediately with "started" status)
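
  The three layers and the two execution modes can be sketched in plain asyncio. Everything here is hypothetical and simplified (no telemetry, permissions, or database): `run_tasks`, `run_dataset`, and `run_all` only mimic the layering described above, and a "task" is assumed to be a function that maps a batch of strings to a new batch.

  ```python theme={null}
  import asyncio
  from typing import Callable, Dict, List

  # Hypothetical task shape for this sketch: a function that maps a batch to a new batch
  TaskFn = Callable[[List[str]], List[str]]


  async def run_tasks(tasks: List[TaskFn], data: List[str], batch_size: int = 2) -> List[str]:
      # Innermost layer: run the first task batch by batch, then recurse on the remaining tasks
      if not tasks:
          return data
      output: List[str] = []
      for start in range(0, len(data), batch_size):
          output.extend(tasks[0](data[start:start + batch_size]))
      return await run_tasks(tasks[1:], output, batch_size)


  async def run_dataset(name: str, files: List[List[str]], tasks: List[TaskFn]) -> dict:
      # Middle layer: one per-file pipeline for each file in the dataset
      results = [await run_tasks(tasks, file_data) for file_data in files]
      return {"dataset": name, "status": "COMPLETED", "results": results}


  async def run_all(datasets: Dict[str, List[List[str]]], tasks: List[TaskFn], background: bool = False):
      # Outermost layer: process every dataset concurrently
      job = asyncio.gather(*(run_dataset(n, files, tasks) for n, files in datasets.items()))
      if background:
          return {"status": "STARTED", "job": job}  # return at once; await "job" later
      return await job  # blocking: wait for every dataset to finish


  def upper(batch: List[str]) -> List[str]:
      return [s.upper() for s in batch]


  def exclaim(batch: List[str]) -> List[str]:
      return [s + "!" for s in batch]


  async def main():
      data = {"docs": [["a", "b", "c"]], "notes": [["x"]]}
      blocking = await run_all(data, [upper, exclaim])
      started = await run_all(data, [upper, exclaim], background=True)
      print(started["status"])  # STARTED
      background = await started["job"]
      return blocking, background


  blocking, background = asyncio.run(main())
  print(blocking[0]["results"])  # [['A!', 'B!', 'C!']]
  ```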
</Accordion>

<Accordion title="Customization approaches and tips">
  * Use [Remember](../main-operations/remember) for the default ingestion path
  * Modify transformation steps without touching low-level functions; avoid going below `run_pipeline`
  * Custom tasks let you extend or replace default behavior
</Accordion>

<Accordion title="Users">
  * Identity: represents who owns and acts on data. If omitted, a default user is used
  * Ownership: every ingested item is tied to a user; content is deduplicated per owner
  * Permissions: enforced per dataset (read/write/delete/share) during processing and API access
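
  Per-owner deduplication means the same content ingested by two different users is kept twice, while a repeat from the same user is skipped. The class below is a hypothetical sketch of that rule; `OwnedStore` is an illustrative name, and SHA-256 is an assumed choice of content hash, not necessarily what Cognee uses.

  ```python theme={null}
  import hashlib
  from typing import Set, Tuple


  class OwnedStore:
      """Hypothetical store that deduplicates ingested content per owner."""

      def __init__(self) -> None:
          self._seen: Set[Tuple[str, str]] = set()

      def ingest(self, owner_id: str, content: bytes) -> bool:
          # The dedup key includes the owner, so identical bytes from different users are distinct
          key = (owner_id, hashlib.sha256(content).hexdigest())
          if key in self._seen:
              return False  # duplicate for this owner: skipped
          self._seen.add(key)
          return True


  store = OwnedStore()
  print(store.ingest("alice", b"report"))  # True
  print(store.ingest("alice", b"report"))  # False: same owner and content
  print(store.ingest("bob", b"report"))    # True: different owner
  ```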
</Accordion>

<Accordion title="Datasets">
  * Container: a named or UUID-scoped collection of related data and derived knowledge
  * Scoping: `remember()` writes into a specific dataset, and dataset-scoped pipelines process the dataset(s) you pass
  * Lifecycle: new names create datasets and grant the calling user permissions; UUIDs let you target existing datasets (given permission)
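
  The name-versus-UUID lifecycle can be modelled as a tiny registry. This is a hypothetical sketch, not Cognee's data model: `DatasetRegistry` and `resolve` are illustrative, permissions are reduced to a set of user IDs per dataset, and it assumes a name the caller has no access to results in a new dataset for that caller.

  ```python theme={null}
  import uuid
  from dataclasses import dataclass, field
  from typing import Dict, Set, Union


  @dataclass
  class DatasetRegistry:
      """Hypothetical model of dataset resolution with per-user permissions."""

      by_id: Dict[uuid.UUID, str] = field(default_factory=dict)
      permissions: Dict[uuid.UUID, Set[str]] = field(default_factory=dict)  # dataset -> user IDs

      def resolve(self, ref: Union[str, uuid.UUID], user_id: str) -> uuid.UUID:
          if isinstance(ref, uuid.UUID):
              # UUIDs target an existing dataset and require permission
              if ref not in self.by_id or user_id not in self.permissions.get(ref, set()):
                  raise PermissionError(f"no access to dataset {ref}")
              return ref
          # Names: reuse a dataset with that name the user can access, else create one
          for dataset_id, name in self.by_id.items():
              if name == ref and user_id in self.permissions[dataset_id]:
                  return dataset_id
          dataset_id = uuid.uuid4()
          self.by_id[dataset_id] = ref
          self.permissions[dataset_id] = {user_id}  # the calling user is granted access
          return dataset_id


  registry = DatasetRegistry()
  dataset_id = registry.resolve("my_dataset", "alice")  # new name: created, alice gets access
  same_id = registry.resolve(dataset_id, "alice")       # UUID: existing dataset, permission checked
  ```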
</Accordion>

<Columns cols={3}>
  <Card title="Tasks" icon="square-check" href="/core-concepts/building-blocks/tasks">
    Learn about the individual processing units that make up pipelines
  </Card>

  <Card title="DataPoints" icon="circle" href="/core-concepts/building-blocks/datapoints">
    Understand the structured outputs that pipelines produce
  </Card>

  <Card title="Main Operations" icon="play" href="/core-concepts/main-operations/remember">
    See how pipelines are used in Remember and lower-level ingestion workflows
  </Card>
</Columns>
