# Adding a New Vector Store to cognee
This guide describes how to integrate a new vector database engine into cognee, following the same pattern used for existing engines (e.g., Weaviate, Qdrant, Milvus, PGVector, LanceDB).
## Repository Options
Cognee has historically hosted vector-database adapters in both the core and the community repositories.
🚨 From now on every vector-database adapter – except LanceDB – will live in the cognee-community repository.
Qdrant has already been migrated from the core library, and the remaining adapters will follow shortly. You can find Redis, OpenSearch, and Azure AI Search integrations in the community repo.
Therefore, all new adapter contributions must target the community repository.
The core repository will keep only the built-in LanceDB integration.
## For the Community Repository
To add a new adapter to cognee-community:

- Fork and clone the cognee-community repository.
- Create your adapter in `packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/`.
- Inside that directory, add `__init__.py`, `<engine_name>_adapter.py`, and `register.py` (see the Redis implementation as an example).
- At the package root `packages/vector/<engine_name>/`, add `__init__.py`, `pyproject.toml`, and `README.md`.
- Submit a pull request to the community repository.
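For a hypothetical engine called `myvectordb`, the resulting layout would look like this (the `examples/` directory holds the test script described in step 2):

```text
packages/vector/myvectordb/
├── pyproject.toml
├── README.md
├── examples/
│   └── example.py
└── cognee_community_vector_adapter_myvectordb/
    ├── __init__.py
    ├── myvectordb_adapter.py
    └── register.py
```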
Below are the recommended steps in more detail.
## Why cognee-community?
`cognee-community` is the extension hub for Cognee. Anything that is not part of the core lives here: adapters for third-party databases, pipelines, community-contributed tasks, and so on.
Placing your adapter in this repository means:
- Your code is released under the community license and can evolve independently of the core.
- It can be installed with `pip install cognee-community-vector-adapter-<engine_name>` without pulling in heavyweight drivers for users who don't need them. For Qdrant, for example, this is `pip install cognee-community-vector-adapter-qdrant`.
- These packages plug into the cognee core package via the registration step described below (sketched right after this list).
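To illustrate, here is a minimal sketch of using a community adapter together with the core package, assuming the Qdrant adapter is installed; importing its `register` module performs the registration step described in the final checklist:

```python
import cognee

# Importing the register module wires the adapter into cognee core.
from cognee_community_vector_adapter_qdrant import register  # noqa: F401

# After registration, the provider can be selected like any built-in one.
cognee.config.set_vector_db_provider("qdrant")
```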
If you are unfamiliar with the layout, have a look at the existing folders under `packages/*` in the community repo; each sub-folder represents a separate provider implemented in exactly the way described here.
## 1. Implement the Adapter
File: `packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py`
Your adapter must implement the `VectorDBInterface` protocol, providing all required methods for collection management, data point operations, and search functionality. Here is a sample skeleton with placeholders (note that a community package imports cognee's interfaces by their absolute paths, not the relative paths used inside the core library):

```python
"""Adapter for <engine_name> vector database."""

import asyncio
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor

from cognee.exceptions import InvalidValueError
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.engine.utils import parse_id
from cognee.shared.logging_utils import get_logger
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.databases.vector.models.ScoredResult import ScoredResult
from cognee.infrastructure.databases.vector.vector_db_interface import VectorDBInterface

logger = get_logger()


class IndexSchema(DataPoint):
    """
    Define a schema for indexing data points with a text field.

    This class inherits from the DataPoint class and specifies the structure of a single
    data point that includes a text attribute.
    """

    text: str
    metadata: dict = {"index_fields": ["text"]}


class <EngineName>Adapter(VectorDBInterface):
    """Adapter for <engine_name> vector database operations."""

    name = "<EngineName>"

    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        embedding_engine: Optional[EmbeddingEngine] = None,
    ):
        self.url = url
        self.api_key = api_key
        self.embedding_engine = embedding_engine
        self.client = None
        self.executor = ThreadPoolExecutor()
        self._initialize_connection()

    def _initialize_connection(self) -> None:
        """Establish connection to <engine_name>."""
        try:
            # Initialize your vector database client here
            # Example: self.client = YourVectorDBClient(url=self.url, api_key=self.api_key)
            logger.debug(f"Successfully connected to <engine_name> at {self.url}")
        except Exception as e:
            logger.error(f"Failed to initialize <engine_name> connection: {e}")
            raise

    async def has_collection(self, collection_name: str) -> bool:
        """Check if a specified collection exists."""
        try:
            # Implement collection existence check
            # Example: return await self.client.collection_exists(collection_name)
            return False
        except Exception as e:
            logger.error(f"Error checking collection existence: {e}")
            return False

    async def create_collection(
        self,
        collection_name: str,
        payload_schema: Optional[object] = None,
    ):
        """Create a new collection with an optional payload schema."""
        if await self.has_collection(collection_name):
            return

        try:
            vector_size = self.embedding_engine.get_vector_size()
            # Implement collection creation logic
            # Example:
            # await self.client.create_collection(
            #     name=collection_name,
            #     vector_size=vector_size,
            #     distance_metric="cosine"
            # )
            logger.debug(f"Created collection: {collection_name}")
        except Exception as e:
            logger.error(f"Failed to create collection {collection_name}: {e}")
            raise

    async def create_data_points(self, collection_name: str, data_points: List[DataPoint]):
        """Insert new data points into the specified collection."""
        if not await self.has_collection(collection_name):
            await self.create_collection(collection_name)

        # Generate embeddings for data points
        embeddings = await self.embedding_engine.embed_text(
            [DataPoint.get_embeddable_data(data_point) for data_point in data_points]
        )

        try:
            # Implement data point insertion logic
            # Example:
            # formatted_points = [
            #     {
            #         "id": str(data_point.id),
            #         "vector": embeddings[i],
            #         "payload": data_point.model_dump()
            #     }
            #     for i, data_point in enumerate(data_points)
            # ]
            # await self.client.upsert(collection_name, formatted_points)
            logger.debug(f"Inserted {len(data_points)} data points into {collection_name}")
        except Exception as e:
            logger.error(f"Failed to insert data points: {e}")
            raise

    async def retrieve(self, collection_name: str, data_point_ids: List[str]):
        """Retrieve data points from a collection using their IDs."""
        try:
            # Implement data point retrieval logic
            # Example:
            # results = await self.client.retrieve(collection_name, data_point_ids)
            # return [
            #     ScoredResult(
            #         id=parse_id(result["id"]),
            #         payload=result["payload"],
            #         score=0
            #     )
            #     for result in results
            # ]
            return []
        except Exception as e:
            logger.error(f"Failed to retrieve data points: {e}")
            return []

    async def search(
        self,
        collection_name: str,
        query_text: Optional[str] = None,
        query_vector: Optional[List[float]] = None,
        limit: int = 15,
        with_vector: bool = False,
    ) -> List[ScoredResult]:
        """Perform a search in the specified collection."""
        if query_text is None and query_vector is None:
            raise InvalidValueError(message="One of query_text or query_vector must be provided!")

        if not await self.has_collection(collection_name):
            logger.warning(f"Collection '{collection_name}' not found; returning [].")
            return []

        if query_vector is None:
            query_vector = (await self.embedding_engine.embed_text([query_text]))[0]

        try:
            # Implement search logic
            # Example:
            # results = await self.client.search(
            #     collection_name=collection_name,
            #     query_vector=query_vector,
            #     limit=limit,
            #     include_vector=with_vector
            # )
            #
            # return [
            #     ScoredResult(
            #         id=parse_id(result["id"]),
            #         payload=result["payload"],
            #         score=result["score"]
            #     )
            #     for result in results
            # ]
            return []
        except Exception as e:
            logger.error(f"Error searching collection '{collection_name}': {e}")
            return []

    async def batch_search(
        self,
        collection_name: str,
        query_texts: List[str],
        limit: int,
        with_vectors: bool = False,
    ):
        """Perform a batch search using multiple text queries."""
        query_vectors = await self.embedding_engine.embed_text(query_texts)

        return await asyncio.gather(
            *[
                self.search(
                    collection_name=collection_name,
                    query_vector=query_vector,
                    limit=limit,
                    with_vector=with_vectors,
                )
                for query_vector in query_vectors
            ]
        )

    async def delete_data_points(self, collection_name: str, data_point_ids: List[str]):
        """Delete specified data points from a collection."""
        try:
            # Implement deletion logic
            # Example:
            # result = await self.client.delete(collection_name, data_point_ids)
            # return result
            logger.debug(f"Deleted {len(data_point_ids)} data points from {collection_name}")
        except Exception as e:
            logger.error(f"Failed to delete data points: {e}")
            raise

    async def create_vector_index(self, index_name: str, index_property_name: str):
        """Create a vector index based on an index name and property name."""
        return await self.create_collection(f"{index_name}_{index_property_name}")

    async def index_data_points(
        self, index_name: str, index_property_name: str, data_points: List[DataPoint]
    ):
        """Index a list of data points by creating an associated vector index collection."""
        formatted_data_points = [
            IndexSchema(
                id=data_point.id,
                text=DataPoint.get_embeddable_data(data_point),
            )
            for data_point in data_points
        ]

        return await self.create_data_points(
            f"{index_name}_{index_property_name}",
            formatted_data_points,
        )

    async def prune(self):
        """Remove all data from the vector database."""
        try:
            # Implement pruning logic - delete all collections or data
            # Example:
            # collections = await self.client.list_collections()
            # for collection in collections:
            #     await self.client.delete_collection(collection.name)
            logger.debug("Pruned all data from <engine_name>")
        except Exception as e:
            logger.error(f"Failed to prune data: {e}")
            raise
```
Keep the method signatures consistent with `VectorDBInterface`, and reference existing adapters such as `QDrantAdapter` for more comprehensive examples. The companion `register.py` that exposes the adapter to cognee is covered in the final checklist.
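The adapter directory also needs an `__init__.py`. A minimal sketch is to simply re-export the adapter class (an assumption; mirror whatever the Redis implementation does):

```python
# __init__.py — re-export the adapter so it can be imported from the package root.
from .<engine_name>_adapter import <EngineName>Adapter

__all__ = ["<EngineName>Adapter"]
```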
## 2. Test with a Dedicated Script
File: `packages/vector/<engine_name>/examples/example.py`

Create a script that loads cognee, configures it to use your new `<engine_name>` provider, and runs basic usage checks. For example:
```python
import asyncio
import os
import pathlib

import cognee
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType

# Wire the community adapter into cognee before configuring it
# (this performs the registration step from the final checklist).
from cognee_community_vector_adapter_<engine_name> import register  # noqa: F401

logger = get_logger()


async def main():
    cognee.config.set_vector_db_provider("<engine_name>")

    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_<engine_name>")
        ).resolve()
    )
    cognee.config.data_root_directory(data_directory_path)

    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_<engine_name>")
        ).resolve()
    )
    cognee.config.system_root_directory(cognee_directory_path)

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    dataset_name = "test_dataset"

    # Add some test data
    test_text = """
    This is a test document for the <engine_name> vector database integration.
    It contains information about artificial intelligence and machine learning.
    Vector databases are essential for semantic search and retrieval applications.
    """

    await cognee.add([test_text], dataset_name)
    await cognee.cognify([dataset_name])

    # Test vector search functionality
    from cognee.infrastructure.databases.vector import get_vector_engine

    vector_engine = get_vector_engine()
    search_results = await vector_engine.search("Entity_name", "artificial intelligence")
    assert len(search_results) != 0, "The search results list is empty."
    print(f"Found {len(search_results)} search results")

    # Test cognee search functionality
    search_results = await cognee.search(
        query_type=SearchType.INSIGHTS,
        query_text="machine learning",
    )
    assert len(search_results) != 0, "The insights search results list is empty."

    search_results = await cognee.search(
        query_type=SearchType.CHUNKS,
        query_text="vector databases",
    )
    assert len(search_results) != 0, "The chunks search results list is empty."

    # Test search history
    user = await get_default_user()
    history = await get_history(user.id)
    assert len(history) > 0, "Search history is empty."

    # Test cleanup
    await cognee.prune.prune_data()
    assert not os.path.isdir(data_directory_path), "Local data files are not deleted"

    await cognee.prune.prune_system(metadata=True)
    # Verify database is empty after pruning
    # Add specific checks for your vector database here
    # Example:
    # vector_engine = get_vector_engine()
    # collections = await vector_engine.client.list_collections()
    # assert len(collections) == 0, "<EngineName> vector database is not empty"

    print("All tests passed successfully!")


if __name__ == "__main__":
    asyncio.run(main())
```
This script serves as a basic end-to-end test, verifying that:
- Your new adapter can be selected by cognee.
- Data can be added to the vector database.
- Search functionality works correctly.
- The database is empty after a prune operation.
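Before wiring the script into CI, you can run it locally from the package directory (assuming your LLM, embedding, and database credentials are exported as environment variables):

```bash
poetry run python ./examples/example.py
```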
## 3. Create a Test Workflow
File: `.github/workflows/vector_db_tests.yml`

Add a new job to the existing vector database tests workflow:
```yaml
run-<engine_name>-tests:
  name: <EngineName> Tests
  runs-on: ubuntu-22.04
  if: ${{ inputs.databases == 'all' || contains(inputs.databases, '<engine_name>') }}
  # Add services if your database needs to run as a container
  # services:
  #   <engine_name>:
  #     image: <your_db_image>:<version>
  #     ports:
  #       - <port>:<port>
  #     env:
  #       # Add any required environment variables
  steps:
    - name: Check out
      uses: actions/checkout@v4
      with:
        fetch-depth: 0

    - name: Cognee Setup
      uses: ./.github/actions/cognee_setup
      with:
        python-version: ${{ inputs.python-version }}

    - name: Install specific db dependency
      run: |
        poetry install -E <engine_name>

    # Add any setup steps if needed
    # - name: Wait for <EngineName> to be healthy
    #   run: |
    #     # Add health check commands

    - name: Run <EngineName> Tests
      env:
        ENV: 'dev'
        LLM_MODEL: ${{ secrets.LLM_MODEL }}
        LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
        LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
        LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
        EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
        EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
        EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
        EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        VECTOR_DB_URL: ${{ secrets.<ENGINE_NAME>_URL }}
        VECTOR_DB_KEY: ${{ secrets.<ENGINE_NAME>_API_KEY }}
      run: poetry run python ./examples/example.py
      working-directory: ./packages/vector/<engine_name>
```
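If your engine runs as a container in CI, the commented `services` block above can be filled in. For example, a sketch using Qdrant's public image and default port (swap in your engine's image and port):

```yaml
services:
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - 6333:6333
```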
## 4. Update pyproject.toml

Add your vector database client dependencies to the optional dependencies in `pyproject.toml`:

```toml
[tool.poetry.dependencies]
python = "^3.11"
cognee = "*"  # pin to the cognee version you target
your-vector-db-client = "*"  # replace with your engine's client library

[tool.poetry.extras]
...
```
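The extra referenced by `poetry install -E <engine_name>` in the CI job above is declared under `[tool.poetry.extras]`. A sketch for a hypothetical engine named `myvectordb`, marking the client as optional so it is only installed on demand:

```toml
[tool.poetry.dependencies]
python = "^3.11"
your-vector-db-client = { version = "*", optional = true }

[tool.poetry.extras]
myvectordb = ["your-vector-db-client"]
```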
## 5. Final Checklist

- [ ] Implement your `<EngineName>Adapter` in `packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py`.
- [ ] Add a register helper (`register.py`) and call it before configuring Cognee:

```python
from cognee.infrastructure.databases.vector import use_vector_adapter

from .<engine_name>_adapter import <EngineName>Adapter

use_vector_adapter("<engine_name>", <EngineName>Adapter)
```

- [ ] Create a test or example script (`test_<engine_name>.py` or `example.py`) that you can use in your test workflow.
- [ ] Create a test workflow: `.github/workflows/test_<engine_name>.yml`.
- [ ] Add required dependencies to the optional dependencies in `pyproject.toml`.
- [ ] Open a PR to verify that your new integration passes CI.
## Additional Considerations
### Error Handling
- Implement proper error handling for connection failures, timeouts, and API errors.
- Log meaningful error messages that help with debugging.
- Handle graceful degradation when the vector database is unavailable.
### Performance
- Consider implementing connection pooling if your vector database supports it.
- Add proper async/await patterns to avoid blocking operations.
- Implement batch operations efficiently where possible.
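Because cognee's interface is async, any synchronous client calls should be pushed off the event loop. A minimal sketch, assuming a synchronous client with a hypothetical `search_sync` method, using the same `ThreadPoolExecutor` pattern the adapter skeleton above already sets up:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class ExecutorExample:
    """Sketch: run a blocking client call without stalling the event loop."""

    def __init__(self, client):
        self.client = client  # a synchronous vector DB client (hypothetical)
        self.executor = ThreadPoolExecutor()

    async def search_async(self, collection_name: str, query_vector: list, limit: int):
        loop = asyncio.get_running_loop()
        # The blocking call runs in the thread pool; the coroutine awaits it.
        return await loop.run_in_executor(
            self.executor,
            lambda: self.client.search_sync(collection_name, query_vector, limit),
        )
```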
### Security
- Never log sensitive information like API keys or connection strings.
- Validate inputs to prevent injection attacks.
- Follow your vector database’s security best practices.
### Documentation
- Add docstrings to all public methods.
- Document any specific configuration requirements.
- Include examples of how to use your vector database with cognee.
That’s all! This approach keeps cognee’s architecture flexible, allowing you to swap in any vector database provider easily. If you need more advanced functionality (e.g., custom indexes, filters, or advanced search capabilities), simply implement them in your adapter class following the same patterns.
## Join the Conversation!

Have questions about adding a new vector store? Join our community to discuss implementation strategies and best practices!