-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsecure_wipe_auditor.py
More file actions
287 lines (227 loc) · 8.78 KB
/
secure_wipe_auditor.py
File metadata and controls
287 lines (227 loc) · 8.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
#!/usr/bin/env python3
"""
Secure Wipe Auditor - Zero Trust Pre-Wipe Check with Immutable Audit Trail
Implements Merkle Tree-based audit logging with Zero Trust Attestation (ZTA)
"""
import hashlib
import json
import time
import os
import sys
import psutil
from typing import Tuple, List, Dict
# Configuration
# Append-only text file holding one audit hash (Merkle leaf) per line.
AUDIT_LOG_FILE = 'audit_chain_log.txt'
# Substrings matched (case-insensitively) against running process names.
MOCK_FORENSIC_TOOLS = ['wireshark', 'gdb', 'volatility']

# Set UTF-8 encoding for Windows console so the emoji status output can be
# written regardless of the active console code page.
if sys.platform.startswith('win'):
    import codecs

    _utf8_writer = codecs.getwriter('utf-8')
    sys.stdout = _utf8_writer(sys.stdout.detach())
    sys.stderr = _utf8_writer(sys.stderr.detach())
def calculate_hash(data: bytes) -> str:
    """Return the SHA-256 digest of ``data`` as a lowercase hex string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
def mock_wipe_disk(device_id: str, status: str = 'SUCCESS') -> Tuple[bool, str]:
    """
    Simulate a disk wipe without touching any device.

    Args:
        device_id: Device identifier string (accepted but not inspected).
        status: 'SUCCESS' simulates a successful wipe; any other value
            simulates a failure.

    Returns:
        Tuple of (success_bool, verification_hash), where the hash is a
        fixed placeholder string for the simulated outcome.
    """
    succeeded = status == 'SUCCESS'
    verification_hash = (
        '0xDEADBEEF_MOCK_HASH' if succeeded else '0xBADF00D_MOCK_FAILURE'
    )
    return succeeded, verification_hash
def perform_zta_checks(device_id: str) -> Dict:
    """
    Perform Zero Trust Attestation checks.

    Prompts for an operator ID on stdin, scans running processes for any
    name containing an entry of MOCK_FORENSIC_TOOLS, and applies a mock
    policy rule to the device identifier.

    Args:
        device_id: Device identifier to check.

    Returns:
        Dictionary with the keys:
            operator_id: The stripped text entered by the operator.
            forensic_tool_check: True when a forensic tool WAS detected
                (i.e. True means the check failed).
            policy_check: True when the device passed the policy rule.
            timestamp: time.time() at the moment the result was built.
            device_id: The device identifier that was checked.
    """
    print("🔒 Performing Zero Trust Attestation (ZTA) Checks...")

    # Operator ID Check
    operator_id = input("Enter Operator ID: ").strip()

    # Process Check - Look for forensic tools
    watched_tools = [tool.lower() for tool in MOCK_FORENSIC_TOOLS]
    forensic_tool_check = False
    for proc in psutil.process_iter(['name']):
        try:
            # psutil fills the attribute with None when access to the
            # process is denied, so guard before calling .lower().
            proc_name = (proc.info['name'] or '').lower()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        if not proc_name:
            continue
        # Report each matching process once, even if several tool names match.
        if any(tool in proc_name for tool in watched_tools):
            forensic_tool_check = True
            print(f"⚠️ FORENSIC TOOL DETECTED: {proc_name}")

    # Mock Policy Check - Simple rule for demonstration
    policy_check = 'SSD' in device_id.upper()
    if policy_check:
        print("✅ Policy Check: SSD device approved for wiping")
    else:
        print("❌ Policy Check: Non-SSD device requires additional approval")

    zta_results = {
        'operator_id': operator_id,
        'forensic_tool_check': forensic_tool_check,
        'policy_check': policy_check,
        'timestamp': time.time(),
        'device_id': device_id
    }

    print(f"👤 Operator ID: {operator_id}")
    print(f"🔍 Forensic Tool Check: {'FAIL' if forensic_tool_check else 'PASS'}")
    print(f"📋 Policy Check: {'PASS' if policy_check else 'FAIL'}")
    return zta_results
def load_audit_chain(filename: str) -> List[str]:
    """
    Load previously recorded hash entries from the audit log file.

    The file is read as UTF-8 text with one entry per line; surrounding
    whitespace is stripped and blank lines are ignored.

    Args:
        filename: Path to audit log file.

    Returns:
        List of hash entries in file order. An empty list is returned when
        the file does not exist, and also (after printing a warning) when
        it cannot be read or decoded.
    """
    try:
        # Open directly instead of checking os.path.exists() first: the
        # file could disappear between the check and the open.
        with open(filename, 'r', encoding='utf-8') as f:
            stripped_lines = (line.strip() for line in f)
            return [entry for entry in stripped_lines if entry]
    except FileNotFoundError:
        # No log yet means an empty chain; this is not an error.
        return []
    except (OSError, UnicodeDecodeError) as e:
        print(f"⚠️ Error loading audit chain: {e}")
        return []
def compute_merkle_root(leaf_hashes: List[str]) -> str:
    """
    Reduce a list of leaf hashes to a single Merkle root.

    Each level is built by hashing the string concatenation of adjacent
    pairs; when a level has an odd number of entries, the last entry is
    paired with itself.

    Args:
        leaf_hashes: List of hex SHA-256 hashes.

    Returns:
        Merkle root hash as hex string. An empty list yields the hash of
        b"EMPTY_TREE"; a single leaf is returned unchanged.
    """
    if not leaf_hashes:
        return calculate_hash(b"EMPTY_TREE")

    level = list(leaf_hashes)
    while len(level) > 1:
        # Pad odd-sized levels by repeating the final hash so every node
        # has a partner.
        if len(level) % 2:
            level.append(level[-1])
        level = [
            calculate_hash((left + right).encode('utf-8'))
            for left, right in zip(level[::2], level[1::2])
        ]
    return level[0]
def generate_audit_proof(metadata: Dict, old_root: str, audit_hash: str) -> Tuple[str, str]:
    """
    Hash the audit metadata and compute the Merkle root of the extended chain.

    The metadata is serialised as compact JSON with sorted keys so that the
    same dictionary always produces the same hash. The entries currently in
    AUDIT_LOG_FILE are read, the new hash is appended in memory, and the
    Merkle root over all entries is computed. Nothing is written to disk.

    Args:
        metadata: Audit metadata dictionary (must be JSON-serialisable).
        old_root: Previous Merkle root. Accepted for interface
            compatibility; not used in the computation.
        audit_hash: Current audit hash. Accepted for interface
            compatibility; not used in the computation.

    Returns:
        Tuple of (new_audit_hash, new_merkle_root)
    """
    serialized = json.dumps(
        metadata,
        separators=(',', ':'),
        sort_keys=True,
    ).encode('utf-8')
    new_audit_hash = calculate_hash(serialized)

    # Leaves are the hashes already on disk followed by the new one.
    leaves = [*load_audit_chain(AUDIT_LOG_FILE), new_audit_hash]
    new_merkle_root = compute_merkle_root(leaves)

    return new_audit_hash, new_merkle_root
def main():
    """
    Run the three-phase audit workflow.

    Phase 1 runs the Zero Trust Attestation checks and exits with status 1
    if a forensic tool is detected or the policy check fails. Phase 2 runs
    the mock wipe. Phase 3 hashes the audit metadata, computes the new
    Merkle root and appends the audit hash to AUDIT_LOG_FILE; if the log
    cannot be written the process exits with status 1 instead of reporting
    a completed audit.
    """
    print("🚀 Secure Wipe Auditor - Zero Trust Edition")
    print("=" * 50)

    # Check for mock mode flag (currently informational only)
    mock_mode = '--mock' in sys.argv
    if mock_mode:
        print("🧪 Running in MOCK MODE for testing")

    # Define target device
    device_id = 'sda1_SSD_SN12345'
    print(f"🎯 Target Device: {device_id}")

    # Phase 1: Zero Trust Attestation
    print("\n📋 Phase 1: Zero Trust Attestation")
    print("-" * 30)
    zta_results = perform_zta_checks(device_id)

    # Zero Trust Enforcement - Exit if forensic tools detected
    if zta_results['forensic_tool_check']:
        print("\n❌ ZERO TRUST VIOLATION: Forensic tools detected!")
        print("🚫 Wipe operation DENIED for security reasons")
        print("🔒 System must be clean before proceeding")
        sys.exit(1)

    # Zero Trust Enforcement - a failed policy check must also block the
    # wipe; otherwise attestation would be reported as passed despite it.
    if not zta_results['policy_check']:
        print("\n❌ ZERO TRUST VIOLATION: Policy check failed!")
        print("🚫 Wipe operation DENIED for security reasons")
        sys.exit(1)

    print("\n✅ Zero Trust Attestation PASSED")

    # Wipe Operation (Mock)
    print("\n💽 Phase 2: Secure Wipe Operation")
    print("-" * 30)
    print(f"🔄 Initiating secure wipe of {device_id}...")
    wipe_success, final_verification_hash = mock_wipe_disk(device_id, 'SUCCESS')
    if wipe_success:
        print("✅ Wipe operation completed successfully")
        print(f"🔐 Verification Hash: {final_verification_hash}")
    else:
        print("❌ Wipe operation failed")
        print(f"💥 Error Hash: {final_verification_hash}")

    # Phase 3: Audit Chain Generation
    print("\n🔗 Phase 3: Immutable Audit Trail")
    print("-" * 30)

    # Compile final audit metadata
    audit_metadata = {
        'device_id': device_id,
        'zta_results': zta_results,
        'wipe_status': 'SUCCESS' if wipe_success else 'FAILURE',
        'final_verification_hash': final_verification_hash,
        'timestamp': time.time(),
        'audit_version': '1.0',
        'operator_attestation': True
    }

    # Load previous audit chain. The log stores audit hashes (leaves), so
    # the previous root has to be recomputed from them rather than read
    # from the last line.
    previous_chain = load_audit_chain(AUDIT_LOG_FILE)
    if previous_chain:
        old_root = compute_merkle_root(previous_chain)
    else:
        old_root = "GENESIS_BLOCK"
    print(f"📚 Previous audit chain entries: {len(previous_chain)}")
    print(f"🌳 Previous Merkle Root: {old_root}")

    # Generate new audit proof
    new_audit_hash, new_merkle_root = generate_audit_proof(
        audit_metadata, old_root, ""
    )

    # Display results
    print("\n🎯 AUDIT TRAIL RESULTS")
    print("=" * 30)
    print(f"📝 New Audit Hash: {new_audit_hash}")
    print(f"🌳 New Merkle Root: {new_merkle_root}")
    print(f"🔗 Chain Length: {len(previous_chain) + 1}")

    # Append the new audit hash (a Merkle leaf) to the audit log
    try:
        with open(AUDIT_LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(f"{new_audit_hash}\n")
    except OSError as e:
        print(f"❌ Error writing audit log: {e}")
        # Without a persisted entry the audit trail is incomplete, so do
        # not fall through to the success summary.
        sys.exit(1)
    print(f"✅ Audit chain updated: {AUDIT_LOG_FILE}")

    # Final summary
    print("\n🎉 SECURE WIPE AUDIT COMPLETE")
    print("=" * 30)
    print(f"Device: {device_id}")
    print(f"Status: {'SUCCESS' if wipe_success else 'FAILURE'}")
    print(f"Operator: {zta_results['operator_id']}")
    print(f"Audit Hash: {new_audit_hash[:16]}...")
    print(f"Merkle Root: {new_merkle_root[:16]}...")
    print("Zero Trust: ENFORCED")
    print("Immutability: GUARANTEED")


if __name__ == "__main__":
    main()