41 changes: 28 additions & 13 deletions README.md
@@ -1,13 +1,28 @@
### Crawler
- Read data ->
- Classify the data ->
- Detect PHI ->
- Infer schema ->
- Compute extended statistics ->
- Detect outliers ->
- Save to a json

### Roadmap
- Create DDL statements based on the json output
- Create a schema with the DDL statements
- Create DML statements
### **Crawler Workflow**

1. **Read Data**

- Extract raw data from the source.

2. **Classify the Data**

- Identify and categorize data types.

3. **Detect PHI (Protected Health Information)**

- Apply rules or ML models to flag sensitive data.

4. **Infer Schema**

- Automatically determine the structure and format of the data.

5. **Compute Extended Statistics**

- Generate descriptive metrics (e.g., mean, standard deviation, distribution).

6. **Detect Outliers**

- Identify anomalies and unusual patterns in the data.

7. **Save to JSON**

- Store processed results in a structured JSON format (a minimal usage sketch follows this list).
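A minimal sketch of driving this workflow with the `DataCrawler` class added in `crawl.py` below; the file path is illustrative, and it assumes the class can be imported cleanly (as committed, `crawl.py` also executes its own example usage at import time):

```python
from crawl import DataCrawler  # assumes crawl.py is on the import path

# Point the crawler at a single data file (illustrative path).
crawler = DataCrawler(file_path="data/Customers.csv")

# run() reads the file, infers the schema, flags PHI columns via Presidio,
# and writes Customers_csv.json to the current working directory.
crawler.run()
```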
181 changes: 48 additions & 133 deletions crawl.py
@@ -11,153 +11,72 @@
from presidio_analyzer import AnalyzerEngine

class DataCrawler:
def __init__(self, data_directory):
"""Initializes the DataCrawler with the directory containing data files.
def __init__(self, file_path):
"""Initializes the DataCrawler with the path to a single data file.

Args:
data_directory (str): Path to the directory containing data files.
file_path (str): Path to the data file.
"""
self.data_directory = data_directory
self.analyzer = AnalyzerEngine() # Initialize Presidio ML PHI Detector
self.file_path = file_path
self.analyzer = AnalyzerEngine() # Initialize Presidio PHI Detector

def read_file(self, file_path):
"""Reads a file from the local filesystem."""
with open(file_path, 'rb') as f:
def read_file(self):
"""Reads the specified file from the local filesystem."""
with open(self.file_path, 'rb') as f:
return f.read()

def classify_file(self, file_path):
"""Classifies the file format based on its extension or content."""
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type:
if 'csv' in mime_type:
return 'csv'
elif 'json' in mime_type:
return 'json'
elif 'parquet' in mime_type:
return 'parquet'
elif 'avro' in mime_type:
return 'avro'
if file_path.endswith('.csv'):
return 'csv'
elif file_path.endswith('.json'):
return 'json'
elif file_path.endswith('.parquet'):
return 'parquet'
elif file_path.endswith('.avro'):
return 'avro'
return 'unknown'

def detect_phi(self, column_name, sample_value):
"""Detects if a column contains PHI data using Presidio.

Args:
column_name (str): Column name to check.
sample_value (str): Sample data from the column.

Returns:
bool: True if PHI is detected, False otherwise.
"""
text_to_check = f"{column_name}: {sample_value}" # Combine name + sample data
results = self.analyzer.analyze(text=text_to_check, entities=None, language='en')
def detect_phi(self, column_name):
"""Uses Presidio to detect PHI/PII in column names."""
results = self.analyzer.analyze(text=column_name, entities=self.analyzer.get_supported_entities(language="en"), language="en")
return len(results) > 0 # Returns True if PHI is detected

return len(results) > 0 # If any PHI entity is detected, return True

def infer_schema(self, file_path):
"""Infers schema from the file content and profiles the data."""
file_type = self.classify_file(file_path)
data = self.read_file(file_path)
def infer_schema(self):
"""
Infers schema from the file content and profiles the data.
"""
file_ext = Path(self.file_path).suffix.lower()

if file_type == 'csv':
with open(self.file_path, "rb") as f:
data = f.read()

if file_ext == '.csv':
df = pd.read_csv(StringIO(data.decode('utf-8')))
elif file_type == 'json':
elif file_ext == '.json':
df = pd.DataFrame(json.loads(data.decode('utf-8')))
elif file_type == 'parquet':
elif file_ext == '.parquet':
df = pq.read_table(BytesIO(data)).to_pandas()
elif file_type == 'avro':
elif file_ext == '.avro':
with BytesIO(data) as bio:
reader = fastavro.reader(bio)
df = pd.DataFrame([record for record in reader])
else:
print("Unsupported file type.")
return None

# Convert dates properly
for col in df.columns:
if df[col].dtype == 'object':
try:
df[col] = pd.to_datetime(df[col])
except (ValueError, TypeError):
pass # Keep as object if conversion fails

schema = {
"file_type": file_type, # Include file type
"file_type": file_ext,
"columns": []
}

for col in df.columns:
sample_value = df[col].dropna().astype(str).iloc[0] if not df[col].dropna().empty else "" # Get sample value
is_phi = self.detect_phi(col, sample_value) # Run ML-based PHI detection

is_phi = self.detect_phi(col)
schema["columns"].append({
'name': col,
'dtype': str(df[col].dtype),
'null_count': int(df[col].isnull().sum()),
'total_count': int(len(df[col])),
'distinct_count': int(df[col].nunique()),
'is_phi': is_phi, # Add PHI flag
'summary_statistics': self.compute_extended_statistics(df[col])
'is_phi': is_phi
})

return schema

def compute_extended_statistics(self, series):
"""Computes extended statistics for numeric and categorical data."""
if pd.api.types.is_numeric_dtype(series):
return {
'min': series.min(),
'q1': series.quantile(0.25),
'median': series.median(),
'q3': series.quantile(0.75),
'max': series.max(),
'mean': series.mean(),
'std_dev': series.std(),
'variance': series.var(),
'skewness': series.skew(),
'kurtosis': series.kurt(),
'missing_percentage': series.isnull().mean() * 100,
'outliers': self.detect_outliers(series)
}
else:
return {
'mode': series.mode().tolist(),
'unique_count': series.nunique(),
'missing_percentage': series.isnull().mean() * 100,
'frequent_values': series.value_counts().head(5).to_dict()
}

def detect_outliers(self, series):
"""Detects potential outliers using the IQR method."""
q1 = series.quantile(0.25)
q3 = series.quantile(0.75)
iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
return series[(series < lower_bound) | (series > upper_bound)].tolist()

# def save_to_yaml(self, file_name, schema):
# """Saves schema and profiling info to a YAML file."""
# with open(file_name, 'w') as f:
# yaml.dump(schema, f, default_flow_style=False, sort_keys=False)

def save_to_json(self, file_name, schema):
"""Saves schema and profiling info to a JSON file.
return schema

Args:
file_name (str): Name of the output JSON file.
schema (dict): Schema information to be saved.
"""
json_file = file_name.replace('.', '_') + ".json" # Ensure consistent naming
def save_to_json(self, schema):
"""Saves schema and profiling info to a JSON file."""
json_file = os.path.basename(self.file_path).replace('.', '_') + ".json"

# Custom function to handle unsupported types
def convert_types(obj):
"""Handles serialization of NumPy and Pandas types."""
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
@@ -166,34 +85,30 @@ def convert_types(obj):
return bool(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, pd.Timestamp): # Convert Pandas timestamps to string
elif isinstance(obj, pd.Timestamp):
return obj.isoformat()
elif isinstance(obj, pd.Series): # Convert Pandas Series to list
elif isinstance(obj, pd.Series):
return obj.tolist()
elif isinstance(obj, pd.DataFrame): # Convert DataFrame to dictionary
elif isinstance(obj, pd.DataFrame):
return obj.to_dict(orient="records")
elif isinstance(obj, dict): # Ensure dictionary keys are strings
elif isinstance(obj, dict):
return {str(k): convert_types(v) for k, v in obj.items()}
return obj # Return as is for other types
return obj

with open(json_file, 'w', encoding='utf-8') as f:
json.dump(schema, f, indent=4, ensure_ascii=False, default=convert_types)

print(f"Schema saved: {json_file}")

def run(self):
"""Runs the crawler for a given local directory, inferring schemas and saving them to YAML files."""
files = [os.path.join(self.data_directory, f) for f in os.listdir(self.data_directory)]

for file in files:
schema = self.infer_schema(file)
if schema:
output_file = os.path.basename(file).replace('.', '_') + '.yml'
# self.save_to_yaml(output_file, schema)
self.save_to_json(output_file, schema)
print(f"Schema saved: {output_file}")
"""
Runs the crawler for the given file.
"""
schema = self.infer_schema()
if schema:
self.save_to_json(schema)

# Example usage
path = Path(r"D:\Indium Internal Work\Accelerators\crawler")
crawler = DataCrawler(data_directory = path)
crawler.run()
file_path = r"D:\Indium Internal Work\Accelerators\testing datasets\global_electronics_retailer\Customers.csv"
crawler = DataCrawler(file_path=file_path)
crawler.run()
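For reference, the schema JSON written by `save_to_json` has roughly the shape sketched below; all column names and counts are invented for illustration:

```python
# Illustrative shape of the saved schema -- names and counts are invented.
example_schema = {
    "file_type": ".csv",            # file extension recorded by infer_schema
    "columns": [
        {
            "name": "customer_id",  # hypothetical column
            "dtype": "int64",
            "null_count": 0,
            "total_count": 1000,
            "distinct_count": 1000,
            "is_phi": False,        # result of detect_phi on the column name
        },
        # ...one entry per column...
    ],
}
```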
91 changes: 91 additions & 0 deletions ddl_generator.py
@@ -0,0 +1,91 @@
import json
from pathlib import Path

# Define a mapping from Pandas/PySpark types to ANSI SQL types
DATA_TYPE_MAPPING = {
"int64": "BIGINT",
"int32": "INTEGER",
"float64": "DOUBLE PRECISION",
"float32": "FLOAT",
"object": "TEXT",
"bool": "BOOLEAN",
"datetime64[ns]": "TIMESTAMP",
}

class DDLGenerator:
def __init__(self, schema_dir, database_name):
"""Initializes the DDL Generator with schema directory and database name.

Args:
schema_dir (str): Directory containing JSON schema files.
database_name (str): Target database name.
"""
self.schema_dir = Path(schema_dir)
self.database_name = database_name

def load_schema(self, schema_file):
"""Loads a JSON schema file.

Args:
schema_file (Path): Path to the schema JSON file.

Returns:
dict: Parsed schema dictionary.
"""
with schema_file.open("r", encoding="utf-8") as f:
return json.load(f)

def map_dtype(self, dtype):
"""Maps Pandas/PySpark data types to ANSI SQL types."""
return DATA_TYPE_MAPPING.get(dtype, "TEXT") # Default to TEXT for unknown types

def generate_ddl(self, schema):
"""Generates a CREATE TABLE IF NOT EXISTS statement based on the schema.

Args:
schema (dict): Parsed schema dictionary.

Returns:
str: ANSI SQL-compliant CREATE TABLE IF NOT EXISTS statement.
"""
table_name = schema["file_type"] + "_table" # Default table name
columns_definitions = []

for column in schema["columns"]:
col_name = column["name"]
col_type = self.map_dtype(column["dtype"])
is_nullable = "NULL" if column["null_count"] > 0 else "NOT NULL"

columns_definitions.append(f" {col_name} {col_type} {is_nullable}")

columns_sql = ",\n".join(columns_definitions)

ddl = f"""
CREATE TABLE IF NOT EXISTS {self.database_name}.{table_name} (
{columns_sql}
);
""".strip()
return ddl

def save_ddl(self, ddl, output_file):
"""Saves the generated DDL to a file."""
with output_file.open("w", encoding="utf-8") as f:
f.write(ddl + "\n")
print(f"DDL saved: {output_file}")

def run(self):
"""Processes all schema JSON files and generates DDL statements."""
schema_files = list(self.schema_dir.glob("*.json"))

for schema_file in schema_files:
schema = self.load_schema(schema_file)
ddl_statement = self.generate_ddl(schema)

output_ddl_file = schema_file.with_suffix(".sql")
self.save_ddl(ddl_statement, output_ddl_file)

# Example usage
schema_directory = Path(r"D:\Indium Internal Work\Accelerators\inferred_schemas")
database_name = "my_database"
ddl_generator = DDLGenerator(schema_directory, database_name)
ddl_generator.run()
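As a quick sanity check, a small hypothetical schema dict in the same shape the crawler writes out, run through `generate_ddl`, would produce something like the statement in the trailing comment (whitespace approximate). Note that `crawl.py` records the raw extension (e.g. `.csv`) as `file_type`, so a dot-free value is used here to keep the derived table name valid:

```python
from ddl_generator import DDLGenerator  # as committed, importing also runs the example usage above

# Hypothetical schema in the shape the crawler writes out.
schema = {
    "file_type": "csv",  # dot-free here so the derived table name is valid SQL
    "columns": [
        {"name": "customer_id", "dtype": "int64",  "null_count": 0,
         "total_count": 3, "distinct_count": 3, "is_phi": False},
        {"name": "email",       "dtype": "object", "null_count": 1,
         "total_count": 3, "distinct_count": 2, "is_phi": True},
    ],
}

generator = DDLGenerator(schema_dir=".", database_name="my_database")
print(generator.generate_ddl(schema))
# Expected output, roughly:
# CREATE TABLE IF NOT EXISTS my_database.csv_table (
#     customer_id BIGINT NOT NULL,
#     email TEXT NULL
# );
```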