Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ from datetime import timedelta
import asyncio

# Load repository information (from collector)
repo_info = await collect_all(
repo_info = asyncio.run(collect_all(
pkt_type='debian',
pkt_name='xz-utils',
repo_name='tukaani-project/xz',
)
))
# Create evaluator
evaluator = HSBRiskEvaluator(repo_info)

Expand Down
79 changes: 79 additions & 0 deletions src/example/dependencies_collect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python3
from hsbriskevaluator.dep_collector.deps_dev_collector import DepsDevCollector

def main():
"""Demonstrate usage of the DepsDevCollector"""

# Example 1: Collect NPM package dependencies
print("=== NPM Package Example ===")
npm_collector = DepsDevCollector(platform="npm")
result = npm_collector.collect("lodash")

print(f"Package: {result['package_name']}")
print(f"Platform: {result['platform']}")
print(f"Version: {result['version']}")
print(f"Total dependencies: {len(result['dependencies'])}")

if result['error']:
print(f"Error: {result['error']}")
else:
# Show direct vs indirect dependencies
direct_deps = [dep for dep in result['dependencies'] if dep['relation'] == 'DIRECT']
indirect_deps = [dep for dep in result['dependencies'] if dep['relation'] == 'INDIRECT']

print(f"Direct dependencies: {len(direct_deps)}")
print(f"Indirect dependencies: {len(indirect_deps)}")

if direct_deps:
print("\nDirect dependencies:")
for dep in direct_deps[:3]: # Show first 3
print(f" - {dep['name']}@{dep['version']} (requirement: {dep['requirement']})")

print("\n" + "="*50 + "\n")

# Example 2: Collect PyPI package dependencies
print("=== PyPI Package Example ===")
pypi_collector = DepsDevCollector(platform="pypi")
result = pypi_collector.collect("flask")

print(f"Package: {result['package_name']}")
print(f"Platform: {result['platform']}")
print(f"Version: {result['version']}")
print(f"Total dependencies: {len(result['dependencies'])}")

if result['error']:
print(f"Error: {result['error']}")
else:
# Show direct vs indirect dependencies
direct_deps = [dep for dep in result['dependencies'] if dep['relation'] == 'DIRECT']
indirect_deps = [dep for dep in result['dependencies'] if dep['relation'] == 'INDIRECT']

print(f"Direct dependencies: {len(direct_deps)}")
print(f"Indirect dependencies: {len(indirect_deps)}")

if direct_deps:
print("\nDirect dependencies:")
for dep in direct_deps[:3]: # Show first 3
print(f" - {dep['name']}@{dep['version']} (requirement: {dep['requirement']})")

print("\n" + "="*50 + "\n")

# Example 3: Different platforms
print("=== Multiple Platform Support ===")
platforms = ["npm", "pypi", "maven", "cargo"]
packages = ["express", "django", "junit", "serde"]

for platform, package in zip(platforms, packages):
try:
collector = DepsDevCollector(platform=platform)
result = collector.collect(package)

status = "✓" if not result['error'] else "✗"
dep_count = len(result['dependencies'])
print(f"{status} {platform.upper()}: {package} - {dep_count} dependencies")

except Exception as e:
print(f"✗ {platform.upper()}: {package} - Error: {e}")

if __name__ == "__main__":
main()
19 changes: 19 additions & 0 deletions src/example/evaluate_xz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from hsbriskevaluator.evaluator import HSBRiskEvaluator
from hsbriskevaluator.collector.repo_info import RepoInfo
from hsbriskevaluator.collector import collect_all
from datetime import timedelta
import asyncio

# Load repository information (from collector)
repo_info = asyncio.run(collect_all(
pkt_type='debian',
pkt_name='xz-utils',
repo_name='tukaani-project/xz',
))
# Create evaluator
evaluator = HSBRiskEvaluator(repo_info)

# Run evaluation
result = asyncio.run(evaluator.evaluate())

print(result)
21 changes: 21 additions & 0 deletions src/hsbriskevaluator/dep_collector/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
"""
collect dependencies recursively for a specific package
"""
from typing import Dict

class BaseDepCollector():
platform: str
def __init__(self):
pass

def collect(self, package_name: str) -> Dict:
"""
Collect dependency information for a package

Args:
package_name: Name of the package to collect dependencies for

Returns:
Dict containing dependency information
"""
raise NotImplementedError("Subclasses must implement collect method")
233 changes: 233 additions & 0 deletions src/hsbriskevaluator/dep_collector/deps_dev_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import requests
import logging
from typing import Dict, List, Optional
from urllib.parse import quote
from .base import BaseDepCollector

logger = logging.getLogger(__name__)


class DepsDevCollector(BaseDepCollector):
"""
Dependency collector that queries dependency information through deps.dev API
"""

def __init__(self, platform):
"""
Initialize the deps.dev collector

Args:
platform: The package management system (GO, RUBYGEMS, NPM, CARGO, MAVEN, PYPI, NUGET)
"""
super().__init__()
self.platform = platform.upper()
self.base_url = "https://api.deps.dev"
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'HSBRiskEvaluator/1.0'
})

