data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator module

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.IdGenerator(start_id=0)[source]

Bases: object

__init__(start_id=0)[source]
get_next_id(count)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.EdgeBuffer[source]

Bases: object

__init__()[source]
clear()[source]
set_edges(edge_dict)[source]
get_edges(key)[source]
class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.BTSUnionFind(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]

Bases: object

A distributed implementation of Union-Find with load balancing.

The original paper on BTS Union-Find is available at: https://ieeexplore.ieee.org/document/10598116

__init__(union_threshold, parallel_num, parallel_id, remote_edge_buffers, max_pending_edge_buffer_task, num_edge_buffer_task_returns)[source]
add_key_value_pairs(pairs)[source]
flush_key_value_pairs()[source]
balanced_union_find()[source]
distribute_edge(u, v)[source]
set_edge_buffer()[source]
edge_redistribution()[source]
communication()[source]
find(x)[source]
union(x, y)[source]
union_list(x_list)[source]
rebalancing()[source]
squeeze()[source]
dup_idx(queries)[source]
data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.get_remote_classes()[source]

Get remote versions of classes with Ray decorators applied at runtime.

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.GPUMinHashActor(width: int = 5, perm_a: ndarray = None, perm_b: ndarray = None, lowercase: bool = True)[source]

Bases: object

__init__(width: int = 5, perm_a: ndarray = None, perm_b: ndarray = None, lowercase: bool = True)[source]
compute_minhash(text_arr: Array) Array[source]

Compute MinHash signatures for texts in a table

class data_juicer.ops.deduplicator.ray_bts_minhash_deduplicator.RayBTSMinhashDeduplicator(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, minhash_batch_size: str | int | None = 'auto', memory_per_sample: float | None = 0.1, *args, **kwargs)[source]

Bases: Deduplicator

A MinhashLSH deduplicator based on RAY.

EMPTY_HASH_VALUE = 'EMPTY'
__init__(tokenization: str = 'space', window_size: Annotated[int, Gt(gt=0)] = 5, lowercase: bool = True, ignore_pattern: str | None = None, num_permutations: Annotated[int, Gt(gt=0)] = 256, jaccard_threshold: Annotated[float, FieldInfo(annotation=NoneType, required=True, metadata=[Ge(ge=0), Le(le=1)])] = 0.7, num_bands: Annotated[int, Gt(gt=0)] | None = None, num_rows_per_band: Annotated[int, Gt(gt=0)] | None = None, tokenizer_model: str | None = None, union_find_parallel_num: int | str = 'auto', union_threshold: int | None = 256, max_pending_edge_buffer_task: int | None = 20, num_edge_buffer_task_returns: int | None = 10, max_pending_filter_tasks: int | None = 20, num_filter_task_returns: int | None = 10, merge_batch_size: int | None = 1000, minhash_batch_size: str | int | None = 'auto', memory_per_sample: float | None = 0.1, *args, **kwargs)[source]

Initialization method.

Parameters:
  • tokenization – tokenization method for sample texts. It should be one of [space, punctuation, character, sentencepiece]. For English-like languages, we recommend to use ‘space’, for Chinese-like languages, we recommend to use ‘character’, and for multiple languages, we recommend to use ‘sentencepiece’. If using ‘sentencepiece’, please provided the model path in the ‘tokenizer_model’ field.

  • window_size – window size of shingling

  • lowercase – whether to convert text to lower case first

  • ignore_pattern – whether to ignore sub-strings with specific pattern when computing minhash

  • num_permutations – number of permutations in minhash computing

  • jaccard_threshold – the min jaccard similarity threshold in near-duplicate detection. When the jaccard similarity of two sample texts is >= this threshold, they are regarded as similar samples and this op will only keep one of them after deduplication

  • num_bands – number of bands in LSH. Default it’s None, and it will be determined by an optimal params computation algorithm by minimize the weighted sum of probs of False Positives and False Negatives

  • num_rows_per_band – number of rows in each band in LSH. Default it’s None, and it will be determined by an optimal params computation algorithm

  • tokenizer_model – path for the sentencepiece model, used for sentencepiece tokenization.

  • union_find_parallel_num – number of parallel workers for union-find algorithm. Default it’s ‘auto’, and it will be determined by half of the number of CPUs.

  • union_threshold – threshold for minhash values group to perform union-find algorithm. Default it’s 256.

  • max_pending_edge_buffer_task – max number of pending edge buffer ray tasks. Default it’s 20.

  • num_edge_buffer_task_returns – number of edge buffer tasks for ray.wait to return. Default it’s 10.

  • max_pending_filter_tasks – max number of pending filter ray tasks. Default it’s 20.

  • num_filter_task_returns – number of filter tasks for ray.wait to return. Default it’s 10.

  • merge_batch_size – batch size for BTS operations. Default it’s 1000.

  • minhash_batch_size – batch size for MinHash computation. If “auto”, it will be set to default value on CPU(1024), or auto calculated per available GPU memory and memory_per_sample setting for GPU.

  • memory_per_sample – estimated memory needed per sample in MB. Used to calculate batch size based on available GPU memory. Default is 0.1 MB per sample.

band_minhash(minhash_list, uid_list)[source]

Logic for creating and pusing LSH bands to the union find list

calc_minhash(text_list: Array, uid_list: List) Table[source]

Logic for computing minhash values for each text in the input table

merge_op_batch(object_refs)[source]
merge()[source]
filter_with_union_find(samples: Table) Table[source]
run(dataset, **kwargs)[source]