Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ conviso --help
- Tasks (create with inline YAML): `python -m conviso.app tasks create --company-id 443 --label "Quick Task" --yaml "name: quick\nsteps:\n - action: echo\n message: ok"`
- Vulnerabilities: `python -m conviso.app vulns list --company-id 443 --severities HIGH,CRITICAL --asset-tags cloud --all`
- Vulnerabilities (last 7 days): `python -m conviso.app vulns list --company-id 443 --days-back 7 --severities HIGH,CRITICAL --all`
- Vulnerabilities by author: `python -m conviso.app vulns list --company-id 443 --author "Fernando" --all`
- Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345`
- Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only`
- Last user who changed vuln status to ANALYSIS: `python -m conviso.app vulns timeline --id 12345 --status ANALYSIS --last-status-change-only`

Output options: `--format table|json|csv`, `--output path` to save JSON/CSV.

Expand Down
2 changes: 1 addition & 1 deletion src/conviso/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.1
0.3.2
217 changes: 214 additions & 3 deletions src/conviso/commands/vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import typer
from typing import Optional
import json
from datetime import date, timedelta
from conviso.core.notifier import info, error, summary, success
import re
from datetime import date, datetime, timedelta, timezone
from conviso.core.notifier import info, error, summary, success, warning
from conviso.clients.client_graphql import graphql_request
from conviso.core.output_manager import export_data
from conviso.schemas.vulnerabilities_schema import schema
Expand Down Expand Up @@ -37,6 +38,7 @@ def list_vulnerabilities(
business_impact: Optional[str] = typer.Option(None, "--business-impact", help="Comma-separated business impact levels (LOW,MEDIUM,HIGH,NOT_DEFINED)."),
exploitability: Optional[str] = typer.Option(None, "--attack-surface", "-A", help="Attack surface (INTERNET_FACING,INTERNAL,NOT_DEFINED)."),
assignee_emails: Optional[str] = typer.Option(None, "--assignees", help="Comma-separated assignee emails."),
author: Optional[str] = typer.Option(None, "--author", help="Filter by author name (contains, case-insensitive)."),
page: int = typer.Option(1, "--page", "-p", help="Page number."),
per_page: int = typer.Option(50, "--per-page", "-l", help="Items per page."),
all_pages: bool = typer.Option(False, "--all", help="Fetch all pages."),
Expand Down Expand Up @@ -212,6 +214,7 @@ def _split_strs(value: Optional[str]):
"pagination": {"page": page, "perPage": per_page},
"filters": filters or None,
}
author_filter = (author or "").strip().lower() or None

try:
fetch_all = all_pages # Respect user pagination choices for all formats
Expand Down Expand Up @@ -252,6 +255,9 @@ def _split_strs(value: Optional[str]):
for vuln in collection:
asset = vuln.get("asset") or {}
tags = ", ".join(asset.get("assetsTagList") or [])
author_name = (vuln.get("author") or {}).get("name", "")
if author_filter and author_filter not in author_name.lower():
continue
severity_value = vuln.get("severity") or ""
severity_raw = severity_value
sev_color_map = {
Expand Down Expand Up @@ -279,7 +285,7 @@ def _split_strs(value: Optional[str]):
"severity_raw": severity_raw,
"asset": asset.get("name") or "",
"tags": tags,
"author": (vuln.get("author") or {}).get("name", ""),
"author": author_name,
"assignee": assignee,
"company": ((asset.get("company") or {}).get("label")) or "",
"description": vuln.get("description"),
Expand Down Expand Up @@ -397,6 +403,211 @@ def sev_to_level(sev: str):
}


def _parse_dt_filter(value: Optional[str], end_of_day: bool = False) -> Optional[datetime]:
if not value:
return None
raw = value.strip()
try:
if len(raw) == 10 and raw[4] == "-" and raw[7] == "-":
date_obj = datetime.strptime(raw, "%Y-%m-%d")
if end_of_day:
return date_obj.replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc)
return date_obj.replace(tzinfo=timezone.utc)
raw = raw.replace("Z", "+00:00")
parsed = datetime.fromisoformat(raw)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
except Exception:
warning(f"Ignoring invalid date/datetime filter: {value}")
return None


def _safe_parse_iso(value: Optional[str]) -> Optional[datetime]:
if not value:
return None
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.astimezone(timezone.utc)
except Exception:
return None


def _extract_status_change_fields(history_item: dict) -> tuple[str, str, str]:
from_status = ""
to_status = ""
event_status = ""

candidates = [
("fromStatus", "toStatus"),
("oldStatus", "newStatus"),
("previousStatus", "status"),
]
for left_key, right_key in candidates:
left = history_item.get(left_key)
right = history_item.get(right_key)
if left and not from_status:
from_status = str(left).upper()
if right and not to_status:
to_status = str(right).upper()

status_value = history_item.get("status")
if status_value:
event_status = str(status_value).upper()
if not to_status:
to_status = event_status

