|
| 1 | +import os |
| 2 | +import types |
| 3 | +import importlib |
| 4 | +import errno |
| 5 | + |
| 6 | +import pytest |
| 7 | +from requests.exceptions import HTTPError |
| 8 | + |
| 9 | +import virl.helpers as helpers |
| 10 | + |
| 11 | + |
| 12 | +class _FakeNode: |
| 13 | + def __init__(self, label="n1", booted=True, exc=None): |
| 14 | + self.label = label |
| 15 | + self._booted = booted |
| 16 | + self._exc = exc |
| 17 | + self.extracted = False |
| 18 | + |
| 19 | + def is_booted(self): |
| 20 | + return self._booted |
| 21 | + |
| 22 | + def extract_configuration(self): |
| 23 | + if self._exc: |
| 24 | + raise self._exc |
| 25 | + self.extracted = True |
| 26 | + |
| 27 | + |
| 28 | +class _FakeLab: |
| 29 | + def __init__(self, nodes): |
| 30 | + self._nodes = nodes |
| 31 | + |
| 32 | + def nodes(self): |
| 33 | + return self._nodes |
| 34 | + |
| 35 | + |
| 36 | +class _FakeInterface: |
| 37 | + def __init__(self, ipv4=None, ipv6=None): |
| 38 | + self.discovered_ipv4 = ipv4 or [] |
| 39 | + self.discovered_ipv6 = ipv6 or [] |
| 40 | + |
| 41 | + |
| 42 | +def test_find_virl_or_else_and_cache_paths(monkeypatch): |
| 43 | + monkeypatch.setattr(helpers, "find_virl", lambda: None) |
| 44 | + assert helpers.find_virl_or_else() == "." |
| 45 | + assert helpers.get_cache_root().endswith("/.virl/cached_cml_labs") |
| 46 | + assert helpers.get_current_lab_link().endswith("/.virl/current_cml_lab") |
| 47 | + assert helpers.get_default_plugin_dir().endswith("/.virl/plugins") |
| 48 | + |
| 49 | + |
| 50 | +def test_safe_join_existing_lab_variants(): |
| 51 | + client = types.SimpleNamespace( |
| 52 | + get_lab_list=lambda: ["lab-1"], |
| 53 | + join_existing_lab=lambda lab_id: {"id": lab_id}, |
| 54 | + find_labs_by_title=lambda title: ["lab-1"] if title == "one" else ["a", "b"], |
| 55 | + ) |
| 56 | + assert helpers.safe_join_existing_lab("lab-1", client) == {"id": "lab-1"} |
| 57 | + assert helpers.safe_join_existing_lab("missing", client) is None |
| 58 | + assert helpers.safe_join_existing_lab_by_title("one", client) == "lab-1" |
| 59 | + assert helpers.safe_join_existing_lab_by_title("dup", client) is None |
| 60 | + |
| 61 | + |
| 62 | +def test_cache_lab_data_and_current_lab_link(monkeypatch, tmp_path): |
| 63 | + cache_root = tmp_path / ".virl" / "cached_cml_labs" |
| 64 | + current = tmp_path / ".virl" / "current_cml_lab" |
| 65 | + monkeypatch.setattr(helpers, "get_cache_root", lambda: str(cache_root)) |
| 66 | + monkeypatch.setattr(helpers, "get_current_lab_link", lambda: str(current)) |
| 67 | + |
| 68 | + helpers.cache_lab_data("lab-1", "topology-data") |
| 69 | + assert (cache_root / "lab-1").read_text() == "topology-data" |
| 70 | + |
| 71 | + helpers.set_current_lab("lab-1") |
| 72 | + assert helpers.get_current_lab() == "lab-1" |
| 73 | + |
| 74 | + helpers.clear_current_lab("other") |
| 75 | + assert os.path.exists(current) |
| 76 | + helpers.clear_current_lab("lab-1") |
| 77 | + assert not os.path.exists(current) |
| 78 | + |
| 79 | + |
| 80 | +def test_check_lab_cache_handles_errors(monkeypatch, tmp_path): |
| 81 | + monkeypatch.setattr(helpers, "get_cache_root", lambda: str(tmp_path)) |
| 82 | + cached = tmp_path / "lab-1" |
| 83 | + cached.write_text("x") |
| 84 | + assert helpers.check_lab_cache("lab-1").endswith("lab-1") |
| 85 | + |
| 86 | + monkeypatch.setattr(helpers, "get_cache_root", lambda: (_ for _ in ()).throw(RuntimeError("boom"))) |
| 87 | + assert helpers.check_lab_cache("lab-1") is None |
| 88 | + |
| 89 | + |
| 90 | +def test_extract_configurations_handles_http_and_generic_errors(monkeypatch): |
| 91 | + good = _FakeNode(label="good") |
| 92 | + bad_http = _FakeNode(label="bad-http") |
| 93 | + bad_generic = _FakeNode(label="bad-generic") |
| 94 | + |
| 95 | + http_error = HTTPError("bad") |
| 96 | + http_error.response = types.SimpleNamespace(status_code=500) |
| 97 | + bad_http._exc = http_error |
| 98 | + bad_generic._exc = RuntimeError("oops") |
| 99 | + |
| 100 | + warnings = [] |
| 101 | + monkeypatch.setattr(helpers.click, "secho", lambda msg, fg="yellow": warnings.append((msg, fg))) |
| 102 | + |
| 103 | + helpers.extract_configurations(_FakeLab([good, bad_http, bad_generic])) |
| 104 | + |
| 105 | + assert good.extracted is True |
| 106 | + assert any("bad-http" in msg for msg, _ in warnings) |
| 107 | + assert any("bad-generic" in msg for msg, _ in warnings) |
| 108 | + |
| 109 | + |
| 110 | +def test_extract_configurations_ignores_http_400(monkeypatch): |
| 111 | + bad_http_400 = _FakeNode(label="bad-http-400") |
| 112 | + err = HTTPError("bad-400") |
| 113 | + err.response = types.SimpleNamespace(status_code=400) |
| 114 | + bad_http_400._exc = err |
| 115 | + |
| 116 | + warnings = [] |
| 117 | + monkeypatch.setattr(helpers.click, "secho", lambda msg, fg="yellow": warnings.append((msg, fg))) |
| 118 | + helpers.extract_configurations(_FakeLab([bad_http_400])) |
| 119 | + assert warnings == [] |
| 120 | + |
| 121 | + |
| 122 | +def test_get_node_mgmt_ip_prefers_ipv4_then_non_link_local_ipv6(): |
| 123 | + node_v4 = types.SimpleNamespace(interfaces=lambda: [_FakeInterface(ipv4=["10.10.10.10"])]) |
| 124 | + assert helpers.get_node_mgmt_ip(node_v4) == "10.10.10.10" |
| 125 | + |
| 126 | + node_v6 = types.SimpleNamespace(interfaces=lambda: [_FakeInterface(ipv6=["fe80::1", "2001:db8::1"])]) |
| 127 | + assert helpers.get_node_mgmt_ip(node_v6) == "2001:db8::1" |
| 128 | + |
| 129 | + |
| 130 | +def test_get_node_mgmt_ip_skips_link_local_only_ipv6(): |
| 131 | + node_v6 = types.SimpleNamespace(interfaces=lambda: [_FakeInterface(ipv6=["fe80::1"])]) |
| 132 | + assert helpers.get_node_mgmt_ip(node_v6) is None |
| 133 | + |
| 134 | + |
| 135 | +def test_get_cml_client_uses_verify_flag_and_clears_env(monkeypatch): |
| 136 | + captured = {} |
| 137 | + |
| 138 | + def fake_client(host, user, password, raise_for_auth_failure, ssl_verify): |
| 139 | + captured.update( |
| 140 | + { |
| 141 | + "host": host, |
| 142 | + "user": user, |
| 143 | + "password": password, |
| 144 | + "raise_for_auth_failure": raise_for_auth_failure, |
| 145 | + "ssl_verify": ssl_verify, |
| 146 | + } |
| 147 | + ) |
| 148 | + return "client" |
| 149 | + |
| 150 | + monkeypatch.setattr(helpers, "ClientLibrary", fake_client) |
| 151 | + os.environ["VIRL2_USER"] = "u" |
| 152 | + os.environ["VIRL2_PASS"] = "p" |
| 153 | + os.environ["VIRL2_URL"] = "url" |
| 154 | + |
| 155 | + server = types.SimpleNamespace(host="h", user="u", passwd="p", config={"CML_VERIFY_CERT": "false"}) |
| 156 | + assert helpers.get_cml_client(server) == "client" |
| 157 | + assert captured["ssl_verify"] is False |
| 158 | + assert "VIRL2_USER" not in os.environ |
| 159 | + |
| 160 | + server.config["CML_VERIFY_CERT"] = "/tmp/cert.pem" |
| 161 | + helpers.get_cml_client(server) |
| 162 | + assert captured["ssl_verify"] == "/tmp/cert.pem" |
| 163 | + helpers.get_cml_client(server, ignore=True) |
| 164 | + assert captured["ssl_verify"] is False |
| 165 | + |
| 166 | + |
| 167 | +def test_group_permission_helpers_and_command_detection(monkeypatch): |
| 168 | + assert helpers.convert_permissions("read_write") == ["lab_view", "lab_edit", "lab_exec", "lab_admin"] |
| 169 | + assert helpers.convert_permissions("read_only") == ["lab_view"] |
| 170 | + |
| 171 | + users = [{"id": "1", "username": "a"}, {"id": "2", "username": "b"}] |
| 172 | + assert helpers.get_group_member_ids(users, ["b"], False) == ["2"] |
| 173 | + assert helpers.get_group_member_ids(users, ["b"], True) == ["1", "2"] |
| 174 | + |
| 175 | + client = types.SimpleNamespace(get_lab_list=lambda: ["lab-1", "lab-2"]) |
| 176 | + assert helpers.get_group_associations(client, None, "read_only") == [ |
| 177 | + {"id": "lab-1", "permissions": ["lab_view"]}, |
| 178 | + {"id": "lab-2", "permissions": ["lab_view"]}, |
| 179 | + ] |
| 180 | + assert helpers.get_group_associations(client, [("lab-9", "read_write")], None) == [ |
| 181 | + {"id": "lab-9", "permissions": ["lab_view", "lab_edit", "lab_exec", "lab_admin"]} |
| 182 | + ] |
| 183 | + assert helpers.get_group_associations(client, None, None) == [] |
| 184 | + |
| 185 | + monkeypatch.setattr(helpers.sys, "argv", ["cml"]) |
| 186 | + assert helpers.get_command() == "cml" |
| 187 | + monkeypatch.setattr(helpers.sys, "argv", ["virl"]) |
| 188 | + assert helpers.get_command() == "virl" |
| 189 | + |
| 190 | + |
| 191 | +def test_set_current_lab_raises_when_cache_missing(monkeypatch, tmp_path): |
| 192 | + monkeypatch.setattr(helpers, "get_cache_root", lambda: str(tmp_path / "cache")) |
| 193 | + monkeypatch.setattr(helpers, "get_current_lab_link", lambda: str(tmp_path / "link")) |
| 194 | + with pytest.raises(FileNotFoundError): |
| 195 | + helpers.set_current_lab("missing") |
| 196 | + |
| 197 | + |
| 198 | +def test_mkdir_p_raises_non_eexist(monkeypatch): |
| 199 | + def _boom(_path): |
| 200 | + raise OSError(errno.EPERM, "denied") |
| 201 | + |
| 202 | + monkeypatch.setattr(helpers.os, "makedirs", _boom) |
| 203 | + with pytest.raises(OSError): |
| 204 | + helpers.mkdir_p("/tmp/denied") |
| 205 | + |
| 206 | + |
| 207 | +def test_find_virl_windows_and_edge_cases(monkeypatch): |
| 208 | + # Force while loop to exit immediately (covers while false branch). |
| 209 | + class _RootLike(str): |
| 210 | + def split(self, _sep): |
| 211 | + return "\\" |
| 212 | + |
| 213 | + monkeypatch.setattr(helpers.os, "getcwd", lambda: _RootLike("\\")) |
| 214 | + monkeypatch.setattr(helpers.os.path, "abspath", lambda _p: "\\") |
| 215 | + monkeypatch.setattr(helpers.platform, "system", lambda: "Windows") |
| 216 | + assert helpers.find_virl() is None |
| 217 | + |
| 218 | + # Trigger Windows path building and IndexError path in pop(). |
| 219 | + monkeypatch.setattr(helpers.os, "getcwd", lambda: "") |
| 220 | + monkeypatch.setattr(helpers.os.path, "abspath", lambda _p: "\\") |
| 221 | + monkeypatch.setattr(helpers.os, "listdir", lambda _p: []) |
| 222 | + assert helpers.find_virl() is None |
| 223 | + |
| 224 | + # Cover non-empty Windows lookin path branch. |
| 225 | + monkeypatch.setattr(helpers.os, "getcwd", lambda: "foo/bar") |
| 226 | + monkeypatch.setattr(helpers.os.path, "abspath", lambda _p: "\\") |
| 227 | + monkeypatch.setattr(helpers.os, "listdir", lambda _p: [".virl"]) |
| 228 | + assert helpers.find_virl() == "foo\\bar" |
| 229 | + |
| 230 | + |
| 231 | +def test_windows_redirection_context_manager(monkeypatch): |
| 232 | + # Reload helpers as if running on Windows so class attributes are created. |
| 233 | + import platform |
| 234 | + import ctypes |
| 235 | + import virl.helpers as helpers_mod |
| 236 | + |
| 237 | + monkeypatch.setattr(platform, "system", lambda: "Windows") |
| 238 | + monkeypatch.setattr(ctypes, "windll", types.SimpleNamespace(kernel32=types.SimpleNamespace()), raising=False) |
| 239 | + monkeypatch.setattr( |
| 240 | + ctypes.windll.kernel32, |
| 241 | + "Wow64DisableWow64FsRedirection", |
| 242 | + lambda *_args: 1, |
| 243 | + raising=False, |
| 244 | + ) |
| 245 | + monkeypatch.setattr( |
| 246 | + ctypes.windll.kernel32, |
| 247 | + "Wow64RevertWow64FsRedirection", |
| 248 | + lambda *_args: 1, |
| 249 | + raising=False, |
| 250 | + ) |
| 251 | + |
| 252 | + win_helpers = importlib.reload(helpers_mod) |
| 253 | + ctx = win_helpers.disable_file_system_redirection() |
| 254 | + ctx.__enter__() |
| 255 | + ctx.__exit__(None, None, None) |
| 256 | + |
| 257 | + # Cover __exit__ no-op branch when disabling redirection fails. |
| 258 | + monkeypatch.setattr(ctypes.windll.kernel32, "Wow64DisableWow64FsRedirection", lambda *_args: 0, raising=False) |
| 259 | + win_helpers = importlib.reload(helpers_mod) |
| 260 | + ctx = win_helpers.disable_file_system_redirection() |
| 261 | + ctx.__enter__() |
| 262 | + ctx.__exit__(None, None, None) |
| 263 | + |
| 264 | + |
| 265 | +def test_get_cml_client_without_verify_cert_key(monkeypatch): |
| 266 | + captured = {} |
| 267 | + |
| 268 | + def fake_client(host, user, password, raise_for_auth_failure, ssl_verify): |
| 269 | + captured["ssl_verify"] = ssl_verify |
| 270 | + return "client" |
| 271 | + |
| 272 | + monkeypatch.setattr(helpers, "ClientLibrary", fake_client) |
| 273 | + server = types.SimpleNamespace(host="h", user="u", passwd="p", config={}) |
| 274 | + assert helpers.get_cml_client(server) == "client" |
| 275 | + assert captured["ssl_verify"] is True |
0 commit comments