data_juicer.core.data.ray_dataset module

data_juicer.core.data.ray_dataset.get_abs_path(path, dataset_dir)[source]
data_juicer.core.data.ray_dataset.convert_to_absolute_paths(samples, dataset_dir, path_keys)[source]
data_juicer.core.data.ray_dataset.set_dataset_to_absolute_path(dataset, dataset_path, cfg)[source]

Set all the path in input data to absolute path. Checks dataset_dir and project_dir for valid paths.

data_juicer.core.data.ray_dataset.preprocess_dataset(dataset: Dataset, dataset_path, cfg) Dataset[source]
data_juicer.core.data.ray_dataset.get_num_gpus(op, op_proc)[source]
data_juicer.core.data.ray_dataset.filter_batch(batch, filter_func)[source]
class data_juicer.core.data.ray_dataset.RayDataset(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None)[source]

Bases: DJDataset

__init__(dataset: Dataset, dataset_path: str = None, cfg: Namespace | None = None) None[source]
schema() Schema[source]

Get dataset schema.

Returns:

Dataset schema containing column names and types

Return type:

Schema

get(k: int) List[Dict[str, Any]][source]

Get k rows from the dataset.

get_column(column: str, k: int | None = None) List[Any][source]

Get column values from Ray dataset.

Parameters:
  • column – Name of the column to retrieve

  • k – Optional number of rows to return. If None, returns all rows

Returns:

List of values from the specified column

Raises:
  • KeyError – If column doesn’t exist

  • ValueError – If k is negative

process(operators, *, exporter=None, checkpointer=None, tracer=None) DJDataset[source]

process a list of operators on the dataset.

classmethod read(data_format: str, paths: str | List[str]) RayDataset[source]
classmethod read_json(paths: str | List[str]) RayDataset[source]
classmethod read_webdataset(paths: str | List[str]) RayDataset[source]
to_list() list[source]

Convert the current dataset to a Python list.

class data_juicer.core.data.ray_dataset.JSONStreamDatasource(paths: str | List[str], is_jsonl: bool = False, *, arrow_json_args: Dict[str, Any] | None = None, **file_based_datasource_kwargs)[source]

Bases: JSONDatasource

A temp Datasource for reading json stream.

Note

Depends on a customized pyarrow with open_json method.

data_juicer.core.data.ray_dataset.read_json_stream(paths: str | List[str], *, filesystem: FileSystem | None = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, arrow_open_stream_args: Dict[str, Any] | None = None, meta_provider=None, partition_filter=None, partitioning=Partitioning(style='hive', base_dir='', field_names=None, field_types={}, filesystem=None), include_paths: bool = False, ignore_missing_paths: bool = False, shuffle: Literal['files'] | None = None, file_extensions: List[str] | None = ['json', 'jsonl'], concurrency: int | None = None, override_num_blocks: int | None = None, **arrow_json_args) Dataset[source]