Source code for data_juicer.tools.DJ_mcp_granular_ops

import argparse
import inspect
import os
import sys
from typing import Annotated, Optional

from mcp.server.fastmcp import FastMCP
from pydantic import Field

from data_juicer.tools.mcp_tool import execute_op
from data_juicer.tools.op_search import OPSearcher


# Dynamic MCP Tool Creation
def process_parameter(name: str, param: inspect.Parameter) -> inspect.Parameter:
    """
    Normalize one operator parameter for use in a generated tool signature.

    A parameter annotated with ``jsonargparse.typing.ClosedUnitInterval`` is
    given a local equivalent annotation (a ``float`` constrained to the
    closed interval [0, 1]). Every other parameter is returned unchanged.

    Args:
        name (str): Parameter name. Not used by the function body.
        param (inspect.Parameter): The parameter to inspect.

    Returns:
        inspect.Parameter: ``param`` itself, or a copy of it carrying the
        local unit-interval annotation.
    """
    # Look the type up through sys.modules so jsonargparse is never imported
    # here; if it has not been loaded, no parameter can carry its type.
    jsonargparse_typing = sys.modules.get("jsonargparse.typing")
    target = getattr(jsonargparse_typing, "ClosedUnitInterval", None)

    # The explicit None guard matters: without it, a missing jsonargparse
    # makes the target None, and a parameter annotated ``None`` would compare
    # equal and be rewritten by mistake.
    if target is not None and param.annotation == target:
        closed_unit_interval = Annotated[
            float,
            Field(ge=0.0, le=1.0, description="float restricted to be ≥0 and ≤1"),
        ]
        return param.replace(annotation=closed_unit_interval)
    return param
def create_operator_function(op, mcp):
    """Creates a callable for a Data-Juicer operator and registers it as an MCP tool.

    The generated function gets a signature and docstring derived from the
    operator description, so the tool exposes the operator's own parameters
    plus three common ones: ``dataset_path``, ``export_path`` and ``np``.

    Args:
        op: Mapping describing the operator. The keys read here are
            ``"signature"`` (used like an ``inspect.Signature``),
            ``"description"``, ``"param_desc"`` and ``"name"``.
        mcp: Server object whose ``tool()`` decorator registers the function.

    Returns:
        The result of applying ``mcp.tool()`` to the generated function.
    """
    sig = op["signature"]
    docstring = op["description"]
    param_docstring = op["param_desc"]
    op_name = op["name"]

    # Common parameters come first; other common parameters could be added
    # here later.
    common_parameters = [
        inspect.Parameter("dataset_path", inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=str),
        inspect.Parameter(
            "export_path",
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            annotation=Optional[str],
            default=None,
        ),
        inspect.Parameter(
            "np",
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            annotation=Optional[int],
            default=None,
        ),
    ]
    # NOTE(review): inspect.Signature rejects a positional parameter without
    # a default placed after defaulted ones, so an operator with a required
    # positional argument would raise ValueError below -- confirm no such
    # operator is exposed.
    operator_parameters = [
        process_parameter(name, param)
        for name, param in sig.parameters.items()
        if name not in ("args", "kwargs", "self")
    ]
    new_signature = sig.replace(
        parameters=common_parameters + operator_parameters,
        return_annotation=str,
    )

    def func(*args, **kwargs):
        bound_arguments = new_signature.bind(*args, **kwargs)
        bound_arguments.apply_defaults()
        arguments = bound_arguments.arguments

        export_path = arguments.pop("export_path")
        dataset_path = arguments.pop("dataset_path")
        num_proc = arguments.pop("np")

        # Drop unset values (None, empty strings, empty containers) but keep
        # numbers and booleans: an explicit 0, 0.0 or False is a real setting
        # and must not be silently replaced by the operator's own default.
        args_dict = {
            key: value
            for key, value in arguments.items()
            if isinstance(value, (int, float)) or value
        }

        dj_cfg = {
            "dataset_path": dataset_path,
            "export_path": export_path,
            "process": [{op_name: args_dict}],
            "np": num_proc,
        }
        return execute_op(dj_cfg)

    func.__signature__ = new_signature
    func.__doc__ = f"""{docstring}\n\n{param_docstring}\n"""
    func.__name__ = op_name

    decorated_func = mcp.tool()(func)
    return decorated_func
def create_mcp_server(port: str = "8000"):
    """
    Builds the FastMCP server and registers one tool per operator found.

    Args:
        port (str, optional): Port number handed to FastMCP. Defaults to "8000".

    Returns:
        The FastMCP instance after all operator tools have been registered.
    """
    server = FastMCP("Data-Juicer Server", port=port)

    # Operator management: when DJ_OPS_LIST_PATH names a file, its non-blank
    # lines (stripped) are passed to the searcher; otherwise the searcher
    # receives None.
    ops_list = None
    list_file = os.getenv("DJ_OPS_LIST_PATH")
    if list_file:
        with open(list_file, "r", encoding="utf-8") as handle:
            stripped_lines = (raw.strip() for raw in handle)
            ops_list = [entry for entry in stripped_lines if entry]

    # Register every operator returned by the search as an MCP tool.
    for op_info in OPSearcher(ops_list).search():
        create_operator_function(op_info, server)

    return server
if __name__ == "__main__":
    # Command line: the port is kept as a string, matching the type that
    # create_mcp_server declares for it.
    cli_parser = argparse.ArgumentParser(description="Data-Juicer MCP Server")
    cli_parser.add_argument(
        "--port",
        default="8000",
        type=str,
        help="Port number for the MCP server",
    )
    cli_args = cli_parser.parse_args()

    # Build the server, then run it on the transport named by
    # SERVER_TRANSPORT (falls back to "sse" when the variable is unset).
    server = create_mcp_server(port=cli_args.port)
    transport = os.getenv("SERVER_TRANSPORT", "sse")
    server.run(transport=transport)