diff --git a/crawl.py b/crawl.py index 7d2e39e..0aee089 100644 --- a/crawl.py +++ b/crawl.py @@ -9,31 +9,48 @@ from io import StringIO, BytesIO from pathlib import Path from presidio_analyzer import AnalyzerEngine +from sentence_transformers import SentenceTransformer, util class DataCrawler: - def __init__(self, file_path): - """Initializes the DataCrawler with the path to a single data file. + def __init__(self, file_path, output_dir, similarity_threshold = 0.7): + """Initializes the DataCrawler with paths. Args: file_path (str): Path to the data file. + output_dir (str): Path to the output directory for JSON. + similarity_threshold (float, optional): Cosine-similarity cutoff above which a column name is flagged as PII. Defaults to 0.7. """ self.file_path = file_path - self.analyzer = AnalyzerEngine() # Initialize Presidio PHI Detector + self.output_dir = output_dir + self.analyzer = AnalyzerEngine() # Initialize Presidio PHI & PII Detector + self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") + self.similarity_threshold = similarity_threshold + self.pii_entities = [ + "email", "phone number", "credit card", "ssn", "social security number", + "name", "address", "dob", "date of birth", "passport number", "bank account" + ] def read_file(self): """Reads the specified file from the local filesystem.""" with open(self.file_path, 'rb') as f: return f.read() - + def detect_phi(self, column_name): - """Uses Presidio to detect PHI/PII in column names.""" - results = self.analyzer.analyze(text=column_name, entities = self.analyzer.get_supported_entities(language="en"), language="en") - return len(results) > 0 # Returns True if PHI is detected + """Uses Presidio to detect PHI in column names.""" + results = self.analyzer.analyze(text=column_name, entities=self.analyzer.get_supported_entities(language="en"), language="en") + return any(entity.entity_type in ["MEDICAL_CONDITION", "MEDICAL_TREATMENT", "US_SSN"] for entity in results) # NOTE(review): MEDICAL_CONDITION and MEDICAL_TREATMENT appear not to be built-in Presidio entity types, so without custom recognizers only US_SSN can ever match -- confirm + + def detect_pii(self, column_name): + column_embedding = self.model.encode(column_name, convert_to_tensor=True) + pii_embeddings = 
self.model.encode(self.pii_entities, convert_to_tensor=True) + + similarities = util.cos_sim(column_embedding, pii_embeddings) + + # If any similarity score is above the threshold, flag as PII + return any(similarity > self.similarity_threshold for similarity in similarities[0]) def infer_schema(self): - """ - Infers schema from the file content and profiles the data. - """ + """Infers schema from the file content and profiles the data.""" file_ext = Path(self.file_path).suffix.lower() with open(self.file_path, "rb") as f: @@ -58,22 +74,30 @@ def infer_schema(self): "columns": [] } - for col in df.columns: + for col in df.columns: # NOTE(review): comment previously claimed the field names are lower-cased here, but no lower-casing happens -- pass col.lower() to the detectors if noise reduction is intended is_phi = self.detect_phi(col) + is_pii = self.detect_pii(col) schema["columns"].append({ 'name': col, 'dtype': str(df[col].dtype), 'null_count': int(df[col].isnull().sum()), 'total_count': int(len(df[col])), 'distinct_count': int(df[col].nunique()), - 'is_phi': is_phi + 'is_phi': is_phi, + 'is_pii': is_pii }) return schema def save_to_json(self, schema): - """Saves schema and profiling info to a JSON file.""" - json_file = os.path.basename(self.file_path).replace('.', '_') + ".json" + """Saves schema and profiling info to a JSON file inside the specified output directory.""" + + # Ensure the output directory exists + os.makedirs(self.output_dir, exist_ok=True) + + # Construct the JSON file name (same as input file but with .json extension) + json_file_name = os.path.basename(self.file_path).replace('.', '_') + ".json" + json_file_path = os.path.join(self.output_dir, json_file_name) def convert_types(obj): """Handles serialization of NumPy and Pandas types.""" @@ -95,20 +119,21 @@ def convert_types(obj): return {str(k): convert_types(v) for k, v in obj.items()} return obj - with open(json_file, 'w', encoding='utf-8') as f: + # Save the schema to the specified directory + with open(json_file_path, 'w', encoding='utf-8') as f: json.dump(schema, f, indent=4, ensure_ascii=False, 
default=convert_types) - print(f"Schema saved: {json_file}") + print(f"Schema saved: {json_file_path}") def run(self): - """ - Runs the crawler for the given file. - """ + """Runs the crawler for the given file.""" schema = self.infer_schema() if schema: self.save_to_json(schema) # Example usage -file_path = r"D:\Indium Internal Work\Accelerators\testing datasets\global_electronics_retailer\Customers.csv" -crawler = DataCrawler(file_path=file_path) -crawler.run() \ No newline at end of file +file_path = r"D:\Indium Internal Work\Accelerators\testing datasets\crm_sales_opportunities\sales_teams.csv" +output_dir = r"D:\Indium Internal Work\Accelerators\inferred_schemas" + +crawler = DataCrawler(file_path=file_path, output_dir=output_dir) +crawler.run() diff --git a/improvements.md b/improvements.md index 1c6b26d..a5858fa 100644 --- a/improvements.md +++ b/improvements.md @@ -1,4 +1,4 @@ 1. Improve precision on the numerical fields. -2. Most of the string and date fields are identified as objects. +2. Most of the string and date fields are identified as objects. - DONE 3. Detect encoding and use the corresponding decoding strategy (ascii, utf, iso). 4. PHI detection accuracy must be improved. 
diff --git a/pii_detector.py b/pii_detector.py new file mode 100644 index 0000000..babd186 --- /dev/null +++ b/pii_detector.py @@ -0,0 +1,21 @@ +from transformers import pipeline +from pathlib import Path +import pandas as pd +from presidio_analyzer import AnalyzerEngine + + +path = Path(r"D:\Indium Internal Work\Accelerators\testing datasets\large_customers.csv") +df = pd.read_csv(path, nrows=0) # Read only the header row +field_names = [field.lower() for field in df.columns.tolist()] # Convert to list +print(field_names) + +analyzer = AnalyzerEngine() + +def detect_pii(column_name): + """Uses Presidio to detect general PII in column names.""" + results = analyzer.analyze(text = column_name, entities = analyzer.get_supported_entities(language="en"), language = "en") + print(results) + # return any(entity.entity_type in ["EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "PERSON", 'LOCATION', 'PHONE NUMBER', 'NRP', 'IBAN_CODE'] for entity in results) + +for field in field_names: + detect_pii(field) \ No newline at end of file diff --git a/pii_detector_bert.py b/pii_detector_bert.py new file mode 100644 index 0000000..b45374b --- /dev/null +++ b/pii_detector_bert.py @@ -0,0 +1,60 @@ +from transformers import pipeline +from pathlib import Path +import pandas as pd +from presidio_analyzer import AnalyzerEngine +from sentence_transformers import SentenceTransformer, util + + +path = Path(r"D:\Indium Internal Work\Accelerators\testing datasets\large_customers.csv") +df = pd.read_csv(path, nrows=0) # Read only the header row +field_names = [field.lower() for field in df.columns.tolist()] # Convert to list +print(field_names) + + +class PIIDetector: + def __init__(self, pii_entities=None, similarity_threshold=0.7): + """ + Initializes the PII Detector with a predefined list of PII entities. + + Args: + pii_entities (list, optional): List of known PII entity names. + similarity_threshold (float, optional): Threshold for similarity detection. 
+ """ + self.similarity_threshold = similarity_threshold + + # Default PII entities if none are provided + if pii_entities is None: + self.pii_entities = [ + "email", "phone number", "credit card", "ssn", "social security number", + "name", "address", "dob", "date of birth", "passport number", "bank account" + ] + else: + self.pii_entities = pii_entities + + # Load sentence transformer model + self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") + + def detect_pii(self, column_name): + """ + Uses Sentence Transformers to detect PII in column names. + + Args: + column_name (str): The column name to check. + + Returns: + bool: True if column name is considered PII. + """ + column_embedding = self.model.encode(column_name, convert_to_tensor=True) + pii_embeddings = self.model.encode(self.pii_entities, convert_to_tensor=True) + + # Compute cosine similarity between column and PII entities + similarities = util.cos_sim(column_embedding, pii_embeddings) + + # If any similarity score is above the threshold, flag as PII + return any(similarity > self.similarity_threshold for similarity in similarities[0]) + +# Example Usage +detector = PIIDetector() + +for column in field_names: + print(detector.detect_pii(column)) \ No newline at end of file