Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions vulnerabilities/pipelines/enhance_with_exploitdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,16 @@ def add_exploit(self):


def add_vulnerability_exploit(row, logger):
vulnerabilities = set()

aliases = row["codes"].split(";") if row["codes"] else []

if not aliases:
return 0

for raw_alias in aliases:
try:
if alias := Alias.objects.get(alias=raw_alias):
vulnerabilities.add(alias.vulnerability)
except Alias.DoesNotExist:
continue
vulnerabilities = (
Alias.objects.filter(alias__in=aliases, vulnerability__isnull=False)
.values_list("vulnerability_id", flat=True)
.distinct()
)

if not vulnerabilities:
logger(f"No vulnerability found for aliases {aliases}")
Expand All @@ -104,7 +101,7 @@ def add_vulnerability_exploit(row, logger):
add_exploit_references(row["codes"], row["source_url"], row["file"], vulnerability, logger)
try:
Exploit.objects.update_or_create(
vulnerability=vulnerability,
vulnerability_id=vulnerability,
data_source="Exploit-DB",
defaults={
"date_added": date_added,
Expand All @@ -125,7 +122,7 @@ def add_vulnerability_exploit(row, logger):
return 1


def add_exploit_references(ref_id, direct_url, path, vul, logger):
def add_exploit_references(ref_id, direct_url, path, vul_id, logger):
url_map = {
"file_url": f"https://gitlab.com/exploit-database/exploitdb/-/blob/main/{path}",
"direct_url": direct_url,
Expand All @@ -144,7 +141,7 @@ def add_exploit_references(ref_id, direct_url, path, vul, logger):

if created:
VulnerabilityRelatedReference.objects.get_or_create(
vulnerability=vul,
vulnerability_id=vul_id,
reference=ref,
)

Expand Down
41 changes: 22 additions & 19 deletions vulnerabilities/pipelines/enhance_with_kev.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,29 @@ def add_vulnerability_exploit(kev_vul, logger):
if not cve_id:
return 0

vulnerability = None
try:
if alias := Alias.objects.get(alias=cve_id):
vulnerability = alias.vulnerability
except Alias.DoesNotExist:
vulnerabilities = (
Alias.objects.filter(alias=cve_id, vulnerability__isnull=False)
.values_list("vulnerability", flat=True)
.distinct()
)

if not vulnerabilities:
logger(f"No vulnerability found for aliases {cve_id}")
return 0

Exploit.objects.update_or_create(
vulnerability=vulnerability,
data_source="KEV",
defaults={
"description": kev_vul["shortDescription"],
"date_added": kev_vul["dateAdded"],
"required_action": kev_vul["requiredAction"],
"due_date": kev_vul["dueDate"],
"notes": kev_vul["notes"],
"known_ransomware_campaign_use": True
if kev_vul["knownRansomwareCampaignUse"] == "Known"
else False,
},
)
for vulnerability in vulnerabilities:
Exploit.objects.update_or_create(
vulnerability_id=vulnerability,
data_source="KEV",
defaults={
"description": kev_vul["shortDescription"],
"date_added": kev_vul["dateAdded"],
"required_action": kev_vul["requiredAction"],
"due_date": kev_vul["dueDate"],
"notes": kev_vul["notes"],
"known_ransomware_campaign_use": True
if kev_vul["knownRansomwareCampaignUse"] == "Known"
else False,
},
)
return 1
14 changes: 6 additions & 8 deletions vulnerabilities/pipelines/enhance_with_metasploit.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def add_vulnerability_exploits(self):


def add_vulnerability_exploit(record, logger):
vulnerabilities = set()
references = record.get("references", [])

interesting_references = [
Expand All @@ -76,12 +75,11 @@ def add_vulnerability_exploit(record, logger):
if not interesting_references:
return 0

for ref in interesting_references:
try:
if alias := Alias.objects.get(alias=ref):
vulnerabilities.add(alias.vulnerability)
except Alias.DoesNotExist:
continue
vulnerabilities = (
Alias.objects.filter(alias__in=interesting_references, vulnerability__isnull=False)
.values_list("vulnerability", flat=True)
.distinct()
)

if not vulnerabilities:
logger(f"No vulnerability found for aliases {interesting_references}")
Expand All @@ -107,7 +105,7 @@ def add_vulnerability_exploit(record, logger):

for vulnerability in vulnerabilities:
Exploit.objects.update_or_create(
vulnerability=vulnerability,
vulnerability_id=vulnerability,
data_source="Metasploit",
defaults={
"description": description,
Expand Down
15 changes: 15 additions & 0 deletions vulnerabilities/tests/pipelines/test_enhance_with_exploitdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,18 @@ def test_exploit_db_improver(mock_get):
# Run Exploit-DB Improver again when there are matching aliases.
improver.execute()
assert Exploit.objects.count() == 1


@pytest.mark.django_db
@mock.patch("requests.get")
def test_invalid_exploit_db_improver(mock_get):
mock_response = Mock(status_code=200)
with open(TEST_DATA, "r") as f:
mock_response.text = f.read()
mock_get.return_value = mock_response

improver = ExploitDBImproverPipeline()
Alias.objects.create(alias="CVE-2009-3699", vulnerability=None)
status, _ = improver.execute()
assert status == 0
assert Exploit.objects.count() == 0
15 changes: 15 additions & 0 deletions vulnerabilities/tests/pipelines/test_enhance_with_kev.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,18 @@ def test_kev_improver(mock_get):
# Run Kev Improver again when there are matching aliases.
improver.execute()
assert Exploit.objects.count() == 1


@pytest.mark.django_db
@mock.patch("requests.get")
def test_invalid_kev_improver(mock_get):
mock_response = Mock(status_code=200)
mock_response.json.return_value = load_json(TEST_DATA)
mock_get.return_value = mock_response

improver = VulnerabilityKevPipeline()
Alias.objects.create(alias="CVE-2021-38647", vulnerability=None)

status, _ = improver.execute()
assert status == 0
assert Exploit.objects.count() == 0
14 changes: 14 additions & 0 deletions vulnerabilities/tests/pipelines/test_enhance_with_metasploit.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ def test_metasploit_improver(mock_get):
# Run metasploit Improver again when there are matching aliases.
improver.execute()
assert Exploit.objects.count() == 1


@pytest.mark.django_db
@mock.patch("requests.get")
def test_invalid_metasploit_improver(mock_get):
mock_response = Mock(status_code=200)
mock_response.json.return_value = load_json(TEST_DATA)
mock_get.return_value = mock_response

Alias.objects.create(alias="CVE-2007-4387", vulnerability=None) # Alias without vulnerability
improver = MetasploitImproverPipeline()
status, _ = improver.execute()
assert status == 0
assert Exploit.objects.count() == 0
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@
#

import json
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import patch

import pytest

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import VulnrichImporterPipeline
from vulnerabilities.severity_systems import Cvssv4ScoringSystem


@pytest.fixture
Expand Down Expand Up @@ -58,8 +59,10 @@ def mock_pathlib(tmp_path):
"metrics": [
{
"cvssV4_0": {
"baseScore": 7.5,
"vectorString": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
"version": "4.0",
"baseScore": 5.3,
"baseSeverity": "MEDIUM",
"vectorString": "CVSS:4.0/AV:N/AC:L/AT:N/PR:L/UI:N/VC:L/VI:L/VA:L/SC:N/SI:N/SA:N",
}
}
],
Expand Down Expand Up @@ -103,15 +106,20 @@ def test_collect_advisories(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs)
mock_parse.return_value = AdvisoryDataV2(
advisory_id="CVE-2021-1234",
summary="Sample PyPI vulnerability",
references=[{"url": "https://example.com"}],
references=[ReferenceV2(url="https://example.com")],
affected_packages=[],
weaknesses=[],
url="https://example.com",
severities=[
VulnerabilitySeverity(
system="cvssv4",
value=7.5,
scoring_elements="AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
system=Cvssv4ScoringSystem(
identifier="cvssv4",
name="CVSSv4 Base Score",
url="https://www.first.org/cvss/v4-0/",
notes="CVSSv4 base score and vector",
),
value="5.3",
scoring_elements="CVSS:4.0/AV:N/AC:L/AT:N/PR:L/UI:N/VC:L/VI:L/VA:L/SC:N/SI:N/SA:N",
)
],
)
Expand All @@ -126,6 +134,7 @@ def test_collect_advisories(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs)
assert advisory.advisory_id == "CVE-2021-1234"
assert advisory.summary == "Sample PyPI vulnerability"
assert advisory.url == "https://example.com"
assert len(advisory.severities) == 1


def test_clean_downloads(mock_vcs_response, mock_fetch_via_vcs):
Expand Down Expand Up @@ -165,8 +174,10 @@ def test_parse_cve_advisory(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs)
"metrics": [
{
"cvssV4_0": {
"baseScore": 7.5,
"vectorString": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
"version": "4.0",
"baseScore": 5.3,
"baseSeverity": "MEDIUM",
"vectorString": "CVSS:4.0/AV:N/AC:L/AT:N/PR:L/UI:N/VC:L/VI:L/VA:L/SC:N/SI:N/SA:N",
}
}
],
Expand All @@ -185,7 +196,7 @@ def test_parse_cve_advisory(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs)
assert advisory.summary == "Sample PyPI vulnerability"
assert advisory.url == advisory_url
assert len(advisory.severities) == 1
assert advisory.severities[0].value == 7.5
assert advisory.severities[0].value == 5.3


def test_collect_advisories_with_invalid_json(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
Expand Down