MemoryScope API Reference
Enumeration
- class memoryscope.enumeration.ActionStatusEnum(value)[source]
Enumeration representing various statuses of a memory node.
Each status reflects a different state of the node in terms of its lifecycle or content:
- NEW: Indicates a newly created node.
- MODIFIED: Signifies that the node has been altered.
- CONTENT_MODIFIED: Specifies changes to the actual content of the node.
- NONE: No action is taken on the node.
- DELETE: Marks the node's memories for deletion.
- class memoryscope.enumeration.LanguageEnum(value)[source]
An enumeration representing supported languages.
- Members:
CN: Represents the Chinese language.
EN: Represents the English language.
- class memoryscope.enumeration.MemoryTypeEnum(value)[source]
Defines an enumeration for different types of memory categories.
Each member represents a distinct type of memory content:
- CONVERSATION: Represents conversation-based memories.
- OBSERVATION: Denotes observational memories.
- INSIGHT: Indicates insightful memories derived from analysis.
- OBS_CUSTOMIZED: Customized observational memories.
- class memoryscope.enumeration.MessageRoleEnum(value)[source]
Enumeration for different message roles within a conversation context.
This enumeration includes predefined roles such as User, Assistant, and System, which can be used to categorize messages in chat interfaces, AI interactions, or any system that involves distinct participant roles.
- class memoryscope.enumeration.ModelEnum(value)[source]
An enumeration representing different types of models used within the system.
- Members:
GENERATION_MODEL: Represents a model responsible for generating content.
EMBEDDING_MODEL: Represents a model tasked with creating embeddings, typically used for transforming data into a numerical form suitable for machine learning tasks.
RANK_MODEL: Denotes a model that specializes in ranking, often used to order items based on relevance.
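A minimal usage sketch for these enumerations. It assumes the lowercase string member values that appear in the defaults and schemas later in this document ("cn", "generation_model", and so on).

```python
from memoryscope.enumeration import ActionStatusEnum, LanguageEnum, ModelEnum

# Members are referenced by name; each carries a string value.
status = ActionStatusEnum.NEW

# String values round-trip through the constructor, which is convenient
# when deserializing stored records.
assert LanguageEnum("cn") is LanguageEnum.CN
assert ModelEnum("generation_model") is ModelEnum.GENERATION_MODEL
```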
Scheme
- pydantic model memoryscope.scheme.MemoryNode[source]
Represents a memory node with comprehensive attributes to store memory information including unique ID, user details, content, metadata, scoring metrics. Automatically handles timestamp conversion to date format during initialization.
- Fields:
- field memory_id: str [Optional]
unique id for memory
- field user_name: str = ''
the user who owns the memory
- field target_name: str = ''
target name described by the memory
- field meta_data: Dict[str, str] = {}
metadata information
- field content: str = ''
memory content
- field key: str = ''
memory key
- field key_vector: List[float] = []
memory key embedding result
- field value: str = ''
memory value
- field score_recall: float = 0
embedding similarity score used in recall stage
- field score_rank: float = 0
rank model score used in rank stage
- field score_rerank: float = 0
rerank score used in rerank stage
- field memory_type: str = ''
conversation / observation / insight...
- field action_status: str = 'none'
new / content_modified / modified / deleted / none
- field store_status: str = 'valid'
store_status: valid / expired
- field vector: List[float] = []
content embedding result
- field timestamp: int [Optional]
timestamp of the memory node
- field dt: str = ''
date string (dt) of the memory node, derived from the timestamp
- field obs_reflected: int = 0
whether the observation has been reflected: 0/1
- field obs_updated: int = 0
whether the observation has updated the user profile or insight: 0/1
- __init__(**kwargs)[source]
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- property node_keys
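A minimal construction sketch for MemoryNode. Only non-default fields need to be supplied; per the docstring above, the timestamp is converted to a date string during initialization. The field values shown are illustrative.

```python
from memoryscope.scheme import MemoryNode

node = MemoryNode(
    user_name="alice",
    target_name="alice",
    content="Alice enjoys hiking on weekends.",
    memory_type="observation",   # conversation / observation / insight...
    action_status="new",
)
# memory_id and timestamp are marked [Optional] above, so they are
# expected to be filled automatically; dt is derived from timestamp.
print(node.memory_id, node.dt)
```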
- pydantic model memoryscope.scheme.Message[source]
Represents a structured message object with details about the sender, content, and metadata.
- role
The role of the message sender (e.g., 'user', 'assistant', 'system').
- Type:
str
- role_name
Optional name associated with the role of the message sender.
- Type:
str
- content
The actual content or text of the message.
- Type:
str
- time_created
Timestamp indicating when the message was created.
- Type:
int
- memorized
Flag to indicate if the message has been saved or remembered.
- Type:
bool
- meta_data
Additional data or context attached to the message.
- Type:
Dict[str, str]
- Fields:
- field role: str [Required]
The role of the message sender (user, assistant, system)
- field role_name: str = ''
Name describing the role of the message sender
- field content: str [Required]
The primary content of the message
- field time_created: int [Optional]
Timestamp marking the message creation time
- field memorized: bool = False
Indicates if the message is flagged for memory retention
- field meta_data: Dict[str, str] = {}
Supplementary data attached to the message
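A construction sketch for Message; role and content are the only required fields per the field list above.

```python
import time

from memoryscope.scheme import Message

msg = Message(
    role="user",
    role_name="alice",
    content="What did I say about hiking?",
    time_created=int(time.time()),
)
assert not msg.memorized  # defaults to False until the message is consolidated
```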
- pydantic model memoryscope.scheme.ModelResponse[source]
- Fields:
- field message: Message | None = None
generation model result
- field delta: str = ''
New text that just streamed in (only used when streaming)
- field embedding_results: List[List[float]] | List[float] = []
embedding vector
- field rank_scores: Dict[int, float] = {}
The rank score of each document. key: index, value: rank score
- field m_type: ModelEnum = ModelEnum.GENERATION_MODEL
One of LLM, EMB, RANK.
- field status: bool = True
Indicates whether the model call was successful.
- field details: str = ''
Details of the model call, usually used to store the raw response or failure messages.
- field raw: Any = ''
Raw response from the model call
- field meta_data: Dict[str, Any] = {}
metadata for the model response
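A short sketch of how a ModelResponse is typically inspected after a model call, based only on the fields above; whether message or delta is populated depends on whether the call streamed.

```python
from memoryscope.scheme import ModelResponse

def print_result(resp: ModelResponse) -> None:
    # status flags whether the model call succeeded; details carries
    # the raw response or failure messages.
    if not resp.status:
        print("model call failed:", resp.details)
        return
    if resp.message is not None:
        print(resp.message.content)   # complete (non-streaming) result
    else:
        print(resp.delta, end="")     # incremental text while streaming
```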
Config
- class memoryscope.core.config.Arguments(language: Literal['cn', 'en'] = 'cn', thread_pool_max_workers: int = 5, memory_chat_class: str = 'cli_memory_chat', chat_stream: bool | None = None, human_name: str = 'user', assistant_name: str = 'AI', consolidate_memory_interval_time: int | None = 1, reflect_and_reconsolidate_interval_time: int | None = 15, worker_params: Dict[str, dict] = <factory>, generation_backend: str = 'dashscope_generation', generation_model: str = 'qwen-max', generation_params: dict = <factory>, embedding_backend: str = 'dashscope_generation', embedding_model: str = 'text-embedding-v2', embedding_params: dict = <factory>, rank_backend: str = 'dashscope_rank', rank_model: str = 'gte-rerank', rank_params: dict = <factory>, es_index_name: str = 'memory_index', es_url: str = 'http://localhost:9200', retrieve_mode: str = 'dense', enable_ranker: bool = False, enable_today_contra_repeat: bool = True, enable_long_contra_repeat: bool = False, output_memory_max_count: int = 20)[source]
Runtime configuration for MemoryScope. __init__ accepts exactly the parameters shown in the class signature above.
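A configuration sketch: every parameter has a default, so only overrides need to be passed. The values mirror the defaults in the signature above.

```python
from memoryscope.core.config import Arguments

args = Arguments(
    language="en",                     # 'cn' or 'en'
    generation_model="qwen-max",
    embedding_model="text-embedding-v2",
    es_url="http://localhost:9200",
    retrieve_mode="dense",
    enable_ranker=True,                # requires a reachable rank backend
)
```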
Models
- class memoryscope.core.models.DummyGenerationModel(model_name: str, module_name: str, timeout: int | None = None, max_retries: int = 3, retry_interval: float = 1.0, kwargs_filter: bool = True, raise_exception: bool = True, **kwargs)[source]
The DummyGenerationModel class serves as a placeholder model for generating responses. It processes input prompts or sequences of messages, adapting them into a structure compatible with chat interfaces. It also facilitates the generation of mock (dummy) responses for testing, supporting both immediate and streamed output.
- before_call(model_response: ModelResponse, **kwargs)[source]
Prepares the input data before making a call to the language model. It accepts either a 'prompt' directly or a list of 'messages'. If 'prompt' is provided, it sets the data accordingly. If 'messages' are provided, it constructs a list of ChatMessage objects from the list. Raises an error if neither 'prompt' nor 'messages' is supplied.
- Parameters:
model_response -- The model response object to populate.
**kwargs -- Arbitrary keyword arguments including 'prompt' and 'messages'.
- Raises:
RuntimeError -- When neither 'prompt' nor 'messages' is provided.
- after_call(model_response: ModelResponse, stream: bool = False, **kwargs) ModelResponse | Generator[ModelResponse, None, None] [source]
Processes the model's response post-call, optionally streaming the output or returning it as a whole.
This method modifies the input model_response by resetting its message content and, based on the stream parameter, either yields the response in a generated stream or returns the complete response directly.
- Parameters:
model_response (ModelResponse) -- The initial response object to be processed.
stream (bool, optional) -- Flag indicating whether to stream the response. Defaults to False.
**kwargs -- Additional keyword arguments (not used in this implementation).
- Returns:
If stream is True, a generator yielding updated ModelResponse objects; otherwise, a modified ModelResponse object with the complete content.
- Return type:
ModelResponse | ModelResponseGen
- class memoryscope.core.models.LlamaIndexEmbeddingModel(*args, **kwargs)[source]
Manages text embeddings utilizing the DashScopeEmbedding within the LlamaIndex framework, facilitating embedding operations for both sync and async modes, inheriting from BaseModel.
- class memoryscope.core.models.LlamaIndexGenerationModel(*args, **kwargs)[source]
This class represents a generation model within the LlamaIndex framework, capable of processing input prompts or message histories, selecting an appropriate language model service from a registry, and generating text responses, with support for both streaming and non-streaming modes. It encapsulates logic for formatting these interactions within the context of a memory scope management system.
- before_call(model_response: ModelResponse, **kwargs)[source]
Prepares the input data before making a call to the language model. It accepts either a 'prompt' directly or a list of 'messages'. If 'prompt' is provided, it sets the data accordingly. If 'messages' are provided, it constructs a list of ChatMessage objects from the list. Raises an error if neither 'prompt' nor 'messages' is supplied.
- Parameters:
model_response -- The model response object to populate.
**kwargs -- Arbitrary keyword arguments including 'prompt' and 'messages'.
- Raises:
RuntimeError -- When neither 'prompt' nor 'messages' is provided.
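A usage sketch for the generation model. It assumes BaseModel exposes a call(...) entry point that runs before_call and after_call around the backend request; the entry-point name and the module_name value are assumptions, not confirmed by this document.

```python
from memoryscope.core.models import LlamaIndexGenerationModel
from memoryscope.scheme import Message

llm = LlamaIndexGenerationModel(
    model_name="qwen-max",
    module_name="dashscope_generation",  # assumed backend identifier
)

# Either 'prompt' or 'messages' must be supplied (see before_call above).
response = llm.call(messages=[Message(role="user", content="Hello!")])
print(response.message.content if response.status else response.details)
```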
- class memoryscope.core.models.LlamaIndexRankModel(*args, **kwargs)[source]
The LlamaIndexRankModel class is designed to rerank documents according to their relevance to a provided query, utilizing the DashScope Rerank model. It transforms document lists and queries into a compatible format for ranking, manages the ranking process, and allocates rank scores to individual documents.
- before_call(model_response: ModelResponse, **kwargs)[source]
Prepares necessary data before the ranking call by extracting the query and documents, ensuring they are valid, and initializing nodes with dummy scores.
- Parameters:
model_response -- The model response object to populate.
**kwargs -- Keyword arguments containing 'query' and 'documents'.
- after_call(model_response: ModelResponse, **kwargs) ModelResponse [source]
Processes the model response post-ranking, assigning calculated rank scores to each document based on their index in the original document list.
- Parameters:
model_response (ModelResponse) -- The initial response from the ranking model.
**kwargs -- Additional keyword arguments (unused).
- Returns:
Updated response with rank scores assigned to documents.
- Return type:
ModelResponse
Storage
- class memoryscope.core.storage.BaseMemoryStore[source]
An abstract base class defining the interface for a memory store which handles memory nodes. It outlines essential operations like retrieval, updating, flushing, and closing of memory scopes.
- abstract retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [source]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- Parameters:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary whose keys are filter fields and whose values are lists of strings giving the filtering criteria.
- Returns:
A list of MemoryNode objects sorted by relevance to the query, limited to top_k items.
- Return type:
List[MemoryNode]
- abstract async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [source]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- Parameters:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- Returns:
A list of up to top_k MemoryNode objects matching the criteria.
- Return type:
List[MemoryNode]
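A retrieval sketch against any concrete BaseMemoryStore implementation (here named store). The filter field names are illustrative; the interface only specifies the shape of filter_dict.

```python
nodes = store.retrieve_memories(
    query="hiking",
    top_k=5,
    filter_dict={"memory_type": ["observation", "insight"]},  # illustrative fields
)
for node in nodes:
    print(node.score_recall, node.content)
```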
- class memoryscope.core.storage.BaseMonitor(**kwargs)[source]
An abstract base class defining the interface for monitor classes. Subclasses should implement the methods defined here to provide concrete monitoring behavior.
- abstract add()[source]
Abstract method to add data or events to the monitor. This method should be implemented by subclasses to define how data is added into the monitoring system.
- Returns:
None
- abstract add_token()[source]
Abstract method to add a token or a specific type of identifier to the monitor. Subclasses should implement this to specify how tokens are managed within the monitoring context.
- Returns:
None
- class memoryscope.core.storage.DummyMemoryStore(embedding_model: BaseModel, **kwargs)[source]
Placeholder implementation of a memory storage system interface. Defines methods for querying, updating, and closing memory nodes with asynchronous capabilities, leveraging an embedding model for potential semantic retrieval. Actual storage operations are not implemented.
- __init__(embedding_model: BaseModel, **kwargs)[source]
Initializes the DummyMemoryStore with an embedding model and additional keyword arguments.
- Parameters:
embedding_model (BaseModel) -- The model used to embed data for potential similarity-based retrieval.
**kwargs -- Additional keyword arguments for configuration or future expansion.
- retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [source]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- Parameters:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary whose keys are filter fields and whose values are lists of strings giving the filtering criteria.
- Returns:
A list of MemoryNode objects sorted by relevance to the query, limited to top_k items.
- Return type:
List[MemoryNode]
- async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | None = None) List[MemoryNode] [source]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- Parameters:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- Returns:
A list of up to top_k MemoryNode objects matching the criteria.
- Return type:
List[MemoryNode]
- class memoryscope.core.storage.DummyMonitor(**kwargs)[source]
DummyMonitor serves as a placeholder or mock class extending BaseMonitor, providing empty method bodies for 'add', 'add_token', and 'close' operations. This can be used for testing or in situations where a full monitor implementation is not required.
- class memoryscope.core.storage.LlamaIndexEsMemoryStore(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[source]
- __init__(embedding_model: BaseModel, index_name: str, es_url: str, retrieve_mode: str = 'dense', hybrid_alpha: float | None = None, **kwargs)[source]
- retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode] [source]
Retrieves a list of MemoryNode objects that are most relevant to the query, considering a filter dictionary for additional constraints. The number of nodes returned is limited by top_k.
- Parameters:
query (str) -- The query string used to find relevant memories.
top_k (int) -- The maximum number of MemoryNode objects to return.
filter_dict (Dict[str, List[str]]) -- A dictionary whose keys are filter fields and whose values are lists of strings giving the filtering criteria.
- Returns:
A list of MemoryNode objects sorted by relevance to the query, limited to top_k items.
- Return type:
List[MemoryNode]
- async a_retrieve_memories(query: str = '', top_k: int = 3, filter_dict: Dict[str, List[str]] | Dict[str, str] | None = None) List[MemoryNode] [source]
Asynchronously retrieves a list of MemoryNode objects that best match the query, respecting a filter dictionary, with the result size capped at top_k.
- Parameters:
query (str) -- The text to search for in memory nodes.
top_k (int) -- Maximum number of nodes to return.
filter_dict (Dict[str, List[str]]) -- Filters to apply on memory nodes.
- Returns:
A list of up to top_k MemoryNode objects matching the criteria.
- Return type:
List[MemoryNode]
- class memoryscope.core.storage.ESCombinedRetrieveStrategy(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[source]
- __init__(*, distance: DistanceMetric = DistanceMetric.COSINE, model_id: str | None = None, retrieve_mode: str = 'dense', rrf: bool | Dict[str, Any] = True, text_field: str | None = 'text_field', hybrid_alpha: float | None = None)[source]
- es_query(*, query: str | None, query_vector: List[float] | None, text_field: str, vector_field: str, k: int, num_candidates: int, filter: List[Dict[str, Any]] | None = None) Dict[str, Any] [source]
Returns the Elasticsearch query body for the given parameters. The store will execute the query.
- Parameters:
query -- The text query. Can be None if query_vector is given.
query_vector -- The query vector. Can be None if a query string is given.
text_field -- The field containing the text data in the index.
vector_field -- The field containing the vector representations in the index.
k -- The total number of results to retrieve.
num_candidates -- The number of results to fetch initially in the knn search.
filter -- List of filter clauses to apply to the query.
- Returns:
The Elasticsearch query body.
- before_index_creation(*, client: AsyncElasticsearch, text_field: str, vector_field: str) None [source]
Executes before the index is created. Used for setting up any required Elasticsearch resources, like a pipeline. Defaults to a no-op.
- Parameters:
client -- The Elasticsearch client.
text_field -- The field containing the text data in the index.
vector_field -- The field containing the vector representations in the index.
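A construction sketch for the strategy. All arguments are keyword-only per the signature; the 'hybrid' mode string is an assumption inferred from the hybrid_alpha parameter, not confirmed by this document.

```python
from memoryscope.core.storage import ESCombinedRetrieveStrategy

strategy = ESCombinedRetrieveStrategy(
    retrieve_mode="hybrid",   # assumed mode name; the default is "dense"
    text_field="text_field",
    hybrid_alpha=0.5,         # assumed to weight dense vs. text scores
)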
- class memoryscope.core.storage.SyncElasticsearchStore(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None)[source]
Elasticsearch vector store.
- Parameters:
index_name -- Name of the Elasticsearch index.
es_client -- Optional. Pre-existing AsyncElasticsearch client.
es_url -- Optional. Elasticsearch URL.
es_cloud_id -- Optional. Elasticsearch cloud ID.
es_api_key -- Optional. Elasticsearch API key.
es_user -- Optional. Elasticsearch username.
es_password -- Optional. Elasticsearch password.
text_field -- Optional. Name of the Elasticsearch field that stores the text.
vector_field -- Optional. Name of the Elasticsearch field that stores the embedding.
batch_size -- Optional. Batch size for bulk indexing. Defaults to 200.
distance_strategy -- Optional. Distance strategy to use for similarity search. Defaults to "COSINE".
retrieval_strategy -- Retrieval strategy to use: AsyncBM25Strategy / AsyncSparseVectorStrategy / AsyncDenseVectorStrategy / AsyncRetrievalStrategy. Defaults to AsyncDenseVectorStrategy.
- Raises:
ConnectionError -- If the AsyncElasticsearch client cannot connect to Elasticsearch.
ValueError -- If neither es_client, es_url, nor es_cloud_id is provided.
Example
pip install llama-index-vector-stores-elasticsearch

```python
from llama_index.vector_stores import ElasticsearchStore

# Additional setup for the ElasticsearchStore class
index_name = "my_index"
es_url = "http://localhost:9200"
es_cloud_id = "<cloud-id>"      # Found within the deployment page
es_user = "elastic"
es_password = "<password>"      # Provided when creating the deployment, or can be reset
es_api_key = "<api-key>"        # Create an API key within Kibana (Security -> API Keys)

# Connecting to ElasticsearchStore locally
es_local = ElasticsearchStore(
    index_name=index_name,
    es_url=es_url,
)

# Connecting to Elastic Cloud with username and password
es_cloud_user_pass = ElasticsearchStore(
    index_name=index_name,
    es_cloud_id=es_cloud_id,
    es_user=es_user,
    es_password=es_password,
)

# Connecting to Elastic Cloud with an API key
es_cloud_api_key = ElasticsearchStore(
    index_name=index_name,
    es_cloud_id=es_cloud_id,
    es_api_key=es_api_key,
)
```
- __init__(index_name: str, es_client: Any | None = None, es_url: str | None = None, es_cloud_id: str | None = None, es_api_key: str | None = None, es_user: str | None = None, es_password: str | None = None, text_field: str = 'content', vector_field: str = 'embedding', batch_size: int = 200, distance_strategy: Literal['COSINE', 'DOT_PRODUCT', 'EUCLIDEAN_DISTANCE'] | None = 'COSINE', retrieval_strategy: AsyncRetrievalStrategy | None = None) None [source]
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- property client: Any
Get the asynchronous Elasticsearch client.
- Returns:
The asynchronous Elasticsearch client instance configured for this store.
- Return type:
Any
- add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str] [source]
Adds a list of nodes, each containing embeddings, to an Elasticsearch index. Optionally creates the index if it does not already exist.
- Parameters:
nodes (List[BaseNode]) -- A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) -- A flag indicating whether to create the Elasticsearch index if it's not present. Defaults to True.
- Returns:
A list of node IDs that have been successfully added to the index.
- Return type:
List[str]
- Raises:
ImportError -- If the 'elasticsearch[async]' Python package is not installed.
BulkIndexError -- If there is a failure during bulk indexing with AsyncElasticsearch.
Note
This method delegates the actual operation to the sync_add method.
- sync_add(nodes: List[BaseNode], *, create_index_if_not_exists: bool = True, **add_kwargs: Any) List[str] [source]
Synchronously adds a list of nodes, each containing an embedding, to the Elasticsearch index.
This method processes each node to extract its ID, embedding, text content, and metadata, preparing them for batch insertion into the index. It ensures the index is created if not present and respects the dimensionality of the embeddings for consistency.
- Parameters:
nodes (List[BaseNode]) -- A list of node objects, each encapsulating an embedding.
create_index_if_not_exists (bool, optional) -- A flag indicating whether to create the Elasticsearch index if it does not already exist. Defaults to True.
**add_kwargs (Any) -- Additional keyword arguments passed to the underlying add_texts method for customization during the indexing process.
- Returns:
A list of node IDs that were successfully added to the index.
- Return type:
List[str]
- Raises:
ImportError -- If the Elasticsearch Python client is not installed.
BulkIndexError -- If there is a failure during the bulk indexing operation.
- delete(ref_doc_id: str, **delete_kwargs: Any) None [source]
Deletes a node from the Elasticsearch index using the provided reference document ID.
Optionally, extra keyword arguments can be supplied to customize the deletion behavior; these are passed directly to Elasticsearch's delete_by_query operation.
- Parameters:
ref_doc_id (str) -- The unique identifier of the node/document to be deleted.
delete_kwargs (Any) -- Additional keyword arguments for Elasticsearch's delete_by_query. These might include query filters, timeouts, or other operational configurations.
- Raises:
Exception -- If the deletion operation via Elasticsearch's delete_by_query fails.
Note
This method internally calls a synchronous delete method (sync_delete) to execute the deletion operation against Elasticsearch.
- sync_delete(ref_doc_id: str, **delete_kwargs: Any) None [source]
Synchronously deletes a node from the Elasticsearch index based on the reference document ID.
- Parameters:
ref_doc_id (str) -- The unique identifier of the node/document to be deleted.
delete_kwargs (Any) -- Optional keyword arguments to be passed to the delete_by_query operation of AsyncElasticsearch, allowing for additional customization of the deletion process.
- Raises:
Exception -- If the deletion operation via AsyncElasticsearch's delete_by_query fails.
Note
The function matches the document for deletion directly on the '_id' field instead of 'metadata.ref_doc_id', ensuring targeted removal based on the document's unique identifier within Elasticsearch.
- query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, **kwargs: Any) VectorStoreQueryResult [source]
Executes a query against the Elasticsearch index to retrieve the top k most similar nodes based on the input query embedding. Supports customization of the query process and application of Elasticsearch filters.
- Parameters:
query (VectorStoreQuery) -- The query containing the embedding and other parameters.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) -- An optional custom function to modify the Elasticsearch query body, allowing for additional query parameters or logic. Defaults to None.
es_filter (Optional[List[Dict]], optional) -- An optional Elasticsearch filter list to apply to the query. If a filter is directly included in the query, this argument will not be used. Defaults to None.
**kwargs (Any) -- Additional keyword arguments that might be used in the query process.
- Returns:
The result of the query operation, including the most similar nodes.
- Return type:
VectorStoreQueryResult
- Raises:
Exception -- If an error occurs during the Elasticsearch query execution.
- sync_query(query: VectorStoreQuery, custom_query: Callable[[Dict, VectorStoreQuery | None], Dict] | None = None, es_filter: List[Dict] | None = None, fields: List[str] = []) VectorStoreQueryResult [source]
Synchronously queries the Elasticsearch index for the top k most similar nodes based on the provided query embedding. Supports custom query modifications and application of Elasticsearch filters.
- Parameters:
query (VectorStoreQuery) -- The query containing the embedding and other details.
custom_query (Callable[[Dict, Union[VectorStoreQuery, None]], Dict], optional) -- A custom function to modify the Elasticsearch query body. Defaults to None.
es_filter (List[Dict], optional) -- Additional filters to apply during the query. If filters are present in the query, these filters will not be used. Defaults to None.
fields (List[str], optional) -- Defaults to an empty list.
- Returns:
The result of the query, including nodes, their IDs, and similarity scores.
- Return type:
VectorStoreQueryResult
- Raises:
Exception -- If the Elasticsearch query encounters an error.
Note
The mode of the query must align with the retrieval strategy set for this store. In the case of legacy metadata, a warning is logged and nodes are constructed accordingly.
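A query sketch against SyncElasticsearchStore, assuming a store instance and an embedding whose dimensionality matches the index; the filter clause and embedding size are illustrative.

```python
from llama_index.core.vector_stores import VectorStoreQuery

query_embedding = [0.1] * 1536  # must match the index's embedding dimension

result = store.query(
    VectorStoreQuery(query_embedding=query_embedding, similarity_top_k=3),
    es_filter=[{"term": {"metadata.user_name": "alice"}}],  # illustrative
)
for node, score in zip(result.nodes, result.similarities):
    print(score, node.get_content())
```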
Worker
Base
- class memoryscope.core.worker.BaseWorker(name: str, context: Dict[str, Any], memoryscope_context, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)[source]
BaseWorker is an abstract class that defines a worker with common functionalities for managing tasks and context in both asynchronous and multi-thread environments.
- __init__(name: str, context: Dict[str, Any], memoryscope_context, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)[source]
Initializes the BaseWorker with the provided parameters.
- Parameters:
name (str) -- The name of the worker.
context (Dict[str, Any]) -- Shared context dictionary.
memoryscope_context -- The global MemoryScope context singleton.
context_lock (optional) -- Lock for synchronizing access to the context in multi-thread mode.
raise_exception (bool, optional) -- Flag to control whether exceptions should be raised.
is_multi_thread (bool, optional) -- Flag indicating if the worker operates in multi-thread mode.
thread_pool (ThreadPoolExecutor, optional) -- Thread pool executor for managing multi-thread tasks.
kwargs -- Additional keyword arguments.
- submit_async_task(fn, *args, **kwargs)[source]
Submits an asynchronous task to the worker.
- Parameters:
fn (callable) -- The function to be executed.
args -- Positional arguments for the function.
kwargs -- Keyword arguments for the function.
- Raises:
RuntimeError -- If called in multi-thread mode.
- gather_async_result()[source]
Executes all asynchronous tasks and gathers their results.
- Returns:
A list of results from the asynchronous tasks.
- Raises:
RuntimeError -- If called in multi-thread mode.
- submit_thread_task(fn, *args, **kwargs)[source]
Submits a task to be executed in a separate thread.
- Parameters:
fn (callable) -- The function to be executed.
args -- Positional arguments for the function.
kwargs -- Keyword arguments for the function.
- gather_thread_result()[source]
Gathers results of all submitted multi-thread tasks.
- Yields:
The result of each completed task.
- run()[source]
Executes the worker's main logic and manages execution flow and exception handling.
Uses a Timer to log the execution time of the worker.
- get_workflow_context(key: str, default=None)[source]
Retrieves a value from the shared context.
- Parameters:
key (str) -- The key for the context value.
default (optional) -- Default value if the key is not found.
- Returns:
The value from the context or the default value.
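A sketch of the thread-task pattern, assuming a BaseWorker subclass instance named worker that was created with is_multi_thread=True and a thread pool.

```python
def fetch_chunk(idx: int) -> str:
    # Stand-in for any I/O-bound task the worker needs to fan out.
    return f"chunk-{idx}"

for i in range(3):
    worker.submit_thread_task(fetch_chunk, i)

# gather_thread_result yields each task's result as it completes.
for result in worker.gather_thread_result():
    print(result)
```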
- class memoryscope.core.worker.DummyWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
- class memoryscope.core.worker.MemoryBaseWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
- __init__(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Initializes the MemoryBaseWorker with specified models and configurations.
- Parameters:
embedding_model (str) -- Identifier or instance of the embedding model used for transforming text.
generation_model (str) -- Identifier or instance of the text generation model.
rank_model (str) -- Identifier or instance of the ranking model used to sort retrieved memories with respect to semantic similarity.
**kwargs -- Additional keyword arguments passed to the parent class initializer.
The constructor also initializes key attributes related to the memory store, monitoring, user and target identification, and a prompt handler, setting them up for later use.
- property chat_messages: List[List[Message]]
Property to get the chat messages.
- Returns:
List of chat message lists.
- Return type:
List[List[Message]]
- property chat_messages_scatter: List[Message]
Property to get the chat messages as a flat list.
- Returns:
List of chat messages.
- Return type:
List[Message]
- property chat_kwargs: Dict[str, Any]
Retrieves the chat keyword arguments from the context.
This property getter fetches the chat-related parameters stored in the context, which are used to configure how chat interactions are handled.
- Returns:
A dictionary containing the chat keyword arguments.
- Return type:
Dict[str, Any]
- property embedding_model: BaseModel
Property to get the embedding model. If the model is currently stored as a string, it will be replaced with the actual model instance from the global context's model dictionary.
- Returns:
The embedding model used for converting text into vector representations.
- Return type:
BaseModel
- property generation_model: BaseModel
Property to access the generation model. If the model is stored as a string, it retrieves the actual model instance from the global context's model dictionary.
- Returns:
The model used for text generation.
- Return type:
BaseModel
- property rank_model: BaseModel
Property to access the rank model. If the stored rank model is a string, it fetches the actual model instance from the global context's model dictionary before returning it.
- Returns:
The rank model instance used for ranking tasks.
- Return type:
BaseModel
- property memory_store: BaseMemoryStore
Property to access the memory vector store. If not initialized, it fetches the global memory store.
- Returns:
The memory store instance used for insert, update, retrieve, and delete operations.
- Return type:
BaseMemoryStore
- property monitor: BaseMonitor
Property to access the monitoring component. If not initialized, it fetches the global monitor.
- Returns:
The monitoring component instance.
- Return type:
BaseMonitor
- property prompt_handler: PromptHandler
Lazily initializes and returns the PromptHandler instance.
- Returns:
An instance of PromptHandler initialized with a specific file path and keyword arguments.
- Return type:
PromptHandler
- property memory_manager: MemoryManager
Lazily initializes and returns the MemoryManager instance.
- Returns:
An instance of MemoryManager.
- Return type:
MemoryManager
- get_language_value(languages: dict | List[dict]) Any | List[Any] [source]
Retrieves the value(s) corresponding to the current language context.
- Parameters:
languages (dict | list[dict]) -- A dictionary or list of dictionaries containing language-keyed values.
- Returns:
The value or list of values matching the current language setting.
- Return type:
Any | list[Any]
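A sketch of get_language_value; using LanguageEnum members as the dictionary keys is an assumption, since the docs above only say the values are language-keyed.

```python
from memoryscope.enumeration import LanguageEnum

greeting = worker.get_language_value({
    LanguageEnum.CN: "你好",     # assumed key type
    LanguageEnum.EN: "Hello",
})
# Returns the entry matching the current language setting.
```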
- prompt_to_msg(system_prompt: str, few_shot: str, user_query: str, concat_system_prompt: bool = True) List[Message] [source]
Converts input strings into a structured list of message objects suitable for AI interactions.
- Parameters:
system_prompt (str) -- The system-level instruction or context.
few_shot (str) -- An example or demonstration input, often used for illustrating expected behavior.
user_query (str) -- The actual user query or prompt to be processed.
concat_system_prompt (bool) -- Whether to concatenate the system prompt into the user message as well, a simple way to improve effectiveness for some LLMs. Defaults to True.
- Returns:
A list of Message objects, each representing a part of the conversation setup.
- Return type:
List[Message]
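A usage sketch for prompt_to_msg with illustrative prompt strings.

```python
messages = worker.prompt_to_msg(
    system_prompt="You extract user preferences from conversations.",
    few_shot="user: I love hiking.\nobservation: The user enjoys hiking.",
    user_query="user: I started learning the guitar last month.",
)
# 'messages' is a List[Message] ready to pass to a generation model.
```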
- class memoryscope.core.worker.MemoryManager(memoryscope_context, workerflow_name: str = 'default_worker')[source]
The MemoryManager class manages memory nodes with the memory store.
- __init__(memoryscope_context, workerflow_name: str = 'default_worker')[source]
- property memory_store: BaseMemoryStore
Property to access the memory store. If not initialized, it fetches the memory store from the global context.
- Returns:
The memory store instance associated with this worker.
- Return type:
BaseMemoryStore
- add_memories(key: str, nodes: MemoryNode | List[MemoryNode], log_repeat: bool = True)[source]
Add the memories.
- Parameters:
key (str) -- The key mapping to memory nodes.
nodes (MemoryNode | List[MemoryNode]) -- A single memory node or a list of memory nodes to be added.
log_repeat (bool) -- Whether to log duplicated memory nodes.
- set_memories(key: str, nodes: MemoryNode | List[MemoryNode], log_repeat: bool = True)[source]
Set the memories in '_id_memory_dict' and '_key_id_dict'.
- Parameters:
key (str) -- The key mapping to memory nodes.
nodes (MemoryNode | List[MemoryNode]) -- A single memory node or a list of memory nodes to be set.
log_repeat (bool) -- If True, duplicated memory nodes are logged.
- get_memories(keys: str | List[str]) List[MemoryNode] [source]
Fetch the memories by keys.
- Parameters:
keys (str | List[str]) -- The key(s) mapping to memory nodes.
- Returns:
Memories mapped to the keys.
- Return type:
List[MemoryNode]
- delete_memories(nodes: MemoryNode | List[MemoryNode], key: str | None = None)[source]
Delete the memories.
- Parameters:
nodes (MemoryNode | List[MemoryNode]) -- A single memory node or a list of memory nodes to be deleted.
key (str, optional) -- The key mapping to memory nodes.
- update_memories(keys: str = '', nodes: MemoryNode | List[MemoryNode] | None = None) dict [source]
Update the memories.
- Parameters:
keys (str) -- The keys mapping to the memory nodes to update.
nodes (MemoryNode | List[MemoryNode]) -- A single memory node or a list of memory nodes to be updated.
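A lifecycle sketch for MemoryManager, assuming an already-constructed instance named manager; the key string is illustrative.

```python
from memoryscope.scheme import MemoryNode

node = MemoryNode(content="The user enjoys hiking.", memory_type="observation")

manager.add_memories("observation_nodes", node)        # key is illustrative
retrieved = manager.get_memories("observation_nodes")
manager.delete_memories(retrieved, key="observation_nodes")
```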
Frontend
- class memoryscope.core.worker.frontend.ExtractTimeWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
A specialized worker class designed to identify and extract time-related information from text generated by an LLM, translating date-time keywords based on the set language, and storing this extracted data within a shared context.
- class memoryscope.core.worker.frontend.FuseRerankWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Reranks the memory nodes by scores, types, and temporal relevance. Formats the top-K reranked nodes for printing.
- static match_node_time(extract_time_dict: Dict[str, str], node: MemoryNode)[source]
Determines whether the node matches the extracted time information.
- class memoryscope.core.worker.frontend.PrintMemoryWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Formats the memories for printing.
- class memoryscope.core.worker.frontend.ReadMessageWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Fetches unmemorized chat messages.
- class memoryscope.core.worker.frontend.RetrieveMemoryWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Retrieves memories based on specified criteria such as status, type, and timestamp. Processes these memories concurrently, sorts them by similarity, and logs the activity, facilitating efficient memory retrieval operations within a given scope.
- retrieve_from_observation(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- retrieve_from_insight(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- retrieve_expired_memory(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- class memoryscope.core.worker.frontend.SemanticRankWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
The SemanticRankWorker class processes queries by retrieving memory nodes, removing duplicates, ranking them by semantic relevance with a rank model, assigning scores, sorting the nodes, and storing the ranked nodes back, while logging relevant information.
- class memoryscope.core.worker.frontend.SetQueryWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
The SetQueryWorker class is responsible for setting a query and its associated timestamp into the context, utilizing either provided chat parameters or details from the most recent chat message.
Backend
- class memoryscope.core.worker.backend.ContraRepeatWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
The ContraRepeatWorker class specializes in processing memory nodes to identify and handle contradictory and repetitive information. It extends the base functionality of MemoryBaseWorker.
Responsibilities:
- Collects observation nodes from various memory categories.
- Constructs a prompt with these observations for language model analysis.
- Parses the model's response to detect contradictions or redundancies.
- Adjusts the status of memory nodes based on the analysis.
- Persists the updated node statuses back into memory.
- class memoryscope.core.worker.backend.GetObservationWithTimeWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
A specialized worker class that extends GetObservationWorker functionality to handle retrieval of observations which include associated timestamp information from chat messages.
- filter_messages() List[Message] [source]
Filters the chat messages to only include those which contain time-related keywords.
- Returns:
A list of filtered messages that mention time.
- Return type:
List[Message]
- build_message(filter_messages: List[Message]) List[Message] [source]
Constructs a prompt message for obtaining observations with timestamp information based on filtered chat messages.
This method processes each filtered message with its timestamp information. It then organizes these timestamped messages into a structured prompt that includes a system prompt, few-shot examples, and the concatenated user queries.
- class memoryscope.core.worker.backend.GetObservationWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
A specialized worker class to generate observations from the original chat histories.
- add_observation(message: Message, time_infer: str, obs_content: str, keywords: str)[source]
Builds a MemoryNode containing the observation details.
- Parameters:
message (Message) -- The source message from which the observation is derived.
time_infer (str) -- The inferred time, if available.
obs_content (str) -- The content of the observation.
keywords (str) -- Keywords associated with the observation.
- Returns:
The constructed MemoryNode containing the observation.
- Return type:
MemoryNode
- filter_messages() List[Message] [source]
Filters the chat messages to only include those which do not contain time-related keywords.
- Returns:
A list of filtered messages that do not mention time.
- Return type:
List[Message]
- class memoryscope.core.worker.backend.GetReflectionSubjectWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
A specialized worker class responsible for retrieving unreflected memory nodes, generating reflection prompts with current insights, invoking an LLM for fresh insights, parsing the LLM responses, forming new insight nodes, and updating memory statuses accordingly.
- new_insight_node(insight_key: str) MemoryNode [source]
Creates a new MemoryNode for an insight with the given key, enriched with current datetime metadata.
- Parameters:
insight_key (str) -- The unique identifier for the insight.
- Returns:
A new MemoryNode instance representing the insight, marked as new and of type INSIGHT.
- Return type:
MemoryNode
- class memoryscope.core.worker.backend.InfoFilterWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
This worker filters and modifies the chat message history (self.chat_messages) by retaining only the messages that include significant information. It then constructs a prompt from these filtered messages, utilizes an AI model to process this prompt, parses the AI's generated response to allocate scores, and ultimately retains messages in self.chat_messages based on these assigned scores.
- class memoryscope.core.worker.backend.LoadMemoryWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
- retrieve_not_reflected_memory(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- retrieve_not_updated_memory(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- retrieve_insight_memory(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- retrieve_today_memory(**kwargs)
Timer-wrapped retrieval method; the wrapper manages the timing of the original function.
- Parameters:
*args -- Variable length argument list for the decorated function.
**kwargs -- Arbitrary keyword arguments for the decorated function.
- Returns:
The result of the decorated function.
- Return type:
Any
- class memoryscope.core.worker.backend.LongContraRepeatWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Manages and updates memory entries within a conversation scope by identifying and handling contradictions or redundancies. It extends the base MemoryBaseWorker to provide specialized functionality for long conversations with potentially contradictory or repetitive statements.
- retrieve_similar_content(node: MemoryNode) Tuple[MemoryNode, List[MemoryNode]] [source]
Retrieves memory nodes with content similar to the given node, filtering by user/target/status/memory_type. Only returns nodes whose similarity score meets or exceeds the predefined threshold.
- Parameters:
node (MemoryNode) -- The reference node used to find similar content in memory.
- Returns:
A tuple containing the original node and a list of similar nodes that passed the similarity threshold.
- Return type:
Tuple[MemoryNode, List[MemoryNode]]
- class memoryscope.core.worker.backend.UpdateInsightWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[源代码]
This class is responsible for updating insight value in a memory system. It filters insight nodes based on their association with observed nodes, utilizes a ranking model to prioritize them, generates refreshed insights via an LLM, and manages node statuses and content updates, incorporating features for concurrent execution and logging.
- filter_obs_nodes(insight_node: ~memoryscope.scheme.memory_node.MemoryNode, obs_nodes: ~typing.List[~memoryscope.scheme.memory_node.MemoryNode]) -> (<class 'memoryscope.scheme.memory_node.MemoryNode'>, typing.List[memoryscope.scheme.memory_node.MemoryNode], <class 'float'>)[源代码]
Filters observed nodes based on their relevance to a given insight node using a ranking model.
- 参数:
insight_node (MemoryNode) -- The insight node used as the basis for filtering.
obs_nodes (List[MemoryNode]) -- A list of observed nodes to be filtered.
- 返回:
- A tuple containing:
The original insight node.
A list of filtered observed nodes that are relevant to the insight node.
The maximum relevance score among the filtered nodes.
- 返回类型:
tuple
- update_insight_node(insight_node: MemoryNode, insight_value: str)[源代码]
Updates the MemoryNode with the new insight value.
- 参数:
insight_node (MemoryNode) -- The MemoryNode whose insight value needs to be updated.
insight_value (str) -- The new insight value.
- 返回:
The updated MemoryNode with potentially revised insight value.
- 返回类型:
- update_insight(insight_node: MemoryNode, filtered_nodes: List[MemoryNode]) MemoryNode [源代码]
Updates the insight value of a given MemoryNode based on the context from a list of filtered MemoryNodes.
- 参数:
insight_node (MemoryNode) -- The MemoryNode whose insight value needs to be updated.
filtered_nodes (List[MemoryNode]) -- A list of MemoryNodes used as context for updating the insight.
- 返回:
The updated MemoryNode with potentially revised insight value.
- 返回类型:
MemoryNode
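The two methods above compose into the filter-then-refresh flow the class description outlines; a sketch (worker construction assumed):

def refresh_insight(worker, insight_node, obs_nodes):
    # Rank-filter the observations against the insight node.
    insight_node, relevant_obs, max_score = worker.filter_obs_nodes(insight_node, obs_nodes)
    if not relevant_obs:
        # No sufficiently relevant observations; keep the existing insight.
        return insight_node
    # update_insight returns the MemoryNode with a potentially revised value.
    return worker.update_insight(insight_node, relevant_obs)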
- class memoryscope.core.worker.backend.UpdateMemoryWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[源代码]
- from_query()[源代码]
Creates a MemoryNode from the provided query if present in chat_kwargs.
- 返回:
A list containing a single MemoryNode created from the query.
- 返回类型:
List[MemoryNode]
- from_memory_key()[源代码]
Retrieves memories based on the memory key if it exists.
- 返回:
A list of MemoryNode objects retrieved using the memory key.
- 返回类型:
List[MemoryNode]
- delete_all()[源代码]
Marks all memories for deletion by setting their action_status to 'DELETE'.
- 返回:
A list of all MemoryNode objects marked for deletion.
- 返回类型:
List[MemoryNode]
- delete_memory()[源代码]
Marks specific memories for deletion based on query or memory_id present in chat_kwargs.
- 返回:
A list of MemoryNode objects marked for deletion based on the query or memory_id.
- 返回类型:
List[MemoryNode]
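In scheme terms, "marking for deletion" means flipping each node's action_status; a hedged sketch of the effect of delete_all (the vector store applies the status change later):

from memoryscope.enumeration import ActionStatusEnum

def mark_all_deleted(nodes):
    for node in nodes:
        # DELETE is the ActionStatusEnum member for removal; its exact string value is an enum detail.
        node.action_status = ActionStatusEnum.DELETE.value
    return nodes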
Operation
- class memoryscope.core.operation.BackendOperation(interval_time: int, **kwargs)[源代码]
Serves as the base class for backend operations. It manages operation status, loop control, and integrates with the global context for thread management.
- init_workflow(**kwargs)[源代码]
Initializes the workflow by setting up workers with provided keyword arguments.
- 参数:
**kwargs -- Arbitrary keyword arguments to be passed during worker initialization.
- class memoryscope.core.operation.BaseOperation(name: str, user_name: str, target_names: List[str], chat_messages: List[List[Message]], description: str)[源代码]
An abstract base class representing an operation that can be categorized as either frontend or backend.
- operation_type
Specifies the type of operation, defaulting to "frontend".
- Type:
OPERATION_TYPE
- name
The name of the operation.
- Type:
str
- description
A description of the operation.
- Type:
str
- __init__(name: str, user_name: str, target_names: List[str], chat_messages: List[List[Message]], description: str)[源代码]
Initializes a new instance of the BaseOperation.
- init_workflow(**kwargs)[源代码]
Initialize the workflow with additional keyword arguments if needed.
- 参数:
**kwargs -- Additional parameters for initializing the workflow.
- abstract run_operation(target_name: str, **kwargs)[源代码]
Abstract method to define the operation to be run. Subclasses must implement this method.
- 参数:
target_name (str) -- The target name (human name).
**kwargs -- Keyword arguments for running the operation.
- 抛出:
NotImplementedError -- If the subclass does not implement this method.
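A minimal concrete subclass satisfying the abstract contract (the class name and body are illustrative, not part of the library):

from memoryscope.core.operation import BaseOperation

class EchoOperation(BaseOperation):
    operation_type = "frontend"

    def run_operation(self, target_name: str, **kwargs):
        # A real operation would prepare the chat context and run its workflow here.
        return f"echo for {target_name}"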
- class memoryscope.core.operation.ConsolidateMemoryOp(message_lock, contextual_msg_min_count: int = 0, **kwargs)[源代码]
- __init__(message_lock, contextual_msg_min_count: int = 0, **kwargs)[源代码]
Initializes a new instance of the BaseOperation.
- run_operation(target_name: str, **kwargs)[源代码]
Executes an operation after preparing the chat context, checking message memory status, and updating workflow status accordingly.
If the number of not-yet-memorized messages is below contextual_msg_min_count, the operation is skipped. Otherwise, it sets up the chat context, runs the workflow, captures the result, and updates the memory status.
- 参数:
target_name (str) -- The target name (human name).
**kwargs -- Keyword arguments for chat operation configuration.
- 返回:
The result obtained from running the workflow.
- 返回类型:
Any
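The gating rule described above, as a standalone sketch (how the not-yet-memorized messages are collected is assumed):

def enough_new_messages(not_memorized_messages, contextual_msg_min_count: int) -> bool:
    # Consolidation proceeds only once enough un-memorized messages have accumulated.
    return len(not_memorized_messages) >= contextual_msg_min_count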
- class memoryscope.core.operation.FrontendOperation(name: str, user_name: str, target_names: List[str], chat_messages: List[List[Message]], description: str, **kwargs)[源代码]
- __init__(name: str, user_name: str, target_names: List[str], chat_messages: List[List[Message]], description: str, **kwargs)[源代码]
Initializes a new instance of the BaseOperation.
- init_workflow(**kwargs)[源代码]
Initializes the workflow by setting up workers with provided keyword arguments.
- 参数:
**kwargs -- Arbitrary keyword arguments to be passed during worker initialization.
- run_operation(target_name: str, **kwargs)[源代码]
Executes the main operation: reads recent chat messages, initializes the workflow, and returns the result of the workflow execution.
- 参数:
target_name (str) -- The target name (human name).
**kwargs -- Additional keyword arguments used in the operation context.
- 返回:
The result obtained from executing the workflow.
- 返回类型:
Any
Service
- class memoryscope.core.service.BaseMemoryService(memory_operations: Dict[str, dict], context: MemoryscopeContext, assistant_name: str | None = None, human_name: str | None = None, **kwargs)[源代码]
An abstract base class for managing memory operations within a multithreaded context. It sets up the infrastructure for operation handling, message storage, and synchronization, along with logging capabilities and customizable configurations.
- __init__(memory_operations: Dict[str, dict], context: MemoryscopeContext, assistant_name: str | None = None, human_name: str | None = None, **kwargs)[源代码]
Initializes the BaseMemoryService with operation definitions, keys for memory access, and additional keyword arguments for flexibility.
- 参数:
memory_operations (Dict[str, dict]) -- A dictionary defining available memory operations.
context (MemoryscopeContext) -- runtime context.
human_name (str) -- human name.
assistant_name (str) -- assistant name.
**kwargs -- Additional parameters to customize service behavior.
- property op_description_dict: Dict[str, str]
Property to retrieve a dictionary mapping operation keys to their descriptions.
- 返回:
A dictionary where keys are operation identifiers and values are their descriptions.
- 返回类型:
Dict[str, str]
- class memoryscope.core.service.MemoryScopeService(history_msg_count: int = 100, contextual_msg_max_count: int = 10, contextual_msg_min_count: int = 0, **kwargs)[源代码]
- __init__(history_msg_count: int = 100, contextual_msg_max_count: int = 10, contextual_msg_min_count: int = 0, **kwargs)[源代码]
Init function.
- 参数:
history_msg_count (int) -- The number of conversation history messages kept in memory; capping this quantity reduces memory usage.
contextual_msg_max_count (int) -- The maximum context length in a conversation. Messages beyond this length are not included in the context, to prevent token overflow.
contextual_msg_min_count (int) -- The minimum context length in a conversation. If the context is shorter than this, no conversation summary is made and no long-term memory is generated.
kwargs (dict) -- Additional parameters to customize service behavior.
- add_messages_pair(messages: List[Message])[源代码]
Adds a list of messages (typically a [user_message, assistant_message] pair) to the chat history, keeping the list sorted by creation time and capped at the maximum history message count.
- run_operation(name: str, role_name: str = '', **kwargs)[源代码]
Executes a specific operation by its name with provided keyword arguments.
- 参数:
name (str) -- The name of the operation to execute.
role_name (str) -- The role name used when executing the operation.
**kwargs -- Keyword arguments for the operation's execution.
- 返回:
The result of the operation execution, if any. Otherwise, None.
- 抛出:
Warning -- If the operation name is not initialized in _operation_dict.
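An end-to-end sketch of driving the service (the operation definitions, context, Message objects, and the operation name "retrieve_memory" are assumptions, not fixed API values):

from memoryscope.core.service import MemoryScopeService

def build_and_query(operations, context, user_msg, assistant_msg):
    # operations: Dict[str, dict] of operation definitions; context: MemoryscopeContext.
    service = MemoryScopeService(
        history_msg_count=100,
        contextual_msg_max_count=10,
        contextual_msg_min_count=2,
        memory_operations=operations,
        context=context,
    )
    service.add_messages_pair([user_msg, assistant_msg])  # a [user, assistant] Message pair
    # The name must match a key registered in memory_operations.
    return service.run_operation(name="retrieve_memory", role_name="alice")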
Chat
- class memoryscope.core.chat.ApiMemoryChat(memory_service: str, generation_model: str, context: MemoryscopeContext, stream: bool = False, **kwargs)[源代码]
- __init__(memory_service: str, generation_model: str, context: MemoryscopeContext, stream: bool = False, **kwargs)[源代码]
- property prompt_handler: PromptHandler
Lazy initialization property for the prompt handler.
This property ensures that the _prompt_handler attribute is instantiated only when first accessed, using the current file's path and additional keyword arguments for configuration.
- 返回:
An instance of the PromptHandler configured for this chat session.
- 返回类型:
PromptHandler
- property memory_service: BaseMemoryService
Property to access the memory service. If the service is initially set as a string, it will be looked up in the memory service dictionary of context, initialized, and then returned as an instance of BaseMemoryService. Ensures the memory service is properly started before use.
- 返回:
An active memory service instance.
- 返回类型:
BaseMemoryService
- 抛出:
ValueError -- If the memory service declaration is not found in the context's memory service dictionary.
- property generation_model: BaseModel
Property to get the generation model. If the model is set as a string, it will be resolved from the global context's model dictionary.
- 抛出:
ValueError -- If the generation model declaration is not found in the context's model dictionary.
- 返回:
An actual generation model instance.
- 返回类型:
BaseModel
- chat_with_memory(query: str, role_name: str | None = None, system_prompt: str | None = None, memory_prompt: str | None = None, temporary_memories: str | None = None, history_message_strategy: Literal['auto', None] | int = 'auto', remember_response: bool = True, **kwargs)[源代码]
The core conversation function: it accepts the user's query and returns the conversation result through model_response. The retrieved memories are stored in the memories entry of model_response.meta_data.
- 参数:
query (str) -- User's query, including the user's question.
role_name (str, optional) -- User's role name.
system_prompt (str, optional) -- System prompt. Defaults to the system_prompt in "memory_chat_prompt.yaml".
memory_prompt (str, optional) -- Memory prompt. It takes effect when memories are retrieved and is placed in front of the retrieved memory. Defaults to the memory_prompt in "memory_chat_prompt.yaml".
temporary_memories (str, optional) -- User memory added manually for this call.
history_message_strategy ("auto", None, int) -- If "auto" (the default), the history messages that have not yet been summarized are retained. If None, no conversation history is included. If an integer n, the most recent n [user, assistant] message pairs are retained.
remember_response (bool, optional) -- Whether to save the AI's response to memory. Defaults to True.
- 返回:
In non-streaming mode, a complete AI response (ModelResponse); in streaming mode, a generator yielding AI response parts (ModelResponseGen). The retrieved memories can be obtained via model_response.meta_data[MEMORIES].
- 返回类型:
ModelResponse
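A usage sketch for chat_with_memory (the chat instance and the exact ModelResponse layout are assumed; the meta_data key is exposed via the MEMORIES constant):

def ask_with_memory(memory_chat, question: str):
    response = memory_chat.chat_with_memory(
        query=question,
        history_message_strategy=3,  # keep the 3 most recent [user, assistant] pairs
        remember_response=True,      # persist the assistant's reply to memory
    )
    return response, response.meta_data.get("memories")  # key name assumed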
- class memoryscope.core.chat.BaseMemoryChat(**kwargs)[源代码]
An abstract base class representing a chat system integrated with memory services. It outlines the method to initiate a chat session leveraging memory data, which concrete subclasses must implement.
- property memory_service: BaseMemoryService
Abstract property to access the memory service.
- 抛出:
NotImplementedError -- This method should be implemented in a subclass.
- abstract chat_with_memory(query: str, role_name: str | None = None, system_prompt: str | None = None, memory_prompt: str | None = None, temporary_memories: str | None = None, history_message_strategy: Literal['auto', None] | int = 'auto', remember_response: bool = True, **kwargs)[源代码]
The core conversation function: it accepts the user's query and returns the conversation result through model_response. The retrieved memories are stored in the memories entry of model_response.meta_data.
- 参数:
query (str) -- User's query, including the user's question.
role_name (str, optional) -- User's role name.
system_prompt (str, optional) -- System prompt. Defaults to the system_prompt in "memory_chat_prompt.yaml".
memory_prompt (str, optional) -- Memory prompt. It takes effect when memories are retrieved and is placed in front of the retrieved memory. Defaults to the memory_prompt in "memory_chat_prompt.yaml".
temporary_memories (str, optional) -- User memory added manually for this call.
history_message_strategy ("auto", None, int) -- If "auto" (the default), the history messages that have not yet been summarized are retained. If None, no conversation history is included. If an integer n, the most recent n [user, assistant] message pairs are retained.
remember_response (bool, optional) -- Whether to save the AI's response to memory. Defaults to True.
- 返回:
In non-streaming mode, a complete AI response (ModelResponse); in streaming mode, a generator yielding AI response parts (ModelResponseGen). The retrieved memories can be obtained via model_response.meta_data[MEMORIES].
- 返回类型:
ModelResponse
- class memoryscope.core.chat.CliMemoryChat(**kwargs)[源代码]
Command-line interface for chatting with an AI that integrates memory functionality. Allows users to interact, manage chat history, adjust streaming settings, and view commands' help.
- print_logo()[源代码]
Prints the logo of the CLI application to the console.
The logo is composed of multiple lines, which are iterated through and printed one by one to provide a visual identity for the chat interface.
- chat_with_memory(query: str, role_name: str | None = None, system_prompt: str | None = None, memory_prompt: str | None = None, temporary_memories: str | None = None, history_message_strategy: Literal['auto', None] | int = 'auto', remember_response: bool = True, **kwargs)[源代码]
The core conversation function: it accepts the user's query and returns the conversation result through model_response. The retrieved memories are stored in the memories entry of model_response.meta_data.
- 参数:
query (str) -- User's query, including the user's question.
role_name (str, optional) -- User's role name.
system_prompt (str, optional) -- System prompt. Defaults to the system_prompt in "memory_chat_prompt.yaml".
memory_prompt (str, optional) -- Memory prompt. It takes effect when memories are retrieved and is placed in front of the retrieved memory. Defaults to the memory_prompt in "memory_chat_prompt.yaml".
temporary_memories (str, optional) -- User memory added manually for this call.
history_message_strategy ("auto", None, int) -- If "auto" (the default), the history messages that have not yet been summarized are retained. If None, no conversation history is included. If an integer n, the most recent n [user, assistant] message pairs are retained.
remember_response (bool, optional) -- Whether to save the AI's response to memory. Defaults to True.
- 返回:
In non-streaming mode, a complete AI response (ModelResponse); in streaming mode, a generator yielding AI response parts (ModelResponseGen). The retrieved memories can be obtained via model_response.meta_data[MEMORIES].
- 返回类型:
ModelResponse
- static parse_query_command(query: str)[源代码]
Parses the user's input query command, separating it into the command and its associated keyword arguments.
- 参数:
query (str) -- The raw input string from the user which includes the command and its arguments.
- 返回:
A tuple containing the command (str) as the first element and a dictionary (kwargs) of keyword arguments as the second element.
- 返回类型:
tuple
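A hypothetical parser with the documented contract, shown only to illustrate the (command, kwargs) split; the library's actual command syntax may differ:

def parse_query_command(query: str):
    parts = query.strip().split()
    if not parts:
        return "", {}
    command, tokens = parts[0], parts[1:]
    # Tokens of the form key=value become keyword arguments (values stay strings).
    kwargs = dict(token.split("=", 1) for token in tokens if "=" in token)
    return command, kwargs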
- process_commands(query: str) bool [源代码]
Parses and executes commands from user input in the CLI chat interface. Supports operations like exiting, clearing screen, showing help, toggling stream mode, executing predefined memory operations, and handling unknown commands.
- 参数:
query (str) -- The user's input command string.
- 返回:
Indicates whether to continue running the CLI after processing the command.
- 返回类型:
bool
- run()[源代码]
Runs the CLI chat loop, which handles user input, processes commands, communicates with the AI model, manages conversation memory, and controls the chat session including streaming responses, command execution, and error handling.
The loop continues until the user explicitly chooses to exit.
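A typical entry-point sketch (the constructor keyword names are assumed to mirror ApiMemoryChat's):

from memoryscope.core.chat import CliMemoryChat

chat = CliMemoryChat(memory_service="memoryscope_service",  # service key assumed
                     generation_model="generation_model")   # model key assumed
chat.run()  # blocks until the user issues an exit command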