Source code for data_juicer.ops.mapper.python_file_mapper

import importlib.util
import inspect
import os

from ..base_op import OPERATORS, Mapper

OP_NAME = "python_file_mapper"


[docs] @OPERATORS.register_module(OP_NAME) class PythonFileMapper(Mapper): """Mapper for executing Python function defined in a file."""
[docs] def __init__(self, file_path: str = "", function_name: str = "process_single", batched: bool = False, **kwargs): """ Initialization method. :param file_path: The path to the Python file containing the function to be executed. :param function_name: The name of the function defined in the file to be executed. :param batched: A boolean indicating whether to process input data in batches. :param kwargs: Additional keyword arguments passed to the parent class. """ self._batched_op = bool(batched) super().__init__(**kwargs) self.file_path = file_path self.function_name = function_name if not file_path: self.func = lambda sample: sample else: self.func = self._load_function()
def _load_function(self): if not os.path.isfile(self.file_path): raise FileNotFoundError(f"The file '{self.file_path}' does not exist.") if not self.file_path.endswith(".py"): raise ValueError(f"The file '{self.file_path}' is not a Python file.") # Load the module from the file module_name = os.path.splitext(os.path.basename(self.file_path))[0] spec = importlib.util.spec_from_file_location(module_name, self.file_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Fetch the specified function from the module if not hasattr(module, self.function_name): raise ValueError(f"Function '{self.function_name}' not found in '{self.file_path}'.") # noqa: E501 func = getattr(module, self.function_name, None) if not callable(func): raise ValueError(f"The attribute '{self.function_name}' is not callable.") # Check that the function has exactly one argument argspec = inspect.getfullargspec(func) if len(argspec.args) != 1: raise ValueError(f"The function '{self.function_name}' must take exactly one argument") # noqa: E501 return func
[docs] def process_single(self, sample): """Invoke the loaded function with the provided sample.""" result = self.func(sample) if not isinstance(result, dict): raise ValueError(f"Function must return a dictionary, got {type(result).__name__} instead.") # noqa: E501 return result
[docs] def process_batched(self, samples): """Invoke the loaded function with the provided samples.""" result = self.func(samples) if not isinstance(result, dict): raise ValueError(f"Function must return a dictionary, got {type(result).__name__} instead.") # noqa: E501 return result