Source code for memoryscope.core.memoryscope

import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from memoryscope.core.chat.base_memory_chat import BaseMemoryChat
from memoryscope.core.config.config_manager import ConfigManager
from memoryscope.core.memoryscope_context import MemoryscopeContext
from memoryscope.core.service.base_memory_service import BaseMemoryService
from memoryscope.core.utils.tool_functions import init_instance_by_config
from memoryscope.enumeration.language_enum import LanguageEnum
from memoryscope.enumeration.model_enum import ModelEnum


class MemoryScope(ConfigManager):
    """Build and own the ``MemoryscopeContext`` described by the loaded configuration.

    On construction the instance creates a context, lets the parent initializer
    run, and then fills the context (language, thread pool, memory chats,
    memory services, models, memory store, monitor, worker config) from
    ``self.config``. ``close()`` tears those resources down again; the class is
    also usable as a context manager so that ``close()`` runs automatically.

    NOTE(review): ``self.config`` and ``self.logger`` are not assigned in this
    class; presumably they are provided by ``ConfigManager`` -- confirm there.
    """

    def __init__(self, **kwargs):
        """Create the context, run the parent initializer and populate the context.

        Args:
            **kwargs: Forwarded unchanged to the parent initializer.
        """
        self._context: MemoryscopeContext = MemoryscopeContext()
        # Second-resolution local timestamp stored as this instance's identifier.
        self._context.memory_scope_uuid = datetime.now().strftime(r"%Y%m%d_%H%M%S")
        super().__init__(**kwargs)
        self._init_context_by_config()

    def _init_context_by_config(self):
        """Populate ``self._context`` from the sections of ``self.config``.

        Sections are processed in this order: ``global``, ``memory_chat``
        (optional), ``memory_service``, ``model``, ``memory_store``,
        ``monitor`` (optional) and ``worker``.

        Raises:
            AssertionError: If the ``memory_service``, ``model`` or
                ``memory_store`` section is empty. Raised explicitly (instead
                of via ``assert``) so the check is not stripped under
                ``python -O``, while keeping the exception type callers saw
                before.
        """
        context = self._context

        # set global config
        global_conf = self.config["global"]
        context.language = LanguageEnum(global_conf["language"])
        context.thread_pool = ThreadPoolExecutor(max_workers=global_conf["thread_pool_max_workers"])
        context.meta_data.update({
            "enable_ranker": global_conf["enable_ranker"],
            "enable_today_contra_repeat": global_conf["enable_today_contra_repeat"],
            "enable_long_contra_repeat": global_conf["enable_long_contra_repeat"],
            "output_memory_max_count": global_conf["output_memory_max_count"],
        })

        if not global_conf["enable_ranker"]:
            self.logger.warning("If a semantic ranking model is not available, MemoryScope will use cosine similarity "
                                "scoring as a substitute. However, the ranking effectiveness will be somewhat "
                                "compromised.")

        # init memory_chat (optional section: skipped when empty)
        memory_chat_conf_dict = self.config["memory_chat"]
        if memory_chat_conf_dict:
            for name, conf in memory_chat_conf_dict.items():
                context.memory_chat_dict[name] = init_instance_by_config(conf, name=name, context=context)

        # set memory_service (required section)
        memory_service_conf_dict = self.config["memory_service"]
        if not memory_service_conf_dict:
            raise AssertionError("config section 'memory_service' must not be empty")
        for name, conf in memory_service_conf_dict.items():
            context.memory_service_dict[name] = init_instance_by_config(conf, name=name, context=context)

        # init model (required section)
        model_conf_dict = self.config["model"]
        if not model_conf_dict:
            raise AssertionError("config section 'model' must not be empty")
        for name, conf in model_conf_dict.items():
            context.model_dict[name] = init_instance_by_config(conf, name=name)

        # init memory_store (required section); it is handed the embedding
        # model that its own config refers to by name.
        memory_store_conf = self.config["memory_store"]
        if not memory_store_conf:
            raise AssertionError("config section 'memory_store' must not be empty")
        emb_model_name: str = memory_store_conf[ModelEnum.EMBEDDING_MODEL.value]
        embedding_model = context.model_dict[emb_model_name]
        context.memory_store = init_instance_by_config(memory_store_conf, embedding_model=embedding_model)

        # init monitor (optional section: skipped when empty)
        monitor_conf = self.config["monitor"]
        if monitor_conf:
            context.monitor = init_instance_by_config(monitor_conf)

        # set worker config
        context.worker_conf_dict = self.config["worker"]

    def close(self):
        """Stop the memory services, then release the pool, store, monitor and logger."""
        # wait service to stop
        for service in self._context.memory_service_dict.values():
            service.stop_backend_service(wait_service=True)

        self._context.thread_pool.shutdown()

        self._context.memory_store.close()

        if self._context.monitor:
            self._context.monitor.close()

        self.logger.close()

    def __enter__(self):
        """Return this instance for use as the ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any in-flight exception, then close the instance.

        Returns ``None``, so an exception raised inside the ``with`` body is
        not suppressed.
        """
        if exc_type is not None:
            # Render the traceback frames as text; interpolating ``exc_tb``
            # directly would only log the object's repr.
            formatted_tb = "".join(traceback.format_tb(exc_tb))
            self.logger.warning(f"An exception occurred: {exc_type.__name__}: {exc_val}\n{formatted_tb}")
        self.close()

    @property
    def context(self):
        """The ``MemoryscopeContext`` owned by this instance."""
        return self._context

    @property
    def memory_chat_dict(self):
        """Memory chat instances held by the context, keyed by configured name."""
        return self._context.memory_chat_dict

    @property
    def memory_service_dict(self):
        """Memory service instances held by the context, keyed by configured name."""
        return self._context.memory_service_dict

    @property
    def default_memory_chat(self) -> BaseMemoryChat:
        """The first memory chat in ``memory_chat_dict``.

        Raises:
            IndexError: If no memory chat has been configured.
        """
        return list(self.memory_chat_dict.values())[0]

    @property
    def default_memory_service(self) -> BaseMemoryService:
        """The first memory service in ``memory_service_dict``.

        Raises:
            IndexError: If ``memory_service_dict`` is empty.
        """
        return list(self.memory_service_dict.values())[0]

    @classmethod
    def cli_memory_chat(cls, **kwargs):
        """Construct an instance, run its default memory chat, and close the instance afterwards.

        Args:
            **kwargs: Forwarded unchanged to the class constructor.
        """
        with cls(**kwargs) as ms:
            memory_chat = ms.default_memory_chat
            memory_chat.run()