Source code for data_juicer.utils.process_utils

import math
import os
import subprocess

import multiprocess as mp
from loguru import logger

from data_juicer.utils.resource_utils import (
    available_gpu_memories,
    available_memories,
    cpu_count,
    cuda_device_count,
)


def setup_mp(method=None):
    """Set the multiprocess start method for the main process.

    Candidate methods are tried in order and the first one available on the
    current platform wins; the `MP_START_METHOD` environment variable, if it
    names one of the candidates, takes precedence.
    """
    if mp.current_process().name != "MainProcess":
        return

    if method is None:
        method = ["fork", "forkserver", "spawn"]
    if not isinstance(method, (list, tuple)):
        method = [method]
    method = [m.lower() for m in method]

    env_method = os.getenv("MP_START_METHOD", "").lower()
    if env_method in method:
        method = [env_method]

    available_methods = mp.get_all_start_methods()
    for m in method:
        if m in available_methods:
            try:
                logger.debug(f"Setting multiprocess start method to '{m}'")
                mp.set_start_method(m, force=True)
            except RuntimeError as e:
                logger.warning(f"Error setting multiprocess start method: {e}")
            break
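A minimal usage sketch, assuming this module is importable as `data_juicer.utils.process_utils`: calling `setup_mp()` once in the main process, before any worker pools are created, pins the start method, and `MP_START_METHOD` overrides the default preference order when it names one of the candidates.

import os

import multiprocess as mp

from data_juicer.utils.process_utils import setup_mp

os.environ["MP_START_METHOD"] = "spawn"  # optional override; must be one of the candidates
setup_mp()  # with no argument, tries "fork", "forkserver", "spawn" in order
print(mp.get_start_method())  # "spawn" wherever that method is available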
def get_min_cuda_memory():
    """Return the minimum free CUDA memory (in MB) across all visible GPUs."""
    # query per-GPU free memory using the "nvidia-smi" command
    import torch

    min_cuda_memory = torch.cuda.get_device_properties(0).total_memory / 1024**2
    nvidia_smi_output = subprocess.check_output(
        ["nvidia-smi", "--query-gpu=memory.free", "--format=csv,noheader,nounits"]
    ).decode("utf-8")
    for line in nvidia_smi_output.strip().split("\n"):
        free_memory = int(line)
        min_cuda_memory = min(min_cuda_memory, free_memory)
    return min_cuda_memory
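A hedged usage sketch, assuming a CUDA-capable machine with `nvidia-smi` on the PATH and `torch` installed; the 6GB model footprint below is an illustrative number, not a real requirement.

from data_juicer.utils.process_utils import get_min_cuda_memory

free_mb = get_min_cuda_memory()  # tightest free-memory bound across GPUs, in MB
model_mb = 6 * 1024  # assumed footprint of a model we want to load
if free_mb < model_mb:
    print(f"Only {free_mb:.0f}MB free on the busiest GPU; defer loading the model.")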
def calculate_np(name, mem_required, cpu_required, use_cuda=False, gpu_required=0):
    """Calculate the optimum number of processes for the given OP automatically."""
    if not use_cuda and gpu_required > 0:
        raise ValueError(
            f"Op[{name}] attempted to request GPU resources (gpu_required={gpu_required}), "
            "but appears to lack GPU support. If you have verified that this operator "
            'supports GPU acceleration, please explicitly set its property: `_accelerator = "cuda"`.'
        )

    eps = 1e-9  # about 1 byte
    cpu_num = cpu_count()

    if use_cuda:
        cuda_mems_available = [m / 1024 for m in available_gpu_memories()]  # GB
        gpu_count = cuda_device_count()
        if not mem_required and not gpu_required:
            auto_num_proc = gpu_count
            logger.warning(
                f"The required cuda memory and gpu of Op[{name}] "
                f"have not been specified. "
                f"Please specify the mem_required or gpu_required field in the "
                f"config file; you can refer to the config_all.yaml file. "
                f"Set the auto `num_proc` to the number of GPUs: {auto_num_proc}."
            )
        else:
            auto_proc_from_mem = sum(
                math.floor(mem_available / (mem_required + eps)) for mem_available in cuda_mems_available
            )
            auto_proc_from_gpu = math.floor(gpu_count / (gpu_required + eps))
            auto_proc_from_cpu = math.floor(cpu_num / (cpu_required + eps))
            auto_num_proc = min(auto_proc_from_mem, auto_proc_from_gpu, auto_proc_from_cpu)
            if auto_num_proc < 1:
                auto_num_proc = len(available_memories())  # set to the number of available nodes
            logger.info(
                f"Set the auto `num_proc` of Op[{name}] to {auto_num_proc} based on the "
                f"required cuda memory: {mem_required}GB, "
                f"required gpu: {gpu_required}, and required cpu: {cpu_required}."
            )
        return auto_num_proc
    else:
        mems_available = [m / 1024 for m in available_memories()]  # GB
        auto_proc_from_mem = sum(math.floor(mem_available / (mem_required + eps)) for mem_available in mems_available)
        auto_proc_from_cpu = math.floor(cpu_num / (cpu_required + eps))
        auto_num_proc = min(cpu_num, auto_proc_from_mem, auto_proc_from_cpu)
        if auto_num_proc < 1:
            # number of processes is equal to the number of nodes
            auto_num_proc = len(available_memories())
            logger.warning(
                f"The required CPU number: {cpu_required} "
                f"and memory: {mem_required}GB might "
                f"be more than the available CPU: {cpu_num} "
                f"and memory: {mems_available}GB. "
                f"This Op[{name}] might "
                f"require more resources to run. "
                f"Set the auto `num_proc` to the available node count: {auto_num_proc}."
            )
        else:
            logger.info(
                f"Set the auto `num_proc` of Op[{name}] to {auto_num_proc} based on the "
                f"required memory: {mem_required}GB "
                f"and required cpu: {cpu_required}."
            )
        return auto_num_proc
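To make the sizing rule concrete, a worked sketch under assumed resources: with 4 GPUs each reporting 16GB free, 32 CPUs, and an op needing 5GB of CUDA memory, 1 GPU, and 2 CPUs per process, the bounds are sum(floor(16/5)) = 12 from memory, floor(4/1) = 4 from GPUs, and floor(32/2) = 16 from CPUs, so `calculate_np` returns 4. The op names and figures below are illustrative, not taken from any real config.

from data_juicer.utils.process_utils import calculate_np

# CUDA op: bounded by free GPU memory, GPU count, and CPU count.
num_proc = calculate_np(
    "image_captioning_mapper",  # assumed op name
    mem_required=5,  # GB of CUDA memory per process
    cpu_required=2,  # CPUs per process
    use_cuda=True,
    gpu_required=1,  # GPUs per process
)

# CPU-only op: bounded by free RAM and the core count.
num_proc_cpu = calculate_np("clean_links_mapper", mem_required=1, cpu_required=1)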