data_juicer.ops.aggregator

class data_juicer.ops.aggregator.NestedAggregator(api_model: str = 'gpt-4o', input_key: str = 'event_description', output_key: str = None, max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt: str | None = None, sub_doc_template: str | None = None, input_template: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Bases: Aggregator

Aggregates nested content from multiple samples into a single summary.

This operator uses a recursive summarization approach to aggregate content from multiple samples. It processes the input text, which is split into sub-documents, and generates a summary that maintains the average length of the original documents. The aggregation is performed using an API model, guided by system prompts and templates. The operator supports retrying the API call in case of errors and allows for customization of the summarization process through various parameters. The default system prompt and templates are provided in Chinese, but they can be customized. The operator uses a Hugging Face tokenizer to handle tokenization.
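The recursive approach described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the operator's actual implementation: `count_tokens`, `group_by_budget`, `recursive_summary_sketch`, and `first_words` are hypothetical names, whitespace counting stands in for the Hugging Face tokenizer, and a trivial function stands in for the API call.

```python
from typing import Callable, List


def count_tokens(text: str) -> int:
    # Hypothetical stand-in for a real tokenizer: count whitespace-separated words.
    return len(text.split())


def group_by_budget(docs: List[str], max_token_num: int) -> List[List[str]]:
    """Pack consecutive documents into groups whose token total fits the budget."""
    groups: List[List[str]] = []
    current: List[str] = []
    used = 0
    for doc in docs:
        n = count_tokens(doc)
        if current and used + n > max_token_num:
            groups.append(current)
            current, used = [], 0
        current.append(doc)
        used += n
    if current:
        groups.append(current)
    return groups


def recursive_summary_sketch(docs: List[str],
                             summarize: Callable[[List[str]], str],
                             max_token_num: int) -> str:
    """Summarize groups of documents, then summarize the summaries."""
    if not docs:
        return ""
    if len(docs) == 1:
        return docs[0]
    groups = group_by_budget(docs, max_token_num)
    if len(groups) == 1:
        return summarize(groups[0])
    partial = [summarize(g) if len(g) > 1 else g[0] for g in groups]
    if len(partial) == len(docs):
        # Every document fills a group on its own, so grouping makes no
        # progress; summarize everything at once to guarantee termination.
        return summarize(partial)
    return recursive_summary_sketch(partial, summarize, max_token_num)


def first_words(parts: List[str]) -> str:
    # Toy "summarizer" standing in for the API model: keep each part's first word.
    return " ".join(p.split()[0] for p in parts)


result = recursive_summary_sketch(
    ["a b c", "d e f", "g h i", "j k l"], first_words, max_token_num=6
)
```

With a budget of 6 tokens, the four documents are first merged pairwise, and the two intermediate summaries are then merged into the final one.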

DEFAULT_INPUT_TEMPLATE = '{sub_docs}\n\n文档总结:\n'
DEFAULT_SUB_DOC_TEMPLATE = '文档碎片:\n{text}\n'
DEFAULT_SYSTEM_PROMPT = '给定一些文档碎片,将这些文档整合成一个文档总结。\n要求:\n- 总结的长度与文档碎片的平均长度基本一致\n- 不要包含主观看法\n- 注意要尽可能保留文本的专有名词\n- 只输出文档总结不要输出其他内容\n- 参考如下样例:\n文档碎片:\n唐僧师徒四人行至白虎岭,遇上了变化多端的白骨精。\n\n文档碎片:\n白骨精首次变身少女送斋,被孙悟空识破打死,唐僧责怪悟空。\n\n文档碎片:\n妖怪再变老妇寻女,又被悟空击毙,师傅更加不满,念紧箍咒惩罚。\n\n文档碎片:\n不甘心的白骨精第三次化作老公公来诱骗,依旧逃不过金睛火眼。\n\n文档碎片:\n最终,在观音菩萨的帮助下,真相大白,唐僧明白了自己的误解。\n\n\n文档总结:\n唐僧师徒在白虎岭三遇白骨精变化诱惑,悟空屡次识破击毙妖怪却遭误解,最终观音相助真相大白。'
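As a rough illustration of how the two templates above might fit together, the sketch below renders each text with the sub-document template and places the result into the input template. The helper `build_prompt` and the newline separator between fragments are assumptions made for this example, not confirmed details of the operator.

```python
DEFAULT_SUB_DOC_TEMPLATE = '文档碎片:\n{text}\n'
DEFAULT_INPUT_TEMPLATE = '{sub_docs}\n\n文档总结:\n'


def build_prompt(texts):
    # Assumption: rendered fragments are joined with a single newline.
    sub_docs = "\n".join(
        DEFAULT_SUB_DOC_TEMPLATE.format(text=t) for t in texts
    )
    return DEFAULT_INPUT_TEMPLATE.format(sub_docs=sub_docs)


prompt = build_prompt(["first fragment", "second fragment"])
print(prompt)
```

Passing custom `sub_doc_template` and `input_template` strings with the same `{text}` and `{sub_docs}` placeholders would change the wording without changing this assembly step.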
__init__(api_model: str = 'gpt-4o', input_key: str = 'event_description', output_key: str = None, max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt: str | None = None, sub_doc_template: str | None = None, input_template: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Initialization method.

Parameters:
  • api_model – API model name.

  • input_key – The input key in the meta field of the samples. Defaults to “event_description”.

  • output_key – The output key in the aggregation field of the samples. Defaults to the value of input_key.

  • max_token_num – The maximum total number of tokens across the sub-documents. No limit is applied if None.

  • api_endpoint – URL endpoint for the API.

  • response_path – Path to extract content from the API response. Defaults to ‘choices.0.message.content’.

  • system_prompt – The system prompt.

  • sub_doc_template – The template for input text in each sample.

  • input_template – The input template.

  • try_num – The number of retry attempts when there is an API call error or output parsing error.

  • model_params – Parameters for initializing the API model.

  • sampling_params – Extra parameters passed to the API call, e.g., {‘temperature’: 0.9, ‘top_p’: 0.95}.

  • kwargs – Extra keyword arguments.
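The default `response_path` of ‘choices.0.message.content’ reads as a dotted path into the API response. A minimal sketch of such a lookup is shown below; `extract_by_path` is a hypothetical helper, and the response dictionary is a hand-written example shaped like a chat-completion reply, not real API output.

```python
from typing import Any


def extract_by_path(response: Any, path: str = "choices.0.message.content") -> Any:
    """Walk a nested dict/list structure following a dot-separated path."""
    current = response
    for part in path.split("."):
        if isinstance(current, list):
            # Numeric path segments index into lists.
            current = current[int(part)]
        else:
            current = current[part]
    return current


# Hand-written example response (assumption: chat-completion-like shape).
response = {
    "choices": [
        {"message": {"role": "assistant", "content": "summary text"}}
    ]
}
content = extract_by_path(response)
```

An endpoint that returns a different JSON layout would need a different `response_path`, for example a path without the list index.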

parse_output(response)[source]
process_single(sample=None, rank=None)[source]

Sample-level processing that maps a batched sample to a single sample. The input must be the output of a Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

recursive_summary(sub_docs, rank=None)[source]
class data_juicer.ops.aggregator.MetaTagsAggregator(api_model: str = 'gpt-4o', meta_tag_key: str = 'dialog_sentiment_labels', target_tags: List[str] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt: str | None = None, input_template: str | None = None, target_tag_template: str | None = None, tag_template: str | None = None, output_pattern: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Bases: Aggregator

Merge similar meta tags into a single, unified tag.

This operator aggregates and consolidates similar meta tags from the input data. It can handle two scenarios:

  • When a set of target tags is provided, it maps the original tags to these predefined categories. If a “miscellaneous” or “other” category is included, any tags that do not fit into the specified categories are grouped under this label.

  • When no target tags are provided, it generates reasonable categories based on the similarity and frequency of the input tags.

The operator uses a language model (default: gpt-4o) to analyze and merge the tags. The system prompt, input template, and output pattern can be customized. The aggregated tags are then updated in the input sample’s metadata.

DEFAULT_INPUT_TEMPLATE = '{target_tag_str}| 合并前标签 | 频次 |\n| ------ | ------ |\n{tag_strs}'
DEFAULT_OUTPUT_PATTERN = '\\*\\*\\s*(\\w+)归类为(\\w+)\\s*\\*\\*'
DEFAULT_SYSTEM_PROMPT = '给定一些标签以及这些标签出现的频次,合并意思相近的标签。\n要求:\n- 任务分为两种情况,一种是给定合并后的标签,需要将合并前的标签映射到这些标签。如果给定的合并后的标签中有类似“其他”这种标签,将无法归类的标签合并到“其他”。以下是这种情况的一个样例:\n合并后的标签应限定在[科技, 健康, 其他]中。\n| 合并前标签 | 频次 |\n| ------ | ------ |\n| 医疗 | 20 |\n| 信息技术 | 16 |\n| 学习 | 19 |\n| 气候变化 | 22 |\n| 人工智能 | 11 |\n| 养生 | 17 |\n| 科学创新 | 10 |\n\n## 分析:“信息技术”、“人工智能”、“科学创新”都属于“科技”类别,“医疗”和“养生”跟“健康”有关联,“学习”、“气候变化”和“科技”还有“健康”关联不强,应该被归为“其他”。\n## 标签合并:\n** 医疗归类为健康 **\n** 信息技术归类为科技 **\n** 学习归类为其他 **\n** 气候变化归类为其他 **\n** 人工智能归类为科技 **\n** 养生归类为健康 **\n** 科学创新归类为科技 **\n- 另外一种情况没有事先给定合并后的标签,需要生成合理的标签类别:| 合并前标签 | 频次 |\n| ------ | ------ |\n| 医疗 | 20 |\n| 信息技术 | 16 |\n| 学习 | 2 |\n| 气候变化 | 1 |\n| 人工智能 | 11 |\n| 养生 | 17 |\n| 科学创新 | 10 |\n\n## 分析:“信息技术”、“人工智能”、“科学创新”这三个标签比较相近,归为同一类,都属于“科技”类别,“医疗”和“养生”都跟“健康”有关系,可以归类为“健康”,“学习”和“气候变化”跟其他标签关联度不强,且频次较低,统一归类为“其他”。\n## 标签合并:\n** 医疗归类为健康 **\n** 信息技术归类为科技 **\n** 学习归类为其他 **\n** 气候变化归类为其他 **\n** 人工智能归类为科技 **\n** 养生归类为健康 **\n** 科学创新归类为科技 **\n'
DEFAULT_TAG_TEMPLATE = '| {tag} | {cnt} |'
DEFAULT_TARGET_TAG_TEMPLATE = '合并后的标签应限定在[{target_tags}]中。\n'
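The templates and output pattern above suggest a flow of "render a tag table, then parse lines of the form `** X归类为Y **`". The sketch below illustrates that flow. `build_input` and `parse_mapping` are hypothetical helpers, the join separators are assumptions, and the response string is hand-written rather than real model output.

```python
import re
from collections import Counter

DEFAULT_TAG_TEMPLATE = '| {tag} | {cnt} |'
DEFAULT_TARGET_TAG_TEMPLATE = '合并后的标签应限定在[{target_tags}]中。\n'
DEFAULT_INPUT_TEMPLATE = (
    '{target_tag_str}| 合并前标签 | 频次 |\n| ------ | ------ |\n{tag_strs}'
)
DEFAULT_OUTPUT_PATTERN = r'\*\*\s*(\w+)归类为(\w+)\s*\*\*'


def build_input(meta_cnts, target_tags=None):
    # Assumption: target tags are joined with ", " and table rows with "\n".
    target_tag_str = ""
    if target_tags:
        target_tag_str = DEFAULT_TARGET_TAG_TEMPLATE.format(
            target_tags=", ".join(target_tags)
        )
    tag_strs = "\n".join(
        DEFAULT_TAG_TEMPLATE.format(tag=tag, cnt=cnt)
        for tag, cnt in meta_cnts.items()
    )
    return DEFAULT_INPUT_TEMPLATE.format(
        target_tag_str=target_tag_str, tag_strs=tag_strs
    )


def parse_mapping(response):
    # Each match is an (original tag, merged tag) pair.
    return dict(re.findall(DEFAULT_OUTPUT_PATTERN, response))


counts = Counter(["医疗", "医疗", "养生", "人工智能"])
prompt = build_input(counts, target_tags=["科技", "健康", "其他"])

# Hand-written response in the format the system prompt asks for.
response = (
    "## 标签合并:\n"
    "** 医疗归类为健康 **\n"
    "** 养生归类为健康 **\n"
    "** 人工智能归类为科技 **\n"
)
mapping = parse_mapping(response)
```

Because `\w+` is greedy, the pattern relies on backtracking to find the `归类为` separator; a custom `output_pattern` should keep two capture groups if the same parsing approach is used.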
__init__(api_model: str = 'gpt-4o', meta_tag_key: str = 'dialog_sentiment_labels', target_tags: List[str] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt: str | None = None, input_template: str | None = None, target_tag_template: str | None = None, tag_template: str | None = None, output_pattern: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Initialization method.

Parameters:
  • api_model – API model name.

  • meta_tag_key – The key of the meta tag to be mapped.

  • target_tags – The tags that the original tags should be mapped to.

  • api_endpoint – URL endpoint for the API.

  • response_path – Path to extract content from the API response. Defaults to ‘choices.0.message.content’.

  • system_prompt – The system prompt.

  • input_template – The input template.

  • target_tag_template – The tag template for target tags.

  • tag_template – The tag template for each tag and its frequency.

  • output_pattern – The output pattern.

  • try_num – The number of retry attempts when there is an API call error or output parsing error.

  • model_params – Parameters for initializing the API model.

  • sampling_params – Extra parameters passed to the API call, e.g., {‘temperature’: 0.9, ‘top_p’: 0.95}.

  • kwargs – Extra keyword arguments.
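The `try_num` parameter describes retrying on either an API call error or an output parsing error. A generic sketch of that pattern follows. `call_with_retries` and `flaky_api` are hypothetical, and returning `None` once all attempts fail is an assumption for this example, not documented behavior.

```python
from typing import Callable, Optional


def call_with_retries(call_api: Callable[[], str],
                      parse_output: Callable[[str], str],
                      try_num: int = 3) -> Optional[str]:
    """Call the API and parse its output, retrying when either step raises."""
    for attempt in range(1, try_num + 1):
        try:
            return parse_output(call_api())
        except Exception as exc:
            print(f"attempt {attempt}/{try_num} failed: {exc}")
    # Assumption: give up quietly after try_num failed attempts.
    return None


attempts = []


def flaky_api() -> str:
    # Simulated API that fails twice before succeeding.
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient error")
    return "  ok  "


result = call_with_retries(flaky_api, str.strip, try_num=3)
```

Wrapping the call and the parsing in the same `try` block means a malformed response consumes an attempt in the same way a network failure does.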

meta_map(meta_cnts, rank=None)[source]
parse_output(response)[source]
process_single(sample=None, rank=None)[source]

Sample-level processing that maps a batched sample to a single sample. The input must be the output of a Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

class data_juicer.ops.aggregator.EntityAttributeAggregator(api_model: str = 'gpt-4o', entity: str = None, attribute: str = None, input_key: str = 'event_description', output_key: str = 'entity_attribute', word_limit: Annotated[int, Gt(gt=0)] = 100, max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt_template: str | None = None, example_prompt: str | None = None, input_template: str | None = None, output_pattern_template: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Bases: Aggregator

Summarizes a given attribute of an entity from a set of documents.

The operator extracts and summarizes the specified attribute of a given entity from the provided documents. It uses a system prompt, example prompt, and input template to generate the summary. The output is formatted as a markdown-style summary with the entity and attribute clearly labeled. The summary is limited to a specified number of words (default is 100). The operator uses a Hugging Face tokenizer to handle token limits and splits documents if necessary. If the input key or required fields are missing, the operator logs a warning and returns the sample unchanged. The summary is stored in the batch metadata under the specified output key. The system prompt, input template, example prompt, and output pattern can be customized.

DEFAULT_EXAMPLE_PROMPT = '- 例如,根据相关文档总结`孙悟空`的`出身背景`,**100字**以内的样例如下:\n`孙悟空`的`出身背景`总结:\n# 孙悟空\n## 出身背景\n号称齐天大圣,花果山水帘洞的美猴王、西行取经队伍中的大师兄。师父是唐僧玄奘,曾拜菩提祖师学艺。亲生父母未知,自石头中孕育而生。自认斗战胜佛,最怕观世音菩萨和紧箍咒。\n'
DEFAULT_INPUT_TEMPLATE = '`{entity}`的相关文档:\n{sub_docs}\n\n`{entity}`的`{attribute}`总结:\n'
DEFAULT_OUTPUT_PATTERN_TEMPLATE = '\\#\\s*{entity}\\s*\\#\\#\\s*{attribute}\\s*(.*?)\\Z'
DEFAULT_SYSTEM_TEMPLATE = '给定与`{entity}`相关的一些文档,总结`{entity}`的`{attribute}`。\n要求:\n- 尽量使用原文专有名词\n- 联系上下文,自动忽略上下文不一致的细节错误\n- 只对文档中与`{entity}`的`{attribute}`有关的内容进行总结\n- 字数限制在**{word_limit}字以内**\n- 要求输出格式如下:\n# {entity}\n## {attribute}\n...\n{example}'
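The output pattern template above is filled with the entity and attribute before being used as a regular expression. The sketch below shows one way this could work. `parse_summary` is a hypothetical helper, the use of `re.DOTALL` is an assumption (it lets the captured summary span several lines), and the response string is hand-written.

```python
import re

DEFAULT_OUTPUT_PATTERN_TEMPLATE = r'\#\s*{entity}\s*\#\#\s*{attribute}\s*(.*?)\Z'


def parse_summary(response: str, entity: str, attribute: str) -> str:
    # Note: names containing regex metacharacters would need re.escape first.
    pattern = DEFAULT_OUTPUT_PATTERN_TEMPLATE.format(
        entity=entity, attribute=attribute
    )
    match = re.search(pattern, response, re.DOTALL)
    return match.group(1).strip() if match else ""


# Hand-written response in the "# entity / ## attribute" format.
response = "# 孙悟空\n## 出身背景\n自石头中孕育而生。"
summary = parse_summary(response, "孙悟空", "出身背景")
missing = parse_summary("unrelated text", "孙悟空", "出身背景")
```

The lazy group `(.*?)` followed by `\Z` captures everything from the attribute heading to the end of the response.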
__init__(api_model: str = 'gpt-4o', entity: str = None, attribute: str = None, input_key: str = 'event_description', output_key: str = 'entity_attribute', word_limit: Annotated[int, Gt(gt=0)] = 100, max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt_template: str | None = None, example_prompt: str | None = None, input_template: str | None = None, output_pattern_template: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Initialization method.

Parameters:
  • api_model – API model name.

  • entity – The given entity.

  • attribute – The given attribute.

  • input_key – The input key in the meta field of the samples. Defaults to “event_description”.

  • output_key – The output key in the aggregation field of the samples. Defaults to “entity_attribute”.

  • word_limit – The word limit stated in the prompt to constrain the length of the output summary.

  • max_token_num – The maximum total number of tokens across the sub-documents. No limit is applied if None.

  • api_endpoint – URL endpoint for the API.

  • response_path – Path to extract content from the API response. Defaults to ‘choices.0.message.content’.

  • system_prompt_template – The system prompt template.

  • example_prompt – The example part in the system prompt.

  • input_template – The input template.

  • output_pattern_template – The output pattern template.

  • try_num – The number of retry attempts when there is an API call error or output parsing error.

  • model_params – Parameters for initializing the API model.

  • sampling_params – Extra parameters passed to the API call, e.g., {‘temperature’: 0.9, ‘top_p’: 0.95}.

  • kwargs – Extra keyword arguments.

attribute_summary(sub_docs, rank=None)[source]
parse_output(response)[source]
process_single(sample=None, rank=None)[source]

Sample-level processing that maps a batched sample to a single sample. The input must be the output of a Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

class data_juicer.ops.aggregator.MostRelevantEntitiesAggregator(api_model: str = 'gpt-4o', entity: str = None, query_entity_type: str = None, input_key: str = 'event_description', output_key: str = 'most_relevant_entities', max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt_template: str | None = None, input_template: str | None = None, output_pattern: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Bases: Aggregator

Extracts and ranks entities closely related to a given entity from provided texts.

The operator uses a language model API to identify and rank entities, filtering out entities of the same type as the given entity. The ranked list is sorted in descending order of importance. Input texts are aggregated and passed to the model, with an optional token limit. The output is parsed using a regular expression to extract the relevant entities. Results are stored in the batch metadata under the output key (‘most_relevant_entities’ by default). The operator retries the API call up to a specified number of times in case of errors. The system prompt, input template, and output pattern can be customized.

DEFAULT_INPUT_TEMPLATE = '`{entity}`的相关文档:\n{sub_docs}\n\n与`{entity}`最相关的一些`{entity_type}`:\n'
DEFAULT_OUTPUT_PATTERN = '\\#\\#\\s*列表\\s*(.*?)\\Z'
DEFAULT_SYSTEM_TEMPLATE = '给定与`{entity}`相关的一些文档,总结一些与`{entity}`最为相关的`{entity_type}`。\n要求:\n- 不用包含与{entity}为同一{entity_type}的{entity_type}。\n- 请按照人物的重要性进行排序,**越重要人物在列表越前面**。\n- 你的返回格式如下:\n## 分析\n你对各个{entity_type}与{entity}关联度的分析\n## 列表\n人物1, 人物2, 人物3, ...'
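Given the response format requested by the system template (an analysis section followed by a `## 列表` section), the default output pattern can be applied as in the sketch below. `parse_entities` is a hypothetical helper, splitting the captured text on ASCII commas and the `re.DOTALL` flag are assumptions, and the response string is hand-written.

```python
import re
from typing import List

DEFAULT_OUTPUT_PATTERN = r'\#\#\s*列表\s*(.*?)\Z'


def parse_entities(response: str) -> List[str]:
    match = re.search(DEFAULT_OUTPUT_PATTERN, response, re.DOTALL)
    if not match:
        return []
    # Assumption: entities are separated by ASCII commas, as in the prompt.
    return [e.strip() for e in match.group(1).split(",") if e.strip()]


# Hand-written response following the "## 分析 / ## 列表" layout.
response = "## 分析\n唐僧是师父。\n## 列表\n唐僧, 猪八戒, 沙僧"
entities = parse_entities(response)
```

The order of the returned list follows the order in the response, so the most important entity comes first when the model follows the prompt.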
__init__(api_model: str = 'gpt-4o', entity: str = None, query_entity_type: str = None, input_key: str = 'event_description', output_key: str = 'most_relevant_entities', max_token_num: Annotated[int, Gt(gt=0)] | None = None, *, api_endpoint: str | None = None, response_path: str | None = None, system_prompt_template: str | None = None, input_template: str | None = None, output_pattern: str | None = None, try_num: Annotated[int, Gt(gt=0)] = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs)[source]

Initialization method.

Parameters:
  • api_model – API model name.

  • entity – The given entity.

  • query_entity_type – The type of the relevant entities to query.

  • input_key – The input key in the meta field of the samples. Defaults to “event_description”.

  • output_key – The output key in the aggregation field of the samples. Defaults to “most_relevant_entities”.

  • max_token_num – The maximum total number of tokens across the sub-documents. No limit is applied if None.

  • api_endpoint – URL endpoint for the API.

  • response_path – Path to extract content from the API response. Defaults to ‘choices.0.message.content’.

  • system_prompt_template – The system prompt template.

  • input_template – The input template.

  • output_pattern – The output pattern.

  • try_num – The number of retry attempts when there is an API call error or output parsing error.

  • model_params – Parameters for initializing the API model.

  • sampling_params – Extra parameters passed to the API call, e.g., {‘temperature’: 0.9, ‘top_p’: 0.95}.

  • kwargs – Extra keyword arguments.

parse_output(response)[source]
process_single(sample=None, rank=None)[source]

Sample-level processing that maps a batched sample to a single sample. The input must be the output of a Grouper OP.

Parameters:

sample – batched sample to aggregate

Returns:

aggregated sample

query_most_relevant_entities(sub_docs, rank=None)[source]