data_juicer.core.data.ray_dataset module¶
- data_juicer.core.data.ray_dataset.convert_to_absolute_paths(samples, dataset_dir, path_keys)[source]¶
- data_juicer.core.data.ray_dataset.set_dataset_to_absolute_path(dataset, dataset_path, cfg)[source]¶
Set all paths in the input data to absolute paths. Checks dataset_dir and project_dir for valid paths.
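The path-normalization step above can be sketched as follows. This is an illustrative re-implementation, not data_juicer's actual code: it assumes `samples` is a column-oriented batch (a dict of lists, as Ray's `map_batches` provides) and that `path_keys` names the columns holding file paths.

```python
import os


def convert_to_absolute_paths(samples, dataset_dir, path_keys):
    """Sketch: rewrite relative paths in a batch of samples.

    Columns not listed in ``path_keys`` are left untouched, as are
    paths that are already absolute or empty.
    """
    for key in path_keys:
        if key not in samples:
            continue
        samples[key] = [
            os.path.abspath(os.path.join(dataset_dir, p))
            if p and not os.path.isabs(p) else p
            for p in samples[key]
        ]
    return samples


# Relative 'imgs/a.jpg' is resolved against dataset_dir; '/tmp/b.jpg' stays.
batch = {'image': ['imgs/a.jpg', '/tmp/b.jpg'], 'text': ['hello', 'world']}
out = convert_to_absolute_paths(batch, '/data/ds', ['image'])
```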
- data_juicer.core.data.ray_dataset.preprocess_dataset(dataset: Dataset, dataset_path, cfg) Dataset [source]¶
- class data_juicer.core.data.ray_dataset.RayDataset(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None)[source]¶
Bases:
DJDataset
- schema() Schema [source]¶
Get dataset schema.
- Returns:
Dataset schema containing column names and types
- Return type:
Schema
- get_column(column: str, k: int | None = None) List[Any] [source]¶
Get column values from Ray dataset.
- Parameters:
column – Name of the column to retrieve
k – Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError – If column doesn’t exist
ValueError – If k is negative
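The documented contract of `get_column` (including its `KeyError` and `ValueError` behavior) can be illustrated with a plain-Python stand-in. This sketch operates on a list of row dicts rather than a Ray dataset, so it is not the real implementation:

```python
from typing import Any, List, Optional


def get_column(rows: List[dict], column: str, k: Optional[int] = None) -> List[Any]:
    # Validate k first: a negative row count is rejected up front.
    if k is not None and k < 0:
        raise ValueError(f'k must be non-negative, got {k}')
    # Reject unknown columns, mirroring the documented KeyError.
    if rows and column not in rows[0]:
        raise KeyError(f"Column '{column}' not found")
    # k=None returns every row; otherwise take the first k rows.
    selected = rows if k is None else rows[:k]
    return [row[column] for row in selected]


rows = [{'text': 'a'}, {'text': 'b'}, {'text': 'c'}]
```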
- process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset [source]¶
Process a list of operators on the dataset.
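The sequential operator-application semantics can be sketched in miniature. The `LowercaseMapper` below is hypothetical; data_juicer's real operators (Mapper, Filter, and so on) have a richer interface and run distributed over Ray:

```python
class LowercaseMapper:
    """Hypothetical operator with a minimal map-style interface."""

    def process(self, sample):
        sample['text'] = sample['text'].lower()
        return sample


def process(samples, operators):
    # Apply each operator in order to every sample, mirroring the
    # pipeline semantics of RayDataset.process on local data.
    for op in operators:
        samples = [op.process(dict(s)) for s in samples]
    return samples


out = process([{'text': 'Hello'}, {'text': 'WORLD'}], [LowercaseMapper()])
```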
- classmethod read(data_format: str, paths: str | List[str]) RayDataset [source]¶
- classmethod read_json(paths: str | List[str]) RayDataset [source]¶
- classmethod read_webdataset(paths: str | List[str]) RayDataset [source]¶
- class data_juicer.core.data.ray_dataset.JSONStreamDatasource(paths: str | List[str], is_jsonl: bool = False, *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)[source]¶
Bases:
JSONDatasource
A temporary Datasource for reading a JSON stream.
Note
Depends on a customized pyarrow with open_json method.
- data_juicer.core.data.ray_dataset.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) Dataset [source]¶
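The record-at-a-time semantics of reading a JSON stream can be illustrated with the standard library. The real datasource relies on a customized pyarrow `open_json` to produce Arrow blocks; this generator only sketches the line-delimited (JSONL) reading behavior:

```python
import io
import json


def read_json_stream(stream):
    """Sketch: yield one record per non-empty line of a JSONL stream."""
    for line in stream:
        line = line.strip()
        if line:
            yield json.loads(line)


records = list(read_json_stream(io.StringIO('{"a": 1}\n\n{"a": 2}\n')))
```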