# Validate platform
valid_platforms = {"GO", "RUBYGEMS", "NPM", "CARGO", "MAVEN", "PYPI", "NUGET"}
if self.platform not in valid_platforms:
raise ValueError(f"Platform {self.platform} not supported. Valid platforms: {valid_platforms}")

def _make_request(self, url: str) -> Optional[Dict]:
"""
Make HTTP request to deps.dev API with error handling

Args:
url: The API endpoint URL

Returns:
JSON response as dict or None if request failed
"""
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Request failed for {url}: {e}")
return None
except ValueError as e:
logger.error(f"Failed to parse JSON response from {url}: {e}")
return None

def _get_package_info(self, package_name: str) -> Optional[Dict]:
"""
Get package information including available versions

Args:
package_name: Name of the package

Returns:
Package information dict or None if not found
"""
encoded_name = quote(package_name, safe='')
url = f"{self.base_url}/v3/systems/{self.platform}/packages/{encoded_name}"

logger.info(f"Fetching package info for {package_name} from {url}")
return self._make_request(url)


def _get_sorted_versions(self, package_info: Dict) -> List[str]:
"""
Get all versions sorted by publication date (newest first)

Args:
package_info: Package information from GetPackage API

Returns:
List of version strings sorted by publication date
"""
if not package_info or 'versions' not in package_info:
return []

versions = package_info['versions']
if not versions:
return []

# Sort by publishedAt date (newest first), fallback to version order
def sort_key(version):
published_at = version.get('publishedAt', '')
return published_at if published_at else '0'

sorted_versions = sorted(versions, key=sort_key, reverse=True)
return [v['versionKey']['version'] for v in sorted_versions]

def _get_dependencies(self, package_name: str, version: str) -> Optional[Dict]:
"""
Get dependency information for a specific package version

Args:
package_name: Name of the package
version: Version of the package

Returns:
Dependency information dict or None if not found
"""
encoded_name = quote(package_name, safe='')
encoded_version = quote(version, safe='')
url = f"{self.base_url}/v3/systems/{self.platform}/packages/{encoded_name}/versions/{encoded_version}:dependencies"

logger.info(f"Fetching dependencies for {package_name}@{version} from {url}")
return self._make_request(url)

def _extract_all_dependencies(self, dep_info: Dict) -> List[Dict]:
"""
Extract all dependencies (direct and indirect) from dependency graph

Args:
dep_info: Dependency information from GetDependencies API

Returns:
List of all dependency dicts with name, version, relation, and bundled info
"""
if not dep_info or 'nodes' not in dep_info:
return []

nodes = dep_info['nodes']

if not nodes:
return []

# Extract all dependencies except the root node (first node)
all_deps = []

for i, node in enumerate(nodes):
# Skip the root node (index 0)
if i == 0:
continue

version_key = node.get('versionKey', {})
relation = node.get('relation', 'UNKNOWN')

# Find the requirement for this dependency
requirement = ''
if 'edges' in dep_info:
for edge in dep_info['edges']:
if edge.get('toNode') == i:
requirement = edge.get('requirement', '')
break

all_deps.append({
'name': version_key.get('name', ''),
'version': version_key.get('version', ''),
'relation': relation,
'requirement': requirement,
'bundled': node.get('bundled', False)
})

return all_deps

def collect(self, package_name: str) -> Dict:
"""
Collect dependency information for a package

Args:
package_name: Name of the package to collect dependencies for

Returns:
Dict containing dependency information with structure:
{
'package_name': str,
'platform': str,
'version': str,
'dependencies': List[Dict],
'error': Optional[str]
}
"""
result = {
'package_name': package_name,
'platform': self.platform,
'version': None,
'dependencies': [],
'error': None
}

try:
# Step 1: Get package information
package_info = self._get_package_info(package_name)
if not package_info:
result['error'] = f"Package {package_name} not found"
return result

# Step 2: Get sorted versions (newest first)
versions = self._get_sorted_versions(package_info)
if not versions:
result['error'] = f"No versions found for package {package_name}"
return result

# Step 3: Try to get dependencies, starting with latest version
dep_info = None
used_version = None

for version in versions:
logger.info(f"Trying to get dependencies for {package_name}@{version}")
dep_info = self._get_dependencies(package_name, version)

if dep_info and 'nodes' in dep_info:
# Check if we have meaningful dependency data
if len(dep_info.get('nodes', [])) > 1 or dep_info.get('edges'):
used_version = version
break
else:
logger.info(f"No dependency data found for {package_name}@{version}, trying next version")
else:
logger.info(f"Failed to get dependency info for {package_name}@{version}, trying next version")

if not dep_info or not used_version:
result['error'] = f"No dependency information available for any version of {package_name}"
return result

# Step 4: Extract all dependencies
result['version'] = used_version
result['dependencies'] = self._extract_all_dependencies(dep_info)

logger.info(f"Successfully collected {len(result['dependencies'])} dependencies for {package_name}@{used_version}")

except Exception as e:
logger.error(f"Error collecting dependencies for {package_name}: {e}")
result['error'] = str(e)

return result