diff --git a/build_effective_set_generator/scripts/sboms_retention_policy.py b/build_effective_set_generator/scripts/sboms_retention_policy.py index 2cb1a4447..9b849c5b1 100644 --- a/build_effective_set_generator/scripts/sboms_retention_policy.py +++ b/build_effective_set_generator/scripts/sboms_retention_policy.py @@ -1,5 +1,5 @@ -from envgenehelper import getenv_with_error, get_envgene_config_yaml, logger, cleanup_dir_by_size, deleteFileIfExists, \ - cleanup_dir_by_age, get_sboms_dir +from envgenehelper import getenv_with_error, get_envgene_config_yaml, logger, deleteFileIfExists, \ + cleanup_dir_by_age, get_sboms_dir, is_over_size_limit from envgenehelper.constants import CI_JOB_ARTIFACT_MAX_SIZE_MB from envgenehelper.models import SbomRetentionConfig @@ -7,27 +7,40 @@ def sboms_retention_policy(): work_dir = getenv_with_error('CI_PROJECT_DIR') sboms_dir = get_sboms_dir(work_dir) - config = get_envgene_config_yaml() - sbom_retention = SbomRetentionConfig.model_validate(config.get("sbom_retention", {})) - if not sbom_retention.enabled: - logger.info("SBOMs retention policy is disabled") + if not sboms_dir.exists(): + logger.warning(f"SBOM directory does not exist: {sboms_dir}") return - if not sboms_dir.exists(): - logger.warning(f"There is no such directory: {sboms_dir}") + config = get_envgene_config_yaml().get("sbom_retention") + if not config: + disabled = True + else: + sbom_retention = SbomRetentionConfig.model_validate(config) + disabled = not sbom_retention.enabled + + if disabled: + logger.info("SBOM retention policy is disabled") return - logger.info("SBOMs retention policy is enabled") + logger.info(f"SBOM retention policy is enabled for directory {sboms_dir}") + for sbom_path in sboms_dir.iterdir(): if sbom_path.is_file(): - logger.info(f"Removing outdated format file: {sbom_path}") + logger.info(f"Removing legacy SBOM file: {sbom_path}") deleteFileIfExists(sbom_path) - for app_sbom_dir in sboms_dir.iterdir(): - cleanup_dir_by_age(app_sbom_dir, sbom_retention.keep_versions_per_app) - - cleanup_dir_by_size(sboms_dir, CI_JOB_ARTIFACT_MAX_SIZE_MB) + keep_versions_per_app = sbom_retention.keep_versions_per_app + if keep_versions_per_app: + logger.info(f"SBOM retention policy keep_versions_per_app={keep_versions_per_app}," + f" starting cleanup: {sboms_dir}") + for app_sbom_dir in sboms_dir.iterdir(): + cleanup_dir_by_age(app_sbom_dir, sbom_retention.keep_versions_per_app) + + if is_over_size_limit(sboms_dir, CI_JOB_ARTIFACT_MAX_SIZE_MB): + logger.info(f"SBOM directory exceeds size limit, starting cleanup: {sboms_dir}") + for app_sbom_dir in sboms_dir.iterdir(): + cleanup_dir_by_age(app_sbom_dir, 1) if __name__ == "__main__": diff --git a/build_effective_set_generator/scripts/test_sboms_retention_policy.py b/build_effective_set_generator/scripts/test_sboms_retention_policy.py index e2da85d8b..dff87d30f 100644 --- a/build_effective_set_generator/scripts/test_sboms_retention_policy.py +++ b/build_effective_set_generator/scripts/test_sboms_retention_policy.py @@ -1,7 +1,7 @@ import time - import pytest -from envgenehelper import cleanup_dir_by_age, cleanup_dir_by_size + +from envgenehelper import cleanup_dir_by_age, is_over_size_limit from envgenehelper.test_helpers import TestHelpers @@ -10,14 +10,16 @@ class TestSBOMSRetentionPolicy: @pytest.mark.unit def test_cleanup_dir_by_age_removes_old_files(self, tmp_path): now = time.time() + files = [ tmp_path / "old.json", tmp_path / "mid.json", tmp_path / "new.json", ] - TestHelpers.create_file(files[0], mtime=now - 300) - TestHelpers.create_file(files[1], mtime=now - 200) - TestHelpers.create_file(files[2], mtime=now - 100) + + TestHelpers.create_file(files[0], size=1, mtime=now - 300) + TestHelpers.create_file(files[1], size=1, mtime=now - 200) + TestHelpers.create_file(files[2], size=1, mtime=now - 100) cleanup_dir_by_age(tmp_path, keep_last=2) @@ -27,14 +29,16 @@ def test_cleanup_dir_by_age_removes_old_files(self, tmp_path): @pytest.mark.unit def test_cleanup_dir_by_age_keep_all(self, tmp_path): now = time.time() + files = [ tmp_path / "file1.json", tmp_path / "file2.json", tmp_path / "file3.json", ] - TestHelpers.create_file(files[0], mtime=now - 180) - TestHelpers.create_file(files[1], mtime=now - 120) - TestHelpers.create_file(files[2], mtime=now - 60) + + TestHelpers.create_file(files[0], size=1, mtime=now - 180) + TestHelpers.create_file(files[1], size=1, mtime=now - 120) + TestHelpers.create_file(files[2], size=1, mtime=now - 60) cleanup_dir_by_age(tmp_path, keep_last=3) @@ -42,31 +46,32 @@ def test_cleanup_dir_by_age_keep_all(self, tmp_path): assert remaining == {"file1.json", "file2.json", "file3.json"} @pytest.mark.unit - def test_cleanup_dir_by_size_within_limit(self, tmp_path): - files = [ - tmp_path / "file1.json", - tmp_path / "file2.json", - tmp_path / "file3.json", - ] - for f in files: - TestHelpers.create_file(f, size=1024) + def test_dir_not_exists_returns_false(self, tmp_path): + missing = tmp_path / "missing" + result = is_over_size_limit(missing, max_size_mb=1) + assert result is False - cleanup_dir_by_size(tmp_path, max_size_mb=10) + @pytest.mark.unit + def test_empty_dir_returns_false(self, tmp_path): + result = is_over_size_limit(tmp_path, max_size_mb=1) + assert result is False - remaining = {f.name for f in tmp_path.iterdir()} - assert remaining == {"file1.json", "file2.json", "file3.json"} + @pytest.mark.unit + def test_below_limit_returns_false(self, tmp_path): + TestHelpers.create_file(tmp_path / "file.json", size=1024 * 1024) + result = is_over_size_limit(tmp_path, max_size_mb=10) + assert result is False @pytest.mark.unit - def test_cleanup_dir_by_size_exceeds(self, tmp_path): - files = [ - tmp_path / "file1.json", - tmp_path / "file2.json", - tmp_path / "file3.json", - ] - for f in files: - TestHelpers.create_file(f, size=1024 * 1024) + def test_dir_exactly_at_limit_returns_false(self, tmp_path): + TestHelpers.create_file(tmp_path / "file.json", size=1024 * 1024) + result = is_over_size_limit(tmp_path, max_size_mb=1) + assert result is False - cleanup_dir_by_size(tmp_path, max_size_mb=2) + @pytest.mark.unit + def test_dir_above_limit_returns_true(self, tmp_path): + TestHelpers.create_file(tmp_path / "file1.json", size=1024 * 1024) + TestHelpers.create_file(tmp_path / "file2.json", size=1024 * 1024) + result = is_over_size_limit(tmp_path, max_size_mb=1) + assert result is True - remaining = {f.name for f in tmp_path.iterdir()} - assert remaining == set() diff --git a/python/envgene/envgenehelper/constants.py b/python/envgene/envgenehelper/constants.py index f7f2f3c10..3247f505c 100644 --- a/python/envgene/envgenehelper/constants.py +++ b/python/envgene/envgenehelper/constants.py @@ -10,4 +10,4 @@ "composite_structure.yml", ] -CI_JOB_ARTIFACT_MAX_SIZE_MB = 1200 # 80% from limit 1.5 +CI_JOB_ARTIFACT_MAX_SIZE_MB = 600 diff --git a/python/envgene/envgenehelper/file_helper.py b/python/envgene/envgenehelper/file_helper.py index 6b547612c..fc0a6653f 100644 --- a/python/envgene/envgenehelper/file_helper.py +++ b/python/envgene/envgenehelper/file_helper.py @@ -265,27 +265,23 @@ def is_dir_empty(dir_path): return dir_path.exists() and dir_path.is_dir() and not any(dir_path.iterdir()) -def cleanup_dir_by_size(dir_path, max_size_mb): +def is_over_size_limit(dir_path, max_size_mb): dir_path = Path(dir_path) + if not dir_path.exists(): logger.warning(f"Path does not exist: {dir_path}") - return + return False mb = 1024 * 1024 max_size = max_size_mb * mb files = [Path(f) for f in findAllFilesInDir(dir_path, "")] total = sum(f.stat().st_size for f in files) - total_mb = total / mb - if total <= max_size: - logger.info(f"Directory size {total_mb:.2f} mb within limit {max_size_mb} mb") - return + total_mb = total / mb + logger.info(f"Directory size {total_mb:.2f} MB") - logger.info(f"Directory size {total_mb:.2f} mb exceeds limit {max_size_mb} mb, deleting all files in {dir_path}") - for file in files: - logger.info(f"Removing file: {file}") - deleteFileIfExists(file) + return total > max_size def cleanup_dir_by_age(dir_path, keep_last: int): diff --git a/python/envgene/envgenehelper/models.py b/python/envgene/envgenehelper/models.py index af7c83730..3731214cb 100644 --- a/python/envgene/envgenehelper/models.py +++ b/python/envgene/envgenehelper/models.py @@ -1,4 +1,5 @@ from enum import Enum +from typing import Optional from pydantic import BaseModel, Field @@ -19,4 +20,4 @@ def _missing_(cls, value): class SbomRetentionConfig(BaseModel): enabled: bool = Field(default=False) - keep_versions_per_app: int = Field(default=10, ge=0) + keep_versions_per_app: Optional[int] = Field(default=None, ge=0)