Cognee organizes tasks into pipelines that populate graph and vector stores. These tasks analyze and enrich data, enhancing the quality of answers produced by Large Language Models (LLMs).
This section provides a template to help you structure your data and build pipelines.
These tasks serve as a starting point for using Cognee to create reliable LLM pipelines.
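To give a feel for how tasks compose into a pipeline, here is a minimal sketch. It assumes Cognee's Task wrapper and run_tasks helper; exact import paths, signatures, and required keyword arguments vary between versions, so treat it as illustrative rather than definitive:
# Sketch only: import paths and signatures are assumptions and may differ
# across Cognee versions.
from cognee.modules.pipelines.tasks.Task import Task
from cognee.modules.pipelines import run_tasks

from cognee.tasks.documents import classify_documents, extract_chunks_from_documents
from cognee.tasks.graph import extract_graph_from_data
from cognee.tasks.storage import add_data_points

async def run_pipeline(documents):
    # Each task consumes the previous task's output, so order matters.
    tasks = [
        Task(classify_documents),
        Task(extract_chunks_from_documents),
        Task(extract_graph_from_data),
        Task(add_data_points),
    ]

    # run_tasks streams intermediate results as the pipeline progresses.
    async for result in run_tasks(tasks, documents):
        print(result)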
1. Ingestion Tasks
Handles data ingestion, including metadata handling, transformation, and storage.
- save_data_item_to_storage.py: Saves individual data items to the storage system.
- ingest_data_with_metadata.py: Ingests data along with its associated metadata.
- save_data_to_storage.py: Saves bulk data to the storage system.
- get_dlt_destination.py: Resolves the destination for data load transformations.
- resolve_data_directories.py: Resolves directories for data ingestion.
- ingest_data.py: Main task for ingesting data into the system.
- transform_data.py: Transforms ingested data for further processing.
- save_data_item_with_metadata_to_storage.py: Saves individual data items along with metadata to storage.
2. Temporal Awareness Tasks
Adds temporal context to graphs and searches.
- build_graph_with_temporal_awareness.py: Constructs graphs with temporal context for nodes and edges.
- search_graph_with_temporal_awareness.py: Searches graphs considering temporal aspects.
3. Summarization Tasks
Handles summarization of text and code.
- summarize_code.py: Generates summaries for code files.
- query_summaries.py: Queries existing summaries for specific contexts.
- summarize_text.py: Summarizes text documents.
4. Dataset Generation Tasks
Generates golden datasets for training and evaluation.
- generate_golden_set.py: Creates a “golden set” of data for benchmarking or testing.
5. Graph Tasks
Manages graph extraction, querying, and ontology inference.
- extract_graph_from_code.py: Extracts graph representations from codebases.
- infer_data_ontology.py: Infers ontologies and relationships from graph data.
- query_graph_connections.py: Queries connections and relationships within graphs.
- extract_graph_from_data.py: Extracts graphs from structured and unstructured data.
6. Code Dependency Tasks
Analyzes and enriches dependency graphs for code repositories.
- expand_dependency_graph_checker.py: Expands existing dependency graphs.
- get_repo_dependency_graph_checker.py: Retrieves dependency graphs for repositories.
- enrich_dependency_graph_checker.py: Enriches dependency graphs with additional context.
- get_local_dependencies_checker.py: Identifies local dependencies within code repositories.
7. Completion Tasks
Handles completion queries and exceptions.
- query_completion.py: Processes and handles completion requests.
- exceptions.py: Defines exceptions related to completion tasks.
8. Chunking Tasks
Manages chunking of text into smaller, processable units.
- chunk_naive_llm_classifier.py: Classifies text chunks using a naive LLM-based approach.
- remove_disconnected_chunks.py: Removes text chunks that are disconnected from the main content.
- chunk_by_sentence.py: Splits text into chunks by sentences.
- chunk_by_word.py: Splits text into chunks by words.
- chunk_by_paragraph.py: Splits text into chunks by paragraphs.
- query_chunks.py: Queries existing chunks for specific content.
9. Storage Tasks
Handles indexing and storage of data points and graph edges.
- index_data_points.py: Indexes data points for efficient retrieval.
- index_graph_edges.py: Indexes edges within a graph structure.
- add_data_points.py: Adds new data points to the storage system.
10. Repository Processing Tasks
Processes code repositories for dependency graphs and file relationships.
- extract_code_parts.py: Extracts parts of code for analysis.
- get_repo_file_dependencies.py: Retrieves file-level dependencies in a repository.
- top_down_repo_parse.py: Parses repositories from a top-down perspective.
- enrich_dependency_graph.py: Enriches dependency graphs with additional data.
- get_local_dependencies.py: Identifies local dependencies in repositories.
- expand_dependency_graph.py: Expands the scope of dependency graphs.
11. Document Tasks
Handles operations related to document processing.
- extract_chunks_from_documents.py: Extracts text chunks from documents.
- classify_documents.py: Classifies documents into categories.
- check_permissions_on_documents.py: Verifies permissions for accessing documents.
Detailed Task Example: Category Extraction
Data enrichment is the process of enhancing raw data with additional information to make it more valuable. This template is a sample task that extracts categories from a document and populates a graph with the extracted categories.
Let’s go over the steps to use this template (full code provided here).
This function is designed to classify chunks of text using a specified language model. The goal is to categorize the text, map relationships, and store the results in a vector engine and a graph engine. The function is asynchronous, allowing for concurrent execution of tasks like classification and data point creation.
Parameters
- data_chunks: list[DocumentChunk]: A list of text chunks to be classified. Each chunk represents a piece of text and includes metadata like chunk_id and document_id.
- classification_model: Type[BaseModel]: The model used to classify each chunk of text. This model is expected to output labels that categorize the text.
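For orientation, a minimal signature sketch of the task is shown below; the import path for DocumentChunk is an assumption and may differ in your Cognee version:
# Signature sketch only; the DocumentChunk import path is an assumption.
from typing import Type
from pydantic import BaseModel
from cognee.modules.chunking.models import DocumentChunk

async def chunk_naive_llm_classifier(
    data_chunks: list[DocumentChunk],          # chunks carrying chunk_id / document_id metadata
    classification_model: Type[BaseModel],     # pydantic schema the LLM fills with labels
) -> list[DocumentChunk]:
    ...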
Steps in the Function
Check for Empty Input
if len(data_chunks) == 0:
    return data_chunks
If there are no data chunks provided, the function returns immediately with the input list (which is empty).
Classify Each Chunk
chunk_classifications = await asyncio.gather(
    *[extract_categories(chunk.text, classification_model) for chunk in data_chunks],
)
The function uses asyncio.gather to concurrently classify each chunk of text. extract_categories is called for each chunk, and the results are collected in chunk_classifications.
Initialize Data Structures
classification_data_points = []
A list is initialized to store the classification data points that will be used later for mapping relationships and storing in the vector engine.
Generate UUIDs for Classifications
The function loops through each chunk and generates unique identifiers (UUIDs) for both the main classification type and its subclasses:
classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))
classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))
These UUIDs are used to uniquely identify classifications and ensure consistency.
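uuid5 derives a deterministic UUID from a namespace and a name, so the same label always maps to the same identifier across runs. A minimal sketch of the loop follows; the field names label.type, label.subclass, and .value are taken from the snippets above, and the rest of the result shape is an assumption:
from uuid import uuid5, NAMESPACE_OID

classification_data_points = []

for chunk_classification in chunk_classifications:
    # Deterministic ID for the top-level classification type.
    classification_data_points.append(uuid5(NAMESPACE_OID, chunk_classification.label.type))

    # One deterministic ID per subclass value, so repeated labels collapse
    # onto the same identifier instead of creating duplicates.
    for classification_subclass in chunk_classification.label.subclass:
        classification_data_points.append(uuid5(NAMESPACE_OID, classification_subclass.value))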
Retrieve or Create Vector Collection
vector_engine = get_vector_engine()
collection_name = "classification"
The function interacts with a vector engine. It checks if the collection named “classification” exists. If it does, it retrieves existing data points to avoid duplicates. Otherwise, it creates the collection.
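A sketch of that check, continuing from the snippet above, might look like this. The method names (has_collection, create_collection, retrieve) are assumptions about the vector-engine interface; verify them against your installed version:
# Method names below are assumptions about the vector-engine interface.
if await vector_engine.has_collection(collection_name):
    # Fetch points that already exist so duplicates are not written again.
    existing_data_points = await vector_engine.retrieve(
        collection_name,
        [str(classification_id) for classification_id in set(classification_data_points)],
    )
    existing_points_map = {point.id: True for point in existing_data_points}
else:
    existing_points_map = {}
    await vector_engine.create_collection(collection_name)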
Prepare Data Points, Nodes, and Edges
The function then builds a list of data_points (representing the classification results) and constructs nodes and edges to represent relationships between chunks and their classifications:
data_points.append(DataPoint[Keyword](...))
nodes.append((...))
edges.append((...))
- Nodes: Represent classifications (e.g., media type, subtype).
- Edges: Represent relationships between chunks and classifications (e.g., “is_media_type”, “is_subtype_of”).
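A sketch of how this loop might look is shown below. The DataPoint constructor arguments stay elided as in the snippet above, and the node/edge tuple shapes are assumptions about what the graph engine expects rather than its definitive contract:
for chunk, chunk_classification in zip(data_chunks, chunk_classifications):
    media_type = chunk_classification.label.type
    classification_type_id = uuid5(NAMESPACE_OID, media_type)

    if str(classification_type_id) not in existing_points_map:
        # Vector-engine entry for the classification label (arguments elided).
        data_points.append(DataPoint[Keyword](...))

    # Graph node for the media type, plus an edge tying the chunk to it.
    nodes.append((str(classification_type_id), {"name": media_type}))
    edges.append((
        str(chunk.chunk_id),
        str(classification_type_id),
        "is_media_type",
        {"relationship_name": "is_media_type"},
    ))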
Create Data Points and Relationships
If there are new nodes or edges to add, the function stores the data points in the vector engine and updates the graph engine with the new nodes and edges:
await vector_engine.create_data_points(collection_name, data_points)
await graph_engine.add_nodes(nodes)
await graph_engine.add_edges(edges)
Return the Processed Chunks
Finally, the function returns the processed data_chunks, which can now be used further as needed:
return data_chunks
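Putting it together, here is a hedged end-to-end usage sketch. The classification schema below is hypothetical; any pydantic model whose labels expose a type and subclass values in the shape the task expects will do:
import asyncio
from enum import Enum
from pydantic import BaseModel

# Hypothetical label schema -- purely illustrative, not part of Cognee.
class ContentSubclass(str, Enum):
    article = "article"
    tutorial = "tutorial"

class ContentLabel(BaseModel):
    type: str
    subclass: list[ContentSubclass]

class ContentClassification(BaseModel):
    label: ContentLabel

async def main():
    chunks = ...  # e.g. chunks produced by chunk_by_paragraph over your documents
    classified_chunks = await chunk_naive_llm_classifier(chunks, ContentClassification)
    return classified_chunks

asyncio.run(main())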
Additional Notes
- Each task is designed to handle a specific functionality within the system.
- Ensure that dependencies and configurations are properly set up for each task to function as intended.
For further details, refer to the inline documentation within each task file.