import re
from typing import Dict, List, Optional
from loguru import logger
from pydantic import NonNegativeInt, PositiveInt
from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, Mapper
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.model_utils import get_model, prepare_model
OP_NAME = 'dialog_topic_detection_mapper'
# TODO: LLM-based inference.
[docs]
@TAGGING_OPS.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class DialogTopicDetectionMapper(Mapper):
"""
Mapper to generate user's topic labels in dialog. Input from
history_key, query_key and response_key. Output lists of
labels and analysis for queries in the dialog.
"""
DEFAULT_SYSTEM_PROMPT = ('请判断用户和LLM多轮对话中用户所讨论的话题。\n'
'要求:\n'
'- 针对用户的每个query,需要先进行分析,然后列出用户正在讨论的话题,下面是'
'一个样例,请模仿样例格式输出。\n'
'用户:你好,今天我们来聊聊秦始皇吧。\n'
'话题分析:用户提到秦始皇,这是中国历史上第一位皇帝。\n'
'话题类别:历史\n'
'LLM:当然可以,秦始皇是中国历史上第一个统一全国的皇帝,他在公元前221年建'
'立了秦朝,并采取了一系列重要的改革措施,如统一文字、度量衡和货币等。\n'
'用户:秦始皇修建的长城和现在的长城有什么区别?\n'
'话题分析:用户提到秦始皇修建的长城,并将其与现代长城进行比较,涉及建筑历史'
'和地理位置。\n'
'话题类别:历史'
'LLM:秦始皇时期修建的长城主要是为了抵御北方游牧民族的入侵,它的规模和修建'
'技术相对较为简陋。现代人所看到的长城大部分是明朝时期修建和扩建的,明长城不'
'仅规模更大、结构更坚固,而且保存得比较完好。\n'
'用户:有意思,那么长城的具体位置在哪些省份呢?\n'
'话题分析:用户询问长城的具体位置,涉及到地理知识。\n'
'话题类别:地理\n'
'LLM:长城横跨中国北方多个省份,主要包括河北、山西、内蒙古、宁夏、陕西、甘'
'肃和北京等。每一段长城都建在关键的战略位置,以便最大限度地发挥其防御作用'
'。\n')
DEFAULT_QUERY_TEMPLATE = '用户:{query}\n'
DEFAULT_RESPONSE_TEMPLATE = 'LLM:{response}\n'
DEFAULT_CANDIDATES_TEMPLATE = '备选话题类别:[{candidate_str}]'
DEFAULT_ANALYSIS_TEMPLATE = '话题分析:{analysis}\n'
DEFAULT_LABELS_TEMPLATE = '话题类别:{labels}\n'
DEFAULT_ANALYSIS_PATTERN = '话题分析:(.*?)\n'
DEFAULT_LABELS_PATTERN = '话题类别:(.*?)($|\n)'
[docs]
def __init__(self,
api_model: str = 'gpt-4o',
topic_candidates: Optional[List[str]] = None,
max_round: NonNegativeInt = 10,
*,
labels_key: str = MetaKeys.dialog_topic_labels,
analysis_key: str = MetaKeys.dialog_topic_labels_analysis,
api_endpoint: Optional[str] = None,
response_path: Optional[str] = None,
system_prompt: Optional[str] = None,
query_template: Optional[str] = None,
response_template: Optional[str] = None,
candidate_template: Optional[str] = None,
analysis_template: Optional[str] = None,
labels_template: Optional[str] = None,
analysis_pattern: Optional[str] = None,
labels_pattern: Optional[str] = None,
try_num: PositiveInt = 3,
model_params: Dict = {},
sampling_params: Dict = {},
**kwargs):
"""
Initialization method.
:param api_model: API model name.
:param topic_candidates: The output topic candidates. Use
open-domain topic labels if it is None.
:param max_round: The max num of round in the dialog to build the
prompt.
:param labels_key: The key name in the meta field to store the
output labels. It is 'dialog_topic_labels' in default.
:param analysis_key: The key name in the meta field to store the
corresponding analysis. It is 'dialog_topic_labels_analysis'
in default.
:param api_endpoint: URL endpoint for the API.
:param response_path: Path to extract content from the API response.
Defaults to 'choices.0.message.content'.
:param system_prompt: System prompt for the task.
:param query_template: Template for query part to build the input
prompt.
:param response_template: Template for response part to build the
input prompt.
:param candidate_template: Template for topic candidates to
build the input prompt.
:param analysis_template: Template for analysis part to build the
input prompt.
:param labels_template: Template for labels part to build the
input prompt.
:param analysis_pattern: Pattern to parse the return topic
analysis.
:param labels_pattern: Pattern to parse the return topic
labels.
:param try_num: The number of retry attempts when there is an API
call error or output parsing error.
:param model_params: Parameters for initializing the API model.
:param sampling_params: Extra parameters passed to the API call.
e.g {'temperature': 0.9, 'top_p': 0.95}
:param kwargs: Extra keyword arguments.
"""
super().__init__(**kwargs)
self.topic_candidates = topic_candidates
self.max_round = max_round
self.labels_key = labels_key
self.analysis_key = analysis_key
self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
self.query_template = query_template or self.DEFAULT_QUERY_TEMPLATE
self.response_template = response_template or \
self.DEFAULT_RESPONSE_TEMPLATE
self.candidate_template = candidate_template or \
self.DEFAULT_CANDIDATES_TEMPLATE
self.analysis_template = analysis_template or \
self.DEFAULT_ANALYSIS_TEMPLATE
self.labels_template = labels_template or \
self.DEFAULT_LABELS_TEMPLATE
self.analysis_pattern = analysis_pattern or \
self.DEFAULT_ANALYSIS_PATTERN
self.labels_pattern = labels_pattern or \
self.DEFAULT_LABELS_PATTERN
self.sampling_params = sampling_params
self.model_key = prepare_model(model_type='api',
model=api_model,
endpoint=api_endpoint,
response_path=response_path,
**model_params)
self.try_num = try_num
[docs]
def parse_output(self, response):
analysis = ''
labels = ''
match = re.search(self.analysis_pattern, response)
if match:
analysis = match.group(1)
match = re.search(self.labels_pattern, response)
if match:
labels = match.group(1)
return analysis, labels
[docs]
def process_single(self, sample, rank=None):
meta = sample[Fields.meta]
if self.labels_key in meta and self.analysis_key in meta:
return sample
client = get_model(self.model_key, rank=rank)
analysis_list = []
labels_list = []
history = []
dialog = sample[self.history_key]
if self.query_key in sample and sample[self.query_key]:
if self.response_key in sample and sample[self.response_key]:
dialog.append(
(sample[self.query_key], sample[self.response_key]))
else:
dialog.append((sample[self.query_key], ''))
for qa in dialog:
input_prompt = self.build_input(history, qa)
messages = [{
'role': 'system',
'content': self.system_prompt,
}, {
'role': 'user',
'content': input_prompt,
}]
for _ in range(self.try_num):
try:
response = client(messages, **self.sampling_params)
analysis, labels = self.parse_output(response)
if len(analysis) > 0:
break
except Exception as e:
logger.warning(f'Exception: {e}')
analysis_list.append(analysis)
labels_list.append(labels)
history.append(self.query_template.format(query=qa[0]))
history.append(self.analysis_template.format(analysis=analysis))
history.append(self.labels_template.format(labels=labels))
history.append(self.response_template.format(response=qa[1]))
meta[self.labels_key] = labels_list
meta[self.analysis_key] = analysis_list
return sample