This tutorial demonstrates how to build custom knowledge graphs from scratch using Cognee’s low-level API. You’ll learn how to:
Define custom DataPoint classes for your domain
Create structured relationships between entities
Build custom data ingestion pipelines
Process data through Cognee’s low-level pipeline system
Visualize and query your custom knowledge graph
By the end of this tutorial, you’ll have created a complete organizational knowledge graph with companies, departments, and employees, demonstrating how to model complex real-world relationships.
Create your build_graph.py file with custom entity definitions:
```python
import os
import uuid
import json
import asyncio
import pathlib

from cognee import config, prune, search, SearchType, visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points
from cognee.tasks.storage.index_graph_edges import index_graph_edges
from cognee.modules.users.methods import get_default_user


class Person(DataPoint):
    """Represents an individual employee"""
    name: str
    metadata: dict = {"index_fields": ["name"]}


class Department(DataPoint):
    """Represents a company department with employees"""
    name: str
    employees: list[Person]
    metadata: dict = {"index_fields": ["name"]}


class CompanyType(DataPoint):
    """Represents the type/category of companies"""
    name: str = "Company"


class Company(DataPoint):
    """Represents a company with departments and type classification"""
    name: str
    departments: list[Department]
    is_type: CompanyType
    metadata: dict = {"index_fields": ["name"]}
```
These custom DataPoint classes define the structure of your knowledge graph. The metadata field with index_fields makes entities searchable by specific attributes.
```python
def ingest_files():
    """Load and process JSON data into DataPoint instances"""
    # Load company data
    companies_file_path = os.path.join(os.path.dirname(__file__), "data/companies.json")
    companies = json.loads(open(companies_file_path, "r").read())

    # Load people data
    people_file_path = os.path.join(os.path.dirname(__file__), "data/people.json")
    people = json.loads(open(people_file_path, "r").read())

    # Create person DataPoints and organize by department
    people_data_points = {}
    departments_data_points = {}

    print("🔄 Processing employee data...")
    for person in people:
        new_person = Person(name=person["name"])
        people_data_points[person["name"]] = new_person

        # Group employees by department
        if person["department"] not in departments_data_points:
            departments_data_points[person["department"]] = Department(
                name=person["department"], employees=[new_person]
            )
        else:
            departments_data_points[person["department"]].employees.append(new_person)

    # Create company DataPoints
    companies_data_points = {}

    # Create a single CompanyType node for all companies
    print("🏢 Creating company type classification...")
    companyType = CompanyType()

    print("🔄 Processing company data...")
    for company in companies:
        new_company = Company(name=company["name"], departments=[], is_type=companyType)
        companies_data_points[company["name"]] = new_company

        # Link departments to companies
        for department_name in company["departments"]:
            if department_name not in departments_data_points:
                departments_data_points[department_name] = Department(
                    name=department_name, employees=[]
                )
            new_company.departments.append(departments_data_points[department_name])

    print(f"✅ Created {len(companies_data_points)} companies with {len(departments_data_points)} departments")

    return companies_data_points.values()
```
This function demonstrates how to build complex relationships between entities. Notice how we create the relationships between people → departments → companies.
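The ingest_files function reads two JSON files that ship with the tutorial's example data but aren't reproduced here. Based on the keys the code accesses ("name" and "department" for people, "name" and "departments" for companies), the files have roughly the shape generated by the sketch below. The employee names are invented placeholders; the company and department names are taken from the queries and sample output later in this tutorial, and the real dataset contains more entries.

```python
# create_sample_data.py — generates placeholder input files for build_graph.py.
# Run it from the same directory as build_graph.py. The field names match what
# ingest_files() reads; the values are illustrative, not the tutorial's full dataset.
import json
import os

companies = [
    {"name": "TechCorp Solutions", "departments": ["Engineering", "Marketing", "Sales"]},
    {"name": "GreenFuture Solutions", "departments": ["Research", "Engineering", "Operations"]},
]

people = [
    {"name": "Alice Johnson", "department": "Engineering"},
    {"name": "Bob Martinez", "department": "Research"},
    {"name": "Carol Chen", "department": "Marketing"},
]

os.makedirs("data", exist_ok=True)
with open("data/companies.json", "w") as f:
    json.dump(companies, f, indent=2)
with open("data/people.json", "w") as f:
    json.dump(people, f, indent=2)
```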
```python
async def main():
    """Main pipeline for building and querying the custom knowledge graph"""
    # Setup Cognee system directory
    cognee_directory_path = str(
        pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system")).resolve()
    )
    config.system_root_directory(cognee_directory_path)

    print("🧹 Cleaning up previous runs...")
    # Prune system metadata for fresh state
    await prune.prune_system(metadata=True)

    print("⚙️ Setting up Cognee system...")
    await setup()

    # Generate unique dataset ID for this run
    dataset_id = uuid.uuid4()
    user = await get_default_user()

    print("🚀 Running custom data pipeline...")
    # Create and run custom pipeline
    pipeline = run_tasks(
        [
            Task(ingest_files),      # Load and process data
            Task(add_data_points),   # Add to Cognee storage
        ],
        dataset_id,
        None,
        user,
        "custom_graph_pipeline",
    )

    # Monitor pipeline execution
    async for status in pipeline:
        print(f"📊 Pipeline status: {status}")

    print("🔗 Indexing graph relationships...")
    # Index the graph edges for efficient querying
    await index_graph_edges()

    print("📈 Generating graph visualization...")
    # Create visualization
    graph_file_path = str(
        os.path.join(os.path.dirname(__file__), ".artifacts/graph_visualization.html")
    )
    await visualize_graph(graph_file_path)

    print("🔍 Testing graph queries...")
    # Test different types of queries
    queries = [
        "Who works for GreenFuture Solutions?",
        "Which departments does TechCorp Solutions have?",
        "List all employees in the Engineering department",
        "What companies have Research departments?",
    ]

    for query in queries:
        print(f"\n🤔 Query: {query}")
        completion = await search(
            query_text=query,
            query_type=SearchType.GRAPH_COMPLETION,
        )
        print(f"💡 Answer: {completion}")

    print(f"🌐 Graph visualization saved to: {graph_file_path}")
    print("✅ Custom knowledge graph pipeline completed successfully!")


if __name__ == "__main__":
    asyncio.run(main())
```
This main function orchestrates the entire process: data ingestion, storage, indexing, visualization, and querying.
Run your build_graph.py script; it will process your organizational data and create a rich, interconnected knowledge graph. You should see output similar to:
```
🧹 Cleaning up previous runs...
⚙️ Setting up Cognee system...
🔄 Processing employee data...
🏢 Creating company type classification...
🔄 Processing company data...
✅ Created 3 companies with 8 departments
🚀 Running custom data pipeline...
📊 Pipeline status: Task completed successfully
🔗 Indexing graph relationships...
📈 Generating graph visualization...
🔍 Testing graph queries...

🤔 Query: Who works for GreenFuture Solutions?
💡 Answer: GreenFuture Solutions has employees in Research, Engineering, and Operations departments...

🌐 Graph visualization saved to: .artifacts/graph_visualization.html
✅ Custom knowledge graph pipeline completed successfully!
```
Extend your DataPoint classes with additional relationships based on your data:
```python
class Project(DataPoint):
    """Represents a project within a company"""
    name: str
    metadata: dict = {"index_fields": ["name"]}


class Skill(DataPoint):
    """Represents a skill that people can have"""
    name: str
    category: str
    metadata: dict = {"index_fields": ["name", "category"]}


class Person(DataPoint):
    """Enhanced person with skills and projects"""
    name: str
    skills: list[Skill] = []
    current_projects: list[Project] = []
    metadata: dict = {"index_fields": ["name"]}
```
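If you extend Person like this, your ingestion code also needs to populate the new fields. The sketch below assumes hypothetical "skills" and "projects" keys in each person record; these keys are not part of the tutorial's sample data.

```python
def ingest_people_with_skills(people):
    """Sketch: build Person DataPoints that also carry Skill and Project nodes.

    Assumes each record optionally contains
    "skills": [{"name": "Python", "category": "Programming"}, ...] and
    "projects": ["Project Atlas", ...] — hypothetical fields for illustration.
    """
    skills_by_name = {}
    projects_by_name = {}
    people_data_points = {}

    for person in people:
        # Reuse Skill/Project nodes so shared skills connect people in the graph
        skills = []
        for skill in person.get("skills", []):
            if skill["name"] not in skills_by_name:
                skills_by_name[skill["name"]] = Skill(
                    name=skill["name"], category=skill["category"]
                )
            skills.append(skills_by_name[skill["name"]])

        projects = []
        for project_name in person.get("projects", []):
            if project_name not in projects_by_name:
                projects_by_name[project_name] = Project(name=project_name)
            projects.append(projects_by_name[project_name])

        people_data_points[person["name"]] = Person(
            name=person["name"],
            skills=skills,
            current_projects=projects,
        )

    return people_data_points
```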
With the extended entities in place, you can run richer queries (from inside an async function such as main()):

```python
# Search for people with specific skills
skill_query = await search(
    query_text="Find all engineers with Python skills",
    query_type=SearchType.GRAPH_COMPLETION,
)

# Search for project collaborations
collaboration_query = await search(
    query_text="Which people work together on projects?",
    query_type=SearchType.INSIGHTS,
)
```
For larger datasets, you can process records in batches rather than all at once:

```python
async def batch_ingest_employees(employee_data_batch):
    """Process employee data in batches for better performance"""
    batch_size = 100
    for i in range(0, len(employee_data_batch), batch_size):
        batch = employee_data_batch[i:i + batch_size]
        # Process batch
        yield batch
```
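One way to use such a generator is as a pipeline task in front of add_data_points, mirroring the two-task run_tasks call from main(). This is a sketch under two assumptions: that run_tasks forwards its third positional argument (None in main()) to the first task, and that each yielded batch flows on to the next task; run_batched_pipeline, employee_records, and "batched_ingest_pipeline" are illustrative names.

```python
async def run_batched_pipeline(employee_records, dataset_id, user):
    """Sketch: same pipeline shape as main(), but with batched ingestion.

    Assumes employee_records is an already-loaded list of DataPoint instances
    and that run_tasks passes it to batch_ingest_employees, then hands each
    yielded batch to add_data_points.
    """
    pipeline = run_tasks(
        [
            Task(batch_ingest_employees),  # yields batches of DataPoints
            Task(add_data_points),         # stores each batch
        ],
        dataset_id,
        employee_records,
        user,
        "batched_ingest_pipeline",
    )

    async for status in pipeline:
        print(f"📊 Pipeline status: {status}")
```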
You can also load organizational data from an external system instead of local JSON files:

```python
import aiohttp


async def load_from_api():
    """Load organizational data from HR API"""
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.your-hr-system.com/employees") as response:
            employee_data = await response.json()
            # Convert to DataPoints
            return process_employee_data(employee_data)
```
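The process_employee_data helper referenced above isn't defined in this tutorial. A minimal sketch, assuming the API returns records with the same "name" and "department" keys as the local people.json:

```python
def process_employee_data(employee_data):
    """Sketch: convert raw API records into Person/Department DataPoints.

    Assumes each record has "name" and "department" keys, matching the shape
    of the local people.json used earlier in this tutorial.
    """
    departments = {}
    for record in employee_data:
        person = Person(name=record["name"])
        dept_name = record["department"]
        if dept_name not in departments:
            departments[dept_name] = Department(name=dept_name, employees=[])
        departments[dept_name].employees.append(person)
    return list(departments.values())
```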
Finally, it's worth validating the graph structure you build before storing it:

```python
def validate_graph_structure(companies):
    """Validate the created graph structure"""
    print("🔍 Validating graph structure...")
    for company in companies:
        assert company.name, "Company must have a name"
        assert company.departments, "Company must have departments"
        assert company.is_type, "Company must have a type"
        for dept in company.departments:
            assert dept.name, "Department must have a name"
            # Further validation logic...
    print("✅ Graph structure validation passed")
```
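For example, you could run the validator on the in-memory DataPoints right after ingestion and before they are handed to add_data_points; this placement is illustrative and not part of the original pipeline.

```python
# Sketch: validate before storage; ingest_files() returns the Company DataPoints.
companies = list(ingest_files())
validate_graph_structure(companies)
```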