memoryscope.core.worker.base_worker

class memoryscope.core.worker.base_worker.BaseWorker(name: str, context: Dict[str, Any], memoryscope_context: memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)

Bases: object

BaseWorker is an abstract class that defines a worker with common functionality for managing tasks and shared context in both asynchronous and multithreaded environments.

__init__(name: str, context: Dict[str, Any], memoryscope_context: memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)

Initializes the BaseWorker with the provided parameters.

Parameters:
  • name (str) – The name of the worker.

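  • context (Dict[str, Any]) – Shared context dictionary.

  • memoryscope_context – The MemoryScope context object; the signature types it as a singleton instance.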

  • context_lock (optional) – Lock for synchronizing access to the context in multithread mode.

  • raise_exception (bool, optional) – Flag to control whether exceptions should be raised.

  • is_multi_thread (bool, optional) – Flag indicating if the worker operates in multithread mode.

  • thread_pool (ThreadPoolExecutor, optional) – Thread pool executor for managing multithread tasks.

  • kwargs – Additional keyword arguments.

submit_async_task(fn, *args, **kwargs)

Submits an asynchronous task to the worker.

Parameters:
  • fn (callable) – The function to be executed.

  • args – Positional arguments for the function.

  • kwargs – Keyword arguments for the function.

Raises:

RuntimeError – If called in multithread mode.

gather_async_result()

Executes all asynchronous tasks and gathers their results.

Returns:

A list of results from the asynchronous tasks.

Raises:

RuntimeError – If called in multithread mode.
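The submit-then-gather pattern described by submit_async_task and gather_async_result can be sketched roughly as follows. This is a minimal illustration, not the MemoryScope implementation: the class name AsyncTaskSketch, the async_tasks attribute, and the use of asyncio.gather are all assumptions made for the example.

```python
import asyncio


class AsyncTaskSketch:
    """Hypothetical stand-in for a worker; not the real BaseWorker."""

    def __init__(self, is_multi_thread: bool = False):
        self.is_multi_thread = is_multi_thread
        self.async_tasks = []  # queued (fn, args, kwargs) triples (assumed layout)

    def submit_async_task(self, fn, *args, **kwargs):
        # Mirrors the documented behavior: async submission is rejected
        # when the worker runs in multithread mode.
        if self.is_multi_thread:
            raise RuntimeError("async tasks are unavailable in multithread mode")
        self.async_tasks.append((fn, args, kwargs))

    def gather_async_result(self):
        if self.is_multi_thread:
            raise RuntimeError("async tasks are unavailable in multithread mode")

        async def _gather():
            # Create the coroutines inside the running loop and await them together.
            coroutines = [fn(*args, **kwargs) for fn, args, kwargs in self.async_tasks]
            return await asyncio.gather(*coroutines)

        results = asyncio.run(_gather())
        self.async_tasks.clear()
        return results


async def double(x, delay=0.0):
    await asyncio.sleep(delay)
    return 2 * x


sketch = AsyncTaskSketch()
sketch.submit_async_task(double, 1, delay=0.02)
sketch.submit_async_task(double, 2)
async_results = sketch.gather_async_result()
```

In this sketch the results come back in submission order, because asyncio.gather preserves the order of the awaitables it is given. Whether the real method gives the same guarantee is not stated in this reference.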

submit_thread_task(fn, *args, **kwargs)

Submits a task to be executed in a separate thread.

Parameters:
  • fn (callable) – The function to be executed.

  • args – Positional arguments for the function.

  • kwargs – Keyword arguments for the function.

gather_thread_result()

Gathers results of all submitted multithread tasks.

Yields:

The result of each completed task.
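The thread-based counterpart can be illustrated with a small sketch built on concurrent.futures. The class ThreadTaskSketch and its thread_futures attribute are hypothetical names used only here, and yielding results in submission order is an assumption; the real method may yield in completion order.

```python
from concurrent.futures import ThreadPoolExecutor


class ThreadTaskSketch:
    """Hypothetical stand-in for a worker; not the real BaseWorker."""

    def __init__(self, thread_pool: ThreadPoolExecutor):
        self.thread_pool = thread_pool
        self.thread_futures = []  # futures for submitted tasks (assumed attribute)

    def submit_thread_task(self, fn, *args, **kwargs):
        # Hand the call to the pool and remember the future for later gathering.
        self.thread_futures.append(self.thread_pool.submit(fn, *args, **kwargs))

    def gather_thread_result(self):
        # A generator, matching the documented "Yields" contract.
        # Future.result() blocks until that task has finished.
        for future in self.thread_futures:
            yield future.result()
        self.thread_futures.clear()


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    sketch = ThreadTaskSketch(pool)
    for n in (1, 2, 3):
        sketch.submit_thread_task(square, n)
    thread_results = list(sketch.gather_thread_result())
```

Because the method is a generator, nothing is collected until the caller iterates over it; wrapping the call in list() as above forces all results to be gathered.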

run()

Executes the worker’s main logic and manages execution flow and exception handling.

Uses a Timer to log the execution time of the worker.
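One way to picture how run() could combine timing with the raise_exception flag is the sketch below. It rests on several assumptions: the _run hook name is hypothetical, time.perf_counter stands in for the library's Timer, and logging a swallowed exception is a guess at what happens when raise_exception is False.

```python
import logging
import time

logger = logging.getLogger("worker_sketch")


class RunSketch:
    """Hypothetical stand-in for a worker; not the real BaseWorker."""

    def __init__(self, name: str, raise_exception: bool = True):
        self.name = name
        self.raise_exception = raise_exception

    def _run(self):
        # Hypothetical hook that a concrete worker would override.
        raise NotImplementedError

    def run(self):
        start = time.perf_counter()
        try:
            self._run()
        except Exception:
            if self.raise_exception:
                raise  # propagate to the caller
            logger.exception("worker %s failed", self.name)  # swallow and log
        finally:
            # Runs on success and on failure, so the duration is always logged.
            logger.info("worker %s took %.3fs", self.name, time.perf_counter() - start)


class FailingWorker(RunSketch):
    def _run(self):
        raise ValueError("boom")


# With raise_exception=False the error is logged and run() returns normally.
FailingWorker("quiet", raise_exception=False).run()
```

Constructing the same worker with raise_exception=True would instead let the ValueError propagate out of run().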

get_workflow_context(key: str, default=None)

Retrieves a value from the shared context.

Parameters:
  • key (str) – The key for the context value.

  • default (optional) – Default value if the key is not found.

Returns:

The value from the context or the default value.

set_workflow_context(key: str, value: Any)

Sets a value in the shared context.

Parameters:
  • key (str) – The key for the context value.

  • value (Any) – The value to be set.

has_content(key: str)

Checks if the context contains a specific key.

Parameters:
  • key (str) – The key to check in the context.

Returns:

True if the key is in the context, otherwise False.

Return type:

bool
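The three context accessors can be sketched as thin wrappers around a shared dictionary, guarded by the optional lock. This is an illustrative sketch under assumptions: the ContextSketch class and its _guard helper are hypothetical, and taking the lock on every access is a guess about how context_lock is used in multithread mode.

```python
import threading
from contextlib import nullcontext
from typing import Any, Dict


class ContextSketch:
    """Hypothetical stand-in for a worker; not the real BaseWorker."""

    def __init__(self, context: Dict[str, Any], context_lock=None):
        self.context = context
        self.context_lock = context_lock

    def _guard(self):
        # Use the lock when one was supplied, otherwise a no-op context manager.
        return self.context_lock if self.context_lock is not None else nullcontext()

    def get_workflow_context(self, key: str, default=None):
        with self._guard():
            return self.context.get(key, default)

    def set_workflow_context(self, key: str, value: Any):
        with self._guard():
            self.context[key] = value

    def has_content(self, key: str) -> bool:
        with self._guard():
            return key in self.context


# Two workers sharing one context dictionary and one lock.
shared: Dict[str, Any] = {}
lock = threading.Lock()
writer = ContextSketch(shared, lock)
reader = ContextSketch(shared, lock)

writer.set_workflow_context("query", "hello")
found = reader.get_workflow_context("query")
missing = reader.get_workflow_context("absent", default="n/a")
```

Passing the same dictionary to several workers is what makes the context "shared": a value written by one worker is visible to the others through get_workflow_context and has_content.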