@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
    """Mapper for executing Python function defined in a file.

    The user-supplied function is loaded once at construction time and then
    invoked for every sample (or batch of samples). It must accept exactly
    one positional argument and return a dictionary.
    """

    def __init__(
        self,
        file_path: str = "",
        function_name: str = "process_single",
        batched: bool = False,
        **kwargs,
    ):
        """
        Initialization method.

        :param file_path: The path to the Python file containing the function
            to be executed. When empty, the mapper acts as an identity
            function and returns each input unchanged.
        :param function_name: The name of the function defined in the file
            to be executed.
        :param batched: A boolean indicating whether to process input data in
            batches.
        :param kwargs: Additional keyword arguments passed to the parent
            class.
        :raises FileNotFoundError: If ``file_path`` is given but does not
            point to an existing file.
        :raises ValueError: If the file is not a ``.py`` file, or the named
            attribute is missing, not callable, or does not take exactly one
            argument.
        :raises ImportError: If no import spec/loader can be built for the
            file.
        """
        # Set before calling the parent constructor, as in the original
        # ordering. NOTE(review): presumably the base class reads
        # ``_batched_op`` during its own initialization -- confirm.
        self._batched_op = bool(batched)
        super().__init__(**kwargs)

        self.file_path = file_path
        self.function_name = function_name
        if not file_path:
            # No file configured: fall back to a pass-through function.
            self.func = lambda sample: sample
        else:
            self.func = self._load_function()

    def _load_function(self):
        """Import ``self.file_path`` and return the function it defines.

        :return: The callable named ``self.function_name`` found in the
            module loaded from ``self.file_path``.
        :raises FileNotFoundError: If the file does not exist.
        :raises ValueError: If the file is not a ``.py`` file, or the named
            attribute is missing, not callable, or does not declare exactly
            one positional argument.
        :raises ImportError: If an import spec or loader cannot be created
            for the file.
        """
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(f"The file '{self.file_path}' does not exist.")
        if not self.file_path.endswith(".py"):
            raise ValueError(f"The file '{self.file_path}' is not a Python file.")

        # Load the module from the file
        module_name = os.path.splitext(os.path.basename(self.file_path))[0]
        spec = importlib.util.spec_from_file_location(module_name, self.file_path)
        # spec_from_file_location may return None, and a spec may carry no
        # loader; fail with a clear message instead of an AttributeError.
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot create an import spec for '{self.file_path}'.")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Fetch the specified function from the module. A private sentinel
        # distinguishes "attribute missing" from "attribute is None".
        missing = object()
        func = getattr(module, self.function_name, missing)
        if func is missing:
            raise ValueError(f"Function '{self.function_name}' not found in '{self.file_path}'.")  # noqa: E501
        if not callable(func):
            raise ValueError(f"The attribute '{self.function_name}' is not callable.")

        # Check that the function has exactly one argument
        argspec = inspect.getfullargspec(func)
        if len(argspec.args) != 1:
            raise ValueError(f"The function '{self.function_name}' must take exactly one argument")  # noqa: E501

        return func

    def _invoke(self, data):
        """Call the loaded function on ``data`` and validate its result.

        :param data: The sample or batch, passed to the function unchanged.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function returns anything but a ``dict``.
        """
        result = self.func(data)
        if not isinstance(result, dict):
            raise ValueError(f"Function must return a dictionary, got {type(result).__name__} instead.")  # noqa: E501
        return result

    def process_single(self, sample):
        """Invoke the loaded function with the provided sample.

        :param sample: The sample passed to the function unchanged.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function does not return a dictionary.
        """
        return self._invoke(sample)

    def process_batched(self, samples):
        """Invoke the loaded function with the provided samples.

        :param samples: The batch passed to the function unchanged.
        :return: The dictionary returned by the function.
        :raises ValueError: If the function does not return a dictionary.
        """
        return self._invoke(samples)