import os
import subprocess
import sys
import threading
import time
from collections import defaultdict

from scapy.all import sniff, IP, TCP
# Tunables -- adjust before running.
THRESHOLD = 10  # packets/second above which a source is blocked
BLOCK_TIME = 1  # seconds per counting window (also the timer period)
INTERFACE = "enp0s3"  # capture interface; change for your host

# Guards packet_count / blocked_ips, which are touched both from the sniffer
# callback and from the analyze_traffic() timer thread.
lock = threading.Lock()
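# File handling functions (whitelist and blacklist)
def read_ip_file(filename):
    """Return the set of non-empty, whitespace-stripped lines in *filename*.

    Used for both the whitelist and the blacklist. A missing file yields an
    empty set rather than an error so the firewall still starts when one of
    the lists has not been created.

    Args:
        filename: path of a text file with one address per line.

    Returns:
        set[str] of the stripped lines (duplicates collapse).
    """
    if not os.path.isfile(filename):
        return set()
    # Explicit encoding: the result must not depend on the locale of the
    # shell that launched the script (it runs under sudo, often with a
    # minimal environment).
    with open(filename, "r", encoding="utf-8") as file:
        return {line.strip() for line in file if line.strip()}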
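# Nimda worm signature check with proper payload decoding
def is_nimda_worm(packet):
    """Return True if *packet* carries the Nimda probe request on TCP/80.

    Only the request-line fragment ``GET /scripts/root.exe`` is matched.
    The payload is decoded as UTF-8 with undecodable bytes dropped, so the
    check never raises on binary data.

    Args:
        packet: a scapy packet (any layer stack; non-TCP packets return False).
    """
    if not (packet.haslayer(TCP) and packet[TCP].dport == 80):
        return False
    # errors="ignore" cannot raise, so the old try/except around this was
    # dead code and has been dropped.
    payload_str = bytes(packet[TCP].payload).decode("utf-8", errors="ignore")
    return "GET /scripts/root.exe" in payload_str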
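# Log events to file
def log_event(message):
    """Append *message* as one line to a log file under ./logs.

    The folder is created on demand (relative to the current working
    directory). The file name carries the wall-clock timestamp to the
    second, so successive calls usually land in separate files; the
    message itself is written verbatim followed by a newline.

    Args:
        message: text to record; newlines inside it are not escaped.
    """
    log_folder = "logs"
    os.makedirs(log_folder, exist_ok=True)
    timestamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    log_file = os.path.join(log_folder, f"log_{timestamp}.txt")
    # Explicit UTF-8 so a non-ASCII message cannot raise UnicodeEncodeError
    # under the C/POSIX locale that sudo environments often have.
    with open(log_file, "a", encoding="utf-8") as file:
        file.write(f"{message}\n")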
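# Packet callback to analyze traffic
def _block_ip(ip, reason):
    """Insert an iptables DROP rule for *ip* and record it in ``blocked_ips``.

    Args:
        ip: dotted-quad source address taken from the IP header.
        reason: text written to the event log alongside the block.

    The caller must hold ``lock`` and must already have checked
    ``ip not in blocked_ips``. A failing iptables invocation is logged but
    does not raise, keeping the best-effort behaviour of the previous
    os.system() call; the address is still recorded so it is not retried on
    every packet.
    """
    # argv list with no shell: the address never passes through /bin/sh.
    result = subprocess.run(
        ["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"], check=False
    )
    if result.returncode != 0:
        log_event(f"iptables exited {result.returncode} while blocking {ip}")
    blocked_ips.add(ip)
    log_event(reason)


def packet_callback(packet):
    """Inspect one sniffed packet and block its source when warranted.

    Called by scapy for every captured IP packet. Unspecified (0.0.0.0) and
    whitelisted sources are ignored; blacklisted sources and Nimda probes
    are blocked immediately; every other packet is counted per source for
    the rate check performed by analyze_traffic().

    Args:
        packet: a scapy packet; packets without an IP layer are ignored.
    """
    if not packet.haslayer(IP):
        return
    src_ip = packet[IP].src
    # Ignore invalid source IPs
    if src_ip == "0.0.0.0":
        return
    # Whitelisted sources are never counted or blocked.
    if src_ip in whitelist_ips:
        return
    with lock:
        # Blacklisted source: block on first sight, then stop processing.
        if src_ip in blacklist_ips and src_ip not in blocked_ips:
            _block_ip(src_ip, f"Blocking blacklisted IP: {src_ip}")
            return
        # Nimda signature: block the source; probes do not count toward
        # the rate limit.
        if is_nimda_worm(packet):
            if src_ip not in blocked_ips:
                print(f"Blocking Nimda source IP: {src_ip}")
                _block_ip(src_ip, f"Blocking Nimda source IP: {src_ip}")
            return
        # Track packet count for rate-limiting
        packet_count[src_ip] += 1
        print(f"[>] Packet from {src_ip}, count = {packet_count[src_ip]}")


# Analyze traffic and apply rate-limiting
def analyze_traffic():
    """Block sources that exceeded THRESHOLD pkts/sec in the last window.

    Runs every BLOCK_TIME seconds on a threading.Timer that it re-arms
    itself. It snapshots and resets ``packet_count`` under ``lock`` and
    blocks any source whose rate over the window exceeds THRESHOLD and is
    not already in ``blocked_ips``.
    """
    with lock:
        counts = dict(packet_count)
        packet_count.clear()
        # Stay under the lock: blocked_ips is also mutated by
        # packet_callback() on the sniffer thread.
        for ip, count in counts.items():
            packet_rate = count / BLOCK_TIME
            if packet_rate > THRESHOLD and ip not in blocked_ips:
                print(f"[!!!] Blocking {ip} (packet rate: {packet_rate:.2f} pkts/sec)")
                _block_ip(ip, f"Blocking IP: {ip}, packet rate: {packet_rate:.2f}")
    # Daemon timer: a non-daemon, self-rescheduling timer keeps the process
    # alive after sniff() returns (e.g. on Ctrl-C), so it would never exit.
    timer = threading.Timer(BLOCK_TIME, analyze_traffic)
    timer.daemon = True
    timer.start()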
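if __name__ == "__main__":
    # iptables manipulation and raw capture both need root.
    if os.geteuid() != 0:
        print("This script requires root privileges. Run with sudo.", file=sys.stderr)
        sys.exit(1)
    # Shared state read by packet_callback() and analyze_traffic(). They are
    # bound here, before either function runs, and stay module globals
    # because the callbacks look them up by name.
    whitelist_ips = read_ip_file("whitelist.txt")
    blacklist_ips = read_ip_file("blacklist.txt")
    packet_count = defaultdict(int)
    blocked_ips = set()
    print(f"[+] Starting firewall on {INTERFACE}, threshold = {THRESHOLD} pkts/sec")
    # Start the periodic traffic analysis (it re-arms itself every BLOCK_TIME).
    analyze_traffic()
    try:
        sniff(filter="ip", prn=packet_callback, iface=INTERFACE)
    except KeyboardInterrupt:
        print("\n[!] Stopped by user.")
    except Exception as e:
        # Top-level boundary: report on stderr and exit non-zero instead of
        # the silent status 0 the previous version returned.
        print(f"[!] Error occurred: {e}", file=sys.stderr)
        sys.exit(1)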