-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
102 lines (88 loc) · 4.28 KB
/
parser.py
File metadata and controls
102 lines (88 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import re
def parse_haproxy_config(cfg):
    """Parse HAProxy configuration text into a graph of nodes and edges.

    The parser recognises ``frontend`` and ``backend`` sections, ``acl``
    definitions (anywhere in the file), ``server`` lines inside backends and
    ``use_backend`` / ``default_backend`` rules inside frontends.

    Args:
        cfg: The configuration file content as a single string. ``None`` or
            an empty string yields an empty graph.

    Returns:
        A dict ``{'nodes': [...], 'edges': [...]}`` where

        * every node is ``{'id', 'label', 'type', 'title'}`` with ``type``
          being one of ``frontend``, ``backend``, ``acl`` or ``server``;
        * every edge is ``{'from', 'to'}``; a conditional rule whose
          condition references no named ACL additionally carries
          ``'label': 'acl'`` and ``'dashes': True``.

        Nodes are emitted in a deterministic order: backends (in the order
        they appear in the configuration) each followed by their servers,
        then ACLs, then frontends, each followed by any backend it routes to
        that has no section of its own.
    """
    proxy_re = re.compile(r'^\s*(frontend|backend)\s+(\S+)')
    # Any other section keyword ends the current frontend/backend; otherwise
    # e.g. the `server` lines of a following `listen` or `peers` section
    # would be attributed to the preceding backend.
    other_section_re = re.compile(
        r'^\s*(?:global|defaults|listen|userlist|peers|mailers|resolvers'
        r'|cache|program|ring|http-errors|fcgi-app)(?=\s|$)'
    )
    acl_re = re.compile(r'^\s*acl\s+(\S+)\s+(.*)')
    server_re = re.compile(r'^\s*server\s+(\S+)\s+(\S+)', re.IGNORECASE)
    # HAProxy accepts both `if` and `unless` in front of a condition.
    route_re = re.compile(
        r'^\s*(?:use_backend|default_backend)\s+(\S+)'
        r'(?:\s+(?:if|unless)\s+(.+))?',
        re.IGNORECASE,
    )

    # Pass 1: split the text into frontend/backend sections and collect
    # every ACL definition, whichever section it lives in.
    sections = []
    acl_defs = {}  # ACL name -> definition text (last definition wins)
    current = None
    for line in (cfg or '').splitlines():
        header = proxy_re.match(line)
        if header:
            current = {
                'type': header.group(1),
                'name': header.group(2),
                'lines': [],
            }
            sections.append(current)
            continue
        if other_section_re.match(line):
            current = None
            continue
        acl = acl_re.match(line)
        if acl:
            acl_defs[acl.group(1)] = acl.group(2).strip()
        if current is not None:
            current['lines'].append(line)

    # Pass 2: gather the servers of every backend. A dict keeps the backends
    # in configuration order, which makes the output deterministic.
    backend_servers = {}  # backend name -> [(server name, address), ...]
    for sec in sections:
        if sec['type'] != 'backend':
            continue
        servers = []
        for line in sec['lines']:
            found = server_re.match(line)
            if found:
                servers.append((found.group(1), found.group(2)))
        backend_servers[sec['name']] = servers

    nodes = []
    edges = []
    seen_ids = set()  # O(1) membership test; also guards against duplicate ids

    def add_node(node_id, label, node_type, title):
        # Node ids must be unique in the resulting graph.
        if node_id in seen_ids:
            return
        seen_ids.add(node_id)
        nodes.append(
            {'id': node_id, 'label': label, 'type': node_type, 'title': title}
        )

    # Backend nodes, each followed by its server nodes and edges.
    for bname, servers in backend_servers.items():
        backend_id = f'backend::{bname}'
        add_node(backend_id, bname, 'backend', bname)
        for sname, addr in servers:
            server_id = f'server::{bname}::{sname}'
            add_node(server_id, f"{sname}\n{addr}", 'server', addr)
            edges.append({'from': backend_id, 'to': server_id})

    # ACL nodes.
    for aname, adef in acl_defs.items():
        add_node(f'acl::{aname}', aname, 'acl', adef)

    # Frontend nodes and their routing edges.
    for sec in sections:
        if sec['type'] != 'frontend':
            continue
        frontend_id = f"frontend::{sec['name']}"
        add_node(frontend_id, sec['name'], 'frontend', sec['name'])
        # Match line by line so a rule can never swallow the following line.
        for line in sec['lines']:
            rule = route_re.match(line)
            if not rule:
                continue
            target, cond = rule.group(1), rule.group(2)
            backend_id = f'backend::{target}'
            # The rule may reference a backend that has no section of its own.
            add_node(backend_id, target, 'backend', target)

            if not cond:
                # Unconditional rule: direct edge.
                edges.append({'from': frontend_id, 'to': backend_id})
                continue

            # Conditional rule: route through every named ACL it mentions.
            linked = False
            for token in re.findall(r"[A-Za-z0-9_:-]+", cond):
                if token in acl_defs:
                    linked = True
                    acl_id = f'acl::{token}'
                    edges.append({'from': frontend_id, 'to': acl_id})
                    edges.append({'from': acl_id, 'to': backend_id})
            if not linked:
                # Anonymous/unknown condition: dashed direct edge instead.
                edges.append({
                    'from': frontend_id,
                    'to': backend_id,
                    'label': 'acl',
                    'dashes': True,
                })

    return {'nodes': nodes, 'edges': edges}