data_juicer.core.data.ray_dataset module

data_juicer.core.data.ray_dataset.get_abs_path(path, dataset_dir)
data_juicer.core.data.ray_dataset.convert_to_absolute_paths(samples, dataset_dir, path_keys)
data_juicer.core.data.ray_dataset.set_dataset_to_absolute_path(dataset, dataset_path, cfg)

Set all paths in the input data to absolute paths. Checks dataset_dir and project_dir for valid paths.
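The path resolution described above can be sketched as follows. This is a minimal illustration, not the module's implementation: `resolve_path_sketch` and `absolutize_samples_sketch` are hypothetical names, and the sketch assumes that each path column holds one string per sample and that a path which is not found under any candidate directory is left unchanged.

```python
import os


def resolve_path_sketch(path, candidate_dirs):
    """Hypothetical helper: return an absolute path for `path`.

    Tries each directory in `candidate_dirs` in order and returns the first
    joined path that exists on disk; otherwise returns `path` unchanged.
    """
    if os.path.isabs(path):
        return path
    for base in candidate_dirs:
        candidate = os.path.abspath(os.path.join(base, path))
        if os.path.exists(candidate):
            return candidate
    return path


def absolutize_samples_sketch(samples, candidate_dirs, path_keys):
    """Hypothetical helper: rewrite the listed path columns of a columnar batch.

    `samples` maps column names to lists of values (assumption: one string
    path per sample in each path column).
    """
    for key in path_keys:
        values = samples.get(key)
        if values is None:
            continue  # column not present in this batch
        samples[key] = [resolve_path_sketch(v, candidate_dirs) for v in values]
    return samples
```

In this sketch, `candidate_dirs` would stand in for the dataset directory followed by the project directory, so the dataset directory takes precedence when both contain a matching file.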

data_juicer.core.data.ray_dataset.preprocess_dataset(dataset: Dataset, dataset_path, cfg) -> Dataset
data_juicer.core.data.ray_dataset.get_num_gpus(op, op_proc)
data_juicer.core.data.ray_dataset.filter_batch(batch, filter_func)
class data_juicer.core.data.ray_dataset.RayDataset(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None)

Bases: DJDataset

__init__(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None) -> None
schema() -> Schema

Get dataset schema.

Returns:

Dataset schema containing column names and types

Return type:

Schema

get(k: int) -> List[Dict[str, Any]]

Get k rows from the dataset.

get_column(column: str, k: int | None = None) -> List[Any]

Get column values from Ray dataset.

Parameters:
  • column -- Name of the column to retrieve

  • k -- Optional number of rows to return. If None, returns all rows

Returns:

List of values from the specified column

Raises:
  • KeyError -- If column doesn't exist

  • ValueError -- If k is negative
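The documented contract of get_column can be illustrated on a plain list of row dictionaries. This is only a sketch under stated assumptions: `get_column_sketch` is a hypothetical stand-in that does not use Ray, the order in which the two errors are checked is a guess, and the column check against the first row is a simplification of a real schema lookup.

```python
from typing import Any, Dict, List, Optional


def get_column_sketch(
    rows: List[Dict[str, Any]], column: str, k: Optional[int] = None
) -> List[Any]:
    """Hypothetical in-memory stand-in mirroring the documented behavior."""
    if k is not None and k < 0:
        # Documented: ValueError if k is negative.
        raise ValueError(f"k must be non-negative, got {k}")
    if rows and column not in rows[0]:
        # Documented: KeyError if the column doesn't exist.
        raise KeyError(column)
    # k=None returns every row; otherwise only the first k rows.
    selected = rows if k is None else rows[:k]
    return [row[column] for row in selected]


rows = [{"text": "a", "id": 1}, {"text": "b", "id": 2}, {"text": "c", "id": 3}]
first_two = get_column_sketch(rows, "text", k=2)
all_ids = get_column_sketch(rows, "id")
```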

process(operators, *, exporter=None, checkpointer=None, tracer=None) -> DJDataset

Apply a list of operators to the dataset.

classmethod read(data_format: str, paths: str | List[str]) -> RayDataset
classmethod read_json(paths: str | List[str]) -> RayDataset
classmethod read_webdataset(paths: str | List[str]) -> RayDataset
to_list() -> list

Convert the current dataset to a Python list.

class data_juicer.core.data.ray_dataset.JSONStreamDatasource(paths: str | List[str], is_jsonl: bool = False, *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)

Bases: JSONDatasource

A temporary Datasource for reading JSON streams.

Note

Depends on a customized pyarrow build that provides an open_json method.
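The general idea of reading JSON as a stream can be sketched with the standard library alone. The example below is not how JSONStreamDatasource works internally (that relies on pyarrow); it is a swapped-in, stdlib-only illustration that reads a JSON Lines file one line at a time and yields fixed-size batches. The name `iter_jsonl_batches` and the `batch_size` parameter are hypothetical.

```python
import json
from typing import Any, Dict, Iterator, List


def iter_jsonl_batches(
    path: str, batch_size: int = 2
) -> Iterator[List[Dict[str, Any]]]:
    """Hypothetical helper: yield batches of parsed records from a JSONL file.

    Only one batch is held in memory at a time, so the whole file is never
    loaded at once.
    """
    batch: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            batch.append(json.loads(line))
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch
```

Because the function is a generator, a caller can start processing the first batch before the rest of the file has been read.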

data_juicer.core.data.ray_dataset.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) -> Dataset