Source code for data_juicer.ops.mapper.dialog_sentiment_detection_mapper

import re
from typing import Dict, List, Optional

from loguru import logger
from pydantic import NonNegativeInt, PositiveInt

from data_juicer.ops.base_op import OPERATORS, TAGGING_OPS, Mapper
from data_juicer.utils.constant import Fields, MetaKeys
from data_juicer.utils.model_utils import get_model, prepare_model

# Name passed to the TAGGING_OPS and OPERATORS `register_module` decorators
# on the mapper class defined below.
OP_NAME = 'dialog_sentiment_detection_mapper'


# TODO: LLM-based inference.
@TAGGING_OPS.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class DialogSentimentDetectionMapper(Mapper):
    """
    Mapper to generate user's sentiment labels in dialog. Input from
    history_key, query_key and response_key. Output lists of
    labels and analysis for queries in the dialog.

    For every dialog turn the model is prompted with the (truncated)
    annotated history plus the current query, and its reply is parsed
    with two regular expressions to extract an analysis string and a
    labels string. The per-turn results are stored in the sample's meta
    field under ``labels_key`` and ``analysis_key``.
    """

    # Few-shot system prompt (Chinese) demonstrating the expected
    # "analysis line, then labels line" output format.
    DEFAULT_SYSTEM_PROMPT = ('请判断用户和LLM多轮对话中用户所具有的情绪。\n'
                             '要求:\n'
                             '- 需要先进行分析,然后罗列用户所具有的情绪,下面是一个样例,请模仿样例格式输出'
                             '。\n'
                             '用户:最近工作压力好大,我觉得整个人都快被压垮了。\n'
                             '情感分析:用户的言语中透露出明显的压力和疲惫感,可能还夹杂着一些无助和焦虑。\n'
                             '情感类别:压力、疲惫、无助、焦虑\n'
                             'LLM:听起来你真的承受了很多,面临这种情况确实不容易。有没有考虑过找一些放松的'
                             '方式,比如听音乐或者散步来减轻压力呢?\n'
                             '用户:试过了,但是好像没什么效果,每天的事情都堆积如山。\n'
                             '情感分析:用户感到无力解决现状,有挫败感,并且对尝试放松的方式失去信心。\n'
                             '情感类别:无力、挫败\n'
                             'LLM:我理解你的感受,有时候压力积累到一定程度确实让人难以承受。或许你可以尝试'
                             '规划一下时间,把任务分成小块来完成,这样可能会减少一些压力感。\n'
                             '用户:这个主意不错,我会试着让自己更有条理一些,谢谢你的建议。\n'
                             '情感分析:用户对建议表现出认同和感激,同时展现出试图积极面对问题的态度。\n'
                             '情感类别:认同、感激、积极\n'
                             'LLM:不用谢,我很高兴能帮到你。记得给自己一些时间去适应新的计划,有任何需要'
                             '随时可以跟我说哦!\n')
    # Templates used to render each part of a dialog turn into the prompt.
    DEFAULT_QUERY_TEMPLATE = '用户:{query}\n'
    DEFAULT_RESPONSE_TEMPLATE = 'LLM:{response}\n'
    DEFAULT_CANDIDATES_TEMPLATE = '备选情感类别:[{candidate_str}]'
    DEFAULT_ANALYSIS_TEMPLATE = '情感分析:{analysis}\n'
    DEFAULT_LABELS_TEMPLATE = '情感类别:{labels}\n'
    # Regular expressions used to pull the analysis / labels back out of
    # the model reply; group 1 is the captured payload in both cases.
    DEFAULT_ANALYSIS_PATTERN = '情感分析:(.*?)\n'
    DEFAULT_LABELS_PATTERN = '情感类别:(.*?)($|\n)'

    def __init__(self,
                 api_model: str = 'gpt-4o',
                 sentiment_candidates: Optional[List[str]] = None,
                 max_round: NonNegativeInt = 10,
                 *,
                 labels_key: str = MetaKeys.dialog_sentiment_labels,
                 analysis_key: str = MetaKeys.dialog_sentiment_labels_analysis,
                 api_endpoint: Optional[str] = None,
                 response_path: Optional[str] = None,
                 system_prompt: Optional[str] = None,
                 query_template: Optional[str] = None,
                 response_template: Optional[str] = None,
                 candidate_template: Optional[str] = None,
                 analysis_template: Optional[str] = None,
                 labels_template: Optional[str] = None,
                 analysis_pattern: Optional[str] = None,
                 labels_pattern: Optional[str] = None,
                 try_num: PositiveInt = 3,
                 model_params: Optional[Dict] = None,
                 sampling_params: Optional[Dict] = None,
                 **kwargs):
        """
        Initialization method.

        :param api_model: API model name.
        :param sentiment_candidates: The output sentiment candidates. Use
            open-domain sentiment labels if it is None.
        :param max_round: The max num of round in the dialog to build the
            prompt.
        :param labels_key: The key name in the meta field to store the
            output labels. It is 'dialog_sentiment_labels' in default.
        :param analysis_key: The key name in the meta field to store the
            corresponding analysis. It is
            'dialog_sentiment_labels_analysis' in default.
        :param api_endpoint: URL endpoint for the API.
        :param response_path: Path to extract content from the API response.
            Defaults to 'choices.0.message.content'.
        :param system_prompt: System prompt for the task.
        :param query_template: Template for query part to build the input
            prompt.
        :param response_template: Template for response part to build the
            input prompt.
        :param candidate_template: Template for sentiment candidates to
            build the input prompt.
        :param analysis_template: Template for analysis part to build the
            input prompt.
        :param labels_template: Template for labels part to build the
            input prompt.
        :param analysis_pattern: Pattern to parse the return sentiment
            analysis.
        :param labels_pattern: Pattern to parse the return sentiment
            labels.
        :param try_num: The number of retry attempts when there is an API
            call error or output parsing error.
        :param model_params: Parameters for initializing the API model.
            ``None`` (the default) is treated as an empty dict.
        :param sampling_params: Extra parameters passed to the API call.
            e.g {'temperature': 0.9, 'top_p': 0.95}. ``None`` (the
            default) is treated as an empty dict.
        :param kwargs: Extra keyword arguments.
        """
        super().__init__(**kwargs)

        # Use None sentinels instead of `{}` defaults so that no dict
        # object is shared between separately constructed instances.
        if model_params is None:
            model_params = {}
        if sampling_params is None:
            sampling_params = {}

        self.sentiment_candidates = sentiment_candidates
        self.max_round = max_round
        self.labels_key = labels_key
        self.analysis_key = analysis_key

        # Any falsy override (None or '') falls back to the class default.
        self.system_prompt = system_prompt or self.DEFAULT_SYSTEM_PROMPT
        self.query_template = query_template or self.DEFAULT_QUERY_TEMPLATE
        self.response_template = (response_template
                                  or self.DEFAULT_RESPONSE_TEMPLATE)
        self.candidate_template = (candidate_template
                                   or self.DEFAULT_CANDIDATES_TEMPLATE)
        self.analysis_template = (analysis_template
                                  or self.DEFAULT_ANALYSIS_TEMPLATE)
        self.labels_template = (labels_template
                                or self.DEFAULT_LABELS_TEMPLATE)
        self.analysis_pattern = (analysis_pattern
                                 or self.DEFAULT_ANALYSIS_PATTERN)
        self.labels_pattern = labels_pattern or self.DEFAULT_LABELS_PATTERN

        self.sampling_params = sampling_params
        self.model_key = prepare_model(model_type='api',
                                       model=api_model,
                                       endpoint=api_endpoint,
                                       response_path=response_path,
                                       **model_params)
        self.try_num = try_num

    def build_input(self, history, query):
        """
        Build the user prompt for one dialog turn.

        :param history: List of already rendered prompt fragments from
            previous turns (``process_single`` appends four fragments per
            turn: query, analysis, labels, response).
        :param query: The current turn; only ``query[0]`` (the user
            query text) is used here.
        :return: The prompt string: optional candidates header, then the
            last ``max_round`` turns of history, then the current query.
        """
        if self.sentiment_candidates:
            input_prompt = self.candidate_template.format(
                candidate_str=','.join(self.sentiment_candidates))
        else:
            input_prompt = ''

        # Guard against max_round == 0: `history[-0:]` would be the whole
        # list rather than an empty one.
        if self.max_round > 0:
            # Four history fragments are stored per dialog round.
            input_prompt += ''.join(history[-self.max_round * 4:])

        input_prompt += self.query_template.format(query=query[0])
        return input_prompt

    def parse_output(self, response):
        """
        Extract the analysis and labels strings from a model reply.

        :param response: Text returned by the model.
        :return: Tuple ``(analysis, labels)``; each element is the first
            capture group of the corresponding pattern, or '' when the
            pattern does not match.
        """
        analysis = ''
        labels = ''

        match = re.search(self.analysis_pattern, response)
        if match:
            analysis = match.group(1)

        match = re.search(self.labels_pattern, response)
        if match:
            labels = match.group(1)

        return analysis, labels

    def process_single(self, sample, rank=None):
        """
        Annotate one sample with per-turn sentiment labels and analysis.

        :param sample: The sample to process. Its meta field
            (``Fields.meta``) receives the results.
        :param rank: Passed through to ``get_model``.
        :return: The same sample object. It is returned untouched when
            both ``labels_key`` and ``analysis_key`` are already present
            in its meta field.
        """
        meta = sample[Fields.meta]
        if self.labels_key in meta and self.analysis_key in meta:
            # Already annotated; skip the API calls.
            return sample

        client = get_model(self.model_key, rank=rank)

        analysis_list = []
        labels_list = []
        history = []

        # Work on a copy: appending the current query/response below must
        # not modify the history stored in the sample itself.
        dialog = list(sample[self.history_key] or [])
        if self.query_key in sample and sample[self.query_key]:
            if self.response_key in sample and sample[self.response_key]:
                dialog.append(
                    (sample[self.query_key], sample[self.response_key]))
            else:
                dialog.append((sample[self.query_key], ''))

        for qa in dialog:
            input_prompt = self.build_input(history, qa)
            messages = [{
                'role': 'system',
                'content': self.system_prompt,
            }, {
                'role': 'user',
                'content': input_prompt,
            }]

            # Reset per turn so that a turn whose every attempt fails
            # yields empty results instead of raising UnboundLocalError
            # or silently reusing the previous turn's values.
            analysis, labels = '', ''
            for _ in range(self.try_num):
                try:
                    response = client(messages, **self.sampling_params)
                    analysis, labels = self.parse_output(response)
                    if analysis:
                        break
                except Exception as e:
                    # Best effort: log and retry up to try_num times.
                    logger.warning(f'Exception: {e}')

            analysis_list.append(analysis)
            labels_list.append(labels)

            # Record this turn (4 fragments) as context for later turns.
            history.append(self.query_template.format(query=qa[0]))
            history.append(self.analysis_template.format(analysis=analysis))
            history.append(self.labels_template.format(labels=labels))
            history.append(self.response_template.format(response=qa[1]))

        meta[self.labels_key] = labels_list
        meta[self.analysis_key] = analysis_list

        return sample