Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,6 @@ AGENTS_ROADMAP.md
*/logfile
logfile
resources/screening-filtered-navigation.md
resources/L1 Screening 2025_03_18 - fulltext - 100.csv
IMPLEMENTATION_SUMMARY.md

204 changes: 204 additions & 0 deletions backend/api/citations/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,197 @@ def _create_table_and_insert_sync(table_name: str, columns: List[str], rows: Lis
return cits_dp_service.create_table_and_insert_sync(table_name, columns, rows)


def _extract_criteria_questions_from_sr(sr: Optional[Dict[str, Any]]) -> Dict[str, str]:
"""Extract L1 criteria questions from SR criteria_parsed.

Returns a dict mapping criterion_key -> question_text.
Example: {"is_this_article_primary_research": "Is this article primary research?"}
"""
if not sr:
return {}

criteria_parsed = sr.get("criteria_parsed") or {}
l1_criteria = criteria_parsed.get("l1") or {}
questions = l1_criteria.get("questions") or []

result = {}
for q in questions:
if not isinstance(q, str) or not q.strip():
continue
# Use the same key generation as the screening system
# snake_case already converts "Is this article primary research?" to "is_this_article_primary_research"
criterion_key = snake_case(q, max_len=56)
if not criterion_key:
criterion_key = "criterion"
result[criterion_key] = q

return result


def _match_csv_column_to_criterion(csv_col: str, criteria_questions: Dict[str, str]) -> Optional[str]:
"""Match a CSV column header to a criterion key.

Handles the "L1 - " prefix and matches exactly against criteria questions.
Returns the criterion_key if matched, None otherwise.
"""
if not csv_col or not isinstance(csv_col, str):
return None

# Remove "L1 - " prefix if present
col_text = csv_col.strip()
if col_text.startswith("L1 - "):
col_text = col_text[5:].strip()
elif col_text.startswith("L2 - "):
# Skip L2 columns for now
return None

# Match exactly against criteria questions
for criterion_key, question_text in criteria_questions.items():
if col_text == question_text:
return criterion_key

return None


def _parse_human_answer_to_jsonb(answer_value: Any) -> Dict[str, Any]:
"""Convert a CSV human answer value to JSONB format.

Returns a dict with structure:
{
"selected": "Yes - primary research",
"source": "csv_upload",
"timestamp": "2025-03-18T...",
"autofilled": true
}
"""
if answer_value is None or (isinstance(answer_value, str) and answer_value.strip() == ""):
return {
"selected": None,
"source": "csv_upload",
"timestamp": datetime.utcnow().isoformat() + "Z",
"autofilled": True,
}

selected_value = str(answer_value).strip() if answer_value else None

return {
"selected": selected_value,
"source": "csv_upload",
"timestamp": datetime.utcnow().isoformat() + "Z",
"autofilled": True,
}


def _populate_human_answers_from_csv(
table_name: str,
normalized_rows: List[Dict[str, Any]],
include_columns: List[str],
sr: Optional[Dict[str, Any]],
) -> None:
"""Populate human_* JSONB columns from CSV data.

This function:
1. Extracts criteria questions from SR
2. Matches CSV columns to criteria questions
3. For each matched column, populates the corresponding human_* column
4. Updates validation metadata (l1_validated_by, l1_validated_at)
5. Backfills human decision columns
"""
if not sr or not normalized_rows or not include_columns:
return

# Extract criteria questions
criteria_questions = _extract_criteria_questions_from_sr(sr)
if not criteria_questions:
return

# Build mapping of CSV column index to criterion_key
csv_col_to_criterion: Dict[int, str] = {}
for col_idx, col_name in enumerate(include_columns):
criterion_key = _match_csv_column_to_criterion(col_name, criteria_questions)
if criterion_key:
csv_col_to_criterion[col_idx] = criterion_key

if not csv_col_to_criterion:
# No matching columns found
return

# Fetch all rows from the database to get actual citation IDs
try:
all_rows = cits_dp_service.get_citations_by_ids(
list(range(1, len(normalized_rows) + 1)),
table_name=table_name
)
except Exception:
# If we can't fetch rows, skip human answer population
return

# Build a map of row index to citation ID
row_id_map: Dict[int, int] = {}
for idx, db_row in enumerate(all_rows):
if db_row and 'id' in db_row:
row_id_map[idx] = db_row['id']

# Populate human answer columns for each row
for row_idx, row in enumerate(normalized_rows):
citation_id = row_id_map.get(row_idx)
if not citation_id:
continue

for col_idx, criterion_key in csv_col_to_criterion.items():
if col_idx >= len(include_columns):
continue

col_name = include_columns[col_idx]
answer_value = row.get(col_name)

# Convert to JSONB format
human_jsonb = _parse_human_answer_to_jsonb(answer_value)

# Create human_* column name
human_col = f"human_{criterion_key}"

# Update the JSONB column
try:
cits_dp_service.update_jsonb_column(
citation_id=citation_id,
col=human_col,
data=human_jsonb,
table_name=table_name,
)
except Exception:
# Best-effort; continue with other rows
pass

# Update validation metadata
try:
now_iso = datetime.utcnow().isoformat() + "Z"
for row_idx, citation_id in row_id_map.items():
cits_dp_service.update_text_column(
citation_id=citation_id,
col="l1_validated_by",
text_value="csv_upload",
table_name=table_name,
)
cits_dp_service.update_text_column(
citation_id=citation_id,
col="l1_validated_at",
text_value=now_iso,
table_name=table_name,
)
except Exception:
# Best-effort; continue
pass

# Backfill human decision columns
try:
criteria_parsed = sr.get("criteria_parsed") or {}
cits_dp_service.backfill_human_decisions(criteria_parsed, table_name)
except Exception:
# Best-effort; continue
pass


async def _upload_screening_citations_impl(
sr_id: str,
file: UploadFile,
Expand Down Expand Up @@ -378,6 +569,19 @@ async def _upload_screening_citations_impl(
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create table or insert rows: {e}")

# Populate human answers from CSV if criteria config exists
try:
await run_in_threadpool(
_populate_human_answers_from_csv,
table_name,
normalized_rows,
include_columns,
sr,
)
except Exception:
# Best-effort; human answer population should not block the upload
pass

# Save DB connection metadata into SR Mongo doc
try:
screening_info = {
Expand Down
Loading
Loading