-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathverify_monitor.py
More file actions
executable file
·311 lines (254 loc) · 9.7 KB
/
verify_monitor.py
File metadata and controls
executable file
·311 lines (254 loc) · 9.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!/usr/bin/env python3
"""
Verification test for sigsegv-monitor.
Launches sigsegv_monitor and sample_segfault, then uses unittest assertions
to verify that the monitor correctly recorded:
- A SIGSEGV event for the sample program
- Page faults whose cr2 addresses fall within the pages allocated by the sample
- The SIGSEGV fault address (cr2 == 0 for null pointer dereference)
Usage:
sudo python3 verify_monitor.py # normal run
sudo python3 verify_monitor.py -v # verbose
"""
import json
import os
import signal
import subprocess
import time
import unittest
from typing import Any, Dict, List, NamedTuple, Optional, Set
# ---------------------------------------------------------------------------
# Configuration – tailored to sample_segfault
# ---------------------------------------------------------------------------
MONITOR_BIN: str = "./sigsegv_monitor"
SAMPLE_BIN: str = "./sample_segfault"
# The kernel's comm field is 16 bytes including NUL, so at most 15 chars.
_COMM_MAX: int = 15
PROCESS_NAME: str = os.path.basename(SAMPLE_BIN)[:_COMM_MAX]
# sample_segfault allocates and touches 4 pages and then loads from the null page.
MIN_PAGE_FAULTS: int = 4+1
# Page size is determined at runtime.
PAGE_SIZE: int = os.sysconf("SC_PAGESIZE")
# ---------------------------------------------------------------------------
# Data classes – sample_segfault output
# ---------------------------------------------------------------------------
class SampleOutput(NamedTuple):
"""JSON output emitted by sample_segfault on stdout."""
page_size: int
pages: List[int]
segfault_addr: int
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "SampleOutput":
return SampleOutput(
page_size=raw["page_size"],
pages=[int(p, 16) for p in raw["pages"]],
segfault_addr=int(raw["segfault_addr"], 16),
)
# ---------------------------------------------------------------------------
# Data classes – sigsegv_monitor output
# ---------------------------------------------------------------------------
class ProcessInfo(NamedTuple):
rootns_pid: int
ns_pid: int
comm: str
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "ProcessInfo":
return ProcessInfo(
rootns_pid=raw["rootns_pid"],
ns_pid=raw["ns_pid"],
comm=raw["comm"],
)
class ThreadInfo(NamedTuple):
rootns_tid: int
ns_tid: int
comm: str
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "ThreadInfo":
return ThreadInfo(
rootns_tid=raw["rootns_tid"],
ns_tid=raw["ns_tid"],
comm=raw["comm"],
)
class Registers(NamedTuple):
rip: int
rsp: int
rax: int
rbx: int
rcx: int
rdx: int
rsi: int
rdi: int
rbp: int
r8: int
r9: int
r10: int
r11: int
r12: int
r13: int
r14: int
r15: int
flags: int
trapno: int
err: int
cr2: int
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "Registers":
return Registers(**{name: int(raw[name], 16) for name in cls._fields})
class PageFault(NamedTuple):
cr2: int
err: int
tai: int
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "PageFault":
return PageFault(
cr2=int(raw["cr2"], 16),
err=int(raw["err"], 16),
tai=raw["tai"],
)
class LbrEntry(NamedTuple):
from_addr: int
to_addr: int
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "LbrEntry":
return LbrEntry(
from_addr=int(raw["from"], 16),
to_addr=int(raw["to"], 16),
)
class MonitorEvent(NamedTuple):
"""A single JSON record emitted by sigsegv_monitor."""
cpu: int
tai: int
process: ProcessInfo
thread: ThreadInfo
si_code: int
registers: Registers
page_faults: List[PageFault]
lbr: List[Optional[LbrEntry]]
@classmethod
def from_json(cls, raw: Dict[str, Any]) -> "MonitorEvent":
return MonitorEvent(
cpu=raw["cpu"],
tai=raw["tai"],
process=ProcessInfo.from_json(raw["process"]),
thread=ThreadInfo.from_json(raw["thread"]),
si_code=raw["si_code"],
registers=Registers.from_json(raw["registers"]),
page_faults=[PageFault.from_json(pf) for pf in raw["page_faults"]],
lbr=[
LbrEntry.from_json(e) if e is not None else None
for e in raw["lbr"]
],
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def parse_monitor_events(text: str) -> List[MonitorEvent]:
"""Parse newline-delimited JSON into MonitorEvent objects."""
return [
MonitorEvent.from_json(json.loads(line))
for line in text.splitlines()
if line.strip()
]
def parse_sample_output(text: str) -> SampleOutput:
"""Parse the single JSON line from sample_segfault."""
for line in text.splitlines():
line = line.strip()
if not line:
continue
return SampleOutput.from_json(json.loads(line))
raise ValueError("No JSON found in sample output")
def addr_in_page(addr: int, page_base: int) -> bool:
"""Return True if *addr* falls within [page_base, page_base + PAGE_SIZE)."""
return page_base <= addr < page_base + PAGE_SIZE
# ---------------------------------------------------------------------------
# Test fixture
# ---------------------------------------------------------------------------
class TestSigsegvMonitor(unittest.TestCase):
"""End-to-end test: launch monitor + sample, verify output."""
sample: Optional[SampleOutput] = None
event: Optional[MonitorEvent] = None
@classmethod
def setUpClass(cls) -> None:
"""Start monitor, run sample, stop monitor, collect outputs."""
# 1. Start monitor
monitor_proc = subprocess.Popen(
[MONITOR_BIN],
stdout=subprocess.PIPE,
)
# Give the BPF program a moment to attach.
time.sleep(1)
# Verify the monitor is still running. If it exited already the
# BPF program was never loaded (common causes: missing root
# privileges, BPF verification failure, missing binary).
if monitor_proc.poll() is not None:
stdout_bytes, _ = monitor_proc.communicate()
rc = monitor_proc.returncode
raise RuntimeError(
f"{MONITOR_BIN} exited prematurely (rc={rc}) before the "
f"sample was started. The BPF program was never loaded.\n"
f"Common causes: not running as root, BPF verifier "
f"rejection, or the binary is missing.")
# 2. Run sample
sample_proc = subprocess.Popen(
[SAMPLE_BIN],
stdout=subprocess.PIPE,
)
stdout, _ = sample_proc.communicate(timeout=10)
cls.sample = parse_sample_output(stdout.decode())
# Small grace period so the monitor can pick up the event from the
# kernel queue. Note that it handles the SIGINT below, so
# processing / writing out the JSON should not be an issue.
time.sleep(0.5)
# 3. Stop monitor & collect output
monitor_proc.send_signal(signal.SIGINT)
try:
stdout, _ = monitor_proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
monitor_proc.kill()
stdout, _ = monitor_proc.communicate()
events = parse_monitor_events(stdout.decode())
# 4. Filter events by the known child PID; expect exactly one.
pid = sample_proc.pid
matching = [e for e in events if e.process.rootns_pid == pid]
if len(matching) != 1:
all_pids = [e.process.rootns_pid for e in events]
raise AssertionError(
f"Expected exactly 1 event for PID {pid}, "
f"got {len(matching)} (all pids: {all_pids})")
cls.event = matching[0]
# -- tests --
def test_sigsegv_fault_address(self) -> None:
"""The SIGSEGV cr2 must equal the expected fault address (0 for NULL)."""
cr2 = self.event.registers.cr2
expected = self.sample.segfault_addr
self.assertEqual(cr2, expected)
def test_sigsegv_process_info(self) -> None:
"""Process and thread info must be populated."""
self.assertGreater(self.event.process.rootns_pid, 0)
self.assertGreater(self.event.thread.rootns_tid, 0)
self.assertEqual(PROCESS_NAME, self.event.process.comm)
def test_minimum_page_faults(self) -> None:
"""The monitor must record at least MIN_PAGE_FAULTS page faults."""
self.assertGreaterEqual(
len(self.event.page_faults), MIN_PAGE_FAULTS,
f"Expected >= {MIN_PAGE_FAULTS} page faults, "
f"got {len(self.event.page_faults)}",
)
def test_page_fault_addresses_match_expected_pages(self) -> None:
"""Every page allocated by the sample must appear in the PF list."""
expected_pages = self.sample.pages
matched: Set[int] = set()
for pf in self.event.page_faults:
for pg_idx, pg_base in enumerate(expected_pages):
if addr_in_page(pf.cr2, pg_base):
matched.add(pg_idx)
break
for pg_idx, pg_base in enumerate(expected_pages):
self.assertIn(
pg_idx, matched,
f"Expected page {pg_idx} at 0x{pg_base:x} "
f"(range [0x{pg_base:x}, 0x{pg_base + PAGE_SIZE:x})) "
f"was not matched by any recorded page fault",
)
if __name__ == "__main__":
unittest.main()