# evaluate.py (forked from Tongyi-EconML/FinQwen)
# coding=utf-8
import json
import re
import numpy as np
import pandas as pd
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import Levenshtein
import jsonlines
import os
import sys
from tqdm import tqdm
# from tokenizer import MsTokenizer, JiebaTokenizer
import jieba
from embedding_utils import similarity_match  # local helper module (a relative import fails when run as a script)
# Fallback similarity model; replaced in __main__ when a local text2vec model exists.
sim_model = None
# Error message dictionary; these messages are just examples.
error_msg = {
1: "Bad input file",
2: "Wrong input file format",
3: "Duplicate ids in the submit files",
    4: "No valid submit file found.",
    5: "The submit file and the standard file differ in size.",
    6: "Misaligned ids between the submit file and the standard file.",
    7: "None or empty answer in the submit file."
}
def report_error_msg(detail, showMsg, out_p):
error_dict = dict()
error_dict['errorDetail'] = detail
error_dict['errorMsg'] = showMsg
error_dict['score'] = 0
error_dict['scoreJson'] = {}
error_dict['success'] = False
dump_2_json(error_dict, out_p)
def report_score(score, out_p):
result = dict()
result['success'] = True
result['score'] = score["score"]
    # Keep the 'score' key inside scoreJson; other keys may be added, e.g.:
    # result['scoreJson'] = {'score': score, 'aaaa': 0.1}
result['scoreJson'] = {
'score': score["score"],
"data_query": score.get("数据查询", -1),
"text_comprehension": score.get("文本理解", -1)
}
# result['scoreJson'] = score
dump_2_json(result, out_p)
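# For reference, report_score writes a result file shaped like the following
# (field values illustrative):
# {"success": true, "score": 85.3,
#  "scoreJson": {"score": 85.3, "data_query": 86.1, "text_comprehension": 84.2}}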
def read_jsonl(path):
content = []
with jsonlines.open(path, "r") as json_file:
for obj in json_file.iter(type=dict, skip_invalid=True):
content.append(obj)
return content
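# Record shapes expected by the code below (inferred from the fields it accesses;
# an assumption, not a documented schema):
#   standard-file line: {"id": ..., "type": "数据查询" | "文本理解",
#                        "question": ..., "answer": ..., "answer_term": [...]}
#   submit-file line:   {"id": ..., "answer": ...}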
def dump_2_json(info, path):
with open(path, 'w') as output_json_file:
json.dump(info, output_json_file, ensure_ascii=False, indent=4)
def tokenize(text, method="qwen"):
    # NOTE: `method` is currently unused; jieba segmentation is always applied.
    # if method == "qwen":
    #     tkr = MsTokenizer(name="TongyiFinance/Tongyi-Finance-14B")
    # else:
    #     tkr = JiebaTokenizer()
    # return tkr.tokenize(text)
    return jieba.lcut(text)
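# Illustrative usage (the exact segmentation depends on the installed jieba
# dictionary): tokenize("招商银行的净利润") returns a word-token list such as
# ['招商银行', '的', '净利润'].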
def calculate_bleu_score(reference_sentence, hypothesis_sentence):
"""
N-gram precision metric.
"""
smoothing = SmoothingFunction().method1
reference_tokens = [tokenize(reference_sentence)]
hypothesis_tokens = tokenize(hypothesis_sentence)
bleu_score = sentence_bleu(
reference_tokens, hypothesis_tokens, smoothing_function=smoothing, auto_reweigh=True)
return bleu_score
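# Note: SmoothingFunction().method1 keeps the score non-zero when some n-gram
# order has no matches, and auto_reweigh re-balances the n-gram weights for very
# short hypotheses. This metric is kept for reference and is not used by
# calculate_scores below.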
def calculate_t2v_score(reference_sentence, hypothesis_sentence):
    """Embedding-based semantic similarity (text2vec model if loaded, else the local similarity_match fallback)."""
    if sim_model is not None:
        t2v_score = sim_model.get_score(reference_sentence, hypothesis_sentence)
    else:
        t2v_score = similarity_match(reference_sentence, hypothesis_sentence)
    return t2v_score
def calculate_leven_score(reference_sentence, hypothesis_sentence):
    # Raw edit distance (lower is better); currently unused by calculate_scores.
    l_score = Levenshtein.distance(reference_sentence, hypothesis_sentence)
    return l_score
def calculate_f1(reference_sentence, hypothesis_sentence):
"""
Set F1 score.
"""
reference_tokens = set(tokenize(reference_sentence))
hypothesis_tokens = set(tokenize(hypothesis_sentence))
if len(reference_tokens) == 0 or len(hypothesis_tokens) == 0:
return 0
commons = hypothesis_tokens & reference_tokens
# if there are no common tokens then f1 = 0
if len(commons) == 0:
return 0
prec = len(commons) / len(hypothesis_tokens)
rec = len(commons) / len(reference_tokens)
return 2 * (prec * rec) / (prec + rec)
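# Worked example: reference tokens {A, B, C} vs hypothesis tokens {B, C, D}
# -> commons = {B, C}, precision = 2/3, recall = 2/3,
#    f1 = 2 * (2/3 * 2/3) / (2/3 + 2/3) = 2/3.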
def calculate_scores(reference_sentence, hypothesis_sentence):
scores = dict()
    # n-gram precision metric (disabled)
    # scores["BLEU"] = calculate_bleu_score(reference_sentence, hypothesis_sentence)
# embedding, semantic metric
scores["text2vec"] = calculate_t2v_score(reference_sentence, hypothesis_sentence)
# precision & recall
scores["f1_score"] = calculate_f1(reference_sentence, hypothesis_sentence)
scores["score"] = 0.6 * scores["text2vec"] + 0.4 * scores["f1_score"]
return scores
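# Worked example of the blend: text2vec = 0.9 and f1_score = 0.5 give
# score = 0.6 * 0.9 + 0.4 * 0.5 = 0.74.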
def evaluate_answer(reference_data, reference_answer, user_answer):
    """Evaluate the user's answer according to the new criteria."""
score = 0.0
if user_answer is None or user_answer == "":
return score
    # Normalize date formats in the answers and the reference data
user_answer = standardize_extended_date_formats(user_answer)
reference_answer = standardize_extended_date_formats(reference_answer)
reference_data = [standardize_extended_date_formats(i) for i in reference_data]
    # If no reference data is given, score by semantic matching alone.
if reference_data is not None and len(reference_data) > 0:
score_weight = (0.6, 0.4)
matched_data_count = sum(1.0 for data in reference_data if data in user_answer)
score += score_weight[0] * (matched_data_count / len(reference_data))
else:
score_weight = (0, 1.0)
    # Semantic similarity score
semantic_scores = calculate_scores(reference_answer, user_answer)
if semantic_scores:
score += score_weight[1] * semantic_scores["score"]
return score
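# Worked example of the weighting: with 4 reference data points of which 2 occur
# verbatim in the user answer, the data term is 0.6 * (2 / 4) = 0.30; a semantic
# score of 0.8 then adds 0.4 * 0.8 = 0.32, for a total of 0.62. Without reference
# data, the semantic score alone decides (weight 1.0).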
def standardize_extended_date_formats(text):
    """Normalize extended date formats to YYYYMMDD."""
    # Regex patterns for the date formats and their replacement templates
patterns = [
(r'(\d{4})年(\d{1,2})月(\d{1,2})[日号]', "{0}{1:02}{2:02}"), # YYYY年MM月DD日
(r'(\d{4})/(\d{1,2})/(\d{1,2})', "{0}{1:02}{2:02}"), # YYYY/MM/DD
(r'(\d{4})-(\d{1,2})-(\d{1,2})', "{0}{1:02}{2:02}"), # YYYY-MM-DD
(r'(\d{4})\.(\d{1,2})\.(\d{1,2})', "{0}{1:02}{2:02}"), # YYYY.MM.DD
(r'(\d{1,2})月(\d{1,2})[日号][,,](\d{4})年', "{2}{0:02}{1:02}"), # MM月DD日,YYYY年
        (r'(\d{1,2})[日号](\d{1,2})月[,,](\d{4})年', "{2}{1:02}{0:02}"),  # DD日MM月,YYYY年
(r'(\d{4})年?一季度', "{0}0331"),
(r'(\d{4})年?第一季度', "{0}0331"),
(r'(\d{4})年?Q1', "{0}0331"),
(r'(\d{4})年?二季度', "{0}0630"),
(r'(\d{4})年?第二季度', "{0}0630"),
(r'(\d{4})年?Q2', "{0}0630"),
(r'(\d{4})年?三季度', "{0}0930"),
(r'(\d{4})年?第三季度', "{0}0930"),
(r'(\d{4})年?Q3', "{0}0930"),
(r'(\d{4})年?四季度', "{0}1231"),
(r'(\d{4})年?第四季度', "{0}1231"),
(r'(\d{4})年?Q4', "{0}1231"),
(r'(\d{6})日期?', "{0}"),
]
    # Iterate over all patterns and apply the replacements
for pattern, replacement_format in patterns:
def replacement(match):
groups = [int(g) for g in match.groups()]
return replacement_format.format(*groups)
text = re.sub(pattern, replacement, text)
return text
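# Examples, checkable against the patterns above:
#   "2023年第一季度" -> "20230331"
#   "2023/3/31"      -> "20230331"
#   "2023年12月5号"   -> "20231205"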
def evaluate(correct_data, user_data):
    data = []
    paired_data = list(zip(correct_data, user_data))
    # tqdm's first positional argument is an iterable; pass the count as total.
    pbar = tqdm(total=len(paired_data), desc="Processing")
    for i, (correct_answer, user_answer) in enumerate(paired_data):
        score = evaluate_answer(
            correct_answer["answer_term"],
            correct_answer["answer"],
            user_answer.get("answer", "")
        )
        c = {
            'id': correct_answer['id'],
            'type': correct_answer['type'],
            'question': correct_answer['question'],
            'refer_answer': correct_answer['answer'],
            'refer_answer_term': correct_answer['answer_term'],
            'user_answer': user_answer.get('answer', ""),
            'score': score
        }
        data.append(c)
        if (i + 1) % 50 == 0:
            pbar.set_postfix({"score": round(score, 4)})
            pbar.update(50)
    pbar.close()
    # Note: despite the .jsonl extension, dump_2_json writes a single
    # pretty-printed JSON array, not line-delimited JSON.
    dump_2_json(data, "./evaluate_result_detail.jsonl")
data_df = pd.DataFrame(data)
total_score = round(data_df["score"].mean() * 100.0, 2)
score_dict = np.round(data_df.groupby("type")["score"].mean() * 100.0, 2).to_dict()
score_dict["score"] = total_score
print(f"Scores: {score_dict}", flush=True)
return score_dict
if __name__ == "__main__":
'''
online evaluation
'''
in_param_path = sys.argv[1]
out_path = sys.argv[2]
try:
submit_path = sys.argv[3]
except IndexError:
submit_path = None
    # Read the submit and standard file paths from the first parameter
with open(in_param_path, 'r') as load_f:
input_params = json.load(load_f)
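    # Expected shape of the input-parameter file (inferred from the keys read
    # below; not a documented schema):
    # {"fileData": {"standardFilePath": "standard.jsonl", "userFilePath": "submit.jsonl"}}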
    # Path to the standard (gold) answer file
standard_path = input_params["fileData"]["standardFilePath"]
print("Read standard from %s" % standard_path)
    # Path to the contestant's submitted result file
if submit_path is None:
submit_path = input_params["fileData"]["userFilePath"]
print("Read user submit file from %s" % submit_path)
    # The similarity model is assumed to be downloaded into the environment; it is
    # available on Hugging Face. Note os.path.exists is false for a bare hub id,
    # in which case similarity_match is used instead.
    sim_model_path = "shibing624/text2vec-base-chinese"
    # sim_model_path = None
    if isinstance(sim_model_path, str) and os.path.exists(sim_model_path):
        from text2vec import Similarity
        # Pass the configured path explicitly; model_name_or_path is assumed to be
        # the parameter name in the installed text2vec version.
        sim_model = Similarity(model_name_or_path=sim_model_path, max_seq_length=256)
    else:
        sim_model = None
if not os.path.exists(submit_path):
report_error_msg(error_msg[4], f"Error message: {error_msg[4]}", out_path)
sys.exit()
try:
standard_labels = read_jsonl(standard_path)
submit_preds = read_jsonl(submit_path)
    except (json.JSONDecodeError, ValueError) as e:
        # jsonlines raises ValueError subclasses for invalid input; use str(e)
        # since only JSONDecodeError carries a .msg attribute.
        report_error_msg(str(e), f"Error message: {error_msg[2]}", out_path)
sys.exit()
if len(standard_labels) != len(submit_preds):
report_error_msg(error_msg[5], f"Error message: {error_msg[5]}", out_path)
sys.exit()
    submit_preds = sorted(submit_preds, key=lambda s: s['id'])
    # Sort the standard labels too, so both sides are compared in the same order.
    standard_labels = sorted(standard_labels, key=lambda s: s['id'])
    label_ids = [s["id"] for s in standard_labels]
    pred_ids = [s["id"] for s in submit_preds]
if label_ids != pred_ids:
report_error_msg(error_msg[6], f"Error message: {error_msg[6]}", out_path)
sys.exit()
    for s in submit_preds:
        ans = s.get("answer", "")
        # Reject both None and empty answers, matching error_msg[7].
        if ans is None or ans == "":
            report_error_msg(error_msg[7], f"Error message: {error_msg[7]}", out_path)
            sys.exit()
try:
eval_score = evaluate(standard_labels, submit_preds)
report_score(eval_score, out_path)
except Exception as e:
report_error_msg(f"{e}", f"Error message: {e}", out_path)