-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanswer_utils.py
More file actions
292 lines (260 loc) · 8.09 KB
/
answer_utils.py
File metadata and controls
292 lines (260 loc) · 8.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
from __future__ import annotations
import ast
import json
import re
from collections.abc import Mapping, Sequence
from typing import Any
# Lower-cased prefixes; text starting with one of these is treated as
# plan/reasoning text rather than an answer.
_NON_ANSWER_PREFIXES = (
    "thought:",
    "analysis:",
    "thinking:",
    "plan:",
    "scratchpad:",
    "note:",
)

# Lower-cased texts that are rejected when they make up the whole value.
_NON_ANSWER_EXACT = {
    "thought",
    "analysis",
    "thinking",
    "plan",
    "scratchpad",
    "none",
    "null",
}

# Lower-cased opening phrases that introduce a plan instead of an answer.
_PLAN_INTROS = (
    "to complete the request, i will execute the following plan",
    "i will execute the following plan",
    "i'll execute the following plan",
    "the following plan will be used",
    "here is the plan",
    "search strategy:",
)

# Lower-cased substrings; text containing any of them is treated as "blocked".
_BLOCKED_TEXT_SNIPPETS = (
    "insufficient information",
    "insufficient evidence",
    "cannot be determined",
    "cannot be provided at this time",
    "cannot be provided from the provided context",
    "cannot be provided from the available context",
    "no evidence has been",
    "no evidence was",
    "no evidence provided",
    "no evidence gathered",
    "no evidence retrieved",
    "no documents have been retrieved",
    "no information has been retrieved",
    "no search has been performed",
    "no search has been executed",
    "no search or retrieval operations have been performed",
    "no research has been conducted",
    "remains unknown",
    "remain unknown",
    "not yet been identified",
    "task is currently blocked",
    "need more information",
    "need additional information",
    "requires external research",
    "requires further research",
)

# Compiled patterns, each anchored at the start of the (lower-cased) text, that
# recognise progress/status reports; a match is also treated as "blocked".
_PROGRESS_STATUS_PATTERNS = tuple(
    re.compile(expression)
    for expression in (
        r"^i am currently (investigating|searching)\b",
        r"^i have (initiated|begun) (?:the )?(?:process of )?search(?:ing)?\b",
        r"^i am initiating (?:a )?targeted search",
        r"^i need to (?:perform )?(?:additional )?search(?:es)?\b",
        r"^i attempted to search\b",
        r"^the initial search(?:es)?\b",
        r"^the search results provided do not contain\b",
        r"^i have performed searches\b.*\b(?:have not|did not|no specific|not yielded)\b",
    )
)

# Mapping keys whose values may hold the answer itself. The order matters:
# keys are tried from first to last and the first usable value wins.
_DIRECT_ANSWER_KEYS = (
    "final_answer",
    "answer_artifact",
    "answer",
    "result",
    "winner",
    "institution_name",
    "individual_name",
    "entity_name",
    "organization_name",
    "company_name",
    "school_name",
    "university_name",
    "candidate_name",
    "name",
    "institution",
    "individual",
    "entity",
    "candidate",
    "brand",
    "city",
    "country",
)

# Mapping keys that indicate a plan / tool-call payload.
_PLANNING_KEYS = {
    "plan",
    "plans",
    "sub_questions",
    "search_strategy",
    "search_queries",
    "queries",
    "steps",
    "step",
    "task_package",
    "task_packages",
    "tool",
    "tools",
    "parameters",
    "purpose",
    "dependencies",
    "description",
    "rationale",
}

# Mapping keys that carry supporting material rather than the answer.
_SUPPORT_KEYS = {
    "summary",
    "critique",
    "revision_request",
    "confidence",
    "unresolved_issues",
    "evidence_summary",
    "verification_details",
    "evidence",
    "citations",
    "sources",
    "details",
    "metadata",
    "notes",
    "reasoning",
    "explanation",
    "location",
}
def _normalized_text(value: Any) -> str:
    """Return *value* as text with every whitespace run collapsed to one space.

    ``None`` and empty non-numeric values (``""``, ``[]``, ``{}``) yield ``""``.
    Numbers are always rendered, so falsy numbers such as ``0``, ``0.0`` and
    ``False`` become ``"0"``, ``"0.0"`` and ``"False"`` instead of vanishing.

    Args:
        value: Any object; it is converted with ``str()``.

    Returns:
        The stripped, single-spaced text, or ``""`` when there is no content.
    """
    if value is None:
        return ""
    # A falsy number is still real content (e.g. the answer "0"); only
    # non-numeric falsy values are treated as "nothing to show".
    if not isinstance(value, (int, float)) and not value:
        return ""
    return re.sub(r"\s+", " ", str(value)).strip()
