
Adding a New Vector Store to cognee

This guide describes how to integrate a new vector database engine into cognee, following the same pattern used for existing engines (e.g., Weaviate, Qdrant, Milvus, PGVector, LanceDB).

Repository Options

Historically, Cognee has hosted vector-database adapters in both the core and the community repositories.

🚨 From now on every vector-database adapter – except LanceDB – will live in the cognee-community repository.

Qdrant has already been migrated from the core library, and the remaining adapters will follow shortly. Redis, OpenSearch, and Azure AI Search integrations are already available in the community repo.

Therefore all new adapter contributions must target the community repository.
The core repository will keep only the built-in LanceDB integration.

For Community Repository

To add a new adapter to cognee-community:

  1. Fork and clone the cognee-community repository
  2. Create your adapter in packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/
  3. Inside that directory add __init__.py, <engine_name>_adapter.py, and register.py (see the Redis implementation as an example; a minimal register.py sketch follows this list).
  4. At the package root packages/vector/<engine_name>/ add __init__.py, pyproject.toml, and README.md.
  5. Submit a pull request to the community repository
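
For step 3, here is a minimal register.py sketch based on the use_vector_adapter helper shown in the final checklist below; treat the exact import path as an assumption and check the Redis adapter for the current pattern:

```python
# packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/register.py
# Registers the adapter under the provider name that users pass to
# cognee.config.set_vector_db_provider("<engine_name>").
from cognee.infrastructure.databases.vector import use_vector_adapter

from .<engine_name>_adapter import <EngineName>Adapter

use_vector_adapter("<engine_name>", <EngineName>Adapter)
```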

Below are the recommended steps in more detail.


Why cognee-community?

cognee-community is the extension hub for Cognee.
Anything that is not part of the core lives here: adapters for third-party databases, pipelines, additional community-contributed tasks, and so on.
Placing your adapter in this repository means:

  • Your code is released under the community license and can evolve independently of the core.
  • It can be installed with pip install cognee-community-vector-adapter-<engine_name> without pulling in heavyweight drivers for users who don’t need them. For example, for Qdrant it is pip install cognee-community-vector-adapter-qdrant.
  • These packages plug into the cognee core package via the registration step described below, as sketched after this list.
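
For example, with the published Qdrant adapter the wiring looks roughly like this (a sketch; the exact name of the register module is an assumption, so check the package README):

```python
# Assumes: pip install cognee-community-vector-adapter-qdrant
import cognee

# Importing the register helper calls use_vector_adapter(...) for this engine
# (module path is an assumption; see the adapter's README).
from cognee_community_vector_adapter_qdrant import register  # noqa: F401

cognee.config.set_vector_db_provider("qdrant")
```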

If you are unfamiliar with the layout, have a look at the existing folders under packages/* in the community repo; each sub-folder is a separate provider implemented in exactly the pattern you are about to follow.


1. Implement the Adapter

File: packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py

Your adapter must implement the VectorDBInterface protocol, covering all required methods for collection management, data point operations, and search. Here is a sample skeleton with placeholders:

"""Adapter for <engine_name> vector database.""" import asyncio from typing import List, Optional, Dict, Any from concurrent.futures import ThreadPoolExecutor from cognee.exceptions import InvalidValueError from cognee.infrastructure.engine import DataPoint from cognee.infrastructure.engine.utils import parse_id from cognee.shared.logging_utils import get_logger from ..embeddings.EmbeddingEngine import EmbeddingEngine from ..models.ScoredResult import ScoredResult from ..utils import normalize_distances from ..vector_db_interface import VectorDBInterface from ..exceptions import CollectionNotFoundError logger = get_logger() class IndexSchema(DataPoint): """ Define a schema for indexing data points with a text field. This class inherits from the DataPoint class and specifies the structure of a single data point that includes a text attribute. """ text: str metadata: dict = {"index_fields": ["text"]} class <EngineName>Adapter(VectorDBInterface): """Adapter for <engine_name> vector database operations.""" name = "<EngineName>" def __init__( self, url: str, api_key: Optional[str] = None, embedding_engine: EmbeddingEngine = None, ): self.url = url self.api_key = api_key self.embedding_engine = embedding_engine self.client = None self.executor = ThreadPoolExecutor() self._initialize_connection() def _initialize_connection(self) -> None: """Establish connection to <engine_name>.""" try: # Initialize your vector database client here # Example: self.client = YourVectorDBClient(url=self.url, api_key=self.api_key) logger.debug(f"Successfully connected to <engine_name> at {self.url}") except Exception as e: logger.error(f"Failed to initialize <engine_name> connection: {e}") raise async def has_collection(self, collection_name: str) -> bool: """Check if a specified collection exists.""" try: # Implement collection existence check # Example: return await self.client.collection_exists(collection_name) return False except Exception as e: logger.error(f"Error checking collection existence: {e}") return False async def create_collection( self, collection_name: str, payload_schema: Optional[object] = None, ): """Create a new collection with an optional payload schema.""" if await self.has_collection(collection_name): return try: vector_size = self.embedding_engine.get_vector_size() # Implement collection creation logic # Example: # await self.client.create_collection( # name=collection_name, # vector_size=vector_size, # distance_metric="cosine" # ) logger.debug(f"Created collection: {collection_name}") except Exception as e: logger.error(f"Failed to create collection {collection_name}: {e}") raise async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): """Insert new data points into the specified collection.""" if not await self.has_collection(collection_name): await self.create_collection(collection_name) # Generate embeddings for data points embeddings = await self.embedding_engine.embed_text( [DataPoint.get_embeddable_data(data_point) for data_point in data_points] ) try: # Implement data point insertion logic # Example: # formatted_points = [ # { # "id": str(data_point.id), # "vector": embeddings[i], # "payload": data_point.model_dump() # } # for i, data_point in enumerate(data_points) # ] # await self.client.upsert(collection_name, formatted_points) logger.debug(f"Inserted {len(data_points)} data points into {collection_name}") except Exception as e: logger.error(f"Failed to insert data points: {e}") raise async def retrieve(self, collection_name: str, data_point_ids: 
List[str]): """Retrieve data points from a collection using their IDs.""" try: # Implement data point retrieval logic # Example: # results = await self.client.retrieve(collection_name, data_point_ids) # return [ # ScoredResult( # id=parse_id(result["id"]), # payload=result["payload"], # score=0 # ) # for result in results # ] return [] except Exception as e: logger.error(f"Failed to retrieve data points: {e}") return [] async def search( self, collection_name: str, query_text: Optional[str] = None, query_vector: Optional[List[float]] = None, limit: int = 15, with_vector: bool = False, ) -> List[ScoredResult]: """Perform a search in the specified collection.""" if query_text is None and query_vector is None: raise InvalidValueError(message="One of query_text or query_vector must be provided!") if not await self.has_collection(collection_name): logger.warning(f"Collection '{collection_name}' not found; returning [].") return [] if query_vector is None: query_vector = (await self.embedding_engine.embed_text([query_text]))[0] try: # Implement search logic # Example: # results = await self.client.search( # collection_name=collection_name, # query_vector=query_vector, # limit=limit, # include_vector=with_vector # ) # # return [ # ScoredResult( # id=parse_id(result["id"]), # payload=result["payload"], # score=result["score"] # ) # for result in results # ] return [] except Exception as e: logger.error(f"Error searching collection '{collection_name}': {e}") return [] async def batch_search( self, collection_name: str, query_texts: List[str], limit: int, with_vectors: bool = False, ): """Perform a batch search using multiple text queries.""" query_vectors = await self.embedding_engine.embed_text(query_texts) return await asyncio.gather( *[ self.search( collection_name=collection_name, query_vector=query_vector, limit=limit, with_vector=with_vectors, ) for query_vector in query_vectors ] ) async def delete_data_points(self, collection_name: str, data_point_ids: List[str]): """Delete specified data points from a collection.""" try: # Implement deletion logic # Example: # result = await self.client.delete(collection_name, data_point_ids) # return result logger.debug(f"Deleted {len(data_point_ids)} data points from {collection_name}") except Exception as e: logger.error(f"Failed to delete data points: {e}") raise async def create_vector_index(self, index_name: str, index_property_name: str): """Create a vector index based on an index name and property name.""" return await self.create_collection(f"{index_name}_{index_property_name}") async def index_data_points( self, index_name: str, index_property_name: str, data_points: List[DataPoint] ): """Index a list of data points by creating an associated vector index collection.""" formatted_data_points = [ IndexSchema( id=data_point.id, text=DataPoint.get_embeddable_data(data_point), ) for data_point in data_points ] return await self.create_data_points( f"{index_name}_{index_property_name}", formatted_data_points, ) async def prune(self): """Remove all data from the vector database.""" try: # Implement pruning logic - delete all collections or data # Example: # collections = await self.client.list_collections() # for collection in collections: # await self.client.delete_collection(collection.name) logger.debug("Pruned all data from <engine_name>") except Exception as e: logger.error(f"Failed to prune data: {e}") raise

Keep the method signatures consistent with VectorDBInterface, and refer to existing adapters such as QDrantAdapter for more comprehensive examples.
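
Alongside the adapter module, the package-level __init__.py can simply re-export the class (a minimal sketch; adjust the names to your engine):

```python
# packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/__init__.py
from .<engine_name>_adapter import <EngineName>Adapter

__all__ = ["<EngineName>Adapter"]
```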

2. Test with a Dedicated Script

File: packages/vector/<engine_name>/examples/example.py

Create a script that loads cognee, configures it to use your new <engine_name> provider, and runs basic usage checks. For example:

```python
import os
import pathlib

import cognee
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType

logger = get_logger()


async def main():
    cognee.config.set_vector_db_provider("<engine_name>")

    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_<engine_name>")
        ).resolve()
    )
    cognee.config.data_root_directory(data_directory_path)

    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_<engine_name>")
        ).resolve()
    )
    cognee.config.system_root_directory(cognee_directory_path)

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    dataset_name = "test_dataset"

    # Add some test data
    test_text = """
    This is a test document for the <engine_name> vector database integration.
    It contains information about artificial intelligence and machine learning.
    Vector databases are essential for semantic search and retrieval applications.
    """

    await cognee.add([test_text], dataset_name)
    await cognee.cognify([dataset_name])

    # Test vector search functionality
    from cognee.infrastructure.databases.vector import get_vector_engine

    vector_engine = get_vector_engine()
    search_results = await vector_engine.search("Entity_name", "artificial intelligence")
    assert len(search_results) != 0, "The search results list is empty."
    print(f"Found {len(search_results)} search results")

    # Test cognee search functionality
    search_results = await cognee.search(
        query_type=SearchType.INSIGHTS, query_text="machine learning"
    )
    assert len(search_results) != 0, "The insights search results list is empty."

    search_results = await cognee.search(
        query_type=SearchType.CHUNKS, query_text="vector databases"
    )
    assert len(search_results) != 0, "The chunks search results list is empty."

    # Test search history
    user = await get_default_user()
    history = await get_history(user.id)
    assert len(history) > 0, "Search history is empty."

    # Test cleanup
    await cognee.prune.prune_data()
    assert not os.path.isdir(data_directory_path), "Local data files are not deleted"

    await cognee.prune.prune_system(metadata=True)

    # Verify database is empty after pruning
    # Add specific checks for your vector database here
    # Example:
    # vector_engine = get_vector_engine()
    # collections = await vector_engine.client.list_collections()
    # assert len(collections) == 0, "<EngineName> vector database is not empty"

    print("All tests passed successfully!")


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```

This script performs a basic end-to-end test, verifying:

  • Your new adapter can be selected by cognee.
  • Data can be added to the vector database.
  • Search functionality works correctly.
  • The database is empty after a prune operation.
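
Before wiring the script into CI, you can run it locally from the package root, e.g. poetry run python examples/example.py, assuming your database is reachable and the VECTOR_DB_URL / VECTOR_DB_KEY environment variables used in the workflow below are set.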

3. Create a Test Workflow

File: .github/workflows/vector_db_tests.yml

Add a new job to the existing vector database tests workflow:

```yaml
run-<engine_name>-tests:
  name: <EngineName> Tests
  runs-on: ubuntu-22.04
  if: ${{ inputs.databases == 'all' || contains(inputs.databases, '<engine_name>') }}
  # Add services if your database needs to run as a container
  # services:
  #   <engine_name>:
  #     image: <your_db_image>:<version>
  #     ports:
  #       - <port>:<port>
  #     env:
  #       # Add any required environment variables
  steps:
    - name: Check out
      uses: actions/checkout@v4
      with:
        fetch-depth: 0

    - name: Cognee Setup
      uses: ./.github/actions/cognee_setup
      with:
        python-version: ${{ inputs.python-version }}

    - name: Install specific db dependency
      run: |
        poetry install -E <engine_name>

    # Add any setup steps if needed
    # - name: Wait for <EngineName> to be healthy
    #   run: |
    #     # Add health check commands

    - name: Run <EngineName> Tests
      env:
        ENV: 'dev'
        LLM_MODEL: ${{ secrets.LLM_MODEL }}
        LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
        LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
        LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
        EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
        EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
        EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
        EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
        VECTOR_DB_URL: ${{ secrets.<ENGINE_NAME>_URL }}
        VECTOR_DB_KEY: ${{ secrets.<ENGINE_NAME>_API_KEY }}
      run: poetry run python ./examples/example.py
      working-directory: ./packages/vector/<engine_name>
```

4. Update pyproject.toml

Add your vector database client dependencies to the optional dependencies:

pyproject.toml:

```toml
[tool.poetry.dependencies]
python = "^3.11"
cognee
your-vector-db-client

[tool.poetry.extras]
...
```
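
Concretely, a hedged sketch of what those entries might look like (the driver name and version constraints are placeholders; the extra's name must match the -E flag used in the workflow):

```toml
[tool.poetry.dependencies]
python = "^3.11"
cognee = "*"                                                   # pin as appropriate
your-vector-db-client = { version = "*", optional = true }     # placeholder driver

[tool.poetry.extras]
<engine_name> = ["your-vector-db-client"]
```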
5. Final Checklist

  • Implement your <EngineName>Adapter in packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py.
  • Add a register helper (register.py) and call it before configuring Cognee:

```python
from cognee.infrastructure.databases.vector import use_vector_adapter

from .<engine_name>_adapter import <EngineName>Adapter

use_vector_adapter("<engine_name>", <EngineName>Adapter)
```

  • Create a test or example script test_<engine_name>.py or example.py that you can use in your test workflow.
  • Create a test workflow, e.g. .github/workflows/test_<engine_name>.yml.
  • Add required dependencies to pyproject.toml as optional dependencies.
  • Open a PR to verify that your new integration passes CI.

Additional Considerations

Error Handling

  • Implement proper error handling for connection failures, timeouts, and API errors (a retry sketch follows this list).
  • Log meaningful error messages that help with debugging.
  • Handle graceful degradation when the vector database is unavailable.
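
As one illustration, a small retry helper around flaky client calls might look like this (a generic sketch, not part of the cognee API):

```python
import asyncio
from functools import partial

from cognee.shared.logging_utils import get_logger

logger = get_logger()


async def with_retries(operation, retries: int = 3, base_delay: float = 0.5):
    """Retry a zero-argument async callable with exponential backoff."""
    for attempt in range(1, retries + 1):
        try:
            return await operation()
        except Exception as error:
            if attempt == retries:
                logger.error(f"Operation failed after {retries} attempts: {error}")
                raise
            delay = base_delay * 2 ** (attempt - 1)
            logger.warning(f"Attempt {attempt} failed ({error}); retrying in {delay}s")
            await asyncio.sleep(delay)


# Hypothetical usage inside an adapter method (client.search is a placeholder):
# results = await with_retries(partial(self.client.search, collection_name, query_vector))
```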

Performance

  • Consider implementing connection pooling if your vector database supports it.
  • Add proper async/await patterns to avoid blocking operations.
  • Implement batch operations efficiently where possible (see the batching sketch below).
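
For example, chunked upserts keep individual requests small (a hedged sketch; client.upsert stands in for whatever bulk-write call your driver exposes, and the batch size is an arbitrary starting point):

```python
from typing import Any, List


async def upsert_in_batches(
    client: Any,
    collection_name: str,
    points: List[dict],
    batch_size: int = 100,
):
    """Write points in fixed-size batches rather than one oversized request."""
    for start in range(0, len(points), batch_size):
        await client.upsert(collection_name, points[start : start + batch_size])
```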

Security

  • Never log sensitive information like API keys or connection strings.
  • Validate inputs to prevent injection attacks.
  • Follow your vector database’s security best practices.

Documentation

  • Add docstrings to all public methods.
  • Document any specific configuration requirements.
  • Include examples of how to use your vector database with cognee.

That’s all! This approach keeps cognee’s architecture flexible, allowing you to swap in any vector database provider easily. If you need more advanced functionality (e.g., custom indexes, filters, or advanced search capabilities), simply implement them in your adapter class following the same patterns.

Join the Conversation!

Have questions about integrating a new vector database? Join our community to discuss implementation strategies and best practices!