1+ """Source Concept ID Usage Warning Rule.
2+
3+ OMOP semantic rule OMOP_022:
4+ The *_source_concept_id columns store the original source vocabulary concept.
5+ For standard analytical queries and cohort identification, use the primary
6+ *_concept_id (standard concept) rather than *_source_concept_id.
7+
8+ Valid uses of source_concept_id:
9+ - Data quality checks
10+ - ETL validation / mapping verification
11+ - Source code exploration
12+ - Provenance tracking
13+
14+ Invalid use (cohort identification):
15+ - SELECT person_id FROM condition_occurrence WHERE condition_source_concept_id = 123
16+
17+ Correct approach:
18+ - SELECT person_id FROM condition_occurrence WHERE condition_concept_id = 456
19+ """
20+
21+ from typing import Dict , List , Set , Tuple
22+
23+ from sqlglot import exp
24+
25+ from fastssv .core .base import Rule , RuleViolation , Severity
26+ from fastssv .core .helpers import (
27+ extract_aliases ,
28+ normalize_name ,
29+ parse_sql ,
30+ resolve_table_col ,
31+ uses_table ,
32+ )
33+ from fastssv .core .registry import register
34+
35+
# Maps each OMOP ``*_source_concept_id`` column to the standard
# ``*_concept_id`` column suggested as its replacement in warnings.
SOURCE_TO_STANDARD: Dict[str, str] = {
    "condition_source_concept_id": "condition_concept_id",
    "drug_source_concept_id": "drug_concept_id",
    "procedure_source_concept_id": "procedure_concept_id",
    "measurement_source_concept_id": "measurement_concept_id",
    "observation_source_concept_id": "observation_concept_id",
    "device_source_concept_id": "device_concept_id",
    "visit_source_concept_id": "visit_concept_id",
    "specimen_source_concept_id": "specimen_concept_id",
}

# Source-concept columns the rule looks for.  Derived from the mapping keys so
# the two collections cannot drift apart when a column is added or removed.
SOURCE_CONCEPT_ID_COLUMNS: Set[str] = set(SOURCE_TO_STANDARD)
57+
58+
def _is_in_where_or_having(node: exp.Expression) -> bool:
    """Report whether *node* sits inside a WHERE or HAVING clause.

    The nearest ancestor that is a WHERE, HAVING or JOIN decides the answer:
    a WHERE/HAVING yields True, while meeting a JOIN first yields False.
    A node with none of these ancestors yields False.
    """
    nearest_clause = node.find_ancestor(exp.Where, exp.Having, exp.Join)
    return isinstance(nearest_clause, (exp.Where, exp.Having))
68+
69+
def _find_source_filters(
    tree: exp.Expression,
    aliases: Dict[str, str],
) -> List[str]:
    """Collect warnings for source-concept columns compared in WHERE/HAVING.

    Only ``=``, ``<>`` and ``IN`` predicates are inspected, and only when
    ``_is_in_where_or_having`` accepts them.

    Args:
        tree: Parsed statement to scan.
        aliases: Alias mapping handed through to ``resolve_table_col``.

    Returns:
        One message per distinct (column, predicate SQL) pair, in the order
        the predicates are met while walking the tree.
    """
    issues: List[str] = []
    seen: Set[Tuple[str, str]] = set()

    for node in tree.walk():
        if not isinstance(node, (exp.EQ, exp.NEQ, exp.In)):
            continue
        if not _is_in_where_or_having(node):
            continue

        # The column may be on either side (``col = 1`` or ``1 = col``);
        # operands that are not plain columns are skipped.
        for operand in (node.this, node.expression):
            if not isinstance(operand, exp.Column):
                continue

            _, col = resolve_table_col(operand, aliases)
            col_norm = normalize_name(col)
            if col_norm not in SOURCE_CONCEPT_ID_COLUMNS:
                continue

            # Report a given column/predicate combination only once, e.g.
            # when both sides of the comparison are the same source column.
            key = (col_norm, node.sql())
            if key in seen:
                continue
            seen.add(key)

            # Fall back to a name-based guess if the mapping lacks the column.
            standard_col = SOURCE_TO_STANDARD.get(
                col_norm,
                col_norm.replace("_source_", "_"),
            )

            issues.append(
                f"Filtering on '{col_norm}' for cohort/analytical logic is discouraged. "
                f"Use '{standard_col}' (standard concept) instead. "
                "Source concept IDs are intended for ETL validation, mapping QA, or provenance analysis."
            )

    return issues
114+
115+
def _is_likely_analytical_query(tree: exp.Expression) -> bool:
    """Heuristically decide whether *tree* looks like a cohort/analytical query.

    A statement qualifies when it uses the ``person`` table or references a
    column named ``person_id`` anywhere in the tree.

    NOTE(review): nothing in the visible part of this file calls this
    helper; confirm whether it is still needed.
    """
    if uses_table(tree, "person"):
        return True

    return any(
        normalize_name(column.name) == "person_id"
        for column in tree.find_all(exp.Column)
    )
126+
127+
def _is_source_exploration_query(tree: exp.Expression) -> bool:
    """Decide whether *tree* projects source-level columns.

    Looks at the select list of the first SELECT found in the tree and
    returns True when any column there has ``source_value`` in its name or
    ends with ``_source_concept_id``.  Returns False when there is no SELECT.
    """
    first_select = tree.find(exp.Select)
    if not first_select:
        return False

    projected_names = (
        normalize_name(column.name)
        for item in first_select.expressions
        for column in item.find_all(exp.Column)
    )
    return any(
        "source_value" in name or name.endswith("_source_concept_id")
        for name in projected_names
    )
144+
145+
@register
class SourceConceptIdWarningRule(Rule):
    """Warn when a query filters on an OMOP ``*_source_concept_id`` column.

    Statements whose select list projects source-level columns are treated
    as source exploration and are not reported.
    """

    rule_id = "semantic.source_concept_id_warning"
    name = "Source Concept ID Not For Analytical Filtering"
    description = (
        "Avoid using *_source_concept_id for cohort definition or analytical filtering. "
        "Use standard *_concept_id instead."
    )
    severity = Severity.WARNING
    suggested_fix = (
        "Replace *_source_concept_id with corresponding standard *_concept_id column. "
        "If this is for ETL validation or source exploration, this warning can be ignored."
    )

    def validate(self, sql: str, dialect: str = "postgres") -> List[RuleViolation]:
        """Return one violation per source-concept filter found in *sql*.

        Args:
            sql: SQL text, possibly containing several statements.
            dialect: Dialect name forwarded to ``parse_sql``.

        Returns:
            The collected violations; an empty list when parsing reports an
            error or nothing is found.
        """
        parsed, parse_error = parse_sql(sql, dialect)
        if parse_error:
            # Unparseable SQL produces no violations from this rule.
            return []

        found: List[RuleViolation] = []
        for statement in parsed:
            if not statement:
                continue

            alias_map = extract_aliases(statement)

            # Source-exploration statements are exempt from the warning.
            if _is_source_exploration_query(statement):
                continue

            found.extend(
                self.create_violation(message=text)
                for text in _find_source_filters(statement, alias_map)
            )

        return found
187+
188+
# Public API of this module.
__all__ = ["SourceConceptIdWarningRule"]
0 commit comments