From aa26e1922732062c181e7eddf3652a6c12ae3b47 Mon Sep 17 00:00:00 2001 From: Dhirenderchoudhary Date: Mon, 2 Mar 2026 20:10:51 +0530 Subject: [PATCH] Enhance advisory grouping with alias and summary similarity Signed-off-by: Dhirenderchoudhary --- .../tests/test_advisory_grouping.py | 299 ++++++++++++++++++ vulnerabilities/utils.py | 118 ++++++- 2 files changed, 401 insertions(+), 16 deletions(-) create mode 100644 vulnerabilities/tests/test_advisory_grouping.py diff --git a/vulnerabilities/tests/test_advisory_grouping.py b/vulnerabilities/tests/test_advisory_grouping.py new file mode 100644 index 000000000..da891c812 --- /dev/null +++ b/vulnerabilities/tests/test_advisory_grouping.py @@ -0,0 +1,299 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from unittest import TestCase + +import pytest +from django.test import TestCase as DjangoTestCase +from packageurl import PackageURL +from univers.version_range import VersionRange + +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.pipes.advisory import insert_advisory_v2 +from vulnerabilities.utils import SUMMARY_SIMILARITY_THRESHOLD +from vulnerabilities.utils import _find +from vulnerabilities.utils import _union +from vulnerabilities.utils import compute_summary_similarity +from vulnerabilities.utils import group_advisories_by_content + + +class TestComputeSummarySimilarity(TestCase): + + def test_empty_texts_return_zero(self): + assert compute_summary_similarity("", "some text") == 0.0 + assert compute_summary_similarity("some text", "") == 0.0 + assert compute_summary_similarity("", "") == 0.0 + assert compute_summary_similarity(None, "text") == 0.0 + assert compute_summary_similarity("text", None) == 0.0 + + def test_identical_texts_return_one(self): + text = "A critical vulnerability in the AccessControl module." + assert compute_summary_similarity(text, text) == 1.0 + + def test_identical_after_normalization(self): + text1 = "Security flaw in module" + text2 = " Security Flaw In Module " + assert compute_summary_similarity(text1, text2) == 1.0 + + def test_completely_different_texts(self): + text1 = "Buffer overflow in network stack" + text2 = "Unrelated cooking recipe for chocolate cake" + similarity = compute_summary_similarity(text1, text2) + assert similarity < SUMMARY_SIMILARITY_THRESHOLD + + def test_short_summary_contained_in_long_summary(self): + short = ( + "The module AccessControl defines security policies for " + "Python code used in restricted code within Zope applications." + ) + long = ( + "The module AccessControl defines security policies for " + "Python code used in restricted code within Zope applications. " + "Restricted code is any code that resides in Zope's object database, " + "such as the contents of Script (Python) objects. The policies " + "defined in AccessControl severely restrict access to Python modules " + "and only exempt a few that are deemed safe." + ) + similarity = compute_summary_similarity(short, long) + assert similarity >= SUMMARY_SIMILARITY_THRESHOLD + + def test_similar_summaries_above_threshold(self): + text1 = "SQL injection vulnerability in login form of web application" + text2 = "SQL injection vulnerability found in the login form of the web application" + similarity = compute_summary_similarity(text1, text2) + assert similarity >= SUMMARY_SIMILARITY_THRESHOLD + + def test_partially_overlapping_summaries(self): + text1 = "Remote code execution via crafted XML payload" + text2 = "Remote code execution through specially crafted XML input" + similarity = compute_summary_similarity(text1, text2) + assert similarity > 0.4 + + def test_symmetry(self): + text1 = "Cross-site scripting in admin panel" + text2 = "XSS vulnerability found in the administration panel" + assert compute_summary_similarity(text1, text2) == compute_summary_similarity( + text2, text1 + ) + + +class TestUnionFind(TestCase): + + def test_find_on_singleton(self): + parent = [0, 1, 2] + assert _find(parent, 0) == 0 + assert _find(parent, 2) == 2 + + def test_union_merges_two_sets(self): + parent = [0, 1, 2, 3] + _union(parent, 0, 1) + assert _find(parent, 0) == _find(parent, 1) + + def test_transitive_union(self): + parent = [0, 1, 2, 3, 4] + _union(parent, 0, 1) + _union(parent, 2, 3) + _union(parent, 1, 3) + root = _find(parent, 0) + assert _find(parent, 1) == root + assert _find(parent, 2) == root + assert _find(parent, 3) == root + assert _find(parent, 4) == 4 + + def test_path_compression(self): + parent = [0, 0, 1, 2] + root = _find(parent, 3) + assert root == 0 + assert parent[3] == 0 or _find(parent, 3) == 0 + + +@pytest.mark.django_db +class TestGroupAdvisoriesByContent(DjangoTestCase): + + def _create_advisory(self, advisory_id, datasource_id, summary, aliases=None, url=None): + advisory_data = AdvisoryDataV2( + advisory_id=advisory_id, + aliases=aliases or [], + summary=summary, + affected_packages=[ + AffectedPackageV2( + package=PackageURL(type="pypi", name="accesscontrol"), + affected_version_range=VersionRange.from_string("vers:pypi/>=4.0|<4.3"), + ), + ], + references=[ReferenceV2(url=url or "https://example.com")], + url=url or f"https://example.com/{advisory_id}", + ) + insert_advisory_v2( + advisory=advisory_data, + pipeline_id=datasource_id, + ) + return AdvisoryV2.objects.get( + datasource_id=datasource_id, + advisory_id=advisory_id, + ) + + def test_group_by_exact_content_same_hash(self): + adv1 = self._create_advisory( + advisory_id="ADV-001", + datasource_id="source_a", + summary="Identical summary", + url="https://source-a.example.com/ADV-001", + ) + adv2 = self._create_advisory( + advisory_id="ADV-001", + datasource_id="source_b", + summary="Identical summary", + url="https://source-b.example.com/ADV-001", + ) + grouped = group_advisories_by_content([adv1, adv2]) + assert len(grouped) == 1 + group = list(grouped.values())[0] + all_advisories = {group["primary"]} | group["secondary"] + assert adv1 in all_advisories + assert adv2 in all_advisories + + def test_different_content_no_alias_no_similarity(self): + adv1 = self._create_advisory( + advisory_id="ADV-100", + datasource_id="source_a", + summary="Buffer overflow in network stack", + url="https://example.com/ADV-100", + ) + adv2 = self._create_advisory( + advisory_id="ADV-200", + datasource_id="source_b", + summary="Unrelated cooking instructions for pizza dough", + url="https://example.com/ADV-200", + ) + grouped = group_advisories_by_content([adv1, adv2]) + assert len(grouped) == 2 + + def test_group_by_shared_alias(self): + adv1 = self._create_advisory( + advisory_id="CVE-2021-32807", + datasource_id="gitlab_importer_v2", + summary="Improperly Controlled Modification of Dynamically-Determined Object Attributes", + aliases=["CVE-2021-32807", "GHSA-qcx9-j53g-ccgf"], + url="https://gitlab.com/gitlab-org/advisories-community/-/blob/main/pypi/AccessControl/CVE-2021-32807.yml", + ) + adv2 = self._create_advisory( + advisory_id="PYSEC-2021-335", + datasource_id="pypa_importer_v2", + summary=( + "The module AccessControl defines security policies for Python code " + "used in restricted code within Zope applications. Restricted code is " + "any code that resides in Zopes object database." + ), + aliases=["CVE-2021-32807", "GHSA-qcx9-j53g-ccgf"], + url="https://github.com/pypa/advisory-database/blob/main/vulns/accesscontrol/PYSEC-2021-335.yaml", + ) + grouped = group_advisories_by_content([adv1, adv2]) + assert len(grouped) == 1 + group = list(grouped.values())[0] + all_advisories = {group["primary"]} | group["secondary"] + assert adv1 in all_advisories + assert adv2 in all_advisories + + def test_alias_chain_merges_three_advisories(self): + adv_a = self._create_advisory( + advisory_id="ADV-A", + datasource_id="source_1", + summary="Summary A about access control", + aliases=["CVE-2099-0001"], + url="https://example.com/a", + ) + adv_b = self._create_advisory( + advisory_id="ADV-B", + datasource_id="source_2", + summary="Summary B about restricted code", + aliases=["CVE-2099-0001", "GHSA-xxxx-yyyy-zzzz"], + url="https://example.com/b", + ) + adv_c = self._create_advisory( + advisory_id="ADV-C", + datasource_id="source_3", + summary="Summary C about Zope security", + aliases=["GHSA-xxxx-yyyy-zzzz"], + url="https://example.com/c", + ) + grouped = group_advisories_by_content([adv_a, adv_b, adv_c]) + assert len(grouped) == 1 + + def test_group_by_summary_similarity(self): + base_summary = ( + "SQL injection vulnerability in the login form of the web application " + "allows remote attackers to execute arbitrary SQL commands" + ) + variant_summary = ( + "SQL injection vulnerability in the login form of the web application " + "allows remote attackers to execute arbitrary SQL commands via crafted input" + ) + adv1 = self._create_advisory( + advisory_id="ADV-SQL-1", + datasource_id="src_x", + summary=base_summary, + url="https://example.com/sql1", + ) + adv2 = self._create_advisory( + advisory_id="ADV-SQL-2", + datasource_id="src_y", + summary=variant_summary, + url="https://example.com/sql2", + ) + grouped = group_advisories_by_content([adv1, adv2]) + assert len(grouped) == 1 + + def test_highest_precedence_becomes_primary(self): + adv_low = self._create_advisory( + advisory_id="ADV-P1", + datasource_id="low_src", + summary="Same summary here", + aliases=["CVE-2099-9999"], + url="https://example.com/p1", + ) + adv_high = self._create_advisory( + advisory_id="ADV-P2", + datasource_id="high_src", + summary="Same summary here", + aliases=["CVE-2099-9999"], + url="https://example.com/p2", + ) + adv_low.precedence = 1 + adv_low.save() + adv_high.precedence = 10 + adv_high.save() + + grouped = group_advisories_by_content([adv_low, adv_high]) + assert len(grouped) == 1 + group = list(grouped.values())[0] + assert group["primary"] == adv_high + assert adv_low in group["secondary"] + + def test_empty_input(self): + assert group_advisories_by_content([]) == {} + + def test_single_advisory(self): + adv = self._create_advisory( + advisory_id="SOLO-1", + datasource_id="solo_src", + summary="Lonely advisory", + url="https://example.com/solo", + ) + grouped = group_advisories_by_content([adv]) + assert len(grouped) == 1 + group = list(grouped.values())[0] + assert group["primary"] == adv + assert group["secondary"] == set() + + def test_none_input(self): + assert group_advisories_by_content(None) == {} diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index 82f29bcea..99a31fbb8 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -10,6 +10,7 @@ import bisect import csv import dataclasses +import difflib import hashlib import json import logging @@ -52,6 +53,8 @@ commit_regex = re.compile(r"\b[0-9a-f]{5,40}\b", re.IGNORECASE) is_commit = commit_regex.fullmatch +SUMMARY_SIMILARITY_THRESHOLD = 0.8 + @dataclasses.dataclass(order=True, frozen=True) class AffectedPackage: @@ -865,26 +868,109 @@ def compute_patch_checksum(patch_text: str): return hashlib.sha512(patch_text.encode("utf-8")).hexdigest() -def group_advisories_by_content(advisories): - grouped = {} +def _normalize_text(text): + if not text: + return "" + return " ".join(text.lower().split()) - for advisory in advisories: - content_hash = advisory.compute_advisory_content() - entry = grouped.setdefault( - content_hash, - {"primary": advisory, "secondary": set()}, - ) +def compute_summary_similarity(text1, text2): + if not text1 or not text2: + return 0.0 - primary = entry["primary"] + norm1 = _normalize_text(text1) + norm2 = _normalize_text(text2) - if advisory is primary: - continue + if not norm1 or not norm2: + return 0.0 - if advisory.precedence > primary.precedence: - entry["primary"] = advisory - entry["secondary"].add(primary) - else: - entry["secondary"].add(advisory) + if norm1 == norm2: + return 1.0 + + seq_ratio = difflib.SequenceMatcher(None, norm1, norm2).ratio() + + short, long = (norm1, norm2) if len(norm1) <= len(norm2) else (norm2, norm1) + short_words = short.split() + long_words = long.split() + + if len(short_words) > 0 and len(long_words) >= len(short_words): + if long_words[: len(short_words)] == short_words: + return 1.0 + + prefix_match_count = 0 + for i, word in enumerate(short_words): + if i < len(long_words) and long_words[i] == word: + prefix_match_count += 1 + else: + break + if prefix_match_count >= len(short_words) * 0.9: + return max(seq_ratio, 0.9) + + if short in long: + return max(seq_ratio, 0.85) + + return seq_ratio + + +def _find(parent, i): + if parent[i] != i: + parent[i] = _find(parent, parent[i]) + return parent[i] + + +def _union(parent, i, j): + root_i = _find(parent, i) + root_j = _find(parent, j) + if root_i != root_j: + parent[root_j] = root_i + + +def group_advisories_by_content(advisories): + if not advisories: + return {} + + adv_list = list(advisories) + n = len(adv_list) + if n == 0: + return {} + + parent = list(range(n)) + + content_groups = defaultdict(list) + for i, adv in enumerate(adv_list): + content_hash = adv.compute_advisory_content() + content_groups[content_hash].append(i) + + for indices in content_groups.values(): + for idx in indices[1:]: + _union(parent, indices[0], idx) + + alias_groups = defaultdict(list) + for i, adv in enumerate(adv_list): + for alias_obj in adv.aliases.all(): + alias_groups[alias_obj.alias].append(i) + + for indices in alias_groups.values(): + for idx in indices[1:]: + _union(parent, indices[0], idx) + + for i in range(n): + for j in range(i + 1, n): + if _find(parent, i) == _find(parent, j): + continue + sim = compute_summary_similarity(adv_list[i].summary, adv_list[j].summary) + if sim >= SUMMARY_SIMILARITY_THRESHOLD: + _union(parent, i, j) + + final_groups = defaultdict(list) + for i in range(n): + final_groups[_find(parent, i)].append(adv_list[i]) + + grouped = {} + for root_idx, group_advs in final_groups.items(): + primary = max(group_advs, key=lambda a: a.precedence) + secondary = {a for a in group_advs if a is not primary} + group_key = primary.compute_advisory_content() + grouped[group_key] = {"primary": primary, "secondary": secondary} return grouped