Source code for data_juicer.tools.mcp_tool

import datetime
import os
import traceback
from typing import Dict

from loguru import logger

from data_juicer.config import get_init_configs
from data_juicer.core.executor import DefaultExecutor

DEFAULT_OUTPUT_DIR = "./outputs"


def add_extra_cfg(dj_cfg: Dict) -> Dict:
    """Fill in the MCP-server specific settings of a Data-Juicer config.

    The mapping is modified in place and the same object is returned.

    * ``export_path``: when missing or falsy, it is set to
      ``<DEFAULT_OUTPUT_DIR>/<YYYYmmdd_HHMMSS>/processed_data.jsonl`` using
      the current local time.
    * ``np``: forced to ``1`` when it is missing or falsy, or when the
      ``SERVER_TRANSPORT`` environment variable (default ``"sse"``) equals
      ``"stdio"``. A warning is logged whenever this happens.
    * ``open_monitor``: always set to ``False``.

    :param dj_cfg: Data-Juicer config as a dict.
    :return: the same ``dj_cfg`` object, updated.
    """
    export_path = dj_cfg.get("export_path")
    if not export_path:
        logger.info("export_path is not set, use default export_path")
        # One sub-directory per invocation, named after the current time.
        run_stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        default_path = os.path.join(DEFAULT_OUTPUT_DIR, run_stamp, "processed_data.jsonl")
        dj_cfg["export_path"] = default_path

    np_is_set = bool(dj_cfg.get("np"))
    uses_stdio = os.getenv("SERVER_TRANSPORT", "sse") == "stdio"
    if not np_is_set or uses_stdio:
        # Fall back to a single worker process.
        dj_cfg["np"] = 1
        logger.warning(
            "Multiprocessing has been disabled. "
            "It is only supported when the MCP server transport is "
            "not 'stdio' and 'np' is explicitly set. Setting 'np' to 1."
        )

    # The monitor is switched off to avoid multiprocessing.
    dj_cfg["open_monitor"] = False
    return dj_cfg
def execute_op(dj_cfg: Dict):
    """Run Data-Juicer with the given config and report the outcome as text.

    The config is first completed by ``add_extra_cfg`` (which updates
    ``dj_cfg`` in place), then initialised through ``get_init_configs`` with
    ``load_configs_only=False`` and executed by a ``DefaultExecutor``.

    Any ``Exception`` raised along the way is caught here: it is logged with
    its traceback and turned into the returned error string, so this function
    does not propagate ``Exception`` subclasses to its caller.

    :param dj_cfg: Data-Juicer config as a dict.
    :return: a message containing the export path on success, or a message
        containing the formatted traceback on failure.
    """
    try:
        dj_cfg = add_extra_cfg(dj_cfg)
        logger.info(f"DJ config in MCP server: {dj_cfg}")
        dj_cfg = get_init_configs(dj_cfg, load_configs_only=False)
        executor = DefaultExecutor(dj_cfg)
        executor.run()
        return f"Result dataset is saved in: {dj_cfg['export_path']}"
    except Exception:
        # Boundary handler: the caller only gets a string back, so record the
        # failure in the server log as well instead of dropping it silently.
        logger.exception("Data-Juicer execution failed in MCP server")
        error_msg = traceback.format_exc()
        return f"Occur error when executing Data-Juicer: {error_msg}"