memoryscope.core.storage

class memoryscope.core.storage.BaseMemoryStore[source]

Bases: object

An abstract base class defining the interface for a memory store which handles memory nodes. It outlines essential operations like retrieval, updating, flushing, and closing of memory scopes.

abstract retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode][source]

Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.

Parameters:
  • query (str) – The query string used to find relevant memories.

  • top_k (int) – The maximum number of MemoryNode objects to return.

  • filter_dict (Dict[str, List[str]]) – A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.

Returns:

A list of MemoryNode objects sorted by relevance to the query,

limited to top_k items.

Return type:

List[MemoryNode]

abstract async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode][source]

Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.

Parameters:
  • query (str) – The text to search for in memory nodes.

  • top_k (int) – Maximum number of nodes to return.

  • filter_dict (Dict[str, List[str]]) – Filters to apply on memory nodes.

Returns:

A list of up to top_k MemoryNode objects matching the criteria.

Return type:

List[MemoryNode]

abstract batch_insert(nodes: List[MemoryNode])[source]
abstract batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[source]
abstract batch_delete(nodes: List[MemoryNode])[source]
flush()[source]

Flushes any pending memory updates or operations to ensure data consistency. This method should be overridden by subclasses to provide the specific flushing mechanism.

abstract close()[source]

Closes the memory store, releasing any resources associated with it. Subclasses must implement this method to define how the memory store is properly closed.

class memoryscope.core.storage.BaseMonitor(**kwargs)[source]

Bases: object

An abstract base class defining the interface for monitor classes. Subclasses should implement the methods defined here to provide concrete monitoring behavior.

__init__(**kwargs)[source]
abstract add()[source]

Abstract method to add data or events to the monitor. This method should be implemented by subclasses to define how data is added into the monitoring system.

Returns:

None

abstract add_token()[source]

Abstract method to add a token or a specific type of identifier to the monitor. Subclasses should implement this to specify how tokens are managed within the monitoring context.

Returns:

None

flush()[source]

Method to flush any buffered data in the monitor. Intended to ensure that all pending recorded data is processed or written out.

Returns:

None

close()[source]

Method to close the monitor, performing necessary cleanup operations. This could include releasing resources, closing files, or any other termination tasks.

Returns:

None

class memoryscope.core.storage.DummyMemoryStore(embedding_model: BaseModel, **kwargs)[source]

Bases: BaseMemoryStore

Placeholder implementation of a memory storage system interface. Defines methods for querying, updating, and closing memory nodes with asynchronous capabilities, leveraging an embedding model for potential semantic retrieval. Actual storage operations are not implemented.

__init__(embedding_model: BaseModel, **kwargs)[source]

Initializes the DummyMemoryStore with an embedding model and additional keyword arguments.

Parameters:
  • embedding_model (BaseModel) – The model used to embed data for potential similarity-based retrieval.

  • **kwargs – Additional keyword arguments for configuration or future expansion.

retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode][source]

Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.

Parameters:
  • query (str) – The query string used to find relevant memories.

  • top_k (int) – The maximum number of MemoryNode objects to return.

  • filter_dict (Dict[str, List[str]]) – A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.

Returns:

A list of MemoryNode objects sorted by relevance to the query,

limited to top_k items.

Return type:

List[MemoryNode]

async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode][source]

Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.

Parameters:
  • query (str) – The text to search for in memory nodes.

  • top_k (int) – Maximum number of nodes to return.

  • filter_dict (Dict[str, List[str]]) – Filters to apply on memory nodes.

Returns:

A list of up to top_k MemoryNode objects matching the criteria.

Return type:

List[MemoryNode]

batch_insert(nodes: List[MemoryNode])[source]
batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[source]
batch_delete(nodes: List[MemoryNode])[source]
close()[source]

Closes the memory store, releasing any resources associated with it. Subclasses must implement this method to define how the memory store is properly closed.

class memoryscope.core.storage.DummyMonitor(**kwargs)[source]

Bases: BaseMonitor

DummyMonitor serves as a placeholder or mock class extending BaseMonitor, providing empty method bodies for ‘add’, ‘add_token’, and ‘close’ operations. This can be used for testing or in situations where a full monitor implementation is not required.

add()[source]

Placeholder for adding data to the monitor. This method currently does nothing.

add_token()[source]

Placeholder for adding a token to the monitored data. This method currently does nothing.

close()[source]

Placeholder for closing the monitor and performing any necessary cleanup. This method currently does nothing.

class memoryscope.core.storage.LlamaIndexEsMemoryStore(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[source]

Bases: BaseMemoryStore

__init__(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[source]
retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode][source]

Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.

Parameters:
  • query (str) – The query string used to find relevant memories.

  • top_k (int) – The maximum number of MemoryNode objects to return.

  • filter_dict (Dict[str, List[str]]) – A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.

Returns:

A list of MemoryNode objects sorted by relevance to the query,

limited to top_k items.

Return type:

List[MemoryNode]

async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode][source]

Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.

