Create a custom pipeline in cognee
You can create your own pipelines by combining custom or built-in tasks:
```python
# Import paths may vary slightly between cognee versions
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.task import Task, TaskConfig

def generate_numbers(num):
    for i in range(num):
        yield i

async def add_one(num):
    return num + 1

tasks = [
    Task(generate_numbers),
    Task(
        add_one,
        task_config=TaskConfig(needs=[generate_numbers]),
    ),
]

pipeline_run = run_tasks(tasks, data=10)
```
Pipeline run information
Tasks and pipelines provide information about their runs. Simply print a task or pipeline run object to see detailed execution information:
```python
# Create and run a task
generate_numbers_task = Task(generate_numbers)
data = 10
task_run = generate_numbers_task.run(data)

# Print task run information and status
for info in task_run:
    print(info)

# Similarly for pipelines
pipeline_run = run_tasks(
    [generate_numbers_task],
    data=10,
)

async for pipeline_run_info in pipeline_run:
    print(pipeline_run_info)  # Shows pipeline status and execution details
```
This makes it significantly easier to monitor execution flow and troubleshoot issues during development.
Task needs
Tasks can specify their dependencies using `TaskConfig`, enabling more complex pipeline orchestration:
```python
tasks = [
    Task(generate_numbers),
    Task(
        add_one,
        task_config=TaskConfig(needs=[generate_numbers]),
    ),
]
```
This allows for creating sophisticated data processing flows while maintaining clean, modular code.
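As a sketch of a more involved flow, reusing the `Task`/`TaskConfig` API from above and assuming `needs` accepts multiple upstream tasks, two tasks can fan out from the same source and a third can wait on both. The `double` and `combine` helpers here are hypothetical, not part of cognee:

```python
# Hypothetical fan-out/fan-in flow: add_one and double both consume the
# generated numbers, and combine is configured to run after both of them.
async def double(num):
    return num * 2

async def combine(num):
    return num

tasks = [
    Task(generate_numbers),
    Task(add_one, task_config=TaskConfig(needs=[generate_numbers])),
    Task(double, task_config=TaskConfig(needs=[generate_numbers])),
    Task(combine, task_config=TaskConfig(needs=[add_one, double])),
]
```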
Support for multiple task types
The task system now seamlessly handles different task types:
- Regular functions: `def my_task(input): return processed_input`
- Async functions: `async def my_task(input): return await process(input)`
- Generators: `def my_task(input): yield from process_stream(input)`
- Async generators: `async def my_task(input): async for item in stream: yield item`
This flexibility allows you to choose the most appropriate implementation style for each specific task.
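For reference, here is a minimal, self-contained sketch of each style; the function names and bodies below are illustrative placeholders, not part of cognee's API:

```python
import asyncio

def add_one(num):                # regular function: plain synchronous return
    return num + 1

async def double(num):           # async function: can await other coroutines
    await asyncio.sleep(0)       # stand-in for real asynchronous work
    return num * 2

def count_up_to(num):            # generator: yields values one at a time
    yield from range(num)

async def stream_squares(num):   # async generator: yields from an async context
    for i in range(num):
        yield i * i
```

Each of these can be wrapped with `Task(...)` exactly as in the earlier examples.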
Our pipeline execution engine has been optimized for both sequential and parallel task execution, resulting in faster processing times and reduced resource usage.
These updates have already improved our benchmark performance, as shown in our evaluations.
You can learn more about how the main pipeline is currently implemented in cognee to process data in a structured way and populate the graph and vector stores.
Join the Conversation!
Have questions? Join our community now to connect with professionals, share insights, and get your questions answered!