Source code for data_juicer.tools.DJ_mcp_recipe_flow

import argparse
import os
from typing import Dict, List, Optional

from mcp.server.fastmcp import FastMCP

from data_juicer.tools.mcp_tool import execute_op
from data_juicer.tools.op_search import OPSearcher

# Operator Management
ops_list_path = os.getenv("DJ_OPS_LIST_PATH", None)
if ops_list_path:
    with open(ops_list_path, "r", encoding="utf-8") as file:
        ops_list = [line.strip() for line in file if line.strip()]
else:
    ops_list = None
searcher = OPSearcher(ops_list)
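
# Illustrative note (not part of the module): the file referenced by
# DJ_OPS_LIST_PATH is read as one operator name per line, with blank lines
# ignored, e.g.
#
#     text_length_filter
#     language_id_score_filter
#
# When the variable is unset, `ops_list` stays None and the search covers
# all registered operators.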


# Operator Management
def get_data_processing_ops(
    op_type: Optional[str] = None,
    tags: Optional[List[str]] = None,
    match_all: bool = True,
) -> dict:
    """
    Retrieves a list of available data processing operators based on the
    specified type and tags. Operators are a collection of basic processes
    that assist in data modification, cleaning, filtering, deduplication,
    etc. Should be used with `run_data_recipe`.

    If both `tags` and `op_type` are None, return a list of all operators.

    The following `op_type` values are supported:

    - aggregator: Aggregate for batched samples, such as summary or conclusion.
    - deduplicator: Detects and removes duplicate samples.
    - filter: Filters out low-quality samples.
    - grouper: Group samples to batched samples.
    - mapper: Edits and transforms samples.
    - selector: Selects top samples based on ranking.

    The `tags` parameter specifies the characteristics of the data or the
    required resources. Available tags are:

    Modality Tags:

    - text: process text data specifically.
    - image: process image data specifically.
    - audio: process audio data specifically.
    - video: process video data specifically.
    - multimodal: process multimodal data.

    Resource Tags:

    - cpu: only requires CPU resource.
    - gpu: requires GPU/CUDA resource as well.

    Model Tags:

    - api: equipped with API-based models (e.g. ChatGPT, GPT-4o).
    - vllm: equipped with models supported by vLLM.
    - hf: equipped with models from HuggingFace Hub.

    Tags are used to refine the search for suitable operators based on
    specific data processing needs.

    :param op_type: The type of data processing operator to retrieve.
        If None, no op_type-based filtering is applied. If specified, must
        be one of the values listed above. Defaults to None.
    :param tags: An optional list of tags to filter operators. See the tag
        list above for options. If None, no tag-based filtering is applied.
        Defaults to None.
    :param match_all: If True, only operators matching all specified tags
        are returned. If False, operators matching any of the specified
        tags are returned. Defaults to True.
    :returns: A dict containing detailed information about the available
        operators.
    """
    op_results = searcher.search(tags=tags, op_type=op_type, match_all=match_all)
    ops_dict = dict()
    for op in op_results:
        ops_dict[op["name"]] = "\n".join(
            [op["description"], op["param_desc"], "Parameters: ", str(op["signature"])]
        )
    return ops_dict
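
# Illustrative sketch of the return value (placeholders, not real registry
# output): each entry maps an operator name to one string built from its
# description, parameter descriptions, and call signature, e.g.
#
#     {
#         "text_length_filter": "<description>\n<param descriptions>\n"
#                               "Parameters: \n<signature>",
#     }

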
def run_data_recipe(
    dataset_path: str,
    process: list[Dict],
    export_path: Optional[str] = None,
    np: int = 1,
) -> str:
    """
    Run a data recipe. Use this tool to run one or more Data-Juicer data
    processing operators. Supported operators and their arguments should be
    obtained through the `get_data_processing_ops` tool.

    :param dataset_path: Path to the dataset to be processed.
    :param process: List of processing operations to be executed
        sequentially. Each element is a dictionary with the operator name as
        key and its configuration as value. Multiple operators can be chained.
    :param export_path: Path to export the processed dataset. Defaults to
        None, which exports to the './outputs' directory.
    :param np: Number of processes to use. Defaults to 1.

    Example:
        # First get available filter operators for text data
        >>> available_ops = get_data_processing_ops(
        ...     op_type="filter",
        ...     tags=["text"]
        ... )

        # Then run a data recipe with selected filters:
        # 1. First filter samples with text length 10-50
        # 2. Then keep English samples with language confidence score >= 0.8
        >>> run_data_recipe(
        ...     "/path/to/dataset.jsonl",
        ...     [
        ...         {
        ...             "text_length_filter": {
        ...                 "min_len": 10,
        ...                 "max_len": 50
        ...             }
        ...         },
        ...         {
        ...             "language_id_score_filter": {
        ...                 "lang": "en",
        ...                 "min_score": 0.8
        ...             }
        ...         }
        ...     ]
        ... )
    """
    dj_cfg = dict(locals())
    return execute_op(dj_cfg)


def create_mcp_server(port: str = "8000"):
    """
    Creates the FastMCP server and registers the tools.

    Args:
        port (str, optional): Port number. Defaults to "8000".
    """
    mcp = FastMCP("Data-Juicer Server", port=port)
    mcp.tool()(get_data_processing_ops)
    mcp.tool()(run_data_recipe)
    return mcp


if __name__ == "__main__": parser = argparse.ArgumentParser(description="Data-Juicer MCP Server") parser.add_argument( "--port", type=str, default="8000", help="Port number for the MCP server" ) # changed to str for consistency args = parser.parse_args() # Server configuration mcp = create_mcp_server(port=args.port) mcp.run(transport=os.getenv("SERVER_TRANSPORT", "sse"))