def _looks_like_plan_text(raw: str) -> bool:
    """Tell whether *raw* reads as planning/reasoning text instead of an answer.

    The text is stripped and lower-cased first. It counts as plan text when it
    equals one of ``_NON_ANSWER_EXACT`` or starts with a non-answer prefix, a
    plan introduction, or the opening of a serialized plan payload.
    Blank text is never plan text.
    """
    text = raw.strip().lower()
    if not text:
        return False
    # Openings of dict/list payloads serialized with either quote style.
    serialized_plan_openers = (
        "{'plan':",
        '{"plan":',
        "{'sub_questions':",
        '{"sub_questions":',
        "{'search_strategy':",
        '{"search_strategy":',
        "[{'tool':",
        '[{"tool":',
    )
    if text in _NON_ANSWER_EXACT:
        return True
    prefix_groups = (_NON_ANSWER_PREFIXES, _PLAN_INTROS, serialized_plan_openers)
    return any(text.startswith(group) for group in prefix_groups)
def _looks_like_blocked_text(raw: str) -> bool:
    """Tell whether *raw* reports being blocked or still in progress.

    The text is stripped and lower-cased, then checked for any substring from
    ``_BLOCKED_TEXT_SNIPPETS`` and, failing that, for a match of any pattern in
    ``_PROGRESS_STATUS_PATTERNS``. Blank text is never blocked text.
    """
    text = raw.strip().lower()
    if not text:
        return False
    for snippet in _BLOCKED_TEXT_SNIPPETS:
        if snippet in text:
            return True
    for pattern in _PROGRESS_STATUS_PATTERNS:
        if pattern.search(text):
            return True
    return False
def _parse_structured_value(raw: str) -> Any | None:
    """Parse *raw* as a JSON or Python-literal dict/list, best effort.

    Args:
        raw: Candidate text; surrounding whitespace is ignored.

    Returns:
        The parsed ``dict`` or ``list``, or ``None`` when the text is not
        wrapped in a matching ``{...}`` / ``[...]`` pair, cannot be parsed by
        either parser, or parses to some other type (for example a set).
    """
    text = raw.strip()
    # Only a matching delimiter pair can enclose a dict or list literal;
    # mixed pairs such as "{...]" can never parse, so skip the parsers.
    if len(text) < 2 or (text[0], text[-1]) not in (("{", "}"), ("[", "]")):
        return None
    # JSON first; the Python-literal parser covers single-quoted payloads.
    for parser in (json.loads, ast.literal_eval):
        try:
            value = parser(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # These are the failures the two parsers raise for malformed or
            # overly deep input; fall through to the next parser.
            continue
        if isinstance(value, (dict, list)):
            return value
    return None
def _is_scalar(value: Any) -> bool:
    """Return True when *value* is a ``str``, ``int``, ``float`` or ``bool``."""
    scalar_types = (str, int, float, bool)
    return isinstance(value, scalar_types)
def _is_plan_mapping(value: Mapping[str, Any]) -> bool:
    """Decide whether a mapping describes a plan rather than an answer.

    Keys are compared after ``str()``, stripping and lower-casing. The mapping
    is a plan when it has at least one planning key and no direct-answer key
    (any key in ``_DIRECT_ANSWER_KEYS`` or ending in ``_name``), or when every
    key is a planning or support key. An empty mapping is not a plan.
    """
    names = {str(name).strip().lower() for name in value}
    if not names:
        return False

    answers_directly = False
    mentions_planning = False
    for name in names:
        if name in _DIRECT_ANSWER_KEYS or name.endswith("_name"):
            answers_directly = True
        if name in _PLANNING_KEYS:
            mentions_planning = True

    if mentions_planning and not answers_directly:
        return True
    return names.issubset(_PLANNING_KEYS | _SUPPORT_KEYS)
def _extract_from_mapping(value: Mapping[str, Any]) -> str:
    """Pull a substantive answer out of a mapping, or return ``""``.

    Keys are matched after ``str()``, stripping and lower-casing. The keys in
    ``_DIRECT_ANSWER_KEYS`` are tried first, in their declared order; after
    that, any other key ending in ``_name`` that is neither a planning nor a
    support key is tried in the mapping's own order. Each candidate value is
    passed through ``extract_substantive_answer`` and the first non-empty
    result wins.

    Args:
        value: The mapping to inspect.

    Returns:
        The extracted answer text, or ``""`` when no key yields one (which
        includes plan-only and support-only mappings).
    """
    lowered = {str(key).strip().lower(): item for key, item in value.items()}

    for key in _DIRECT_ANSWER_KEYS:
        if key not in lowered:
            continue
        candidate = extract_substantive_answer(lowered[key])
        if candidate:
            return candidate

    for key, item in lowered.items():
        if not key.endswith("_name"):
            continue
        # Keys from _DIRECT_ANSWER_KEYS were already tried above and produced
        # nothing, so re-evaluating them here cannot change the outcome.
        if key in _DIRECT_ANSWER_KEYS or key in _PLANNING_KEYS or key in _SUPPORT_KEYS:
            continue
        candidate = extract_substantive_answer(item)
        if candidate:
            return candidate

    # Nothing usable was found; a plan mapping and any other mapping both
    # yield the empty string, so no further distinction is needed.
    return ""
def _extract_from_sequence(value: Sequence[Any]) -> str:
    """Pull a substantive answer out of a list-like value, or return ``""``.

    Args:
        value: The sequence to inspect.

    Returns:
        * ``""`` for an empty sequence;
        * the result of ``extract_substantive_answer`` for a single item;
        * the non-empty normalized items joined with ``"; "`` when every item
          is a scalar;
        * ``""`` for any other mix of items (including lists of plan mappings).
    """
    items = list(value)
    if not items:
        return ""
    if len(items) == 1:
        return extract_substantive_answer(items[0])
    if all(_is_scalar(item) for item in items):
        # Normalize each item once and drop the ones that come out empty.
        return "; ".join(text for text in map(_normalized_text, items) if text)
    # Multi-item sequences holding mappings or nested containers carry no
    # direct answer, whether or not they are plan mappings.
    return ""
def extract_substantive_answer(value: Any) -> str:
    """Return the substantive answer contained in *value*, or ``""``.

    Mappings and non-string sequences are delegated to the dedicated helpers.
    Anything else is normalized to text; the text is rejected when it is
    empty, looks like plan text, or looks like blocked text. Text that parses
    as a dict/list literal is processed recursively; all other text is the
    answer as-is.
    """
    if value is None:
        return ""

    if isinstance(value, Mapping):
        extracted = _extract_from_mapping(value)
    elif isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray)):
        extracted = _extract_from_sequence(value)
    else:
        text = _normalized_text(value)
        if not text:
            return ""
        if _looks_like_plan_text(text) or _looks_like_blocked_text(text):
            return ""
        structured = _parse_structured_value(text)
        if structured is None:
            # Plain text that passed every filter is the answer itself.
            return text
        extracted = extract_substantive_answer(structured)

    return _normalized_text(extracted)
