data_juicer.core.data.ray_dataset module
- data_juicer.core.data.ray_dataset.set_dataset_to_absolute_path(dataset, dataset_path, cfg)
Set all paths in the input data to absolute paths. Checks dataset_dir and project_dir for valid paths.
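The lookup described above can be sketched as follows. This is a minimal illustration, not the module's implementation: `resolve_to_absolute` is a hypothetical helper, and both the order in which the two directories are tried and the fallback of returning an unresolvable path unchanged are assumptions.

```python
import os


def resolve_to_absolute(path, dataset_dir, project_dir):
    """Hypothetical helper: return an absolute version of `path`.

    Assumptions (not confirmed by the source): dataset_dir is tried
    before project_dir, and a path that exists under neither directory
    is returned unchanged.
    """
    if os.path.isabs(path):
        # Already absolute, nothing to resolve.
        return path
    for base in (dataset_dir, project_dir):
        candidate = os.path.abspath(os.path.join(base, path))
        if os.path.exists(candidate):
            return candidate
    return path
```

Trying the dataset directory first would keep media files that ship next to the dataset from being shadowed by same-named files elsewhere in the project, but the actual precedence should be checked against the source.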
- data_juicer.core.data.ray_dataset.preprocess_dataset(dataset: Dataset, dataset_path, cfg) → Dataset
- class data_juicer.core.data.ray_dataset.RayDataset(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None)
Bases: DJDataset
- schema() → Schema
Get the dataset schema.
- Returns:
Dataset schema containing column names and types
- Return type:
Schema
- get_column(column: str, k: int | None = None) → List[Any]
Get column values from the Ray dataset.
- Parameters:
column -- Name of the column to retrieve
k -- Optional number of rows to return. If None, returns all rows
- Returns:
List of values from the specified column
- Raises:
KeyError -- If the column doesn't exist
ValueError -- If k is negative
- process(operators, *, exporter=None, checkpointer=None, tracer=None) → DJDataset
Apply a list of operators to the dataset.
- classmethod read(data_format: str, paths: str | List[str]) → RayDataset
- classmethod read_json(paths: str | List[str]) → RayDataset
- classmethod read_webdataset(paths: str | List[str]) → RayDataset
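The documented contract of `get_column` (optional row limit `k`, `KeyError` for an unknown column, `ValueError` for a negative `k`) can be illustrated without Ray. The sketch below is a hypothetical stand-in that operates on a plain list of dict rows; `get_column_sketch` is not part of the module, and the order in which the two checks run is an assumption.

```python
from typing import Any, Dict, List, Optional


def get_column_sketch(rows: List[Dict[str, Any]], column: str,
                      k: Optional[int] = None) -> List[Any]:
    """Hypothetical stand-in mirroring the documented get_column contract.

    Assumes every row carries the same set of keys.
    """
    if k is not None and k < 0:
        raise ValueError(f"k must be non-negative, got {k}")
    columns = {name for row in rows for name in row}
    if column not in columns:
        raise KeyError(f"column {column!r} does not exist")
    # k=None means "all rows"; otherwise take the first k rows.
    selected = rows if k is None else rows[:k]
    return [row[column] for row in selected]


rows = [{"text": "a", "n": 1}, {"text": "b", "n": 2}, {"text": "c", "n": 3}]
print(get_column_sketch(rows, "text"))    # ['a', 'b', 'c']
print(get_column_sketch(rows, "n", k=2))  # [1, 2]
```

Passing a small `k` is the cheaper way to peek at a column, since collecting every row of a distributed dataset pulls all values back to the caller.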
- class data_juicer.core.data.ray_dataset.JSONStreamDatasource(paths: str | List[str], is_jsonl: bool = False, *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)
Bases: JSONDatasource
A temporary Datasource for reading JSON streams.
Note
Depends on a customized pyarrow build that provides an open_json method.
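To make the idea of stream reading concrete, here is a minimal sketch that yields JSON Lines records in fixed-size batches instead of loading the whole file at once. It uses only the Python standard library as a stand-in; it does not use pyarrow or reproduce how `JSONStreamDatasource` works internally, and `iter_jsonl_batches` and its `batch_size` parameter are hypothetical names introduced for illustration.

```python
import io
import json
from typing import Any, Dict, Iterator, List, TextIO


def iter_jsonl_batches(stream: TextIO,
                       batch_size: int = 2) -> Iterator[List[Dict[str, Any]]]:
    """Hypothetical helper: yield parsed JSONL records in batches.

    Only one batch is held in memory at a time, which is the point of
    streaming over reading the full file up front.
    """
    batch: List[Dict[str, Any]] = []
    for line in stream:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        batch.append(json.loads(line))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch


sample = io.StringIO('{"text": "a"}\n{"text": "b"}\n\n{"text": "c"}\n')
for batch in iter_jsonl_batches(sample, batch_size=2):
    print(len(batch), batch)
```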
- data_juicer.core.data.ray_dataset.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) → Dataset