Commit 3bad7e5

Add rule for domain union validation
1 parent d400230 commit 3bad7e5

4 files changed

Lines changed: 414 additions & 6 deletions
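The pattern this rule targets, described in the new module's docstring, can be reproduced with an in-memory SQLite database. This is only a minimal sketch: the two tables are toy stand-ins with a single column each, the concept IDs are the ones quoted in the docstring, and which table each ID is placed in is an assumption made for illustration.

```python
import sqlite3

# Toy stand-ins for two OMOP CDM tables (assumption: real tables have many more columns).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE condition_occurrence (condition_concept_id INTEGER);
    CREATE TABLE drug_exposure (drug_concept_id INTEGER);
    INSERT INTO condition_occurrence VALUES (201826), (313217);
    INSERT INTO drug_exposure VALUES (1545958);
    """
)

# Violation pattern: both domains collapse into one unlabeled column.
ambiguous = conn.execute(
    """
    SELECT condition_concept_id AS concept_id FROM condition_occurrence
    UNION ALL
    SELECT drug_concept_id AS concept_id FROM drug_exposure
    """
).fetchall()

# Correct pattern (Option 1 in the docstring): a literal domain label travels with each row.
labeled = conn.execute(
    """
    SELECT 'Condition' AS domain, condition_concept_id AS concept_id
    FROM condition_occurrence
    UNION ALL
    SELECT 'Drug' AS domain, drug_concept_id AS concept_id
    FROM drug_exposure
    """
).fetchall()

print(sorted(ambiguous))  # bare IDs, no way to tell the source table
print(sorted(labeled))    # each ID paired with its domain label
```

In the first result every row is a bare integer, so the source table cannot be recovered from the output alone; in the second, each row carries its own label.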

src/fastssv/rules/data_quality/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -8,10 +8,12 @@
 from .schema_validation import SchemaValidationRule
 from .column_type_validation import ColumnTypeValidationRule
 from .negative_concept_id_validation import NegativeConceptIdValidationRule
+from .union_concept_id_domain_indicator import UnionConceptIdDomainIndicatorRule
 
 __all__ = [
     "UnmappedConceptHandlingRule",
     "SchemaValidationRule",
     "ColumnTypeValidationRule",
     "NegativeConceptIdValidationRule",
+    "UnionConceptIdDomainIndicatorRule",
 ]

src/fastssv/rules/data_quality/union_concept_id_domain_indicator.py

Lines changed: 275 additions & 0 deletions
@@ -0,0 +1,275 @@
+"""Union Concept ID Domain Indicator Rule.
+
+OMOP semantic rule OMOP_067:
+UNION queries should not combine concept_id columns from different domains without a domain
+indicator. Mixing condition_concept_id and drug_concept_id in a single column without context
+makes results uninterpretable.
+
+The Problem:
+    Each OMOP domain has its own concept_id column:
+    - condition_occurrence.condition_concept_id
+    - drug_exposure.drug_concept_id
+    - procedure_occurrence.procedure_concept_id
+    - measurement.measurement_concept_id
+    - observation.observation_concept_id
+
+    UNION queries that mix these without domain labels create ambiguous results.
+
+Example impact:
+    SELECT condition_concept_id AS concept_id
+    FROM condition_occurrence
+    UNION ALL
+    SELECT drug_concept_id AS concept_id
+    FROM drug_exposure
+    -- Returns: [201826, 1545958, 313217, ...]
+    -- Which are conditions? Which are drugs? UNKNOWN!
+    -- Results are uninterpretable without domain context
+
+Violation pattern:
+    SELECT condition_concept_id AS concept_id
+    FROM condition_occurrence
+    UNION ALL
+    SELECT drug_concept_id AS concept_id
+    FROM drug_exposure
+    -- No domain indicator!
+
+Correct patterns:
+    -- Option 1: Add literal domain column
+    SELECT 'Condition' AS domain, condition_concept_id AS concept_id
+    FROM condition_occurrence
+    UNION ALL
+    SELECT 'Drug' AS domain, drug_concept_id AS concept_id
+    FROM drug_exposure
+
+    -- Option 2: Use domain_id from concept table
+    SELECT c.domain_id, co.condition_concept_id AS concept_id
+    FROM condition_occurrence co
+    JOIN concept c ON co.condition_concept_id = c.concept_id
+    UNION ALL
+    SELECT c.domain_id, de.drug_concept_id AS concept_id
+    FROM drug_exposure de
+    JOIN concept c ON de.drug_concept_id = c.concept_id
+"""
+
+from typing import Dict, List, Optional, Set
+
+from sqlglot import exp
+
+from fastssv.core.base import Rule, RuleViolation, Severity
+from fastssv.core.helpers import (
+    extract_aliases,
+    normalize_name,
+    parse_sql,
+    resolve_table_col,
+)
+from fastssv.core.registry import register
+
+
+# --- Constants -------------------------------------------------------------
+
+TABLE_TO_DOMAIN = {
+    "condition_occurrence": "condition",
+    "drug_exposure": "drug",
+    "procedure_occurrence": "procedure",
+    "measurement": "measurement",
+    "observation": "observation",
+    "device_exposure": "device",
+}
+
+VALID_DOMAIN_LITERALS = set(TABLE_TO_DOMAIN.values())
+
+
+# --- Helpers ---------------------------------------------------------------
+
+def _norm(x: Optional[str]) -> Optional[str]:
+    return normalize_name(x) if x else None
+
+
+# 1. Flatten UNION chains properly
+def _collect_union_selects(node: exp.Expression) -> List[exp.Select]:
+    selects: List[exp.Select] = []
+
+    def _walk(n: exp.Expression):
+        if isinstance(n, exp.Union):
+            _walk(n.this)
+            _walk(n.expression)
+        elif isinstance(n, exp.Select):
+            selects.append(n)
+
+    _walk(node)
+    return selects
+
+
+# 2. Extract concept_id columns WITH table lineage
+def _get_concept_id_domains(
+    select: exp.Select,
+    aliases: Dict[str, str],
+) -> Set[str]:
+    domains: Set[str] = set()
+
+    for expr in select.expressions:
+        col = None
+
+        if isinstance(expr, exp.Alias):
+            if isinstance(expr.this, exp.Column):
+                col = expr.this
+        elif isinstance(expr, exp.Column):
+            col = expr
+
+        if not col:
+            continue
+
+        col_name = _norm(col.name)
+        if not col_name or not col_name.endswith("_concept_id"):
+            continue
+
+        table, _ = resolve_table_col(col, aliases)
+        table = _norm(table)
+
+        # If qualified, use the table name
+        if table and table in TABLE_TO_DOMAIN:
+            domains.add(TABLE_TO_DOMAIN[table])
+        # If unqualified, infer from tables in aliases
+        elif not table:
+            # Check tables referenced in this SELECT
+            for alias_table in aliases.values():
+                norm_table = _norm(alias_table)
+                if norm_table in TABLE_TO_DOMAIN:
+                    # Infer domain from column name matching table
+                    # e.g., condition_concept_id → condition_occurrence
+                    expected_prefix = col_name.replace("_concept_id", "")
+                    if expected_prefix in norm_table:
+                        domains.add(TABLE_TO_DOMAIN[norm_table])
+
+    return domains
+
+
+# 3. Strong domain indicator detection
+def _has_domain_indicator(select: exp.Select) -> bool:
+    for expr in select.expressions:
+        # Case 1: explicit domain column
+        if isinstance(expr, exp.Column):
+            if _norm(expr.name) in {"domain", "domain_id"}:
+                return True
+
+        # Case 2: aliased literal with domain meaning
+        if isinstance(expr, exp.Alias):
+            alias_name = _norm(expr.alias)
+
+            if isinstance(expr.this, exp.Literal):
+                value = _norm(str(expr.this.this))
+
+                # Must match known domain values
+                if value in VALID_DOMAIN_LITERALS:
+                    return True
+
+            # OR alias explicitly says domain
+            if alias_name in {"domain", "domain_id"}:
+                return True
+
+    return False
+
+
+# 4. Core detection
+def _find_violations(tree: exp.Expression) -> List[str]:
+    issues: List[str] = []
+    seen: Set[str] = set()
+
+    for union in tree.find_all(exp.Union):
+        # Only process top-level UNIONs (skip nested ones to avoid duplicates)
+        parent = union.parent
+        is_nested = False
+        while parent:
+            if isinstance(parent, exp.Union):
+                is_nested = True
+                break
+            parent = parent.parent if hasattr(parent, 'parent') else None
+
+        if is_nested:
+            continue
+
+        selects = _collect_union_selects(union)
+
+        if len(selects) < 2:
+            continue
+
+        all_domains: Set[str] = set()
+
+        for select in selects:
+            aliases = extract_aliases(select)
+            domains = _get_concept_id_domains(select, aliases)
+            all_domains.update(domains)
+
+        # No concept_id usage → skip
+        if not all_domains:
+            continue
+
+        # Single domain → OK
+        if len(all_domains) <= 1:
+            continue
+
+        # Check for domain indicator
+        has_indicator = any(_has_domain_indicator(s) for s in selects)
+
+        if has_indicator:
+            continue
+
+        key = tuple(sorted(all_domains))
+        if key in seen:
+            continue
+        seen.add(key)
+
+        domains_str = ", ".join(sorted(d.capitalize() for d in all_domains))
+
+        issues.append(
+            f"UNION combines concept_id columns from multiple domains ({domains_str}) "
+            f"without a domain indicator column. This makes concept_id values ambiguous."
+        )
+
+    return issues
+
+
+# --- Rule ------------------------------------------------------------------
+
+@register
+class UnionConceptIdDomainIndicatorRule(Rule):
+    """Validates domain disambiguation in UNION queries with concept_id."""
+
+    rule_id = "semantic.union_concept_id_domain_indicator"
+    name = "Union Concept ID Domain Indicator"
+    description = (
+        "UNION queries combining concept_id values from multiple domains must include "
+        "a domain indicator column to avoid ambiguity."
+    )
+    severity = Severity.WARNING
+    suggested_fix = (
+        "Add a domain indicator column to each SELECT, e.g.: "
+        "SELECT 'Condition' AS domain, condition_concept_id FROM ... UNION ..."
+    )
+
+    def validate(self, sql: str, dialect: str = "postgres") -> List[RuleViolation]:
+        sql_lower = sql.lower()
+
+        # Fast pre-check
+        if "union" not in sql_lower or "_concept_id" not in sql_lower:
+            return []
+
+        trees, err = parse_sql(sql, dialect)
+        if err:
+            return []
+
+        violations: List[RuleViolation] = []
+
+        for tree in trees:
+            if not tree:
+                continue
+
+            issues = _find_violations(tree)
+
+            for msg in issues:
+                violations.append(self.create_violation(message=msg))
+
+        return violations
+
+
+__all__ = ["UnionConceptIdDomainIndicatorRule"]
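
The fallback for unqualified columns in `_get_concept_id_domains` can be exercised in isolation. The sketch below is a simplified, stdlib-only stand-in, not the committed code: the function name `infer_domains_for_unqualified` is hypothetical, it takes plain strings instead of sqlglot nodes, and it assumes names are already lowercased (the real code goes through `normalize_name`, whose exact behavior is not shown in this diff). `TABLE_TO_DOMAIN` is copied from the module.

```python
from typing import Iterable, Set

# Copied from the TABLE_TO_DOMAIN constant in the new module.
TABLE_TO_DOMAIN = {
    "condition_occurrence": "condition",
    "drug_exposure": "drug",
    "procedure_occurrence": "procedure",
    "measurement": "measurement",
    "observation": "observation",
    "device_exposure": "device",
}


def infer_domains_for_unqualified(col_name: str, tables: Iterable[str]) -> Set[str]:
    """Hypothetical stand-in for the unqualified-column branch.

    Assumes col_name and tables are already lowercase.
    """
    domains: Set[str] = set()
    if not col_name.endswith("_concept_id"):
        return domains

    # e.g. "condition_concept_id" -> "condition"
    expected_prefix = col_name.replace("_concept_id", "")

    for table in tables:
        # Substring match: the prefix must appear in a known domain table name.
        if table in TABLE_TO_DOMAIN and expected_prefix in table:
            domains.add(TABLE_TO_DOMAIN[table])
    return domains


matched = infer_domains_for_unqualified("condition_concept_id", ["condition_occurrence"])
type_col = infer_domains_for_unqualified("drug_type_concept_id", ["drug_exposure"])
other_table = infer_domains_for_unqualified("drug_concept_id", ["person"])
print(matched, type_col, other_table)
```

Because the match is a substring test on the table name, `condition_concept_id` resolves to the condition domain, while a column such as `drug_type_concept_id` yields no domain: its prefix `drug_type` does not occur in `drug_exposure`. Tables outside `TABLE_TO_DOMAIN` are ignored.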

tasks/IMPLEMENTATION_STATUS.md

Lines changed: 6 additions & 6 deletions
@@ -9,8 +9,8 @@ This checklist tracks which rules from `omop_rules.json` have been implemented i
 
 **Statistics:**
 - Total rules in JSON: 350+
-- Implemented: 62 rules (including covered rules)
-- Coverage: 17.7%
+- Implemented: 63 rules (including covered rules)
+- Coverage: 18.0%
 - Last updated: March 2026
 
 ---
@@ -153,12 +153,12 @@ This checklist tracks which rules from `omop_rules.json` have been implemented i
   - *Status: SKIPPED - This is an analytics best practice, not a semantic CDM violation. The rule would detect COUNT(*) vs COUNT(DISTINCT person_id) in prevalence queries, but this is more about query intent (counting patients vs events) which is highly context-dependent and subjective. Many legitimate use cases exist for counting total events. Better addressed through documentation, code review, and analytics training rather than static SQL validation. High false positive risk due to inability to determine query intent.*
 - [x] **OMOP_064**: drug_strength_valid_start_end_date_filter
   - *Implemented as: `domain_specific/drug/drug_strength_validity_filter.py`*
-- [ ] **OMOP_065**: observation_table_heterogeneous_domain
-  - *Suggested group: `domain_specific/observation/`*
+- [-] **OMOP_065**: observation_table_heterogeneous_domain
+  - *Status: SKIPPED - This is an analytics best practice, not a semantic violation. While the observation table is heterogeneous (surveys, social history, family history), requiring observation_concept_id filter is prescriptive about query patterns rather than catching semantic errors. Many legitimate use cases exist for querying observation without concept filters (metadata queries, exploratory analysis, data quality checks). Better addressed through analytics documentation and query guidelines rather than static SQL validation. High false positive risk for legitimate analytical queries.*
 - [x] **OMOP_066**: concept_domain_id_matches_target_table
   - *Implemented as: `concept_standardization/concept_domain_validation.py` (comprehensive merged rule covering all 35+ concept_id columns)*
-- [ ] **OMOP_067**: no_union_different_concept_id_types
-  - *Suggested group: `data_quality/`*
+- [x] **OMOP_067**: no_union_different_concept_id_types
+  - *Implemented as: `data_quality/union_concept_id_domain_indicator.py`*
 - [ ] **OMOP_068**: specimen_table_not_for_lab_results
   - *Suggested group: `domain_specific/specimen/`*
 - [ ] **OMOP_069**: cohort_definition_id_required_for_cohort_query
