memoryscope.core.storage.llama_index_sync_elasticsearch
Elasticsearch vector store.
- memoryscope.core.storage.llama_index_sync_elasticsearch.get_elasticsearch_client(url: str | None = None, cloud_id: str | None = None, api_key: str | None = None, username: str | None = None, password: str | None = None, use_async: bool | None = False) → AsyncElasticsearch
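get_elasticsearch_client has no docstring. The sketch below shows how its url, cloud_id, api_key, username, and password arguments are typically turned into keyword arguments for the elasticsearch-py client constructor. It assumes the precedence rules used by llama-index's Elasticsearch integration (exactly one of url or cloud_id; an API key is preferred over basic auth) and is not taken from this module's source. build_connection_params is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def build_connection_params(
    url: Optional[str] = None,
    cloud_id: Optional[str] = None,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    # Exactly one endpoint is accepted: a URL or an Elastic Cloud ID.
    if url and cloud_id:
        raise ValueError("Provide either url or cloud_id, not both.")
    params: Dict[str, Any] = {}
    if url:
        params["hosts"] = [url]
    elif cloud_id:
        params["cloud_id"] = cloud_id
    else:
        raise ValueError("Provide either url or cloud_id.")
    # Prefer an API key; fall back to basic auth only when both credentials are set.
    if api_key:
        params["api_key"] = api_key
    elif username and password:
        params["basic_auth"] = (username, password)
    return params
```

The resulting dict could be unpacked into elasticsearch.Elasticsearch(**params) or elasticsearch.AsyncElasticsearch(**params). Presumably use_async chooses between the two, but that is an assumption.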
- class memoryscope.core.storage.llama_index_sync_elasticsearch.ESCombinedRetrieveStrategy(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)
Bases: AsyncDenseVectorStrategy
- __init__(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)
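The constructor exposes rrf and hybrid_alpha without documenting them. As a hedged illustration only: reciprocal rank fusion (RRF), which Elasticsearch applies when a search body contains rank.rrf, scores each document as the sum of 1 / (rank_constant + rank) over the result lists it appears in (Elasticsearch's default rank_constant is 60). A weighted blend controlled by an alpha value is a common alternative. Both helper names are hypothetical, and whether ESCombinedRetrieveStrategy applies hybrid_alpha this way is an assumption.

```python
from typing import Dict, Sequence


def rrf_fuse(rankings: Sequence[Sequence[str]], rank_constant: int = 60) -> Dict[str, float]:
    """Reciprocal rank fusion: score(d) = sum of 1 / (rank_constant + rank(d))."""
    scores: Dict[str, float] = {}
    for ranking in rankings:
        # Ranks are 1-based, as in Elasticsearch's RRF definition.
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return dict(sorted(scores.items(), key=lambda item: item[1], reverse=True))


def alpha_fuse(dense: Dict[str, float], lexical: Dict[str, float], alpha: float) -> Dict[str, float]:
    """Hypothetical blend: alpha * dense + (1 - alpha) * lexical; missing scores count as 0."""
    doc_ids = set(dense) | set(lexical)
    fused = {d: alpha * dense.get(d, 0.0) + (1 - alpha) * lexical.get(d, 0.0) for d in doc_ids}
    return dict(sorted(fused.items(), key=lambda item: item[1], reverse=True))
```

RRF only needs ranks, so it works even when the dense and lexical scores live on different scales; an alpha blend would first need both score sets normalized to a common range.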
- es_query(*, query: str | None, query_vector: List[float] | None, text_field: str, vector_field: str, k: int, num_candidates: int, filter: List[Dict[str, Any]] | None = None) → Dict[str, Any]
Returns the Elasticsearch query body for the given parameters. The store will execute the query.
- Parameters:
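query – The text query. Can be None if query_vector is given.
query_vector – The query vector. Can be None if a query string is given.
text_field – The field containing the text data in the index.
vector_field – The field containing the vector representations in the index.
k – The total number of results to retrieve.
num_candidates – The number of nearest-neighbor candidates to consider per shard in the kNN search.
filter – List of filter clauses to apply to the query.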
- Returns:
The Elasticsearch query body.
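A minimal sketch of the kind of body es_query returns, modeled on the kNN and hybrid (kNN plus a match query, fused with RRF) search bodies produced by elasticsearch-py's dense vector strategy. build_es_query and its hybrid flag are hypothetical; ESCombinedRetrieveStrategy may handle retrieve_mode, model_id, and hybrid_alpha differently.

```python
from typing import Any, Dict, List, Optional, Union


def build_es_query(
    *,
    query: Optional[str],
    query_vector: Optional[List[float]],
    text_field: str,
    vector_field: str,
    k: int,
    num_candidates: int,
    filter: Optional[List[Dict[str, Any]]] = None,
    hybrid: bool = False,
    rrf: Union[bool, Dict[str, Any]] = True,
) -> Dict[str, Any]:
    if query_vector is None:
        raise ValueError("A dense kNN search needs query_vector.")
    filters = filter or []
    # Approximate kNN clause over the dense vector field.
    body: Dict[str, Any] = {
        "knn": {
            "field": vector_field,
            "query_vector": query_vector,
            "k": k,
            "num_candidates": num_candidates,
            "filter": filters,
        }
    }
    if hybrid:
        if query is None:
            raise ValueError("A hybrid search also needs the text query.")
        # Lexical match on text_field, run alongside the kNN clause.
        body["query"] = {
            "bool": {
                "must": [{"match": {text_field: {"query": query}}}],
                "filter": filters,
            }
        }
        # Fuse the two result lists with RRF; a dict passes RRF options through.
        if isinstance(rrf, dict):
            body["rank"] = {"rrf": rrf}
        elif rrf:
            body["rank"] = {"rrf": {}}
    return body
```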
- before_index_creation(*, client: AsyncElasticsearch, text_field: str, vector_field: str) → None
Executes before the index is created. Used for setting up any required Elasticsearch resources like a pipeline. Defaults to a no-op.
- Parameters:
client – The Elasticsearch client.
text_field – The field containing the text data in the index.
vector_field – The field containing the vector representations in the index.
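For strategies that embed text inside Elasticsearch (for example with a deployed model referenced by model_id), this hook is where an ingest pipeline would be registered. The sketch assumes an inference processor that reads text_field and writes the model output to vector_field; the pipeline ID scheme and the names inference_processors and create_inference_pipeline are hypothetical. client.ingest.put_pipeline is the elasticsearch-py call for creating a pipeline.

```python
from typing import Any, Dict, List


def inference_processors(model_id: str, text_field: str, vector_field: str) -> List[Dict[str, Any]]:
    # One inference processor: read text_field, write the model output to vector_field.
    return [
        {
            "inference": {
                "model_id": model_id,
                "input_output": [{"input_field": text_field, "output_field": vector_field}],
            }
        }
    ]


async def create_inference_pipeline(
    client: Any, *, text_field: str, vector_field: str, model_id: str
) -> str:
    # Hypothetical hook body: register the pipeline before the index is created.
    pipeline_id = f"{model_id}_pipeline"
    await client.ingest.put_pipeline(
        id=pipeline_id,
        processors=inference_processors(model_id, text_field, vector_field),
    )
    return pipeline_id
```

The index would then reference the pipeline, for example through its index.default_pipeline setting, so that documents are embedded at ingest time.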
- class memoryscope.core.storage.llama_index_sync_elasticsearch.SyncElasticsearchStore(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None)
Bases: BasePydanticVectorStore
Elasticsearch vector store.
- Parameters:
index_name – Name of the Elasticsearch index.
es_client – Optional. Pre-existing AsyncElasticsearch client.
es_url – Optional. Elasticsearch URL.
es_cloud_id – Optional. Elasticsearch cloud ID.
es_api_key – Optional. Elasticsearch API key.
es_user – Optional. Elasticsearch username.
es_password – Optional. Elasticsearch password.
text_field – Optional. Name of the Elasticsearch field that stores the text. Defaults to “content”.
vector_field – Optional. Name of the Elasticsearch field that stores the embedding. Defaults to “embedding”.
batch_size – Optional. Batch size for bulk indexing. Defaults to 200.
distance_strategy – Optional. Distance strategy to use for similarity search. Defaults to “COSINE”.
retrieval_strategy – Optional. Retrieval strategy to use: AsyncBM25Strategy, AsyncSparseVectorStrategy, AsyncDenseVectorStrategy, or another AsyncRetrievalStrategy implementation. Defaults to AsyncDenseVectorStrategy.
- Raises:
ConnectionError – If the AsyncElasticsearch client cannot connect to Elasticsearch.
ValueError – If neither es_client nor es_url nor es_cloud_id is provided.
Examples
pip install llama-index-vector-stores-elasticsearch
```python
from memoryscope.core.storage.llama_index_sync_elasticsearch import SyncElasticsearchStore

# Additional setup for the SyncElasticsearchStore class
index_name = "my_index"
es_url = "http://localhost:9200"
es_cloud_id = "<cloud-id>"  # Found within the deployment page
es_user = "elastic"
es_password = "<password>"  # Provided when creating the deployment, or can be reset
es_api_key = "<api-key>"  # Create an API key within Kibana (Security -> API Keys)

# Connecting to Elasticsearch locally
es_local = SyncElasticsearchStore(index_name=index_name, es_url=es_url)

# Connecting to Elastic Cloud with username and password
es_cloud_user_pass = SyncElasticsearchStore(
    index_name=index_name, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password
)

# Connecting to Elastic Cloud with an API key
es_cloud_api_key = SyncElasticsearchStore(
    index_name=index_name, es_cloud_id=es_cloud_id, es_api_key=es_api_key
)
```
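The distance_strategy values correspond to Elasticsearch dense_vector similarity settings. The sketch below assumes that COSINE maps to cosine, DOT_PRODUCT to dot_product, and EUCLIDEAN_DISTANCE to l2_norm, and reproduces the _score transforms Elasticsearch documents for kNN search on float vectors. That makes it easier to interpret the similarity scores returned by query. ES_SIMILARITY and knn_score are hypothetical names.

```python
import math
from typing import Dict, Sequence

# Assumed mapping from distance_strategy to the Elasticsearch dense_vector similarity.
ES_SIMILARITY: Dict[str, str] = {
    "COSINE": "cosine",
    "DOT_PRODUCT": "dot_product",
    "EUCLIDEAN_DISTANCE": "l2_norm",
}


def _dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def knn_score(distance_strategy: str, query: Sequence[float], doc: Sequence[float]) -> float:
    """_score Elasticsearch reports for a float-vector kNN hit under each similarity."""
    similarity = ES_SIMILARITY[distance_strategy]
    if similarity == "cosine":
        cosine = _dot(query, doc) / math.sqrt(_dot(query, query) * _dot(doc, doc))
        return (1 + cosine) / 2
    if similarity == "dot_product":
        # dot_product expects unit-length vectors, so the raw product lies in [-1, 1].
        return (1 + _dot(query, doc)) / 2
    # l2_norm: smaller distances give scores closer to 1.
    squared_distance = sum((x - y) ** 2 for x, y in zip(query, doc))
    return 1 / (1 + squared_distance)
```

All three transforms map into a non-negative range where higher means more similar, which is why scores from different strategies look alike but are not directly comparable.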
- stores_text: bool
- index_name: str
- es_client: Any | None
- es_url: str | None
- es_cloud_id: str | None
- es_api_key: str | None
- es_user: str | None
- es_password: str | None
- text_field: str
- vector_field: str
- batch_size: int
- distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None
- retrieval_strategy: AsyncRetrievalStrategy
- __init__(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None) → None
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- log_elasticsearch_dynamic: bool
- property client: Any
Get the asynchronous Elasticsearch client.
- Returns:
The asynchronous Elasticsearch client instance configured for this store.
- Return type:
Any
- add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) → List[str]
Adds a list of nodes, each containing an embedding, to the Elasticsearch index, optionally creating the index if it does not already exist.
- Parameters:
nodes (List[BaseNode]) – A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) – A flag indicating whether to create the Elasticsearch index if it’s not present. Defaults to True.
- Returns:
A list of node IDs that have been successfully added to the index.
- Return type:
List[str]
- Raises:
ImportError – If the ‘elasticsearch[async]’ Python package is not installed.
BulkIndexError – If there is a failure during the asynchronous bulk indexing with AsyncElasticsearch.
Note
This method delegates the actual operation to the sync_add method.
- sync_add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) → List[str]
Adds a list of nodes, each containing an embedding, to the Elasticsearch index.
This method extracts each node’s ID, embedding, text content, and metadata and prepares them for batch insertion into the index. If create_index_if_not_exists is True, it creates the index when it is missing, using the dimensionality of the embeddings for the vector field.
- Parameters:
nodes (List[BaseNode]) – A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) – A flag indicating whether to create the Elasticsearch index if it does not already exist. Defaults to True.
**add_kwargs (Any) – Additional keyword arguments passed to the underlying add_texts method for customization during the indexing process.
- Returns:
A list of node IDs that were successfully added to the index.
- Return type:
List[str]
- Raises:
ImportError – If the Elasticsearch Python client is not installed.
BulkIndexError – If there’s a failure during the asynchronous bulk indexing operation.
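The preparation step described for sync_add can be pictured as turning each node into a bulk index action and grouping the actions into batches of batch_size. This is a sketch under assumptions: FakeNode is a hypothetical stand-in for BaseNode, node_actions and in_batches are hypothetical helpers, and the exact document layout this store writes (for instance how metadata is serialized) may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, Iterator, List, Optional


@dataclass
class FakeNode:
    # Hypothetical stand-in for llama-index's BaseNode.
    node_id: str
    text: str
    embedding: Optional[List[float]]
    metadata: Dict[str, Any] = field(default_factory=dict)


def node_actions(
    nodes: Iterable[FakeNode],
    index_name: str,
    text_field: str = "content",
    vector_field: str = "embedding",
) -> Iterator[Dict[str, Any]]:
    # One bulk "index" action per node, keyed by the node ID.
    for node in nodes:
        source: Dict[str, Any] = {text_field: node.text, "metadata": node.metadata}
        if node.embedding is not None:
            source[vector_field] = node.embedding
        yield {"_op_type": "index", "_index": index_name, "_id": node.node_id, "_source": source}


def in_batches(actions: Iterable[Dict[str, Any]], batch_size: int = 200) -> Iterator[List[Dict[str, Any]]]:
    # Group actions into lists of at most batch_size (the store's default is 200).
    batch: List[Dict[str, Any]] = []
    for action in actions:
        batch.append(action)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
```

Each batch could be passed to elasticsearch.helpers.bulk(client, batch) with a synchronous client, or to elasticsearch.helpers.async_bulk with an AsyncElasticsearch client.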
- delete(ref_doc_id: str, **delete_kwargs: Any) → None
Deletes a node from the Elasticsearch index using the provided reference document ID.
Any extra keyword arguments are passed directly to Elasticsearch’s delete_by_query operation to customize the deletion behavior.
- Parameters:
ref_doc_id (str) – The unique identifier of the node/document to be deleted.
**delete_kwargs (Any) – Additional keyword arguments for Elasticsearch’s delete_by_query. These might include query filters, timeouts, or other operational configurations.
- Raises:
Exception – If the deletion operation via Elasticsearch’s delete_by_query fails.
Note
This method internally calls a synchronous delete method (sync_delete) to execute the deletion operation against Elasticsearch.
- sync_delete(ref_doc_id: str, **delete_kwargs: Any) → None
Synchronously deletes a node from the Elasticsearch index based on the reference document ID.
- Parameters:
ref_doc_id (str) – The unique identifier of the node/document to be deleted.
**delete_kwargs (Any) – Optional keyword arguments passed to the delete_by_query operation of AsyncElasticsearch, allowing additional customization of the deletion process.
- Raises:
Exception – If the deletion operation via AsyncElasticsearch’s delete_by_query fails.
Note
This method matches ref_doc_id against the document’s ‘_id’ field rather than ‘metadata.ref_doc_id’, so it removes only the document whose Elasticsearch ID equals ref_doc_id.
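A sketch of the delete_by_query call implied by this note: ref_doc_id is matched with a term query on _id, and any delete_kwargs (such as refresh=True) are forwarded unchanged. delete_query and sync_delete_sketch are hypothetical names; the keyword form delete_by_query(index=..., query=...) follows the elasticsearch-py 8.x client.

```python
from typing import Any, Dict


def delete_query(ref_doc_id: str) -> Dict[str, Any]:
    # Match on the Elasticsearch document ID, not on metadata.ref_doc_id.
    return {"term": {"_id": ref_doc_id}}


def sync_delete_sketch(client: Any, index_name: str, ref_doc_id: str, **delete_kwargs: Any) -> Any:
    # Forward extra options (e.g. refresh=True) straight to delete_by_query.
    return client.delete_by_query(index=index_name, query=delete_query(ref_doc_id), **delete_kwargs)
```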
- query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, **kwargs: Any) → VectorStoreQueryResult
Executes a query against the Elasticsearch index to retrieve the top k most similar nodes based on the input query embedding. Supports customization of the query process and application of Elasticsearch filters.
- Parameters:
query (VectorStoreQuery) – The query containing the embedding and other parameters.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) – An optional custom function to modify the Elasticsearch query body, allowing for additional query parameters or logic. Defaults to None.
es_filter (Optional[List[Dict]], optional) – An optional Elasticsearch filter list to apply to the query. If a filter is directly included in the query, this argument will not be used. Defaults to None.
**kwargs (Any) – Additional keyword arguments that might be used in the query process.
- Returns:
The result of the query operation, including the most similar nodes.
- Return type:
VectorStoreQueryResult
- Raises:
Exception – If an error occurs during the Elasticsearch query execution.
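custom_query receives the generated request body and the VectorStoreQuery (or None) and must return the body to send. A minimal sketch, assuming the default vector_field name "embedding": the hook below stops Elasticsearch from returning stored vectors in each hit, and user_filter shows the shape of an es_filter list. exclude_vectors and user_filter are hypothetical, and the metadata.user_id field assumes a keyword mapping.

```python
from typing import Any, Dict, List, Optional


def exclude_vectors(body: Dict[str, Any], query: Optional[Any]) -> Dict[str, Any]:
    # Return a modified copy so the generated body is left untouched.
    new_body = dict(body)
    # Do not send the stored embeddings back in each hit's _source.
    new_body["_source"] = {"excludes": ["embedding"]}
    return new_body


# An es_filter list: each entry is an Elasticsearch filter clause.
user_filter: List[Dict[str, Any]] = [{"term": {"metadata.user_id": "user_123"}}]
```

These would be passed as store.query(vector_store_query, custom_query=exclude_vectors, es_filter=user_filter), where store and vector_store_query are placeholder names.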
- is_embedding_query: bool
- sync_query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, fields: List[str] = []) → VectorStoreQueryResult
Queries the Elasticsearch index for the top k most similar nodes based on the provided query embedding. Supports custom query modifications and application of Elasticsearch filters.
- Parameters:
query (VectorStoreQuery) – The query containing the embedding and other details.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) – A custom function to modify the Elasticsearch query body. Defaults to None.
es_filter (List[Dict], optional) – Additional filters to apply during the query. If filters are present in the query, these filters will not be used. Defaults to None.
fields (List[str], optional) – Defaults to an empty list.
- Returns:
The result of the query, including nodes, their IDs, and similarity scores.
- Return type:
VectorStoreQueryResult
- Raises:
Exception – If the Elasticsearch query encounters an error.
Note
The mode of the query must align with the retrieval strategy set for this store. In case of legacy metadata, a warning is logged and nodes are constructed accordingly.
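The legacy-metadata fallback can be sketched as follows. The sketch assumes, as in llama-index, that current-format metadata carries a serialized node under the _node_content key, while legacy documents lack it and may keep character offsets in a node_info object in _source. Nodes are plain dicts here rather than TextNode objects, and parse_hits is hypothetical.

```python
import logging
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


def parse_hits(
    hits: List[Dict[str, Any]], text_field: str = "content"
) -> Tuple[List[str], List[float], List[Dict[str, Any]]]:
    ids: List[str] = []
    scores: List[float] = []
    nodes: List[Dict[str, Any]] = []
    for hit in hits:
        source = hit["_source"]
        metadata = source.get("metadata", {})
        node: Dict[str, Any] = {
            "id": hit["_id"],
            "text": source.get(text_field, ""),
            "metadata": metadata,
            # Assumed marker: current-format metadata stores the serialized node here.
            "legacy": "_node_content" not in metadata,
        }
        if node["legacy"]:
            # Legacy format: log a warning and rebuild offsets from loose fields.
            logger.warning("Could not parse metadata from hit %s", hit["_id"])
            node_info = source.get("node_info")
            if isinstance(node_info, dict):
                node["start_char_idx"] = node_info.get("start")
                node["end_char_idx"] = node_info.get("end")
        ids.append(hit["_id"])
        scores.append(hit["_score"])
        nodes.append(node)
    return ids, scores, nodes
```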