Source code for data_juicer.core.monitor

import os
import time
from functools import partial
from multiprocessing import get_context

from data_juicer.utils.resource_utils import (get_cpu_count,
                                              get_cpu_utilization,
                                              query_cuda_info, query_mem_info)


def resource_monitor(mdict, interval):
    """
    Periodically sample resource utilization until told to stop.

    Intended to run in a separate process (see ``Monitor.monitor_func``).

    :param mdict: a ``multiprocessing.Manager().dict()`` shared with the main
        process. The key ``'stop'`` is read as the stop signal; the collected
        samples are written back under the key ``'resource'``.
    :param interval: sampling interval in seconds.
    """
    this_states = []
    while True:
        this_states.append(Monitor.monitor_current_resources())
        time.sleep(interval)
        try:
            stop_sign = mdict['stop']
        except (BrokenPipeError, FileNotFoundError):
            # mdict crashes because the main process is terminated already,
            # which is not the fault here
            return
        if stop_sign:
            break
    try:
        mdict['resource'] = this_states
    except (BrokenPipeError, FileNotFoundError):
        # The main process (and its manager) died between the stop check and
        # this write-back; there is no one left to report to.
        return
class Monitor:
    """
    Monitor resource utilization and other information during the data
    processing.

    Resource utilization dict: (for each func)

    ```python
    {
        'time': 10,
        'sampling interval': 0.5,
        'resource': [
            {
                'timestamp': xxx,
                'CPU count': xxx,
                'GPU free mem.': xxx.
                ...
            },
            {
                'timestamp': xxx,
                'CPU count': xxx,
                'GPU free mem.': xxx,
                ...
            },
        ]
    }
    ```

    Based on the structure above, the resource utilization analysis result
    will add several extra fields on the first level:

    ```python
    {
        'time': 10,
        'sampling interval': 0.5,
        'resource': [...],
        'resource_analysis': {
            'GPU free mem.': {
                'max': xxx,
                'min': xxx,
                'avg': xxx,
            },
            ...
        }
    }
    ```

    Only those fields in DYNAMIC_FIELDS will be analyzed.
    """

    # Metrics that change over time and are therefore analyzed/plotted.
    # Static facts (e.g. 'CPU count', 'Total mem.') are recorded but skipped.
    DYNAMIC_FIELDS = {
        'CPU util.',
        'Used mem.',
        'Free mem.',
        'Available mem.',
        'Mem. util.',
        'GPU free mem.',
        'GPU used mem.',
        'GPU util.',
    }

    def __init__(self):
        pass

    def monitor_all_resources(self):
        """
        Detect the resource utilization of all distributed nodes.
        """
        # TODO
        raise NotImplementedError

    @staticmethod
    def monitor_current_resources():
        """
        Detect the resource utilization of the current environment/machine.
        All data of "util." is ratios in the range of [0.0, 1.0]. All data of
        "mem." is in MB.

        :return: a dict of one sampling point (see the class docstring).
            GPU fields may be ``None`` when no GPU is available.
        """
        resource_dict = dict()
        # current time
        resource_dict['timestamp'] = time.time()

        # CPU
        resource_dict['CPU count'] = get_cpu_count()
        # get_cpu_utilization reports percent; normalize to [0.0, 1.0]
        resource_dict['CPU util.'] = get_cpu_utilization() / 100.0
        resource_dict['Total mem.'] = query_mem_info('total')
        resource_dict['Used mem.'] = query_mem_info('used')
        resource_dict['Free mem.'] = query_mem_info('free')
        resource_dict['Available mem.'] = query_mem_info('available')
        resource_dict['Mem. util.'] = resource_dict[
            'Used mem.'] / resource_dict['Total mem.']

        # GPU
        resource_dict['GPU total mem.'] = query_cuda_info('memory.total')
        resource_dict['GPU free mem.'] = query_cuda_info('memory.free')
        resource_dict['GPU used mem.'] = query_cuda_info('memory.used')
        resource_dict['GPU util.'] = query_cuda_info('utilization.gpu')
        # query_cuda_info returns one value per GPU (a list) in percent;
        # normalize each to [0.0, 1.0]. Falsy (None) means no GPU available.
        if resource_dict['GPU util.']:
            resource_dict['GPU util.'] = [
                x / 100.0 for x in resource_dict['GPU util.']
            ]

        return resource_dict

    @staticmethod
    def draw_resource_util_graph(resource_util_list, store_dir):
        """
        Draw a utilization-over-time curve for each dynamic metric of each
        monitored function and save them as .jpg files.

        :param resource_util_list: a list of resource utilization dicts, one
            per monitored function (see the class docstring for the schema).
        :param store_dir: the directory to store the output figures in.
        """
        import matplotlib.pyplot as plt
        for idx, resource_util_dict in enumerate(resource_util_list):
            resource_list = resource_util_dict['resource']
            interval = resource_util_dict['sampling interval']
            for focus_metric in Monitor.DYNAMIC_FIELDS:
                fn = f'func_{idx}_{focus_metric.replace(" ", "_")}.jpg'
                ylbl = '%' if focus_metric.endswith('util.') else 'MB'
                # Skip samples where this metric is unavailable (e.g. GPU
                # metrics are None on machines without GPUs) instead of
                # feeding None values to matplotlib.
                metric_list = [
                    item[focus_metric] for item in resource_list
                    if item.get(focus_metric) is not None
                ]
                plt.plot([i * interval for i in range(len(metric_list))],
                         metric_list)
                plt.title(focus_metric)
                plt.xlabel('Time (s)')
                plt.ylabel(ylbl)
                plt.savefig(os.path.join(store_dir, fn), bbox_inches='tight')
                plt.clf()

    @staticmethod
    def analyze_resource_util_list(resource_util_list):
        """
        Analyze the resource utilization for a given resource util list.
        Compute {'max', 'min', 'avg'} of resource metrics for each dict item.

        :param resource_util_list: a list of resource utilization dicts.
        :return: the same list with a 'resource_analysis' field added to
            each item (items are modified in place).
        """
        res_list = []
        for item in resource_util_list:
            res_list.append(Monitor.analyze_single_resource_util(item))
        return res_list

    @staticmethod
    def analyze_single_resource_util(resource_util_dict):
        """
        Analyze the resource utilization for a single resource util dict.
        Compute {'max', 'min', 'avg'} of each resource metrics.

        :param resource_util_dict: a single resource utilization dict.
        :return: the input dict with a 'resource_analysis' field added
            (modified in place). Only DYNAMIC_FIELDS are analyzed; None
            values are skipped and per-GPU lists are flattened.
        """
        analysis_res = {}
        record_list = {}
        for record in resource_util_dict['resource']:
            for key in Monitor.DYNAMIC_FIELDS:
                if key in record:
                    if record[key] is None:
                        continue
                    elif isinstance(record[key], list):
                        record_list.setdefault(key, []).extend(record[key])
                    else:
                        record_list.setdefault(key, []).append(record[key])

        # analyze the max, min, and avg
        for key in record_list:
            analysis_res[key] = {
                'max': max(record_list[key]),
                'min': min(record_list[key]),
                'avg': sum(record_list[key]) / len(record_list[key]),
            }
        resource_util_dict['resource_analysis'] = analysis_res

        return resource_util_dict

    @staticmethod
    def monitor_func(func, args=None, sample_interval=0.5):
        """
        Run ``func`` once while sampling resource utilization in a child
        process.

        :param func: the callable to run and monitor.
        :param args: arguments for ``func``: a dict is applied as keyword
            arguments, a list/tuple as positional arguments, anything else
            as a single positional argument.
        :param sample_interval: resource sampling interval in seconds.
        :return: a tuple ``(ret, resource_util_dict)`` where ``ret`` is the
            return value of ``func`` and ``resource_util_dict`` contains the
            'resource', 'sampling interval', and 'time' fields described in
            the class docstring.
        """
        if args is None:
            args = {}
        if isinstance(args, dict):
            func = partial(func, **args)
        elif isinstance(args, (list, tuple)):
            func = partial(func, *args)
        else:
            func = partial(func, args)

        # resource utilization dict
        resource_util_dict = {}

        # start monitor
        start_method = 'fork'
        if os.name == 'nt':  # for Windows
            start_method = 'spawn'
        ctx = get_context(start_method)
        with ctx.Manager() as manager:
            mdict = manager.dict()
            mdict['stop'] = False
            monitor_proc = ctx.Process(target=resource_monitor,
                                       args=(
                                           mdict,
                                           sample_interval,
                                       ))
            monitor_proc.start()
            # start timer
            start = time.time()

            try:
                # run single op
                ret = func()
            finally:
                # end timer
                end = time.time()

                # always stop the monitor, even if func() raised — otherwise
                # the sampling child process would be leaked.
                mdict['stop'] = True
                monitor_proc.join()

            resource_util_dict['resource'] = mdict['resource']

            # record interval
            resource_util_dict['sampling interval'] = sample_interval

            # calculate speed
            resource_util_dict['time'] = end - start

        return ret, resource_util_dict