Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dojo/importers/default_reimporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,8 +764,12 @@ def process_matched_mitigated_finding(
else:
# TODO: Delete this after the move to Locations
# Reactivate mitigated endpoints that are not false positives, out of scope, or risk accepted
# status_finding_non_special is prefetched by build_candidate_scope_queryset
self.endpoint_manager.chunk_endpoints_and_reactivate(existing_finding.status_finding_non_special)
# status_finding_non_special is prefetched by build_candidate_scope_queryset.
# Falls back to a DB query when the finding was not loaded through that queryset
# (e.g. a finding created during the same reimport batch).
self.endpoint_manager.chunk_endpoints_and_reactivate(
self.endpoint_manager.get_non_special_endpoint_statuses(existing_finding),
)
existing_finding.notes.add(note)
self.reactivated_items.append(existing_finding)
# The new finding is active while the existing one is mitigated. The existing finding needs to
Expand Down
26 changes: 24 additions & 2 deletions dojo/importers/endpoint_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from django.core.exceptions import MultipleObjectsReturned, ValidationError
from django.db.models.query_utils import Q
from django.urls import reverse
from django.utils import timezone

Expand Down Expand Up @@ -147,6 +148,25 @@ def chunk_endpoints_and_mitigate(
) -> None:
dojo_dispatch_task(EndpointManager.mitigate_endpoint_status, endpoint_status_list, user, sync=True)

@staticmethod
def get_non_special_endpoint_statuses(
    finding: Finding,
) -> list[Endpoint_Status]:
    """
    Collect the endpoint statuses of ``finding`` that are not flagged special
    (false_positive, out_of_scope, or risk_accepted).

    The prefetched ``status_finding_non_special`` attribute is preferred when
    present — ``build_candidate_scope_queryset`` with ``mode="reimport"`` sets
    it via a ``Prefetch(..., to_attr=...)``. When the finding never went
    through that queryset (e.g. it was created earlier in the same reimport
    batch), the statuses are fetched from the database instead.
    """
    _missing = object()  # sentinel so an empty prefetched list is still honored
    prefetched = getattr(finding, "status_finding_non_special", _missing)
    if prefetched is not _missing:
        return prefetched
    special_statuses = Q(false_positive=True) | Q(out_of_scope=True) | Q(risk_accepted=True)
    return list(
        finding.status_finding.exclude(special_statuses).select_related("endpoint"),
    )

def update_endpoint_status(
self,
existing_finding: Finding,
Expand All @@ -158,8 +178,10 @@ def update_endpoint_status(
# New endpoints are already added in serializers.py / views.py (see comment "# for existing findings: make sure endpoints are present or created")
# So we only need to mitigate endpoints that are no longer present
# status_finding_non_special is prefetched by build_candidate_scope_queryset with the
# special-status exclusion and endpoint select_related already applied at the DB level
existing_finding_endpoint_status_list = existing_finding.status_finding_non_special
# special-status exclusion and endpoint select_related already applied at the DB level.
# Falls back to a DB query when the finding was not loaded through that queryset
# (e.g. a finding created during the same reimport batch).
existing_finding_endpoint_status_list = self.get_non_special_endpoint_statuses(existing_finding)
new_finding_endpoints_list = new_finding.unsaved_endpoints
if new_finding.is_mitigated:
# New finding is mitigated, so mitigate all old endpoints
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"service": "StackHawk",
"scanCompleted": {
"scan": {
"id": "f3aa6752-8fef-58fa-c854-1d3g8e972f38",
"hawkscanVersion": "2.1.1",
"env": "Development",
"status": "COMPLETED",
"application": "Secured Application",
"startedTimestamp": "2022-02-16T23:07:19.575Z",
"scanURL": "https://app.stackhawk.com/scans/f3aa6752-8fef-58fa-c854-1d3g8e972f38"
},
"scanDuration": "21",
"spiderDuration": "45",
"completedScanStats": {
"urlsCount": "2",
"duration": "66",
"scanResultsStats": {
"totalCount": "2",
"lowCount": "0",
"mediumCount": "0",
"highCount": "2",
"lowTriagedCount": "0",
"mediumTriagedCount": "0",
"highTriagedCount": "0"
}
},
"findings": [
{
"pluginId": "90001",
"pluginName": "Insecure JSF ViewState",
"severity": "High",
"host": "https://localhost:9000",
"paths": [
{
"path": "/app/login",
"method": "GET",
"status": "NEW",
"pathURL": "https://app.stackhawk.com/scans/f3aa6752-8fef-58fa-c854-1d3g8e972f38/finding/90001/path/1/message/1"
}
],
"pathStats": [
{
"status": "NEW",
"count": 1
}
],
"totalCount": "1",
"category": "Security Misconfiguration",
"findingURL": "https://app.stackhawk.com/scans/f3aa6752-8fef-58fa-c854-1d3g8e972f38/finding/90001"
},
{
"pluginId": "90002",
"pluginName": "Insecure HTTP Header",
"severity": "High",
"host": "https://localhost:9000",
"paths": [
{
"path": "/app/dashboard",
"method": "GET",
"status": "NEW",
"pathURL": "https://app.stackhawk.com/scans/f3aa6752-8fef-58fa-c854-1d3g8e972f38/finding/90002/path/1/message/1"
}
],
"pathStats": [
{
"status": "NEW",
"count": 1
}
],
"totalCount": "1",
"category": "Security Misconfiguration",
"findingURL": "https://app.stackhawk.com/scans/f3aa6752-8fef-58fa-c854-1d3g8e972f38/finding/90002"
}
]
}
}
249 changes: 249 additions & 0 deletions unittests/test_reimport_prefetch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
"""
Regression tests for handling duplicate findings within the same reimport report.

When a scan report contains two findings that produce the same hash_code, the first
creates a new finding via process_finding_that_was_not_matched() and gets added to
the candidate dictionaries. When the second finding in the same batch matches against
this newly-created finding, accessing existing_finding.status_finding_non_special
raises AttributeError because the finding was never loaded through
build_candidate_scope_queryset (which sets up the Prefetch with to_attr).

Bugfix: https://github.com/DefectDojo/django-DefectDojo/pull/14569
Batch endpoint optimization (related): https://github.com/DefectDojo/django-DefectDojo/pull/14489
"""


from crum import impersonate
from django.conf import settings
from django.test import override_settings
from django.utils import timezone

from dojo.importers.default_reimporter import DefaultReImporter
from dojo.location.models import LocationFindingReference
from dojo.location.status import FindingLocationStatus
from dojo.models import (
Development_Environment,
Dojo_User,
Endpoint,
Endpoint_Status,
Engagement,
Finding,
Product,
Product_Type,
Test,
Test_Type,
User,
UserContactInfo,
)

from .dojo_test_case import DojoTestCase, get_unit_tests_scans_path, skip_unless_v2

SCAN_TYPE = "StackHawk HawkScan"

# Reimport: two findings with different pluginIds (so the parser keeps both as separate
# findings) but identical hash_code fields (component_name + component_version).
# Each finding has one unique endpoint/location path (/app/login and /app/dashboard).
# This triggers the same-batch matching scenario where the second finding matches
# against the first one that was just created in the same reimport batch.
REIMPORT_SCAN = get_unit_tests_scans_path("stackhawk") / "stackhawk_two_vul_same_hashcode_fabricated.json"


def _hashcode_fields_without_vuln_id():
    """
    Build a copy of ``HASHCODE_FIELDS_PER_SCANNER`` where StackHawk hashes
    only on component_name + component_version (vuln_id_from_tool removed).

    The default StackHawk hash fields are [vuln_id_from_tool, component_name,
    component_version]. vuln_id_from_tool maps to pluginId, and the parser
    already deduplicates by pluginId, so under the default configuration two
    separate findings can never share a hash_code.

    Dropping vuln_id_from_tool means two findings with different pluginIds but
    identical application/env metadata collide on hash_code — exactly the
    same-batch matching code path these tests need to exercise.
    """
    overridden = {**settings.HASHCODE_FIELDS_PER_SCANNER}
    overridden[SCAN_TYPE] = ["component_name", "component_version"]
    return overridden


class ReimportDuplicateFindingsTestBase(DojoTestCase):

    """Shared setup for duplicate-findings-in-same-report tests."""

    def setUp(self):
        """Create the admin user, product, engagement, and test used by all subclasses."""
        super().setUp()
        # block_execution makes background tasks run synchronously for this user
        admin_user, _ = User.objects.get_or_create(username="admin")
        UserContactInfo.objects.get_or_create(user=admin_user, defaults={"block_execution": True})

        self.system_settings(enable_deduplication=True)
        self.system_settings(enable_product_grade=False)

        product_type, _ = Product_Type.objects.get_or_create(name="test")
        product, _ = Product.objects.get_or_create(
            name="ReimportPrefetchTest",
            description="Test",
            prod_type=product_type,
        )
        engagement, _ = Engagement.objects.get_or_create(
            name="Test Engagement",
            product=product,
            target_start=timezone.now(),
            target_end=timezone.now(),
        )
        # Reuse the user fetched above instead of issuing a second identical query
        self.lead = admin_user
        # Ensure the "Development" environment row exists (the scan file references
        # env "Development"); the binding itself is not needed.
        Development_Environment.objects.get_or_create(name="Development")
        test_type, _ = Test_Type.objects.get_or_create(name=SCAN_TYPE)
        self.test = Test.objects.create(
            engagement=engagement,
            test_type=test_type,
            scan_type=SCAN_TYPE,
            target_start=timezone.now(),
            target_end=timezone.now(),
        )
        self.product = product

    def _reimport_with_overridden_hashcode(self):
        """
        Reimport the two-finding scan file with overridden hash settings.

        Returns the tuple produced by ``DefaultReImporter.process_scan``.
        """
        hashcode_override = _hashcode_fields_without_vuln_id()
        with (
            impersonate(Dojo_User.objects.get(username="admin")),
            override_settings(HASHCODE_FIELDS_PER_SCANNER=hashcode_override),
            REIMPORT_SCAN.open(encoding="utf-8") as scan,
        ):
            reimporter = DefaultReImporter(
                test=self.test,
                user=self.lead,
                lead=self.lead,
                scan_date=None,
                minimum_severity="Info",
                active=True,
                verified=True,
                sync=True,
                scan_type=SCAN_TYPE,
            )
            return reimporter.process_scan(scan)


@skip_unless_v2
class TestReimportDuplicateFindingsEndpointHandling(ReimportDuplicateFindingsTestBase):

    """Regression test: reimport must handle endpoints correctly for duplicate findings in the same report."""

    def test_reimport_duplicate_findings_in_same_report_endpoints(self):
        """
        Reimport a report whose two findings collide on hash_code and verify
        endpoint handling.

        Historically this raised
        ``AttributeError: 'Finding' object has no attribute 'status_finding_non_special'``
        when the second finding matched the finding created moments earlier in
        the same batch (which never went through the prefetch queryset). The
        reimport must complete and produce endpoints for both report entries
        (/app/login and /app/dashboard).
        """
        results = self._reimport_with_overridden_hashcode()
        created_count, closed_count = results[2], results[3]

        # First report entry is new; the second collides on hash_code and matches it.
        self.assertEqual(created_count, 1, "Reimport should create one new finding (second matches first)")
        self.assertEqual(closed_count, 0, "No findings should be closed")

        # A single Finding remains: the second entry was matched into the first.
        finding_qs = Finding.objects.filter(test=self.test)
        self.assertEqual(finding_qs.count(), 1, "Only one finding should exist after reimport")

        matched_finding = finding_qs.first()

        # finding_post_processing runs for both report entries, so the single
        # matched finding accumulates both endpoints.
        product_endpoints = Endpoint.objects.filter(product=self.product)
        self.assertEqual(product_endpoints.count(), 2, "Two endpoints should be created (one per finding in report)")

        statuses = Endpoint_Status.objects.filter(finding=matched_finding)
        self.assertEqual(statuses.count(), 2, "Finding should have two endpoint statuses")

        # update_endpoint_status() compares the matched finding's existing
        # statuses against only the second entry's unsaved_endpoints, so
        # /app/login is treated as "no longer present" and gets mitigated —
        # questionable for batch-created findings, but the current behavior.
        self.assertEqual(
            statuses.filter(mitigated=False).count(),
            1,
            "One endpoint status should be active (/app/dashboard from the matched finding)",
        )
        self.assertEqual(
            statuses.filter(mitigated=True).count(),
            1,
            "One endpoint status should be mitigated (/app/login — mitigated by update_endpoint_status "
            "because it is not in the second finding's endpoint list)",
        )


@override_settings(V3_FEATURE_LOCATIONS=True)
class TestReimportDuplicateFindingsLocationHandling(ReimportDuplicateFindingsTestBase):

    """Test that reimport handles locations correctly for duplicate findings in the same report."""

    def test_reimport_duplicate_findings_in_same_report_locations(self):
        """
        Reimport a report whose two findings collide on hash_code and verify
        location handling.

        The locations path never used the to_attr prefetch, so it does not
        crash like the endpoint path did — but the same logical issue remains:
        update_location_status() compares the batch-created finding's existing
        locations against only the second entry's unsaved locations and
        mitigates anything "no longer present". Each report entry carries one
        unique location (https://localhost:9000/app/login and
        .../app/dashboard).
        """
        results = self._reimport_with_overridden_hashcode()
        created_count, closed_count = results[2], results[3]

        # First report entry is new; the second collides on hash_code and matches it.
        self.assertEqual(created_count, 1, "Reimport should create one new finding (second matches first)")
        self.assertEqual(closed_count, 0, "No findings should be closed")

        # A single Finding remains: the second entry was matched into the first.
        finding_qs = Finding.objects.filter(test=self.test)
        self.assertEqual(finding_qs.count(), 1, "Only one finding should exist after reimport")

        matched_finding = finding_qs.first()

        # finding_post_processing runs for both report entries, so the single
        # matched finding accumulates two references to two distinct Locations.
        refs = LocationFindingReference.objects.filter(finding=matched_finding)
        self.assertEqual(refs.count(), 2, "Finding should have two location references")

        # Mirrors the endpoint behavior: /app/login is treated as "no longer
        # present" relative to the second entry's locations and gets mitigated —
        # questionable for batch-created findings, but the current behavior.
        self.assertEqual(
            refs.filter(status=FindingLocationStatus.Active).count(),
            1,
            "One location ref should be active (/app/dashboard from the matched finding)",
        )
        self.assertEqual(
            refs.filter(status=FindingLocationStatus.Mitigated).count(),
            1,
            "One location ref should be mitigated (/app/login — mitigated by update_location_status "
            "because it is not in the second finding's location list)",
        )
Loading