diff --git a/README.md b/README.md index 67be28c..f6339ff 100644 --- a/README.md +++ b/README.md @@ -97,6 +97,10 @@ conviso --help - Tasks (create with inline YAML): `python -m conviso.app tasks create --company-id 443 --label "Quick Task" --yaml "name: quick\nsteps:\n - action: echo\n message: ok"` - Vulnerabilities: `python -m conviso.app vulns list --company-id 443 --severities HIGH,CRITICAL --asset-tags cloud --all` - Vulnerabilities (last 7 days): `python -m conviso.app vulns list --company-id 443 --days-back 7 --severities HIGH,CRITICAL --all` +- Vulnerabilities by author: `python -m conviso.app vulns list --company-id 443 --author "Fernando" --all` +- Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345` +- Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only` +- Last user who changed vuln status to ANALYSIS: `python -m conviso.app vulns timeline --id 12345 --status ANALYSIS --last-status-change-only` Output options: `--format table|json|csv`, `--output path` to save JSON/CSV. diff --git a/src/conviso/VERSION b/src/conviso/VERSION index 9e11b32..9fc80f9 100644 --- a/src/conviso/VERSION +++ b/src/conviso/VERSION @@ -1 +1 @@ -0.3.1 +0.3.2 \ No newline at end of file diff --git a/src/conviso/commands/vulnerabilities.py b/src/conviso/commands/vulnerabilities.py index 5d1694b..8d3e3e2 100644 --- a/src/conviso/commands/vulnerabilities.py +++ b/src/conviso/commands/vulnerabilities.py @@ -8,8 +8,9 @@ import typer from typing import Optional import json -from datetime import date, timedelta -from conviso.core.notifier import info, error, summary, success +import re +from datetime import date, datetime, timedelta, timezone +from conviso.core.notifier import info, error, summary, success, warning from conviso.clients.client_graphql import graphql_request from conviso.core.output_manager import export_data from conviso.schemas.vulnerabilities_schema import schema @@ -37,6 +38,7 @@ def list_vulnerabilities( business_impact: Optional[str] = typer.Option(None, "--business-impact", help="Comma-separated business impact levels (LOW,MEDIUM,HIGH,NOT_DEFINED)."), exploitability: Optional[str] = typer.Option(None, "--attack-surface", "-A", help="Attack surface (INTERNET_FACING,INTERNAL,NOT_DEFINED)."), assignee_emails: Optional[str] = typer.Option(None, "--assignees", help="Comma-separated assignee emails."), + author: Optional[str] = typer.Option(None, "--author", help="Filter by author name (contains, case-insensitive)."), page: int = typer.Option(1, "--page", "-p", help="Page number."), per_page: int = typer.Option(50, "--per-page", "-l", help="Items per page."), all_pages: bool = typer.Option(False, "--all", help="Fetch all pages."), @@ -212,6 +214,7 @@ def _split_strs(value: Optional[str]): "pagination": {"page": page, "perPage": per_page}, "filters": filters or None, } + author_filter = (author or "").strip().lower() or None try: fetch_all = all_pages # Respect user pagination choices for all formats @@ -252,6 +255,9 @@ def _split_strs(value: Optional[str]): for vuln in collection: asset = vuln.get("asset") or {} tags = ", ".join(asset.get("assetsTagList") or []) + author_name = (vuln.get("author") or {}).get("name", "") + if author_filter and author_filter not in author_name.lower(): + continue severity_value = vuln.get("severity") or "" severity_raw = severity_value sev_color_map = { @@ -279,7 +285,7 @@ def _split_strs(value: Optional[str]): "severity_raw": severity_raw, "asset": asset.get("name") or "", "tags": tags, - "author": (vuln.get("author") or {}).get("name", ""), + "author": author_name, "assignee": assignee, "company": ((asset.get("company") or {}).get("label")) or "", "description": vuln.get("description"), @@ -397,6 +403,211 @@ def sev_to_level(sev: str): } +def _parse_dt_filter(value: Optional[str], end_of_day: bool = False) -> Optional[datetime]: + if not value: + return None + raw = value.strip() + try: + if len(raw) == 10 and raw[4] == "-" and raw[7] == "-": + date_obj = datetime.strptime(raw, "%Y-%m-%d") + if end_of_day: + return date_obj.replace(hour=23, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc) + return date_obj.replace(tzinfo=timezone.utc) + raw = raw.replace("Z", "+00:00") + parsed = datetime.fromisoformat(raw) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + except Exception: + warning(f"Ignoring invalid date/datetime filter: {value}") + return None + + +def _safe_parse_iso(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + except Exception: + return None + + +def _extract_status_change_fields(history_item: dict) -> tuple[str, str, str]: + from_status = "" + to_status = "" + event_status = "" + + candidates = [ + ("fromStatus", "toStatus"), + ("oldStatus", "newStatus"), + ("previousStatus", "status"), + ] + for left_key, right_key in candidates: + left = history_item.get(left_key) + right = history_item.get(right_key) + if left and not from_status: + from_status = str(left).upper() + if right and not to_status: + to_status = str(right).upper() + + status_value = history_item.get("status") + if status_value: + event_status = str(status_value).upper() + if not to_status: + to_status = event_status + + if not to_status: + action_type = (history_item.get("actionType") or "").upper() + if "STATUS" in action_type: + tokens = [t for t in re.split(r"[^A-Z0-9_]+", action_type) if t] + for idx, token in enumerate(tokens): + if token == "STATUS" and idx + 1 < len(tokens): + to_status = tokens[idx + 1] + break + if not to_status: + for token in reversed(tokens): + if token != "STATUS": + to_status = token + break + + return from_status, to_status, event_status + + +@app.command("timeline", help="Show vulnerability timeline/history and filter by actor/status.") +def vulnerability_timeline( + issue_id: int = typer.Option(..., "--id", "-i", help="Vulnerability/issue ID."), + user_email: Optional[str] = typer.Option(None, "--user-email", help="Filter by actor email or name (contains, case-insensitive)."), + status: Optional[str] = typer.Option(None, "--status", help="Filter status-change events by target status (IssueStatusLabel)."), + history_start: Optional[str] = typer.Option(None, "--history-start", help="History created_at >= this value (YYYY-MM-DD or ISO-8601)."), + history_end: Optional[str] = typer.Option(None, "--history-end", help="History created_at <= this value (YYYY-MM-DD or ISO-8601)."), + last_status_change_only: bool = typer.Option(False, "--last-status-change-only", help="Show only the latest status-change event after filters."), + fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv."), + output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file for json/csv."), +): + info(f"Listing timeline for vulnerability {issue_id}...") + + status_filter = status.strip().upper() if status else None + email_filter = (user_email or "").strip().lower() or None + history_start_dt = _parse_dt_filter(history_start, end_of_day=False) + history_end_dt = _parse_dt_filter(history_end, end_of_day=True) + + query = """ + query IssueTimeline($id: ID!) { + issue(id: $id) { + id + title + status + history { + eventId + at + action + authorEmail + assigneeEmail + previousStatus + status + kind + reason + } + } + } + """ + + try: + data = graphql_request(query, {"id": str(issue_id)}) + issue = data.get("issue") + if not issue: + warning(f"Issue {issue_id} not found.") + raise typer.Exit(code=1) + history_rows = issue.get("history") or [] + + rows = [] + for h in history_rows: + action_type = (h.get("action") or "").upper() + actor_email = (h.get("authorEmail") or "").strip() + actor_name = actor_email.split("@", 1)[0] if actor_email else "" + created_at = h.get("at") or "" + created_at_dt = _safe_parse_iso(created_at) + from_status = (h.get("previousStatus") or "").upper() + to_status = (h.get("status") or "").upper() + event_status = to_status + kind = (h.get("kind") or "").lower() + is_status_change = bool(kind == "status" or from_status or to_status) + + if email_filter: + haystack = f"{actor_email.lower()} {actor_name.lower()}".strip() + if email_filter not in haystack: + continue + if history_start_dt and (created_at_dt is None or created_at_dt < history_start_dt): + continue + if history_end_dt and (created_at_dt is None or created_at_dt > history_end_dt): + continue + if status_filter: + if not is_status_change: + continue + if (to_status or event_status) != status_filter: + continue + + rows.append({ + "issueId": issue.get("id") or issue_id, + "issueTitle": issue.get("title") or "", + "currentIssueStatus": issue.get("status") or "", + "eventId": h.get("eventId") or "", + "createdAt": created_at, + "actorName": actor_name, + "actorEmail": actor_email, + "actionType": action_type, + "fromStatus": from_status, + "toStatus": to_status or event_status, + "statusChange": "true" if is_status_change else "false", + }) + + if not rows: + warning("No timeline events found for the given filters.") + raise typer.Exit() + + if last_status_change_only: + status_rows = [r for r in rows if r.get("statusChange") == "true"] + if not status_rows: + warning("No status-change events found for the given filters.") + raise typer.Exit() + status_rows.sort( + key=lambda r: ( + _safe_parse_iso(r.get("createdAt") or "") or datetime.min.replace(tzinfo=timezone.utc), + str(r.get("eventId") or ""), + ) + ) + latest = status_rows[-1] + latest = { + "issueId": latest.get("issueId"), + "issueTitle": latest.get("issueTitle"), + "currentIssueStatus": latest.get("currentIssueStatus"), + "lastChangedAt": latest.get("createdAt"), + "lastChangedBy": latest.get("actorName"), + "lastChangedByEmail": latest.get("actorEmail"), + "fromStatus": latest.get("fromStatus"), + "toStatus": latest.get("toStatus"), + "actionType": latest.get("actionType"), + } + export_data([latest], fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Last Status Change") + summary("1 last status-change event listed.") + return + + export_data(rows, fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Timeline") + summary(f"{len(rows)} timeline event(s) listed.") + + except typer.Exit: + raise + except Exception as exc: + if "RECORD_NOT_FOUND" in str(exc): + error(f"Issue {issue_id} not found. Use the vulnerability ID (not project ID).") + raise typer.Exit(code=1) + error(f"Error listing vulnerability timeline: {exc}") + raise typer.Exit(code=1) + + # ---------------------- CREATE COMMAND ---------------------- # @app.command("create") def create_vulnerability(