PipelineContext: Runtime Context for Tasks

PipelineContext is a typed dataclass that the pipeline framework automatically builds and injects into any task that declares a ctx parameter. It carries the user, dataset, and per-item context for the current pipeline run, and provides an extras dict for custom state.

Fields

| Field | Type | Description |
| --- | --- | --- |
| user | Any | The user that triggered the pipeline. Used for access control and provenance. |
| dataset | Any | The resolved dataset object for the current run. |
| data_item | Any | The individual data item being processed in this pipeline execution. |
| pipeline_name | Optional[str] | The name passed to run_pipeline or run_tasks. |
| extras | Dict[str, Any] | Arbitrary key/value state you can pass into the pipeline and read in any task. Defaults to {}. |
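
The fields above map naturally onto a Python dataclass. The sketch below is an illustrative stand-in, not the cognee source: the class name PipelineContextSketch and the None defaults for user, dataset, and data_item are assumptions. Only the field names, the types, and the {} default for extras come from the table.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PipelineContextSketch:
    """Illustrative stand-in mirroring the documented fields (not the real class)."""

    user: Any = None                      # assumed default
    dataset: Any = None                   # assumed default
    data_item: Any = None                 # assumed default
    pipeline_name: Optional[str] = None
    # default_factory gives every instance its own dict, so extras is never None
    # and is never shared between unrelated contexts.
    extras: Dict[str, Any] = field(default_factory=dict)


first = PipelineContextSketch(pipeline_name="scoring_pipeline")
second = PipelineContextSketch()
first.extras["score_multiplier"] = 10
```

Using default_factory rather than a literal {} matters here: a dataclass rejects a mutable default, and a per-instance dict keeps state in one context from leaking into another.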

How injection works

The framework inspects every task function’s signature at construction time. If it finds a parameter named ctx, it passes the current PipelineContext as that argument when the task runs. Matching is by parameter name, not by type annotation.
from cognee.modules.pipelines.models.PipelineContext import PipelineContext

# ctx is injected automatically — no manual wiring needed
async def my_task(data, ctx: PipelineContext = None):
    if ctx:
        print(ctx.user)
        print(ctx.dataset)
        print(ctx.pipeline_name)
Tasks that do not declare ctx simply receive no context and are unaffected.
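
Name-based injection of this kind can be sketched with the standard inspect module. This is a minimal illustration of the idea, not cognee's implementation: wants_ctx and call_task are hypothetical helpers, SimpleNamespace stands in for a real PipelineContext, and the check runs on every call for brevity instead of once at construction time.

```python
import asyncio
import inspect
from types import SimpleNamespace


def wants_ctx(func) -> bool:
    """Return True if func declares a parameter literally named 'ctx'."""
    return "ctx" in inspect.signature(func).parameters


async def call_task(func, data, ctx):
    """Call a task, passing ctx only when the task asked for it by name."""
    if wants_ctx(func):
        return await func(data, ctx=ctx)
    return await func(data)


async def with_ctx(data, ctx=None):
    # No type annotation on ctx: matching is by name only.
    return f"{data} from {ctx.pipeline_name}"


async def without_ctx(data):
    return data.upper()


# SimpleNamespace stands in for a real PipelineContext here.
fake_ctx = SimpleNamespace(pipeline_name="demo_pipeline", extras={})
injected = asyncio.run(call_task(with_ctx, "item", fake_ctx))
untouched = asyncio.run(call_task(without_ctx, "item", fake_ctx))
```

Because the lookup is by parameter name, renaming the parameter (for example to context) would silently disable injection in this sketch, even with a PipelineContext annotation.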

Using extras for custom pipeline state

Pass a dict as the context argument to run_pipeline (or extras to run_tasks). Every task in the pipeline can read those values from ctx.extras.
import asyncio

from cognee.modules.pipelines import Task, run_pipeline
from cognee.modules.pipelines.models.PipelineContext import PipelineContext

async def score_items(data, ctx: PipelineContext = None):
    multiplier = ctx.extras.get("score_multiplier", 1) if ctx else 1
    return [item * multiplier for item in data]

async def main():
    # async for is only valid inside a coroutine, so drive the pipeline from main().
    async for _ in run_pipeline(
        tasks=[Task(score_items)],
        data=[1, 2, 3],
        datasets=["my_dataset"],
        pipeline_name="scoring_pipeline",
        context={"score_multiplier": 10},   # becomes ctx.extras
    ):
        pass

asyncio.run(main())
ctx.extras is always a plain dict, never None, so ctx.extras.get(...) is safe whenever ctx itself is present.

Examples and details

The user and dataset fields are most useful when you need to write provenance records or apply per-tenant logic:
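async def store_result(data_points, ctx: PipelineContext = None):
    # ctx is None when the task is called outside a pipeline, so guard every lookup.
    user = ctx.user if ctx else None
    dataset = ctx.dataset if ctx else None
    data_item = ctx.data_item if ctx else None  # available for per-item provenance

    # Avoid AttributeError on None when no user or dataset is attached.
    user_id = user.id if user is not None else None
    dataset_id = dataset.id if dataset is not None else None

    for dp in data_points:
        # write_to_store is a placeholder for your own storage call.
        await write_to_store(dp, user_id=user_id, dataset_id=dataset_id)
    return data_points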
The built-in add_data_points task already does this automatically, so you typically only need to read these fields when writing your own storage tasks.
All tasks in the same pipeline run share the same PipelineContext object, so the extras supplied when the run starts are available in every downstream task:
import asyncio

from cognee.modules.pipelines import Task, run_pipeline
from cognee.modules.pipelines.models.PipelineContext import PipelineContext

async def filter_task(data, ctx: PipelineContext = None):
    threshold = ctx.extras.get("min_threshold", 0) if ctx else 0
    return [x for x in data if x >= threshold]

async def label_task(data, ctx: PipelineContext = None):
    prefix = ctx.extras.get("label_prefix", "") if ctx else ""
    return [f"{prefix}{x}" for x in data]

async def main():
    # Both tasks receive the same extras
    async for _ in run_pipeline(
        tasks=[Task(filter_task), Task(label_task)],
        data=[1, 5, 10],
        datasets=["demo"],
        context={"min_threshold": 4, "label_prefix": "item_"},
    ):
        pass

asyncio.run(main())
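
To see the sharing behaviour without a cognee installation, the same two tasks can be driven by a tiny stand-in runner. This is a sketch under stated assumptions: run_chain is a hypothetical helper, not a cognee function, it assumes each task's return value feeds the next task, and SimpleNamespace stands in for PipelineContext.

```python
import asyncio
from types import SimpleNamespace


async def filter_task(data, ctx=None):
    threshold = ctx.extras.get("min_threshold", 0) if ctx else 0
    return [x for x in data if x >= threshold]


async def label_task(data, ctx=None):
    prefix = ctx.extras.get("label_prefix", "") if ctx else ""
    return [f"{prefix}{x}" for x in data]


async def run_chain(tasks, data, ctx):
    """Hypothetical runner: pass each task's output to the next, same ctx throughout."""
    for task in tasks:
        data = await task(data, ctx=ctx)
    return data


# One context object is created once and handed to every task.
shared_ctx = SimpleNamespace(extras={"min_threshold": 4, "label_prefix": "item_"})
labels = asyncio.run(run_chain([filter_task, label_task], [1, 5, 10], shared_ctx))
```

Here filter_task drops the value below the threshold and label_task prefixes what remains, each reading its own key from the one shared extras dict.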
Always default ctx to None so the task can also be called directly in tests or scripts without a running pipeline:
async def my_task(data, ctx: PipelineContext = None):
    name = ctx.pipeline_name if ctx else "standalone"
    ...
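
The None default is what makes a direct call possible. The sketch below exercises both paths in a script or test. The task body is illustrative only, and SimpleNamespace is an assumed stand-in for a hand-built PipelineContext.

```python
import asyncio
from types import SimpleNamespace


async def my_task(data, ctx=None):
    name = ctx.pipeline_name if ctx else "standalone"
    # Illustrative body: tag each item with the pipeline name.
    return [f"{name}:{item}" for item in data]


# Direct call without a pipeline: ctx falls back to None.
standalone_result = asyncio.run(my_task(["a", "b"]))

# Direct call with a hand-built stand-in context.
stub_ctx = SimpleNamespace(pipeline_name="unit_test", extras={})
stubbed_result = asyncio.run(my_task(["a"], ctx=stub_ctx))
```

Passing a stub context this way lets a unit test cover the pipeline-aware branch without starting a real run.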
