Build Custom Knowledge Graphs
Difficulty: Advanced
Overview
This tutorial demonstrates how to build custom knowledge graphs from scratch using Cognee’s low-level API. You’ll learn how to:
- Define custom DataPoint classes for your domain
- Create structured relationships between entities
- Build custom data ingestion pipelines
- Process data through Cognee’s low-level pipeline system
- Visualize and query your custom knowledge graph
By the end of this tutorial, you’ll have created a complete organizational knowledge graph with companies, departments, and employees, demonstrating how to model complex real-world relationships.
What You’ll Build
We’ll create a knowledge graph representing organizational structures with:
- Companies with multiple departments
- Departments with employee lists
- People working in specific departments
- Company types for categorization
- Rich relationships connecting all entities
Prerequisites
Before starting this tutorial, ensure you have:
- Completed the Load Your Data tutorial
- Python 3.9 to 3.12 installed
- Cognee installed with development dependencies
- Basic understanding of Python classes and async programming
- Familiarity with JSON data structures
Step 1: Project Setup
Create your project structure
In the same environment you used for the Load Your Data tutorial, set up a new directory for your custom graph project:
mkdir custom-graph-tutorial
cd custom-graph-tutorial
Create the necessary directories and files:
mkdir data
mkdir .artifacts
touch build_graph.py
touch data/companies.json
touch data/people.json
This structure separates your data, code, and output artifacts for better organization.
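Your project layout should now look like this:
custom-graph-tutorial/
├── build_graph.py
├── data/
│   ├── companies.json
│   └── people.json
└── .artifacts/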
Configure your environment
Create a .env file with your API credentials:
echo 'LLM_API_KEY="your_openai_api_key_here"' > .env
The low-level API still requires LLM access for certain graph operations and search functionality.
Step 2: Prepare Sample Data
Create company data
Add the following content to data/companies.json:
[
    {
        "name": "TechCorp Solutions",
        "departments": ["Engineering", "Marketing", "Sales"]
    },
    {
        "name": "GreenFuture Solutions",
        "departments": ["Research", "Engineering", "Operations"]
    },
    {
        "name": "DataFlow Analytics",
        "departments": ["Data Science", "Engineering", "Customer Success"]
    }
]
Create employee data
Add the following content to data/people.json:
[
    {"name": "Alice Johnson", "department": "Engineering"},
    {"name": "Bob Smith", "department": "Engineering"},
    {"name": "Carol Davis", "department": "Marketing"},
    {"name": "David Wilson", "department": "Sales"},
    {"name": "Eve Brown", "department": "Research"},
    {"name": "Frank Miller", "department": "Operations"},
    {"name": "Grace Lee", "department": "Data Science"},
    {"name": "Henry Chen", "department": "Customer Success"},
    {"name": "Ivy Rodriguez", "department": "Engineering"},
    {"name": "Jack Thompson", "department": "Marketing"}
]
This sample data creates a realistic organizational structure with overlapping departments across companies.
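If you want to sanity-check the data before wiring up the pipeline, a quick throwaway script (not part of the tutorial files) can confirm the counts you should see later: 3 companies and 7 unique departments.
import json

with open("data/companies.json") as f:
    companies = json.load(f)
with open("data/people.json") as f:
    people = json.load(f)

# Collect every department mentioned by either file
departments = {p["department"] for p in people}
for company in companies:
    departments.update(company["departments"])

print(f"{len(companies)} companies, {len(departments)} unique departments")
# Expected: 3 companies, 7 unique departments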
Step 3: Define Custom DataPoint Classes
Create your build_graph.py file with custom entity definitions:
import os
import uuid
import json
import asyncio
import pathlib
from cognee import config, prune, search, SearchType, visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.users.methods import get_default_user
class Person(DataPoint):
    """Represents an individual employee"""
    name: str
    metadata: dict = {"index_fields": ["name"]}

class Department(DataPoint):
    """Represents a company department with employees"""
    name: str
    employees: list[Person]
    metadata: dict = {"index_fields": ["name"]}

class CompanyType(DataPoint):
    """Represents the type/category of companies"""
    name: str = "Company"

class Company(DataPoint):
    """Represents a company with departments and type classification"""
    name: str
    departments: list[Department]
    is_type: CompanyType
    metadata: dict = {"index_fields": ["name"]}
These custom DataPoint classes define the structure of your knowledge graph. The metadata field with index_fields makes entities searchable by specific attributes.
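As a quick illustration of how these classes nest, here is what a single hand-built instance would look like (illustrative only; the tutorial builds these from JSON in the next step):
alice = Person(name="Alice Johnson")
engineering = Department(name="Engineering", employees=[alice])
techcorp = Company(
    name="TechCorp Solutions",
    departments=[engineering],
    is_type=CompanyType(),  # defaults to name="Company"
)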
Step 4: Create Data Ingestion Logic
Add the data ingestion function to your script:
def ingest_files():
    """Load and process JSON data into DataPoint instances"""
    # Load company data
    companies_file_path = os.path.join(os.path.dirname(__file__), "data/companies.json")
    with open(companies_file_path, "r") as companies_file:
        companies = json.load(companies_file)

    # Load people data
    people_file_path = os.path.join(os.path.dirname(__file__), "data/people.json")
    with open(people_file_path, "r") as people_file:
        people = json.load(people_file)

    # Create person DataPoints and organize by department
    people_data_points = {}
    departments_data_points = {}

    print("🔄 Processing employee data...")
    for person in people:
        new_person = Person(name=person["name"])
        people_data_points[person["name"]] = new_person

        # Group employees by department
        if person["department"] not in departments_data_points:
            departments_data_points[person["department"]] = Department(
                name=person["department"],
                employees=[new_person]
            )
        else:
            departments_data_points[person["department"]].employees.append(new_person)

    # Create company DataPoints
    companies_data_points = {}

    # Create a single CompanyType node shared by all companies
    print("🏢 Creating company type classification...")
    company_type = CompanyType()

    print("🔄 Processing company data...")
    for company in companies:
        new_company = Company(
            name=company["name"],
            departments=[],
            is_type=company_type
        )
        companies_data_points[company["name"]] = new_company

        # Link departments to companies
        for department_name in company["departments"]:
            if department_name not in departments_data_points:
                departments_data_points[department_name] = Department(
                    name=department_name,
                    employees=[]
                )
            new_company.departments.append(departments_data_points[department_name])

    print(f"✅ Created {len(companies_data_points)} companies with {len(departments_data_points)} departments")
    return companies_data_points.values()
This function demonstrates how to build complex relationships between entities. Notice how people are grouped into departments, which are in turn attached to companies, forming the people → departments → companies hierarchy.
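To see the hierarchy ingest_files produces, you can walk the returned companies and print the nesting (a quick debugging aid, not required by the pipeline):
for company in ingest_files():
    print(company.name)
    for department in company.departments:
        employees = ", ".join(p.name for p in department.employees)
        print(f"  {department.name}: {employees or '(no employees listed)'}")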
Step 5: Build the Main Pipeline
Add the main execution logic to your script:
async def main():
    """Main pipeline for building and querying the custom knowledge graph"""
    # Set up the Cognee system directory
    cognee_directory_path = str(
        pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
    )
    config.system_root_directory(cognee_directory_path)

    print("🧹 Cleaning up previous runs...")
    # Prune system metadata for a fresh state
    await prune.prune_system(metadata=True)

    print("⚙️ Setting up Cognee system...")
    await setup()

    # Generate a unique dataset ID for this run
    dataset_id = uuid.uuid4()
    user = await get_default_user()

    print("🚀 Running custom data pipeline...")
    # Create and run the custom pipeline
    pipeline = run_tasks(
        [
            Task(ingest_files),     # Load and process data
            Task(add_data_points),  # Add to Cognee storage
        ],
        dataset_id,
        None,
        user,
        "custom_graph_pipeline",
    )

    # Monitor pipeline execution
    async for status in pipeline:
        print(f"📊 Pipeline status: {status}")

    print("🔗 Indexing graph relationships...")
    # Index the graph edges for efficient querying
    await index_graph_edges()

    print("📈 Generating graph visualization...")
    # Create visualization
    graph_file_path = str(
        os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
    )
    await visualize_graph(graph_file_path)

    print("🔍 Testing graph queries...")
    # Test different types of queries
    queries = [
        "Who works for GreenFuture Solutions?",
        "Which departments does TechCorp Solutions have?",
        "List all employees in the Engineering department",
        "What companies have Research departments?",
    ]

    for query in queries:
        print(f"\n🤔 Query: {query}")
        completion = await search(
            query_text=query,
            query_type=SearchType.GRAPH_COMPLETION,
        )
        print(f"💡 Answer: {completion}")

    print(f"🌐 Graph visualization saved to: {graph_file_path}")
    print("✅ Custom knowledge graph pipeline completed successfully!")

if __name__ == "__main__":
    asyncio.run(main())
This main function orchestrates the entire process: data ingestion, storage, indexing, visualization, and querying.
Step 6: Run Your Custom Graph Pipeline
Execute your custom knowledge graph builder:
python build_graph.py
This will process your organizational data and create a rich, interconnected knowledge graph.
You should see output similar to:
🧹 Cleaning up previous runs...
⚙️ Setting up Cognee system...
🔄 Processing employee data...
🏢 Creating company type classification...
🔄 Processing company data...
✅ Created 3 companies with 7 departments
🚀 Running custom data pipeline...
📊 Pipeline status: Task completed successfully
🔗 Indexing graph relationships...
📈 Generating graph visualization...
🔍 Testing graph queries...
🤔 Query: Who works for GreenFuture Solutions?
💡 Answer: GreenFuture Solutions has employees in Research, Engineering, and Operations departments...
🌐 Graph visualization saved to: .artifacts/graph_visualization.html
✅ Custom knowledge graph pipeline completed successfully!
Step 7: Explore Your Custom Graph
Interactive Visualization
Open the generated HTML file to explore your knowledge graph:
open .artifacts/graph_visualization.html
In the visualization, you’ll see:
- Company nodes connected to their departments
- Department nodes linked to their employees
- Employee nodes showing individual contributors
- Type classification connecting all companies to the CompanyType
Graph Structure Analysis
Your custom graph demonstrates several important patterns:
- Hierarchical relationships: Companies → Departments → People
- Shared entities: Departments that exist across multiple companies
- Type classification: All companies connected to a shared type
- Bidirectional traversal: Navigate up and down the hierarchy
Step 8: Advanced Customization
Adding More Complex Relationships
Extend your DataPoint classes with additional relationships based on your data:
class Project(DataPoint):
    """Represents a project within a company"""
    name: str
    metadata: dict = {"index_fields": ["name"]}

class Skill(DataPoint):
    """Represents a skill that people can have"""
    name: str
    category: str
    metadata: dict = {"index_fields": ["name", "category"]}

class Person(DataPoint):
    """Enhanced person with skills and projects"""
    name: str
    skills: list[Skill] = []
    current_projects: list[Project] = []
    metadata: dict = {"index_fields": ["name"]}
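With these definitions in place, building a richer employee record might look like this (hypothetical data, purely for illustration):
python_skill = Skill(name="Python", category="Programming")
graph_project = Project(name="Knowledge Graph Migration")

alice = Person(
    name="Alice Johnson",
    skills=[python_skill],
    current_projects=[graph_project],
)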
Custom Search Types
Implement domain-specific search functionality; these calls must run inside an async function:
# Search for people with specific skills
skill_query = await search(
query_text="Find all engineers with Python skills",
query_type=SearchType.GRAPH_COMPLETION
)
# Search for project collaborations
collaboration_query = await search(
query_text="Which people work together on projects?",
query_type=SearchType.INSIGHTS
)
Batch Data Processing
Handle larger datasets efficiently:
async def batch_ingest_employees(employee_data_batch):
    """Process employee data in batches for better performance"""
    batch_size = 100
    for i in range(0, len(employee_data_batch), batch_size):
        batch = employee_data_batch[i:i + batch_size]
        # Yield each batch for downstream processing
        yield batch
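A consumer for this generator might look like the following sketch, which turns each batch into Person DataPoints, mirroring the conversion in ingest_files above:
async def process_all_employees(employee_data):
    """Consume batches and build Person DataPoints incrementally."""
    all_people = []
    async for batch in batch_ingest_employees(employee_data):
        # Each entry is assumed to match the people.json shape: {"name": ..., "department": ...}
        all_people.extend(Person(name=entry["name"]) for entry in batch)
    return all_people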
Step 9: Integration with External Systems
Database Integration
Connect your custom graph to external databases:
import sqlalchemy
from cognee.infrastructure.databases.relational import get_relational_engine

async def load_from_database():
    """Load organizational data from an existing database"""
    engine = get_relational_engine()

    # Query your existing HR database
    query = """
        SELECT e.name, e.department, c.company_name
        FROM employees e
        JOIN companies c ON e.company_id = c.id
    """

    # Convert to DataPoints
    # ... processing logic
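The elided conversion step could look like the following sketch. It operates on plain row mappings (dicts with name, department, and company_name keys, an assumption about your query results) so it stays agnostic to your database driver; adapt the row access to whatever your engine actually returns:
def rows_to_data_points(rows):
    """Convert (name, department, company_name) rows into the
    DataPoint classes defined in Step 3."""
    companies = {}
    company_type = CompanyType()  # one shared type node, as in ingest_files
    for row in rows:
        company = companies.setdefault(
            row["company_name"],
            Company(name=row["company_name"], departments=[], is_type=company_type),
        )
        # Reuse the department if this company already has it
        department = next(
            (d for d in company.departments if d.name == row["department"]), None
        )
        if department is None:
            department = Department(name=row["department"], employees=[])
            company.departments.append(department)
        department.employees.append(Person(name=row["name"]))
    return list(companies.values())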
API Integration
Fetch data from external APIs:
import aiohttp

async def load_from_api():
    """Load organizational data from an HR API"""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.your-hr-system.com/employees') as response:
            employee_data = await response.json()

    # Convert to DataPoints
    return process_employee_data(employee_data)
Step 10: Testing and Validation
Validate Graph Structure
Add validation to ensure data integrity:
def validate_graph_structure(companies):
    """Validate the created graph structure"""
    print("🔍 Validating graph structure...")

    for company in companies:
        assert company.name, "Company must have a name"
        assert company.departments, "Company must have departments"
        assert company.is_type, "Company must have a type"

        for dept in company.departments:
            assert dept.name, "Department must have a name"
            # Further validation logic...

    print("✅ Graph structure validation passed")
Performance Testing
Monitor pipeline performance:
import time

async def timed_pipeline():
    """Run the pipeline with performance monitoring"""
    start_time = time.perf_counter()  # monotonic clock, better suited to timing than time.time()

    # Run your pipeline
    await main()

    end_time = time.perf_counter()
    print(f"⏱️ Pipeline completed in {end_time - start_time:.2f} seconds")
Next Steps
Now that you’ve built your first custom knowledge graph, you can:
- Expand your domain model:
  - Add more entity types (Projects, Skills, Locations)
  - Create more complex relationships
  - Implement inheritance hierarchies
- Integrate with production systems:
  - Connect to your organization’s databases
  - Set up automated data synchronization
  - Implement real-time updates
- Explore advanced features:
  - Use Ontologies - Define formal knowledge structures
  - API Integration - Expose your graph via REST API
  - Load Relational Database - Connect existing databases
- Build applications:
  - Create org chart visualizations
  - Build employee search systems
  - Develop recommendation engines
Related Resources
- Data Processing - Understanding Cognee’s pipeline
- Knowledge Graphs - Graph theory and structures
- Infrastructure - Database and storage options
Join the Conversation!
Built something amazing with custom knowledge graphs? Share your creations and get help from the community!