memoryscope.core.worker.backend.get_observation_worker
- class memoryscope.core.worker.backend.get_observation_worker.GetObservationWorker(embedding_model: str = '', generation_model: str = '', rank_model: str = '', **kwargs)[source]
Bases: MemoryBaseWorker
A specialized worker class that generates observations from the original chat history.
- FILE_PATH: str = '/home/runner/work/MemoryScope/MemoryScope/memoryscope/core/worker/backend/get_observation_worker.py'
- OBS_STORE_KEY: str = 'new_obs_nodes'
- add_observation(message: Message, time_infer: str, obs_content: str, keywords: str)[source]
Builds a MemoryNode containing the observation details.
- Parameters:
message (Message) – The source message from which the observation is derived.
time_infer (str) – The inferred time if available.
obs_content (str) – The content of the observation.
keywords (str) – Keywords associated with the observation.
- Returns:
The constructed MemoryNode containing the observation.
- Return type:
MemoryNode
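To illustrate the shape of this method, the following is a minimal sketch and not the MemoryScope implementation. The `Message` and `MemoryNode` dataclasses are hypothetical stand-ins, and their fields (`content`, `key`, `meta_data`) are assumptions made only so the example runs on its own.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Message:
    """Hypothetical stand-in for MemoryScope's Message (fields are assumptions)."""
    content: str


@dataclass
class MemoryNode:
    """Hypothetical stand-in for MemoryScope's MemoryNode (fields are assumptions)."""
    content: str
    key: str = ""
    meta_data: Dict[str, str] = field(default_factory=dict)


def add_observation(message: Message, time_infer: str,
                    obs_content: str, keywords: str) -> MemoryNode:
    """Bundle an observation and its provenance into a single node."""
    meta = {"source_content": message.content, "keywords": keywords}
    # Record the inferred time only when one is available.
    if time_infer:
        meta["time_infer"] = time_infer
    return MemoryNode(content=obs_content, key=keywords, meta_data=meta)


node = add_observation(
    Message("I moved to Berlin for work."),
    time_infer="",
    obs_content="The user lives in Berlin.",
    keywords="residence",
)
print(node)
```

In this sketch an empty `time_infer` simply leaves the time field out of the metadata, which is one plausible reading of "the inferred time if available".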
- filter_messages() List[Message] [source]
Filters the chat messages to include only those that do not contain time-related keywords.
- Returns:
A list of filtered messages that do not mention time.
- Return type:
List[Message]
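The filtering step can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the `TIME_WORDS` list, the `has_time_word` helper, and the simplified `Message` dataclass are all hypothetical, and the real worker reads its messages from its own context rather than taking them as an argument.

```python
import re
from dataclasses import dataclass
from typing import List


@dataclass
class Message:
    """Hypothetical stand-in for MemoryScope's Message (fields are assumptions)."""
    content: str


# Illustrative keyword list; the real set of time-related keywords is not shown here.
TIME_WORDS = ("yesterday", "today", "tomorrow", "last week", "next month")
_TIME_PATTERN = re.compile("|".join(re.escape(w) for w in TIME_WORDS), re.IGNORECASE)


def has_time_word(text: str) -> bool:
    """Return True if the text contains any of the illustrative time keywords."""
    return _TIME_PATTERN.search(text) is not None


def filter_messages(messages: List[Message]) -> List[Message]:
    """Keep only the messages that contain no time-related keyword."""
    return [m for m in messages if not has_time_word(m.content)]


chat = [
    Message("I like hiking."),
    Message("I went hiking yesterday."),
    Message("My favorite tea is oolong."),
]
for m in filter_messages(chat):
    print(m.content)
```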
- build_message(filter_messages: List[Message]) List[Message] [source]
Constructs a formatted message for observation based on input messages, incorporating system prompts, few-shot examples, and user queries.
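One way to picture the assembly described above is the sketch below. It is an assumption-laden illustration rather than the actual method: the `Message` dataclass, the `system_prompt` and `few_shot` arguments, and the numbering of user lines are all hypothetical choices made for the example.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Message:
    """Hypothetical stand-in for MemoryScope's Message (fields are assumptions)."""
    role: str
    content: str


def build_message(filtered: List[Message], system_prompt: str,
                  few_shot: str, user_name: str = "user") -> List[Message]:
    """Combine a system prompt, few-shot examples, and the user query."""
    # Number each filtered message so the model can refer back to it.
    numbered = "\n".join(
        f"{i} {user_name}: {m.content}" for i, m in enumerate(filtered, start=1)
    )
    user_query = f"{few_shot}\n\n{numbered}"
    return [Message("system", system_prompt), Message("user", user_query)]


prompt = build_message(
    [Message("user", "I like hiking."), Message("user", "My favorite tea is oolong.")],
    system_prompt="Extract observations about the user.",
    few_shot="Example:\n1 user: I own a cat.\nObservation 1: The user owns a cat.",
)
for p in prompt:
    print(f"[{p.role}]\n{p.content}\n")
```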
- name: str
- workflow_context: Dict[str, Any]
- memoryscope_context: MemoryscopeContext
- raise_exception: bool
- is_multi_thread: bool
- thread_pool: ThreadPoolExecutor
- enable_parallel: bool
- kwargs: dict
- continue_run: bool
- async_task_list: list
- thread_task_list: list