data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator module

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.IdGenerator(start_id=0)[source]

Bases: object

__init__(start_id=0)[source]
get_next_id(count)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.EdgeBuffer[source]

Bases: object

__init__()[source]
clear()[source]
set_edges(edge_dict)[source]
get_edges(key)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.BTSUnionFind(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]

Bases: object

A distributed implementation of Union-Find with load balancing.

The original paper on BTS Union-Find is available at: https://ieeexplore.ieee.org/document/10598116
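The find/union/union_list methods mirror the classic single-node union-find primitive, which BTS partitions across parallel_num workers and whose cross-shard edges are exchanged through the remote edge buffers. As a reference point, a minimal non-distributed sketch (illustrative names, not the actual implementation):

    class SimpleUnionFind:
        """Single-node union-find; the BTS variant shards keys across
        workers and exchanges cross-shard edges via EdgeBuffer actors."""

        def __init__(self):
            self.parent = {}

        def find(self, x):
            # Walk up to the root, then compress the path.
            root = x
            while self.parent.get(root, root) != root:
                root = self.parent[root]
            while self.parent.get(x, x) != x:
                self.parent[x], x = root, self.parent[x]
            return root

        def union(self, x, y):
            root_x, root_y = self.find(x), self.find(y)
            if root_x != root_y:
                # Keep the smaller id as the representative, a common
                # convention when the ids double as sample uids.
                self.parent[max(root_x, root_y)] = min(root_x, root_y)

        def union_list(self, x_list):
            for x in x_list[1:]:
                self.union(x_list[0], x)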

__init__(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]
add_key_value_pairs(pairs)[source]
flush_key_value_pairs()[source]
balanced_union_find()[source]
distribute_edge(u, v)[source]
set_edge_buffer()[source]
edge_redistribution()[source]
communication()[source]
find(x)[source]
union(x, y)[source]
union_list(x_list)[source]
rebalancing()[source]
squeeze()[source]
dup_idx(queries)[source]
data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.get_remote_classes()[source]

Get remote versions of classes with Ray decorators applied at runtime.
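Applying ray.remote at runtime, rather than decorating the classes at definition time, keeps the module importable without Ray being initialized. A minimal sketch of the pattern, with an illustrative class and options:

    import ray

    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    # Turn an ordinary class into a Ray actor class at runtime.
    RemoteCounter = ray.remote(Counter)

    # Resource options can be attached the same way.
    RemoteCounterOneCpu = ray.remote(num_cpus=1)(Counter)

    if __name__ == "__main__":
        ray.init(ignore_reinit_error=True)
        counter = RemoteCounter.remote()
        print(ray.get(counter.increment.remote()))  # 1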

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.GPUMinHashActor(width: int = 5, perm_a: ndarray = None, perm_b: ndarray = None, lowercase: bool = True)[source]

Bases: object

__init__(width: int = 5, perm_a: ndarray = None, perm_b: ndarray = None, lowercase: bool = True)[source]
compute_minhash(text_arr: Array) → Array[source]

Compute MinHash signatures for texts in a table
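For orientation, a minimal CPU sketch of MinHash signature computation using the usual (a*h + b) mod p permutation family; the actor's GPU implementation applies the same idea over batched arrays, and the hashing details below are illustrative:

    import numpy as np

    MERSENNE_PRIME = (1 << 61) - 1
    MAX_HASH = (1 << 32) - 1

    def minhash_signature(shingles, perm_a, perm_b):
        """One MinHash signature from a set of shingles (illustrative)."""
        # Map each shingle to a 32-bit integer; Python's hash() is only
        # stable within a process, which is enough for a demo.
        base = [hash(s) & MAX_HASH for s in shingles]
        sig = np.empty(len(perm_a), dtype=np.uint64)
        for i, (a, b) in enumerate(zip(perm_a, perm_b)):
            # Universal hash (a*h + b) mod p, truncated to 32 bits.
            sig[i] = min(((a * h + b) % MERSENNE_PRIME) & MAX_HASH for h in base)
        return sig

    rng = np.random.default_rng(42)
    num_perm = 256
    perm_a = rng.integers(1, MERSENNE_PRIME, num_perm).tolist()
    perm_b = rng.integers(0, MERSENNE_PRIME, num_perm).tolist()
    sig = minhash_signature({"the quick", "quick brown", "brown fox"}, perm_a, perm_b)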

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.RayBTSMinhashDeduplicator(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, minhash_batch_size: int | str | None = 'auto', memory_per_sample: float | None = 0.1, *args, **kwargs)[source]

Bases: Deduplicator

A MinhashLSH deduplicator that operates in Ray distributed mode.

This operator uses the MinHash LSH technique to identify and remove near-duplicate samples from a dataset. It supports several tokenization methods: space, punctuation, character, and sentencepiece. Two samples are considered duplicates when the Jaccard similarity of their shingled texts is greater than or equal to the specified threshold, in which case only one of them is kept. The operator computes MinHash values for each sample and groups similar samples with a distributed union-find algorithm. It can run on both CPU and GPU, with dedicated batch size and memory configurations for each.
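As a concrete illustration of the duplicate criterion, the sketch below shingles two texts with space tokenization and window_size=5, then compares their Jaccard similarity against the default jaccard_threshold of 0.7 (the shingling here is a simplification of the operator's own tokenization options):

    def shingles(text, window_size=5, lowercase=True):
        """Space-tokenized shingles of window_size consecutive tokens."""
        if lowercase:
            text = text.lower()
        tokens = text.split()
        if len(tokens) <= window_size:
            return {" ".join(tokens)}
        return {
            " ".join(tokens[i:i + window_size])
            for i in range(len(tokens) - window_size + 1)
        }

    def jaccard(a, b):
        return len(a & b) / len(a | b) if (a | b) else 0.0

    s1 = shingles("The quick brown fox jumps over the lazy dog near the river bank")
    s2 = shingles("The quick brown fox jumps over the lazy dog near the river")
    similarity = jaccard(s1, s2)  # 8 shared shingles out of 9 -> ~0.89
    # With jaccard_threshold=0.7 these two samples are grouped as
    # near-duplicates, and only one of them is kept.
    is_duplicate_pair = similarity >= 0.7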

EMPTY_HASH_VALUE = 'EMPTY'
__init__(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, minhash_batch_size: int | str | None = 'auto', memory_per_sample: float | None = 0.1, *args, **kwargs)[source]

Initialization method.

Parameters:
  • tokenization -- tokenization method for sample texts. It should be one of [space, punctuation, character, sentencepiece]. For English-like languages we recommend 'space'; for Chinese-like languages, 'character'; and for multilingual data, 'sentencepiece'. If using 'sentencepiece', please provide the model path in the 'tokenizer_model' field.

  • window_size -- window size of shingling

  • lowercase -- whether to convert text to lower case first

  • ignore_pattern -- regex pattern of sub-strings to ignore when computing minhash

  • num_permutations -- number of permutations in minhash computing

  • jaccard_threshold -- the minimum Jaccard similarity threshold in near-duplicate detection. When the Jaccard similarity of two sample texts is >= this threshold, they are regarded as similar samples and this op keeps only one of them after deduplication

  • num_bands -- number of bands in LSH. Defaults to None, in which case it is determined by an optimal parameter computation that minimizes the weighted sum of the probabilities of false positives and false negatives, as sketched after this parameter list

  • num_rows_per_band -- number of rows in each band in LSH. Defaults to None, in which case it is determined by the same optimal parameter computation

  • tokenizer_model -- path for the sentencepiece model, used for sentencepiece tokenization.

  • union_find_parallel_num -- number of parallel workers for the union-find algorithm. Defaults to 'auto', in which case it is set to half the number of CPUs.

  • union_threshold -- threshold for the minhash value group size used in the union-find algorithm. Defaults to 256.

  • max_pending_edge_buffer_task -- maximum number of pending edge buffer Ray tasks. Defaults to 20.

  • num_edge_buffer_task_returns -- number of edge buffer tasks for ray.wait to return. Defaults to 10.

  • max_pending_filter_tasks -- maximum number of pending filter Ray tasks. Defaults to 20.

  • num_filter_task_returns -- number of filter tasks for ray.wait to return. Defaults to 10.

  • merge_batch_size -- batch size for BTS operations. Defaults to 1000.

  • minhash_batch_size -- batch size for MinHash computation. If 'auto', it is set to the default value of 1024 on CPU, or computed from the available GPU memory and the memory_per_sample setting on GPU.

  • memory_per_sample -- estimated memory needed per sample in MB. Used to calculate batch size based on available GPU memory. Default is 0.1 MB per sample.
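The optimal parameter computation mentioned for num_bands and num_rows_per_band follows the standard LSH analysis: with b bands of r rows each, two samples with Jaccard similarity s become candidates with probability 1 - (1 - s^r)^b. A hedged sketch of the brute-force search over (b, r), in the style of datasketch; the false-positive/false-negative weights are illustrative and may differ from the actual implementation:

    from scipy.integrate import quad

    def optimal_param(threshold, num_perm, fp_weight=0.5, fn_weight=0.5):
        """Pick (num_bands, num_rows_per_band) minimizing the weighted sum
        of false-positive and false-negative probability mass."""

        def false_positive_area(b, r):
            # Candidate probability mass below the similarity threshold.
            return quad(lambda s: 1 - (1 - s ** r) ** b, 0.0, threshold)[0]

        def false_negative_area(b, r):
            # Missed-pair probability mass above the similarity threshold.
            return quad(lambda s: 1 - (1 - (1 - s ** r) ** b), threshold, 1.0)[0]

        best, best_error = None, float("inf")
        for b in range(1, num_perm + 1):
            for r in range(1, num_perm // b + 1):
                error = (fp_weight * false_positive_area(b, r)
                         + fn_weight * false_negative_area(b, r))
                if error < best_error:
                    best, best_error = (b, r), error
        return best

    num_bands, num_rows_per_band = optimal_param(0.7, 256)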

band_minhash(minhash_list, uid_list)[source]

Logic for creating and pushing LSH bands to the union-find list
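A hedged sketch of the banding step this method performs conceptually: the signature of num_permutations hash values is cut into num_bands contiguous bands, and each band is hashed into a key, so that any two samples sharing a band key become a candidate pair for the union-find stage (names below are illustrative):

    import numpy as np

    def band_keys(signature, num_bands, num_rows_per_band):
        """Yield (band_index, band_hash) keys for one MinHash signature."""
        for i in range(num_bands):
            band = signature[i * num_rows_per_band:(i + 1) * num_rows_per_band]
            # Samples whose signatures agree on every row of a band collide here.
            yield i, hash(band.tobytes())

    # Two signatures that agree on the first band only.
    sig_a = np.arange(256, dtype=np.uint64)
    sig_b = sig_a.copy()
    sig_b[16:] += 1
    keys_a = set(band_keys(sig_a, num_bands=16, num_rows_per_band=16))
    keys_b = set(band_keys(sig_b, num_bands=16, num_rows_per_band=16))
    assert len(keys_a & keys_b) == 1  # they collide in exactly one band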

calc_minhash(text_list: Array, uid_list: List) → Table[source]

Logic for computing minhash values for each text in the input table

merge_op_batch(object_refs)[source]
merge()[source]
filter_with_union_find(samples: Table) → Table[source]
run(dataset, **kwargs)[source]
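Finally, a hedged end-to-end usage sketch. Constructing the operator with the documented keyword arguments is shown as-is; the dataset object passed to run() is assumed to be the Ray-backed dataset produced by data_juicer's Ray executor, whose exact type is not part of this page:

    import ray
    from data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator import (
        RayBTSMinhashDeduplicator,
    )

    ray.init(ignore_reinit_error=True)

    # All keyword arguments below are documented __init__ parameters.
    dedup = RayBTSMinhashDeduplicator(
        tokenization="space",
        window_size=5,
        num_permutations=256,
        jaccard_threshold=0.7,
        union_find_parallel_num="auto",
    )

    # `dataset` stands for the Ray dataset wrapper supplied by the executor.
    # deduped_dataset = dedup.run(dataset)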