Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 47 additions & 22 deletions crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,47 @@
from io import StringIO, BytesIO
from pathlib import Path
from presidio_analyzer import AnalyzerEngine
from sentence_transformers import SentenceTransformer, util

class DataCrawler:
def __init__(self, file_path, output_dir, similarity_threshold=0.7):
    """Initializes the DataCrawler with paths.

    Args:
        file_path (str): Path to the data file.
        output_dir (str): Path to the output directory for JSON.
        similarity_threshold (float, optional): Cosine-similarity cutoff
            above which a column name is flagged as PII. Defaults to 0.7.
    """
    self.file_path = file_path
    self.output_dir = output_dir
    self.analyzer = AnalyzerEngine()  # Initialize Presidio PHI & PII Detector
    # Sentence-embedding model used for semantic PII detection on column names.
    self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    self.similarity_threshold = similarity_threshold
    # Reference PII entity names; detect_pii() compares column names against
    # these by embedding similarity.
    self.pii_entities = [
        "email", "phone number", "credit card", "ssn", "social security number",
        "name", "address", "dob", "date of birth", "passport number", "bank account"
    ]

def read_file(self):
    """Return the raw bytes of the data file at ``self.file_path``."""
    with open(self.file_path, "rb") as handle:
        contents = handle.read()
    return contents

def detect_phi(self, column_name):
    """Uses Presidio to detect PHI in column names.

    Args:
        column_name (str): The column name to analyze.

    Returns:
        bool: True if Presidio finds a health-related entity or an SSN
            in the column name.
    """
    results = self.analyzer.analyze(text=column_name, entities=self.analyzer.get_supported_entities(language="en"), language="en")
    # Only medical entities and SSNs count as PHI here; broader PII is
    # handled separately by detect_pii().
    return any(entity.entity_type in ["MEDICAL_CONDITION", "MEDICAL_TREATMENT", "US_SSN"] for entity in results)

def detect_pii(self, column_name):
    """Uses sentence-embedding similarity to flag PII-like column names.

    Args:
        column_name (str): The column name to check.

    Returns:
        bool: True if the column name's embedding is within
            ``self.similarity_threshold`` cosine similarity of any entry
            in ``self.pii_entities``.
    """
    column_embedding = self.model.encode(column_name, convert_to_tensor=True)

    # The PII entity list is fixed per instance, so encode it once and
    # reuse the embeddings on subsequent calls instead of re-encoding
    # for every column.
    if not hasattr(self, "_pii_embeddings"):
        self._pii_embeddings = self.model.encode(self.pii_entities, convert_to_tensor=True)

    similarities = util.cos_sim(column_embedding, self._pii_embeddings)

    # If any similarity score is above the threshold, flag as PII
    return any(similarity > self.similarity_threshold for similarity in similarities[0])

def infer_schema(self):
"""
Infers schema from the file content and profiles the data.
"""
"""Infers schema from the file content and profiles the data."""
file_ext = Path(self.file_path).suffix.lower()

with open(self.file_path, "rb") as f:
Expand All @@ -58,22 +74,30 @@ def infer_schema(self):
"columns": []
}

for col in df.columns:
for col in df.columns: # converting the field names to lower case to reduce noise
is_phi = self.detect_phi(col)
is_pii = self.detect_pii(col)
schema["columns"].append({
'name': col,
'dtype': str(df[col].dtype),
'null_count': int(df[col].isnull().sum()),
'total_count': int(len(df[col])),
'distinct_count': int(df[col].nunique()),
'is_phi': is_phi
'is_phi': is_phi,
'is_pii': is_pii
})

return schema

def save_to_json(self, schema):
"""Saves schema and profiling info to a JSON file."""
json_file = os.path.basename(self.file_path).replace('.', '_') + ".json"
"""Saves schema and profiling info to a JSON file inside the specified output directory."""

# Ensure the output directory exists
os.makedirs(self.output_dir, exist_ok=True)

# Construct the JSON file name (same as input file but with .json extension)
json_file_name = os.path.basename(self.file_path).replace('.', '_') + ".json"
json_file_path = os.path.join(self.output_dir, json_file_name)

def convert_types(obj):
"""Handles serialization of NumPy and Pandas types."""
Expand All @@ -95,20 +119,21 @@ def convert_types(obj):
return {str(k): convert_types(v) for k, v in obj.items()}
return obj

with open(json_file, 'w', encoding='utf-8') as f:
# Save the schema to the specified directory
with open(json_file_path, 'w', encoding='utf-8') as f:
json.dump(schema, f, indent=4, ensure_ascii=False, default=convert_types)

print(f"Schema saved: {json_file}")
print(f"Schema saved: {json_file_path}")