if not to_status:
action_type = (history_item.get("actionType") or "").upper()
if "STATUS" in action_type:
tokens = [t for t in re.split(r"[^A-Z0-9_]+", action_type) if t]
for idx, token in enumerate(tokens):
if token == "STATUS" and idx + 1 < len(tokens):
to_status = tokens[idx + 1]
break
if not to_status:
for token in reversed(tokens):
if token != "STATUS":
to_status = token
break

return from_status, to_status, event_status


@app.command("timeline", help="Show vulnerability timeline/history and filter by actor/status.")
def vulnerability_timeline(
issue_id: int = typer.Option(..., "--id", "-i", help="Vulnerability/issue ID."),
user_email: Optional[str] = typer.Option(None, "--user-email", help="Filter by actor email or name (contains, case-insensitive)."),
status: Optional[str] = typer.Option(None, "--status", help="Filter status-change events by target status (IssueStatusLabel)."),
history_start: Optional[str] = typer.Option(None, "--history-start", help="History created_at >= this value (YYYY-MM-DD or ISO-8601)."),
history_end: Optional[str] = typer.Option(None, "--history-end", help="History created_at <= this value (YYYY-MM-DD or ISO-8601)."),
last_status_change_only: bool = typer.Option(False, "--last-status-change-only", help="Show only the latest status-change event after filters."),
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv."),
output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file for json/csv."),
):
info(f"Listing timeline for vulnerability {issue_id}...")

status_filter = status.strip().upper() if status else None
email_filter = (user_email or "").strip().lower() or None
history_start_dt = _parse_dt_filter(history_start, end_of_day=False)
history_end_dt = _parse_dt_filter(history_end, end_of_day=True)

query = """
query IssueTimeline($id: ID!) {
issue(id: $id) {
id
title
status
history {
eventId
at
action
authorEmail
assigneeEmail
previousStatus
status
kind
reason
}
}
}
"""

try:
data = graphql_request(query, {"id": str(issue_id)})
issue = data.get("issue")
if not issue:
warning(f"Issue {issue_id} not found.")
raise typer.Exit(code=1)
history_rows = issue.get("history") or []

rows = []
for h in history_rows:
action_type = (h.get("action") or "").upper()
actor_email = (h.get("authorEmail") or "").strip()
actor_name = actor_email.split("@", 1)[0] if actor_email else ""
created_at = h.get("at") or ""
created_at_dt = _safe_parse_iso(created_at)
from_status = (h.get("previousStatus") or "").upper()
to_status = (h.get("status") or "").upper()
event_status = to_status
kind = (h.get("kind") or "").lower()
is_status_change = bool(kind == "status" or from_status or to_status)

if email_filter:
haystack = f"{actor_email.lower()} {actor_name.lower()}".strip()
if email_filter not in haystack:
continue
if history_start_dt and (created_at_dt is None or created_at_dt < history_start_dt):
continue
if history_end_dt and (created_at_dt is None or created_at_dt > history_end_dt):
continue
if status_filter:
if not is_status_change:
continue
if (to_status or event_status) != status_filter:
continue

rows.append({
"issueId": issue.get("id") or issue_id,
"issueTitle": issue.get("title") or "",
"currentIssueStatus": issue.get("status") or "",
"eventId": h.get("eventId") or "",
"createdAt": created_at,
"actorName": actor_name,
"actorEmail": actor_email,
"actionType": action_type,
"fromStatus": from_status,
"toStatus": to_status or event_status,
"statusChange": "true" if is_status_change else "false",
})

if not rows:
warning("No timeline events found for the given filters.")
raise typer.Exit()

if last_status_change_only:
status_rows = [r for r in rows if r.get("statusChange") == "true"]
if not status_rows:
warning("No status-change events found for the given filters.")
raise typer.Exit()
status_rows.sort(
key=lambda r: (
_safe_parse_iso(r.get("createdAt") or "") or datetime.min.replace(tzinfo=timezone.utc),
str(r.get("eventId") or ""),
)
)
latest = status_rows[-1]
latest = {
"issueId": latest.get("issueId"),
"issueTitle": latest.get("issueTitle"),
"currentIssueStatus": latest.get("currentIssueStatus"),
"lastChangedAt": latest.get("createdAt"),
"lastChangedBy": latest.get("actorName"),
"lastChangedByEmail": latest.get("actorEmail"),
"fromStatus": latest.get("fromStatus"),
"toStatus": latest.get("toStatus"),
"actionType": latest.get("actionType"),
}
export_data([latest], fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Last Status Change")
summary("1 last status-change event listed.")
return

export_data(rows, fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Timeline")
summary(f"{len(rows)} timeline event(s) listed.")

except typer.Exit:
raise
except Exception as exc:
if "RECORD_NOT_FOUND" in str(exc):
error(f"Issue {issue_id} not found. Use the vulnerability ID (not project ID).")
raise typer.Exit(code=1)
error(f"Error listing vulnerability timeline: {exc}")
raise typer.Exit(code=1)


# ---------------------- CREATE COMMAND ---------------------- #
@app.command("create")
def create_vulnerability(
Expand Down