memoryscope.core.worker.base_worker

class memoryscope.core.worker.base_worker.BaseWorker(name: str, context: Dict[str, Any], memoryscope_context: memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)

Bases: object

BaseWorker is an abstract class that provides the functionality common to all workers: managing tasks and shared context in both asynchronous and multithreaded environments.

__init__(name: str, context: Dict[str, Any], memoryscope_context: memoryscope.core.utils.singleton.singleton.<locals>._singleton, context_lock=None, raise_exception: bool = True, is_multi_thread: bool = False, thread_pool: ThreadPoolExecutor | None = None, **kwargs)

Initializes the BaseWorker with the provided parameters.

Parameters:
  • name (str) -- The name of the worker.

  • context (Dict[str, Any]) -- Shared context dictionary.

  • context_lock (optional) -- Lock for synchronizing access to the context in multithread mode.

  • raise_exception (bool, optional) -- Flag to control whether exceptions should be raised.

  • is_multi_thread (bool, optional) -- Flag indicating if the worker operates in multithread mode.

  • thread_pool (ThreadPoolExecutor, optional) -- Thread pool executor for managing multithread tasks.

  • kwargs -- Additional keyword arguments.

submit_async_task(fn, *args, **kwargs)

Submits an asynchronous task to the worker.

Parameters:
  • fn (callable) -- The function to be executed.

  • args -- Positional arguments for the function.

  • kwargs -- Keyword arguments for the function.

Raises:

RuntimeError -- If called in multithread mode.

gather_async_result()

Executes all asynchronous tasks and gathers their results.

Returns:

A list of results from the asynchronous tasks.

Raises:

RuntimeError -- If called in multithread mode.
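
The submit-then-gather pattern described for the two asynchronous methods can be sketched as follows. This is a minimal illustration, not the library's implementation: `AsyncTaskSketch`, `async_task_list`, and `double` are hypothetical names, and the use of `asyncio.gather` is an assumption about how the results might be collected.

```python
import asyncio
from typing import Any, Callable, List, Tuple


class AsyncTaskSketch:
    """Hypothetical stand-in for the async half of a worker (not the real BaseWorker)."""

    def __init__(self, is_multi_thread: bool = False):
        self.is_multi_thread = is_multi_thread
        # Pending calls, stored as (coroutine function, args, kwargs).
        self.async_task_list: List[Tuple[Callable[..., Any], tuple, dict]] = []

    def submit_async_task(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        if self.is_multi_thread:
            raise RuntimeError("async tasks are unavailable in multithread mode")
        # Only record the call; nothing runs until results are gathered.
        self.async_task_list.append((fn, args, kwargs))

    def gather_async_result(self) -> List[Any]:
        if self.is_multi_thread:
            raise RuntimeError("async tasks are unavailable in multithread mode")

        async def _gather() -> List[Any]:
            # asyncio.gather returns results in submission order.
            return await asyncio.gather(
                *(fn(*args, **kwargs) for fn, args, kwargs in self.async_task_list)
            )

        results = asyncio.run(_gather())
        self.async_task_list.clear()
        return results


async def double(x: int) -> int:
    await asyncio.sleep(0)  # yield control once, as real I/O would
    return 2 * x


worker = AsyncTaskSketch()
worker.submit_async_task(double, 1)
worker.submit_async_task(double, x=2)
results = worker.gather_async_result()
```

In this sketch the tasks are deferred until `gather_async_result()` is called, which matches the description "executes all asynchronous tasks and gathers their results"; whether the real class defers execution the same way is not stated in this reference.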

submit_thread_task(fn, *args, **kwargs)

Submits a task to be executed in a separate thread.

Parameters:
  • fn (callable) -- The function to be executed.

  • args -- Positional arguments for the function.

  • kwargs -- Keyword arguments for the function.

gather_thread_result()

Gathers results of all submitted multithread tasks.

Yields:

The result of each completed task.
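
The thread-based pair can be sketched in the same spirit. The class below is a hypothetical stand-in (`ThreadTaskSketch` and `thread_task_list` are invented names), and yielding results via `concurrent.futures.as_completed` is an assumption; the reference only says that the generator yields the result of each completed task.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator, List


class ThreadTaskSketch:
    """Hypothetical stand-in for the thread half of a worker (not the real BaseWorker)."""

    def __init__(self, thread_pool: ThreadPoolExecutor):
        self.thread_pool = thread_pool
        self.thread_task_list: List[Future] = []

    def submit_thread_task(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # The pool starts running the task as soon as a thread is free.
        self.thread_task_list.append(self.thread_pool.submit(fn, *args, **kwargs))

    def gather_thread_result(self) -> Iterator[Any]:
        # Yield each result as its task finishes; the order is completion
        # order, which may differ from submission order.
        for future in as_completed(self.thread_task_list):
            yield future.result()
        self.thread_task_list.clear()


with ThreadPoolExecutor(max_workers=2) as pool:
    worker = ThreadTaskSketch(pool)
    for n in (1, 2, 3):
        worker.submit_thread_task(pow, n, 2)
    # Sort because completion order is not deterministic.
    squares = sorted(worker.gather_thread_result())
```

Because the method is a generator, no result is retrieved until the caller iterates over it, and an exception raised inside a task surfaces at the corresponding `future.result()` call.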

run()

Executes the worker's main logic and manages execution flow and exception handling.

Uses a Timer to log the execution time of the worker.
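
One plausible shape for this control flow is sketched below. It is an illustration under stated assumptions only: the `_run` hook name, the `RunSketch` and `FailingWorker` classes, and the logger name are hypothetical, and `time.perf_counter` stands in for the library's Timer, whose interface is not shown in this reference.

```python
import logging
import time

logger = logging.getLogger("worker_sketch")  # hypothetical logger name


class RunSketch:
    """Hypothetical stand-in showing timing plus optional exception suppression."""

    def __init__(self, name: str, raise_exception: bool = True):
        self.name = name
        self.raise_exception = raise_exception

    def _run(self) -> None:
        # Assumed hook: subclasses would put their main logic here.
        raise NotImplementedError

    def run(self) -> None:
        start = time.perf_counter()  # stand-in for the library's Timer
        try:
            self._run()
        except Exception:
            if self.raise_exception:
                raise
            # Otherwise log the traceback and let the caller continue.
            logger.exception("worker %s failed", self.name)
        finally:
            elapsed = time.perf_counter() - start
            logger.info("worker %s finished in %.3fs", self.name, elapsed)


class FailingWorker(RunSketch):
    def _run(self) -> None:
        raise ValueError("boom")


# With raise_exception=False the error is logged and run() returns normally.
FailingWorker("quiet", raise_exception=False).run()
```

With `raise_exception=True` (the documented default) the same failure would propagate to the caller after the elapsed time is logged.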

get_workflow_context(key: str, default=None)

Retrieves a value from the shared context.

Parameters:
  • key (str) -- The key for the context value.

  • default (optional) -- Default value if the key is not found.

Returns:

The value from the context or the default value.

set_workflow_context(key: str, value: Any)

Sets a value in the shared context.

Parameters:
  • key (str) -- The key for the context value.

  • value (Any) -- The value to be set.

has_content(key: str)

Checks if the context contains a specific key.

Parameters:
  • key (str) -- The key to check in the context.

Returns:

True if the key is in the context, otherwise False.

Return type:

bool
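
The three context accessors (`get_workflow_context`, `set_workflow_context`, `has_content`) can be illustrated with a small stand-in that shares one dictionary between two workers. `ContextSketch` is a hypothetical class, and taking the lock only on writes is an assumption; the reference says only that `context_lock` synchronizes access to the context in multithread mode.

```python
import threading
from typing import Any, Dict, Optional


class ContextSketch:
    """Hypothetical stand-in for the shared-context accessors (not the real BaseWorker)."""

    def __init__(self, context: Dict[str, Any], context_lock: Optional[threading.Lock] = None):
        self.context = context
        self.context_lock = context_lock

    def set_workflow_context(self, key: str, value: Any) -> None:
        if self.context_lock is not None:
            # Assumed behavior: guard writes when a lock was supplied.
            with self.context_lock:
                self.context[key] = value
        else:
            self.context[key] = value

    def get_workflow_context(self, key: str, default: Any = None) -> Any:
        return self.context.get(key, default)

    def has_content(self, key: str) -> bool:
        return key in self.context


shared: Dict[str, Any] = {}
lock = threading.Lock()
writer = ContextSketch(shared, lock)
reader = ContextSketch(shared, lock)

# A value written by one worker is visible to every worker sharing the dict.
writer.set_workflow_context("query", "hello")
value = reader.get_workflow_context("query")
fallback = reader.get_workflow_context("missing", default=0)
```

Passing the same dictionary and lock to every worker is what lets one worker's output become another worker's input in this sketch.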