base

Data Load Module - comprehensive data loading framework with multiple strategies and converters. Supports loading from local files and remote sources with automatic format detection and conversion.

DataConverter

Base class for data format converters that transform raw data into DataSample format.

Separates data format conversion logic from data loading logic for modular design. All converters must implement the convert_to_data_sample method for their specific format.

Attributes:
  • config: Configuration parameters specific to the converter

Source code in rm_gallery/core/data/load/base.py
class DataConverter:
    """
    Base class for data format converters that transform raw data into DataSample format.

    Separates data format conversion logic from data loading logic for modular design.
    All converters must implement the convert_to_data_sample method for their specific format.

    Attributes:
        config: Configuration parameters specific to the converter
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Initialize converter with optional configuration.

        Args:
            config: Converter-specific configuration parameters
        """
        self.config = config or {}

    @abstractmethod
    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """
        Convert raw data dictionary to DataSample format.

        Args:
            data_dict: Raw data dictionary from the source
            source_info: Metadata about data source (file_path, dataset_name, etc.)

        Returns:
            Single DataSample or list of DataSamples

        Raises:
            NotImplementedError: If not implemented by concrete converter
        """
        pass
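
A minimal converter sketch is shown below. It assumes DataConverter can be imported from rm_gallery.core.data.load.base (the module documented here); the DataSample import path and the constructor fields (input, metadata) are illustrative assumptions rather than the library's exact schema.

from typing import Any, Dict, List, Union

from rm_gallery.core.data.load.base import DataConverter
from rm_gallery.core.data.schema import DataSample  # import path assumed


class MyFormatConverter(DataConverter):
    """Sketch of a concrete converter; the DataSample fields used here are illustrative."""

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        prompt_key = self.config.get("prompt_key", "prompt")  # converter-specific option
        return DataSample(
            input=data_dict[prompt_key],  # hypothetical field names
            metadata={"source_file_path": source_info.get("source_file_path")},
        )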

__init__(config=None)

Initialize converter with optional configuration.

Parameters:
  • config (Optional[Dict[str, Any]], default: None): Converter-specific configuration parameters

Source code in rm_gallery/core/data/load/base.py
def __init__(self, config: Optional[Dict[str, Any]] = None):
    """
    Initialize converter with optional configuration.

    Args:
        config: Converter-specific configuration parameters
    """
    self.config = config or {}

convert_to_data_sample(data_dict, source_info) abstractmethod

Convert raw data dictionary to DataSample format.

Parameters:
  • data_dict (Dict[str, Any], required): Raw data dictionary from the source
  • source_info (Dict[str, Any], required): Metadata about data source (file_path, dataset_name, etc.)

Returns:
  • Union[DataSample, List[DataSample]]: Single DataSample or list of DataSamples

Raises:
  • NotImplementedError: If not implemented by concrete converter

Source code in rm_gallery/core/data/load/base.py
@abstractmethod
def convert_to_data_sample(
    self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
) -> Union[DataSample, List[DataSample]]:
    """
    Convert raw data dictionary to DataSample format.

    Args:
        data_dict: Raw data dictionary from the source
        source_info: Metadata about data source (file_path, dataset_name, etc.)

    Returns:
        Single DataSample or list of DataSamples

    Raises:
        NotImplementedError: If not implemented by concrete converter
    """
    pass

DataConverterRegistry

Registry for managing data format converters with automatic discovery.

Provides decorator-based registration and factory methods for converter instantiation. Enables extensible converter ecosystem for different data formats.

Source code in rm_gallery/core/data/load/base.py
class DataConverterRegistry:
    """
    Registry for managing data format converters with automatic discovery.

    Provides decorator-based registration and factory methods for converter instantiation.
    Enables extensible converter ecosystem for different data formats.
    """

    _converters: Dict[str, Type[DataConverter]] = {}

    @classmethod
    def register(cls, data_source: str):
        """
        Decorator for registering data converters with specific source identifiers.

        Args:
            data_source: String identifier for the data source format

        Returns:
            Decorator function that registers the converter class
        """

        def decorator(converter_class: Type[DataConverter]):
            cls._converters[data_source] = converter_class
            return converter_class

        return decorator

    @classmethod
    def get_converter(
        cls, data_source: str, config: Optional[Dict[str, Any]] = None
    ) -> Optional[DataConverter]:
        """
        Get converter instance for specified data source with fallback to generic.

        Args:
            data_source: Data source identifier to find converter for
            config: Configuration parameters for the converter

        Returns:
            Configured converter instance or None if not found
        """
        converter_class = cls._converters.get(data_source)
        if converter_class:
            return converter_class(config)
        return None

    @classmethod
    def list_sources(cls) -> List[str]:
        """
        List all registered data source identifiers.

        Returns:
            List of registered source identifiers
        """
        return list(cls._converters.keys())
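
Registration and lookup could then look like the sketch below; the source identifier "my_format", the converter class, and the config keys are placeholders.

from typing import Any, Dict, List, Union

from rm_gallery.core.data.load.base import DataConverter, DataConverterRegistry


@DataConverterRegistry.register("my_format")   # "my_format" is a placeholder identifier
class MyFormatConverter(DataConverter):
    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> Union[Any, List[Any]]:
        ...  # build and return DataSample object(s) from data_dict here


# Lookup by source identifier; returns None when no converter is registered.
converter = DataConverterRegistry.get_converter("my_format", config={"prompt_key": "question"})
print(DataConverterRegistry.list_sources())    # includes "my_format"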

get_converter(data_source, config=None) classmethod

Get converter instance for specified data source with fallback to generic.

Parameters:
  • data_source (str, required): Data source identifier to find converter for
  • config (Optional[Dict[str, Any]], default: None): Configuration parameters for the converter

Returns:
  • Optional[DataConverter]: Configured converter instance or None if not found

Source code in rm_gallery/core/data/load/base.py
@classmethod
def get_converter(
    cls, data_source: str, config: Optional[Dict[str, Any]] = None
) -> Optional[DataConverter]:
    """
    Get converter instance for specified data source with fallback to generic.

    Args:
        data_source: Data source identifier to find converter for
        config: Configuration parameters for the converter

    Returns:
        Configured converter instance or None if not found
    """
    converter_class = cls._converters.get(data_source)
    if converter_class:
        return converter_class(config)
    return None

list_sources() classmethod

List all registered data source identifiers.

Returns:
  • List[str]: List of registered source identifiers

Source code in rm_gallery/core/data/load/base.py
@classmethod
def list_sources(cls) -> List[str]:
    """
    List all registered data source identifiers.

    Returns:
        List of registered source identifiers
    """
    return list(cls._converters.keys())

register(data_source) classmethod

Decorator for registering data converters with specific source identifiers.

Parameters:
  • data_source (str, required): String identifier for the data source format

Returns:
  • Decorator function that registers the converter class

Source code in rm_gallery/core/data/load/base.py
@classmethod
def register(cls, data_source: str):
    """
    Decorator for registering data converters with specific source identifiers.

    Args:
        data_source: String identifier for the data source format

    Returns:
        Decorator function that registers the converter class
    """

    def decorator(converter_class: Type[DataConverter]):
        cls._converters[data_source] = converter_class
        return converter_class

    return decorator

DataLoader

Bases: BaseDataModule, ABC

Abstract base class for data loading modules with multiple strategies and sources.

Defines the common interface and behavior for all data loading strategies while requiring concrete implementations to provide strategy-specific loading logic. Use the create_loader() factory function to instantiate the appropriate concrete loader.

Attributes:
  • load_strategy_type (str): Strategy identifier (local/huggingface)
  • data_source (str): Source identifier for converter selection

Input Sources
  • Local: JSON, JSONL, Parquet files or directories (FileDataLoader)
  • Remote: HuggingFace datasets with various configurations (HuggingFaceDataLoader)

Output: BaseDataSet containing converted DataSample objects
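
For example, a local loader could be created through the factory and executed as below; the name, data_source, and path values are placeholders, and data_source must match a registered converter.

from rm_gallery.core.data.load.base import create_loader

loader = create_loader(
    name="my_dataset",                       # placeholder module name
    load_strategy_type="local",              # dispatches to FileDataLoader
    data_source="my_format",                 # placeholder converter identifier
    config={"path": "data/train.jsonl"},     # placeholder file path
)
dataset = loader.run()                       # BaseDataSet of converted DataSample objects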

Source code in rm_gallery/core/data/load/base.py
class DataLoader(BaseDataModule, ABC):
    """
    Abstract base class for data loading modules with multiple strategies and sources.

    Defines the common interface and behavior for all data loading strategies while
    requiring concrete implementations to provide strategy-specific loading logic.
    Use create_loader() factory function to instantiate the appropriate concrete loader.

    Attributes:
        load_strategy_type: Strategy identifier (local/huggingface)
        data_source: Source identifier for converter selection

    Input Sources:
        - Local: JSON, JSONL, Parquet files or directories (FileDataLoader)
        - Remote: HuggingFace datasets with various configurations (HuggingFaceDataLoader)

    Output: BaseDataSet containing converted DataSample objects
    """

    load_strategy_type: str = Field(
        default="local", description="data load strategy type (local or remote)"
    )
    data_source: str = Field(default="*", description="data source")

    def __init__(
        self,
        name: str,
        load_strategy_type: str = "local",
        data_source: str = "*",
        config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Initialize data load module with strategy and source configuration.

        Args:
            name: Unique identifier for the loading module
            load_strategy_type: Loading strategy type (local/huggingface)
            data_source: Data source identifier for converter selection
            config: Strategy-specific configuration parameters
            metadata: Additional metadata for tracking and debugging
            **kwargs: Additional initialization parameters
        """
        super().__init__(
            module_type=DataModuleType.LOAD,
            name=name,
            load_strategy_type=load_strategy_type,
            data_source=data_source,
            config=config or {},
            metadata=metadata,
            **kwargs,
        )
        self.validate_config(config or {})

    def validate_config(self, config: Dict[str, Any]) -> None:
        """
        Validate the configuration dictionary for strategy-specific requirements.

        Override this method in subclasses to add specific validation rules
        for different loading strategies.

        Args:
            config: Configuration dictionary to validate

        Raises:
            ValueError: If configuration is invalid
        """
        pass

    @abstractmethod
    def load_data(self, **kwargs) -> List[DataSample]:
        """
        Load data from the configured source using the strategy-specific implementation.

        Each loading strategy must implement this method to handle
        the actual data loading and conversion process.

        Args:
            **kwargs: Strategy-specific loading parameters

        Returns:
            List of DataSample objects loaded from the source

        Raises:
            RuntimeError: If loading fails at any stage
        """
        pass

    def run(
        self, input_data: Union[BaseDataSet, List[DataSample], None] = None, **kwargs
    ) -> BaseDataSet:
        """
        Execute the data loading pipeline and return structured dataset.

        Loads data using the configured strategy, applies optional limits,
        and packages the result as a BaseDataSet for pipeline integration.

        Args:
            input_data: Unused for loading modules (loads from external sources)
            **kwargs: Additional parameters passed to loading strategy

        Returns:
            BaseDataSet containing loaded and converted data samples

        Raises:
            RuntimeError: If data loading process fails
        """
        try:
            # Load data using strategy
            loaded_items = self.load_data(**kwargs)

            # Convert loaded items to DataSample objects if needed
            data_samples = []
            for item in loaded_items:
                data_samples.append(item)

            # Apply limit (if specified)
            if (
                "limit" in self.config
                and self.config["limit"] is not None
                and self.config["limit"] > 0
            ):
                limit = min(int(self.config["limit"]), len(data_samples))
                data_samples = random.sample(data_samples, limit)
                logger.info(
                    f"Applied limit of {limit}, final count: {len(data_samples)}"
                )

            # Create output dataset
            output_dataset = BaseDataSet(
                name=self.name,
                metadata={
                    "source": self.data_source,
                    "strategy_type": self.load_strategy_type,
                    "config": self.config,
                },
                datasamples=data_samples,
            )
            logger.info(
                f"Successfully loaded {len(data_samples)} items from {self.data_source}"
            )

            return output_dataset
        except Exception as e:
            error_msg = f"Data loading failed: {str(e)}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e

__init__(name, load_strategy_type='local', data_source='*', config=None, metadata=None, **kwargs)

Initialize data load module with strategy and source configuration.

Parameters:
  • name (str, required): Unique identifier for the loading module
  • load_strategy_type (str, default: 'local'): Loading strategy type (local/huggingface)
  • data_source (str, default: '*'): Data source identifier for converter selection
  • config (Optional[Dict[str, Any]], default: None): Strategy-specific configuration parameters
  • metadata (Optional[Dict[str, Any]], default: None): Additional metadata for tracking and debugging
  • **kwargs (default: {}): Additional initialization parameters

Source code in rm_gallery/core/data/load/base.py
def __init__(
    self,
    name: str,
    load_strategy_type: str = "local",
    data_source: str = "*",
    config: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    **kwargs,
):
    """
    Initialize data load module with strategy and source configuration.

    Args:
        name: Unique identifier for the loading module
        load_strategy_type: Loading strategy type (local/huggingface)
        data_source: Data source identifier for converter selection
        config: Strategy-specific configuration parameters
        metadata: Additional metadata for tracking and debugging
        **kwargs: Additional initialization parameters
    """
    super().__init__(
        module_type=DataModuleType.LOAD,
        name=name,
        load_strategy_type=load_strategy_type,
        data_source=data_source,
        config=config or {},
        metadata=metadata,
        **kwargs,
    )
    self.validate_config(config or {})

load_data(**kwargs) abstractmethod

Load data from the configured source using the strategy-specific implementation.

Each loading strategy must implement this method to handle the actual data loading and conversion process.

Parameters:
  • **kwargs (default: {}): Strategy-specific loading parameters

Returns:
  • List[DataSample]: List of DataSample objects loaded from the source

Raises:
  • RuntimeError: If loading fails at any stage

Source code in rm_gallery/core/data/load/base.py
@abstractmethod
def load_data(self, **kwargs) -> List[DataSample]:
    """
    Load data from the configured source using the strategy-specific implementation.

    Each loading strategy must implement this method to handle
    the actual data loading and conversion process.

    Args:
        **kwargs: Strategy-specific loading parameters

    Returns:
        List of DataSample objects loaded from the source

    Raises:
        RuntimeError: If loading fails at any stage
    """
    pass

run(input_data=None, **kwargs)

Execute the data loading pipeline and return structured dataset.

Loads data using the configured strategy, applies optional limits, and packages the result as a BaseDataSet for pipeline integration.

Parameters:
  • input_data (Union[BaseDataSet, List[DataSample], None], default: None): Unused for loading modules (loads from external sources)
  • **kwargs (default: {}): Additional parameters passed to loading strategy

Returns:
  • BaseDataSet: BaseDataSet containing loaded and converted data samples

Raises:
  • RuntimeError: If data loading process fails

Source code in rm_gallery/core/data/load/base.py
def run(
    self, input_data: Union[BaseDataSet, List[DataSample], None] = None, **kwargs
) -> BaseDataSet:
    """
    Execute the data loading pipeline and return structured dataset.

    Loads data using the configured strategy, applies optional limits,
    and packages the result as a BaseDataSet for pipeline integration.

    Args:
        input_data: Unused for loading modules (loads from external sources)
        **kwargs: Additional parameters passed to loading strategy

    Returns:
        BaseDataSet containing loaded and converted data samples

    Raises:
        RuntimeError: If data loading process fails
    """
    try:
        # Load data using strategy
        loaded_items = self.load_data(**kwargs)

        # Convert loaded items to DataSample objects if needed
        data_samples = []
        for item in loaded_items:
            data_samples.append(item)

        # Apply limit (if specified)
        if (
            "limit" in self.config
            and self.config["limit"] is not None
            and self.config["limit"] > 0
        ):
            limit = min(int(self.config["limit"]), len(data_samples))
            data_samples = random.sample(data_samples, limit)
            logger.info(
                f"Applied limit of {limit}, final count: {len(data_samples)}"
            )

        # Create output dataset
        output_dataset = BaseDataSet(
            name=self.name,
            metadata={
                "source": self.data_source,
                "strategy_type": self.load_strategy_type,
                "config": self.config,
            },
            datasamples=data_samples,
        )
        logger.info(
            f"Successfully loaded {len(data_samples)} items from {self.data_source}"
        )

        return output_dataset
    except Exception as e:
        error_msg = f"Data loading failed: {str(e)}"
        logger.error(error_msg)
        raise RuntimeError(error_msg) from e
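
Note that a positive config["limit"] draws a random sample of that size via random.sample rather than keeping the first N items. A minimal sketch, assuming a converter is registered for the placeholder data_source and that the placeholder path exists:

from rm_gallery.core.data.load.base import create_loader

loader = create_loader(
    name="sampled_subset",
    load_strategy_type="local",
    data_source="my_format",                           # placeholder converter identifier
    config={"path": "data/train.jsonl", "limit": 50},  # placeholder path; random subsample of 50
)
dataset = loader.run()
assert len(dataset.datasamples) <= 50                  # datasamples field as constructed in run()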

validate_config(config)

Validate the configuration dictionary for strategy-specific requirements.

Override this method in subclasses to add specific validation rules for different loading strategies.

Parameters:
  • config (Dict[str, Any], required): Configuration dictionary to validate

Raises:
  • ValueError: If configuration is invalid

Source code in rm_gallery/core/data/load/base.py
def validate_config(self, config: Dict[str, Any]) -> None:
    """
    Validate the configuration dictionary for strategy-specific requirements.

    Override this method in subclasses to add specific validation rules
    for different loading strategies.

    Args:
        config: Configuration dictionary to validate

    Raises:
        ValueError: If configuration is invalid
    """
    pass

FileDataLoader

Bases: DataLoader

File-based data loading strategy for local JSON, JSONL, and Parquet files.

Supports loading from single files or entire directories with recursive file discovery. Automatically detects file formats and applies appropriate parsers with error handling.

Configuration Requirements
  • path: File or directory path to load from
Supported Formats
  • JSON: Single object or array of objects
  • JSONL: One JSON object per line
  • Parquet: Columnar data format with pandas integration
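
A hypothetical directory configuration is sketched below; every .json, .jsonl, and .parquet file under the directory (including subdirectories) is discovered and loaded, and files that fail to load are skipped with an error log. The name, data_source, and path values are placeholders.

from rm_gallery.core.data.load.base import FileDataLoader

loader = FileDataLoader(
    name="local_corpus",            # placeholder name
    load_strategy_type="local",
    data_source="my_format",        # placeholder converter identifier
    config={"path": "data/raw"},    # placeholder directory; searched recursively
)
samples = loader.load_data()        # List[DataSample] aggregated across all supported files
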
Source code in rm_gallery/core/data/load/base.py
class FileDataLoader(DataLoader):
    """
    File-based data loading strategy for local JSON, JSONL, and Parquet files.

    Supports loading from single files or entire directories with recursive file discovery.
    Automatically detects file formats and applies appropriate parsers with error handling.

    Configuration Requirements:
        - path: File or directory path to load from

    Supported Formats:
        - JSON: Single object or array of objects
        - JSONL: One JSON object per line
        - Parquet: Columnar data format with pandas integration
    """

    def __init__(self, **kwargs):
        """
        Initialize file loading strategy with converter registration.

        Args:
            **kwargs: Initialization parameters passed to parent class
        """
        super().__init__(**kwargs)
        # Initialize data_converter after parent initialization as a normal attribute
        converter = DataConverterRegistry.get_converter(self.data_source, self.config)
        # Set as a normal Python attribute, not a Pydantic field
        object.__setattr__(self, "data_converter", converter)

    def validate_config(self, config: Dict[str, Any]) -> None:
        """
        Validate file loading configuration requirements.

        Args:
            config: Configuration dictionary to validate

        Raises:
            ValueError: If required configuration is missing or invalid
            FileNotFoundError: If specified path does not exist
        """
        if "path" not in config:
            raise ValueError("File data strategy requires 'path' in config")
        if not isinstance(config["path"], str):
            raise ValueError("'path' must be a string")

        path = Path(config["path"])
        if not path.exists():
            raise FileNotFoundError(f"Could not find path '{path}'")

        # If it's a file, validate the file format
        if path.is_file():
            ext = path.suffix.lower()
            if ext not in [".json", ".jsonl", ".parquet"]:
                raise ValueError(
                    f"Unsupported file format: {ext}. Supported formats: .json, .jsonl, .parquet"
                )
        # If it's a directory, check if it contains any supported files
        elif path.is_dir():
            supported_files = self._find_supported_files(path)
            if not supported_files:
                raise ValueError(
                    f"Directory '{path}' contains no supported files. Supported formats: .json, .jsonl, .parquet"
                )
        else:
            raise ValueError(f"Path '{path}' is neither a file nor a directory")

    def _find_supported_files(self, directory: Path) -> List[Path]:
        """
        Recursively find all supported files in directory and subdirectories.

        Args:
            directory: Directory path to search

        Returns:
            Sorted list of supported file paths
        """
        supported_extensions = {".json", ".jsonl", ".parquet"}
        supported_files = []

        # Walk through directory and all subdirectories
        for file_path in directory.rglob("*"):
            if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
                supported_files.append(file_path)

        # Sort files for consistent ordering
        return sorted(supported_files)

    def load_data(self, **kwargs) -> List[DataSample]:
        path = Path(self.config["path"])

        try:
            all_data_samples = []

            # If it's a single file, load it directly
            if path.is_file():
                ext = path.suffix.lower()
                if ext == ".json":
                    file_data = self._load_json(path, source_file_path=path)
                elif ext == ".jsonl":
                    file_data = self._load_jsonl(path, source_file_path=path)
                elif ext == ".parquet":
                    file_data = self._load_parquet(path, source_file_path=path)
                else:
                    raise ValueError(f"Unsupported file format: {ext}")
                all_data_samples.extend(file_data)
                logger.info(f"Loaded {len(file_data)} samples from file: {path}")

            # If it's a directory, load all supported files
            elif path.is_dir():
                supported_files = self._find_supported_files(path)
                logger.info(
                    f"Found {len(supported_files)} supported files in directory: {path}"
                )

                for file_path in supported_files:
                    try:
                        ext = file_path.suffix.lower()
                        if ext == ".json":
                            file_data = self._load_json(
                                file_path, source_file_path=file_path
                            )
                        elif ext == ".jsonl":
                            file_data = self._load_jsonl(
                                file_path, source_file_path=file_path
                            )
                        elif ext == ".parquet":
                            file_data = self._load_parquet(
                                file_path, source_file_path=file_path
                            )
                        else:
                            logger.warning(
                                f"Skipping unsupported file format: {file_path}"
                            )
                            continue

                        all_data_samples.extend(file_data)
                        logger.info(
                            f"Loaded {len(file_data)} samples from file: {file_path}"
                        )

                    except Exception as e:
                        logger.error(f"Failed to load data from {file_path}: {str(e)}")
                        # Continue with other files instead of failing completely
                        continue

                logger.info(
                    f"Total loaded {len(all_data_samples)} samples from {len(supported_files)} files"
                )

            else:
                raise ValueError(f"Path '{path}' is neither a file nor a directory")

            return all_data_samples

        except Exception as e:
            raise RuntimeError(f"Failed to load data from {path}: {str(e)}")

    def _load_json(self, path: Path, source_file_path: Path) -> List[DataSample]:
        """Load data from JSON file"""
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)

        all_samples = []
        if isinstance(data, list):
            for item in data:
                samples = self._convert_to_data_sample(item, source_file_path)
                if isinstance(samples, list):
                    # Add group ID for samples from the same original data
                    group_id = str(uuid.uuid4())
                    for sample in samples:
                        if sample.metadata is None:
                            sample.metadata = {}
                        sample.metadata["data_group_id"] = group_id
                    all_samples.extend(samples)
                else:
                    all_samples.append(samples)
        elif isinstance(data, dict):
            samples = self._convert_to_data_sample(data, source_file_path)
            if isinstance(samples, list):
                # Add group ID for samples from the same original data
                group_id = str(uuid.uuid4())
                for sample in samples:
                    if sample.metadata is None:
                        sample.metadata = {}
                    sample.metadata["data_group_id"] = group_id
                all_samples.extend(samples)
            else:
                all_samples.append(samples)
        else:
            raise ValueError("Invalid JSON format: expected list or dict")

        return all_samples

    def _load_jsonl(self, path: Path, source_file_path: Path) -> List[DataSample]:
        """Load data from JSONL file"""
        data_list = []
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                if line.strip():  # Skip empty lines
                    data = json.loads(line)
                    samples = self._convert_to_data_sample(data, source_file_path)
                    if isinstance(samples, list):
                        # Add group ID for samples from the same original data
                        group_id = str(uuid.uuid4())
                        for sample in samples:
                            if sample.metadata is None:
                                sample.metadata = {}
                            sample.metadata["data_group_id"] = group_id
                        data_list.extend(samples)
                    else:
                        data_list.append(samples)
        return data_list

    def _load_parquet(self, path: Path, source_file_path: Path) -> List[DataSample]:
        """Load data from Parquet file"""
        try:
            df = pd.read_parquet(path)
        except ImportError:
            raise ImportError("Please install pandas package: pip install pandas")

        data_list = []
        for _, row in df.iterrows():
            try:
                # Convert row to dict and handle any non-serializable types
                data_dict = {}
                for k, v in row.items():
                    if hasattr(v, "item"):
                        try:
                            data_dict[k] = v.item()
                        except (ValueError, AttributeError):
                            # if array type, convert to list and handle nested structures
                            if hasattr(v, "tolist"):
                                data_dict[k] = v.tolist()
                            else:
                                data_dict[k] = v
                    elif hasattr(v, "tolist"):
                        # Handle numpy arrays
                        data_dict[k] = v.tolist()
                    else:
                        data_dict[k] = v

                # ensure data dict contains necessary fields
                if "prompt" not in data_dict:
                    logger.warning(f"Row missing 'prompt' field, skipping: {data_dict}")
                    continue

                # convert data to DataSample object
                samples = self._convert_to_data_sample(data_dict, source_file_path)
                if samples is not None:
                    if isinstance(samples, list):
                        # Add group ID for samples from the same original data
                        group_id = str(uuid.uuid4())
                        for sample in samples:
                            if sample.metadata is None:
                                sample.metadata = {}
                            sample.metadata["data_group_id"] = group_id
                        data_list.extend(samples)
                    else:
                        data_list.append(samples)
            except Exception as e:
                logger.error(f"Error processing row: {str(e)}")
                continue

        return data_list

    def _convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_file_path: Path
    ) -> Union[DataSample, List[DataSample]]:
        """Convert raw data dictionary to DataSample format"""
        if hasattr(self, "data_converter") and self.data_converter:
            source_info = {
                "source_file_path": str(source_file_path),
                "load_type": "local",
            }
            return self.data_converter.convert_to_data_sample(data_dict, source_info)
        else:
            # Fallback to abstract method for backward compatibility
            return self._convert_to_data_sample_impl(data_dict, source_file_path)

    def _convert_to_data_sample_impl(
        self, data_dict: Dict[str, Any], source_file_path: Path
    ) -> DataSample:
        """Abstract method for backward compatibility - override in subclasses if not using converters"""
        raise NotImplementedError(
            "Either use a data converter or implement _convert_to_data_sample_impl method"
        )

__init__(**kwargs)

Initialize file loading strategy with converter registration.

Parameters:
  • **kwargs (default: {}): Initialization parameters passed to parent class

Source code in rm_gallery/core/data/load/base.py
def __init__(self, **kwargs):
    """
    Initialize file loading strategy with converter registration.

    Args:
        **kwargs: Initialization parameters passed to parent class
    """
    super().__init__(**kwargs)
    # Initialize data_converter after parent initialization as a normal attribute
    converter = DataConverterRegistry.get_converter(self.data_source, self.config)
    # Set as a normal Python attribute, not a Pydantic field
    object.__setattr__(self, "data_converter", converter)

validate_config(config)

Validate file loading configuration requirements.

Parameters:
  • config (Dict[str, Any], required): Configuration dictionary to validate

Raises:
  • ValueError: If required configuration is missing or invalid
  • FileNotFoundError: If specified path does not exist

Source code in rm_gallery/core/data/load/base.py
def validate_config(self, config: Dict[str, Any]) -> None:
    """
    Validate file loading configuration requirements.

    Args:
        config: Configuration dictionary to validate

    Raises:
        ValueError: If required configuration is missing or invalid
        FileNotFoundError: If specified path does not exist
    """
    if "path" not in config:
        raise ValueError("File data strategy requires 'path' in config")
    if not isinstance(config["path"], str):
        raise ValueError("'path' must be a string")

    path = Path(config["path"])
    if not path.exists():
        raise FileNotFoundError(f"Could not find path '{path}'")

    # If it's a file, validate the file format
    if path.is_file():
        ext = path.suffix.lower()
        if ext not in [".json", ".jsonl", ".parquet"]:
            raise ValueError(
                f"Unsupported file format: {ext}. Supported formats: .json, .jsonl, .parquet"
            )
    # If it's a directory, check if it contains any supported files
    elif path.is_dir():
        supported_files = self._find_supported_files(path)
        if not supported_files:
            raise ValueError(
                f"Directory '{path}' contains no supported files. Supported formats: .json, .jsonl, .parquet"
            )
    else:
        raise ValueError(f"Path '{path}' is neither a file nor a directory")

HuggingFaceDataLoader

Bases: DataLoader

HuggingFace dataset loading strategy for remote datasets from Hugging Face Hub.

Supports streaming and non-streaming modes with configurable splits and trust settings. Automatically handles dataset download, caching, and conversion to internal format.

Configuration Options
  • dataset_config: Optional dataset configuration name
  • huggingface_split: Dataset split to load (train/test/validation)
  • streaming: Enable streaming mode for large datasets
  • trust_remote_code: Allow execution of remote code in datasets
  • limit: Maximum number of samples to load (streaming mode)
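
A hypothetical streaming configuration is sketched below; the loader's name is used as the HuggingFace dataset identifier in load_data, and all names shown are placeholders.

from rm_gallery.core.data.load.base import HuggingFaceDataLoader

loader = HuggingFaceDataLoader(
    name="some-org/some-dataset",        # placeholder HuggingFace dataset id
    load_strategy_type="huggingface",
    data_source="some_dataset",          # placeholder converter identifier
    config={
        "dataset_config": None,          # optional configuration name
        "huggingface_split": "train",
        "streaming": True,               # iterate without downloading the full dataset
        "limit": 1000,                   # cap on samples taken in streaming mode
        "trust_remote_code": False,
    },
)
samples = loader.load_data()
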
Source code in rm_gallery/core/data/load/base.py
class HuggingFaceDataLoader(DataLoader):
    """
    HuggingFace dataset loading strategy for remote datasets from Hugging Face Hub.

    Supports streaming and non-streaming modes with configurable splits and trust settings.
    Automatically handles dataset download, caching, and conversion to internal format.

    Configuration Options:
        - dataset_config: Optional dataset configuration name
        - huggingface_split: Dataset split to load (train/test/validation)
        - streaming: Enable streaming mode for large datasets
        - trust_remote_code: Allow execution of remote code in datasets
        - limit: Maximum number of samples to load (streaming mode)
    """

    def __init__(self, **kwargs):
        """
        Initialize HuggingFace loading strategy with converter registration.

        Args:
            **kwargs: Initialization parameters passed to parent class
        """
        super().__init__(**kwargs)
        # Initialize data_converter after parent initialization as a normal attribute
        converter = DataConverterRegistry.get_converter(self.data_source, self.config)
        # Set as a normal Python attribute, not a Pydantic field
        object.__setattr__(self, "data_converter", converter)

    def validate_config(self, config: Dict[str, Any]) -> None:
        """
        Validate HuggingFace dataset configuration parameters.

        Args:
            config: Configuration dictionary to validate

        Note:
            Currently minimal validation - can be extended for specific requirements
        """
        pass

    def load_data(self, **kwargs) -> List[DataSample]:
        """
        Load data from HuggingFace dataset with automatic conversion.

        Downloads dataset from HuggingFace Hub, applies configured settings,
        and converts each item to DataSample format using registered converters.

        Args:
            **kwargs: Additional loading parameters

        Returns:
            List of converted DataSample objects

        Raises:
            RuntimeError: If dataset loading or conversion fails
        """
        dataset_name = self.name
        dataset_config = self.config.get("dataset_config", None)
        split = self.config.get("huggingface_split", "train")
        streaming = self.config.get("streaming", False)
        trust_remote_code = self.config.get("trust_remote_code", False)

        try:
            logger.info(
                f"Loading dataset: {dataset_name}, config: {dataset_config}, split: {split}"
            )

            # Load dataset from HuggingFace
            dataset = load_dataset(
                dataset_name,
                dataset_config,
                split=split,
                streaming=streaming,
                trust_remote_code=trust_remote_code,
            )

            # Convert to list if streaming
            if streaming:
                # For streaming datasets, take a limited number of samples
                limit = self.config.get("limit", 1000)
                dataset_items = []
                for i, item in enumerate(dataset):
                    if i >= limit:
                        break
                    dataset_items.append(item)
            else:
                dataset_items = dataset

            # Convert to DataSample objects
            data_samples = []
            for item in dataset_items:
                try:
                    samples = self._convert_to_data_sample(item)
                    if samples is not None:
                        if isinstance(samples, list):
                            # Add group ID for samples from the same original data
                            group_id = str(uuid.uuid4())
                            for sample in samples:
                                if sample.metadata is None:
                                    sample.metadata = {}
                                sample.metadata["data_group_id"] = group_id
                            data_samples.extend(samples)
                        else:
                            data_samples.append(samples)
                except Exception as e:
                    logger.error(f"Error converting item to DataSample: {str(e)}")
                    continue

            logger.info(
                f"Successfully loaded {len(data_samples)} samples from HuggingFace dataset: {dataset_name}"
            )
            return data_samples

        except Exception as e:
            raise RuntimeError(
                f"Failed to load data from HuggingFace dataset {dataset_name}: {str(e)}"
            )

    def _convert_to_data_sample(
        self, data_dict: Dict[str, Any]
    ) -> Union[DataSample, List[DataSample]]:
        """
        Convert raw HuggingFace data dictionary to DataSample format using registered converter.

        Args:
            data_dict: Raw data item from HuggingFace dataset

        Returns:
            Converted DataSample object(s) or None if conversion fails
        """
        if hasattr(self, "data_converter") and self.data_converter:
            source_info = {
                "dataset_name": self.config.get("name"),
                "load_type": "huggingface",
                "dataset_config": self.config.get("dataset_config"),
                "split": self.config.get("huggingface_split", "train"),
            }
            return self.data_converter.convert_to_data_sample(data_dict, source_info)
        else:
            # Fallback to abstract method for backward compatibility
            return self._convert_to_data_sample_impl(data_dict)

    def _convert_to_data_sample_impl(self, data_dict: Dict[str, Any]) -> DataSample:
        """
        Abstract fallback method for backward compatibility.

        Args:
            data_dict: Raw data dictionary to convert

        Returns:
            DataSample object

        Raises:
            NotImplementedError: If no converter is available and method not implemented
        """
        raise NotImplementedError(
            "Either use a data converter or implement _convert_to_data_sample_impl method"
        )

__init__(**kwargs)

Initialize HuggingFace loading strategy with converter registration.

Parameters:
  • **kwargs (default: {}): Initialization parameters passed to parent class

Source code in rm_gallery/core/data/load/base.py
def __init__(self, **kwargs):
    """
    Initialize HuggingFace loading strategy with converter registration.

    Args:
        **kwargs: Initialization parameters passed to parent class
    """
    super().__init__(**kwargs)
    # Initialize data_converter after parent initialization as a normal attribute
    converter = DataConverterRegistry.get_converter(self.data_source, self.config)
    # Set as a normal Python attribute, not a Pydantic field
    object.__setattr__(self, "data_converter", converter)

load_data(**kwargs)

Load data from HuggingFace dataset with automatic conversion.

Downloads dataset from HuggingFace Hub, applies configured settings, and converts each item to DataSample format using registered converters.

Parameters:
  • **kwargs (default: {}): Additional loading parameters

Returns:
  • List[DataSample]: List of converted DataSample objects

Raises:
  • RuntimeError: If dataset loading or conversion fails

Source code in rm_gallery/core/data/load/base.py
def load_data(self, **kwargs) -> List[DataSample]:
    """
    Load data from HuggingFace dataset with automatic conversion.

    Downloads dataset from HuggingFace Hub, applies configured settings,
    and converts each item to DataSample format using registered converters.

    Args:
        **kwargs: Additional loading parameters

    Returns:
        List of converted DataSample objects

    Raises:
        RuntimeError: If dataset loading or conversion fails
    """
    dataset_name = self.name
    dataset_config = self.config.get("dataset_config", None)
    split = self.config.get("huggingface_split", "train")
    streaming = self.config.get("streaming", False)
    trust_remote_code = self.config.get("trust_remote_code", False)

    try:
        logger.info(
            f"Loading dataset: {dataset_name}, config: {dataset_config}, split: {split}"
        )

        # Load dataset from HuggingFace
        dataset = load_dataset(
            dataset_name,
            dataset_config,
            split=split,
            streaming=streaming,
            trust_remote_code=trust_remote_code,
        )

        # Convert to list if streaming
        if streaming:
            # For streaming datasets, take a limited number of samples
            limit = self.config.get("limit", 1000)
            dataset_items = []
            for i, item in enumerate(dataset):
                if i >= limit:
                    break
                dataset_items.append(item)
        else:
            dataset_items = dataset

        # Convert to DataSample objects
        data_samples = []
        for item in dataset_items:
            try:
                samples = self._convert_to_data_sample(item)
                if samples is not None:
                    if isinstance(samples, list):
                        # Add group ID for samples from the same original data
                        group_id = str(uuid.uuid4())
                        for sample in samples:
                            if sample.metadata is None:
                                sample.metadata = {}
                            sample.metadata["data_group_id"] = group_id
                        data_samples.extend(samples)
                    else:
                        data_samples.append(samples)
            except Exception as e:
                logger.error(f"Error converting item to DataSample: {str(e)}")
                continue

        logger.info(
            f"Successfully loaded {len(data_samples)} samples from HuggingFace dataset: {dataset_name}"
        )
        return data_samples

    except Exception as e:
        raise RuntimeError(
            f"Failed to load data from HuggingFace dataset {dataset_name}: {str(e)}"
        )

validate_config(config)

Validate HuggingFace dataset configuration parameters.

Parameters:
  • config (Dict[str, Any], required): Configuration dictionary to validate

Note
  Currently minimal validation - can be extended for specific requirements

Source code in rm_gallery/core/data/load/base.py
def validate_config(self, config: Dict[str, Any]) -> None:
    """
    Validate HuggingFace dataset configuration parameters.

    Args:
        config: Configuration dictionary to validate

    Note:
        Currently minimal validation - can be extended for specific requirements
    """
    pass

create_loader(name, load_strategy_type='local', data_source='*', config=None, metadata=None)

Factory function to create data loading module with specified strategy.

Automatically selects and instantiates the appropriate concrete loader class based on the load_strategy_type parameter.

Parameters:
  • name (str, required): Unique identifier for the loading module
  • load_strategy_type (str, default: 'local'): Loading strategy type (local/huggingface)
  • data_source (str, default: '*'): Data source identifier for converter selection
  • config (Optional[Dict[str, Any]], default: None): Strategy-specific configuration parameters
  • metadata (Optional[Dict[str, Any]], default: None): Additional metadata for tracking and debugging

Returns:
  • DataLoader: Configured concrete DataLoader instance ready for pipeline integration

Raises:
  • ValueError: If unsupported strategy type is specified

Source code in rm_gallery/core/data/load/base.py
def create_loader(
    name: str,
    load_strategy_type: str = "local",
    data_source: str = "*",
    config: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> DataLoader:
    """
    Factory function to create data loading module with specified strategy.

    Automatically selects and instantiates the appropriate concrete loader class
    based on the load_strategy_type parameter.

    Args:
        name: Unique identifier for the loading module
        load_strategy_type: Loading strategy type (local/huggingface)
        data_source: Data source identifier for converter selection
        config: Strategy-specific configuration parameters
        metadata: Additional metadata for tracking and debugging

    Returns:
        Configured concrete DataLoader instance ready for pipeline integration

    Raises:
        ValueError: If unsupported strategy type is specified
    """
    # Choose strategy based on load_strategy_type
    if load_strategy_type == "local":
        return FileDataLoader(
            name=name,
            load_strategy_type=load_strategy_type,
            data_source=data_source,
            config=config,
            metadata=metadata,
        )
    elif load_strategy_type == "huggingface":
        return HuggingFaceDataLoader(
            name=name,
            load_strategy_type=load_strategy_type,
            data_source=data_source,
            config=config,
            metadata=metadata,
        )
    else:
        raise ValueError(f"Unsupported load strategy type: {load_strategy_type}")
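
Because unsupported strategy types raise ValueError, callers may want to guard the factory call; a sketch with placeholder values:

from rm_gallery.core.data.load.base import create_loader

try:
    loader = create_loader(
        name="some-org/some-dataset",        # placeholder HuggingFace dataset id
        load_strategy_type="huggingface",    # dispatches to HuggingFaceDataLoader
        data_source="some_dataset",          # placeholder converter identifier
        config={"huggingface_split": "test", "streaming": False},
    )
except ValueError as exc:
    # Raised only for strategy types other than "local" and "huggingface"
    print(f"Unsupported strategy: {exc}")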