Source code for data_juicer.ops.mapper.python_file_mapper

import importlib.util
import inspect
import os

from ..base_op import OPERATORS, Mapper

OP_NAME = 'python_file_mapper'


@OPERATORS.register_module(OP_NAME)
class PythonFileMapper(Mapper):
    """Mapper for executing Python function defined in a file."""

    def __init__(self,
                 file_path: str = '',
                 function_name: str = 'process_single',
                 batched: bool = False,
                 **kwargs):
        """
        Initialization method.

        :param file_path: The path to the Python file containing the function
            to be executed. When empty, the operator passes its input
            through unchanged.
        :param function_name: The name of the function defined in the file
            to be executed.
        :param batched: A boolean indicating whether to process input data in
            batches.
        :param kwargs: Additional keyword arguments passed to the parent
            class.
        :raises FileNotFoundError: If ``file_path`` is given but is not an
            existing file.
        :raises ValueError: If the file is not a ``.py`` file, or the named
            attribute is missing, not callable, or cannot be called with
            exactly one argument.
        """
        # Assigned before the parent initializer runs, as in the original
        # ordering (presumably Mapper.__init__ reads it -- confirm).
        self._batched_op = bool(batched)
        super().__init__(**kwargs)

        self.file_path = file_path
        self.function_name = function_name
        if not file_path:
            # No file configured: fall back to an identity function.
            self.func = lambda sample: sample
        else:
            self.func = self._load_function()

    def _load_function(self):
        """
        Import ``self.file_path`` as a module and return the target function.

        :return: The callable named ``self.function_name`` from the file.
        :raises FileNotFoundError: If the path is not an existing file.
        :raises ValueError: If the path does not end in ``.py``, the
            attribute is missing or not callable, or it cannot be called
            with exactly one argument.
        """
        if not os.path.isfile(self.file_path):
            raise FileNotFoundError(
                f"The file '{self.file_path}' does not exist.")
        if not self.file_path.endswith('.py'):
            raise ValueError(
                f"The file '{self.file_path}' is not a Python file.")

        # Load the module from the file
        module_name = os.path.splitext(os.path.basename(self.file_path))[0]
        spec = importlib.util.spec_from_file_location(module_name,
                                                      self.file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Fetch the specified function from the module
        if not hasattr(module, self.function_name):
            raise ValueError(
                f"Function '{self.function_name}' not found in '{self.file_path}'."  # noqa: E501
            )
        func = getattr(module, self.function_name)
        if not callable(func):
            raise ValueError(
                f"The attribute '{self.function_name}' is not callable.")

        # Check that the function can be called with exactly one argument.
        # Keyword-only parameters without defaults are rejected as well:
        # they are absent from ``argspec.args`` but would make the
        # single-argument call in process_single/process_batched fail.
        argspec = inspect.getfullargspec(func)
        kwonly_defaults = argspec.kwonlydefaults or {}
        required_kwonly = [
            name for name in argspec.kwonlyargs
            if name not in kwonly_defaults
        ]
        if len(argspec.args) != 1 or required_kwonly:
            raise ValueError(
                f"The function '{self.function_name}' must take exactly one argument"  # noqa: E501
            )

        return func

    @staticmethod
    def _ensure_dict(result):
        """
        Return ``result`` unchanged after checking that it is a dictionary.

        :param result: The value returned by the loaded function.
        :return: ``result`` itself.
        :raises ValueError: If ``result`` is not a ``dict``.
        """
        if not isinstance(result, dict):
            raise ValueError(
                f'Function must return a dictionary, got {type(result).__name__} instead.'  # noqa: E501
            )
        return result

    def process_single(self, sample):
        """
        Invoke the loaded function with the provided sample.

        :param sample: The sample passed to the loaded function.
        :return: The dictionary returned by the loaded function.
        :raises ValueError: If the function does not return a ``dict``.
        """
        return self._ensure_dict(self.func(sample))

    def process_batched(self, samples):
        """
        Invoke the loaded function with the provided samples.

        :param samples: The batch passed to the loaded function.
        :return: The dictionary returned by the loaded function.
        :raises ValueError: If the function does not return a ``dict``.
        """
        return self._ensure_dict(self.func(samples))