
# PipelineContext: Runtime Context for Tasks

> Typed runtime context automatically injected into pipeline task functions.

`PipelineContext` is a typed dataclass that the pipeline framework automatically builds and injects into any [task](/core-concepts/building-blocks/tasks) that declares a `ctx` parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an `extras` dict for custom state.

## Fields

| Field           | Type             | Description                                                                                      |
| --------------- | ---------------- | ------------------------------------------------------------------------------------------------ |
| `user`          | `Any`            | The user that triggered the pipeline. Used for access control and provenance.                    |
| `dataset`       | `Any`            | The resolved dataset object for the current run.                                                 |
| `data_item`     | `Any`            | The individual data item being processed in this pipeline execution.                             |
| `pipeline_name` | `Optional[str]`  | The name passed to `run_pipeline` or `run_tasks`.                                                |
| `extras`        | `Dict[str, Any]` | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to `{}`. |
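
As a rough mental model, the table above corresponds to a dataclass shaped like the sketch below. `PipelineContextSketch` is a hypothetical stand-in written for illustration, not the cognee source. The `None` defaults and the per-instance default factory for `extras` are assumptions, chosen to be consistent with the documented `{}` default.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PipelineContextSketch:
    """Hypothetical stand-in mirroring the documented fields (not the cognee class)."""

    user: Any = None
    dataset: Any = None
    data_item: Any = None
    pipeline_name: Optional[str] = None
    # Assumption: a default factory gives every context its own empty dict.
    extras: Dict[str, Any] = field(default_factory=dict)


ctx = PipelineContextSketch(pipeline_name="demo")
other = PipelineContextSketch()

# Writing to one context's extras does not leak into another instance.
ctx.extras["score_multiplier"] = 10
```

In a real run you would not construct the context yourself; the framework builds it and hands it to your task.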

## How injection works

The framework inspects every task function's signature at construction time. If it finds a parameter named **`ctx`**, it passes the current `PipelineContext` as that argument when the task runs. Matching is by **parameter name**, not by type annotation.

```python
from cognee.modules.pipelines.models.PipelineContext import PipelineContext

# ctx is injected automatically — no manual wiring needed
async def my_task(data, ctx: PipelineContext = None):
    if ctx:
        print(ctx.user)
        print(ctx.dataset)
        print(ctx.pipeline_name)
```

Tasks that do not declare `ctx` simply receive no context and are unaffected.
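
The name-based matching described above can be approximated with the standard library's `inspect.signature`. The sketch below is an illustration under stated assumptions, not cognee's implementation: `TaskSketch` and the three sample tasks are hypothetical names, and the real `Task` class may perform the check differently.

```python
import asyncio
import inspect
from typing import Any, Callable


class TaskSketch:
    """Hypothetical wrapper that decides once whether a function wants ctx."""

    def __init__(self, fn: Callable[..., Any]):
        self.fn = fn
        # Inspect the signature at construction time; match on the name only.
        self.wants_ctx = "ctx" in inspect.signature(fn).parameters

    async def run(self, data: Any, ctx: Any) -> Any:
        if self.wants_ctx:
            return await self.fn(data, ctx=ctx)
        return await self.fn(data)


async def with_ctx(data, ctx=None):
    return (data, ctx)


async def without_ctx(data):
    return (data, None)


async def annotated_only(data, context: dict = None):
    # A parameter with a different name is not matched, whatever its annotation.
    return (data, context)


print(TaskSketch(with_ctx).wants_ctx)        # True
print(TaskSketch(annotated_only).wants_ctx)  # False
print(asyncio.run(TaskSketch(with_ctx).run([1], "CTX")))  # ([1], 'CTX')
```

Because the check looks only at parameter names, renaming `ctx` to something like `context` silently turns injection off for that task.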

## Using `extras` for custom pipeline state

Pass a dict as the `context` argument to `run_pipeline` (or `extras` to `run_tasks`). Every task in the pipeline can read those values from `ctx.extras`.

```python
import asyncio

from cognee.modules.pipelines import Task, run_pipeline
from cognee.modules.pipelines.models.PipelineContext import PipelineContext


async def score_items(data, ctx: PipelineContext = None):
    # Fall back to a multiplier of 1 when called without a pipeline context
    multiplier = ctx.extras.get("score_multiplier", 1) if ctx else 1
    return [item * multiplier for item in data]


async def main():
    # Outside a notebook or async REPL, `async for` needs an enclosing coroutine
    async for _ in run_pipeline(
        tasks=[Task(score_items)],
        data=[1, 2, 3],
        datasets=["my_dataset"],
        pipeline_name="scoring_pipeline",
        context={"score_multiplier": 10},  # becomes ctx.extras
    ):
        pass


asyncio.run(main())
```

`ctx.extras` is always a plain dict, never `None`, so a task that has a context can call `ctx.extras.get(...)` without a separate check.

## Examples and details

<Accordion title="Accessing user and dataset in a task">
  The `user` and `dataset` fields are most useful when you need to write provenance records or apply per-tenant logic:

  ```python
  async def store_result(data_points, ctx: PipelineContext = None):
      # Without a context there is no user or dataset to attribute writes to
      if ctx is None:
          return data_points

      for dp in data_points:
          # write_to_store is a placeholder for your own storage call
          await write_to_store(dp, user_id=ctx.user.id, dataset_id=ctx.dataset.id)
      return data_points
  ```

  The built-in `add_data_points` task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
</Accordion>

<Accordion title="Extras persist across chained tasks">
  All tasks in the same pipeline run share the same `PipelineContext` object, so the values passed in `context` when the run starts remain available in every downstream task:

  ```python
  import asyncio

  from cognee.modules.pipelines import Task, run_pipeline
  from cognee.modules.pipelines.models.PipelineContext import PipelineContext

  async def filter_task(data, ctx: PipelineContext = None):
      threshold = ctx.extras.get("min_threshold", 0) if ctx else 0
      return [x for x in data if x >= threshold]

  async def label_task(data, ctx: PipelineContext = None):
      prefix = ctx.extras.get("label_prefix", "") if ctx else ""
      return [f"{prefix}{x}" for x in data]

  async def main():
      # Both tasks receive the same extras
      async for _ in run_pipeline(
          tasks=[Task(filter_task), Task(label_task)],
          data=[1, 5, 10],
          datasets=["demo"],
          context={"min_threshold": 4, "label_prefix": "item_"},
      ):
          pass

  asyncio.run(main())
  ```
</Accordion>
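
To see the sharing behaviour of chained tasks without a cognee installation, the sketch below runs the same two tasks through a tiny hand-written runner. `run_chain` is a hypothetical helper and `SimpleNamespace` stands in for `PipelineContext`; neither reflects how cognee schedules tasks internally.

```python
import asyncio
from types import SimpleNamespace


async def filter_task(data, ctx=None):
    threshold = ctx.extras.get("min_threshold", 0) if ctx else 0
    return [x for x in data if x >= threshold]


async def label_task(data, ctx=None):
    prefix = ctx.extras.get("label_prefix", "") if ctx else ""
    return [f"{prefix}{x}" for x in data]


async def run_chain(tasks, data, ctx):
    """Hypothetical runner: feed each task's output to the next, reusing one ctx."""
    for task in tasks:
        data = await task(data, ctx=ctx)
    return data


# Stand-in context; only the `extras` attribute is needed by these tasks.
ctx = SimpleNamespace(extras={"min_threshold": 4, "label_prefix": "item_"})
result = asyncio.run(run_chain([filter_task, label_task], [1, 5, 10], ctx))
print(result)  # ['item_5', 'item_10']
```

Each task reads only the keys it cares about, so one `context` dict can carry settings for several unrelated tasks.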

<Accordion title="Making ctx optional">
  Always default `ctx` to `None` so the task can also be called directly in tests or scripts without a running pipeline:

  ```python
  async def my_task(data, ctx: PipelineContext = None):
      name = ctx.pipeline_name if ctx else "standalone"
      ...
  ```
</Accordion>
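
The `None` default for `ctx` is what makes a task easy to exercise in a unit test. The sketch below calls a task once with no context and once with a hand-built stand-in. `fake_ctx` is a hypothetical test double built with `SimpleNamespace`, and the task body is invented for illustration; a real pipeline run passes an actual `PipelineContext`.

```python
import asyncio
from types import SimpleNamespace


async def my_task(data, ctx=None):
    # Use the pipeline name when a context is present, else a fixed label.
    name = ctx.pipeline_name if ctx else "standalone"
    return [f"{name}:{item}" for item in data]


# Direct call with no pipeline: ctx falls back to None.
standalone = asyncio.run(my_task(["a"]))

# Hand-built stand-in context for a test (hypothetical, not a PipelineContext).
fake_ctx = SimpleNamespace(pipeline_name="scoring_pipeline", extras={})
injected = asyncio.run(my_task(["a"], ctx=fake_ctx))

print(standalone)  # ['standalone:a']
print(injected)    # ['scoring_pipeline:a']
```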

<Columns cols={3}>
  <Card title="Tasks" icon="square-check" href="/core-concepts/building-blocks/tasks">
    Learn how tasks are defined and composed
  </Card>

  <Card title="Pipelines" icon="git-merge" href="/core-concepts/building-blocks/pipelines">
    See how tasks and context flow through pipeline runs
  </Card>

  <Card title="Custom Tasks & Pipelines" icon="workflow" href="/guides/custom-tasks-pipelines">
    Step-by-step guide to building your own pipeline
  </Card>
</Columns>
