import importlib.util
import inspect
import os
from ..base_op import OPERATORS, Mapper
OP_NAME = 'python_file_mapper'
[docs]
@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
"""Mapper for executing Python function defined in a file."""
[docs]
def __init__(self,
file_path: str = '',
function_name: str = 'process_single',
batched: bool = False,
**kwargs):
"""
Initialization method.
:param file_path: The path to the Python file containing the function
to be executed.
:param function_name: The name of the function defined in the file
to be executed.
:param batched: A boolean indicating whether to process input data in
batches.
:param kwargs: Additional keyword arguments passed to the parent class.
"""
self._batched_op = bool(batched)
super().__init__(**kwargs)
self.file_path = file_path
self.function_name = function_name
if not file_path:
self.func = lambda sample: sample
else:
self.func = self._load_function()
def _load_function(self):
if not os.path.isfile(self.file_path):
raise FileNotFoundError(
f"The file '{self.file_path}' does not exist.")
if not self.file_path.endswith('.py'):
raise ValueError(
f"The file '{self.file_path}' is not a Python file.")
# Load the module from the file
module_name = os.path.splitext(os.path.basename(self.file_path))[0]
spec = importlib.util.spec_from_file_location(module_name,
self.file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Fetch the specified function from the module
if not hasattr(module, self.function_name):
raise ValueError(
f"Function '{self.function_name}' not found in '{self.file_path}'." # noqa: E501
)
func = getattr(module, self.function_name)
if not callable(func):
raise ValueError(
f"The attribute '{self.function_name}' is not callable.")
# Check that the function has exactly one argument
argspec = inspect.getfullargspec(func)
if len(argspec.args) != 1:
raise ValueError(
f"The function '{self.function_name}' must take exactly one argument" # noqa: E501
)
return func
[docs]
def process_single(self, sample):
"""Invoke the loaded function with the provided sample."""
result = self.func(sample)
if not isinstance(result, dict):
raise ValueError(
f'Function must return a dictionary, got {type(result).__name__} instead.' # noqa: E501
)
return result
[docs]
def process_batched(self, samples):
"""Invoke the loaded function with the provided samples."""
result = self.func(samples)
if not isinstance(result, dict):
raise ValueError(
f'Function must return a dictionary, got {type(result).__name__} instead.' # noqa: E501
)
return result