Adding a New Vector Store to cognee

This guide describes how to integrate a new vector database engine into cognee, following the same pattern used for existing engines (e.g., Weaviate, Qdrant, Milvus, PGVector, LanceDB).

Repository Options

Cognee has hosted vector-database adapters in both the core and the community repositories. 🚨 Going forward, every vector-database adapter except LanceDB lives in the cognee-community repository. Qdrant has already been migrated out of the core library, and the remaining adapters will follow shortly. Redis, OpenSearch, and Azure AI Search integrations are already available in the community repo. All new adapter contributions must therefore target the community repository.
The core repository will keep only the built-in LanceDB integration.

For Community Repository

To add a new adapter to cognee-community:
  1. Fork and clone the cognee-community repository
  2. Create your adapter in packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/
  3. Inside that directory add __init__.py, <engine_name>_adapter.py, and register.py (see the Redis implementation as an example).
  4. At the package root packages/vector/<engine_name>/ add __init__.py, pyproject.toml, and README.md.
  5. Submit a pull request to the community repository
The sections below walk through these steps in more detail.
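Put together, the layout from the steps above looks roughly like this (the examples/ directory holds the test script described later in this guide; adjust names for your engine):

```
packages/vector/<engine_name>/
├── __init__.py
├── pyproject.toml
├── README.md
├── examples/
│   └── example.py
└── cognee_community_vector_adapter_<engine_name>/
    ├── __init__.py
    ├── <engine_name>_adapter.py
    └── register.py
```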

Why cognee-community?

cognee-community is the extension hub for Cognee.
Anything that is not part of the core lives here—adapters for third-party databases, pipelines, community-contributed tasks, etc.
Placing your adapter in this repository means:
  • Your code is released under the community license and can evolve independently of the core.
  • It can be installed with pip install cognee-community-vector-adapter-<engine_name> without pulling in heavyweight drivers for users who don’t need them. For Qdrant, for example, this is pip install cognee-community-vector-adapter-qdrant.
  • These packages plug into the cognee core package via the registration step described below.
If you are unfamiliar with the layout, have a look at the existing folders under packages/* in the community repo—each sub-folder represents a separate provider implemented in exactly the way you are about to do.

1. Implement the Adapter

File: packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py

Your adapter must implement the VectorDBInterface protocol, providing all required methods for collection management, data-point operations, and search functionality. Here is a sample skeleton with placeholders:
"""Adapter for <engine_name> vector database."""

import asyncio
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor

from cognee.exceptions import InvalidValueError
from cognee.infrastructure.engine import DataPoint
from cognee.infrastructure.engine.utils import parse_id
from cognee.shared.logging_utils import get_logger

# In a community package these come from cognee core, not via relative imports;
# adjust the paths if the cognee version you build against differs.
from cognee.infrastructure.databases.vector.embeddings.EmbeddingEngine import EmbeddingEngine
from cognee.infrastructure.databases.vector.models.ScoredResult import ScoredResult
from cognee.infrastructure.databases.vector.vector_db_interface import VectorDBInterface

logger = get_logger()

class IndexSchema(DataPoint):
    """
    Define a schema for indexing data points with a text field.
    
    This class inherits from the DataPoint class and specifies the structure of a single
    data point that includes a text attribute.
    """
    text: str
    metadata: dict = {"index_fields": ["text"]}

class <EngineName>Adapter(VectorDBInterface):
    """Adapter for <engine_name> vector database operations."""
    
    name = "<EngineName>"
    
    def __init__(
        self,
        url: str,
        api_key: Optional[str] = None,
        embedding_engine: Optional[EmbeddingEngine] = None,
    ):
        self.url = url
        self.api_key = api_key
        self.embedding_engine = embedding_engine
        self.client = None
        self.executor = ThreadPoolExecutor()
        self._initialize_connection()
    
    def _initialize_connection(self) -> None:
        """Establish connection to <engine_name>."""
        try:
            # Initialize your vector database client here
            # Example: self.client = YourVectorDBClient(url=self.url, api_key=self.api_key)
            logger.debug(f"Successfully connected to <engine_name> at {self.url}")
        except Exception as e:
            logger.error(f"Failed to initialize <engine_name> connection: {e}")
            raise
    
    async def has_collection(self, collection_name: str) -> bool:
        """Check if a specified collection exists."""
        try:
            # Implement collection existence check
            # Example: return await self.client.collection_exists(collection_name)
            return False
        except Exception as e:
            logger.error(f"Error checking collection existence: {e}")
            return False
    
    async def create_collection(
        self,
        collection_name: str,
        payload_schema: Optional[object] = None,
    ):
        """Create a new collection with an optional payload schema."""
        if await self.has_collection(collection_name):
            return
        
        try:
            vector_size = self.embedding_engine.get_vector_size()
            # Implement collection creation logic
            # Example:
            # await self.client.create_collection(
            #     name=collection_name,
            #     vector_size=vector_size,
            #     distance_metric="cosine"
            # )
            logger.debug(f"Created collection: {collection_name}")
        except Exception as e:
            logger.error(f"Failed to create collection {collection_name}: {e}")
            raise
    
    async def create_data_points(self, collection_name: str, data_points: List[DataPoint]):
        """Insert new data points into the specified collection."""
        if not await self.has_collection(collection_name):
            await self.create_collection(collection_name)
        
        # Generate embeddings for data points
        embeddings = await self.embedding_engine.embed_text(
            [DataPoint.get_embeddable_data(data_point) for data_point in data_points]
        )
        
        try:
            # Implement data point insertion logic
            # Example:
            # formatted_points = [
            #     {
            #         "id": str(data_point.id),
            #         "vector": embeddings[i],
            #         "payload": data_point.model_dump()
            #     }
            #     for i, data_point in enumerate(data_points)
            # ]
            # await self.client.upsert(collection_name, formatted_points)
            logger.debug(f"Inserted {len(data_points)} data points into {collection_name}")
        except Exception as e:
            logger.error(f"Failed to insert data points: {e}")
            raise
    
    async def retrieve(self, collection_name: str, data_point_ids: List[str]):
        """Retrieve data points from a collection using their IDs."""
        try:
            # Implement data point retrieval logic
            # Example:
            # results = await self.client.retrieve(collection_name, data_point_ids)
            # return [
            #     ScoredResult(
            #         id=parse_id(result["id"]),
            #         payload=result["payload"],
            #         score=0
            #     )
            #     for result in results
            # ]
            return []
        except Exception as e:
            logger.error(f"Failed to retrieve data points: {e}")
            return []
    
    async def search(
        self,
        collection_name: str,
        query_text: Optional[str] = None,
        query_vector: Optional[List[float]] = None,
        limit: int = 15,
        with_vector: bool = False,
    ) -> List[ScoredResult]:
        """Perform a search in the specified collection."""
        if query_text is None and query_vector is None:
            raise InvalidValueError(message="One of query_text or query_vector must be provided!")
        
        if not await self.has_collection(collection_name):
            logger.warning(f"Collection '{collection_name}' not found; returning [].")
            return []
        
        if query_vector is None:
            query_vector = (await self.embedding_engine.embed_text([query_text]))[0]
        
        try:
            # Implement search logic
            # Example:
            # results = await self.client.search(
            #     collection_name=collection_name,
            #     query_vector=query_vector,
            #     limit=limit,
            #     include_vector=with_vector
            # )
            # 
            # return [
            #     ScoredResult(
            #         id=parse_id(result["id"]),
            #         payload=result["payload"],
            #         score=result["score"]
            #     )
            #     for result in results
            # ]
            return []
        except Exception as e:
            logger.error(f"Error searching collection '{collection_name}': {e}")
            return []
    
    async def batch_search(
        self,
        collection_name: str,
        query_texts: List[str],
        limit: int,
        with_vectors: bool = False,
    ):
        """Perform a batch search using multiple text queries."""
        query_vectors = await self.embedding_engine.embed_text(query_texts)
        
        return await asyncio.gather(
            *[
                self.search(
                    collection_name=collection_name,
                    query_vector=query_vector,
                    limit=limit,
                    with_vector=with_vectors,
                )
                for query_vector in query_vectors
            ]
        )
    
    async def delete_data_points(self, collection_name: str, data_point_ids: List[str]):
        """Delete specified data points from a collection."""
        try:
            # Implement deletion logic
            # Example:
            # result = await self.client.delete(collection_name, data_point_ids)
            # return result
            logger.debug(f"Deleted {len(data_point_ids)} data points from {collection_name}")
        except Exception as e:
            logger.error(f"Failed to delete data points: {e}")
            raise
    
    async def create_vector_index(self, index_name: str, index_property_name: str):
        """Create a vector index based on an index name and property name."""
        return await self.create_collection(f"{index_name}_{index_property_name}")
    
    async def index_data_points(
        self, index_name: str, index_property_name: str, data_points: List[DataPoint]
    ):
        """Index a list of data points by creating an associated vector index collection."""
        formatted_data_points = [
            IndexSchema(
                id=data_point.id,
                text=DataPoint.get_embeddable_data(data_point),
            )
            for data_point in data_points
        ]
        
        return await self.create_data_points(
            f"{index_name}_{index_property_name}",
            formatted_data_points,
        )
    
    async def prune(self):
        """Remove all data from the vector database."""
        try:
            # Implement pruning logic - delete all collections or data
            # Example:
            # collections = await self.client.list_collections()
            # for collection in collections:
            #     await self.client.delete_collection(collection.name)
            logger.debug("Pruned all data from <engine_name>")
        except Exception as e:
            logger.error(f"Failed to prune data: {e}")
            raise
Keep the method signatures consistent with VectorDBInterface. Reference the existing adapters like QDrantAdapter for more comprehensive examples.

2. Test with a Dedicated Script

File: packages/vector/<engine_name>/examples/example.py

Create a script that loads cognee, configures it to use your new <engine_name> provider, and runs basic usage checks. For example:
import os
import pathlib
import cognee
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_default_user
from cognee.shared.logging_utils import get_logger
from cognee.modules.search.types import SearchType

logger = get_logger()

async def main():
    # For a community adapter, import its register module first (see register.py)
    # so the provider name below is known to cognee.
    cognee.config.set_vector_db_provider("<engine_name>")
    data_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_<engine_name>")
        ).resolve()
    )
    cognee.config.data_root_directory(data_directory_path)
    cognee_directory_path = str(
        pathlib.Path(
            os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_<engine_name>")
        ).resolve()
    )
    cognee.config.system_root_directory(cognee_directory_path)

    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)

    dataset_name = "test_dataset"

    # Add some test data
    test_text = """
    This is a test document for the <engine_name> vector database integration.
    It contains information about artificial intelligence and machine learning.
    Vector databases are essential for semantic search and retrieval applications.
    """

    await cognee.add([test_text], dataset_name)
    await cognee.cognify([dataset_name])

    # Test vector search functionality
    from cognee.infrastructure.databases.vector import get_vector_engine

    vector_engine = get_vector_engine()
    search_results = await vector_engine.search("Entity_name", "artificial intelligence")

    assert len(search_results) != 0, "The search results list is empty."
    print(f"Found {len(search_results)} search results")

    # Test cognee search functionality
    search_results = await cognee.search(
        query_type=SearchType.INSIGHTS, 
        query_text="machine learning"
    )
    assert len(search_results) != 0, "The insights search results list is empty."

    search_results = await cognee.search(
        query_type=SearchType.CHUNKS, 
        query_text="vector databases"
    )
    assert len(search_results) != 0, "The chunks search results list is empty."

    # Test search history
    user = await get_default_user()
    history = await get_history(user.id)
    assert len(history) > 0, "Search history is empty."

    # Test cleanup
    await cognee.prune.prune_data()
    assert not os.path.isdir(data_directory_path), "Local data files are not deleted"

    await cognee.prune.prune_system(metadata=True)
    
    # Verify database is empty after pruning
    # Add specific checks for your vector database here
    # Example:
    # vector_engine = get_vector_engine()
    # collections = await vector_engine.client.list_collections()
    # assert len(collections) == 0, f"<EngineName> vector database is not empty"

    print("All tests passed successfully!")

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())
This script runs a basic end-to-end test, verifying:
  • Your new adapter can be selected by cognee.
  • Data can be added to the vector database.
  • Search functionality works correctly.
  • The database is empty after a prune operation.

3. Create a Test Workflow

File: .github/workflows/vector_db_tests.yml

Add a new job to the existing vector database tests workflow:
  run-<engine_name>-tests:
    name: <EngineName> Tests
    runs-on: ubuntu-22.04
    if: ${{ inputs.databases == 'all' || contains(inputs.databases, '<engine_name>') }}
    
    # Add services if your database needs to run as a container
    # services:
    #   <engine_name>:
    #     image: <your_db_image>:<version>
    #     ports:
    #       - <port>:<port>
    #     env:
    #       # Add any required environment variables
    
    steps:
      - name: Check out
        uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Cognee Setup
        uses: ./.github/actions/cognee_setup
        with:
          python-version: ${{ inputs.python-version }}

      - name: Install specific db dependency
        run: |
          poetry install -E <engine_name>

      # Add any setup steps if needed
      # - name: Wait for <EngineName> to be healthy
      #   run: |
      #     # Add health check commands

      - name: Run <EngineName> Tests
        env:
          ENV: 'dev'
          LLM_MODEL: ${{ secrets.LLM_MODEL }}
          LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
          LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
          LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
          EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
          EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
          EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
          EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
          VECTOR_DB_URL: ${{ secrets.<ENGINE_NAME>_URL }}
          VECTOR_DB_KEY: ${{ secrets.<ENGINE_NAME>_API_KEY }}
        run: poetry run python ./examples/example.py
        working-directory: ./packages/vector/<engine_name>

4. Update pyproject.toml

Add your vector database client dependencies to the optional dependencies in pyproject.toml:
[tool.poetry.dependencies]
python = "^3.11"
cognee = "*"  # pin to the cognee release you develop against
your-vector-db-client = { version = "*", optional = true }

[tool.poetry.extras]
<engine_name> = ["your-vector-db-client"]

5. Final Checklist

- [ ] Implement your `<EngineName>Adapter` in `packages/vector/<engine_name>/cognee_community_vector_adapter_<engine_name>/<engine_name>_adapter.py`.
- [ ] Add a register helper (`register.py`) and call it before configuring Cognee:

    ```python
    from cognee.infrastructure.databases.vector import use_vector_adapter

    from .<engine_name>_adapter import <EngineName>Adapter

    use_vector_adapter("<engine_name>", <EngineName>Adapter)
    ```

- [ ] Create a test or example script (`test_<engine_name>.py` or `example.py`) for use in your test workflow.
- [ ] Add a job for your adapter to the test workflow (`.github/workflows/vector_db_tests.yml`).
- [ ] Add required dependencies to `pyproject.toml` as optional dependencies.
- [ ] Open a PR to verify that your new integration passes CI.

Additional Considerations

Error Handling

  • Implement proper error handling for connection failures, timeouts, and API errors.
  • Log meaningful error messages that help with debugging.
  • Handle graceful degradation when the vector database is unavailable.
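For instance, transient connection errors can be retried with exponential backoff before giving up. A minimal self-contained sketch (the with_retries helper and flaky_upsert stand-in are illustrative, not part of cognee):

```python
import asyncio


async def with_retries(op, attempts=3, base_delay=0.05):
    """Retry an async operation with exponential backoff on transient errors."""
    for attempt in range(attempts):
        try:
            return await op()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of retries: surface the error to the caller
            await asyncio.sleep(base_delay * (2 ** attempt))


calls = {"n": 0}


async def flaky_upsert():
    # Stand-in for a vector-db call that fails twice, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(with_retries(flaky_upsert))
print(result)  # ok
```

Keeping retries bounded (and re-raising on the final attempt) ensures failures still propagate as meaningful errors instead of hanging the pipeline.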

Performance

  • Consider implementing connection pooling if your vector database supports it.
  • Add proper async/await patterns to avoid blocking operations.
  • Implement batch operations efficiently where possible.
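The adapter skeleton above creates a ThreadPoolExecutor; if your database only ships a synchronous client, that executor can keep blocking calls off the event loop. A minimal sketch (blocking_upsert is a stand-in for a real client call):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()


def blocking_upsert(points):
    # Stand-in for a synchronous vector-db client call.
    time.sleep(0.01)
    return len(points)


async def upsert_async(points):
    # Offload the blocking call so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_upsert, points)


result = asyncio.run(upsert_async([{"id": "1"}, {"id": "2"}]))
print(result)  # 2
```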

Security

  • Never log sensitive information like API keys or connection strings.
  • Validate inputs to prevent injection attacks.
  • Follow your vector database’s security best practices.
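As one example of input validation, collection names that end up interpolated into backend requests can be checked against an allow-list pattern before use (the helper and regex below are illustrative, not part of cognee):

```python
import re

# Allow only alphanumerics and underscores, up to 64 characters.
COLLECTION_NAME_RE = re.compile(r"^[A-Za-z0-9_]{1,64}$")


def validate_collection_name(name: str) -> str:
    """Reject names that could smuggle query syntax into backend requests."""
    if not COLLECTION_NAME_RE.match(name):
        raise ValueError(f"invalid collection name: {name!r}")
    return name


print(validate_collection_name("test_dataset"))  # test_dataset
```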

Documentation

  • Add docstrings to all public methods.
  • Document any specific configuration requirements.
  • Include examples of how to use your vector database with cognee.
That’s all! This approach keeps cognee’s architecture flexible, allowing you to swap in any vector database provider easily. If you need more advanced functionality (e.g., custom indexes, filters, or advanced search capabilities), simply implement them in your adapter class following the same patterns.

Join the Conversation!

Have questions about creating custom adapters? Join our community to discuss implementation strategies and best practices!