Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions build_effective_set_generator/scripts/sboms_retention_policy.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,46 @@
from envgenehelper import getenv_with_error, get_envgene_config_yaml, logger, cleanup_dir_by_size, deleteFileIfExists, \
cleanup_dir_by_age, get_sboms_dir
from envgenehelper import getenv_with_error, get_envgene_config_yaml, logger, deleteFileIfExists, \
cleanup_dir_by_age, get_sboms_dir, is_over_size_limit
from envgenehelper.constants import CI_JOB_ARTIFACT_MAX_SIZE_MB
from envgenehelper.models import SbomRetentionConfig


def sboms_retention_policy():
work_dir = getenv_with_error('CI_PROJECT_DIR')
sboms_dir = get_sboms_dir(work_dir)
config = get_envgene_config_yaml()
sbom_retention = SbomRetentionConfig.model_validate(config.get("sbom_retention", {}))

if not sbom_retention.enabled:
logger.info("SBOMs retention policy is disabled")
if not sboms_dir.exists():
logger.warning(f"SBOM directory does not exist: {sboms_dir}")
return

if not sboms_dir.exists():
logger.warning(f"There is no such directory: {sboms_dir}")
config = get_envgene_config_yaml().get("sbom_retention")
if not config:
disabled = True
else:
sbom_retention = SbomRetentionConfig.model_validate(config)
disabled = not sbom_retention.enabled

if disabled:
logger.info("SBOM retention policy is disabled")
return

logger.info("SBOMs retention policy is enabled")
logger.info(f"SBOM retention policy is enabled for directory {sboms_dir}")

for sbom_path in sboms_dir.iterdir():
if sbom_path.is_file():
logger.info(f"Removing outdated format file: {sbom_path}")
logger.info(f"Removing legacy SBOM file: {sbom_path}")
deleteFileIfExists(sbom_path)

for app_sbom_dir in sboms_dir.iterdir():
cleanup_dir_by_age(app_sbom_dir, sbom_retention.keep_versions_per_app)

cleanup_dir_by_size(sboms_dir, CI_JOB_ARTIFACT_MAX_SIZE_MB)
keep_versions_per_app = sbom_retention.keep_versions_per_app
if keep_versions_per_app:
logger.info(f"SBOM retention policy keep_versions_per_app={keep_versions_per_app},"
f" starting cleanup: {sboms_dir}")
for app_sbom_dir in sboms_dir.iterdir():
cleanup_dir_by_age(app_sbom_dir, sbom_retention.keep_versions_per_app)

if is_over_size_limit(sboms_dir, CI_JOB_ARTIFACT_MAX_SIZE_MB):
logger.info(f"SBOM directory exceeds size limit, starting cleanup: {sboms_dir}")
for app_sbom_dir in sboms_dir.iterdir():
cleanup_dir_by_age(app_sbom_dir, 1)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import time

import pytest
from envgenehelper import cleanup_dir_by_age, cleanup_dir_by_size

from envgenehelper import cleanup_dir_by_age, is_over_size_limit
from envgenehelper.test_helpers import TestHelpers


Expand All @@ -10,14 +10,16 @@ class TestSBOMSRetentionPolicy:
@pytest.mark.unit
def test_cleanup_dir_by_age_removes_old_files(self, tmp_path):
now = time.time()

files = [
tmp_path / "old.json",
tmp_path / "mid.json",
tmp_path / "new.json",
]
TestHelpers.create_file(files[0], mtime=now - 300)
TestHelpers.create_file(files[1], mtime=now - 200)
TestHelpers.create_file(files[2], mtime=now - 100)

TestHelpers.create_file(files[0], size=1, mtime=now - 300)
TestHelpers.create_file(files[1], size=1, mtime=now - 200)
TestHelpers.create_file(files[2], size=1, mtime=now - 100)

cleanup_dir_by_age(tmp_path, keep_last=2)

Expand All @@ -27,46 +29,49 @@ def test_cleanup_dir_by_age_removes_old_files(self, tmp_path):
@pytest.mark.unit
def test_cleanup_dir_by_age_keep_all(self, tmp_path):
now = time.time()

files = [
tmp_path / "file1.json",
tmp_path / "file2.json",
tmp_path / "file3.json",
]
TestHelpers.create_file(files[0], mtime=now - 180)
TestHelpers.create_file(files[1], mtime=now - 120)
TestHelpers.create_file(files[2], mtime=now - 60)

