# Source code for data_juicer.ops.mapper.dialog_topic_detection_mapper

import re
from typing import Dict, List, Optional

from loguru import logger
from pydantic import NonNegativeInt, PositiveInt

from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, Mapper
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.model_utils import get_model, prepare_model

OP_NAME = 'dialog_topic_detection_mapper'


# TODO: LLM-based inference.
@TAGGING_OPS.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class DialogTopicDetectionMapper(Mapper):
    """
    Mapper to generate user's topic labels in dialog. Input from
    history_key, query_key and response_key. Output lists of
    labels and analysis for queries in the dialog.
    """

    # Few-shot system prompt (Chinese). Every example turn is laid out as
    # query -> analysis -> labels -> response, one item per line, which is
    # the layout DEFAULT_ANALYSIS_PATTERN / DEFAULT_LABELS_PATTERN parse.
    DEFAULT_SYSTEM_PROMPT = ('请判断用户和LLM多轮对话中用户所讨论的话题。\n'
                             '要求:\n'
                             '- 针对用户的每个query,需要先进行分析,然后列出用户正在讨论的话题,下面是'
                             '一个样例,请模仿样例格式输出。\n'
                             '用户:你好,今天我们来聊聊秦始皇吧。\n'
                             '话题分析:用户提到秦始皇,这是中国历史上第一位皇帝。\n'
                             '话题类别:历史\n'
                             'LLM:当然可以,秦始皇是中国历史上第一个统一全国的皇帝,他在公元前221年建'
                             '立了秦朝,并采取了一系列重要的改革措施,如统一文字、度量衡和货币等。\n'
                             '用户:秦始皇修建的长城和现在的长城有什么区别?\n'
                             '话题分析:用户提到秦始皇修建的长城,并将其与现代长城进行比较,涉及建筑历史'
                             '和地理位置。\n'
                             # The trailing '\n' was missing here, which glued
                             # the label onto the following 'LLM:' turn.
                             '话题类别:历史\n'
                             'LLM:秦始皇时期修建的长城主要是为了抵御北方游牧民族的入侵,它的规模和修建'
                             '技术相对较为简陋。现代人所看到的长城大部分是明朝时期修建和扩建的,明长城不'
                             '仅规模更大、结构更坚固,而且保存得比较完好。\n'
                             '用户:有意思,那么长城的具体位置在哪些省份呢?\n'
                             '话题分析:用户询问长城的具体位置,涉及到地理知识。\n'
                             '话题类别:地理\n'
                             'LLM:长城横跨中国北方多个省份,主要包括河北、山西、内蒙古、宁夏、陕西、甘'
                             '肃和北京等。每一段长城都建在关键的战略位置,以便最大限度地发挥其防御作用'
                             '。\n')

    # Templates used to render each part of a dialog round into the prompt.
    DEFAULT_QUERY_TEMPLATE = '用户:{query}\n'
    DEFAULT_RESPONSE_TEMPLATE = 'LLM:{response}\n'
    DEFAULT_CANDIDATES_TEMPLATE = '备选话题类别:[{candidate_str}]'
    DEFAULT_ANALYSIS_TEMPLATE = '话题分析:{analysis}\n'
    DEFAULT_LABELS_TEMPLATE = '话题类别:{labels}\n'

    # Regular expressions used to extract the analysis / labels from the
    # model output; group 1 is the captured text.
    DEFAULT_ANALYSIS_PATTERN = '话题分析:(.*?)\n'
    DEFAULT_LABELS_PATTERN = '话题类别:(.*?)($|\n)'

    def __init__(self,
                 api_model: str = 'gpt-4o',
                 topic_candidates: Optional[List[str]] = None,
                 max_round: NonNegativeInt = 10,
                 *,
                 labels_key: str = MetaKeys.dialog_topic_labels,
                 analysis_key: str = MetaKeys.dialog_topic_labels_analysis,
                 api_endpoint: Optional[str] = None,
                 response_path: Optional[str] = None,
                 system_prompt: Optional[str] = None,
                 query_template: Optional[str] = None,
                 response_template: Optional[str] = None,
                 candidate_template: Optional[str] = None,
                 analysis_template: Optional[str] = None,
                 labels_template: Optional[str] = None,
                 analysis_pattern: Optional[str] = None,
                 labels_pattern: Optional[str] = None,
                 try_num: PositiveInt = 3,
                 model_params: Optional[Dict] = None,
                 sampling_params: Optional[Dict] = None,
                 **kwargs):
        """
        Initialization method.

        :param api_model: API model name.
        :param topic_candidates: The output topic candidates. Use
            open-domain topic labels if it is None.
        :param max_round: The max num of round in the dialog to build the
            prompt.
        :param labels_key: The key name in the meta field to store the
            output labels. It is 'dialog_topic_labels' in default.
        :param analysis_key: The key name in the meta field to store the
            corresponding analysis. It is 'dialog_topic_labels_analysis'
            in default.
        :param api_endpoint: URL endpoint for the API.
        :param response_path: Path to extract content from the API response.
            Defaults to 'choices.0.message.content'.
        :param system_prompt: System prompt for the task.
        :param query_template: Template for query part to build the input
            prompt.
        :param response_template: Template for response part to build the
            input prompt.
        :param candidate_template: Template for topic candidates to
            build the input prompt.
        :param analysis_template: Template for analysis part to build the
            input prompt.
        :param labels_template: Template for labels part to build the
            input prompt.
        :param analysis_pattern: Pattern to parse the return topic
            analysis.
        :param labels_pattern: Pattern to parse the return topic
            labels.
        :param try_num: The number of retry attempts when there is an API
            call error or output parsing error.
        :param model_params: Parameters for initializing the API model.
            None is treated as an empty dict.
        :param sampling_params: Extra parameters passed to the API call.
            e.g {'temperature': 0.9, 'top_p': 0.95}. None is treated as an
            empty dict.
        :param kwargs: Extra keyword arguments.
        """
        super().__init__(**kwargs)

        self.topic_candidates = topic_candidates
        self.max_round = max_round
        self.labels_key = labels_key
        self.analysis_key = analysis_key

        # Any template / pattern left unset (or empty) falls back to the
        # class-level default.
        self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
        self.query_template = query_template or self.DEFAULT_QUERY_TEMPLATE
        self.response_template = response_template or \
            self.DEFAULT_RESPONSE_TEMPLATE
        self.candidate_template = candidate_template or \
            self.DEFAULT_CANDIDATES_TEMPLATE
        self.analysis_template = analysis_template or \
            self.DEFAULT_ANALYSIS_TEMPLATE
        self.labels_template = labels_template or \
            self.DEFAULT_LABELS_TEMPLATE
        self.analysis_pattern = analysis_pattern or \
            self.DEFAULT_ANALYSIS_PATTERN
        self.labels_pattern = labels_pattern or \
            self.DEFAULT_LABELS_PATTERN

        # Defaults are None instead of `{}` so that no dict object is shared
        # between instances through the function signature.
        self.sampling_params = sampling_params or {}

        self.model_key = prepare_model(model_type='api',
                                       model=api_model,
                                       endpoint=api_endpoint,
                                       response_path=response_path,
                                       **(model_params or {}))

        self.try_num = try_num

    def build_input(self, history, query):
        """
        Build the user prompt for one dialog round.

        :param history: Flat list of already rendered prompt pieces from
            earlier rounds (four pieces per round, see `process_single`).
        :param query: A (query, response) pair; only the query is used.
        :return: The prompt string: optional candidate list, the last
            `max_round` rounds of history, then the current query.
        """
        if self.topic_candidates:
            input_prompt = self.candidate_template.format(
                candidate_str=','.join(self.topic_candidates))
        else:
            input_prompt = ''

        if self.max_round > 0:
            # Each round contributes 4 entries (query, analysis, labels,
            # response), hence the factor of 4.
            input_prompt += ''.join(history[-self.max_round * 4:])

        input_prompt += self.query_template.format(query=query[0])

        return input_prompt

    def parse_output(self, response):
        """
        Extract the topic analysis and labels from a model response.

        :param response: Raw text returned by the model.
        :return: Tuple `(analysis, labels)`; each element is an empty
            string when its pattern does not match.
        """
        analysis = ''
        labels = ''

        match = re.search(self.analysis_pattern, response)
        if match:
            analysis = match.group(1)

        match = re.search(self.labels_pattern, response)
        if match:
            labels = match.group(1)

        return analysis, labels

    def process_single(self, sample, rank=None):
        """
        Tag every round of the dialog in `sample` with topic labels.

        Writes two lists (one entry per dialog round) into the sample's
        meta field under `labels_key` and `analysis_key`. Samples that
        already carry both keys are returned unchanged.

        :param sample: The sample to process.
        :param rank: Passed through to `get_model`.
        :return: The processed sample.
        """
        meta = sample[Fields.meta]
        if self.labels_key in meta and self.analysis_key in meta:
            return sample

        client = get_model(self.model_key, rank=rank)

        analysis_list = []
        labels_list = []
        history = []

        # Work on a copy: appending the current query/response to the
        # sample's own history list would modify the stored dialog.
        dialog = list(sample[self.history_key])
        if self.query_key in sample and sample[self.query_key]:
            if self.response_key in sample and sample[self.response_key]:
                dialog.append(
                    (sample[self.query_key], sample[self.response_key]))
            else:
                dialog.append((sample[self.query_key], ''))

        for qa in dialog:
            input_prompt = self.build_input(history, qa)
            messages = [{
                'role': 'system',
                'content': self.system_prompt,
            }, {
                'role': 'user',
                'content': input_prompt,
            }]

            # Reset per round so that a round whose attempts all fail gets
            # empty results instead of an UnboundLocalError (first round)
            # or the previous round's values (later rounds).
            analysis, labels = '', ''
            for _ in range(self.try_num):
                try:
                    response = client(messages, **self.sampling_params)
                    analysis, labels = self.parse_output(response)
                    if len(analysis) > 0:
                        break
                except Exception as e:
                    # Best effort: log and retry on any API/parsing error.
                    logger.warning(f'Exception: {e}')

            analysis_list.append(analysis)
            labels_list.append(labels)

            # Record this round (4 entries) for the next round's prompt.
            history.append(self.query_template.format(query=qa[0]))
            history.append(self.analysis_template.format(analysis=analysis))
            history.append(self.labels_template.format(labels=labels))
            history.append(self.response_template.format(response=qa[1]))

        meta[self.labels_key] = labels_list
        meta[self.analysis_key] = analysis_list

        return sample