Source code for data_juicer.ops.aggregator.entity_attribute_aggregator

import re
from typing import Dict, Optional

from loguru import logger
from pydantic import PositiveInt

from data_juicer.ops.base_op import OPERATORS, Aggregator
from data_juicer.utils.common_utils import (
    avg_split_string_list_under_limit,
    is_string_list,
)
from data_juicer.utils.constant import BatchMetaKeys, Fields, MetaKeys
from data_juicer.utils.model_utils import get_model, prepare_model

from .nested_aggregator import NestedAggregator

OP_NAME = "entity_attribute_aggregator"


# TODO: LLM-based inference.
[docs] @OPERATORS.register_module(OP_NAME) class EntityAttributeAggregator(Aggregator): """Summarizes a given attribute of an entity from a set of documents. The operator extracts and summarizes the specified attribute of a given entity from the provided documents. It uses a system prompt, example prompt, and input template to generate the summary. The output is formatted as a markdown-style summary with the entity and attribute clearly labeled. The summary is limited to a specified number of words (default is 100). The operator uses a Hugging Face tokenizer to handle token limits and splits documents if necessary. If the input key or required fields are missing, the operator logs a warning and returns the sample unchanged. The summary is stored in the batch metadata under the specified output key. The system prompt, input template, example prompt, and output pattern can be customized.""" DEFAULT_SYSTEM_TEMPLATE = ( "给定与`{entity}`相关的一些文档,总结`{entity}`的`{attribute}`。\n" "要求:\n" "- 尽量使用原文专有名词\n" "- 联系上下文,自动忽略上下文不一致的细节错误\n" "- 只对文档中与`{entity}`的`{attribute}`有关的内容进行总结\n" "- 字数限制在**{word_limit}字以内**\n" "- 要求输出格式如下:\n" "# {entity}\n" "## {attribute}\n" "...\n" "{example}" ) DEFAULT_EXAMPLE_PROMPT = ( "- 例如,根据相关文档总结`孙悟空`的`出身背景`,**100字**以内的样例如下:\n" "`孙悟空`的`出身背景`总结:\n" "# 孙悟空\n" "## 出身背景\n" "号称齐天大圣,花果山水帘洞的美猴王、西行取经队伍中的大师兄。" "师父是唐僧玄奘,曾拜菩提祖师学艺。" "亲生父母未知,自石头中孕育而生。自认斗战胜佛,最怕观世音菩萨和紧箍咒。\n" ) DEFAULT_INPUT_TEMPLATE = "`{entity}`的相关文档:\n" "{sub_docs}\n\n" "`{entity}`的`{attribute}`总结:\n" DEFAULT_OUTPUT_PATTERN_TEMPLATE = r"\#\s*{entity}\s*\#\#\s*{attribute}\s*(.*?)\Z" # noqa: E501
[docs] def __init__( self, api_model: str = "gpt-4o", entity: str = None, attribute: str = None, input_key: str = MetaKeys.event_description, output_key: str = BatchMetaKeys.entity_attribute, word_limit: PositiveInt = 100, max_token_num: Optional[PositiveInt] = None, *, api_endpoint: Optional[str] = None, response_path: Optional[str] = None, system_prompt_template: Optional[str] = None, example_prompt: Optional[str] = None, input_template: Optional[str] = None, output_pattern_template: Optional[str] = None, try_num: PositiveInt = 3, model_params: Dict = {}, sampling_params: Dict = {}, **kwargs, ): """ Initialization method. :param api_model: API model name. :param entity: The given entity. :param attribute: The given attribute. :param input_key: The input key in the meta field of the samples. It is "event_description" in default. :param output_key: The output key in the aggregation field of the samples. It is "entity_attribute" in default. :param word_limit: Prompt the output length. :param max_token_num: The max token num of the total tokens of the sub documents. Without limitation if it is None. :param api_endpoint: URL endpoint for the API. :param response_path: Path to extract content from the API response. Defaults to 'choices.0.message.content'. :param system_prompt_template: The system prompt template. :param example_prompt: The example part in the system prompt. :param input_template: The input template. :param output_pattern_template: The output template. :param try_num: The number of retry attempts when there is an API call error or output parsing error. :param model_params: Parameters for initializing the API model. :param sampling_params: Extra parameters passed to the API call. e.g {'temperature': 0.9, 'top_p': 0.95} :param kwargs: Extra keyword arguments. """ super().__init__(**kwargs) if entity is None or attribute is None: raise ValueError("The entity and attribute cannot be None!") self.entity = entity self.attribute = attribute self.input_key = input_key self.output_key = output_key self.word_limit = word_limit self.max_token_num = max_token_num system_prompt_template = system_prompt_template or self.DEFAULT_SYSTEM_TEMPLATE self.example_prompt = example_prompt or self.DEFAULT_EXAMPLE_PROMPT self.input_template = input_template or self.DEFAULT_INPUT_TEMPLATE output_pattern_template = output_pattern_template or self.DEFAULT_OUTPUT_PATTERN_TEMPLATE self.system_prompt = system_prompt_template.format( entity=self.entity, attribute=self.attribute, word_limit=self.word_limit, example=self.example_prompt ) self.output_pattern = output_pattern_template.format(entity=entity, attribute=attribute) self.sampling_params = sampling_params self.model_key = prepare_model( model_type="api", model=api_model, endpoint=api_endpoint, response_path=response_path, return_processor=True, **model_params, ) self.try_num = try_num self.nested_sum = NestedAggregator( api_model=api_model, max_token_num=max_token_num, api_endpoint=api_endpoint, response_path=response_path, try_num=try_num, model_params=model_params, sampling_params=sampling_params, )
[docs] def parse_output(self, response): pattern = re.compile(self.output_pattern, re.VERBOSE | re.DOTALL) matches = pattern.findall(response) if matches: result = matches[0].strip() else: result = "" return result
[docs] def attribute_summary(self, sub_docs, rank=None): if not sub_docs: return "" model, tokenizer = get_model(self.model_key, rank, self.use_cuda()) token_nums = [len(tokenizer.encode(sub_doc)) for sub_doc in sub_docs] group_docs = avg_split_string_list_under_limit(sub_docs, token_nums, self.max_token_num) results = [] for docs in group_docs: doc_str = "\n\n".join(docs) input_prompt = self.input_template.format(entity=self.entity, attribute=self.attribute, sub_docs=doc_str) messages = [{"role": "system", "content": self.system_prompt}, {"role": "user", "content": input_prompt}] result = "" for i in range(self.try_num): try: response = model(messages, **self.sampling_params) result = self.parse_output(response) if len(result) > 0: break except Exception as e: logger.warning(f"Exception: {e}") results.append(result) return self.nested_sum.recursive_summary(results)
[docs] def process_single(self, sample=None, rank=None): if self.output_key in sample[Fields.batch_meta]: return sample if Fields.meta not in sample or self.input_key not in sample[Fields.meta][0]: logger.warning("The input key does not exist in the sample!") return sample sub_docs = [d[self.input_key] for d in sample[Fields.meta]] # if not batched sample if not is_string_list(sub_docs): logger.warning("Require string meta as input!") return sample sample[Fields.batch_meta][self.output_key] = self.attribute_summary(sub_docs, rank=rank) return sample