data_juicer.core.data package¶
Submodules¶
data_juicer.core.data.config_validator module¶
- exception data_juicer.core.data.config_validator.ConfigValidationError[source]¶
Bases: Exception
Custom exception for validation errors
data_juicer.core.data.data_validator module¶
- class data_juicer.core.data.data_validator.DataValidator(config: Dict)[source]¶
Bases: ABC
Base class for data validation
- abstract validate(dataset: DJDataset) None [source]¶
Validate dataset content
- Parameters:
dataset – The dataset to validate
- Raises:
DataValidationError – If validation fails
- exception data_juicer.core.data.data_validator.DataValidationError[source]¶
Bases: Exception
Custom exception for data validation errors
- class data_juicer.core.data.data_validator.DataValidatorRegistry[source]¶
Bases: object
Registry for data validators
- classmethod get_validator(validator_type: str) Type[DataValidator] | None [source]¶
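Example (a hedged sketch; the validator_type string "required_fields" is an assumed registry key, not confirmed by this page):
from data_juicer.core.data.data_validator import DataValidatorRegistry

validator_cls = DataValidatorRegistry.get_validator("required_fields")  # may be None
if validator_cls is not None:
    validator = validator_cls({"required_fields": ["text"]})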
- class data_juicer.core.data.data_validator.BaseConversationValidator(config: Dict)[source]¶
Bases: DataValidator
Base class for conversation validators
- class data_juicer.core.data.data_validator.SwiftMessagesValidator(config: Dict)[source]¶
Bases: BaseConversationValidator
Validator for Swift Messages conversation format.
This validator ensures conversations follow the Swift Messages format with proper message structure and role assignments.
- Parameters:
config (Dict) – Configuration dictionary containing:
- min_turns (int, optional): Minimum number of messages. Defaults to 1.
- max_turns (int, optional): Maximum number of messages. Defaults to 100.
- sample_size (int, optional): Number of samples to validate. Defaults to 100.
- Example Format:
{ "messages": [ {"role": "system", "content": "<system>"}, {"role": "user", "content": "<query>"}, {"role": "assistant", "content": "<response>"}, ... ] }
- Raises:
DataValidationError – If validation fails due to:
- Missing 'messages' field
- Invalid message structure
- Invalid role values
- Missing content
- Message count outside allowed range
- class data_juicer.core.data.data_validator.DataJuicerFormatValidator(config: Dict)[source]¶
Bases: BaseConversationValidator
Validator for Data-Juicer default conversation format.
This validator ensures conversations follow the Data-Juicer format with proper fields and structure.
- Parameters:
config (Dict) – Configuration dictionary containing:
- min_turns (int, optional): Minimum number of conversation turns. Defaults to 1.
- max_turns (int, optional): Maximum number of conversation turns. Defaults to 100.
- sample_size (int, optional): Number of samples to validate. Defaults to 100.
- Example Format:
{ "system": "<system>", # Optional "instruction": "<query-inst>", "query": "<query2>", "response": "<response2>", "history": [ # Optional ["<query1>", "<response1>"], ... ] }
- Raises:
DataValidationError – If validation fails due to:
- Missing required fields
- Invalid field types
- Invalid conversation structure
- Turn count outside allowed range
- class data_juicer.core.data.data_validator.CodeDataValidator(config: Dict)[source]¶
Bases: DataValidator
Validator for code data
- validate(dataset: DJDataset) None [source]¶
Validate dataset content
- Parameters:
dataset – The dataset to validate
- Raises:
DataValidationError – If validation fails
- class data_juicer.core.data.data_validator.RequiredFieldsValidator(config: Dict)[source]¶
Bases: DataValidator
Validator that checks for required fields in dataset.
This validator ensures that specified fields exist in the dataset and optionally checks their types and missing value ratios.
- Parameters:
config (Dict) – Configuration dictionary containing:
- required_fields (List[str]): List of field names that must exist
- field_types (Dict[str, type], optional): Map of field names to expected types
- allow_missing (float, optional): Maximum ratio of missing values allowed. Defaults to 0.0.
- Example Config:
{ "required_fields": ["field1", "field2"], "field_types": {"field1": str, "field2": int}, "allow_missing": 0.0 }
- Raises:
DataValidationError – If validation fails
- __init__(config: Dict)[source]¶
Initialize validator with config
- Parameters:
config – Dict containing:
- required_fields: List of field names that must exist
- field_types: Optional map of field names to expected types
- allow_missing: Optional float for the maximum ratio of missing values allowed
- validate(dataset: DJDataset) None [source]¶
Validate that the dataset has the required fields with the correct types
- Parameters:
dataset – NestedDataset or RayDataset to validate
- Raises:
DataValidationError – If validation fails
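Example (a minimal end-to-end sketch; the dataset contents and field names are illustrative):
from data_juicer.core.data.data_validator import (
    DataValidationError, RequiredFieldsValidator)
from data_juicer.core.data.dj_dataset import NestedDataset

ds = NestedDataset.from_dict({"text": ["hello", "world"], "score": [1, 2]})
validator = RequiredFieldsValidator({
    "required_fields": ["text", "score"],
    "field_types": {"text": str, "score": int},
    "allow_missing": 0.0,
})
try:
    validator.validate(ds)  # returns None on success
except DataValidationError as err:
    print(f"validation failed: {err}")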
data_juicer.core.data.dataset_builder module¶
- class data_juicer.core.data.dataset_builder.DatasetBuilder(cfg: Namespace, executor_type: str = 'default')[source]¶
Bases: object
DatasetBuilder is a class that builds a dataset from a configuration.
- data_juicer.core.data.dataset_builder.rewrite_cli_datapath(dataset_path, max_sample_num=None) List [source]¶
Rewrite the dataset_path from the CLI into a dataset config format compatible with the YAML config style, retrofitting CLI inputs of local files and HuggingFace paths.
- Parameters:
dataset_path – a dataset file or a dataset dir or a list of them, e.g. <w1> ds1.jsonl <w2> ds2_dir <w3> ds3_file.json
max_sample_num – the maximum number of samples to load
- Returns:
list of dataset configs
- data_juicer.core.data.dataset_builder.parse_cli_datapath(dataset_path) Tuple[List[str], List[float]] [source]¶
Split every dataset path and its weight.
- Parameters:
dataset_path – a dataset file or a dataset dir or a list of them, e.g. <w1> ds1.jsonl <w2> ds2_dir <w3> ds3_file.json
- Returns:
list of dataset path and list of weights
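Example (a hedged sketch of the weighted-path syntax above; the expected outputs are assumptions based on the docstrings):
from data_juicer.core.data.dataset_builder import (
    parse_cli_datapath, rewrite_cli_datapath)

paths, weights = parse_cli_datapath("0.5 ds1.jsonl 0.5 ds2_dir")
# expected: paths == ['ds1.jsonl', 'ds2_dir'], weights == [0.5, 0.5]
configs = rewrite_cli_datapath("0.5 ds1.jsonl 0.5 ds2_dir", max_sample_num=1000)
# expected: a list of YAML-style dataset config dicts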
data_juicer.core.data.dj_dataset module¶
- class data_juicer.core.data.dj_dataset.DJDataset[source]¶
Bases: ABC
Base dataset of DJ
- abstract process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset [source]¶
Process a list of operators on the dataset.
- abstract schema() Schema [source]¶
Get dataset schema.
- Returns:
Dataset schema containing column names and types
- Return type:
Schema
- abstract get(k: int) List[Dict[str, Any]] [source]¶
Get k rows from the dataset.
- Parameters:
k – Number of rows to take
- Returns:
A list of rows from the dataset.
- Return type:
List[Dict[str, Any]]
- abstract get_column(column: str, k: int | None = None) List[Any] [source]¶
Get values from a specific column/field, optionally limited to first k rows.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist in dataset
ValueError – If k is negative
- data_juicer.core.data.dj_dataset.wrap_func_with_nested_access(f)[source]¶
Before invoking the actual function f, wrap its args and kwargs into nested ones.
- Parameters:
f – function to be wrapped.
- Returns:
wrapped function
- data_juicer.core.data.dj_dataset.nested_obj_factory(obj)[source]¶
Use nested classes to wrap the input object.
- Parameters:
obj – object to be nested.
- Returns:
nested object
- class data_juicer.core.data.dj_dataset.NestedQueryDict(*args, **kargs)[source]¶
Bases: dict
Enhanced dict for better usability.
- class data_juicer.core.data.dj_dataset.NestedDatasetDict(*args, **kargs)[source]¶
Bases: DatasetDict
Enhanced HuggingFace-DatasetDict for better usability and efficiency.
- class data_juicer.core.data.dj_dataset.NestedDataset(*args, **kargs)[source]¶
Bases: Dataset, DJDataset
Enhanced HuggingFace-Dataset for better usability and efficiency.
- get_column(column: str, k: int | None = None) List[Any] [source]¶
Get column values from HuggingFace dataset.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist
ValueError – If k is negative
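Example (a minimal sketch of row and column access; the expected values are assumptions):
from data_juicer.core.data.dj_dataset import NestedDataset

ds = NestedDataset.from_dict({"text": ["a", "b", "c"]})
ds.get(2)                    # expected: [{'text': 'a'}, {'text': 'b'}]
ds.get_column("text")        # expected: ['a', 'b', 'c']
ds.get_column("text", k=2)   # expected: ['a', 'b']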
- process(operators, *, work_dir=None, exporter=None, checkpointer=None, tracer=None, adapter=None, open_monitor=True)[source]¶
Process a list of operators on the dataset.
- map(*args, **kargs)[source]¶
Override the map func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.
- filter(*args, **kargs)[source]¶
Override the filter func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.
- select(*args, **kargs)[source]¶
Override the select func, such that selected samples can be accessed in a nested manner.
- classmethod from_dict(*args, **kargs)[source]¶
Override the from_dict func, which is called by most from_xx constructors, such that the constructed dataset object is NestedDataset.
- add_column(*args, **kargs)[source]¶
Override the add column func, such that the processed samples can be accessed in a nested manner.
- select_columns(*args, **kargs)[source]¶
Override the select columns func, such that the processed samples can be accessed in a nested manner.
- remove_columns(*args, **kargs)[source]¶
Override the remove columns func, such that the processed samples can be accessed in a nested manner.
- cleanup_cache_files()[source]¶
Override the cleanup_cache_files func to clear raw and compressed cache files.
- static load_from_disk(*args, **kargs)[source]¶
Loads a dataset that was previously saved using save_to_disk from a dataset directory, or from a filesystem using any implementation of fsspec.spec.AbstractFileSystem.
- Parameters:
dataset_path (path-like) – Path (e.g. "dataset/train") or remote URI (e.g. "s3://my-bucket/dataset/train") of the dataset directory where the dataset will be loaded from.
keep_in_memory (bool, defaults to None) – Whether to copy the dataset in-memory. If None, the dataset will not be copied in-memory unless explicitly enabled by setting datasets.config.IN_MEMORY_MAX_SIZE to nonzero. See more details in the "improve performance" section of the datasets cache documentation.
storage_options (dict, optional) – Key/value pairs to be passed on to the file-system backend, if any. Added in version 2.8.0.
- Returns:
If dataset_path is a path of a dataset directory, the dataset requested.
If dataset_path is a path of a dataset dict directory, a datasets.DatasetDict with each split.
- Return type:
Dataset or DatasetDict
Example:
>>> ds = load_from_disk("path/to/dataset/directory")
- data_juicer.core.data.dj_dataset.nested_query(root_obj: NestedDatasetDict | NestedDataset | NestedQueryDict, key)[source]¶
Find an item in a given object by first checking the flat layer, then checking the nested layers.
- Parameters:
root_obj – the object
key – the stored item to be queried, e.g., “meta” or “meta.date”
- Returns:
the queried item
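Example (a hedged sketch using the dotted-key syntax from the docstring; the expected value is an assumption):
from data_juicer.core.data.dj_dataset import NestedQueryDict, nested_query

d = NestedQueryDict({"meta": {"date": "2024-01-01"}})
nested_query(d, "meta.date")  # expected: '2024-01-01'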
- data_juicer.core.data.dj_dataset.add_same_content_to_new_column(sample, new_column_name, initial_value=None)[source]¶
A helper function to speed up the add_column function. Apply map on this function in parallel instead of using add_column.
- Parameters:
sample – a single sample to add this new column/field to.
new_column_name – the name of this new column/field.
initial_value – the initial value of this new column/field.
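Example (a hedged usage sketch of the parallel pattern the docstring recommends; the column name and value are illustrative):
from functools import partial

from data_juicer.core.data.dj_dataset import (
    NestedDataset, add_same_content_to_new_column)

ds = NestedDataset.from_dict({"text": ["a", "b"]})
ds = ds.map(
    partial(add_same_content_to_new_column,
            new_column_name="source", initial_value="cc"),
    num_proc=2,
)
# expected: every sample now carries sample['source'] == 'cc'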
data_juicer.core.data.load_strategy module¶
- class data_juicer.core.data.load_strategy.StrategyKey(executor_type: str, data_type: str, data_source: str)[source]¶
Bases: object
Immutable key for strategy registration with wildcard support
- executor_type: str¶
- data_type: str¶
- data_source: str¶
- matches(other: StrategyKey) bool [source]¶
Check if this key matches another key with wildcard support
Supports Unix-style wildcards:
- '*' matches any string
- '?' matches any single character
- '[seq]' matches any character in seq
- '[!seq]' matches any character not in seq
- __init__(executor_type: str, data_type: str, data_source: str) None ¶
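Example (a hedged sketch of wildcard matching; which side of the comparison carries the wildcard is an assumption):
from data_juicer.core.data.load_strategy import StrategyKey

generic = StrategyKey("default", "local", "*")
exact = StrategyKey("default", "local", "arxiv")
generic.matches(exact)  # expected: True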
- class data_juicer.core.data.load_strategy.DataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: ABC, ConfigValidator
Abstract class for data load strategies.
- class data_juicer.core.data.load_strategy.DataLoadStrategyRegistry[source]¶
Bases: object
Flexible strategy registry with wildcard matching
- classmethod get_strategy_class(executor_type: str, data_type: str, data_source: str) Type[DataLoadStrategy] | None [source]¶
Retrieve the most specific matching strategy
Matching priority:
1. Exact match
2. Wildcard matches, from most specific to most general
- classmethod register(executor_type: str, data_type: str, data_source: str)[source]¶
Decorator for registering data load strategies with wildcard support
- Parameters:
executor_type – Type of executor (e.g., ‘default’, ‘ray’)
data_type – Type of data (e.g., ‘local’, ‘remote’)
data_source – Specific data source (e.g., ‘arxiv’, ‘s3’)
- Returns:
Decorator function
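Example (a hypothetical registration and lookup; 'my_source' and the strategy class are made up for illustration):
from data_juicer.core.data.load_strategy import (
    DataLoadStrategyRegistry, DefaultDataLoadStrategy)

@DataLoadStrategyRegistry.register("default", "local", "my_source")
class MyDataLoadStrategy(DefaultDataLoadStrategy):
    ...

strategy_cls = DataLoadStrategyRegistry.get_strategy_class(
    "default", "local", "my_source")
assert strategy_cls is MyDataLoadStrategy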
- class data_juicer.core.data.load_strategy.RayDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DataLoadStrategy
Abstract class for data load strategies for RayExecutor.
- class data_juicer.core.data.load_strategy.DefaultDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DataLoadStrategy
Abstract class for data load strategies for LocalExecutor.
- class data_juicer.core.data.load_strategy.RayLocalJsonDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: RayDataLoadStrategy
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'required_fields': ['path']}¶
- class data_juicer.core.data.load_strategy.RayHuggingfaceDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: RayDataLoadStrategy
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'required_fields': ['path']}¶
- class data_juicer.core.data.load_strategy.DefaultLocalDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for on-disk data for LocalExecutor; relies on AutoFormatter for the actual data loading.
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'required_fields': ['path']}¶
- class data_juicer.core.data.load_strategy.DefaultHuggingfaceDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for HuggingFace datasets for LocalExecutor.
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'optional_fields': ['split', 'limit', 'name', 'data_files', 'data_dir'], 'required_fields': ['path']}¶
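Example (a hypothetical ds_config that satisfies the validation rules above; the dataset id is illustrative):
ds_config = {
    "path": "some_org/some_dataset",  # required, str
    "split": "train",                 # optional
    "limit": 1000,                    # optional
}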
- class data_juicer.core.data.load_strategy.DefaultModelScopeDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for ModelScope datasets for LocalExecutor.
- class data_juicer.core.data.load_strategy.DefaultArxivDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for arXiv datasets for LocalExecutor.
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'required_fields': ['path']}¶
- class data_juicer.core.data.load_strategy.DefaultWikiDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for wiki datasets for LocalExecutor.
- CONFIG_VALIDATION_RULES = {'custom_validators': {}, 'field_types': {'path': <class 'str'>}, 'required_fields': ['path']}¶
- class data_juicer.core.data.load_strategy.DefaultCommonCrawlDataLoadStrategy(ds_config: Dict, cfg: Namespace)[source]¶
Bases: DefaultDataLoadStrategy
Data load strategy for CommonCrawl datasets for LocalExecutor.
- CONFIG_VALIDATION_RULES = {'custom_validators': {'end_snapshot': <function validate_snapshot_format>, 'start_snashot': <function validate_snapshot_format>, 'url_limit': <function DefaultCommonCrawlDataLoadStrategy.<lambda>>}, 'field_types': {'end_snapshot': <class 'str'>, 'start_snapshot': <class 'str'>}, 'optional_fields': ['aws', 'url_limit'], 'required_fields': ['start_snapshot', 'end_snapshot']}¶
data_juicer.core.data.ray_dataset module¶
- data_juicer.core.data.ray_dataset.convert_to_absolute_paths(samples, dataset_dir, path_keys)[source]¶
- data_juicer.core.data.ray_dataset.set_dataset_to_absolute_path(dataset, dataset_path, cfg)[source]¶
Set all paths in the input data to absolute paths. Checks dataset_dir and project_dir for valid paths.
- data_juicer.core.data.ray_dataset.preprocess_dataset(dataset: Dataset, dataset_path, cfg) Dataset [source]¶
- class data_juicer.core.data.ray_dataset.RayDataset(dataset: Dataset, dataset_path: str | None = None, cfg: Namespace | None = None)[source]¶
Bases: DJDataset
- __init__(dataset: Dataset, dataset_path: str | None = None, cfg: Namespace | None = None) None [source]¶
- schema() Schema [source]¶
Get dataset schema.
- Returns:
Dataset schema containing column names and types
- Return type:
Schema
- get_column(column: str, k: int | None = None) List[Any] [source]¶
Get column values from Ray dataset.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist
ValueError – If k is negative
- process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset [source]¶
Process a list of operators on the dataset.
- classmethod read_json(paths: str | List[str]) RayDataset [source]¶
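Example (a hedged sketch; the file paths are illustrative and a running Ray environment is assumed):
from data_juicer.core.data.ray_dataset import RayDataset

rds = RayDataset.read_json(["data/part-0.jsonl", "data/part-1.jsonl"])
rds.get_column("text", k=5)  # expected: first five values of the 'text' column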
- class data_juicer.core.data.ray_dataset.JSONStreamDatasource(paths: str | List[str], *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)[source]¶
Bases: JSONDatasource
A temporary Datasource for reading JSON streams.
Note
Depends on a customized pyarrow with an open_json method.
- data_juicer.core.data.ray_dataset.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] | None = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) Dataset [source]¶
data_juicer.core.data.schema module¶
- class data_juicer.core.data.schema.Schema(column_types: Dict[str, Any], columns: List[str])[source]¶
Bases: object
Dataset schema representation.
- column_types¶
Mapping of column names to their types
- Type:
Dict[str, Any]
- columns¶
List of column names in order
- Type:
List[str]
- column_types: Dict[str, Any]¶
- columns: List[str]¶
- classmethod map_hf_type_to_python(feature)[source]¶
Map HuggingFace feature type to Python type.
Recursively maps nested types (e.g., List[str], Dict[str, int]).
Examples
Value('string') -> str
Sequence(Value('int32')) -> List[int]
Dict({'text': Value('string')}) -> Dict[str, Any]
- Parameters:
feature – HuggingFace feature type
- Returns:
Corresponding Python type
- classmethod map_ray_type_to_python(ray_type: DataType) type [source]¶
Map Ray/Arrow data type to Python type.
- Parameters:
ray_type – PyArrow DataType
- Returns:
Corresponding Python type
- __init__(column_types: Dict[str, Any], columns: List[str]) None ¶
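Example (a minimal sketch constructing a Schema directly and applying the HF type mapping shown in the Examples above):
from datasets import Value

from data_juicer.core.data.schema import Schema

s = Schema(column_types={"text": str, "score": int}, columns=["text", "score"])
Schema.map_hf_type_to_python(Value("string"))  # str, per the Examples above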
Module contents¶
- class data_juicer.core.data.DJDataset[source]¶
Bases: ABC
Base dataset of DJ
- abstract process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset [source]¶
Process a list of operators on the dataset.
- abstract schema() Schema [source]¶
Get dataset schema.
- Returns:
Dataset schema containing column names and types
- Return type:
Schema
- abstract get(k: int) List[Dict[str, Any]] [source]¶
Get k rows from the dataset.
- Parameters:
k – Number of rows to take
- Returns:
A list of rows from the dataset.
- Return type:
List[Dict[str, Any]]
- abstract get_column(column: str, k: int | None = None) List[Any] [source]¶
Get values from a specific column/field, optionally limited to first k rows.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist in dataset
ValueError – If k is negative
- class data_juicer.core.data.NestedDataset(*args, **kargs)[source]¶
Bases: Dataset, DJDataset
Enhanced HuggingFace-Dataset for better usability and efficiency.
- get_column(column: str, k: int | None = None) List[Any] [source]¶
Get column values from HuggingFace dataset.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist
ValueError – If k is negative
- process(operators, *, work_dir=None, exporter=None, checkpointer=None, tracer=None, adapter=None, open_monitor=True)[source]¶
Process a list of operators on the dataset.
- map(*args, **kargs)[source]¶
Override the map func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.
- filter(*args, **kargs)[source]¶
Override the filter func, which is called by most common operations, such that the processed samples can be accessed in a nested manner.
- select(*args, **kargs)[source]¶
Override the select func, such that selected samples can be accessed in a nested manner.
- classmethod from_dict(*args, **kargs)[source]¶
Override the from_dict func, which is called by most from_xx constructors, such that the constructed dataset object is NestedDataset.
- add_column(*args, **kargs)[source]¶
Override the add column func, such that the processed samples can be accessed in a nested manner.
- select_columns(*args, **kargs)[source]¶
Override the select columns func, such that the processed samples can be accessed in a nested manner.
- remove_columns(*args, **kargs)[source]¶
Override the remove columns func, such that the processed samples can be accessed in a nested manner.
- cleanup_cache_files()[source]¶
Override the cleanup_cache_files func to clear raw and compressed cache files.
- static load_from_disk(*args, **kargs)[source]¶
Loads a dataset that was previously saved using save_to_disk from a dataset directory, or from a filesystem using any implementation of fsspec.spec.AbstractFileSystem.
- Parameters:
dataset_path (path-like) – Path (e.g. "dataset/train") or remote URI (e.g. "s3://my-bucket/dataset/train") of the dataset directory where the dataset will be loaded from.
keep_in_memory (bool, defaults to None) – Whether to copy the dataset in-memory. If None, the dataset will not be copied in-memory unless explicitly enabled by setting datasets.config.IN_MEMORY_MAX_SIZE to nonzero. See more details in the "improve performance" section of the datasets cache documentation.
storage_options (dict, optional) – Key/value pairs to be passed on to the file-system backend, if any. Added in version 2.8.0.
- Returns:
If dataset_path is a path of a dataset directory, the dataset requested.
If dataset_path is a path of a dataset dict directory, a datasets.DatasetDict with each split.
- Return type:
Dataset or DatasetDict
Example:
>>> ds = load_from_disk("path/to/dataset/directory")
- data_juicer.core.data.wrap_func_with_nested_access(f)[source]¶
Before invoking the actual function f, wrap its args and kwargs into nested ones.
- Parameters:
f – function to be wrapped.
- Returns:
wrapped function
- data_juicer.core.data.add_same_content_to_new_column(sample, new_column_name, initial_value=None)[source]¶
A helper function to speed up the add_column function. Apply map on this function in parallel instead of using add_column.
- Parameters:
sample – a single sample to add this new column/field to.
new_column_name – the name of this new column/field.
initial_value – the initial value of this new column/field.