memoryscope.core.worker

class memoryscope.core.worker.BaseWorker(name: str, context: ~typing.Dict[str, ~typing.Any], memoryscope_context: ~memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ~concurrent.futures.thread.ThreadPoolExecutor | None = None, **kwargs)[源代码]

基类:object

BaseWorker is an abstract class that defines a worker with common functionalities for managing tasks and context in both asynchronous and multi-thread environments.

__init__(name: str, context: ~typing.Dict[str, ~typing.Any], memoryscope_context: ~memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ~concurrent.futures.thread.ThreadPoolExecutor | None = None, **kwargs)[源代码]

Initializes the BaseWorker with the provided parameters.

参数:
  • name (str) -- The name of the worker.

  • context (Dict[str, Any]) -- Shared context dictionary.

  • context_lock (optional) -- Lock for synchronizing access to the context in multithread mode.

  • raise_exception (bool, optional) -- Flag to control whether exceptions should be raised.

  • is_multi_thread (bool, optional) -- Flag indicating if the worker operates in multithread mode.

  • thread_pool (ThreadPoolExecutor, optional) -- Thread pool executor for managing multithread tasks.

  • kwargs -- Additional keyword arguments.

submit_async_task(fn, *args, **kwargs)[源代码]

Submits an asynchronous task to the worker.

参数:
  • fn (callable) -- The function to be executed.

  • args -- Positional arguments for the function.

  • kwargs -- Keyword arguments for the function.

抛出:

RuntimeError -- If called in multithread mode.

gather_async_result()[源代码]

Executes all asynchronous tasks and gathers their results.

返回:

A list of results from the asynchronous tasks.

抛出:

RuntimeError -- If called in multithread mode.

submit_thread_task(fn, *args, **kwargs)[源代码]

Submits a task to be executed in a separate thread.

参数:
  • fn (callable) -- The function to be executed.

  • args -- Positional arguments for the function.

  • kwargs -- Keyword arguments for the function.

gather_thread_result()[源代码]

Gathers results of all submitted multithread tasks.

生成器:

The result of each completed task.

run()[源代码]

Executes the worker's main logic and manages execution flow and exception handling.

Uses a Timer to log the execution time of the worker.

get_workflow_context(key: str, default=None)[源代码]

Retrieves a value from the shared context.

参数:
  • key (str) -- The key for the context value.

  • default (optional) -- Default value if the key is not found.

返回:

The value from the context or the default value.

set_workflow_context(key: str, value: Any)[源代码]

Sets a value in the shared context.

参数:
  • key (str) -- The key for the context value.

  • value (Any) -- The value to be set.

has_content(key: str)[源代码]

Checks if the context contains a specific key.

参数:

key (str) -- The key to check in the context.

返回:

True if the key is in the context, otherwise False.

返回类型:

bool

class memoryscope.core.worker.DummyWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[源代码]

基类:MemoryBaseWorker

class memoryscope.core.worker.MemoryBaseWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[源代码]

基类:BaseWorker

