memoryscope.core.storage
- class memoryscope.core.storage.BaseMemoryStore[源代码]
基类:
object
An abstract base class defining the interface for a memory store which handles memory nodes. It outlines essential operations like retrieval, updating, flushing, and closing of memory scopes.
- abstract retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [源代码]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- 参数:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.
- 返回:
- A list of MemoryNode objects sorted by relevance to the query,
limited to top_k items.
- 返回类型:
List[MemoryNode]
- abstract async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [源代码]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- 参数:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- 返回:
A list of up to top_k MemoryNode objects matching the criteria.
- 返回类型:
List[MemoryNode]
- abstract batch_insert(nodes: List[MemoryNode])[源代码]
- abstract batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[源代码]
- abstract batch_delete(nodes: List[MemoryNode])[源代码]
- class memoryscope.core.storage.BaseMonitor(**kwargs)[源代码]
基类:
object
An abstract base class defining the interface for monitor classes. Subclasses should implement the methods defined here to provide concrete monitoring behavior.
- abstract add()[源代码]
Abstract method to add data or events to the monitor. This method should be implemented by subclasses to define how data is added into the monitoring system.
- 返回:
None
- abstract add_token()[源代码]
Abstract method to add a token or a specific type of identifier to the monitor. Subclasses should implement this to specify how tokens are managed within the monitoring context.
- 返回:
None
- class memoryscope.core.storage.DummyMemoryStore(embedding_model: BaseModel, **kwargs)[源代码]
-
Placeholder implementation of a memory storage system interface. Defines methods for querying, updating, and closing memory nodes with asynchronous capabilities, leveraging an embedding model for potential semantic retrieval. Actual storage operations are not implemented.
- __init__(embedding_model: BaseModel, **kwargs)[源代码]
Initializes the DummyMemoryStore with an embedding model and additional keyword arguments.
- 参数:
embedding_model (BaseModel) -- The model used to embed data for potential similarity-based retrieval.
**kwargs -- Additional keyword arguments for configuration or future expansion.
- retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [源代码]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- 参数:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.
- 返回:
- A list of MemoryNode objects sorted by relevance to the query,
limited to top_k items.
- 返回类型:
List[MemoryNode]
- async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [源代码]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- 参数:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- 返回:
A list of up to top_k MemoryNode objects matching the criteria.
- 返回类型:
List[MemoryNode]
- batch_insert(nodes: List[MemoryNode])[源代码]
- batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[源代码]
- batch_delete(nodes: List[MemoryNode])[源代码]
- class memoryscope.core.storage.DummyMonitor(**kwargs)[源代码]
基类:
BaseMonitor
DummyMonitor serves as a placeholder or mock class extending BaseMonitor, providing empty method bodies for 'add', 'add_token', and 'close' operations. This can be used for testing or in situations where a full monitor implementation is not required.
- class memoryscope.core.storage.LlamaIndexEsMemoryStore(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[源代码]
-
- __init__(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[源代码]
- retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode] [源代码]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- 参数:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary with keys representing filter fields and values as lists of strings for filtering criteria.
- 返回:
- A list of MemoryNode objects sorted by relevance to the query,
limited to top_k items.
- 返回类型:
List[MemoryNode]
- async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode] [源代码]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- 参数:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- 返回:
A list of up to top_k MemoryNode objects matching the criteria.
- 返回类型:
List[MemoryNode]
- batch_insert(nodes: List[MemoryNode])[源代码]
- batch_update(nodes: List[MemoryNode], update_embedding: bool = True)[源代码]
- batch_delete(nodes: List[MemoryNode])[源代码]
- insert(node: MemoryNode)[源代码]
- delete(node: MemoryNode)[源代码]
- update(node: MemoryNode, update_embedding: bool = True)[源代码]
- class memoryscope.core.storage.ESCombinedRetrieveStrategy(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[源代码]
基类:
AsyncDenseVectorStrategy
- __init__(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[源代码]
- es_query(*, query: str | None, query_vector: List[float] | None, text_field: str, vector_field: str, k: int, num_candidates: int, filter: List[Dict[str, Any]] | None = None) Dict[str, Any] [源代码]
Returns the Elasticsearch query body for the given parameters. The store will execute the query.
- 参数:
query -- The text query. Can be None if query_vector is given.
k -- The total number of results to retrieve.
num_candidates -- The number of results to fetch initially in knn search.
filter -- List of filter clauses to apply to the query.
query_vector -- The query vector. Can be None if a query string is given.
- 返回:
The Elasticsearch query body.
- before_index_creation(*, client: AsyncElasticsearch, text_field: str, vector_field: str) None [源代码]
Executes before the index is created. Used for setting up any required Elasticsearch resources like a pipeline. Defaults to a no-op.
- 参数:
client -- The Elasticsearch client.
text_field -- The field containing the text data in the index.
vector_field -- The field containing the vector representations in the index.
- class memoryscope.core.storage.SyncElasticsearchStore(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None)[源代码]
基类:
BasePydanticVectorStore
Elasticsearch vector store.
- 参数:
index_name -- Name of the Elasticsearch index.
es_client -- Optional. Pre-existing AsyncElasticsearch client.
es_url -- Optional. Elasticsearch URL.
es_cloud_id -- Optional. Elasticsearch cloud ID.
es_api_key -- Optional. Elasticsearch API key.
es_user -- Optional. Elasticsearch username.
es_password -- Optional. Elasticsearch password.
text_field -- Optional. Name of the Elasticsearch field that stores the text.
vector_field -- Optional. Name of the Elasticsearch field that stores the embedding.
batch_size -- Optional. Batch size for bulk indexing. Defaults to 200.
distance_strategy -- Optional. Distance strategy to use for similarity search. Defaults to "COSINE".
retrieval_strategy -- Retrieval strategy to use. AsyncBM25Strategy / AsyncSparseVectorStrategy / AsyncDenseVectorStrategy / AsyncRetrievalStrategy. Defaults to AsyncDenseVectorStrategy.
- 抛出:
ConnectionError -- If AsyncElasticsearch client cannot connect to Elasticsearch.
ValueError -- If neither es_client nor es_url nor es_cloud_id is provided.
示例
pip install llama-index-vector-stores-elasticsearch
```python from llama_index.vector_stores import ElasticsearchStore
# Additional setup for ElasticsearchStore class index_name = "my_index" es_url = "http://localhost:9200" es_cloud_id = "<cloud-id>" # Found within the deployment page es_user = "elastic" es_password = "<password>" # Provided when creating deployment or can be reset es_api_key = "<api-key>" # Create an API key within Kibana (Security -> API Keys)
# Connecting to ElasticsearchStore locally es_local = ElasticsearchStore(
index_name=index_name, es_url=es_url)
# Connecting to Elastic Cloud with username and password es_cloud_user_pass = ElasticsearchStore(
index_name=index_name, es_cloud_id=es_cloud_id, es_user=es_user, es_password=es_password)
# Connecting to Elastic Cloud with API Key es_cloud_api_key = ElasticsearchStore(
index_name=index_name, es_cloud_id=es_cloud_id, es_api_key=es_api_key,
)
- stores_text: bool
- index_name: str
- es_client: Any | None
- es_url: str | None
- es_cloud_id: str | None
- es_api_key: str | None
- es_user: str | None
- es_password: str | None
- text_field: str
- vector_field: str
- batch_size: int
- distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None
- retrieval_strategy: AsyncRetrievalStrategy
- __init__(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None) None [源代码]
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- log_elasticsearch_dynamic: bool
- property client: Any
Get the asynchronous Elasticsearch client.
- 返回:
The asynchronous Elasticsearch client instance configured for this store.
- 返回类型:
Any
- add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str] [源代码]
Adds a list of nodes, each containing embeddings, to an Elasticsearch index. Optionally creates the index if it does not already exist.
- 参数:
nodes (List[BaseNode]) -- A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) -- A flag indicating whether to create the Elasticsearch index if it's not present. Defaults to True.
- 返回:
A list of node IDs that have been successfully added to the index.
- 返回类型:
List[str]
- 抛出:
ImportError -- If the 'elasticsearch[async]' Python package is not installed.
BulkIndexError -- If there is a failure during the asynchronous bulk indexing with AsyncElasticsearch.
备注
This method delegates the actual operation to the sync_add method.
- sync_add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str] [源代码]
Asynchronously adds a list of nodes, each containing an embedding, to the Elasticsearch index.
This method processes each node to extract its ID, embedding, text content, and metadata, preparing them for batch insertion into the index. It ensures the index is created if not present and respects the dimensionality of the embeddings for consistency.
- 参数:
nodes (List[BaseNode]) -- A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) -- A flag indicating whether to create the Elasticsearch index if it does not already exist. Defaults to True.
**add_kwargs (Any) -- Additional keyword arguments passed to the underlying add_texts method for customization during the indexing process.
- 返回:
A list of node IDs that were successfully added to the index.
- 返回类型:
List[str]
- 抛出:
ImportError -- If the Elasticsearch Python client is not installed.
BulkIndexError -- If there's a failure during the asynchronous bulk indexing operation.
- delete(ref_doc_id: str, **delete_kwargs: Any) None [源代码]
Deletes a node from the Elasticsearch index using the provided reference document ID.
Optionally, extra keyword arguments can be supplied to customize the deletion behavior, which are passed directly to Elasticsearch's delete_by_query operation.
- 参数:
ref_doc_id (str) -- The unique identifier of the node/document to be deleted.
delete_kwargs (Any) -- Additional keyword arguments for Elasticsearch's delete_by_query. These might include query filters, timeouts, or other operational configurations.
- 抛出:
Exception -- If the deletion operation via Elasticsearch's delete_by_query fails.
备注
This method internally calls a synchronous delete method (sync_delete) to execute the deletion operation against Elasticsearch.
- sync_delete(ref_doc_id: str, **delete_kwargs: Any) None [源代码]
Synchronously deletes a node from the Elasticsearch index based on the reference document ID.
- 参数:
ref_doc_id (str) -- The unique identifier of the node/document to be deleted.
delete_kwargs (Any) -- Optional keyword arguments to be passed to the delete_by_query operation of AsyncElasticsearch, allowing for additional customization of the deletion process.
- 抛出:
Exception -- If the deletion operation via AsyncElasticsearch's delete_by_query fails.
备注
The function directly uses '_id' field to match the document for deletion instead of 'metadata.ref_doc_id', ensuring targeted removal based on the document's unique identifier within Elasticsearch.
- query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, **kwargs: Any) VectorStoreQueryResult [源代码]
Executes a query against the Elasticsearch index to retrieve the top k most similar nodes based on the input query embedding. Supports customization of the query process and application of Elasticsearch filters.
- 参数:
query (VectorStoreQuery) -- The query containing the embedding and other parameters.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) -- An optional custom function to modify the Elasticsearch query body, allowing for additional query parameters or logic. Defaults to None.
es_filter (Optional[List[Dict]], optional) -- An optional Elasticsearch filter list to apply to the query. If a filter is directly included in the query, this argument will not be used. Defaults to None.
**kwargs (Any) -- Additional keyword arguments that might be used in the query process.
- 返回:
The result of the query operation, including the most similar nodes.
- 返回类型:
VectorStoreQueryResult
- 抛出:
Exception -- If an error occurs during the Elasticsearch query execution.
- sync_query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, fields: List[str] = []) VectorStoreQueryResult [源代码]
Asynchronously queries the Elasticsearch index for the top k most similar nodes based on the provided query embedding. Supports custom query modifications and application of Elasticsearch filters.
- 参数:
query (VectorStoreQuery) -- The query containing the embedding and other details.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) -- A custom function to modify the Elasticsearch query body. Defaults to None.
es_filter (List[Dict], optional) -- Additional filters to apply during the query. If filters are present in the query, these filters will not be used. Defaults to None.
fields (List[str], optional) --
.
- 返回:
- The result of the query, including nodes, their IDs,
and similarity scores.
- 返回类型:
VectorStoreQueryResult
- 抛出:
Exception -- If the Elasticsearch query encounters an error.
备注
The mode of the query must align with the retrieval strategy set for this store. In case of legacy metadata, a warning is logged and nodes are constructed accordingly.