data_juicer.ops.mixins module

class data_juicer.ops.mixins.EventDrivenMixin(*args, **kwargs)

Bases: object

Mixin for event-driven capabilities in operations.

This mixin provides functionality for registering event handlers, triggering events, and managing event polling.

__init__(*args, **kwargs)

register_event_handler(event_type: str, handler: Callable)

Register a handler for a specific event type.

Parameters:
  • event_type -- Type of event to handle

  • handler -- Callback function to handle the event

trigger_event(event_type: str, data: Dict)

Trigger an event and call all registered handlers.

Parameters:
  • event_type -- Type of event to trigger

  • data -- Event data to pass to handlers
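Together, `register_event_handler` and `trigger_event` describe a publish/subscribe registry. The sketch below shows that pattern as a standalone class. It is not Data Juicer's implementation. It assumes handlers are stored per event type and called in registration order, and the class name `SimpleEventBus` is hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[Dict], None]


class SimpleEventBus:
    """Hypothetical stand-in illustrating register/trigger semantics."""

    def __init__(self) -> None:
        # Each event type maps to the handlers registered for it, in order.
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def register_event_handler(self, event_type: str, handler: Handler) -> None:
        # Appending lets several handlers listen to the same event type.
        self._handlers[event_type].append(handler)

    def trigger_event(self, event_type: str, data: Dict) -> None:
        # .get() avoids creating empty entries for unknown event types.
        for handler in self._handlers.get(event_type, []):
            handler(data)
```

For example, after `bus.register_event_handler("job_done", print)`, calling `bus.trigger_event("job_done", {"job_id": 1})` prints the dictionary. Triggering an event type with no handlers does nothing. Keeping a list per event type is what lets several handlers react to the same event.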

start_polling(event_type: str, poll_func: Callable, interval: int = 60)

Start polling for a specific event type.

Parameters:
  • event_type -- Type of event to poll for

  • poll_func -- Function to call for polling

  • interval -- Polling interval in seconds

stop_polling(event_type: str)

Stop polling for a specific event type.

Parameters:
  • event_type -- Type of event to stop polling for

stop_all_polling()

Stop all polling threads.
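The three polling methods suggest one background worker per event type, which can be stopped individually or all at once. The sketch below rests on three assumptions:

- Each worker is a daemon `threading.Thread`.
- A non-`None` result from `poll_func` counts as an event.
- The hypothetical `trigger` constructor argument stands in for `trigger_event`.

```python
import threading
from typing import Any, Callable, Dict, Optional


class PollingSketch:
    """Hypothetical sketch: one daemon thread per polled event type."""

    def __init__(self, trigger: Callable[[str, Any], None]) -> None:
        # `trigger` stands in for trigger_event (hypothetical constructor argument).
        self._trigger = trigger
        self._stop_flags: Dict[str, threading.Event] = {}
        self._threads: Dict[str, threading.Thread] = {}

    def start_polling(
        self,
        event_type: str,
        poll_func: Callable[[], Optional[Any]],
        interval: float = 60,
    ) -> None:
        if event_type in self._threads:
            return  # already polling this event type
        stop_flag = threading.Event()

        def loop() -> None:
            while True:
                result = poll_func()
                # Assumption: a non-None result means "the event happened".
                if result is not None:
                    self._trigger(event_type, result)
                # wait() sleeps up to `interval` seconds but returns True as
                # soon as stop is requested, so stopping is prompt.
                if stop_flag.wait(interval):
                    break

        thread = threading.Thread(target=loop, name=f"poll-{event_type}", daemon=True)
        self._stop_flags[event_type] = stop_flag
        self._threads[event_type] = thread
        thread.start()

    def stop_polling(self, event_type: str) -> None:
        stop_flag = self._stop_flags.pop(event_type, None)
        thread = self._threads.pop(event_type, None)
        if stop_flag is not None:
            stop_flag.set()
        # Joining the current thread would raise, e.g. if a handler stops polling.
        if thread is not None and thread is not threading.current_thread():
            thread.join()

    def stop_all_polling(self) -> None:
        # Copy the keys because stop_polling mutates the dict.
        for event_type in list(self._threads):
            self.stop_polling(event_type)
```

Using `threading.Event.wait(interval)` instead of `time.sleep(interval)` makes `stop_polling` take effect immediately. With `time.sleep`, a stop request could wait up to a full polling interval, which matters with the default of 60 seconds.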

wait_for_completion(condition_func: Callable[[], bool], timeout: int = 3600, poll_interval: int = 10, error_message: str = 'Operation timed out')

Wait for a condition to be met.

Parameters:
  • condition_func -- Function that returns True when condition is met

  • timeout -- Maximum time to wait in seconds

  • poll_interval -- Polling interval in seconds

  • error_message -- Error message to raise on timeout

Raises:

TimeoutError -- If the condition is not met within the timeout
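The sketch below implements the documented contract as a standalone function. It assumes the method simply re-checks `condition_func` every `poll_interval` seconds. Measuring the deadline with `time.monotonic()` and capping the last sleep at the remaining time are choices of this sketch, not confirmed behavior.

```python
import time
from typing import Callable


def wait_for_completion(
    condition_func: Callable[[], bool],
    timeout: float = 3600,
    poll_interval: float = 10,
    error_message: str = "Operation timed out",
) -> None:
    """Block until condition_func() returns True or `timeout` seconds pass."""
    # time.monotonic is immune to wall-clock adjustments, so it suits deadlines.
    deadline = time.monotonic() + timeout
    while True:
        if condition_func():
            return
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(error_message)
        # Never sleep past the deadline.
        time.sleep(min(poll_interval, remaining))
```

For example, `wait_for_completion(lambda: os.path.exists("done.flag"), timeout=600, poll_interval=5)` would block for at most ten minutes. The flag file name is only illustrative.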

class data_juicer.ops.mixins.NotificationMixin(*args, **kwargs)

Bases: object

Mixin for sending notifications through various channels.

This mixin provides functionality for sending notifications via email, Slack, DingTalk, and other platforms.

Notification configuration can be specified as a "notification_config" parameter within an operator (for backward compatibility):

```yaml
process:
  - some_mapper:
      notification_config:
        enabled: true
        email:
          # ... email settings ...
```

For security best practices, sensitive information like passwords and tokens should be provided via environment variables:

  • Email: set the 'DATA_JUICER_EMAIL_PASSWORD' environment variable, or a server-specific 'DATA_JUICER_<SMTP_SERVER_NAME>_PASSWORD' variable (for example, 'DATA_JUICER_SMTP_GMAIL_COM_PASSWORD' for smtp.gmail.com)

  • Slack: set 'DATA_JUICER_SLACK_WEBHOOK' environment variable

  • DingTalk: set 'DATA_JUICER_DINGTALK_TOKEN' and 'DATA_JUICER_DINGTALK_SECRET' environment variables
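DingTalk custom robots configured with the "sign" security setting need both values. The access token identifies the robot, and the secret signs each request. The sketch below follows DingTalk's published signing scheme. It assumes this is how the mixin uses `DATA_JUICER_DINGTALK_SECRET`, which the text above does not say. `build_dingtalk_url` is a hypothetical helper.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Optional


def build_dingtalk_url(token: str, secret: str, timestamp_ms: Optional[int] = None) -> str:
    """Return a signed DingTalk robot webhook URL (hypothetical helper)."""
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    # DingTalk signs "<timestamp>\n<secret>" with HMAC-SHA256 keyed by the secret.
    string_to_sign = f"{timestamp_ms}\n{secret}".encode("utf-8")
    digest = hmac.new(secret.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = base64.b64encode(digest).decode("ascii")
    # urlencode percent-encodes the base64 signature ('+', '/', '=').
    query = urllib.parse.urlencode(
        {"access_token": token, "timestamp": timestamp_ms, "sign": signature}
    )
    return f"https://oapi.dingtalk.com/robot/send?{query}"
```

In practice the two arguments would come from `os.environ["DATA_JUICER_DINGTALK_TOKEN"]` and `os.environ["DATA_JUICER_DINGTALK_SECRET"]`. DingTalk rejects timestamps that differ too much from its server time, so the URL should be built immediately before each request.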

For even more secure email authentication, you can use TLS client certificates instead of passwords:

  1. Generate a client certificate and key (example using OpenSSL):

    ```bash
    # Generate a private key
    openssl genrsa -out client.key 2048

    # Generate a certificate signing request (CSR)
    openssl req -new -key client.key -out client.csr

    # Generate a self-signed certificate
    openssl x509 -req -days 365 -in client.csr -signkey client.key -out client.crt
    ```

  2. Configure your SMTP server to accept this client certificate for authentication.

  3. Configure Data Juicer to use certificate authentication:

    ```yaml
    notification:
      enabled: true
      email:
        use_cert_auth: true
        client_cert_file: "/path/to/client.crt"
        client_key_file: "/path/to/client.key"
        smtp_server: "smtp.example.com"
        smtp_port: 587
        sender_email: "notifications@example.com"
        recipients: ["recipient@example.com"]
    ```

  4. Or use environment variables:

    ```bash
    export DATA_JUICER_EMAIL_CERT="/path/to/client.crt"
    export DATA_JUICER_EMAIL_KEY="/path/to/client.key"
    ```
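On the client side, certificate authentication with Python's standard library comes down to loading the certificate and key into an `ssl.SSLContext`. The sketch below assumes the paths come from `client_cert_file`/`client_key_file` or from the two environment variables above. The helper name `build_tls_context` is hypothetical, and so is the rule that environment variables take precedence.

```python
import os
import ssl
from typing import Optional


def build_tls_context(
    cert_file: Optional[str] = None, key_file: Optional[str] = None
) -> ssl.SSLContext:
    """Create a TLS context, optionally carrying a client certificate."""
    # Assumption: environment variables take precedence over config values.
    cert_file = os.environ.get("DATA_JUICER_EMAIL_CERT", cert_file)
    key_file = os.environ.get("DATA_JUICER_EMAIL_KEY", key_file)
    # The default context verifies the server certificate and hostname.
    context = ssl.create_default_context()
    if cert_file:
        # keyfile may be None when the key is stored inside the cert file.
        context.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return context
```

The resulting context would then be passed as `context=` to `smtplib.SMTP.starttls()` or `smtplib.SMTP_SSL()`.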

For maximum connection security, you can use a direct SSL connection instead of STARTTLS by enabling the 'use_ssl' option:

```yaml
notification:
  enabled: true
  email:
    use_ssl: true
    smtp_port: 465  # Common port for SMTP over SSL
    # ... other email configuration ...
```

This establishes an encrypted connection from the beginning, rather than starting with an unencrypted connection and upgrading to TLS as with STARTTLS. Note that this option can be combined with certificate authentication for maximum security.
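In Python's `smtplib` the two modes correspond to two connection styles. `SMTP_SSL` encrypts the socket from the first byte, while `SMTP` followed by `starttls()` upgrades a plaintext session. The sketch below assumes `use_ssl` selects between them as described. The helper `open_smtp` is hypothetical and not part of Data Juicer.

```python
import smtplib
import ssl
from typing import Optional


def open_smtp(
    host: str,
    port: int,
    use_ssl: bool,
    context: Optional[ssl.SSLContext] = None,
    timeout: float = 30,
) -> smtplib.SMTP:
    """Open an encrypted SMTP connection using implicit TLS or STARTTLS."""
    if context is None:
        context = ssl.create_default_context()
    if use_ssl:
        # Implicit TLS: the socket is encrypted before any SMTP traffic (port 465).
        return smtplib.SMTP_SSL(host, port, timeout=timeout, context=context)
    # STARTTLS: connect in plaintext (typically port 587), then upgrade.
    server = smtplib.SMTP(host, port, timeout=timeout)
    server.ehlo()
    server.starttls(context=context)
    server.ehlo()  # re-identify over the now-encrypted channel
    return server
```

Login and sending would follow on the returned object, e.g. `server.login(username, password)`. Both branches return an `smtplib.SMTP` instance (`SMTP_SSL` is a subclass), so the calling code does not need to know which mode was used.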

The email notification system supports a variety of email server setups through its configuration options. Here are some examples for different servers:

Standard SMTP with STARTTLS:

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.example.com"
    smtp_port: 587
    username: "your.username@example.com"
    sender_email: "your.username@example.com"
    sender_name: "Your Name"  # Optional
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```

Direct SSL Connection (e.g., Gmail):

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.gmail.com"
    smtp_port: 465
    use_ssl: true
    username: "your.username@gmail.com"
    sender_email: "your.username@gmail.com"
    sender_name: "Your Name"
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```

Alibaba Email Server:

```yaml
notification:
  enabled: true
  email:
    smtp_server: "smtp.alibaba-inc.com"
    smtp_port: 465
    username: "your.username@alibaba-inc.com"
    sender_email: "your.username@alibaba-inc.com"
    sender_name: "Your Name"
    recipient_separator: ";"  # Use semicolons to separate recipients
    recipients: ["recipient1@example.com", "recipient2@example.com"]
```
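The `sender_name` and `recipient_separator` fields affect only the message headers. The sketch below uses the standard `email` package. It assumes `sender_name` becomes the display name in `From` and `recipient_separator` joins the addresses in `To`. The helper `build_message` is hypothetical.

```python
from email.mime.text import MIMEText
from email.utils import formataddr
from typing import List, Optional


def build_message(
    subject: str,
    body: str,
    sender_email: str,
    recipients: List[str],
    sender_name: Optional[str] = None,
    recipient_separator: str = ", ",
) -> MIMEText:
    """Assemble a plain-text notification email from the config fields above."""
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    # formataddr renders 'Your Name <addr>' when a display name is given.
    msg["From"] = formataddr((sender_name, sender_email)) if sender_name else sender_email
    # Some servers expect a specific delimiter between addresses in the To header.
    msg["To"] = recipient_separator.join(recipients)
    return msg
```

When sending, pass the address list explicitly, e.g. `server.sendmail(sender_email, recipients, msg.as_string())`. That way the envelope recipients do not depend on re-parsing a semicolon-joined `To` header.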

Environment variable usage examples:

```bash
# General email password
export DATA_JUICER_EMAIL_PASSWORD="your_email_password"

# Server-specific passwords (preferred for clarity)
export DATA_JUICER_SMTP_GMAIL_COM_PASSWORD="your_gmail_password"
export DATA_JUICER_SMTP_ALIBABA_INC_COM_PASSWORD="your_alibaba_password"

# Slack webhook
export DATA_JUICER_SLACK_WEBHOOK="your_slack_webhook_url"

# DingTalk credentials
export DATA_JUICER_DINGTALK_TOKEN="your_dingtalk_token"
export DATA_JUICER_DINGTALK_SECRET="your_dingtalk_secret"
```

If environment variables are not set, the system will fall back to using values from the configuration file, but this is less secure and not recommended for production environments.
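The lookup order described above can be sketched as follows. Several details are assumptions, not confirmed behavior:

- The variable-name derivation, which is inferred from the `smtp.gmail.com` and `smtp.alibaba-inc.com` examples.
- Server-specific variables taking precedence over the general one.
- The `password` key used for the configuration-file fallback.
- The helper names, which are hypothetical.

```python
import os
import re
from typing import List, Mapping, Optional


def smtp_password_env_var(smtp_server: str) -> str:
    """Derive a server-specific name, e.g. smtp.gmail.com -> DATA_JUICER_SMTP_GMAIL_COM_PASSWORD."""
    # Assumption: runs of non-alphanumeric characters become underscores.
    slug = re.sub(r"[^A-Za-z0-9]+", "_", smtp_server).strip("_").upper()
    return f"DATA_JUICER_{slug}_PASSWORD"


def resolve_email_password(
    email_config: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Prefer environment variables; fall back to the config file value."""
    env = os.environ if environ is None else environ
    candidates: List[str] = []
    server = email_config.get("smtp_server", "")
    if server:
        candidates.append(smtp_password_env_var(server))  # most specific first
    candidates.append("DATA_JUICER_EMAIL_PASSWORD")
    for name in candidates:
        if env.get(name):
            return env[name]
    # Less secure fallback: a password stored in the configuration itself.
    return email_config.get("password")
```

The optional `environ` mapping makes the lookup easy to exercise without touching the real process environment. By default it reads `os.environ`.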

__init__(*args, **kwargs)

send_notification(message: str, notification_type: str = None, **kwargs)

Send a notification message.

Parameters:
  • message -- The message to send

  • notification_type -- The type of notification to send: email, Slack, or DingTalk. If None, nothing is sent.

  • **kwargs -- Additional arguments to pass to the notification handler. These can override any configuration setting for this specific notification.

Returns:

True if the notification was sent successfully, else False

Return type:

bool
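The documented contract (dispatch by `notification_type`, per-call overrides via `**kwargs`, a boolean result) can be sketched as a small dispatcher. Everything here is an illustrative assumption, not Data Juicer's code:

- the `NotifierSketch` class;
- the per-channel `config` and `senders` dictionaries;
- case-insensitive type matching;
- returning False when `notification_type` is None.

```python
import logging
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)

# A sender receives the message plus merged settings and returns True on success.
Sender = Callable[[str, Dict[str, Any]], bool]


class NotifierSketch:
    """Hypothetical dispatcher mirroring the send_notification contract."""

    def __init__(self, config: Dict[str, Dict[str, Any]], senders: Dict[str, Sender]) -> None:
        self.config = config    # per-channel settings, e.g. {"email": {...}}
        self.senders = senders  # per-channel send functions

    def send_notification(
        self, message: str, notification_type: Optional[str] = None, **kwargs: Any
    ) -> bool:
        if notification_type is None:
            return False  # nothing to send
        channel = notification_type.lower()
        sender = self.senders.get(channel)
        if sender is None:
            logger.warning("Unsupported notification type: %s", notification_type)
            return False
        # Per-call kwargs override the stored configuration for this send only.
        settings = {**self.config.get(channel, {}), **kwargs}
        try:
            return bool(sender(message, settings))
        except Exception:
            logger.exception("Failed to send %s notification", channel)
            return False
```

Catching exceptions inside `send_notification` keeps a failed notification from aborting the data-processing job that triggered it. Callers learn about the failure only through the False return value and the log.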