99from io import StringIO , BytesIO
1010from pathlib import Path
1111from presidio_analyzer import AnalyzerEngine
12+ from sentence_transformers import SentenceTransformer , util
1213
1314class DataCrawler :
14- def __init__ (self , file_path ):
15- """Initializes the DataCrawler with the path to a single data file .
15+ def __init__ (self , file_path , output_dir , similarity_threshold = 0.7 ):
16+ """Initializes the DataCrawler with paths .
1617
1718 Args:
1819 file_path (str): Path to the data file.
20+ output_dir (str): Path to the output directory for JSON.
1921 """
2022 self .file_path = file_path
21- self .analyzer = AnalyzerEngine () # Initialize Presidio PHI Detector
23+ self .output_dir = output_dir
24+ self .analyzer = AnalyzerEngine () # Initialize Presidio PHI & PII Detector
25+ self .model = SentenceTransformer ("sentence-transformers/all-MiniLM-L6-v2" )
26+ self .similarity_threshold = similarity_threshold
27+ self .pii_entities = [
28+ "email" , "phone number" , "credit card" , "ssn" , "social security number" ,
29+ "name" , "address" , "dob" , "date of birth" , "passport number" , "bank account"
30+ ]
2231
2332 def read_file (self ):
2433 """Reads the specified file from the local filesystem."""
2534 with open (self .file_path , 'rb' ) as f :
2635 return f .read ()
27-
36+
2837 def detect_phi (self , column_name ):
29- """Uses Presidio to detect PHI/PII in column names."""
30- results = self .analyzer .analyze (text = column_name , entities = self .analyzer .get_supported_entities (language = "en" ), language = "en" )
31- return len (results ) > 0 # Returns True if PHI is detected
38+ """Uses Presidio to detect PHI in column names."""
39+ results = self .analyzer .analyze (text = column_name , entities = self .analyzer .get_supported_entities (language = "en" ), language = "en" )
40+ return any (entity .entity_type in ["MEDICAL_CONDITION" , "MEDICAL_TREATMENT" , "US_SSN" ] for entity in results )
41+
42+ def detect_pii (self , column_name ):
43+ column_embedding = self .model .encode (column_name , convert_to_tensor = True )
44+ pii_embeddings = self .model .encode (self .pii_entities , convert_to_tensor = True )
45+
46+ similarities = util .cos_sim (column_embedding , pii_embeddings )
47+
48+ # If any similarity score is above the threshold, flag as PII
49+ return any (similarity > self .similarity_threshold for similarity in similarities [0 ])
3250
3351 def infer_schema (self ):
34- """
35- Infers schema from the file content and profiles the data.
36- """
52+ """Infers schema from the file content and profiles the data."""
3753 file_ext = Path (self .file_path ).suffix .lower ()
3854
3955 with open (self .file_path , "rb" ) as f :
@@ -58,22 +74,30 @@ def infer_schema(self):
5874 "columns" : []
5975 }
6076
61- for col in df .columns :
77+ for col in df .columns : # converting the field names to lower case to reduce noise
6278 is_phi = self .detect_phi (col )
79+ is_pii = self .detect_pii (col )
6380 schema ["columns" ].append ({
6481 'name' : col ,
6582 'dtype' : str (df [col ].dtype ),
6683 'null_count' : int (df [col ].isnull ().sum ()),
6784 'total_count' : int (len (df [col ])),
6885 'distinct_count' : int (df [col ].nunique ()),
69- 'is_phi' : is_phi
86+ 'is_phi' : is_phi ,
87+ 'is_pii' : is_pii
7088 })
7189
7290 return schema
7391
7492 def save_to_json (self , schema ):
75- """Saves schema and profiling info to a JSON file."""
76- json_file = os .path .basename (self .file_path ).replace ('.' , '_' ) + ".json"
93+ """Saves schema and profiling info to a JSON file inside the specified output directory."""
94+
95+ # Ensure the output directory exists
96+ os .makedirs (self .output_dir , exist_ok = True )
97+
98+ # Construct the JSON file name (same as input file but with .json extension)
99+ json_file_name = os .path .basename (self .file_path ).replace ('.' , '_' ) + ".json"
100+ json_file_path = os .path .join (self .output_dir , json_file_name )
77101
78102 def convert_types (obj ):
79103 """Handles serialization of NumPy and Pandas types."""
@@ -95,20 +119,21 @@ def convert_types(obj):
95119 return {str (k ): convert_types (v ) for k , v in obj .items ()}
96120 return obj
97121
98- with open (json_file , 'w' , encoding = 'utf-8' ) as f :
122+ # Save the schema to the specified directory
123+ with open (json_file_path , 'w' , encoding = 'utf-8' ) as f :
99124 json .dump (schema , f , indent = 4 , ensure_ascii = False , default = convert_types )
100125
101- print (f"Schema saved: { json_file } " )
126+ print (f"Schema saved: { json_file_path } " )
102127
103128 def run (self ):
104- """
105- Runs the crawler for the given file.
106- """
129+ """Runs the crawler for the given file."""
107130 schema = self .infer_schema ()
108131 if schema :
109132 self .save_to_json (schema )
110133
# Example usage — guarded so importing this module does not trigger
# filesystem I/O against the hard-coded example paths.
if __name__ == "__main__":
    file_path = r"D:\Indium Internal Work\Accelerators\testing datasets\crm_sales_opportunities\sales_teams.csv"
    output_dir = r"D:\Indium Internal Work\Accelerators\inferred_schemas"

    crawler = DataCrawler(file_path=file_path, output_dir=output_dir)
    crawler.run()