data_juicer.ops package

Subpackages

Submodules

data_juicer.ops.base_op module

data_juicer.ops.base_op.convert_list_dict_to_dict_list(samples)[source]
data_juicer.ops.base_op.convert_dict_list_to_list_dict(samples)[source]
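Judging from their signatures, these two helpers convert between the two common batch layouts: a list of per-sample dicts and a single dict of column lists. A minimal pure-Python sketch of that conversion (the function bodies here are assumptions based on the names, not the library's actual implementation):

```python
def list_dict_to_dict_list(samples):
    """[{'text': 'a'}, {'text': 'b'}] -> {'text': ['a', 'b']}"""
    keys = samples[0].keys()
    return {k: [s[k] for s in samples] for k in keys}

def dict_list_to_list_dict(samples):
    """{'text': ['a', 'b']} -> [{'text': 'a'}, {'text': 'b'}]"""
    keys = list(samples.keys())
    length = len(samples[keys[0]])
    return [{k: samples[k][i] for k in keys} for i in range(length)]

batch = [{"text": "a", "id": 1}, {"text": "b", "id": 2}]
columns = list_dict_to_dict_list(batch)  # {'text': ['a', 'b'], 'id': [1, 2]}
assert dict_list_to_list_dict(columns) == batch
```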
data_juicer.ops.base_op.convert_arrow_to_python(method)[source]
data_juicer.ops.base_op.catch_map_batches_exception(method, skip_op_error=False, op_name=None)[source]

For batched-map sample-level fault tolerance.

data_juicer.ops.base_op.catch_map_single_exception(method, return_sample=True, skip_op_error=False, op_name=None)[source]

For single-map sample-level fault tolerance. The input sample is expected to have batch_size = 1.

class data_juicer.ops.base_op.OP(*args, **kwargs)[source]

Bases: object

__init__(*args, **kwargs)[source]

Base class of operators.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • index_key – if not None, index the samples before processing

  • batch_size – the batch size for processing

  • work_dir – the working directory for this operator

is_batched_op()[source]
process(*args, **kwargs)[source]
use_cuda()[source]
runtime_np()[source]
remove_extra_parameters(param_dict, keys=None)[source]

At the beginning of a mapper op's __init__, call self.remove_extra_parameters(locals()) to conveniently obtain the op's init parameter dict.

add_parameters(init_parameter_dict, **extra_param_dict)[source]

Add parameters for each sample while keeping extra_param_dict and init_parameter_dict unchanged.

run(dataset)[source]
empty_history()[source]
class data_juicer.ops.base_op.Mapper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts data editing.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_batched(samples, *args, **kwargs)[source]
process_single(sample)[source]

For sample level, sample --> sample.

Parameters:

sample – sample to process

Returns:

processed sample

run(dataset, *, exporter=None, tracer=None)[source]
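The sample-level Mapper contract above (process_single takes one sample dict and returns the edited sample) can be illustrated with a toy subclass. The stand-in base class and LowercaseMapper below are illustrative only, not part of the real data_juicer API:

```python
class Mapper:
    """Stand-in for data_juicer.ops.base_op.Mapper (illustrative only)."""
    def process_single(self, sample):
        raise NotImplementedError

class LowercaseMapper(Mapper):
    """Toy mapper: lowercases the configured text field."""
    def __init__(self, text_key="text"):
        self.text_key = text_key

    def process_single(self, sample):
        # sample --> sample: edit the field and return the sample
        sample[self.text_key] = sample[self.text_key].lower()
        return sample

print(LowercaseMapper().process_single({"text": "Hello World"}))  # {'text': 'hello world'}
```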
class data_juicer.ops.base_op.Filter(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that removes specific info.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_stats_batched(samples, *args, **kwargs)[source]
process_batched(samples)[source]
compute_stats_single(sample, context=False)[source]

Compute stats for the sample, which are used as metrics to decide whether to filter it.

Parameters:
  • sample – input sample.

  • context – whether to store context information of intermediate vars in the sample temporarily.

Returns:

sample with computed stats

process_single(sample)[source]

For sample level, sample --> Boolean.

Parameters:

sample – sample to decide whether to filter

Returns:

true for keeping and false for filtering

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
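A Filter separates stat computation (compute_stats_single) from the keep/drop decision (process_single, returning a Boolean). A hypothetical word-count filter sketching that two-phase contract; the stand-in base class and the "__stats__" key name are illustrative, not the library's actual names:

```python
class Filter:
    """Stand-in for data_juicer.ops.base_op.Filter (illustrative only)."""
    def compute_stats_single(self, sample, context=False):
        raise NotImplementedError
    def process_single(self, sample):
        raise NotImplementedError

class MinWordsFilter(Filter):
    """Toy filter: keep samples with at least `min_words` words."""
    def __init__(self, min_words=3, text_key="text"):
        self.min_words = min_words
        self.text_key = text_key

    def compute_stats_single(self, sample, context=False):
        # Phase 1: annotate the sample with its stats
        stats = sample.setdefault("__stats__", {})
        stats["num_words"] = len(sample[self.text_key].split())
        return sample

    def process_single(self, sample):
        # Phase 2: True keeps the sample, False filters it out
        return sample["__stats__"]["num_words"] >= self.min_words

f = MinWordsFilter(min_words=3)
s = f.compute_stats_single({"text": "too short"})
print(f.process_single(s))  # False: only 2 words
```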
class data_juicer.ops.base_op.Deduplicator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts deduplication.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_hash(sample)[source]

Compute hash values for the sample.

Parameters:

sample – input sample

Returns:

sample with computed hash value.

process(dataset, show_num=0)[source]

For doc-level, dataset --> dataset.

Parameters:
  • dataset – input dataset

  • show_num – number of traced samples used when tracer is open.

Returns:

deduplicated dataset and the sampled duplicate pairs.

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
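The Deduplicator contract (compute_hash annotates each sample with a hash; process then drops colliding samples) might be sketched as follows, using an MD5 of the text as a stand-in hash. The "__hash__" key and the exact hashing scheme are assumptions for illustration:

```python
import hashlib

def compute_hash(sample, text_key="text"):
    """Annotate the sample with a content hash (key name illustrative)."""
    digest = hashlib.md5(sample[text_key].encode("utf-8")).hexdigest()
    sample["__hash__"] = digest
    return sample

def deduplicate(samples):
    """Keep the first sample seen for each distinct hash."""
    seen, kept = set(), []
    for s in map(compute_hash, samples):
        if s["__hash__"] not in seen:
            seen.add(s["__hash__"])
            kept.append(s)
    return kept

data = [{"text": "hello"}, {"text": "world"}, {"text": "hello"}]
print(len(deduplicate(data)))  # 2
```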
class data_juicer.ops.base_op.Selector(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts dataset-level selection.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

selected dataset.

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.base_op.Grouper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that groups samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

dataset of batched samples.

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.base_op.Aggregator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that aggregates grouped samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_single(sample)[source]

For sample level, batched sample --> sample; the input must be the output of some Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

run(dataset, *, exporter=None, tracer=None)[source]

data_juicer.ops.load module

data_juicer.ops.load.load_ops(process_list)[source]

Load op list according to the process list from config file.

Parameters:

process_list – A process list. Each item is an op name and its arguments.

Returns:

The op instance list.
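Each item in the process list is a one-key mapping from an op name to its arguments. A toy loader sketching how such a config might be resolved against an op registry; the registry, the op name, and the instance representation here are all made up for illustration:

```python
OPS = {
    # hypothetical registry: op name -> op factory
    "lowercase_mapper": lambda **kw: ("lowercase_mapper", kw),
}

def load_ops(process_list):
    """[{'lowercase_mapper': {'text_key': 'text'}}] -> list of op instances."""
    ops = []
    for item in process_list:
        # each item has exactly one key: the op name
        (name, args), = item.items()
        ops.append(OPS[name](**(args or {})))
    return ops

print(load_ops([{"lowercase_mapper": {"text_key": "text"}}]))
```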

data_juicer.ops.mixins module

class data_juicer.ops.mixins.EventDrivenMixin(*args, **kwargs)[source]

Bases: object

Mixin for event-driven capabilities in operations.

This mixin provides functionality for registering event handlers, triggering events, and managing event polling.

__init__(*args, **kwargs)[source]
register_event_handler(event_type: str, handler: Callable)[source]

Register a handler for a specific event type.

Parameters:
  • event_type – Type of event to handle

  • handler – Callback function to handle the event

trigger_event(event_type: str, data: Dict)[source]

Trigger an event and call all registered handlers.

Parameters:
  • event_type – Type of event to trigger

  • data – Event data to pass to handlers

start_polling(event_type: str, poll_func: Callable, interval: int = 60)[source]

Start polling for a specific event type.

Parameters:
  • event_type – Type of event to poll for

  • poll_func – Function to call for polling

  • interval – Polling interval in seconds

stop_polling(event_type: str)[source]

Stop polling for a specific event type.

Parameters:

event_type – Type of event to stop polling for

stop_all_polling()[source]

Stop all polling threads.

wait_for_completion(condition_func: Callable[[], bool], timeout: int = 3600, poll_interval: int = 10, error_message: str = 'Operation timed out')[source]

Wait for a condition to be met.

Parameters:
  • condition_func – Function that returns True when condition is met

  • timeout – Maximum time to wait in seconds

  • poll_interval – Polling interval in seconds

  • error_message – Error message to raise on timeout

Raises:

TimeoutError – If the condition is not met within the timeout
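The register/trigger half of this mixin can be sketched with a plain dict of handler lists. This is a simplification for illustration; the real mixin also manages polling threads and timeouts:

```python
from collections import defaultdict

class EventRegistry:
    """Minimal register/trigger sketch of an event-driven mixin."""
    def __init__(self):
        self._handlers = defaultdict(list)

    def register_event_handler(self, event_type, handler):
        # multiple handlers may be registered per event type
        self._handlers[event_type].append(handler)

    def trigger_event(self, event_type, data):
        # call every handler registered for this event type
        for handler in self._handlers[event_type]:
            handler(data)

reg = EventRegistry()
seen = []
reg.register_event_handler("op_done", seen.append)
reg.trigger_event("op_done", {"op": "demo", "status": "ok"})
print(seen)  # [{'op': 'demo', 'status': 'ok'}]
```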

class data_juicer.ops.mixins.NotificationMixin(*args, **kwargs)[source]

Bases: object

Mixin for sending notifications through various channels.

This mixin provides functionality for sending notifications via email, Slack, DingTalk, and other platforms.

Notification configuration can be specified as a "notification_config" parameter within an operator (for backward compatibility):

```yaml
process:
  - some_mapper:
      notification_config:
        enabled: true
        email:
          # ... email settings ...
```

For security best practices, sensitive information like passwords and tokens should be provided via environment variables:

  • Email: set the 'DATA_JUICER_EMAIL_PASSWORD' environment variable or the service-specific 'DATA_JUICER_SMTP_SERVER_NAME_PASSWORD'

  • Slack: set the 'DATA_JUICER_SLACK_WEBHOOK' environment variable

  • DingTalk: set the 'DATA_JUICER_DINGTALK_TOKEN' and 'DATA_JUICER_DINGTALK_SECRET' environment variables

For even more secure email authentication, you can use TLS client certificates instead of passwords:

  1. Generate a client certificate and key (example using OpenSSL):

     ```bash
     # Generate a private key
     openssl genrsa -out client.key 2048

     # Generate a certificate signing request (CSR)
     openssl req -new -key client.key -out client.csr

     # Generate a self-signed certificate
     openssl x509 -req -days 365 -in client.csr -signkey client.key -out client.crt
     ```

  2. Configure your SMTP server to accept this client certificate for authentication.

  3. Configure Data Juicer to use certificate authentication:

     ```yaml
     notification:
       enabled: true
       email:
         use_cert_auth: true
         client_cert_file: "/path/to/client.crt"
         client_key_file: "/path/to/client.key"
         smtp_server: "smtp.example.com"
         smtp_port: 587
         sender_email: "notifications@example.com"
         recipients: ["recipient@example.com"]
     ```

  4. Or use environment variables:

     ```bash
     export DATA_JUICER_EMAIL_CERT="/path/to/client.crt"
     export DATA_JUICER_EMAIL_KEY="/path/to/client.key"
     ```

For maximum connection security, you can use a direct SSL connection instead of STARTTLS by enabling the 'use_ssl' option:

```yaml
notification:
  enabled: true
  email:
    use_ssl: true
    smtp_port: 465  # Common port for SMTP over SSL
    # ... other email configuration ...
```

This establishes an encrypted connection from the beginning, rather than starting with an unencrypted connection and upgrading to TLS as with STARTTLS. Note that this option can be combined with certificate authentication for maximum security.
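In Python's smtplib terms, the difference described above is choosing smtplib.SMTP_SSL up front versus smtplib.SMTP followed by starttls(). A small helper sketching that choice (no connection is made here; the helper and default ports are illustrative):

```python
import smtplib

def smtp_factory(use_ssl):
    """Pick the connection style: direct SSL vs. plain + STARTTLS upgrade."""
    if use_ssl:
        # Encrypted from the first byte; commonly port 465.
        return smtplib.SMTP_SSL, 465
    # Starts unencrypted, then upgrades via conn.starttls(); commonly port 587.
    return smtplib.SMTP, 587

cls, port = smtp_factory(use_ssl=True)
print(cls.__name__, port)  # SMTP_SSL 465
```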

The email notification system supports various email server configurations through a flexible configuration system. Here are some examples for different servers:

Standard SMTP with STARTTLS:

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.example.com"
    smtp_port: 587
    username: "your.username@example.com"
    sender_email: "your.username@example.com"
    sender_name: "Your Name"  # Optional
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```

Direct SSL Connection (e.g., Gmail):

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.gmail.com"
    smtp_port: 465
    use_ssl: true
    username: "your.username@gmail.com"
    sender_email: "your.username@gmail.com"
    sender_name: "Your Name"
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```

Alibaba Email Server:

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.alibaba-inc.com"
    smtp_port: 465
    username: "your.username@alibaba-inc.com"
    sender_email: "your.username@alibaba-inc.com"
    sender_name: "Your Name"
    recipient_separator: ";"  # Use semicolons to separate recipients
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```

Environment variable usage examples:

```bash
# General email password
export DATA_JUICER_EMAIL_PASSWORD="your_email_password"

# Server-specific passwords (preferred for clarity)
export DATA_JUICER_SMTP_GMAIL_COM_PASSWORD="your_gmail_password"
export DATA_JUICER_SMTP_ALIBABA_INC_COM_PASSWORD="your_alibaba_password"

# Slack webhook
export DATA_JUICER_SLACK_WEBHOOK="your_slack_webhook_url"

# DingTalk credentials
export DATA_JUICER_DINGTALK_TOKEN="your_dingtalk_token"
export DATA_JUICER_DINGTALK_SECRET="your_dingtalk_secret"
```

If environment variables are not set, the system will fall back to using values from the configuration file, but this is less secure and not recommended for production environments.
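The environment-first, config-fallback lookup described here might look like the following; the helper name and config shape are illustrative, while the environment variable name comes from the examples above:

```python
import os

def resolve_secret(env_var, config, config_key):
    """Prefer the environment variable; fall back to the config file value."""
    value = os.environ.get(env_var)
    if value:
        return value
    # Less secure fallback: secret stored in the configuration file.
    return config.get(config_key)

config = {"password": "from_config_file"}
os.environ["DATA_JUICER_EMAIL_PASSWORD"] = "from_env"
print(resolve_secret("DATA_JUICER_EMAIL_PASSWORD", config, "password"))  # from_env
```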

__init__(*args, **kwargs)[source]
send_notification(message: str, notification_type: str | None = None, **kwargs)[source]

Send a notification message.

Parameters:
  • message – The message to send

  • notification_type – The type of notification to send: email, Slack, or DingTalk. If None, nothing is sent

  • **kwargs – Additional arguments to pass to the notification handler. These can override any configuration settings for this specific notification

Returns:

True if the notification was sent successfully, else False

Return type:

bool

data_juicer.ops.op_fusion module

data_juicer.ops.op_fusion.fuse_operators(ops, probe_res=None)[source]

Fuse the input ops list and return the fused ops list.

Parameters:
  • ops – the corresponding list of op objects.

  • probe_res – the probed speed for each OP from Monitor.

Returns:

a list of fused op objects.

data_juicer.ops.op_fusion.fuse_filter_group(original_filter_group)[source]

Fuse single filter group and return the fused filter group.

Parameters:

original_filter_group – the original filter group, including op definitions and objects.

Returns:

the fused definitions and objects of the input filter group.

class data_juicer.ops.op_fusion.FusedFilter(name: str, fused_filters: List)[source]

Bases: Filter

A fused operator for filters.

__init__(name: str, fused_filters: List)[source]

Initialization method.

Parameters:
  • name – name of the fused filter op.

  • fused_filters – a list of filters to be fused.

compute_stats_batched(samples, rank=None)[source]
process_batched(samples)[source]
class data_juicer.ops.op_fusion.GeneralFusedOP(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)[source]

Bases: OP

An explicitly fused operator designed to execute multiple sequential operations (OPs) on the same batch, enabling fine-grained control over data processing.

__init__(batch_size: int = 1, fused_op_list: List | None = None, *args, **kwargs)[source]

Base class of operators.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

  • index_key – if not None, index the samples before processing

  • batch_size – the batch size for processing

  • work_dir – the working directory for this operator

process_batched(samples, rank=None)[source]
run(dataset, *, exporter=None, tracer=None)[source]

Module contents

data_juicer.ops.load_ops(process_list)[source]

Load op list according to the process list from config file.

Parameters:

process_list – A process list. Each item is an op name and its arguments.

Returns:

The op instance list.

class data_juicer.ops.Filter(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that removes specific info.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_stats_batched(samples, *args, **kwargs)[source]
process_batched(samples)[source]
compute_stats_single(sample, context=False)[source]

Compute stats for the sample, which are used as metrics to decide whether to filter it.

Parameters:
  • sample – input sample.

  • context – whether to store context information of intermediate vars in the sample temporarily.

Returns:

sample with computed stats

process_single(sample)[source]

For sample level, sample --> Boolean.

Parameters:

sample – sample to decide whether to filter

Returns:

true for keeping and false for filtering

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
class data_juicer.ops.Mapper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts data editing.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed.

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_batched(samples, *args, **kwargs)[source]
process_single(sample)[source]

For sample level, sample --> sample.

Parameters:

sample – sample to process

Returns:

processed sample

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.Deduplicator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts deduplication.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

compute_hash(sample)[source]

Compute hash values for the sample.

Parameters:

sample – input sample

Returns:

sample with computed hash value.

process(dataset, show_num=0)[source]

For doc-level, dataset --> dataset.

Parameters:
  • dataset – input dataset

  • show_num – number of traced samples used when tracer is open.

Returns:

deduplicated dataset and the sampled duplicate pairs.

run(dataset, *, exporter=None, tracer=None, reduce=True)[source]
class data_juicer.ops.Selector(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that conducts dataset-level selection.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

selected dataset.

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.Grouper(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that groups samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process(dataset)[source]

Dataset --> dataset.

Parameters:

dataset – input dataset

Returns:

dataset of batched samples.

run(dataset, *, exporter=None, tracer=None)[source]
class data_juicer.ops.Aggregator(*args, **kwargs)[source]

Bases: OP

__init__(*args, **kwargs)[source]

Base class that aggregates grouped samples.

Parameters:
  • text_key – the key name of field that stores sample texts to be processed

  • image_key – the key name of field that stores sample image list to be processed

  • audio_key – the key name of field that stores sample audio list to be processed

  • video_key – the key name of field that stores sample video list to be processed

  • query_key – the key name of field that stores sample queries

  • response_key – the key name of field that stores responses

  • history_key – the key name of field that stores history of queries and responses

process_single(sample)[source]

For sample level, batched sample –> sample, the input must be the output of some Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

run(dataset, *, exporter=None, tracer=None)[source]