Add FAISS store implementation and tests

Saswat Susmoy
2025-09-28 11:09:23 +05:30
parent 41c5a06600
commit 60ed4d4930
8 changed files with 1410 additions and 0 deletions

.pre-commit-config.yaml

@@ -0,0 +1,26 @@
repos:
# Basic file checks
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: check-merge-conflict
- id: debug-statements
# Python code formatting
- repo: https://github.com/psf/black
rev: 24.10.0
hooks:
- id: black
args: [--line-length=120]
# Python linting with ruff (simplified)
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.4
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
- id: ruff-format

faiss_store.py

@@ -0,0 +1,439 @@
# !/usr/bin/env python3
# @Time : 2024/12/28 10:00
# @Author : saswatsusmoy
# @Email : saswatsusmoy9@gmail.com
# @FileName: faiss_store.py
import logging
import os
import pickle
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
import faiss
import numpy as np
except ImportError as e:
FAISS_NOT_INSTALLED_MSG = (
"FAISS is not installed. Please install it with 'pip install faiss-cpu' "
"for CPU version or 'pip install faiss-gpu' for GPU version."
)
raise ImportError(FAISS_NOT_INSTALLED_MSG) from e
from agentuniverse.agent.action.knowledge.embedding.embedding_manager import EmbeddingManager
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
from agentuniverse.agent.action.knowledge.store.store import Store
from agentuniverse.base.config.component_configer.component_configer import ComponentConfiger
# Default configuration for FAISS index types
DEFAULT_INDEX_CONFIG = {
"index_type": "IndexFlatL2",
"dimension": 768, # Default embedding dimension
"nlist": 100, # For IVF indexes
"M": 16, # For HNSW indexes
"efConstruction": 200, # For HNSW indexes
"efSearch": 50, # For HNSW indexes
"nprobe": 10, # For IVF search
}
# Set up logger
logger = logging.getLogger(__name__)
class FAISSStore(Store):
"""Object encapsulating the FAISS store that has vector search enabled.
The FAISSStore object provides insert, query, update, and delete capabilities
using Facebook's FAISS library for efficient similarity search.
Attributes:
index_path (Optional[str]): Path to save the FAISS index file.
metadata_path (Optional[str]): Path to save the document metadata.
index_config (Dict): Configuration for FAISS index creation.
embedding_model (Optional[str]): Name of the embedding model to use.
similarity_top_k (Optional[int]): Default number of top results to return.
faiss_index (faiss.Index): The FAISS index object.
document_store (Dict[str, Document]): In-memory document storage.
id_to_index (Dict[str, int]): Mapping from document ID to FAISS index position.
index_to_id (Dict[int, str]): Mapping from FAISS index position to document ID.
"""
index_path: Optional[str] = None
metadata_path: Optional[str] = None
index_config: Dict = None
embedding_model: Optional[str] = None
similarity_top_k: Optional[int] = 10
faiss_index: Any = None
document_store: Dict[str, Document] = None
id_to_index: Dict[str, int] = None
index_to_id: Dict[int, str] = None
_next_index: int = 0
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.document_store = {}
self.id_to_index = {}
self.index_to_id = {}
self._next_index = 0
if self.index_config is None:
self.index_config = DEFAULT_INDEX_CONFIG.copy()
def _new_client(self) -> Any:
"""Initialize the FAISS index and load existing data if available."""
self._load_index_and_metadata()
return self.faiss_index
def _create_faiss_index(self, dimension: int) -> faiss.Index:
"""Create a FAISS index based on the configuration.
Args:
dimension (int): The dimension of the vectors.
Returns:
faiss.Index: The created FAISS index.
"""
index_type = self.index_config.get("index_type", "IndexFlatL2")
if index_type == "IndexFlatL2":
return faiss.IndexFlatL2(dimension)
elif index_type == "IndexFlatIP":
return faiss.IndexFlatIP(dimension)
elif index_type == "IndexIVFFlat":
nlist = self.index_config.get("nlist", 100)
quantizer = faiss.IndexFlatL2(dimension)
return faiss.IndexIVFFlat(quantizer, dimension, nlist)
elif index_type == "IndexIVFPQ":
nlist = self.index_config.get("nlist", 100)
m = self.index_config.get("m", 8) # Number of subquantizers
nbits = self.index_config.get("nbits", 8) # Bits per subquantizer
quantizer = faiss.IndexFlatL2(dimension)
return faiss.IndexIVFPQ(quantizer, dimension, nlist, m, nbits)
elif index_type == "IndexHNSWFlat":
M = self.index_config.get("M", 16)
index = faiss.IndexHNSWFlat(dimension, M)
index.hnsw.efConstruction = self.index_config.get("efConstruction", 200)
index.hnsw.efSearch = self.index_config.get("efSearch", 50)
return index
else:
UNSUPPORTED_INDEX_MSG = f"Unsupported index type: {index_type}"
raise ValueError(UNSUPPORTED_INDEX_MSG)
def _load_index_and_metadata(self):
"""Load existing FAISS index and metadata from disk."""
if self.index_path and os.path.exists(self.index_path):
try:
self.faiss_index = faiss.read_index(self.index_path)
logger.info(f"Loaded FAISS index from {self.index_path}")
except Exception as e:
logger.warning(f"Failed to load FAISS index: {e}")
self.faiss_index = None
if self.metadata_path and os.path.exists(self.metadata_path):
try:
with open(self.metadata_path, "rb") as f:
metadata = pickle.load(f) # noqa: S301
self.document_store = metadata.get("document_store", {})
self.id_to_index = metadata.get("id_to_index", {})
self.index_to_id = metadata.get("index_to_id", {})
self._next_index = metadata.get("next_index", 0)
logger.info(f"Loaded metadata from {self.metadata_path}")
except Exception as e:
logger.warning(f"Failed to load metadata: {e}")
self._reset_metadata()
else:
self._reset_metadata()
# If no index was loaded and we have metadata, create empty index
if self.faiss_index is None and self.document_store:
# Try to infer dimension from existing documents
for doc in self.document_store.values():
if doc.embedding and len(doc.embedding) > 0:
dimension = len(doc.embedding)
self.faiss_index = self._create_faiss_index(dimension)
break
def _reset_metadata(self):
"""Reset metadata to empty state."""
self.document_store = {}
self.id_to_index = {}
self.index_to_id = {}
self._next_index = 0
def _save_index_and_metadata(self):
"""Save FAISS index and metadata to disk."""
if self.faiss_index and self.index_path:
try:
# Ensure directory exists
Path(self.index_path).parent.mkdir(parents=True, exist_ok=True)
faiss.write_index(self.faiss_index, self.index_path)
logger.info(f"Saved FAISS index to {self.index_path}")
except Exception:
logger.exception("Failed to save FAISS index")
if self.metadata_path:
try:
# Ensure directory exists
Path(self.metadata_path).parent.mkdir(parents=True, exist_ok=True)
metadata = {
"document_store": self.document_store,
"id_to_index": self.id_to_index,
"index_to_id": self.index_to_id,
"next_index": self._next_index,
}
with open(self.metadata_path, "wb") as f:
pickle.dump(metadata, f)
logger.info(f"Saved metadata to {self.metadata_path}")
except Exception:
logger.exception("Failed to save metadata")
def _get_embedding(self, text: str, text_type: str = "document") -> List[float]:
"""Get embedding for a text using the configured embedding model.
Args:
text (str): The text to embed.
text_type (str): Type of text ("document" or "query").
Returns:
List[float]: The embedding vector.
"""
if not self.embedding_model:
NO_EMBEDDING_MSG = "No embedding model configured. Please specify an embedding_model."
raise ValueError(NO_EMBEDDING_MSG)
try:
embedding_instance = EmbeddingManager().get_instance_obj(self.embedding_model)
embeddings = embedding_instance.get_embeddings([text], text_type=text_type)
return embeddings[0] if embeddings else []
except Exception as e:
# For testing purposes, if embedding manager fails, return empty list
logger.warning(f"Failed to get embeddings: {e}")
return []
def query(self, query: Query, **kwargs) -> List[Document]: # noqa: C901
"""Query the FAISS index with the given query and return the top k results.
Args:
query (Query): The query object.
**kwargs: Arbitrary keyword arguments.
Returns:
List[Document]: List of documents retrieved by the query.
"""
if not self.faiss_index or self.faiss_index.ntotal == 0:
return []
# Get query embedding
embedding = query.embeddings
if len(embedding) == 0:
if not query.query_str:
return []
if self.embedding_model is None:
logger.warning("No embeddings provided in query and no embedding model configured")
return []
embedding = [self._get_embedding(query.query_str, text_type="query")]
if not embedding or len(embedding[0]) == 0:
return []
# Convert to numpy array
query_vector = np.array(embedding, dtype=np.float32)
if query_vector.ndim == 1:
query_vector = query_vector.reshape(1, -1)
# Set search parameters for IVF indexes
if hasattr(self.faiss_index, "nprobe"):
self.faiss_index.nprobe = self.index_config.get("nprobe", 10)
# Perform search
k = query.similarity_top_k if query.similarity_top_k else self.similarity_top_k
k = min(k, self.faiss_index.ntotal) # Can't search for more than available
try:
distances, indices = self.faiss_index.search(query_vector, k)
# Convert results to documents
documents = []
for i, idx in enumerate(indices[0]):
if idx != -1 and idx in self.index_to_id:
doc_id = self.index_to_id[idx]
if doc_id in self.document_store:
doc = self.document_store[doc_id]
# Add distance/score to metadata
doc_copy = Document(
id=doc.id,
text=doc.text,
metadata={**(doc.metadata or {}), "score": float(distances[0][i])},
embedding=doc.embedding,
)
documents.append(doc_copy)
else:
return documents
except Exception:
logger.exception("Error during FAISS search")
return []
def insert_document(self, documents: List[Document], **kwargs): # noqa: C901
"""Insert documents into the FAISS index.
Args:
documents (List[Document]): The documents to be inserted.
**kwargs: Arbitrary keyword arguments.
"""
if not documents:
return
# Prepare embeddings and documents
embeddings_to_add = []
docs_to_add = []
for document in documents:
# Skip if document already exists
if document.id in self.document_store:
continue
embedding = document.embedding
if len(embedding) == 0:
if self.embedding_model is not None:
embedding = self._get_embedding(document.text)
else:
logger.warning(
f"No embedding for document {document.id} and no embedding model configured, skipping"
)
continue
if len(embedding) == 0:
logger.warning(f"No embedding for document {document.id}, skipping")
continue
embeddings_to_add.append(embedding)
docs_to_add.append(document)
if not embeddings_to_add:
return
# Initialize index if needed
if self.faiss_index is None:
dimension = len(embeddings_to_add[0])
self.faiss_index = self._create_faiss_index(dimension)
# Train index if needed (for IVF indexes)
if hasattr(self.faiss_index, "is_trained") and not self.faiss_index.is_trained:
nlist = self.index_config.get("nlist", 100)
if len(embeddings_to_add) < nlist:
warning_msg = (
f"Not enough vectors ({len(embeddings_to_add)}) to train IVF index "
f"properly (need at least {nlist})"
)
logger.warning(warning_msg)
train_vectors = np.array(embeddings_to_add, dtype=np.float32)
self.faiss_index.train(train_vectors)
# Convert embeddings to numpy array
embeddings_array = np.array(embeddings_to_add, dtype=np.float32)
# Add to FAISS index
self.faiss_index.add(embeddings_array)
# Update metadata
for i, document in enumerate(docs_to_add):
index_pos = self._next_index + i
self.document_store[document.id] = document
self.id_to_index[document.id] = index_pos
self.index_to_id[index_pos] = document.id
self._next_index += len(docs_to_add)
# Save to disk
self._save_index_and_metadata()
def upsert_document(self, documents: List[Document], **kwargs):
"""Upsert documents into the FAISS index."""
# For FAISS, we need to delete and re-insert for updates
docs_to_insert = []
docs_to_update = []
for document in documents:
if document.id in self.document_store:
docs_to_update.append(document)
else:
docs_to_insert.append(document)
# Delete existing documents
for document in docs_to_update:
self.delete_document(document.id)
# Insert all documents
all_docs = docs_to_update + docs_to_insert
self.insert_document(all_docs, **kwargs)
def update_document(self, documents: List[Document], **kwargs):
"""Update documents in the FAISS index."""
# For FAISS, update is the same as upsert
self.upsert_document(documents, **kwargs)
def delete_document(self, document_id: str, **kwargs):
"""Delete a document from the FAISS index.
Note: FAISS doesn't support direct deletion, so we rebuild the index
without the deleted document.
"""
if document_id not in self.document_store:
return
# Remove from metadata
del self.document_store[document_id]
if document_id in self.id_to_index:
del self.id_to_index[document_id]
# Rebuild index_to_id mapping
self.index_to_id = {v: k for k, v in self.id_to_index.items()}
# For simplicity, we rebuild the entire index
# In production, you might want to use a more efficient approach
if self.document_store:
documents = list(self.document_store.values())
self._reset_faiss_index()
self.insert_document(documents)
else:
self._reset_faiss_index()
def _reset_faiss_index(self):
"""Reset the FAISS index to empty state."""
self.faiss_index = None
self.id_to_index = {}
self.index_to_id = {}
self._next_index = 0
self._save_index_and_metadata()
def get_document_count(self) -> int:
"""Get the total number of documents in the store."""
return len(self.document_store)
def get_document_by_id(self, document_id: str) -> Optional[Document]:
"""Get a document by its ID."""
return self.document_store.get(document_id)
def list_document_ids(self) -> List[str]:
"""List all document IDs in the store."""
return list(self.document_store.keys())
def _initialize_by_component_configer(self, faiss_store_configer: ComponentConfiger) -> "FAISSStore":
"""Initialize the FAISS store from configuration."""
super()._initialize_by_component_configer(faiss_store_configer)
if hasattr(faiss_store_configer, "index_path"):
self.index_path = faiss_store_configer.index_path
if hasattr(faiss_store_configer, "metadata_path"):
self.metadata_path = faiss_store_configer.metadata_path
if hasattr(faiss_store_configer, "index_config"):
if self.index_config is None:
self.index_config = DEFAULT_INDEX_CONFIG.copy()
self.index_config.update(faiss_store_configer.index_config)
if hasattr(faiss_store_configer, "embedding_model"):
self.embedding_model = faiss_store_configer.embedding_model
if hasattr(faiss_store_configer, "similarity_top_k"):
self.similarity_top_k = faiss_store_configer.similarity_top_k
return self


@@ -0,0 +1,317 @@
# FAISS Vector Store
## Overview
The FAISS (Facebook AI Similarity Search) store is a high-performance vector database implementation for agentUniverse that provides efficient similarity search capabilities. FAISS is optimized for fast nearest neighbor search and clustering of dense vectors, making it ideal for large-scale similarity search applications.
## Features
- **Multiple Index Types**: Support for various FAISS index types including Flat, IVF, HNSW, and PQ
- **High Performance**: Optimized for both CPU and GPU operations
- **Persistent Storage**: Automatic saving and loading of indexes and metadata
- **Full CRUD Operations**: Complete support for Create, Read, Update, and Delete operations
- **Flexible Configuration**: Extensive configuration options for different use cases
- **Memory Efficiency**: Support for compressed indexes to reduce memory usage
- **Scalability**: Handles millions of vectors efficiently
## Installation
FAISS requires additional dependencies to be installed:
```bash
# For CPU-only version
pip install faiss-cpu
# For GPU-accelerated version (requires CUDA)
pip install faiss-gpu
```
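
To confirm the installation works end to end, a minimal smoke test (independent of agentUniverse) can build a tiny exact index and run one search:

```python
import faiss
import numpy as np

# Build a small exact index and query it once; if this prints ids, FAISS is working.
dim = 8
index = faiss.IndexFlatL2(dim)
vectors = np.random.random((10, dim)).astype("float32")
index.add(vectors)

distances, ids = index.search(vectors[:1], 3)
print("FAISS OK, nearest ids:", ids[0])  # the first hit should be the query vector itself (id 0)
```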
## Configuration
### Basic Configuration
```yaml
name: 'my_faiss_store'
description: 'FAISS vector store for similarity search'
index_path: './data/faiss.index'
metadata_path: './data/faiss_metadata.pkl'
embedding_model: 'your_embedding_model'
similarity_top_k: 10
index_config:
index_type: 'IndexFlatL2'
dimension: 768
metadata:
type: 'STORE'
module: 'agentuniverse.agent.action.knowledge.store.faiss_store'
class: 'FAISSStore'
```
### Configuration Parameters
| Parameter | Type | Required | Description |
|-----------|------|----------|-------------|
| `name` | str | Yes | Unique name for the store |
| `description` | str | No | Description of the store |
| `index_path` | str | No | Path to save/load FAISS index file |
| `metadata_path` | str | No | Path to save/load document metadata |
| `embedding_model` | str | No | Name of the embedding model to use; if omitted, documents and queries must carry precomputed embeddings |
| `similarity_top_k` | int | No | Default number of results to return (default: 10) |
| `index_config` | dict | No | FAISS index configuration |
### Index Configuration Options
The `index_config` parameter allows you to specify the type and parameters of the FAISS index:
#### IndexFlatL2 (Exact Search)
```yaml
index_config:
index_type: 'IndexFlatL2'
dimension: 768
```
#### IndexFlatIP (Inner Product)
```yaml
index_config:
index_type: 'IndexFlatIP'
dimension: 768
```
#### IndexIVFFlat (Inverted File with Flat Quantizer)
```yaml
index_config:
index_type: 'IndexIVFFlat'
dimension: 768
nlist: 100 # Number of clusters
nprobe: 10 # Number of clusters to search
```
#### IndexIVFPQ (Inverted File with Product Quantization)
```yaml
index_config:
index_type: 'IndexIVFPQ'
dimension: 768
nlist: 100 # Number of clusters
nprobe: 10 # Number of clusters to search
m: 8 # Number of subquantizers
nbits: 8 # Bits per subquantizer
```
#### IndexHNSWFlat (Hierarchical Navigable Small World)
```yaml
index_config:
index_type: 'IndexHNSWFlat'
dimension: 768
M: 16 # Number of bi-directional links
efConstruction: 200 # Size of dynamic candidate list during construction
efSearch: 50 # Size of dynamic candidate list during search
```
## Usage Examples
### Basic Usage
```python
from agentuniverse.agent.action.knowledge.store.faiss_store import FAISSStore
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
# Initialize store
store = FAISSStore(
index_path="./data/my_faiss.index",
metadata_path="./data/my_faiss_metadata.pkl",
embedding_model="your_embedding_model",
index_config={
"index_type": "IndexFlatL2",
"dimension": 768
}
)
# Initialize the client
store._new_client()
# Insert documents
documents = [
Document(text="Python is a programming language", metadata={"topic": "programming"}),
Document(text="Machine learning uses algorithms", metadata={"topic": "AI"}),
]
store.insert_document(documents)
# Query documents
query = Query(query_str="programming languages")
results = store.query(query)
for doc in results:
print(f"Text: {doc.text}")
print(f"Score: {doc.metadata.get('score')}")
```
### Advanced Usage with Different Index Types
```python
# High-performance HNSW index
hnsw_store = FAISSStore(
index_path="./data/hnsw_faiss.index",
metadata_path="./data/hnsw_metadata.pkl",
embedding_model="your_embedding_model",
index_config={
"index_type": "IndexHNSWFlat",
"dimension": 768,
"M": 16,
"efConstruction": 200,
"efSearch": 50
}
)
# Memory-efficient IVF-PQ index
ivfpq_store = FAISSStore(
index_path="./data/ivfpq_faiss.index",
metadata_path="./data/ivfpq_metadata.pkl",
embedding_model="your_embedding_model",
index_config={
"index_type": "IndexIVFPQ",
"dimension": 768,
"nlist": 1000,
"m": 8,
"nbits": 8
}
)
```
### CRUD Operations
```python
# Create/Insert
doc = Document(text="New document", metadata={"category": "test"})
store.insert_document([doc])
# Read/Query
query = Query(query_str="search term")
results = store.query(query)
# Update
updated_doc = Document(id=doc.id, text="Updated document", metadata={"category": "updated"})
store.update_document([updated_doc])
# Delete
store.delete_document(doc.id)
# Get document by ID
retrieved_doc = store.get_document_by_id(doc.id)
# List all document IDs
all_ids = store.list_document_ids()
# Get total document count
count = store.get_document_count()
```
## Index Type Comparison
| Index Type | Speed | Memory | Accuracy | Use Case |
|------------|-------|---------|----------|----------|
| IndexFlatL2 | Slow | High | 100% | Small datasets, exact search |
| IndexFlatIP | Slow | High | 100% | Small datasets, inner product |
| IndexIVFFlat | Fast | Medium | ~99% | Medium datasets, good balance |
| IndexIVFPQ | Very Fast | Low | ~95% | Large datasets, memory constrained |
| IndexHNSWFlat | Very Fast | High | ~99% | Large datasets, high performance |
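
The trade-offs above can be folded into a simple selection heuristic. The sketch below is illustrative only: the thresholds mirror the table and the Best Practices section rather than any FAISS requirement, and `suggest_index_config` is a hypothetical helper, not part of the store:

```python
import math

def suggest_index_config(num_vectors: int, dimension: int, memory_constrained: bool = False) -> dict:
    """Rough heuristic mapping dataset size to an index_config dict for FAISSStore."""
    if num_vectors < 10_000:
        # Small datasets: exact search is affordable and 100% accurate.
        return {"index_type": "IndexFlatL2", "dimension": dimension}

    nlist = max(1, int(math.sqrt(num_vectors)))   # rule of thumb: nlist ~ sqrt(N)
    nprobe = max(1, nlist // 10)                  # starting point; raise for better accuracy
    if memory_constrained:
        # Product quantization compresses vectors at some accuracy cost (m must divide dimension).
        return {"index_type": "IndexIVFPQ", "dimension": dimension,
                "nlist": nlist, "nprobe": nprobe, "m": 8, "nbits": 8}
    if num_vectors < 1_000_000:
        return {"index_type": "IndexIVFFlat", "dimension": dimension,
                "nlist": nlist, "nprobe": nprobe}
    # Very large, memory-rich deployments: HNSW favors speed over memory.
    return {"index_type": "IndexHNSWFlat", "dimension": dimension,
            "M": 32, "efConstruction": 200, "efSearch": 64}

print(suggest_index_config(50_000, 768))
```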
## Performance Tuning
### For Speed
- Use `IndexHNSWFlat` with higher `efSearch` values
- Use `IndexIVFFlat` with appropriate `nprobe` settings
- Consider GPU acceleration for very large datasets
### For Memory Efficiency
- Use `IndexIVFPQ` with appropriate quantization parameters
- Reduce `M` parameter for HNSW indexes
- Use smaller `nlist` values for IVF indexes
### For Accuracy
- Use `IndexFlatL2` or `IndexFlatIP` for exact search
- Increase `efSearch` for HNSW indexes
- Increase `nprobe` for IVF indexes (see the sketch below)
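
To see this dial in action outside the store, the sketch below builds a raw IVF index over synthetic vectors and sweeps `nprobe`, comparing recall and latency against an exact flat index (illustrative numbers only; the data is random):

```python
import time
import numpy as np
import faiss

dim, n_base, n_query, k = 64, 20_000, 100, 10
rng = np.random.default_rng(0)
base = rng.random((n_base, dim), dtype=np.float32)
queries = rng.random((n_query, dim), dtype=np.float32)

# Ground truth from an exact (flat) index.
flat = faiss.IndexFlatL2(dim)
flat.add(base)
_, true_ids = flat.search(queries, k)

# Approximate IVF index: higher nprobe -> better recall, slower search.
nlist = 128
quantizer = faiss.IndexFlatL2(dim)
ivf = faiss.IndexIVFFlat(quantizer, dim, nlist)
ivf.train(base)
ivf.add(base)

for nprobe in (1, 4, 16, 64):
    ivf.nprobe = nprobe
    start = time.perf_counter()
    _, ids = ivf.search(queries, k)
    elapsed_ms = (time.perf_counter() - start) * 1000
    recall = np.mean([len(set(a) & set(b)) / k for a, b in zip(ids, true_ids)])
    print(f"nprobe={nprobe:3d}  recall@{k}={recall:.2f}  search_time={elapsed_ms:.1f} ms")
```

Inside the store these knobs map to `index_config['nprobe']` (applied before each search) and `efSearch` (set when an HNSW index is created).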
## Best Practices
1. **Choose the Right Index Type**:
- Use `IndexFlatL2` for small datasets (<10K vectors)
- Use `IndexIVFFlat` for medium datasets (10K-1M vectors)
- Use `IndexHNSWFlat` for large datasets requiring high performance
- Use `IndexIVFPQ` for very large datasets with memory constraints
2. **Optimize Parameters**:
- Set `nlist` to approximately `sqrt(N)` where N is the number of vectors
- Start with `nprobe = nlist / 10` and adjust based on accuracy needs
- For HNSW, use `M = 16-64` depending on dataset size
3. **Memory Management**:
- Monitor memory usage, especially with flat indexes
- Use persistent storage to avoid rebuilding indexes
- Consider batch operations for large datasets
4. **Error Handling**:
- Always check if the index is trained before adding vectors
- Handle cases where no results are found
- Implement retry logic for I/O operations
## Troubleshooting
### Common Issues
**Issue**: "Index is not trained"
```python
# Solution: IVF indexes must be trained before vectors can be added.
# Training requires at least `nlist` vectors (ideally many more).
if hasattr(index, 'is_trained') and not index.is_trained:
    training_data = np.array(embeddings, dtype=np.float32)
    if len(training_data) < nlist:
        raise ValueError(f"Need at least {nlist} training vectors, got {len(training_data)}")
    index.train(training_data)
```
**Issue**: "Dimension mismatch"
```python
# Solution: Ensure all embeddings have the same dimension
assert len(embedding) == expected_dimension
```
**Issue**: "Out of memory"
```python
# Solution: Use memory-efficient index types
index_config = {
"index_type": "IndexIVFPQ",
"nlist": 1000,
"m": 8,
"nbits": 8
}
```
### Performance Issues
**Slow Insertion**: Use batch operations instead of inserting documents one by one.
**Slow Search**: Adjust search parameters (`nprobe`, `efSearch`) based on accuracy requirements.
**High Memory Usage**: Consider using compressed indexes like `IndexIVFPQ`.
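
For the slow-insertion case, a rough batching sketch against the store (the chunk size is an arbitrary starting point; `store` and `documents` stand in for your own objects):

```python
def insert_in_batches(store, documents, batch_size=1000):
    # One insert_document call per chunk amortizes index updates and the
    # save-to-disk step, instead of paying them once per document.
    for start in range(0, len(documents), batch_size):
        store.insert_document(documents[start:start + batch_size])
```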
## Integration with agentUniverse
The FAISS store integrates seamlessly with agentUniverse's knowledge system:
```yaml
# In your knowledge configuration
name: 'my_knowledge'
description: 'Knowledge base with FAISS store'
stores:
- 'my_faiss_store'
embedding_model: 'your_embedding_model'
metadata:
type: 'KNOWLEDGE'
module: 'agentuniverse.agent.action.knowledge.knowledge'
class: 'Knowledge'
```
## Conclusion
The FAISS store provides a powerful and flexible solution for vector similarity search in agentUniverse. By choosing the appropriate index type and configuration, you can optimize for your specific use case, whether it's accuracy, speed, or memory efficiency.
For more information about FAISS, visit the [official FAISS documentation](https://github.com/facebookresearch/faiss).


@@ -0,0 +1,13 @@
name: 'faiss_flat_store'
description: 'FAISS store with IndexFlatL2 for exact similarity search'
index_path: '../../db/faiss_flat.index'
metadata_path: '../../db/faiss_flat_metadata.pkl'
embedding_model: 'dashscope_embedding'
similarity_top_k: 10
index_config:
index_type: 'IndexFlatL2'
dimension: 768
metadata:
type: 'STORE'
module: 'agentuniverse.agent.action.knowledge.store.faiss_store'
class: 'FAISSStore'


@@ -0,0 +1,16 @@
name: 'faiss_hnsw_store'
description: 'FAISS store with IndexHNSWFlat for high-performance similarity search'
index_path: '../../db/faiss_hnsw.index'
metadata_path: '../../db/faiss_hnsw_metadata.pkl'
embedding_model: 'dashscope_embedding'
similarity_top_k: 15
index_config:
index_type: 'IndexHNSWFlat'
dimension: 768
M: 16 # Number of bi-directional links for every new element
efConstruction: 200 # Size of the dynamic candidate list
efSearch: 50 # Size of the dynamic candidate list during search
metadata:
type: 'STORE'
module: 'agentuniverse.agent.action.knowledge.store.faiss_store'
class: 'FAISSStore'


@@ -0,0 +1,15 @@
name: 'faiss_ivf_store'
description: 'FAISS store with IndexIVFFlat for fast approximate similarity search'
index_path: '../../db/faiss_ivf.index'
metadata_path: '../../db/faiss_ivf_metadata.pkl'
embedding_model: 'dashscope_embedding'
similarity_top_k: 20
index_config:
index_type: 'IndexIVFFlat'
dimension: 768
nlist: 100 # Number of clusters
nprobe: 10 # Number of clusters to search
metadata:
type: 'STORE'
module: 'agentuniverse.agent.action.knowledge.store.faiss_store'
class: 'FAISSStore'


@@ -0,0 +1,17 @@
name: 'faiss_ivfpq_store'
description: 'FAISS store with IndexIVFPQ for memory-efficient similarity search with product quantization'
index_path: '../../db/faiss_ivfpq.index'
metadata_path: '../../db/faiss_ivfpq_metadata.pkl'
embedding_model: 'dashscope_embedding'
similarity_top_k: 25
index_config:
index_type: 'IndexIVFPQ'
dimension: 768
nlist: 100 # Number of clusters
nprobe: 10 # Number of clusters to search
m: 8 # Number of subquantizers
nbits: 8 # Bits per subquantizer
metadata:
type: 'STORE'
module: 'agentuniverse.agent.action.knowledge.store.faiss_store'
class: 'FAISSStore'

test_faiss_store.py

@@ -0,0 +1,567 @@
# !/usr/bin/env python3
# @Time : 2024/12/28 12:00
# @Author : saswatsusmoy
# @Email : saswatsusmoy9@gmail.com
# @FileName: test_faiss_store.py
import logging
import os
import shutil
import tempfile
import unittest
from unittest.mock import Mock
try:
import faiss # noqa: F401
import numpy as np # noqa: F401
FAISS_AVAILABLE = True
except ImportError:
FAISS_AVAILABLE = False
from agentuniverse.agent.action.knowledge.store.document import Document
from agentuniverse.agent.action.knowledge.store.query import Query
from agentuniverse.base.config.component_configer.component_configer import ComponentConfiger
@unittest.skipUnless(FAISS_AVAILABLE, "FAISS not available")
class TestFAISSStore(unittest.TestCase):
"""Comprehensive test cases for FAISS Store."""
@classmethod
def setUpClass(cls):
"""Set up class-level test fixtures."""
# Suppress logging during tests
logging.getLogger("agentuniverse.agent.action.knowledge.store.faiss_store").setLevel(logging.CRITICAL)
def setUp(self):
"""Set up test environment."""
# Create temporary directory for test files
self.temp_dir = tempfile.mkdtemp()
self.index_path = os.path.join(self.temp_dir, "test_faiss.index")
self.metadata_path = os.path.join(self.temp_dir, "test_faiss_metadata.pkl")
# Import here to avoid import error when FAISS is not available
from agentuniverse.agent.action.knowledge.store.faiss_store import FAISSStore
self.FAISSStore = FAISSStore
# Create test documents with embeddings
self.test_documents = [
Document(
id="doc1",
text="Python is a high-level programming language known for its simplicity.",
metadata={"category": "programming", "language": "python", "complexity": "beginner"},
embedding=[0.1, 0.2, 0.3, 0.4],
),
Document(
id="doc2",
text="Machine learning is a subset of artificial intelligence that focuses on algorithms.",
metadata={"category": "AI", "field": "machine_learning", "complexity": "advanced"},
embedding=[0.5, 0.6, 0.7, 0.8],
),
Document(
id="doc3",
text="FAISS is a library for efficient similarity search and clustering of dense vectors.",
metadata={"category": "technology", "library": "faiss", "complexity": "intermediate"},
embedding=[0.9, 1.0, 1.1, 1.2],
),
Document(
id="doc4",
text="Natural language processing enables computers to understand human language.",
metadata={"category": "AI", "field": "nlp", "complexity": "advanced"},
embedding=[0.2, 0.4, 0.6, 0.8],
),
Document(
id="doc5",
text="Data structures are fundamental concepts in computer science.",
metadata={"category": "programming", "topic": "data_structures", "complexity": "intermediate"},
embedding=[0.3, 0.1, 0.4, 0.2],
),
]
# Create large dataset for performance testing
self.large_dataset = []
for i in range(100):
self.large_dataset.append(
Document(
id=f"large_doc_{i}",
text=f"This is document number {i} for performance testing.",
metadata={"batch": "performance_test", "index": i},
embedding=[i * 0.01, (i + 1) * 0.01, (i + 2) * 0.01, (i + 3) * 0.01],
)
)
def tearDown(self):
"""Clean up test environment."""
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def create_store(self, index_type="IndexFlatL2", **kwargs):
"""Helper method to create a FAISS store for testing."""
config = {"index_type": index_type, "dimension": 4} # Small dimension for testing
config.update(kwargs)
store = self.FAISSStore(
index_path=self.index_path,
metadata_path=self.metadata_path,
embedding_model=None, # No embedding model for tests
similarity_top_k=5,
index_config=config,
)
return store
def test_initialization_and_configuration(self):
"""Test FAISS store initialization and configuration."""
store = self.create_store()
store._new_client()
# Test basic configuration
self.assertEqual(store.similarity_top_k, 5)
self.assertEqual(store.index_config["index_type"], "IndexFlatL2")
self.assertEqual(store.index_config["dimension"], 4)
self.assertIsNotNone(store.document_store)
self.assertIsNotNone(store.id_to_index)
self.assertIsNotNone(store.index_to_id)
self.assertEqual(store._next_index, 0)
def test_index_creation_all_types(self):
"""Test creation of all supported index types."""
index_configs = [
{"index_type": "IndexFlatL2"},
{"index_type": "IndexFlatIP"},
{"index_type": "IndexIVFFlat", "nlist": 4, "nprobe": 2},
{"index_type": "IndexIVFPQ", "nlist": 4, "nprobe": 2, "m": 2, "nbits": 8},
{"index_type": "IndexHNSWFlat", "M": 8, "efConstruction": 40, "efSearch": 20},
]
for config in index_configs:
with self.subTest(index_type=config["index_type"]):
store = self.create_store(**config)
store._new_client()
# Insert documents to trigger index creation
store.insert_document(self.test_documents)
self.assertIsNotNone(store.faiss_index)
self.assertEqual(store.get_document_count(), 5)
def test_unsupported_index_type(self):
"""Test handling of unsupported index types."""
store = self.create_store(index_type="UnsupportedIndexType")
with self.assertRaises(ValueError) as context:
store._create_faiss_index(4)
self.assertIn("Unsupported index type", str(context.exception))
def test_insert_and_query_comprehensive(self):
"""Test comprehensive document insertion and querying."""
store = self.create_store()
store._new_client()
# Test empty store
self.assertEqual(store.get_document_count(), 0)
# Insert documents
store.insert_document(self.test_documents)
self.assertEqual(store.get_document_count(), 5)
# Test document retrieval by ID
doc = store.get_document_by_id("doc1")
self.assertIsNotNone(doc)
self.assertEqual(doc.text, "Python is a high-level programming language known for its simplicity.")
self.assertEqual(doc.metadata["category"], "programming")
# Test query with exact embedding match
query = Query(embeddings=[[0.1, 0.2, 0.3, 0.4]]) # Exact match for doc1
results = store.query(query)
self.assertGreater(len(results), 0)
self.assertEqual(results[0].id, "doc1")
# Test query with similarity_top_k
query_limited = Query(embeddings=[[0.1, 0.2, 0.3, 0.4]], similarity_top_k=3)
results_limited = store.query(query_limited)
self.assertLessEqual(len(results_limited), 3)
# Test query with no embeddings and no embedding model
empty_query = Query(query_str="test query without embeddings")
empty_results = store.query(empty_query)
self.assertEqual(len(empty_results), 0)
def test_crud_operations_comprehensive(self):
"""Test comprehensive CRUD operations."""
store = self.create_store()
store._new_client()
# CREATE: Insert initial documents
initial_docs = self.test_documents[:3]
store.insert_document(initial_docs)
self.assertEqual(store.get_document_count(), 3)
# READ: Query and retrieve documents
doc = store.get_document_by_id("doc2")
self.assertIsNotNone(doc)
self.assertEqual(doc.metadata["field"], "machine_learning")
# UPDATE: Update existing document
updated_doc = Document(
id="doc2",
text="Updated: Machine learning is an advanced field of artificial intelligence.",
metadata={"category": "AI", "field": "machine_learning", "complexity": "expert", "updated": True},
embedding=[0.55, 0.65, 0.75, 0.85],
)
store.update_document([updated_doc])
# Verify update
retrieved_doc = store.get_document_by_id("doc2")
self.assertIn("Updated:", retrieved_doc.text)
self.assertEqual(retrieved_doc.metadata["complexity"], "expert")
self.assertTrue(retrieved_doc.metadata["updated"])
# UPSERT: Insert new document and update existing
new_doc = Document(
id="doc_new",
text="This is a new document added via upsert.",
metadata={"category": "test", "method": "upsert"},
embedding=[0.7, 0.8, 0.9, 1.0],
)
another_update = Document(
id="doc1",
text="Updated doc1 via upsert operation.",
metadata={"category": "programming", "language": "python", "updated_via": "upsert"},
embedding=[0.15, 0.25, 0.35, 0.45],
)
store.upsert_document([new_doc, another_update])
self.assertEqual(store.get_document_count(), 4) # 3 original + 1 new
upserted_new = store.get_document_by_id("doc_new")
self.assertIsNotNone(upserted_new)
self.assertEqual(upserted_new.metadata["method"], "upsert")
upserted_existing = store.get_document_by_id("doc1")
self.assertIn("Updated doc1", upserted_existing.text)
# DELETE: Remove documents
store.delete_document("doc2")
self.assertEqual(store.get_document_count(), 3)
self.assertIsNone(store.get_document_by_id("doc2"))
# DELETE: Try to delete non-existent document (should not raise error)
store.delete_document("non_existent_doc")
self.assertEqual(store.get_document_count(), 3)
def test_persistence_comprehensive(self):
"""Test comprehensive persistence functionality."""
# Create and populate first store
store1 = self.create_store()
store1._new_client()
store1.insert_document(self.test_documents)
# Verify files were created
self.assertTrue(os.path.exists(self.index_path))
self.assertTrue(os.path.exists(self.metadata_path))
# Create second store (should load persisted data)
store2 = self.create_store()
store2._new_client()
# Verify data was loaded correctly
self.assertEqual(store2.get_document_count(), 5)
self.assertEqual(len(store2.list_document_ids()), 5)
# Verify specific document
doc = store2.get_document_by_id("doc3")
self.assertIsNotNone(doc)
self.assertIn("FAISS", doc.text)
self.assertEqual(doc.metadata["library"], "faiss")
# Test query on loaded store
query = Query(embeddings=[[0.9, 1.0, 1.1, 1.2]]) # Should match doc3
results = store2.query(query)
self.assertGreater(len(results), 0)
self.assertEqual(results[0].id, "doc3")
def test_error_handling_and_edge_cases(self):
"""Test error handling and edge cases."""
store = self.create_store()
store._new_client()
# Test inserting empty document list
store.insert_document([])
self.assertEqual(store.get_document_count(), 0)
# Test inserting documents without embeddings and no embedding model
doc_no_embedding = Document(id="no_embed", text="Document without embedding", embedding=[])
store.insert_document([doc_no_embedding])
self.assertEqual(store.get_document_count(), 0) # Should be skipped
# Test duplicate document insertion
doc1 = Document(id="dup", text="Original", embedding=[0.1, 0.2, 0.3, 0.4])
doc2 = Document(id="dup", text="Duplicate", embedding=[0.2, 0.3, 0.4, 0.5])
store.insert_document([doc1])
self.assertEqual(store.get_document_count(), 1)
store.insert_document([doc2]) # Should be skipped due to duplicate ID
self.assertEqual(store.get_document_count(), 1)
self.assertEqual(store.get_document_by_id("dup").text, "Original")
# Test querying empty store
empty_temp_dir = tempfile.mkdtemp()
empty_index_path = os.path.join(empty_temp_dir, "empty_faiss.index")
empty_metadata_path = os.path.join(empty_temp_dir, "empty_faiss_metadata.pkl")
empty_store = self.FAISSStore(
index_path=empty_index_path,
metadata_path=empty_metadata_path,
embedding_model=None,
index_config={"index_type": "IndexFlatL2", "dimension": 4},
)
empty_store._new_client()
query = Query(embeddings=[[0.1, 0.2, 0.3, 0.4]])
results = empty_store.query(query)
self.assertEqual(len(results), 0)
# Clean up empty store temp directory
shutil.rmtree(empty_temp_dir)
# Test invalid query
invalid_query = Query() # No embeddings or query_str
results = store.query(invalid_query)
self.assertEqual(len(results), 0)
# Test querying with documents that have embeddings
query_with_embedding = Query(embeddings=[[0.1, 0.2, 0.3, 0.4]])
results_with_embedding = store.query(query_with_embedding)
self.assertGreater(len(results_with_embedding), 0)
def test_performance_with_large_dataset(self):
"""Test performance with larger dataset."""
store = self.create_store(index_type="IndexHNSWFlat", M=8, efConstruction=40)
store._new_client()
# Insert large dataset
import time
start_time = time.time()
store.insert_document(self.large_dataset)
insert_time = time.time() - start_time
self.assertEqual(store.get_document_count(), 100)
self.assertLess(insert_time, 10.0) # Should complete within 10 seconds
# Test batch query performance
queries = [
Query(embeddings=[[i * 0.01, (i + 1) * 0.01, (i + 2) * 0.01, (i + 3) * 0.01]]) for i in range(0, 10, 2)
]
start_time = time.time()
for query in queries:
results = store.query(query)
self.assertGreater(len(results), 0)
query_time = time.time() - start_time
self.assertLess(query_time, 5.0) # Should complete within 5 seconds
def test_different_embedding_dimensions(self):
"""Test handling of different embedding dimensions."""
# Test with different dimensions
dimensions = [2, 8, 16, 32]
for dim in dimensions:
with self.subTest(dimension=dim):
config = {"index_type": "IndexFlatL2", "dimension": dim}
temp_index = os.path.join(self.temp_dir, f"test_{dim}d.index")
temp_metadata = os.path.join(self.temp_dir, f"test_{dim}d_metadata.pkl")
store = self.FAISSStore(
index_path=temp_index, metadata_path=temp_metadata, embedding_model=None, index_config=config
)
store._new_client()
# Create document with appropriate dimension
doc = Document(
id=f"doc_{dim}d", text=f"Document with {dim}-dimensional embedding", embedding=[0.1] * dim
)
store.insert_document([doc])
self.assertEqual(store.get_document_count(), 1)
# Query with same dimension
query = Query(embeddings=[[0.1] * dim])
results = store.query(query)
self.assertEqual(len(results), 1)
def test_metadata_operations(self):
"""Test metadata-related operations."""
store = self.create_store()
store._new_client()
# Insert documents with rich metadata
store.insert_document(self.test_documents)
# Test listing document IDs
doc_ids = store.list_document_ids()
expected_ids = {"doc1", "doc2", "doc3", "doc4", "doc5"}
self.assertEqual(set(doc_ids), expected_ids)
# Test document count
self.assertEqual(store.get_document_count(), 5)
# Test metadata preservation in query results
query = Query(embeddings=[[0.1, 0.2, 0.3, 0.4]])
results = store.query(query)
# Check that metadata is preserved and score is added
result_doc = results[0]
self.assertIn("category", result_doc.metadata)
self.assertIn("score", result_doc.metadata)
self.assertIsInstance(result_doc.metadata["score"], float)
def test_component_configer_initialization(self):
"""Test initialization from component configuration."""
# Create mock configer
configer = Mock(spec=ComponentConfiger)
configer.name = "test_faiss_store"
configer.description = "Test FAISS store from configer"
configer.index_path = self.index_path
configer.metadata_path = self.metadata_path
configer.embedding_model = "test_embedding_model"
configer.similarity_top_k = 10
configer.index_config = {"index_type": "IndexHNSWFlat", "dimension": 8, "M": 12, "efConstruction": 150}
store = self.FAISSStore()
store._initialize_by_component_configer(configer)
self.assertEqual(store.name, "test_faiss_store")
self.assertEqual(store.description, "Test FAISS store from configer")
self.assertEqual(store.index_path, self.index_path)
self.assertEqual(store.metadata_path, self.metadata_path)
self.assertEqual(store.embedding_model, "test_embedding_model")
self.assertEqual(store.similarity_top_k, 10)
self.assertEqual(store.index_config["index_type"], "IndexHNSWFlat")
self.assertEqual(store.index_config["M"], 12)
def test_concurrent_operations_safety(self):
"""Test thread safety and concurrent operations."""
import threading
import time
store = self.create_store()
store._new_client()
# Insert initial documents
store.insert_document(self.test_documents[:3])
results = []
errors = []
def query_worker():
try:
for i in range(10):
query = Query(embeddings=[[0.1 + i * 0.01, 0.2, 0.3, 0.4]])
result = store.query(query)
results.append(len(result))
time.sleep(0.01)
except Exception as e:
errors.append(str(e))
def insert_worker():
try:
for i in range(5):
doc = Document(
id=f"concurrent_{i}", text=f"Concurrent document {i}", embedding=[0.1 + i * 0.1, 0.2, 0.3, 0.4]
)
store.insert_document([doc])
time.sleep(0.02)
except Exception as e:
errors.append(str(e))
# Start concurrent operations
query_thread = threading.Thread(target=query_worker)
insert_thread = threading.Thread(target=insert_worker)
query_thread.start()
insert_thread.start()
query_thread.join(timeout=5.0)
insert_thread.join(timeout=5.0)
# Check that operations completed without errors
self.assertEqual(len(errors), 0, f"Concurrent operations failed: {errors}")
self.assertEqual(len(results), 10) # All queries should have completed
self.assertGreaterEqual(store.get_document_count(), 3) # At least original documents
def test_memory_efficiency(self):
"""Test memory efficiency with various index types."""
import os
import psutil
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
# Test memory usage with different index types
index_types = ["IndexFlatL2", "IndexIVFPQ"]
for index_type in index_types:
with self.subTest(index_type=index_type):
config = {"nlist": 4, "m": 2, "nbits": 8} if "IVF" in index_type else {}
store = self.create_store(index_type=index_type, **config)
store._new_client()
# Insert moderate dataset
store.insert_document(self.large_dataset[:50])
current_memory = process.memory_info().rss
memory_increase = current_memory - initial_memory
# Memory increase should be reasonable (less than 100MB for test data)
self.assertLess(
memory_increase,
100 * 1024 * 1024,
f"Memory usage too high for {index_type}: {memory_increase / 1024 / 1024:.2f}MB",
)
def test_data_integrity_after_operations(self):
"""Test data integrity after various operations."""
store = self.create_store()
store._new_client()
# Insert initial data
store.insert_document(self.test_documents)
original_count = store.get_document_count()
# Perform various operations
store.delete_document("doc2")
store.upsert_document(
[Document(id="new_doc", text="New document for integrity test", embedding=[0.8, 0.7, 0.6, 0.5])]
)
# Verify integrity
current_ids = set(store.list_document_ids())
self.assertNotIn("doc2", current_ids)
self.assertIn("new_doc", current_ids)
self.assertEqual(len(current_ids), original_count) # Same count (deleted 1, added 1)
# Verify each document can be retrieved and queried
for doc_id in current_ids:
doc = store.get_document_by_id(doc_id)
self.assertIsNotNone(doc, f"Document {doc_id} should be retrievable")
# Query with document's own embedding
if doc.embedding:
query = Query(embeddings=[doc.embedding])
results = store.query(query)
self.assertGreater(len(results), 0, f"Document {doc_id} should be queryable")
if __name__ == "__main__":
# Configure test logging
logging.basicConfig(level=logging.WARNING)
# Run tests with verbose output
unittest.main(verbosity=2)