def run(self):
    """Runs the crawler for the given file: infer the schema, then save it.

    If ``infer_schema`` returns a falsy value (e.g. unsupported file type),
    nothing is written.
    """
    schema = self.infer_schema()
    if schema:
        self.save_to_json(schema)

# Example usage: crawl one CSV and write its inferred schema as JSON
# into output_dir. (The previous single-argument DataCrawler(file_path=...)
# call is removed — the constructor now requires output_dir.)
file_path = r"D:\Indium Internal Work\Accelerators\testing datasets\crm_sales_opportunities\sales_teams.csv"
output_dir = r"D:\Indium Internal Work\Accelerators\inferred_schemas"

crawler = DataCrawler(file_path=file_path, output_dir=output_dir)
crawler.run()
2 changes: 1 addition & 1 deletion improvements.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
1. Improve precision on the numerical fields.
2. Most of the string and date fields are identified as objects.
2. Most of the string and date fields are identified as objects. - DONE
3. Detect encoding and use the corresponding decoding strategy (ascii, utf, iso).
4. PHI detection accuracy must be improved.
21 changes: 21 additions & 0 deletions pii_detector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from transformers import pipeline
from pathlib import Path
import pandas as pd
from presidio_analyzer import AnalyzerEngine


# Exploratory script: list a CSV's header fields and run Presidio over each
# one to see what entities it reports. NOTE(review): the input path is
# hard-coded to a local machine — adjust before reuse.
path = Path(r"D:\Indium Internal Work\Accelerators\testing datasets\large_customers.csv")
df = pd.read_csv(path, nrows=0) # Read only the header row
field_names = [field.lower() for field in df.columns.tolist()] # Convert to list
print(field_names)

analyzer = AnalyzerEngine()

def detect_pii(column_name):
    """Uses Presidio to detect general PII in column names."""
    # Analyze against every entity type Presidio supports for English.
    results = analyzer.analyze(text = column_name, entities = analyzer.get_supported_entities(language="en"), language = "en")
    # Currently prints raw results and returns None; the boolean check below
    # is intentionally disabled while exploring which entity types fire.
    print(results)
    # return any(entity.entity_type in ["EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "PERSON", 'LOCATION', 'PHONE NUMBER', 'NRP', 'IBAN_CODE'] for entity in results)

# Run the detector over every header field (output is printed, not collected).
for field in field_names:
    detect_pii(field)
60 changes: 60 additions & 0 deletions pii_detector_bert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from transformers import pipeline
from pathlib import Path
import pandas as pd
from presidio_analyzer import AnalyzerEngine
from sentence_transformers import SentenceTransformer, util


# Script input: read only the header row of a hard-coded local CSV and
# lower-case the column names to reduce matching noise.
# NOTE(review): path is machine-specific — adjust before reuse.
path = Path(r"D:\Indium Internal Work\Accelerators\testing datasets\large_customers.csv")
df = pd.read_csv(path, nrows=0) # Read only the header row
field_names = [field.lower() for field in df.columns.tolist()] # Convert to list
print(field_names)


class PIIDetector:
    def __init__(self, pii_entities=None, similarity_threshold=0.7):
        """
        Initializes the PII Detector with a predefined list of PII entities.

        Args:
            pii_entities (list, optional): List of known PII entity names.
                Defaults to a built-in set of common identifiers.
            similarity_threshold (float, optional): Cosine-similarity cutoff
                above which a column name is flagged as PII.
        """
        self.similarity_threshold = similarity_threshold

        # Default PII entities if none are provided
        if pii_entities is None:
            self.pii_entities = [
                "email", "phone number", "credit card", "ssn", "social security number",
                "name", "address", "dob", "date of birth", "passport number", "bank account"
            ]
        else:
            self.pii_entities = pii_entities

        # Load sentence transformer model
        self.model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

        # The entity list is fixed after construction, so encode it once here
        # instead of re-encoding it on every detect_pii() call.
        self.pii_embeddings = self.model.encode(self.pii_entities, convert_to_tensor=True)

    def detect_pii(self, column_name):
        """
        Uses Sentence Transformers to detect PII in column names.

        Args:
            column_name (str): The column name to check.

        Returns:
            bool: True if column name is considered PII.
        """
        column_embedding = self.model.encode(column_name, convert_to_tensor=True)

        # Compute cosine similarity between the column and the precomputed
        # PII entity embeddings.
        similarities = util.cos_sim(column_embedding, self.pii_embeddings)

        # If any similarity score is above the threshold, flag as PII
        return any(similarity > self.similarity_threshold for similarity in similarities[0])

# Example Usage: instantiate with defaults and print one boolean per
# header field read at the top of the script.
detector = PIIDetector()

for column in field_names:
    print(detector.detect_pii(column))
Loading