@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
    """Executes a Python function defined in a file on input data.

    This operator loads a specified Python function from a given file and
    applies it to the input data. The function must take exactly one argument
    and return a dictionary. The operator can process data either sample by
    sample or in batches, depending on the `batched` parameter. If the file
    path is not provided, the operator acts as an identity function, returning
    the input sample unchanged. The function is loaded dynamically, and its
    name and file path are configurable.

    Important notes:

    - The file must be a valid Python file (`.py`).
    - The function must be callable and accept exactly one argument.
    - The function's return value must be a dictionary.
    """

    def __init__(
        self,
        file_path: str = "",
        function_name: str = "process_single",
        batched: bool = False,
        **kwargs,
    ):
        """
        Initialization method.

        :param file_path: The path to the Python file containing the function
            to be executed. When empty, the operator returns its input
            unchanged.
        :param function_name: The name of the function defined in the file to
            be executed.
        :param batched: A boolean indicating whether to process input data in
            batches.
        :param kwargs: Additional keyword arguments passed to the parent class.
        :raises FileNotFoundError: If `file_path` is given but is not an
            existing file.
        :raises ValueError: If the file is not a `.py` file, or the function
            is missing, not callable, or does not take exactly one argument.
        """
        # Assigned before the parent constructor runs; keep this ordering.
        # NOTE(review): presumably the base class reads this flag during its
        # own initialization -- confirm against Mapper.
        self._batched_op = bool(batched)
        super().__init__(**kwargs)

        self.file_path = file_path
        self.function_name = function_name
        if not file_path:
            # No file configured: behave as the identity mapping.
            self.func = lambda sample: sample
        else:
            self.func = self._load_function()

    def _load_function(self):
        """Load and validate `self.function_name` from `self.file_path`.

        :return: The function object found in the loaded module.
        :raises FileNotFoundError: If the path is not an existing file.
        :raises ValueError: If the file does not end with `.py`, the function
            is not found or not callable, or it cannot be called with exactly
            one positional argument.
        """
        # Normalise so os.PathLike values work with the string checks below.
        path = os.fspath(self.file_path)
        if not os.path.isfile(path):
            raise FileNotFoundError(f"The file '{self.file_path}' does not exist.")
        if not path.endswith(".py"):
            raise ValueError(f"The file '{self.file_path}' is not a Python file.")

        # Load the module from the file; it is named after the file's stem.
        module_name = os.path.splitext(os.path.basename(path))[0]
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Fetch the specified function from the module with a single lookup.
        try:
            func = getattr(module, self.function_name)
        except AttributeError:
            raise ValueError(
                f"Function '{self.function_name}' not found in '{self.file_path}'."
            ) from None

        if not callable(func):
            raise ValueError(f"The attribute '{self.function_name}' is not callable.")

        # Check that the function takes exactly one argument. A required
        # keyword-only parameter is rejected here as well: the operator only
        # ever calls func(data), so such a function would fail on every call.
        argspec = inspect.getfullargspec(func)
        kwonly_defaults = argspec.kwonlydefaults or {}
        has_required_kwonly = any(
            name not in kwonly_defaults for name in argspec.kwonlyargs
        )
        if len(argspec.args) != 1 or has_required_kwonly:
            raise ValueError(
                f"The function '{self.function_name}' must take exactly one argument"
            )

        return func

    def _call_and_validate(self, data):
        """Call the loaded function on `data` and require a dict result.

        :param data: The single sample or the batch handed to the function.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function returns anything but a dict.
        """
        result = self.func(data)
        if not isinstance(result, dict):
            raise ValueError(
                f"Function must return a dictionary, got {type(result).__name__} instead."
            )
        return result

    def process_single(self, sample):
        """Invoke the loaded function with the provided sample.

        :param sample: The sample passed to the function.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function does not return a dictionary.
        """
        return self._call_and_validate(sample)

    def process_batched(self, samples):
        """Invoke the loaded function with the provided samples.

        :param samples: The batch passed to the function in one call.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function does not return a dictionary.
        """
        return self._call_and_validate(samples)