build

Data Build Module - core data pipeline orchestrator for end-to-end data processing. Coordinates loading, processing, annotation, and export stages with flexible configuration.

DataBuilder

Bases: BaseDataModule

Main pipeline orchestrator that coordinates all data processing stages.

Manages the complete data workflow from raw input to final export format, executing each stage in sequence while maintaining data integrity and logging.

Attributes:

    load_module (Optional[DataLoader]): Optional data loading component for ingesting external data
    process_module (Optional[DataProcessor]): Optional processing component for filtering and transforming data
    annotation_module (Optional[DataAnnotator]): Optional annotation component for adding labels and metadata
    export_module (Optional[DataExporter]): Optional export component for outputting data in target formats
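Example:

A minimal sketch (not taken from the library's documentation) of wiring a DataBuilder directly from pre-built stage modules and running it. Constructing the individual DataLoader and DataExporter instances is not covered on this page, so my_loader and my_exporter are assumed to be already-configured instances, and the config/metadata keys are illustrative only.

from rm_gallery.core.data.build import DataBuilder

# my_loader / my_exporter: hypothetical, pre-configured DataLoader / DataExporter instances
builder = DataBuilder(
    name="demo_pipeline",
    config={"max_samples": 100},        # assumed pipeline-level option
    metadata={"owner": "docs-example"},
    load_module=my_loader,
    export_module=my_exporter,
)

# Processing and annotation modules are not set, so those stages are skipped.
dataset = builder.run()                 # None input is fine when a loader is configured
print(f"{len(dataset)} items produced")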

Source code in rm_gallery/core/data/build.py
class DataBuilder(BaseDataModule):
    """
    Main pipeline orchestrator that coordinates all data processing stages.

    Manages the complete data workflow from raw input to final export format,
    executing each stage in sequence while maintaining data integrity and logging.

    Attributes:
        load_module: Optional data loading component for ingesting external data
        process_module: Optional processing component for filtering and transforming data
        annotation_module: Optional annotation component for adding labels and metadata
        export_module: Optional export component for outputting data in target formats
    """

    load_module: Optional[DataLoader] = Field(default=None)
    process_module: Optional[DataProcessor] = Field(default=None)
    annotation_module: Optional[DataAnnotator] = Field(default=None)
    export_module: Optional[DataExporter] = Field(default=None)

    def __init__(
        self,
        name: str,
        config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None,
        **modules,
    ):
        """
        Initialize the data build pipeline with specified modules.

        Args:
            name: Unique identifier for the pipeline instance
            config: Pipeline-level configuration parameters
            metadata: Additional metadata for tracking and debugging
            **modules: Keyword arguments for individual pipeline modules
        """
        super().__init__(
            module_type=DataModuleType.BUILD,
            name=name,
            config=config,
            metadata=metadata,
            **modules,
        )

    def run(
        self, input_data: Union[BaseDataSet, List[DataSample], None] = None, **kwargs
    ) -> BaseDataSet:
        """
        Execute the complete data processing pipeline with all configured stages.

        Processes data through sequential stages: loading → processing → annotation → export.
        Each stage is optional and only executed if the corresponding module is configured.

        Args:
            input_data: Initial dataset, list of samples, or None for load-only pipelines
            **kwargs: Additional runtime parameters passed to individual modules

        Returns:
            Final processed dataset after all stages complete

        Raises:
            Exception: If any pipeline stage fails, with detailed error logging
        """
        try:
            current_data = input_data
            logger.info(f"Starting data build pipeline: {self.name}")

            # Define pipeline stages with human-readable names
            stages = [
                ("Loading", self.load_module),
                ("Processing", self.process_module),
                ("Annotation", self.annotation_module),
                ("Export", self.export_module),
            ]

            for stage_name, module in stages:
                if module:
                    logger.info(f"Stage: {stage_name}")
                    current_data = module.run(current_data)
                    logger.info(f"{stage_name} completed: {len(current_data)} items")

            logger.info(f"Pipeline completed: {len(current_data)} items processed")
            return current_data

        except Exception as e:
            logger.error(f"Pipeline execution failed: {str(e)}")
            raise e

__init__(name, config=None, metadata=None, **modules)

Initialize the data build pipeline with specified modules.

Parameters:

    name (str): Unique identifier for the pipeline instance. Required.
    config (Optional[Dict[str, Any]]): Pipeline-level configuration parameters. Default: None.
    metadata (Optional[Dict[str, Any]]): Additional metadata for tracking and debugging. Default: None.
    **modules: Keyword arguments for individual pipeline modules. Default: {}.
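Example:

A short construction sketch, assuming the **modules keyword names are the field names declared on the class (load_module, process_module, annotation_module, export_module); my_annotator stands in for a pre-configured DataAnnotator instance.

from rm_gallery.core.data.build import DataBuilder

builder = DataBuilder(
    name="annotation_only",
    config={"seed": 42},                 # illustrative config key
    metadata={"created_by": "example"},
    annotation_module=my_annotator,      # hypothetical DataAnnotator instance
)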
Source code in rm_gallery/core/data/build.py
def __init__(
    self,
    name: str,
    config: Optional[Dict[str, Any]] = None,
    metadata: Optional[Dict[str, Any]] = None,
    **modules,
):
    """
    Initialize the data build pipeline with specified modules.

    Args:
        name: Unique identifier for the pipeline instance
        config: Pipeline-level configuration parameters
        metadata: Additional metadata for tracking and debugging
        **modules: Keyword arguments for individual pipeline modules
    """
    super().__init__(
        module_type=DataModuleType.BUILD,
        name=name,
        config=config,
        metadata=metadata,
        **modules,
    )

run(input_data=None, **kwargs)

Execute the complete data processing pipeline with all configured stages.

Processes data through sequential stages: loading → processing → annotation → export. Each stage is optional and only executed if the corresponding module is configured.

Parameters:

    input_data (Union[BaseDataSet, List[DataSample], None]): Initial dataset, list of samples, or None for load-only pipelines. Default: None.
    **kwargs: Additional runtime parameters passed to individual modules. Default: {}.

Returns:

    BaseDataSet: Final processed dataset after all stages complete

Raises:

    Exception: If any pipeline stage fails, with detailed error logging
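Example:

The three accepted input shapes follow directly from the signature; samples and dataset below are assumed to exist already, since constructing DataSample or BaseDataSet objects is not covered on this page. Note that in the source shown below, **kwargs is accepted but not forwarded to the individual module.run() calls.

result = builder.run()           # None: the configured loader supplies the data
result = builder.run(samples)    # a plain List[DataSample]
result = builder.run(dataset)    # an existing BaseDataSet
print(f"{len(result)} items after the pipeline")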

Source code in rm_gallery/core/data/build.py
def run(
    self, input_data: Union[BaseDataSet, List[DataSample], None] = None, **kwargs
) -> BaseDataSet:
    """
    Execute the complete data processing pipeline with all configured stages.

    Processes data through sequential stages: loading → processing → annotation → export.
    Each stage is optional and only executed if the corresponding module is configured.

    Args:
        input_data: Initial dataset, list of samples, or None for load-only pipelines
        **kwargs: Additional runtime parameters passed to individual modules

    Returns:
        Final processed dataset after all stages complete

    Raises:
        Exception: If any pipeline stage fails, with detailed error logging
    """
    try:
        current_data = input_data
        logger.info(f"Starting data build pipeline: {self.name}")

        # Define pipeline stages with human-readable names
        stages = [
            ("Loading", self.load_module),
            ("Processing", self.process_module),
            ("Annotation", self.annotation_module),
            ("Export", self.export_module),
        ]

        for stage_name, module in stages:
            if module:
                logger.info(f"Stage: {stage_name}")
                current_data = module.run(current_data)
                logger.info(f"{stage_name} completed: {len(current_data)} items")

        logger.info(f"Pipeline completed: {len(current_data)} items processed")
        return current_data

    except Exception as e:
        logger.error(f"Pipeline execution failed: {str(e)}")
        raise e

create_builder(name, config=None, **modules)

Factory function to create a data build module with specified configuration.

Parameters:

    name (str): Unique identifier for the pipeline. Required.
    config (Optional[Dict[str, Any]]): Pipeline configuration parameters. Default: None.
    **modules: Individual module instances to include in the pipeline. Default: {}.

Returns:

    DataBuilder: Configured DataBuilder instance ready for execution
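Example:

Usage sketch: the factory simply forwards its arguments to the DataBuilder constructor; note that, unlike the constructor, it does not expose a metadata parameter. my_loader and my_exporter are hypothetical, pre-configured module instances.

from rm_gallery.core.data.build import create_builder

builder = create_builder(
    "export_pipeline",
    config={"batch_size": 64},    # illustrative config key
    load_module=my_loader,
    export_module=my_exporter,
)
dataset = builder.run()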

Source code in rm_gallery/core/data/build.py
def create_builder(
    name: str, config: Optional[Dict[str, Any]] = None, **modules
) -> DataBuilder:
    """
    Factory function to create a data build module with specified configuration.

    Args:
        name: Unique identifier for the pipeline
        config: Pipeline configuration parameters
        **modules: Individual module instances to include in the pipeline

    Returns:
        Configured DataBuilder instance ready for execution
    """
    return DataBuilder(name=name, config=config, **modules)

create_builder_from_yaml(config_path)

Create a data build module from YAML configuration file.

Supports comprehensive pipeline configuration including data sources, processing operators, annotation settings, and export formats.

Parameters:

    config_path (str): Path to YAML configuration file. Required.

Returns:

    DataBuilder: Fully configured DataBuilder instance based on YAML specification

Raises:

    FileNotFoundError: If configuration file does not exist
    ValueError: If configuration format is invalid
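Example:

Usage sketch: the path is hypothetical, and only the requirement of a top-level dataset key is guaranteed by the source below; the nested structure is handled by _create_from_dataset_config, which is not documented on this page.

from rm_gallery.core.data.build import create_builder_from_yaml

# The YAML file must contain a top-level `dataset:` section describing the
# data source, processing operators, annotation and export settings.
builder = create_builder_from_yaml("configs/my_pipeline.yaml")  # hypothetical path
dataset = builder.run()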

Source code in rm_gallery/core/data/build.py
def create_builder_from_yaml(config_path: str) -> DataBuilder:
    """
    Create a data build module from YAML configuration file.

    Supports comprehensive pipeline configuration including data sources,
    processing operators, annotation settings, and export formats.

    Args:
        config_path: Path to YAML configuration file

    Returns:
        Fully configured DataBuilder instance based on YAML specification

    Raises:
        FileNotFoundError: If configuration file does not exist
        ValueError: If configuration format is invalid
    """
    config_path = Path(config_path)
    if not config_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")

    config = read_yaml(config_path)

    # Support new dataset structure
    if "dataset" in config:
        return _create_from_dataset_config(config["dataset"])
    else:
        raise ValueError("Invalid configuration file")