PIPELINES
Cognee uses tasks grouped into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).
The tasks are managed and executed asynchronously using the run_tasks and run_tasks_parallel functions.
pipeline = run_tasks(tasks, documents)
async for result in pipeline:
    print(result)
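To make the execution model concrete outside of cognee, the sketch below re-creates the pattern with plain asyncio: each task's output is fed into the next task, and every intermediate result is yielded so the caller can observe progress with async for. Note that run_tasks_sketch, classify, and chunk are hypothetical stand-ins for illustration, not cognee's actual implementations.

import asyncio

async def classify(documents):
    # Hypothetical task: tag each document with a type.
    return [{"document": doc, "type": "TextDocument"} for doc in documents]

async def chunk(classified):
    # Hypothetical task: split each classified document into two chunks.
    return [f"{item['document']}-chunk-{i}" for item in classified for i in range(2)]

async def run_tasks_sketch(tasks, data):
    # Feed each task's output into the next and yield every intermediate result,
    # mirroring how the real pipeline is consumed with "async for".
    result = data
    for task in tasks:
        result = await task(result)
        yield result

async def main():
    pipeline = run_tasks_sketch([classify, chunk], ["doc-a", "doc-b"])
    async for result in pipeline:
        print(result)

asyncio.run(main())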
Main pipeline: cognee.cognify
This is the main pipeline currently implemented in cognee. It is designed to process data in a structured way and populate the graph and vector stores.
This function is the entry point for processing datasets. It handles dataset retrieval, user authorization, and manages the execution of a pipeline of tasks that process documents.
Parameters
- datasets: Union[str, list[str]] = None: A string or list of dataset names to be processed.
- user: User = None: The user requesting the processing. If not provided, the default user is retrieved.
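In practice you rarely call the internals directly; the usual entry point is the top-level cognee API. The sketch below shows roughly how a dataset might be ingested and then cognified. The dataset_name keyword on add and the datasets keyword on cognify are assumptions and may differ slightly between cognee versions.

import asyncio
import cognee

async def main():
    # Ingest some text into a named dataset (keyword name assumed; check your version's add()).
    await cognee.add("Natural language processing turns raw text into structured data.",
                     dataset_name="nlp_notes")
    # Run the cognify pipeline on that dataset; with no user argument the default user is used.
    await cognee.cognify(datasets=["nlp_notes"])

asyncio.run(main())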
Steps in the Function
User Authentication
if user is None:
    user = await get_default_user()
If no user is provided, the function retrieves the default user.
Handling Empty or String Dataset Input
existing_datasets = await get_datasets(user.id)

if datasets is None or len(datasets) == 0:
    datasets = existing_datasets

if type(datasets[0]) == str:
    datasets = await get_datasets_by_name(datasets, user.id)
If no datasets are provided, the function retrieves all datasets owned by the user. If a list of dataset names (strings) is provided, they are converted into dataset objects.
Selecting datasets from the input list that are owned by the user
existing_datasets_map = {
    generate_dataset_name(dataset.name): True for dataset in existing_datasets
}
Run Cognify Pipeline for Each Dataset
awaitables = []

for dataset in datasets:
    dataset_name = generate_dataset_name(dataset.name)
    if dataset_name in existing_datasets_map:
        awaitables.append(run_cognify_pipeline(dataset, user))

return await asyncio.gather(*awaitables)
The run_cognify_pipeline function is defined within cognify and is responsible for processing a single dataset; this is where most of the heavy lifting occurs. cognify then runs these per-dataset pipelines concurrently using asyncio.gather, as shown above.
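The fan-out itself is a standard asyncio pattern. The sketch below is a simplified stand-in (the _sketch names are hypothetical) showing how one coroutine per dataset can be scheduled and awaited together with asyncio.gather, which is what the loop above does with run_cognify_pipeline.

import asyncio

async def run_cognify_pipeline_sketch(dataset, user):
    # Stand-in for the real per-dataset pipeline, which builds and drains the task list below.
    await asyncio.sleep(0.1)
    return f"{dataset}: processed for {user}"

async def cognify_sketch(datasets, user):
    awaitables = [run_cognify_pipeline_sketch(dataset, user) for dataset in datasets]
    # gather schedules all per-dataset pipelines concurrently and collects their results in order.
    return await asyncio.gather(*awaitables)

print(asyncio.run(cognify_sketch(["docs", "wiki"], user="default_user")))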
Pipeline Tasks
The pipeline consists of several tasks, each responsible for a different part of the processing (a sketch of how they might be wired together follows the list):
- classify_documents: Converts each document into one of the specific Document types: PdfDocument, AudioDocument, ImageDocument, or TextDocument.
- check_permissions_on_documents: Checks whether the user has the necessary permissions to access the documents. In this case, it checks for “write” permission.
- extract_chunks_from_documents: Extracts text chunks based on the document type.
- add_data_points: Creates nodes and edges from the chunks and their properties, and adds them to the graph engine.
- extract_graph_from_data: Generates knowledge graphs from the document chunks.
- summarize_text: Extracts a summary for each chunk using an LLM.
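Put together, cognify wires these tasks into a single pipeline and drains it with run_tasks for each dataset's documents. The sketch below shows one plausible assembly; the import paths, Task keyword arguments, and task ordering are assumptions based on recent cognee versions, so consult the cognify source in your installation for the exact construction.

# Import paths and Task arguments below are assumptions and may differ between cognee versions.
from cognee.modules.pipelines import run_tasks
from cognee.modules.pipelines.tasks.Task import Task
from cognee.tasks.documents import (
    classify_documents,
    check_permissions_on_documents,
    extract_chunks_from_documents,
)
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points
from cognee.tasks.summarization import summarize_text

async def run_cognify_pipeline_sketch(documents, user):
    tasks = [
        Task(classify_documents),
        Task(check_permissions_on_documents, user=user, permissions=["write"]),
        Task(extract_chunks_from_documents),
        Task(extract_graph_from_data),
        Task(summarize_text),
        Task(add_data_points),
    ]
    # run_tasks chains the tasks and yields intermediate results as the documents flow through.
    pipeline = run_tasks(tasks, documents)
    async for result in pipeline:
        print(result)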