FILE_PATH: str = '/home/runner/work/MemoryScope/MemoryScope/memoryscope/core/worker/memory_base_worker.py'
__init__(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[源代码]

Initializes the MemoryBaseWorker with specified models and configurations.

参数:
  • embedding_model (str) -- Identifier or instance of the embedding model used for transforming text.

  • generation_model (str) -- Identifier or instance of the text generation model.

  • rank_model (str) -- Identifier or instance of the ranking model for sorting the retrieved memories wrt. the semantic similarities.

  • **kwargs -- Additional keyword arguments passed to the parent class initializer.

The constructor also initializes key attributes related to memory store, monitoring, user and target identification, and a prompt handler, setting them up for later use.

property chat_messages: List[List[Message]]

Property to get the chat messages.

返回:

List of chat messages.

返回类型:

List[Message]

property chat_messages_scatter: List[Message]

Property to get the chat messages.

返回:

List of chat messages.

返回类型:

List[Message]

property chat_kwargs: Dict[str, Any]

Retrieves the chat keyword arguments from the context.

This property getter fetches the chat-related parameters stored in the context, which are used to configure how chat interactions are handled.

返回:

A dictionary containing the chat keyword arguments.

返回类型:

Dict[str, str]

property user_name: str
property target_name: str
property workflow_name: str
property language: LanguageEnum
property embedding_model: BaseModel

Property to get the embedding model. If the model is currently stored as a string, it will be replaced with the actual model instance from the global context's model dictionary.

返回:

The embedding model used for converting text into vector representations.

返回类型:

BaseModel

property generation_model: BaseModel

Property to access the generation model. If the model is stored as a string, it retrieves the actual model instance from the global context's model dictionary.

返回:

The model used for text generation.

返回类型:

BaseModel

property rank_model: BaseModel

Property to access the rank model. If the stored rank model is a string, it fetches the actual model instance from the global context's model dictionary before returning it.

返回:

The rank model instance used for ranking tasks.

返回类型:

BaseModel

property memory_store: BaseMemoryStore

Property to access the memory vector store. If not initialized, it fetches the global memory store.

返回:

The memory store instance used for inserting, updating, retrieving and deleting operations.

返回类型:

BaseMemoryStore

property monitor: BaseMonitor

Property to access the monitoring component. If not initialized, it fetches the global monitor.

返回:

The monitoring component instance.

返回类型:

BaseMonitor

property prompt_handler: PromptHandler

Lazily initializes and returns the PromptHandler instance.

返回:

An instance of PromptHandler initialized with specific file path and keyword arguments.

返回类型:

PromptHandler

property memory_manager: MemoryManager

Lazily initializes and returns the MemoryHandler instance.

返回:

An instance of MemoryHandler.

返回类型:

MemoryHandler

get_language_value(languages: dict | List[dict]) Any | List[Any][源代码]

Retrieves the value(s) corresponding to the current language context.

参数:

languages (dict | list[dict]) -- A dictionary or list of dictionaries containing language-keyed values.

返回:

The value or list of values matching the current language setting.

返回类型:

Any | list[Any]

prompt_to_msg(system_prompt: str, few_shot: str, user_query: str, concat_system_prompt: bool = True) List[Message][源代码]

Converts input strings into a structured list of message objects suitable for AI interactions.

参数:
  • system_prompt (str) -- The system-level instruction or context.

  • few_shot (str) -- An example or demonstration input, often used for illustrating expected behavior.

  • user_query (str) -- The actual user query or prompt to be processed.

  • concat_system_prompt (bool) -- Concat system prompt again or not in the user message. A simple method to improve the effectiveness for some LLMs. Defaults to True.

返回:

A list of Message objects, each representing a part of the conversation setup.

返回类型:

List[Message]

class memoryscope.core.worker.MemoryManager(memoryscope_context: ~memoryscope.core.utils.singleton.singleton.<locals>._singleton, workerflow_name: str = 'default_worker')[源代码]

基类:object

The MemoryHandler class manages memory nodes with memory store.

__init__(memoryscope_context: ~memoryscope.core.utils.singleton.singleton.<locals>._singleton, workerflow_name: str = 'default_worker')[源代码]
property memory_store: BaseMemoryStore

Property to access the memory store. If not initialized, it fetches the memory store from the global context.

返回:

The memory store instance associated with this worker.

返回类型:

BaseMemoryStore

clear()[源代码]

Clear all memory nodes cached, reset the class instance.

add_memories(key: str, nodes: MemoryNode | List[MemoryNode], log_repeat: bool = True)[源代码]

Add the memories.

参数:
  • key (str) -- The key mapping to memory nodes.

  • nodes (List[MemoryNode]) -- A single memory node or a list of memory nodes to be updated.

  • log_repeat (bool) -- Log duplicated memory node or not.

set_memories(key: str, nodes: MemoryNode | List[MemoryNode], log_repeat: bool = True)[源代码]

Add the memories into '_id_memory_dict' and '_key_id_dict'.

参数:
  • key (str) -- The key mapping to memory nodes.

  • nodes (List[MemoryNode]) -- A single memory node or a list of memory nodes to be updated.

  • log_repeat -- if log_repeat=True, print log info

get_memories(keys: str | List[str]) List[MemoryNode][源代码]

Fetch the memories by keys.

参数:

keys (str | List[str]) -- The key mapping to memory nodes.

返回:

Memories mapped to the key.

返回类型:

List[MemoryNode]

delete_memories(nodes: MemoryNode | List[MemoryNode], key: str | None = None)[源代码]

Delete the memories.

参数:
  • key (str) -- The key mapping to memory nodes.

  • nodes (List[MemoryNode]) -- A single memory node or a list of memory nodes to be deleted.

update_memories(keys: str = '', nodes: MemoryNode | List[MemoryNode] | None = None) dict[源代码]

Update the memories.

参数:
  • keys (str) -- The memories.

  • nodes (List[MemoryNode]) -- A single memory node or a list of memory nodes to be updated.