class RayExporter:
    """Export a ray dataset to files of a specific format.

    The output format is derived once, at construction time, from the suffix
    of ``export_path`` and must be one of ``_SUPPORTED_FORMATS``.
    """

    # TODO: support config for export, some export methods require
    # additional args
    # Formats not enabled yet: 'images', 'numpy'.
    _SUPPORTED_FORMATS = {
        "json",
        "jsonl",
        "parquet",
        "csv",
        "tfrecords",
        "webdataset",
        "lance",
    }

    def __init__(self, export_path, keep_stats_in_res_ds=True, keep_hashes_in_res_ds=False):
        """
        Initialization method.

        :param export_path: the path to export datasets.
        :param keep_stats_in_res_ds: whether to keep stats in the result
            dataset.
        :param keep_hashes_in_res_ds: whether to keep hashes in the result
            dataset.
        :raises NotImplementedError: if the suffix of ``export_path`` is not
            a supported export format.
        """
        self.export_path = export_path
        self.keep_stats_in_res_ds = keep_stats_in_res_ds
        self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
        self.export_format = self._get_export_format(export_path)

    def _get_export_format(self, export_path):
        """
        Get the suffix of export path and check if it's supported.

        The suffix is compared case-insensitively against
        ``_SUPPORTED_FORMATS``. A path without any suffix falls back to
        "jsonl" (a warning is logged).

        :param export_path: the path to export datasets.
        :return: the export data format, in lower case.
        :raises NotImplementedError: if the format is not supported.
        """
        # Lower-case the suffix so that e.g. "out.JSONL" selects the same
        # writer as "out.jsonl"; the path itself is left untouched.
        suffix = os.path.splitext(export_path)[-1].strip(".").lower()
        if not suffix:
            logger.warning(
                f'export_path "{export_path}" does not have a suffix. '
                f'We will use "jsonl" as the default export type.'
            )
            suffix = "jsonl"

        export_format = suffix
        if export_format not in self._SUPPORTED_FORMATS:
            raise NotImplementedError(
                f'export data format "{export_format}" is not supported '
                f"for now. Only support {self._SUPPORTED_FORMATS}."
            )
        return export_format

    def _export_impl(self, dataset, export_path, columns=None):
        """
        Export a dataset to specific path.

        Stats/meta fields and hash fields are dropped first, depending on
        ``keep_stats_in_res_ds`` and ``keep_hashes_in_res_ds``.

        :param dataset: the dataset to export.
        :param export_path: the path to export the dataset.
        :param columns: the column names of the dataset. When given, they are
            used instead of ``dataset.columns()`` to decide which of the
            stats/hash fields are present and need to be dropped. This
            argument does NOT restrict which columns are written.
        :return: whatever the underlying ``dataset.write_*`` method returns.
        """
        # `dataset.columns()` is only consulted when the caller did not pass
        # column names. It may yield None when the schema is unknown, so fall
        # back to an empty list instead of failing in `intersection` below.
        feature_fields = columns or dataset.columns() or []

        # Collect every field the configuration asks us to strip.
        droppable = set()
        if not self.keep_stats_in_res_ds:
            droppable.update({Fields.stats, Fields.meta})
        if not self.keep_hashes_in_res_ds:
            droppable.update(
                {
                    HashKeys.hash,
                    HashKeys.minhash,
                    HashKeys.simhash,
                    HashKeys.imagehash,
                    HashKeys.videohash,
                }
            )

        # Only drop fields that are actually among the known columns.
        removed_fields = list(droppable.intersection(feature_fields))
        if removed_fields:
            dataset = dataset.drop_columns(removed_fields)

        if self.export_format in {"json", "jsonl"}:
            # Both json and jsonl go through write_json; keep non-ASCII
            # characters unescaped in the output.
            return dataset.write_json(export_path, force_ascii=False)
        # Every other supported format maps to `dataset.write_<format>`.
        return getattr(dataset, f"write_{self.export_format}")(export_path)

    def export(self, dataset, columns=None):
        """
        Export method for a dataset.

        :param dataset: the dataset to export.
        :param columns: the column names of the dataset, forwarded to
            ``_export_impl`` (see there for how they are used).
        :return: None
        """
        self._export_impl(dataset, self.export_path, columns)