diff --git a/src/extensions/score_draw_uml_funcs/__init__.py b/src/extensions/score_draw_uml_funcs/__init__.py index e3f254ecf..96e0b93c4 100644 --- a/src/extensions/score_draw_uml_funcs/__init__.py +++ b/src/extensions/score_draw_uml_funcs/__init__.py @@ -83,6 +83,36 @@ def scripts_directory_hash(): # ╰──────────────────────────────────────────────────────────────────────────────╯ +def _process_interfaces( + iface_list: list[str], + relation: str, + need: dict[str, str], + all_needs: dict[str, dict[str, str]], + proc_dict: dict[str, str] | dict[str, list[str]], + linkage_text: str, +) -> str: + """Helper to process either implemented or used interfaces.""" + for iface in iface_list: + # check for misspelled interface + if not all_needs.get(iface, []): + logger.info(f"{need}: {relation} {iface} could not be found") + continue + + if relation == "implements": + if not proc_dict.get(iface, []): + linkage_text += ( + f"{gen_link_text(need, '-u->', all_needs[iface], 'implements')} \n" + ) + proc_dict[iface] = need["id"] + else: # "uses" + if not proc_dict.get(iface, []): + proc_dict[iface] = [need["id"]] + else: + proc_dict[iface].append(need["id"]) + + return linkage_text + + def draw_comp_incl_impl_int( need: dict[str, str], all_needs: dict[str, dict[str, str]], @@ -133,36 +163,25 @@ def draw_comp_incl_impl_int( local_impl_interfaces = get_interface_from_component(need, "implements", all_needs) local_used_interfaces = get_interface_from_component(need, "uses", all_needs) - # Add all interfaces which are implemented by component to global list - # and provide implementation - for iface in local_impl_interfaces: - # check for misspelled implements - if not all_needs.get(iface, []): - logger.info(f"{need}: implements {iface} could not be found") - continue - - if not proc_impl_interfaces.get(iface, []): - linkage_text += f"{ - gen_link_text( - need, - '-u->', - all_needs[iface], - 'implements', - ) - } \n" - proc_impl_interfaces[iface] = need["id"] - - # Add all elements which are used by component to global list - for iface in local_used_interfaces: - # check for misspelled used - if not all_needs.get(iface, []): - logger.info(f"{need}: uses {iface} could not be found") - continue + # Process implemented interfaces + linkage_text = _process_interfaces( + local_impl_interfaces, + "implements", + need, + all_needs, + proc_impl_interfaces, + linkage_text, + ) - if not proc_used_interfaces.get(iface, []): - proc_used_interfaces[iface] = [need["id"]] - else: - proc_used_interfaces[iface].append(need["id"]) + # Process used interfaces + linkage_text = _process_interfaces( + local_used_interfaces, + "uses", + need, + all_needs, + proc_used_interfaces, + linkage_text, + ) return structure_text, linkage_text, proc_impl_interfaces, proc_used_interfaces @@ -191,6 +210,68 @@ def draw_impl_interface( return local_impl_interfaces +def _process_impl_interfaces( + need: dict[str, str], + all_needs: dict[str, dict[str, str]], + proc_impl_interfaces: dict[str, str], + structure_text: str, +) -> str: + """Handle implemented interfaces outside the boxes.""" + local_impl_interfaces = draw_impl_interface(need, all_needs, set()) + # Add all interfaces which are implemented by component to global list + # and provide implementation + for iface in local_impl_interfaces: + # check for misspelled implements + if not all_needs.get(iface, []): + logger.info(f"{need}: implements {iface} could not be found") + continue + if not proc_impl_interfaces.get(iface, []): + structure_text += gen_interface_element(iface, all_needs, True) + return structure_text + + +def _process_used_interfaces( + need: dict[str, str], + all_needs: dict[str, dict[str, str]], + proc_impl_interfaces: dict[str, str], + proc_used_interfaces: dict[str, list[str]], + local_impl_interfaces: list[str], + structure_text: str, + linkage_text: str, +) -> tuple[str, str]: + """Handle all interfaces which are used by component.""" + for iface, comps in proc_used_interfaces.items(): + if iface not in proc_impl_interfaces: + # Add implementing components and modules + impl_comp_str = get_impl_comp_from_logic_iface(iface, all_needs) + impl_comp = all_needs.get(impl_comp_str[0], {}) if impl_comp_str else "" + + if impl_comp: + retval = get_hierarchy_text(impl_comp_str[0], all_needs) + structure_text += retval[2] # module open + structure_text += retval[0] # rest open + structure_text += retval[1] # rest close + structure_text += retval[3] # module close + if iface not in local_impl_interfaces: + structure_text += gen_interface_element(iface, all_needs, True) + # Draw connection between implementing components and interface + linkage_text += f"{ + gen_link_text(impl_comp, '-u->', all_needs[iface], 'implements') + } \n" + else: + # Add only interface if component not defined + print(f"{iface}: No implementing component defined") + structure_text += gen_interface_element(iface, all_needs, True) + + # Interface can be used by multiple components + for comp in comps: + linkage_text += f"{ + gen_link_text(all_needs[comp], '-d[#green]->', all_needs[iface], 'uses') + } \n" + + return structure_text, linkage_text + + def draw_module( need: dict[str, str], all_needs: dict[str, dict[str, str]], @@ -254,17 +335,9 @@ def draw_module( # Draw all implemented interfaces outside the boxes local_impl_interfaces = draw_impl_interface(need, all_needs, set()) - - # Add all interfaces which are implemented by component to global list - # and provide implementation - for iface in local_impl_interfaces: - # check for misspelled implements - if not all_needs.get(iface, []): - logger.info(f"{need}: implements {iface} could not be found") - continue - - if not proc_impl_interfaces.get(iface, []): - structure_text += gen_interface_element(iface, all_needs, True) + structure_text = _process_impl_interfaces( + need, all_needs, proc_impl_interfaces, structure_text + ) # Draw outer module structure_text += f"{gen_struct_element('package', need)} {{\n" @@ -272,21 +345,17 @@ def draw_module( # Draw inner components recursively for need_inc in need.get("includes", []): curr_need = all_needs.get(need_inc, {}) - # check for misspelled include if not curr_need: logger.info(f"{need}: include with id {need_inc} could not be found") continue - if curr_need["type"] not in ["comp_arc_sta", "mod_view_sta"]: continue - sub_structure, sub_linkage, proc_impl_interfaces, proc_used_interfaces = ( draw_comp_incl_impl_int( curr_need, all_needs, proc_impl_interfaces, proc_used_interfaces ) ) - structure_text += sub_structure linkage_text += sub_linkage @@ -294,35 +363,15 @@ def draw_module( structure_text += f"}} /' {need['title']} '/ \n\n" # Add all interfaces which are used by component - for iface, comps in proc_used_interfaces.items(): - if iface not in proc_impl_interfaces: - # Add implementing components and modules - impl_comp_str = get_impl_comp_from_logic_iface(iface, all_needs) - - impl_comp = all_needs.get(impl_comp_str[0], {}) if impl_comp_str else "" - - if impl_comp: - retval = get_hierarchy_text(impl_comp_str[0], all_needs) - structure_text += retval[2] # module open - structure_text += retval[0] # rest open - - structure_text += retval[1] # rest close - structure_text += retval[3] # module close - if iface not in local_impl_interfaces: - structure_text += gen_interface_element(iface, all_needs, True) - - # Draw connection between implementing components and interface - linkage_text += f"{gen_link_text(impl_comp, '-u->', all_needs[iface], 'implements')} \n" - - else: - # Add only interface if component not defined - print(f"{iface}: No implementing component defined") - structure_text += gen_interface_element(iface, all_needs, True) - - # Interface can be used by multiple components - for comp in comps: - # Draw connection between used interfaces and components - linkage_text += f"{gen_link_text(all_needs[comp], '-d[#green]->', all_needs[iface], 'uses')} \n" + structure_text, linkage_text = _process_used_interfaces( + need, + all_needs, + proc_impl_interfaces, + proc_used_interfaces, + local_impl_interfaces, + structure_text, + linkage_text, + ) # Remove duplicate links linkage_text = "\n".join(set(linkage_text.split("\n"))) + "\n" @@ -339,43 +388,30 @@ class draw_full_feature: def __repr__(self): return "draw_full_feature" + " in " + scripts_directory_hash() - def __call__( - self, need: dict[str, str], all_needs: dict[str, dict[str, str]] - ) -> str: - interfacelist: list[str] = [] - impl_comp: dict[str, str] = dict() - # Store all Elements which have already been processed - proc_impl_interfaces: dict[str, str] = dict() - proc_used_interfaces: dict[str, list[str]] = dict() - proc_modules: list[str] = list() - - link_text = "" - structure_text = ( - f'actor "Feature User" as {get_alias({"id": "Feature_User"})} \n' - ) - - # Define Feature as a package - # structure_text += f"{gen_struct_element('package', need)} {{\n" - - # Add logical Interfaces / Interface Operations (aka includes) - for need_inc in need.get("includes", []): - # Generate list of interfaces since both interfaces - # and interface operations can be included - iface = get_interface_from_int(need_inc, all_needs) - if iface not in interfacelist: - interfacelist.append(iface) - + def _collect_interfaces_and_modules( + self, + need: dict[str, str], + all_needs: dict[str, dict[str, str]], + interfacelist: list[str], + impl_comp: dict[str, str], + proc_modules: list[str], + proc_impl_interfaces: dict[str, str], + proc_used_interfaces: dict[str, list[str]], + structure_text: str, + link_text: str, + ) -> tuple[ + str, str, dict[str, str], dict[str, list[str]], dict[str, str], list[str] + ]: + """Process interfaces and load modules for implementation.""" for iface in interfacelist: - if iface_need := all_needs.get(iface): + if all_needs.get(iface): if iface: comps = get_impl_comp_from_logic_iface(iface, all_needs) - if comps: impl_comp[iface] = comps[0] if imcomp := impl_comp.get(iface, {}): module = get_module(imcomp, all_needs) - # FIXME: sometimes module is empty, then the following code fails if not module: logger.info( @@ -395,14 +431,27 @@ def __call__( ) structure_text += tmp proc_modules.append(module) - else: logger.info(f"{need}: Interface {iface} could not be found") continue + return ( + structure_text, + link_text, + proc_impl_interfaces, + proc_used_interfaces, + impl_comp, + proc_modules, + ) - # Close Package - # structure_text += f"}} /' {need['title']} '/ \n\n" - + def _build_links( + self, + need: dict[str, str], + all_needs: dict[str, dict[str, str]], + interfacelist: list[str], + impl_comp: dict[str, str], + link_text: str, + ) -> str: + """Add actor-interface and interface-component relations.""" for iface in interfacelist: if imcomp := impl_comp.get(iface): # Add relation between Actor and Interfaces @@ -429,6 +478,61 @@ def __call__( else: logger.info(f"{need}: Interface {iface} could not be found") continue + return link_text + + def __call__( + self, need: dict[str, str], all_needs: dict[str, dict[str, str]] + ) -> str: + interfacelist: list[str] = [] + impl_comp: dict[str, str] = dict() + # Store all Elements which have already been processed + proc_impl_interfaces: dict[str, str] = dict() + proc_used_interfaces: dict[str, list[str]] = dict() + proc_modules: list[str] = list() + + link_text = "" + structure_text = ( + f'actor "Feature User" as {get_alias({"id": "Feature_User"})} \n' + ) + + # Define Feature as a package + # structure_text += f"{gen_struct_element('package', need)} {{\n" + + # Add logical Interfaces / Interface Operations (aka includes) + for need_inc in need.get("includes", []): + # Generate list of interfaces since both interfaces + # and interface operations can be included + iface = get_interface_from_int(need_inc, all_needs) + if iface not in interfacelist: + interfacelist.append(iface) + + # Process interfaces and collect required modules + ( + structure_text, + link_text, + proc_impl_interfaces, + proc_used_interfaces, + impl_comp, + proc_modules, + ) = self._collect_interfaces_and_modules( + need, + all_needs, + interfacelist, + impl_comp, + proc_modules, + proc_impl_interfaces, + proc_used_interfaces, + structure_text, + link_text, + ) + + # Close Package + # structure_text += f"}} /' {need['title']} '/ \n\n" + + # Build all links between actor, interfaces, and components + link_text = self._build_links( + need, all_needs, interfacelist, impl_comp, link_text + ) # Remove duplicate links link_text = "\n".join(set(link_text.split("\n"))) + "\n" diff --git a/src/extensions/score_metamodel/__init__.py b/src/extensions/score_metamodel/__init__.py index 7f5cbdd2d..c6e57680b 100644 --- a/src/extensions/score_metamodel/__init__.py +++ b/src/extensions/score_metamodel/__init__.py @@ -11,7 +11,6 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* import importlib -import json import os import pkgutil from collections.abc import Callable @@ -24,13 +23,6 @@ from sphinx_needs.config import NeedType from sphinx_needs.data import NeedsInfoType, NeedsView, SphinxNeedsData -from src.helper_lib import ( - find_git_root, - find_ws_root, - get_current_git_hash, - get_github_repo_info, -) - from .external_needs import connect_external_needs from .log import CheckLogger diff --git a/src/extensions/score_metamodel/checks/attributes_format.py b/src/extensions/score_metamodel/checks/attributes_format.py index 2d3a6cb39..ebf07157c 100644 --- a/src/extensions/score_metamodel/checks/attributes_format.py +++ b/src/extensions/score_metamodel/checks/attributes_format.py @@ -70,10 +70,15 @@ def check_id_length(app: Sphinx, need: NeedsInfoType, log: CheckLogger): if parts[1] == "example_feature": max_lenght += 17 # _example_feature_ if len(need["id"]) > max_lenght: + length = 0 + if "example_feature" not in need["id"]: + length = len(need["id"]) + else: + length = len(need["id"]) - 17 msg = ( f"exceeds the maximum allowed length of 45 characters " "(current length: " - f"{len(need['id']) if 'example_feature' not in need['id'] else len(need['id']) - 17})." + f"{length})." ) log.warning_for_option(need, "id", msg) @@ -82,7 +87,7 @@ def _check_options_for_prohibited_words( prohibited_word_checks: ProhibitedWordCheck, need: NeedsInfoType, log: CheckLogger ): options: list[str] = [ - x for x in prohibited_word_checks.option_check.keys() if x != "types" + x for x in prohibited_word_checks.option_check if x != "types" ] for option in options: forbidden_words = prohibited_word_checks.option_check[option] diff --git a/src/extensions/score_metamodel/checks/check_options.py b/src/extensions/score_metamodel/checks/check_options.py index 1fe7354b0..aee3b18a8 100644 --- a/src/extensions/score_metamodel/checks/check_options.py +++ b/src/extensions/score_metamodel/checks/check_options.py @@ -11,6 +11,7 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* import re +from os import error from score_metamodel import ( CheckLogger, @@ -33,6 +34,30 @@ def get_need_type(needs_types: list[ScoreNeedType], directive: str) -> ScoreNeed raise ValueError(f"Need type {directive} not found in needs_types") +def _normalize_values(raw_value: str | list[str] | None) -> list[str]: + """Normalize a raw value into a list of strings.""" + if raw_value is None: + return [] + if isinstance(raw_value, str): + return [raw_value] + if isinstance(raw_value, list) and all(isinstance(v, str) for v in raw_value): + return raw_value + raise ValueError + + +def _validate_value_pattern( + value: str, pattern: str, need: NeedsInfoType, field: str, log: CheckLogger +) -> None: + """Check if a value matches the given pattern, log warnings if not.""" + try: + if not re.match(pattern, value): + log.warning_for_option(need, field, f"does not follow pattern `{pattern}`.") + except TypeError: + log.warning_for_option( + need, field, f"pattern `{pattern}` is not a valid regex pattern." + ) + + def validate_fields( need: NeedsInfoType, log: CheckLogger, @@ -64,31 +89,18 @@ def remove_prefix(word: str, prefixes: list[str]) -> str: need, f"is missing required {field_type}: `{field}`." ) continue # Skip empty optional fields - - values: list[str] - - if isinstance(raw_value, str): - values = [raw_value] - elif isinstance(raw_value, list) and all(isinstance(v, str) for v in raw_value): - values = raw_value - else: - values = [str(raw_value)] - + try: + values = _normalize_values(raw_value) + except ValueError as err: + raise ValueError( + f"An Attribute inside need {need['id']} is " + "not of type str. Only Strings are allowed" + ) from err # The filter ensures that the function is only called when needed. for value in values: if allowed_prefixes: value = remove_prefix(value, allowed_prefixes) - try: - if not re.match(pattern, value): - log.warning_for_option( - need, field, f"does not follow pattern `{pattern}`." - ) - except TypeError: - log.warning_for_option( - need, - field, - f"pattern `{pattern}` is not a valid regex pattern.", - ) + _validate_value_pattern(value, pattern, need, field, log) # req-Id: tool_req__docs_req_attr_reqtype diff --git a/src/extensions/score_metamodel/tests/test_check_options.py b/src/extensions/score_metamodel/tests/test_check_options.py index 094850481..438105a6b 100644 --- a/src/extensions/score_metamodel/tests/test_check_options.py +++ b/src/extensions/score_metamodel/tests/test_check_options.py @@ -156,7 +156,7 @@ def test_invalid_option_type(self): target_id="wf_req__001", id="wf_req__001", type="workflow", - some_invalid_option=42, + some_invalid_option="42", docname=None, lineno=None, ) diff --git a/src/extensions/score_source_code_linker/__init__.py b/src/extensions/score_source_code_linker/__init__.py index ce6fe2461..b8184c4e4 100644 --- a/src/extensions/score_source_code_linker/__init__.py +++ b/src/extensions/score_source_code_linker/__init__.py @@ -65,7 +65,8 @@ def setup_once(app: Sphinx, config: Config): LOGGER.debug(f"DEBUG: Git root is {find_git_root()}") # Run only for local files! - # ws_root is not set when running on any on bazel run command repositories (dependencies) + # ws_root is not set when running on any on bazel run + # command repositories (dependencies) ws_root = find_ws_root() if not ws_root: return @@ -146,7 +147,9 @@ def group_by_need(source_code_links: list[NeedLink]) -> dict[str, list[NeedLink] return source_code_links_by_need -def get_github_link(needlink: NeedLink = DefaultNeedLink()) -> str: +def get_github_link(needlink: NeedLink | None = None) -> str: + if needlink is None: + needlink = DefaultNeedLink() passed_git_root = find_git_root() if passed_git_root is None: passed_git_root = Path() diff --git a/src/extensions/score_source_code_linker/tests/test_requirement_links.py b/src/extensions/score_source_code_linker/tests/test_requirement_links.py index 1eb748ece..a6460c1d2 100644 --- a/src/extensions/score_source_code_linker/tests/test_requirement_links.py +++ b/src/extensions/score_source_code_linker/tests/test_requirement_links.py @@ -416,7 +416,7 @@ def test_get_current_git_hash(git_repo): def test_get_current_git_hash_invalid_repo(temp_dir): """Test getting git hash from invalid repository.""" - with pytest.raises(Exception): + with pytest.raises(subprocess.CalledProcessError): get_current_git_hash(temp_dir) @@ -519,7 +519,7 @@ def test_group_by_need_and_find_need_integration(sample_needlinks): ) # Test finding needs for each group - for need_id, links in grouped.items(): + for need_id in grouped: found_need = find_need(all_needs, need_id, ["PREFIX_"]) if need_id in ["TREQ_ID_1", "TREQ_ID_2"]: assert found_need is not None diff --git a/src/extensions/score_source_code_linker/tests/test_source_link.py b/src/extensions/score_source_code_linker/tests/test_source_link.py index 3e0b0ed68..32c022f0d 100644 --- a/src/extensions/score_source_code_linker/tests/test_source_link.py +++ b/src/extensions/score_source_code_linker/tests/test_source_link.py @@ -10,6 +10,7 @@ # # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* +import contextlib import json import os import shutil @@ -150,14 +151,11 @@ def _create_app(): base_dir = sphinx_base_dir docs_dir = base_dir / "docs" + original_cwd = None # CRITICAL: Change to a directory that exists and is accessible # This fixes the "no such file or directory" error in Bazel - original_cwd = None - try: + with contextlib.suppress(FileNotFoundError): original_cwd = os.getcwd() - except FileNotFoundError: - # Current working directory doesn't exist, which is the problem - pass # Change to the base_dir before creating SphinxTestApp os.chdir(base_dir) @@ -173,11 +171,9 @@ def _create_app(): finally: # Try to restore original directory, but don't fail if it doesn't exist if original_cwd is not None: - try: + # Original directory might not exist anymore in Bazel sandbox + with contextlib.suppress(FileNotFoundError, OSError): os.chdir(original_cwd) - except (FileNotFoundError, OSError): - # Original directory might not exist anymore in Bazel sandbox - pass return _create_app diff --git a/src/find_runfiles/test_find_runfiles.py b/src/find_runfiles/test_find_runfiles.py index 3ed2fc9db..97d73d84b 100644 --- a/src/find_runfiles/test_find_runfiles.py +++ b/src/find_runfiles/test_find_runfiles.py @@ -32,25 +32,25 @@ def get_runfiles_dir_impl( def test_run_incremental(): """bazel run //process-docs:incremental""" # in incremental.py: - assert ( - get_runfiles_dir_impl( - cwd="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles/_main", - conf_dir="process-docs", - env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles", - git_root="/workspaces/process", - ) - == "/workspaces/process/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles" + assert get_runfiles_dir_impl( + cwd="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles/_main", + conf_dir="process-docs", + env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles", + git_root="/workspaces/process", + ) == ( + "/workspaces/process/bazel-out/k8-fastbuild/bin/process-docs/" + "incremental.runfiles" ) # in conf.py: - assert ( - get_runfiles_dir_impl( - cwd="/workspaces/process/process-docs", - conf_dir="process-docs", - env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles", - git_root="/workspaces/process", - ) - == "/workspaces/process/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles" + assert get_runfiles_dir_impl( + cwd="/workspaces/process/process-docs", + conf_dir="process-docs", + env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/process-docs/incremental.runfiles", + git_root="/workspaces/process", + ) == ( + "/workspaces/process/bazel-out/k8-fastbuild/bin/process-docs/" + "incremental.runfiles" ) @@ -83,12 +83,11 @@ def test_esbonio_old(): def test3(): # docs named differently, just to make sure nothing is hardcoded # bazel run //other-docs:incremental - assert ( - get_runfiles_dir_impl( - cwd="/workspaces/process/other-docs", - conf_dir="other-docs", - env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/other-docs/incremental.runfiles", - git_root="/workspaces/process", - ) - == "/workspaces/process/bazel-out/k8-fastbuild/bin/other-docs/incremental.runfiles" + assert get_runfiles_dir_impl( + cwd="/workspaces/process/other-docs", + conf_dir="other-docs", + env_runfiles="/home/vscode/.cache/bazel/_bazel_vscode/6084288f00f33db17acb4220ce8f1999/execroot/_main/bazel-out/k8-fastbuild/bin/other-docs/incremental.runfiles", + git_root="/workspaces/process", + ) == ( + "/workspaces/process/bazel-out/k8-fastbuild/bin/other-docs/incremental.runfiles" ) diff --git a/src/helper_lib/test_helper_lib.py b/src/helper_lib/test_helper_lib.py index 78042cb00..e3ca45d4c 100644 --- a/src/helper_lib/test_helper_lib.py +++ b/src/helper_lib/test_helper_lib.py @@ -42,7 +42,7 @@ def test_git_operations_with_no_commits(temp_dir): os.chdir(Path(git_dir).absolute()) # Should raise an exception when trying to get hash - with pytest.raises(Exception): + with pytest.raises(subprocess.CalledProcessError): get_current_git_hash(git_dir) diff --git a/src/tests/test_consumer.py b/src/tests/test_consumer.py index 6c722d25f..36ce33656 100644 --- a/src/tests/test_consumer.py +++ b/src/tests/test_consumer.py @@ -118,6 +118,7 @@ def sphinx_base_dir(tmp_path_factory: TempPathFactory, pytestconfig) -> Path: temp_dir = tmp_path_factory.mktemp("testing_dir") print(f"[blue]Using temporary directory: {temp_dir}[/blue]") return temp_dir + CACHE_DIR.mkdir(parents=True, exist_ok=True) print(f"[green]Using persistent cache directory: {CACHE_DIR}[/green]") return CACHE_DIR @@ -210,11 +211,10 @@ def parse_bazel_output(BR: BuildOutput, pytestconfig) -> BuildOutput: split_warnings = [x for x in err_lines if "WARNING: " in x] warning_dict: dict[str, list[str]] = defaultdict(list) - if pytestconfig.get_verbosity() >= 2: - if os.getenv("CI"): - print("[DEBUG] Raw warnings in CI:") - for i, warning in enumerate(split_warnings): - print(f"[DEBUG] Warning {i}: {repr(warning)}") + if pytestconfig.get_verbosity() >= 2 and os.getenv("CI"): + print("[DEBUG] Raw warnings in CI:") + for i, warning in enumerate(split_warnings): + print(f"[DEBUG] Warning {i}: {repr(warning)}") for raw_warning in split_warnings: # In the CLI we seem to have some ansi codes in the warnings. @@ -479,46 +479,44 @@ def setup_test_environment(sphinx_base_dir, pytestconfig): """Set up the test environment and return necessary paths and metadata.""" git_root = find_git_root() if git_root is None: - assert False, "Git root was none" + raise RuntimeError("Git root was not found") + gh_url = get_github_base_url() current_hash = get_current_git_commit(git_root) os.chdir(Path(sphinx_base_dir).absolute()) - verbosity = pytestconfig.get_verbosity() - if verbosity >= 2: - print(f"[DEBUG] git_root: {git_root}") + def debug_print(message): + if verbosity >= 2: + print(f"[DEBUG] {message}") - # Get GitHub URL and current hash for git override + debug_print(f"git_root: {git_root}") - if verbosity >= 2: - print(f"[DEBUG] gh_url: {gh_url}") - print(f"[DEBUG] current_hash: {current_hash}") - print( - "[DEBUG] Working directory has uncommitted changes: " - f"{has_uncommitted_changes(git_root)}" - ) - - # Create symlink for local docs-as-code - docs_as_code_dest = sphinx_base_dir / "docs_as_code" - if docs_as_code_dest.exists() or docs_as_code_dest.is_symlink(): - # Remove existing symlink/directory to recreate it - if docs_as_code_dest.is_symlink(): - docs_as_code_dest.unlink() - if verbosity >= 2: - print(f"[DEBUG] Removed existing symlink: {docs_as_code_dest}") - elif docs_as_code_dest.is_dir(): - import shutil - - shutil.rmtree(docs_as_code_dest) - if verbosity >= 2: - print(f"[DEBUG] Removed existing directory: {docs_as_code_dest}") - - docs_as_code_dest.symlink_to(git_root) + # Get GitHub URL and current hash for git override + debug_print(f"gh_url: {gh_url}") + debug_print(f"current_hash: {current_hash}") + debug_print( + "Working directory has uncommitted changes: " + f"{has_uncommitted_changes(git_root)}" + ) - if verbosity >= 2: - print(f"[DEBUG] Symlink created: {docs_as_code_dest} -> {git_root}") + def recreate_symlink(dest, target): + # Create symlink for local docs-as-code + if dest.exists() or dest.is_symlink(): + # Remove existing symlink/directory to recreate it + if dest.is_symlink(): + dest.unlink() + debug_print(f"Removed existing symlink: {dest}") + elif dest.is_dir(): + import shutil + + shutil.rmtree(dest) + debug_print(f"Removed existing directory: {dest}") + dest.symlink_to(target) + debug_print(f"Symlink created: {dest} -> {target}") + + recreate_symlink(sphinx_base_dir / "docs_as_code", git_root) return gh_url, current_hash