Creating Custom Tasks

Custom tasks let you extend cognee for your specific use case. A task can be any Python callable: a function, an asynchronous function, a generator, or an asynchronous generator.
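The four callable kinds can be told apart at runtime with Python's standard `inspect` module. The sketch below is only an illustration of that distinction; `task_kind` and the four tiny example callables are hypothetical names, not part of cognee.

```python
import inspect


def task_kind(task) -> str:
    """Return which of the four callable kinds `task` is (hypothetical helper)."""
    if inspect.isasyncgenfunction(task):
        return "async generator"
    if inspect.iscoroutinefunction(task):
        return "async function"
    if inspect.isgeneratorfunction(task):
        return "generator"
    return "function"


# Four minimal callables, one of each kind (illustrative only).
def plain(items: list):
    return items


async def coroutine(items: list):
    return items


def generator(text: str):
    yield text


async def async_generator(items: list):
    for item in items:
        yield item


for task in (plain, coroutine, generator, async_generator):
    print(task.__name__, "->", task_kind(task))
```

Note that a function declared with `async def` only becomes an async generator if its body contains `yield`; otherwise it is a plain coroutine function.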
# Function task
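def classify_documents(documents: list):
    # Classify documents and convert them to appropriate data models.
    # Placeholder: replace this pass-through with your own classification logic.
    classified_documents = list(documents)
    return classified_documents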
# Async function task
async def add_data_points(data: list[DataPoint]):
    # Asynchronously save data into the database
    await save_data_to_db(data)

    return data
# Generator task
def split_text_into_chunks(text: str):
    # Use any chunker to split text into chunks.
    # Placeholder: a blank-line split stands in for a real chunker here.
    for chunk in text.split("\n\n"):
        yield chunk
# Async generator task
async def generate_graph_from_chunks(chunks: list):
    for chunk in chunks:
        # Asynchronously generate a graph from a chunk
        graph = await generate_graph(chunk)

        yield graph
We can use these tasks to form a pipeline:
from cognee.pipelines import run_tasks, Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.data.methods import load_or_create_datasets

documents = [
    "/path/to/file1",
    "/path/to/file2",
    "/path/to/file3",
]

tasks = [
    Task(classify_documents),
    Task(split_text_into_chunks),
    Task(generate_graph_from_chunks),
    Task(add_data_points),
]

user = await get_default_user()
datasets = await load_or_create_datasets(["documents_dataset"], [], user)

pipeline = run_tasks(
    tasks,
    dataset_id=datasets[0].id,
    data=documents,
    incremental_loading=False,
)

async for pipeline_run_info in pipeline:
    print(pipeline_run_info)
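To make the data flow concrete, here is a minimal, self-contained sketch of how a runner could chain the four callable kinds, passing each task's output (or each yielded item) to the next task. This is not cognee's `run_tasks` implementation, which may batch results and report run status differently; `run_chain`, `collect`, and the three example tasks are hypothetical names used only for illustration.

```python
import asyncio
import inspect


async def run_chain(tasks, data):
    """Feed `data` through `tasks` in order, yielding each final result."""
    if not tasks:
        yield data
        return

    first, rest = tasks[0], tasks[1:]

    if inspect.isasyncgenfunction(first):
        # Async generator: forward every yielded item downstream.
        async for item in first(data):
            async for result in run_chain(rest, item):
                yield result
    elif inspect.isgeneratorfunction(first):
        # Generator: forward every yielded item downstream.
        for item in first(data):
            async for result in run_chain(rest, item):
                yield result
    elif inspect.iscoroutinefunction(first):
        # Async function: await the single return value.
        async for result in run_chain(rest, await first(data)):
            yield result
    else:
        # Plain function: use the single return value.
        async for result in run_chain(rest, first(data)):
            yield result


def normalize(text: str) -> str:
    return text.strip().lower()


def split_words(text: str):
    for word in text.split():
        yield word


async def tag_length(word: str):
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return (word, len(word))


async def collect(tasks, data):
    return [result async for result in run_chain(tasks, data)]


results = asyncio.run(collect([normalize, split_words, tag_length], "  Hello Custom Tasks "))
print(results)  # [('hello', 5), ('custom', 6), ('tasks', 5)]
```

In this sketch a generator task fans out: each yielded item travels through the remaining tasks on its own, so one input can produce several final results.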
