10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@DataConverterRegistry.register("rewardbench")
class RewardBenchConverter(DataConverter):
    """
    Unified converter for conversation data with prompt, chosen and rejected responses.

    Reads the ``prompt``, ``chosen`` and ``rejected`` keys of a raw record and
    packs them into a ``DataSample`` whose outputs are labelled with their
    preference (``"chosen"`` or ``"rejected"``).
    """

    def convert_to_data_sample(
        self, data_dict: Dict[str, Any], source_info: Dict[str, Any]
    ) -> DataSample:
        """Convert conversation data to DataSample format.

        Args:
            data_dict: Raw record. The ``prompt``, ``chosen`` and ``rejected``
                keys are read when present; the whole dict is also stored
                under ``metadata["raw_data"]``.
            source_info: Description of where the record came from. Its
                ``load_type`` (``"local"`` or ``"huggingface"``) selects which
                provenance fields are copied into the metadata.

        Returns:
            The assembled ``DataSample``. If building the metadata or the
            sample raises, the error is logged and ``None`` is returned
            instead (despite the annotation).
        """
        # The id is the MD5 of the stringified prompt only, so two records
        # sharing a prompt get the same id regardless of their responses.
        content = str(data_dict.get("prompt", []))
        unique_id = hashlib.md5(content.encode()).hexdigest()

        # Create input from prompt
        data_input = self._create_conversation_input(data_dict)

        # Create outputs from chosen/rejected responses
        data_output = self._create_conversation_output(data_dict)

        try:
            # Build metadata based on source type
            metadata = {
                "raw_data": data_dict,
                "load_strategy": "RewardBenchConverter",
            }

            # Add source-specific metadata
            load_type = source_info.get("load_type")
            if load_type == "local":
                metadata.update(
                    {
                        "source_file_path": source_info.get("source_file_path"),
                        "load_type": "local",
                    }
                )
            elif load_type == "huggingface":
                metadata.update(
                    {
                        "dataset_name": source_info.get("dataset_name"),
                        "dataset_config": source_info.get("dataset_config"),
                        "split": source_info.get("split", "train"),
                        "load_type": "huggingface",
                    }
                )

            data_sample = DataSample(
                unique_id=unique_id,
                input=data_input,
                output=data_output,
                source="rewardbench",
                task_category="conversation",
                metadata=metadata,
            )
            return data_sample
        except Exception as e:
            # Best-effort conversion: a bad record is logged and skipped
            # rather than aborting the caller.
            logger.error(f"Error creating conversation DataSample: {str(e)}")
            return None

    def _create_conversation_input(
        self, data_dict: Dict[str, Any]
    ) -> list[ChatMessage]:
        """Create the chat history from the record's ``prompt``.

        Accepted prompt shapes:
            * ``dict`` - treated as a single turn.
            * ``list`` - one message per element; dict elements supply
              ``role`` (default ``"user"``) and ``content`` (default: the
              stringified turn), other elements become user messages.
            * ``str`` - a single user message.

        Any other prompt value (including a missing key) yields an empty list.
        """
        prompt = data_dict.get("prompt")

        # Convert single-turn conversation to list format
        if isinstance(prompt, dict):
            prompt = [prompt]

        if isinstance(prompt, str):
            return [ChatMessage(role="user", content=prompt)]

        history = []
        if isinstance(prompt, list):
            for turn in prompt:
                if isinstance(turn, dict):
                    role = turn.get("role", "user")
                    content = turn.get("content", str(turn))
                    history.append(ChatMessage(role=role, content=content))
                else:
                    history.append(ChatMessage(role="user", content=str(turn)))
        return history

    def _create_conversation_output(
        self, data_dict: Dict[str, Any]
    ) -> list[DataOutput]:
        """Create the DataOutput list from the record's responses.

        Outputs for ``chosen`` come first, followed by those for ``rejected``;
        a key that is absent from ``data_dict`` contributes nothing.
        """
        outputs = []
        for preference in ("chosen", "rejected"):
            if preference in data_dict:
                outputs.extend(
                    self._build_preference_outputs(
                        data_dict[preference], preference
                    )
                )
        return outputs

    def _build_preference_outputs(
        self, response: Any, preference: str
    ) -> list[DataOutput]:
        """Wrap one response value in assistant ``DataOutput`` objects.

        Args:
            response: The raw ``chosen``/``rejected`` value. A list yields one
                output per element; a dict is treated as a single turn (the
                same way ``_create_conversation_input`` treats a dict prompt);
                anything else is stringified into a single output.
            preference: Label stored under ``label["preference"]``.

        Returns:
            One ``DataOutput`` per turn, each with role ``"assistant"``.
        """
        # Normalise a single-turn dict to list form so its ``content`` is
        # extracted instead of the whole dict being stringified.
        if isinstance(response, dict):
            response = [response]

        if isinstance(response, list):
            contents = [
                turn.get("content", str(turn))
                if isinstance(turn, dict)
                else str(turn)
                for turn in response
            ]
        else:
            contents = [str(response)]

        return [
            DataOutput(
                answer=Step(
                    role="assistant",
                    content=content,
                    label={"preference": preference},
                ),
            )
            for content in contents
        ]
|