Skip to content
Merged

Dev #67

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 134 additions & 130 deletions lib/maillogsentinel/sql_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import json
import logging
from pathlib import Path
from typing import Optional, List, Dict, Any
from typing import List, Dict, Any
import importlib.resources # Added for loading bundled data

import tempfile
Expand Down Expand Up @@ -212,58 +212,54 @@ def format_sql_value(value: Any, sql_type_def: str) -> str:
Returns:
SQL-formatted string representation of the value.
"""
if (
value is None
or str(value).strip().lower() == "null"
or str(value).strip() == ""
):
# Allow 'DEFAULT NULL' columns to receive NULL
if (
"DEFAULT NULL" in sql_type_def.upper()
or "PRIMARY KEY" not in sql_type_def.upper()
): # Quick check
# A column is considered nullable if "NOT NULL" is absent from its definition.
# The presence of "DEFAULT NULL" implies nullability, but the absence of "NOT NULL" is the key check.
is_nullable = "NOT NULL" not in sql_type_def.upper()

if value is None or str(value).strip().lower() in ["null", "na", "n/a", ""]:
if is_nullable:
return "NULL"
# If it's a NOT NULL column without default and value is empty/None, this is an issue
# The calling code should ideally handle this by providing a default or raising error
# For now, if it's a string type, represent as empty string, otherwise problem.
if "CHAR" in sql_type_def.upper() or "TEXT" in sql_type_def.upper():
return "''"
# This will likely cause an SQL error if the column is NOT NULL
# Consider raising an error here if value is None and column is NOT NULL without default
logger.warning(
f"{LOG_PREFIX}: Null/empty value encountered for a potentially NOT NULL column: {sql_type_def} (value: {value})"
)
return "NULL" # Or raise error
else:
# This is a critical issue: a null-like value for a NOT NULL column.
raise SQLExportError(
f"Null or empty value provided for a NOT NULL column. Column Def: '{sql_type_def}', Value: '{value}'"
)

sql_type_lower = sql_type_def.lower()

if "int" in sql_type_lower or "serial" in sql_type_lower:
try:
# Ensure that empty strings or other non-numeric values are not converted to NULL for NOT NULL columns
if str(value).strip() == "":
raise ValueError("Empty string cannot be converted to integer.")
return str(int(value))
except ValueError:
logger.warning(
f"{LOG_PREFIX}: Could not convert '{value}' to int for SQL; using NULL. Column: {sql_type_def}"
)
return "NULL" # Or raise error
except (ValueError, TypeError):
if is_nullable:
logger.warning(
f"{LOG_PREFIX}: Could not convert '{value}' to int for SQL; using NULL. Column: {sql_type_def}"
)
return "NULL"
else:
raise SQLExportError(
f"Failed to convert value '{value}' to integer for a NOT NULL column. Column Def: '{sql_type_def}'"
)
elif "datetime" in sql_type_lower or "timestamp" in sql_type_lower:
# Assuming value is already in 'YYYY-MM-DD HH:MM:SS' format or a datetime object
# For SQLite, it's typically a string.
if isinstance(value, datetime.datetime):
return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
return escape_sql_string(str(value)) # Assuming pre-formatted string
return escape_sql_string(str(value))
elif (
"char" in sql_type_lower or "text" in sql_type_lower or "enum" in sql_type_lower
):
return escape_sql_string(str(value))
elif "bool" in sql_type_lower: # SQLite stores booleans as integers 0 or 1
elif "bool" in sql_type_lower:
return "1" if str(value).lower() in ["true", "1", "yes", "on"] else "0"
else: # Default to string escaping for unknown types (e.g. IP, custom types)
else:
return escape_sql_string(str(value))


def generate_insert_statement(
row_dict: Dict[str, Any], table_name: str, column_mapping: Dict[str, Dict[str, str]]
) -> Optional[str]:
) -> str:
"""
Generates an SQL INSERT statement from a CSV row dictionary.

