data_juicer.utils.compress module

class data_juicer.utils.compress.FileLock(lock_file: str | PathLike[str], timeout: float = -1, mode: int = 420, thread_local: bool = True, *, blocking: bool = True, is_singleton: bool = False, **kwargs: Any)[source]

Bases: FileLock

File lock for compression or decompression that removes the lock file automatically.
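The idea of a lock file that cleans up after itself can be sketched with the standard library alone. This is not the class above (which extends the third-party FileLock); `exclusive_lock_file` and the file names are hypothetical, and the lock is taken by exclusive file creation rather than an OS-level lock. The default `mode` of 420 in the signature above is the decimal form of the octal permission 0o644.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def exclusive_lock_file(lock_path, mode=0o644):  # 0o644 == 420 in decimal
    """Hypothetical helper: hold a lock by creating lock_path exclusively."""
    # O_EXCL makes creation fail with FileExistsError if the lock is already held.
    fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY, mode)
    try:
        yield lock_path
    finally:
        os.close(fd)
        os.remove(lock_path)  # remove the lock file automatically on release


with tempfile.TemporaryDirectory() as tmp:
    lock_path = os.path.join(tmp, "cache.arrow.lock")
    with exclusive_lock_file(lock_path):
        held = os.path.exists(lock_path)       # lock file exists while held
    released = not os.path.exists(lock_path)   # and is gone afterwards
```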

class data_juicer.utils.compress.Extractor[source]

Bases: Extractor

Extract content from a compressed file.

classmethod extract(input_path: Path | str, output_path: Path | str, extractor_format: str)[source]

Extract content from a compressed file.

Parameters:
  • input_path – path to compressed file.

  • output_path – path to uncompressed file.

  • extractor_format – extraction format; see the supported algorithms in the Extractor of Hugging Face datasets.

class data_juicer.utils.compress.BaseCompressor[source]

Bases: ABC

Base class that compresses a file.

abstractmethod static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.
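The abstract-static-method pattern described above can be sketched as follows. The class names `SketchBaseCompressor` and `SketchGzipCompressor` are hypothetical stand-ins, and gzip from the standard library is used only so the example runs without extra dependencies; it is not the module's own implementation.

```python
import gzip
import shutil
import tempfile
from abc import ABC, abstractmethod
from pathlib import Path


class SketchBaseCompressor(ABC):
    """Hypothetical base class: subclasses must provide a static compress()."""

    @staticmethod
    @abstractmethod
    def compress(input_path, output_path):
        ...


class SketchGzipCompressor(SketchBaseCompressor):
    @staticmethod
    def compress(input_path, output_path):
        # Stream the file so large inputs are not loaded into memory at once.
        with open(input_path, "rb") as src, gzip.open(output_path, "wb") as dst:
            shutil.copyfileobj(src, dst)


with tempfile.TemporaryDirectory() as tmp:
    raw = Path(tmp) / "data.txt"
    packed = Path(tmp) / "data.txt.gz"
    raw.write_bytes(b"hello " * 100)
    SketchGzipCompressor.compress(raw, packed)
    restored = gzip.decompress(packed.read_bytes())
```

Because `compress` is abstract, instantiating the base class directly raises `TypeError`, while the concrete subclass can be called without creating an instance.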

class data_juicer.utils.compress.ZstdCompressor[source]

Bases: BaseCompressor

This class compresses a file using the zstd algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.Lz4Compressor[source]

Bases: BaseCompressor

This class compresses a file using the lz4 algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.GzipCompressor[source]

Bases: BaseCompressor

This class compresses a file using the gzip algorithm.

static compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

class data_juicer.utils.compress.Compressor[source]

Bases: object

This class contains multiple compressors.

compressors: Dict[str, Type[BaseCompressor]] = {'gzip': <class 'data_juicer.utils.compress.GzipCompressor'>, 'lz4': <class 'data_juicer.utils.compress.Lz4Compressor'>, 'zstd': <class 'data_juicer.utils.compress.ZstdCompressor'>}

classmethod compress(input_path: Path | str, output_path: Path | str, compressor_format: str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

  • compressor_format – compression format; see the supported algorithms in compressors.
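Dispatching on a format name through a dictionary, as the compressors mapping suggests, might look like the sketch below. `SKETCH_COMPRESSORS` and `compress_bytes` are hypothetical, the standard-library codecs stand in for zstd and lz4 (which need third-party packages), and raising `ValueError` for an unknown format is an assumption rather than documented behaviour.

```python
import bz2
import gzip
import lzma

# Hypothetical registry: format name -> function that compresses bytes.
SKETCH_COMPRESSORS = {
    "gzip": gzip.compress,
    "bz2": bz2.compress,
    "xz": lzma.compress,
}


def compress_bytes(data, compressor_format):
    """Look up the compressor by name and apply it to data."""
    if compressor_format not in SKETCH_COMPRESSORS:
        supported = ", ".join(sorted(SKETCH_COMPRESSORS))
        raise ValueError(
            f"unsupported format {compressor_format!r}; expected one of: {supported}"
        )
    return SKETCH_COMPRESSORS[compressor_format](data)


payload = b"abc" * 1000
packed = compress_bytes(payload, "gzip")
```

Adding a format then only requires a new dictionary entry; the dispatch function itself does not change.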

class data_juicer.utils.compress.CompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class compresses or decompresses an input file using the configured compression format.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:
  • compressor_format – compression algorithm to use; defaults to zstd.

compress(input_path: Path | str, output_path: Path | str)[source]

Compress input file and save to output file.

Parameters:
  • input_path – path to uncompressed file.

  • output_path – path to compressed file.

decompress(input_path: Path | str, output_path: Path | str)[source]

Decompress input file and save to output file.

Parameters:
  • input_path – path to compressed file.

  • output_path – path to uncompressed file.
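A manager that fixes the format once and then offers paired compress and decompress calls can be sketched as below. `SketchCompressManager` is hypothetical, and it defaults to gzip from the standard library rather than zstd so that it runs without extra packages.

```python
import gzip
import lzma
import shutil
import tempfile
from pathlib import Path

# Hypothetical mapping: format name -> open() function of a stdlib codec.
_OPENERS = {"gzip": gzip.open, "xz": lzma.open}


class SketchCompressManager:
    def __init__(self, compressor_format="gzip"):
        if compressor_format not in _OPENERS:
            raise ValueError(f"unsupported format: {compressor_format!r}")
        self._open = _OPENERS[compressor_format]

    def compress(self, input_path, output_path):
        with open(input_path, "rb") as src, self._open(output_path, "wb") as dst:
            shutil.copyfileobj(src, dst)

    def decompress(self, input_path, output_path):
        with self._open(input_path, "rb") as src, open(output_path, "wb") as dst:
            shutil.copyfileobj(src, dst)


with tempfile.TemporaryDirectory() as tmp:
    original = Path(tmp) / "data.bin"
    packed = Path(tmp) / "data.bin.xz"
    unpacked = Path(tmp) / "data.roundtrip.bin"
    original.write_bytes(bytes(range(256)) * 64)

    manager = SketchCompressManager("xz")
    manager.compress(original, packed)
    manager.decompress(packed, unpacked)
    round_trip_ok = unpacked.read_bytes() == original.read_bytes()
```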

class data_juicer.utils.compress.CacheCompressManager(compressor_format: str = 'zstd')[source]

Bases: object

This class compresses or decompresses Hugging Face cache files using the configured compression format.

__init__(compressor_format: str = 'zstd')[source]

Initialization method.

Parameters:
  • compressor_format – compression algorithm to use; defaults to zstd.

compress(prev_ds: Dataset, this_ds: Dataset = None, num_proc: int = 1)[source]

Compress cache files with fingerprint in dataset cache directory.

Parameters:
  • prev_ds – previous dataset whose cache files need to be compressed here.

  • this_ds – current dataset computed from the previous dataset. Its cache files may overlap with those of the previous dataset, so cache files that the current dataset will use again must not be compressed. If None, all cache files of the previous dataset are compressed.

  • num_proc – number of processes to compress cache files.
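The overlap rule for prev_ds and this_ds amounts to a set difference over cache file names. The sketch below uses plain lists of dictionaries shaped like the `cache_files` attribute of a Hugging Face Dataset (entries with a `"filename"` key); `files_to_compress` and the file names are hypothetical.

```python
def files_to_compress(prev_cache_files, this_cache_files=None):
    """Return the cache files of the previous dataset that are safe to compress."""
    prev_names = [entry["filename"] for entry in prev_cache_files]
    if this_cache_files is None:
        # No current dataset given: every previous cache file may be compressed.
        return prev_names
    still_needed = {entry["filename"] for entry in this_cache_files}
    # Skip files that the current dataset will read again.
    return [name for name in prev_names if name not in still_needed]


prev = [{"filename": "cache-aaa.arrow"}, {"filename": "cache-bbb.arrow"}]
this = [{"filename": "cache-bbb.arrow"}, {"filename": "cache-ccc.arrow"}]

only_prev = files_to_compress(prev, this)
everything = files_to_compress(prev)
```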

decompress(ds: Dataset, fingerprints: str | List[str] = None, num_proc: int = 1)[source]

Decompress compressed cache files with fingerprint in dataset cache directory.

Parameters:
  • ds – input dataset.

  • fingerprints – fingerprints of cache files; a string or a list of strings is accepted. If None, all cache files that start with cache- and end with the compression format are matched.

  • num_proc – number of processes to decompress cache files.
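The lookup described for fingerprints can be sketched with glob patterns. `find_compressed_cache_files` is hypothetical, and the sketch assumes that a compressed cache file carries the format as its final extension (for example `cache-abc.arrow.zstd`); the real naming scheme may differ.

```python
import tempfile
from pathlib import Path


def find_compressed_cache_files(cache_dir, compressor_format, fingerprints=None):
    """List compressed cache file names, optionally limited to some fingerprints."""
    if fingerprints is None:
        patterns = [f"cache-*.{compressor_format}"]
    else:
        if isinstance(fingerprints, str):
            fingerprints = [fingerprints]  # accept a single string as well as a list
        patterns = [f"cache-{fp}*.{compressor_format}" for fp in fingerprints]
    found = set()
    for pattern in patterns:
        found.update(path.name for path in Path(cache_dir).glob(pattern))
    return sorted(found)


with tempfile.TemporaryDirectory() as tmp:
    for name in (
        "cache-abc.arrow.zstd",
        "cache-def_00000_of_00002.arrow.zstd",
        "cache-abc.arrow",   # not compressed, ignored
        "other.zstd",        # not a cache file, ignored
    ):
        (Path(tmp) / name).touch()

    all_compressed = find_compressed_cache_files(tmp, "zstd")
    only_def = find_compressed_cache_files(tmp, "zstd", "def")
```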

format_cache_file_name(cache_file_name: str | None) → str | None[source]

Use * to replace the sub rank in a cache file name.

Parameters:
  • cache_file_name – a cache file name.
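One way to replace the sub rank with a wildcard is a regular-expression substitution. The sketch assumes the multi-process suffix used by Hugging Face datasets, `_{rank:05d}_of_{num_proc:05d}`; `sketch_format_cache_file_name` is hypothetical, and the exact replacement performed by the method above may differ.

```python
import re


def sketch_format_cache_file_name(cache_file_name):
    """Replace the rank in '_<rank>_of_<num_proc>' with '*' (assumed naming)."""
    if not cache_file_name:
        return cache_file_name  # pass None or an empty string through unchanged
    return re.sub(r"_\d+_of_(\d+)", r"_*_of_\1", cache_file_name)


sharded = sketch_format_cache_file_name("cache-abc_00001_of_00004.arrow")
unsharded = sketch_format_cache_file_name("cache-abc.arrow")
missing = sketch_format_cache_file_name(None)
```

The resulting name can be used as a glob pattern that matches the cache files of every rank at once.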

cleanup_cache_files(ds)[source]

Clean up all compressed cache files in the dataset cache directory, i.e. those that start with cache- and end with the compression format.

Parameters:
  • ds – input dataset.

class data_juicer.utils.compress.CompressionOff[source]

Bases: object

Define a range that turns off cache compression temporarily.
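Assuming the "range" is a `with` block, the behaviour can be sketched as a context manager that clears a setting on entry and restores it on exit. The `settings` dictionary and its `"cache_compress"` key are hypothetical stand-ins for wherever the real flag is stored.

```python
from contextlib import contextmanager


@contextmanager
def compression_off(settings):
    """Temporarily disable the (hypothetical) cache compression setting."""
    previous = settings["cache_compress"]
    settings["cache_compress"] = None
    try:
        yield
    finally:
        # Restore the previous value even if the block raised an exception.
        settings["cache_compress"] = previous


settings = {"cache_compress": "zstd"}
with compression_off(settings):
    inside = settings["cache_compress"]
after = settings["cache_compress"]
```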

data_juicer.utils.compress.compress(prev_ds, this_ds=None, num_proc=1)[source]

data_juicer.utils.compress.decompress(ds, fingerprints=None, num_proc=1)[source]

data_juicer.utils.compress.cleanup_compressed_cache_files(ds)[source]