-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogger.py
More file actions
104 lines (85 loc) · 3.3 KB
/
logger.py
File metadata and controls
104 lines (85 loc) · 3.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Enterprise Honeypot Logging System
Structured logging with rotation and multiple outputs.
"""
import logging
import logging.handlers
import os
import json
import time
from config import LOG_DIR, LOG_LEVEL, LOG_MAX_SIZE, LOG_BACKUP_COUNT
class JSONFormatter(logging.Formatter):
"""Structured JSON log formatter."""
def format(self, record):
log_entry = {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(record.created)),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if hasattr(record, "service"):
log_entry["service"] = record.service
if hasattr(record, "src_ip"):
log_entry["src_ip"] = record.src_ip
if hasattr(record, "session_id"):
log_entry["session_id"] = record.session_id
if hasattr(record, "extra_data"):
log_entry["data"] = record.extra_data
if record.exc_info:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)
class ConsoleFormatter(logging.Formatter):
"""Colored console formatter."""
COLORS = {
"DEBUG": "\033[36m", # Cyan
"INFO": "\033[32m", # Green
"WARNING": "\033[33m", # Yellow
"ERROR": "\033[31m", # Red
"CRITICAL": "\033[35m", # Magenta
}
RESET = "\033[0m"
def format(self, record):
color = self.COLORS.get(record.levelname, self.RESET)
ts = time.strftime("%H:%M:%S", time.localtime(record.created))
service = getattr(record, "service", "-")
src_ip = getattr(record, "src_ip", "")
ip_str = f" [{src_ip}]" if src_ip else ""
return (f"{color}{ts} {record.levelname:<8}{self.RESET} "
f"[{service:<14}]{ip_str} {record.getMessage()}")
def setup_logging():
"""Initialize the logging system."""
os.makedirs(LOG_DIR, exist_ok=True)
root = logging.getLogger("honeypot")
root.setLevel(getattr(logging, LOG_LEVEL.upper(), logging.INFO))
# JSON file handler (main log)
json_handler = logging.handlers.RotatingFileHandler(
os.path.join(LOG_DIR, "honeypot.json"),
maxBytes=LOG_MAX_SIZE,
backupCount=LOG_BACKUP_COUNT,
)
json_handler.setFormatter(JSONFormatter())
root.addHandler(json_handler)
# Human-readable file handler
text_handler = logging.handlers.RotatingFileHandler(
os.path.join(LOG_DIR, "honeypot.log"),
maxBytes=LOG_MAX_SIZE,
backupCount=LOG_BACKUP_COUNT,
)
text_handler.setFormatter(logging.Formatter(
"%(asctime)s %(levelname)-8s [%(name)s] %(message)s"
))
root.addHandler(text_handler)
# Console handler
console = logging.StreamHandler()
console.setFormatter(ConsoleFormatter())
root.addHandler(console)
return root
def get_service_logger(service_name):
"""Get a logger for a specific service with extra context."""
logger = logging.getLogger(f"honeypot.{service_name}")
class ServiceAdapter(logging.LoggerAdapter):
def process(self, msg, kwargs):
kwargs.setdefault("extra", {})
kwargs["extra"]["service"] = self.extra.get("service", service_name)
return msg, kwargs
return ServiceAdapter(logger, {"service": service_name})