PIPELINES

Cognee uses tasks grouped into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).

The tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions.

pipeline = run_tasks(tasks, documents)

async for result in pipeline:
    print(result)
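
Each entry in tasks is typically a Task object wrapping an async (or sync) function, and a task receives the output of the task before it. The sketch below assumes that behavior; the Task wrapper, its arguments, and the import path are assumptions that may differ between cognee versions.

import asyncio

from cognee.modules.pipelines import Task, run_tasks  # import path may vary by version

async def chunk_documents(documents):
    # Hypothetical task: split each raw document into line chunks.
    return [document.split("\n") for document in documents]

async def count_chunks(chunked_documents):
    # Hypothetical task: consumes the previous task's output.
    return sum(len(chunks) for chunks in chunked_documents)

async def main():
    documents = ["first document\nwith two lines", "second document"]
    pipeline = run_tasks([Task(chunk_documents), Task(count_chunks)], documents)

    async for result in pipeline:
        print(result)

asyncio.run(main())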
 

Main pipeline: cognee.cognify

This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores.

This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.

Parameters

  • datasets: Union[str, list[str]] = None: A string or list of dataset names to be processed.

  • user: User = None: The user requesting the processing. If not provided, the default user is retrieved.
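
A minimal end-to-end call might look like the sketch below, where text is first loaded into a dataset with cognee.add (assuming the cognee.add(data, dataset_name) form) and cognify is then run over it; the dataset name "my_dataset" is purely illustrative.

import asyncio
import cognee

async def main():
    # Load sample text into an illustrative dataset named "my_dataset".
    await cognee.add("Cognee turns documents into knowledge graphs.", "my_dataset")

    # Run the main pipeline over that dataset; called with no arguments,
    # cognify processes every dataset owned by the default user.
    await cognee.cognify(["my_dataset"])

asyncio.run(main())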

Steps in the Function

User Authentication

if user is None:
    user = await get_default_user()

If no user is provided, the function retrieves the default user.

Handling Empty or String Dataset Input

existing_datasets = await get_datasets(user.id)

if datasets is None or len(datasets) == 0:
    datasets = existing_datasets

if type(datasets[0]) == str:
    datasets = await get_datasets_by_name(datasets, user.id)

If no datasets are provided, the function retrieves all datasets owned by the user. If a list of dataset names (strings) is provided, they are converted into dataset objects.

Selecting Datasets Owned by the User

existing_datasets_map = {
    generate_dataset_name(dataset.name): True for dataset in existing_datasets
}

This map of generated dataset names is used in the next step to skip any requested dataset that the user does not own.

Run Cognify Pipeline for Each Dataset

awaitables = []

for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)

    if dataset_name in existing_datasets_map:
        awaitables.append(run_cognify_pipeline(dataset, user))

return await asyncio.gather(*awaitables)

The run_cognify_pipeline coroutine is defined inside cognify and is responsible for processing a single dataset; this is where most of the heavy lifting occurs. cognify schedules one pipeline run per eligible dataset and awaits them concurrently with asyncio.gather.

Pipeline Tasks

The pipeline consists of several tasks, each responsible for a different part of the processing; a simplified sketch of how they are composed follows the list:

  • classify_documents: Converts each document into a specific Document type: PdfDocument, AudioDocument, ImageDocument, or TextDocument.

  • check_permissions_on_documents: Checks if the user has the necessary permissions to access the documents. In this case, it checks for “write” permission.

  • extract_chunks_from_documents: Extracts text chunks based on the document type.

  • add_data_points: Creates nodes and edges from the chunks and their properties. Adds them to the graph engine.

  • extract_graph_from_data: Generates knowledge graphs from the document chunks.

  • summarize_text: Extracts a summary for each chunk using an LLM.
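
Putting the steps above together, run_cognify_pipeline roughly wires these tasks into run_tasks for a single dataset, as in the simplified sketch below. The get_dataset_documents helper and the Task arguments shown are assumptions for illustration, not the actual implementation.

from cognee.modules.pipelines import Task, run_tasks  # import path may vary by version

async def run_cognify_pipeline(dataset, user):
    # Hypothetical helper that loads the documents belonging to the dataset.
    documents = await get_dataset_documents(dataset)

    tasks = [
        Task(classify_documents),
        Task(check_permissions_on_documents, user=user, permissions=["write"]),
        Task(extract_chunks_from_documents),
        Task(add_data_points),
        Task(extract_graph_from_data),
        Task(summarize_text),
    ]

    # Stream results as the tasks run over the documents.
    async for result in run_tasks(tasks, documents):
        print(result)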