Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Mozzo interacts with Nagios Core (4.x) via `cmd.cgi` and `statusjson.cgi` using
- [Listing Service Details on All Hosts](#listing-service-details-on-all-hosts)
- [Listing Service Details with Output](#listing-service-details-with-output)
- [Listing Service Details with Filter](#listing-service-details-with-filter)
- [Viewing Nagios Logs](#viewing-nagios-logs)
- [Service Reporting and Uptime](#service-reporting-and-uptime)
- [Uptime Reporting](#uptime-reporting)
- [Report Uptime by Service](#report-uptime-by-service)
Expand Down Expand Up @@ -293,6 +294,35 @@ Further, you can combine them all to show full plugin output for **all** DNS fai
mozzo --status --service "DNS" --output-filter CRITICAL --show-output
```

### Viewing Nagios Logs

View Nagios alert logs from the last 24 hours:

```bash
mozzo --log
```

View logs for a custom time range:

```bash
mozzo --log --days 7
```

View logs for the last 12 hours:

```bash
mozzo --log --days 0.5
```

View raw log including state dumps (for debugging):

```bash
mozzo --log --full
```

> [!NOTE]
> On busy servers, `--log` may take 1-2 minutes as it downloads the full log file. Use shell pipes to limit output: `mozzo --log | head -n 100`

## Service Reporting and Uptime

- We also support reporting for uptime per host and per service based on Nagios `archivejson.cgi`
Expand Down
107 changes: 98 additions & 9 deletions src/mozzo/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,32 @@ def send(self, request, **kwargs):


class MozzoNagiosClient:
# Status emoji mappings (single source of truth)
STATUS_EMOJIS = {
'PENDING': '⏳',
'OK': '✅',
'WARNING': '⚠️ ',
'CRITICAL': '❌',
'UNKNOWN': '❓',
'UP': '✅',
'DOWN': '❌',
'UNREACHABLE': '❓',
}

# Status and filter maps used throughout the class
SERVICE_STATUS_MAP = {
1: "⏳ PENDING",
2: "✅ OK",
4: "⚠️ WARNING",
8: "❓ UNKNOWN",
16: "❌ CRITICAL",
1: f"{STATUS_EMOJIS['PENDING']} PENDING",
2: f"{STATUS_EMOJIS['OK']} OK",
4: f"{STATUS_EMOJIS['WARNING']} WARNING",
8: f"{STATUS_EMOJIS['UNKNOWN']} UNKNOWN",
16: f"{STATUS_EMOJIS['CRITICAL']} CRITICAL",
}

HOST_STATUS_MAP = {
0: "⏳ PENDING",
2: "✅ UP",
4: "❌ DOWN",
8: "❓ UNREACHABLE"
0: f"{STATUS_EMOJIS['PENDING']} PENDING",
2: f"{STATUS_EMOJIS['UP']} UP",
4: f"{STATUS_EMOJIS['DOWN']} DOWN",
8: f"{STATUS_EMOJIS['UNREACHABLE']} UNREACHABLE"
}

FILTER_MAP = {
Expand Down Expand Up @@ -99,6 +111,7 @@ def __init__(self, config_path=None, message=None, days=None):
self.cmd_url = f"{self.server}/{self.cgi_path}/cmd.cgi"
self.json_url = f"{self.server}/{self.cgi_path}/statusjson.cgi"
self.archive_url = f"{self.server}/{self.cgi_path}/archivejson.cgi"
self.showlog_url = f"{self.server}/{self.cgi_path}/showlog.cgi"

# Set the custom message or fallback to default
self.message = message if message else "Action issued by Mozzo CLI"
Expand Down Expand Up @@ -981,6 +994,69 @@ def show_ack_history(self, host, service=None, days=7):
except Exception as e:
print(f"❌ Error fetching history from status API: {e}")

def show_logs(self, days=1.0, full=False):
"""Display Nagios log entries for the specified time range.

Args:
days: Number of days to look back (default: 1.0 for 24 hours)
full: If True, show all entries including CURRENT STATE (default: False)
"""
import re

start_ts = int((datetime.datetime.now() - datetime.timedelta(days=days)).timestamp())

params = {
'ts_start': start_ts,
'ts_end': int(datetime.datetime.now().timestamp())
}

try:
response = self.session.get(
self.showlog_url,
params=params,
auth=self.auth,
verify=self.verify_ssl
)
response.raise_for_status()

log_pattern = r'\[(\d{2}-\d{2}-\d{4}\s+\d{2}:\d{2}:\d{2})\]\s*([^\[<\n]+)'
matches = re.findall(log_pattern, response.text)

if not matches:
print(f"No log entries found for the last {days} day(s).")
return

print(f"\n--- Nagios Log Entries (Last {days} day(s)) ---\n")

filtered_count = 0
displayed_count = 0

for timestamp, message in matches:
message = message.strip()
if not message:
continue

if not full:
if 'CURRENT HOST STATE' in message or 'CURRENT SERVICE STATE' in message:
filtered_count += 1
continue

status_icon = ''
if 'SERVICE ALERT' in message or 'HOST ALERT' in message:
for status_key, icon in self.STATUS_EMOJIS.items():
if status_key in message.upper():
status_icon = f"{icon} "
break

print(f"{status_icon}[{timestamp}] {message}")
displayed_count += 1

if displayed_count == 0:
print("No alert entries found for the specified time range.")

except requests.exceptions.RequestException as e:
print(f"❌ Error fetching logs: {e}")


def main():
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -1060,6 +1136,16 @@ def main():
action="store_true",
help="Show history of acknowledgements",
)
parser.add_argument(
"--log",
action="store_true",
help="Show Nagios log entries",
)
parser.add_argument(
"--full",
action="store_true",
help="Show raw log including state dumps (for debugging)",
)

args = parser.parse_args()
client = MozzoNagiosClient(
Expand Down Expand Up @@ -1124,6 +1210,9 @@ def main():
elif args.ack_history and args.host:
history_days = args.days if args.days is not None else client.report_days
client.show_ack_history(args.host, args.service, history_days)
elif args.log:
log_days = args.days if args.days is not None else 1.0
client.show_logs(log_days, full=args.full)
else:
parser.print_help()

Expand Down
163 changes: 163 additions & 0 deletions tests/test_phase6_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from unittest.mock import Mock, patch
import requests


def test_show_logs_default_days(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Warning[04-29-2026 17:32:20] SERVICE ALERT: host.example.com;HTTP;WARNING;HARD;3;Connection timeout</div>
<div>Service Ok[04-29-2026 17:30:00] SERVICE ALERT: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
<div>Informational Message[04-29-2026 17:25:00] Auto-save of retention data completed successfully.</div>
<div>State Ok[04-29-2026 17:20:00] CURRENT HOST STATE: host.example.com;UP;HARD;1;PING OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs()

captured = capsys.readouterr()
assert "Nagios Log Entries" in captured.out
assert "Last 1.0 day(s)" in captured.out


def test_show_logs_custom_days(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Ok[04-29-2026 17:30:00] SERVICE ALERT: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs(days=7.0)

captured = capsys.readouterr()
assert "Last 7.0 day(s)" in captured.out


def test_show_logs_parses_service_alerts(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Warning[04-29-2026 17:32:20] SERVICE ALERT: host.example.com;HTTP;WARNING;HARD;3;Connection timeout</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs()

captured = capsys.readouterr()
assert "SERVICE ALERT" in captured.out


def test_show_logs_filters_current_state(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Ok[04-29-2026 17:30:00] SERVICE ALERT: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
<div>State Ok[04-29-2026 17:20:00] CURRENT HOST STATE: host.example.com;UP;HARD;1;PING OK</div>
<div>State Ok[04-29-2026 17:15:00] CURRENT SERVICE STATE: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs()

captured = capsys.readouterr()
assert "CURRENT HOST STATE" not in captured.out
assert "CURRENT SERVICE STATE" not in captured.out


def test_show_logs_shows_icons(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Warning[04-29-2026 17:32:20] SERVICE ALERT: host.example.com;HTTP;WARNING;HARD;3;Timeout</div>
<div>Service Ok[04-29-2026 17:30:00] SERVICE ALERT: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
<div>Service Critical[04-29-2026 17:28:00] SERVICE ALERT: host.example.com;HTTP;CRITICAL;HARD;1;Connection refused</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs()

captured = capsys.readouterr()
assert "⚠️" in captured.out or "WARNING" in captured.out
assert "✅" in captured.out or "OK" in captured.out
assert "❌" in captured.out or "CRITICAL" in captured.out


def test_show_logs_no_entries(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "<html><body></body></html>"

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs()

captured = capsys.readouterr()
assert "No log entries found" in captured.out or "No alert entries found" in captured.out


def test_show_logs_http_error(client, capsys):
with patch.object(client.session, 'get', side_effect=requests.exceptions.RequestException("HTTP Error")):
client.show_logs()

captured = capsys.readouterr()
assert "Error fetching logs" in captured.out


def test_show_logs_timestamp_params(client):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Ok[04-29-2026 17:30:00] SERVICE ALERT: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response) as mock_get:
client.show_logs(days=2.0)

assert mock_get.called
call_kwargs = mock_get.call_args[1]
params = call_kwargs['params']

assert 'ts_start' in params
assert 'ts_end' in params

ts_start = params['ts_start']
ts_end = params['ts_end']

time_diff = ts_end - ts_start
expected_diff = 2.0 * 24 * 60 * 60

assert abs(time_diff - expected_diff) < 5


def test_show_logs_full_flag(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Warning[04-29-2026 17:32:20] SERVICE ALERT: host.example.com;HTTP;WARNING;HARD;3;Timeout</div>
<div>State Ok[04-29-2026 17:20:00] CURRENT HOST STATE: host.example.com;UP;HARD;1;PING OK</div>
<div>State Ok[04-29-2026 17:15:00] CURRENT SERVICE STATE: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs(full=True)

captured = capsys.readouterr()
assert "CURRENT HOST STATE" in captured.out
assert "CURRENT SERVICE STATE" in captured.out


def test_show_logs_without_full_flag(client, capsys):
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = """
<div>Service Warning[04-29-2026 17:32:20] SERVICE ALERT: host.example.com;HTTP;WARNING;HARD;3;Timeout</div>
<div>State Ok[04-29-2026 17:20:00] CURRENT HOST STATE: host.example.com;UP;HARD;1;PING OK</div>
<div>State Ok[04-29-2026 17:15:00] CURRENT SERVICE STATE: host.example.com;HTTP;OK;HARD;1;HTTP OK</div>
"""

with patch.object(client.session, 'get', return_value=mock_response):
client.show_logs(full=False)

captured = capsys.readouterr()
assert "CURRENT HOST STATE" not in captured.out
assert "CURRENT SERVICE STATE" not in captured.out
Loading