Parameters:
  • query (str) – The text to search for in memory nodes.

  • top_k (int) – Maximum number of nodes to return.

  • filter_dict (Dict[str, List[str]]) – Filters to apply on memory nodes.

Returns:

A list of up to top_k MemoryNode objects matching the criteria.

Return type:

List[MemoryNode]

batch_insert(nodes: List[MemoryNode])[source]
batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[source]
batch_delete(nodes: List[MemoryNode])[source]
insert(node: MemoryNode)[source]
delete(node: MemoryNode)[source]
update(node: MemoryNode, update_embedding: bool = True)[source]
close()[source]

Closes the Elasticsearch store, releasing any resources associated with it.

dummy_query_vector()[source]
class memoryscope.core.storage.ESCombinedRetrieveStrategy(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[source]

Bases: AsyncDenseVectorStrategy

__init__(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[source]
es_query(*, query: str | None, query_vector: List[float] | None, text_field: str, vector_field: str, k: int, num_candidates: int, filter: List[Dict[str, Any]] | None = None) Dict[str, Any][source]

Returns the Elasticsearch query body for the given parameters. The store will execute the query.

Parameters:
  • query – The text query. Can be None if query_vector is given.

  • k – The total number of results to retrieve.

  • num_candidates – The number of results to fetch initially in knn search.

  • filter – List of filter clauses to apply to the query.

  • query_vector – The query vector. Can be None if a query string is given.

Returns:

The Elasticsearch query body.

before_index_creation(*, client: AsyncElasticsearch, text_field: str, vector_field: str) None[source]

Executes before the index is created. Used for setting up any required Elasticsearch resources like a pipeline. Defaults to a no-op.

Parameters:
  • client – The Elasticsearch client.

  • text_field – The field containing the text data in the index.

  • vector_field – The field containing the vector representations in the index.

class memoryscope.core.storage.SyncElasticsearchStore(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None)[source]

Bases: BasePydanticVectorStore

Elasticsearch vector store.

Parameters:
  • index_name – Name of the Elasticsearch index.

  • es_client – Optional. Pre-existing AsyncElasticsearch client.

  • es_url – Optional. Elasticsearch URL.

  • es_cloud_id – Optional. Elasticsearch cloud ID.

  • es_api_key – Optional. Elasticsearch API key.

  • es_user – Optional. Elasticsearch username.

  • es_password – Optional. Elasticsearch password.

  • text_field – Optional. Name of the Elasticsearch field that stores the text.

  • vector_field – Optional. Name of the Elasticsearch field that stores the embedding.

  • batch_size – Optional. Batch size for bulk indexing. Defaults to 200.

  • distance_strategy – Optional. Distance strategy to use for similarity search. Defaults to “COSINE”.

  • retrieval_strategy – Retrieval strategy to use. AsyncBM25Strategy / AsyncSparseVectorStrategy / AsyncDenseVectorStrategy / AsyncRetrievalStrategy. Defaults to AsyncDenseVectorStrategy.

Raises:
  • ConnectionError – If AsyncElasticsearch client cannot connect to Elasticsearch.

  • ValueError – If neither es_client nor es_url nor es_cloud_id is provided.

Examples

pip install llama-index-vector-stores-elasticsearch

```python from llama_index.vector_stores import ElasticsearchStore

# Additional setup for ElasticsearchStore class index_name = “my_index” es_url = “http://localhost:9200” es_cloud_id = “<cloud-id>” # Found within the deployment page es_user = “elastic” es_password = “<password>” # Provided when creating deployment or can be reset es_api_key = “<api-key>” # Create an API key within Kibana (Security -> API Keys)

# Connecting to ElasticsearchStore locally es_local = ElasticsearchStore(

index_name=index_name, es_url=es_url)

# Connecting to Elastic Cloud with username and password es_cloud_user_pass = ElasticsearchStore(

index_name=index_name, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password)

# Connecting to Elastic Cloud with API Key es_cloud_api_key = ElasticsearchStore(

index_name=index_name, es_cloud_id=es_cloud_id, es_api_key=es_api_key,

)

class Config[source]

Bases: object

arbitrary_types_allowed = True
stores_text: bool
index_name: str
es_client: Any | None
es_url: str | None
es_cloud_id: str | None
es_api_key: str | None
es_user: str | None
es_password: str | None
text_field: str
vector_field: str
batch_size: int
distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None
retrieval_strategy: AsyncRetrievalStrategy
__init__(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None) None[source]

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

logger: Logger
log_elasticsearch_dynamic: bool
property client: Any

Get the asynchronous Elasticsearch client.

Returns:

The asynchronous Elasticsearch client instance configured for this store.

Return type:

Any

close() None[source]
add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str][source]

Adds a list of nodes, each containing embeddings, to an Elasticsearch index. Optionally creates the index if it does not already exist.

Parameters:
  • nodes (List[BaseNode]) – A list of node objects, each encapsulating an embedding.

  • create_index_if_not_exists (bool, optional) – A flag indicating whether to create the Elasticsearch index if it’s not present. Defaults to True.

Returns:

A list of node IDs that have been successfully added to the index.

Return type:

List[str]

Raises:
  • ImportError – If the ‘elasticsearch[async]’ Python package is not installed.

  • BulkIndexError – If there is a failure during the asynchronous bulk indexing with AsyncElasticsearch.

Note

This method delegates the actual operation to the sync_add method.

sync_add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str][source]

Asynchronously adds a list of nodes, each containing an embedding, to the Elasticsearch index.

This method processes each node to extract its ID, embedding, text content, and metadata, preparing them for batch insertion into the index. It ensures the index is created if not present and respects the dimensionality of the embeddings for consistency.

Parameters:
  • nodes (List[BaseNode]) – A list of node objects, each encapsulating an embedding.

  • create_index_if_not_exists (bool, optional) – A flag indicating whether to create the Elasticsearch index if it does not already exist. Defaults to True.

  • **add_kwargs (Any) – Additional keyword arguments passed to the underlying add_texts method for customization during the indexing process.

Returns:

A list of node IDs that were successfully added to the index.

Return type:

List[str]

Raises:
  • ImportError – If the Elasticsearch Python client is not installed.

  • BulkIndexError – If there’s a failure during the asynchronous bulk indexing operation.

delete(ref_doc_id: str, **delete_kwargs: Any) None[source]

Deletes a node from the Elasticsearch index using the provided reference document ID.

Optionally, extra keyword arguments can be supplied to customize the deletion behavior, which are passed directly to Elasticsearch’s delete_by_query operation.

Parameters:
  • ref_doc_id (str) – The unique identifier of the node/document to be deleted.

  • delete_kwargs (Any) – Additional keyword arguments for Elasticsearch’s delete_by_query. These might include query filters, timeouts, or other operational configurations.

Raises:

Exception – If the deletion operation via Elasticsearch’s delete_by_query fails.

Note

This method internally calls a synchronous delete method (sync_delete) to execute the deletion operation against Elasticsearch.

sync_delete(ref_doc_id: str, **delete_kwargs: Any) None[source]

Synchronously deletes a node from the Elasticsearch index based on the reference document ID.

Parameters:
  • ref_doc_id (str) – The unique identifier of the node/document to be deleted.

  • delete_kwargs (Any) – Optional keyword arguments to be passed to the delete_by_query operation of AsyncElasticsearch, allowing for additional customization of the deletion process.

Raises:

Exception – If the deletion operation via AsyncElasticsearch’s delete_by_query fails.

Note

The function directly uses ‘_id’ field to match the document for deletion instead of ‘metadata.ref_doc_id’, ensuring targeted removal based on the document’s unique identifier within Elasticsearch.

query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, **kwargs: Any) VectorStoreQueryResult[source]

Executes a query against the Elasticsearch index to retrieve the top k most similar nodes based on the input query embedding. Supports customization of the query process and application of Elasticsearch filters.

Parameters:
  • query (VectorStoreQuery) – The query containing the embedding and other parameters.

  • custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) – An optional custom function to modify the Elasticsearch query body, allowing for additional query parameters or logic. Defaults to None.

  • es_filter (Optional[List[Dict]], optional) – An optional Elasticsearch filter list to apply to the query. If a filter is directly included in the query, this argument will not be used. Defaults to None.

  • **kwargs (Any) – Additional keyword arguments that might be used in the query process.

Returns:

The result of the query operation, including the most similar nodes.

Return type:

VectorStoreQueryResult

Raises:

Exception – If an error occurs during the Elasticsearch query execution.

sync_delete_all()[source]
sync_search_all()[source]
log_vector_store_brief(title='current vector store content')[source]
sync_search_all_with_filter(es_filter, fields)[source]
sync_query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, fields: List[str] = []) VectorStoreQueryResult[source]

Asynchronously queries the Elasticsearch index for the top k most similar nodes based on the provided query embedding. Supports custom query modifications and application of Elasticsearch filters.

Parameters:
  • query (VectorStoreQuery) – The query containing the embedding and other details.

  • custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) – A custom function to modify the Elasticsearch query body. Defaults to None.

  • es_filter (List[Dict], optional) – Additional filters to apply during the query. If filters are present in the query, these filters will not be used. Defaults to None.

  • fields (List[str], optional) –

    .

Returns:

The result of the query, including nodes, their IDs,

and similarity scores.

Return type:

VectorStoreQueryResult

Raises:

Exception – If the Elasticsearch query encounters an error.

Note

The mode of the query must align with the retrieval strategy set for this store. In case of legacy metadata, a warning is logged and nodes are constructed accordingly.

post_process_hits(hits: List[Dict[str, Any]]) VectorStoreQueryResult[source]