Expand All @@ -273,21 +269,19 @@ def generate_insert_statement(
column_mapping: The column mapping dictionary.

Returns:
A string containing the SQL INSERT statement, or None if a row is skipped.
A string containing the SQL INSERT statement.

Raises:
SQLExportError: If data conversion fails for a required column.
"""
# Determine target SQL columns and their corresponding values from the row_dict
sql_columns = []
sql_values = []

valid_row = True
for sql_col_name, mapping_info in column_mapping.items():
csv_col_name = mapping_info.get("csv_column_name")
sql_col_def = mapping_info.get("sql_column_def", "")

if sql_col_name == "id" and "AUTO_INCREMENT" in sql_col_def.upper():
# Skip ID column if it's auto-incrementing; DB will handle it.
# Alternatively, if CSV provides an ID, it should be used, and AUTO_INCREMENT removed from SQL def.
# For now, assuming DB generates ID.
if "AUTO_INCREMENT" in sql_col_def.upper() or "SERIAL" in sql_col_def.upper():
continue

if not csv_col_name:
Expand All @@ -298,29 +292,20 @@ def generate_insert_statement(

raw_value = row_dict.get(csv_col_name)

# Basic validation: if column is NOT NULL and has no DEFAULT, raw_value must exist
# This is a simplified check; actual NOT NULL check depends on precise SQL definition
if (
raw_value is None
and "NOT NULL" in sql_col_def.upper()
and "DEFAULT" not in sql_col_def.upper()
and "AUTO_INCREMENT" not in sql_col_def.upper()
):
logger.error(
f"{LOG_PREFIX}: Missing value for NOT NULL column '{sql_col_name}' (mapped from CSV '{csv_col_name}'). Row: {row_dict}. Skipping row."
)
valid_row = False
break # Skip this row

formatted_value = format_sql_value(raw_value, sql_col_def)

sql_columns.append(sql_col_name)
sql_values.append(formatted_value)

if not valid_row or not sql_columns:
return None

columns_str = ", ".join(sql_columns)
try:
formatted_value = format_sql_value(raw_value, sql_col_def)
sql_columns.append(sql_col_name)
sql_values.append(formatted_value)
except SQLExportError as e:
# Re-raise with more context about the row being processed.
raise SQLExportError(f"Error in row {row_dict}: {e}")

if not sql_columns:
# This can happen if all columns are auto-incrementing, which is unlikely but possible.
# Or if the mapping is empty.
raise SQLExportError("No columns to insert for the given row.")

columns_str = ", ".join(f'"{c}"' for c in sql_columns)
values_str = ", ".join(sql_values)

return f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});"
Expand Down Expand Up @@ -396,9 +381,17 @@ def run_sql_export(config: AppConfig, output_log_level: str = "INFO") -> bool:
f"{LOG_PREFIX}: No user-specific column mapping file configured, attempting to load bundled default."
)
try:
with importlib.resources.files("lib.maillogsentinel.data").joinpath(
"maillogsentinel_sql_column_mapping.json"
) as bundled_path:
# Correctly handle importlib.resources for Python 3.9+
# The 'with' statement for pathlib.Path is removed in Python 3.13
# We get a Traversable object, which we can convert to a Path
bundled_path_traversable = importlib.resources.files(
"lib.maillogsentinel.data"
).joinpath("maillogsentinel_sql_column_mapping.json")

# For older importlib_resources, we might need to use 'as_file' context manager
# but for modern Python, this direct conversion to Path is often sufficient
# if the resource is a file on the filesystem.
with importlib.resources.as_file(bundled_path_traversable) as bundled_path:
if (
not bundled_path.is_file()
): # Should not happen if packaged correctly
Expand Down Expand Up @@ -557,38 +550,35 @@ def run_sql_export(config: AppConfig, output_log_level: str = "INFO") -> bool:

outfile.write("BEGIN TRANSACTION;\n")

for row in reader:
conversion_errors = 0
for row_num, row in enumerate(reader, start=1):
records_processed += 1
# Check for empty rows (e.g. just delimiters ;;;;)
if not any(row.values()):
logger.debug(
f"{LOG_PREFIX}: Skipping empty or malformed row: {row}"
f"{LOG_PREFIX}: Skipping empty or malformed row at line number (approx) {row_num}."
)
# Still need to update offset for this line
# The DictReader handles line ending consumption.
# To get the byte length of the line for offset:
# This is tricky with DictReader. Simplest is to read line by line first,
# then parse. For now, this part of offset update is imprecise with DictReader.
# A more robust offset: re-open and read line-by-line to count bytes.
# For now, new_offset will be updated at the end based on infile.tell().
continue

try:
# Ensure all expected keys are present in row, map to None if not
# This is important if CSV is sparse or has missing optional fields
# Handled by row_dict.get(csv_col_name) in generate_insert_statement
insert_stmt = generate_insert_statement(
row, table_name, column_mapping
)
if insert_stmt:
outfile.write(insert_stmt + "\n")
records_exported += 1
except Exception as e:
outfile.write(insert_stmt + "\n")
records_exported += 1
except SQLExportError as e:
logger.error(
f"{LOG_PREFIX}: Error processing row: {row}. Error: {e}",
f"{LOG_PREFIX}: Failed to process row (approx line {row_num}). Reason: {e}"
)
conversion_errors += 1
except Exception as e:
logger.critical(
f"{LOG_PREFIX}: A critical unexpected error occurred at row (approx line {row_num}): {row}. Aborting export. Error: {e}",
exc_info=True,
)
# Decide if we skip this row or abort. For now, skip.
# This is a more serious error than a simple conversion issue. We should abort.
outfile.close()
sql_file_path.unlink(missing_ok=True)
return False

# After processing all available lines from the current offset
new_offset = infile.tell() # Get the end position
Expand Down Expand Up @@ -628,35 +618,28 @@ def run_sql_export(config: AppConfig, output_log_level: str = "INFO") -> bool:
if "outfile" in locals() and not outfile.closed:
outfile.close()

if conversion_errors > 0:
logger.error(
f"{LOG_PREFIX}: SQL export completed with {conversion_errors} errors. "
f"The generated SQL file '{sql_file_path}' is incomplete and will be deleted."
)
sql_file_path.unlink(missing_ok=True)
# We still update the offset to avoid reprocessing failed rows,
# but the overall operation is a failure.
update_offset(offset_file_path, new_offset)
return False

if records_exported == 0:
if records_processed > 0:
# Processed lines but exported nothing (e.g., all rows skipped due to errors/filters)
logger.warning(
f"{LOG_PREFIX}: Processed {records_processed} lines, but no records were actually exported. SQL file {sql_file_path} will contain only BEGIN/COMMIT. This might indicate data or mapping issues."
)
# Keep the file for inspection in this specific case.
else:
# No records exported AND no records processed (beyond header if it was the first run)
# This includes:
# 1. Truly empty CSV (already handled by returning after unlink if first_line is empty)
# 2. Header-only CSV on first run (current_offset=0 initially, records_processed=0)
# 3. Resume run with no new data lines (current_offset>0 initially, records_processed=0)
logger.info(
f"{LOG_PREFIX}: No records exported and no new data lines processed. Removing SQL export file: {sql_file_path}"
)
sql_file_path.unlink(missing_ok=True)
else: # records_exported > 0
logger.info(
f"{LOG_PREFIX}: No new valid records to export. Removing empty SQL file."
)
sql_file_path.unlink(missing_ok=True)
else:
logger.info(
f"{LOG_PREFIX}: Successfully created SQL export file: {sql_file_path} with {records_exported} records."
)

# Update offset only if processing was generally successful or partially successful
# If a critical error happened early (e.g. cant load mapping), offset should not change.
# Current logic updates offset if we reach here.
# If CSV was not found, we return False before this.
# If mapping failed, we return False before this.
update_offset(offset_file_path, new_offset)

logger.info(
f"{LOG_PREFIX}: SQL export process complete. Final offset: {new_offset}"
)
Expand Down Expand Up @@ -711,9 +694,10 @@ def _create_dummy_csv(

def _get_bundled_mapping_headers_for_test():
try:
with importlib.resources.files("lib.maillogsentinel.data").joinpath(
"maillogsentinel_sql_column_mapping.json"
) as bundled_path_ref:
bundled_path_traversable = importlib.resources.files(
"lib.maillogsentinel.data"
).joinpath("maillogsentinel_sql_column_mapping.json")
with importlib.resources.as_file(bundled_path_traversable) as bundled_path_ref:
# The object returned by importlib.resources.files() is a Traversable
# We need to ensure it's treated as a Path object for load_column_mapping
mapping = load_column_mapping(Path(bundled_path_ref))
Expand All @@ -731,40 +715,37 @@ def _get_bundled_mapping_headers_for_test():
return []


DUMMY_CSV_HEADERS = _get_bundled_mapping_headers_for_test()
if not DUMMY_CSV_HEADERS:
DUMMY_CSV_HEADERS = [
"server",
"event_time",
"ip",
"username",
"hostname",
"reverse_dns_status",
"country_code",
"asn_number_placeholder",
"asn_org_placeholder",
]
logger.warning(f"Using fallback DUMMY_CSV_HEADERS for testing: {DUMMY_CSV_HEADERS}")
DUMMY_CSV_HEADERS = [
"server",
"date",
"ip",
"user",
"hostname",
"reverse_dns_status",
"country_code",
"asn",
"aso",
]


def _make_dummy_csv_data_row(custom_headers_order, values_dict):
"""Helper to create a CSV data row based on DUMMY_CSV_HEADERS global order."""
row = []
for header in DUMMY_CSV_HEADERS: # Ensure consistent order
for header in custom_headers_order: # Use the provided header order
row.append(values_dict.get(header, f"dummy_{header}"))
return row


DUMMY_CSV_DATA_ROW_1_VALS = {
"server": "mail.example.com",
"event_time": "2023-10-26 10:00:00",
"date": "2023-10-26 10:00:00",
"ip": "192.168.1.100",
"username": "testuser",
"user": "testuser",
"hostname": "client.example.org",
"reverse_dns_status": "OK",
"country_code": "US",
"asn_number_placeholder": "12345",
"asn_org_placeholder": "AS-EXAMPLE Example ISP",
"asn": "12345",
"aso": "AS-EXAMPLE Example ISP",
}
DUMMY_CSV_DATA_ROW_1 = _make_dummy_csv_data_row(
DUMMY_CSV_HEADERS, DUMMY_CSV_DATA_ROW_1_VALS
Expand Down Expand Up @@ -916,6 +897,29 @@ def _reset_offset_file(config_obj: DummyTestConfig):
)
all_tests_passed = False

# --- Test Case 5: Data conversion failure for NOT NULL integer ---
test_runner_logger.info(
"\n--- Test Case 5: Data conversion failure for NOT NULL integer ---"
)
config_case5 = DummyTestConfig(base_dir_name="maillog_test_case5_")
test_configs_to_clean.append(config_case5)
# Create a CSV with an empty string for 'asn', which maps to 'asn_int' (NOT NULL)
invalid_row_vals = DUMMY_CSV_DATA_ROW_1_VALS.copy()
invalid_row_vals["asn"] = "" # This should fail conversion for a NOT NULL int
invalid_row_data = [_make_dummy_csv_data_row(DUMMY_CSV_HEADERS, invalid_row_vals)]
_create_dummy_csv(config_case5, DUMMY_CSV_HEADERS, invalid_row_data)
_reset_offset_file(config_case5)
success_case5 = run_sql_export(config_case5)
if not success_case5:
test_runner_logger.info(
"Test Case 5 Result (Bad Data for NOT NULL Int): SUCCESS (aborted as expected)"
)
else:
test_runner_logger.error(
"Test Case 5 Result (Bad Data for NOT NULL Int): FAIL (should have aborted)"
)
all_tests_passed = False

except Exception as e:
test_runner_logger.error(
f"An unexpected error occurred during testing: {e}", exc_info=True
Expand Down
Loading