[docs]@OPERATORS.register_module(OP_NAME)classPythonFileMapper(Mapper):"""Mapper for executing Python function defined in a file."""
[docs]def__init__(self,file_path:str='',function_name:str='process_single',batched:bool=False,**kwargs):""" Initialization method. :param file_path: The path to the Python file containing the function to be executed. :param function_name: The name of the function defined in the file to be executed. :param batched: A boolean indicating whether to process input data in batches. :param kwargs: Additional keyword arguments passed to the parent class. """self._batched_op=bool(batched)super().__init__(**kwargs)self.file_path=file_pathself.function_name=function_nameifnotfile_path:self.func=lambdasample:sampleelse:self.func=self._load_function()
def_load_function(self):ifnotos.path.isfile(self.file_path):raiseFileNotFoundError(f"The file '{self.file_path}' does not exist.")ifnotself.file_path.endswith('.py'):raiseValueError(f"The file '{self.file_path}' is not a Python file.")# Load the module from the filemodule_name=os.path.splitext(os.path.basename(self.file_path))[0]spec=importlib.util.spec_from_file_location(module_name,self.file_path)module=importlib.util.module_from_spec(spec)spec.loader.exec_module(module)# Fetch the specified function from the moduleifnothasattr(module,self.function_name):raiseValueError(f"Function '{self.function_name}' not found in '{self.file_path}'."# noqa: E501)func=getattr(module,self.function_name,None)ifnotcallable(func):raiseValueError(f"The attribute '{self.function_name}' is not callable.")# Check that the function has exactly one argumentargspec=inspect.getfullargspec(func)iflen(argspec.args)!=1:raiseValueError(f"The function '{self.function_name}' must take exactly one argument"# noqa: E501)returnfunc
[docs]defprocess_single(self,sample):"""Invoke the loaded function with the provided sample."""result=self.func(sample)ifnotisinstance(result,dict):raiseValueError(f'Function must return a dictionary, got {type(result).__name__} instead.'# noqa: E501)returnresult
[docs]defprocess_batched(self,samples):"""Invoke the loaded function with the provided samples."""result=self.func(samples)ifnotisinstance(result,dict):raiseValueError(f'Function must return a dictionary, got {type(result).__name__} instead.'# noqa: E501)returnresult