TestHelpers.create_file(files[0], size=1, mtime=now - 180)
TestHelpers.create_file(files[1], size=1, mtime=now - 120)
TestHelpers.create_file(files[2], size=1, mtime=now - 60)

cleanup_dir_by_age(tmp_path, keep_last=3)

remaining = {f.name for f in tmp_path.iterdir()}
assert remaining == {"file1.json", "file2.json", "file3.json"}

@pytest.mark.unit
def test_cleanup_dir_by_size_within_limit(self, tmp_path):
files = [
tmp_path / "file1.json",
tmp_path / "file2.json",
tmp_path / "file3.json",
]
for f in files:
TestHelpers.create_file(f, size=1024)
def test_dir_not_exists_returns_false(self, tmp_path):
missing = tmp_path / "missing"
result = is_over_size_limit(missing, max_size_mb=1)
assert result is False

cleanup_dir_by_size(tmp_path, max_size_mb=10)
@pytest.mark.unit
def test_empty_dir_returns_false(self, tmp_path):
result = is_over_size_limit(tmp_path, max_size_mb=1)
assert result is False

remaining = {f.name for f in tmp_path.iterdir()}
assert remaining == {"file1.json", "file2.json", "file3.json"}
@pytest.mark.unit
def test_below_limit_returns_false(self, tmp_path):
TestHelpers.create_file(tmp_path / "file.json", size=1024 * 1024)
result = is_over_size_limit(tmp_path, max_size_mb=10)
assert result is False

@pytest.mark.unit
def test_cleanup_dir_by_size_exceeds(self, tmp_path):
files = [
tmp_path / "file1.json",
tmp_path / "file2.json",
tmp_path / "file3.json",
]
for f in files:
TestHelpers.create_file(f, size=1024 * 1024)
def test_dir_exactly_at_limit_returns_false(self, tmp_path):
TestHelpers.create_file(tmp_path / "file.json", size=1024 * 1024)
result = is_over_size_limit(tmp_path, max_size_mb=1)
assert result is False

cleanup_dir_by_size(tmp_path, max_size_mb=2)
@pytest.mark.unit
def test_dir_above_limit_returns_true(self, tmp_path):
TestHelpers.create_file(tmp_path / "file1.json", size=1024 * 1024)
TestHelpers.create_file(tmp_path / "file2.json", size=1024 * 1024)
result = is_over_size_limit(tmp_path, max_size_mb=1)
assert result is True

remaining = {f.name for f in tmp_path.iterdir()}
assert remaining == set()
2 changes: 1 addition & 1 deletion python/envgene/envgenehelper/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@
"composite_structure.yml",
]

CI_JOB_ARTIFACT_MAX_SIZE_MB = 1200 # 80% from limit 1.5
CI_JOB_ARTIFACT_MAX_SIZE_MB = 600
16 changes: 6 additions & 10 deletions python/envgene/envgenehelper/file_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,27 +265,23 @@ def is_dir_empty(dir_path):
return dir_path.exists() and dir_path.is_dir() and not any(dir_path.iterdir())


def cleanup_dir_by_size(dir_path, max_size_mb):
def is_over_size_limit(dir_path, max_size_mb):
dir_path = Path(dir_path)

if not dir_path.exists():
logger.warning(f"Path does not exist: {dir_path}")
return
return False

mb = 1024 * 1024
max_size = max_size_mb * mb

files = [Path(f) for f in findAllFilesInDir(dir_path, "")]
total = sum(f.stat().st_size for f in files)
total_mb = total / mb

if total <= max_size:
logger.info(f"Directory size {total_mb:.2f} mb within limit {max_size_mb} mb")
return
total_mb = total / mb
logger.info(f"Directory size {total_mb:.2f} MB")

logger.info(f"Directory size {total_mb:.2f} mb exceeds limit {max_size_mb} mb, deleting all files in {dir_path}")
for file in files:
logger.info(f"Removing file: {file}")
deleteFileIfExists(file)
return total > max_size


def cleanup_dir_by_age(dir_path, keep_last: int):
Expand Down
3 changes: 2 additions & 1 deletion python/envgene/envgenehelper/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field

Expand All @@ -19,4 +20,4 @@ def _missing_(cls, value):

class SbomRetentionConfig(BaseModel):
enabled: bool = Field(default=False)
keep_versions_per_app: int = Field(default=10, ge=0)
keep_versions_per_app: Optional[int] = Field(default=None, ge=0)
Loading