def classify_answer_mode(value: Any) -> str:
    """Classify *value* as ``"direct"``, ``"plan"``, ``"blocked"`` or ``"empty"``.

    Mappings and non-string sequences are inspected as they are; any other
    value is normalized to text and, when that text is a dict/list literal,
    parsed into a structure.

    For structured values:
        * ``"direct"`` when ``extract_substantive_answer`` finds an answer;
        * ``"plan"`` for a plan mapping, or a non-empty sequence made up
          entirely of plan mappings;
        * ``"blocked"`` / ``"plan"`` when the supporting text (values of
          support/planning keys, or the sequence items) looks blocked or
          plan-like, checked in that order;
        * ``"empty"`` otherwise.

    For plain text: ``"plan"``, then ``"blocked"``, are checked before
    falling back to ``"direct"``. Values with no content are ``"empty"``.

    Args:
        value: The candidate answer in any form.

    Returns:
        One of the four mode strings listed above.
    """
    is_container = isinstance(value, Mapping) or (
        isinstance(value, Sequence) and not isinstance(value, (str, bytes, bytearray))
    )
    if is_container:
        # Use the structure directly; converting it to text and re-parsing
        # loses containers whose repr is not a dict/list literal.
        if not value:
            return "empty"
        raw = ""
        parsed = value
    else:
        raw = _normalized_text(value)
        if not raw:
            return "empty"
        parsed = _parse_structured_value(raw)

    if parsed is None:
        if _looks_like_plan_text(raw):
            return "plan"
        if _looks_like_blocked_text(raw):
            return "blocked"
        return "direct"

    if extract_substantive_answer(parsed):
        return "direct"

    if isinstance(parsed, Mapping):
        lowered = {str(key).strip().lower(): item for key, item in parsed.items()}
        if _is_plan_mapping(lowered):
            return "plan"
        support_values = (
            _normalized_text(item)
            for key, item in lowered.items()
            if key in _SUPPORT_KEYS or key in _PLANNING_KEYS
        )
    else:
        items = list(parsed)
        # A list of plan steps is a plan, just like a single plan mapping.
        if items and all(
            isinstance(item, Mapping) and _is_plan_mapping(item) for item in items
        ):
            return "plan"
        support_values = (_normalized_text(item) for item in items)

    support_text = " ".join(text for text in support_values if text)
    if support_text and _looks_like_blocked_text(support_text):
        return "blocked"
    if support_text and _looks_like_plan_text(support_text):
        return "plan"
    return "empty"
def has_substantive_answer(value: Any) -> bool:
    """Return True when ``extract_substantive_answer`` finds a non-empty answer."""
    answer = extract_substantive_answer(value)
    return len(answer) > 0