|
| 1 | +"""Schema Validation Rule. |
| 2 | +
|
| 3 | +OMOP CDM schema validation: |
| 4 | +Validates that columns referenced in SQL queries exist in the OMOP CDM schema. |
| 5 | +Catches common errors like using concept_ancestor columns on concept_relationship table. |
| 6 | +""" |
| 7 | + |
| 8 | +from typing import Dict, List, Set |
| 9 | + |
| 10 | +from sqlglot import exp |
| 11 | + |
| 12 | +from fastssv.core.base import Rule, RuleViolation, Severity |
| 13 | +from fastssv.core.helpers import extract_aliases, parse_sql, resolve_table_col |
| 14 | +from fastssv.core.registry import register |
| 15 | +from fastssv.schemas import CDM_COLUMNS, get_table_columns |
| 16 | + |
| 17 | + |
@register
class SchemaValidationRule(Rule):
    """Validate column references against the OMOP CDM schema.

    For every ``exp.Column`` node in the parsed query, the (table, column)
    pair returned by ``resolve_table_col`` is checked against the column set
    that ``get_table_columns`` reports for that table. A column that is not
    in that set produces one violation per (table, column) pair per parsed
    statement.

    Columns are skipped (never reported) when:
      * no table could be resolved for them,
      * the table is not a key of ``CDM_COLUMNS``,
      * the schema lookup returns no columns for the table,
      * the reference is a qualified wildcard such as ``c.*``.
    """

    rule_id = "vocabulary.schema_validation"
    name = "Schema Validation"
    description = (
        "Validates that columns referenced in queries exist in the OMOP CDM schema. "
        "Catches errors like using concept_ancestor columns on concept_relationship."
    )
    severity = Severity.ERROR
    suggested_fix = "Check OMOP CDM documentation for correct column names"

    def validate(self, sql: str, dialect: str = "postgres") -> List[RuleViolation]:
        """Validate SQL and return list of violations.

        Args:
            sql: The SQL text to inspect; it may contain several statements.
            dialect: Dialect name forwarded to ``parse_sql``.

        Returns:
            One violation for each distinct (table, column) pair, per parsed
            statement, whose column is missing from the table's schema. An
            empty list is returned when ``parse_sql`` reports a parse error.
        """
        trees, parse_error = parse_sql(sql, dialect)
        if parse_error:
            # Best effort: this rule stays silent on SQL that cannot be parsed.
            return []

        violations: List[RuleViolation] = []

        for tree in trees or ():
            if tree is None:
                continue

            aliases = extract_aliases(tree)

            # De-duplicate within a single statement: the same bad column
            # referenced several times is reported only once.
            reported: Set[tuple] = set()

            for col in tree.find_all(exp.Column):
                # A qualified wildcard (``alias.*``) is a Column wrapping a
                # Star; it is not a real column name and must not be looked
                # up in the schema.
                if isinstance(col.this, exp.Star):
                    continue

                table_name, col_name = resolve_table_col(col, aliases)

                if not table_name or not col_name:
                    continue
                if table_name not in CDM_COLUMNS:
                    continue

                key = (table_name, col_name)
                if key in reported:
                    continue

                valid_columns = get_table_columns(table_name)
                if not valid_columns or col_name in valid_columns:
                    continue

                reported.add(key)
                violations.append(self.create_violation(
                    message=f"Column '{col_name}' does not exist in table '{table_name}'",
                    suggested_fix=self._suggestion_for(table_name, col_name),
                    details={
                        "table": table_name,
                        "column": col_name,
                        # Only the first 10 (alphabetically) to keep output short.
                        "valid_columns": sorted(valid_columns)[:10],
                    },
                ))

        return violations

    def _suggestion_for(self, table_name: str, col_name: str) -> str:
        """Return the fix hint for an unknown column on a table.

        Gives a targeted message for the two mix-ups between
        ``concept_relationship`` and ``concept_ancestor`` column names, and
        falls back to the rule's generic ``suggested_fix`` otherwise.
        """
        if table_name == "concept_relationship" and col_name in {
            "ancestor_concept_id",
            "descendant_concept_id",
        }:
            return (
                f"Column '{col_name}' belongs to concept_ancestor table, not concept_relationship. "
                "concept_relationship uses concept_id_1 and concept_id_2."
            )
        if table_name == "concept_ancestor" and col_name in {"concept_id_1", "concept_id_2"}:
            return (
                f"Column '{col_name}' belongs to concept_relationship table, not concept_ancestor. "
                "concept_ancestor uses ancestor_concept_id and descendant_concept_id."
            )
        return self.suggested_fix
| 91 | + |
| 92 | + |
# Public API of this module: only the rule class is exported.
__all__ = ["SchemaValidationRule"]
0 commit comments