diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..230f1d7 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,30 @@ +name: CI + +on: + push: + branches: [main, feat/*] + pull_request: + branches: [main] + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + - run: uv sync --dev + - run: uv run ruff check src/ tests/ + - run: uv run ruff format --check src/ tests/ + + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11", "3.12", "3.13"] + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v4 + with: + python-version: ${{ matrix.python-version }} + - run: uv sync --dev + - run: uv run pytest --no-header -q diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1e38c2d --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +.venv/ +*.egg-info/ +dist/ +build/ +.eggs/ +uv.lock diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..a063c18 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,14 @@ +repos: +- repo: local + hooks: + - id: ruff-check + name: "Run ruff check" + language: system + entry: uv run ruff check . --fix --force-exclude + types: [python] + + - id: ruff-format + name: "Run ruff format" + language: system + entry: uv run ruff format --force-exclude + types: [python] diff --git a/README.md b/README.md new file mode 100644 index 0000000..44f04cf --- /dev/null +++ b/README.md @@ -0,0 +1,189 @@ +# DualEntry CLI + +Command-line interface for the DualEntry accounting API. + +## Install + +**Production** (default — connects to `api.dualentry.com`): + +```bash +uv tool install git+https://github.com/dualentry/dualentry-cli.git +``` + +**Development** (connects to `api-dev.dualentry.com`): + +```bash +uv tool install git+https://github.com/dualentry/dualentry-cli.git +dualentry config set-env dev +``` + +### Prerequisites + +- Python >= 3.11 +- [uv](https://docs.astral.sh/uv/) or [pipx](https://pipx.pypa.io/) + +### Upgrade + +```bash +uv tool upgrade dualentry-cli +``` + +### Uninstall + +```bash +uv tool uninstall dualentry-cli +``` + +## Authentication + +### Browser login (OAuth) + +```bash +dualentry auth login +``` + +Opens a browser window for WorkOS AuthKit authentication. Tokens are stored in your system keychain. + +### API key (environment variable) + +For dev/CI environments, you can skip OAuth and use an API key directly: + +```bash +export X_API_KEY=your_api_key +dualentry invoices list +``` + +Combine with a dev API URL: + +```bash +export X_API_KEY=your_api_key +export DUALENTRY_API_URL=https://api-dev.dualentry.com +dualentry invoices list +``` + +### Check status + +```bash +dualentry auth status +dualentry auth logout +``` + +## Configuration + +```bash +# Show current config +dualentry config show + +# Switch to dev environment +dualentry config set-env dev + +# Switch back to prod +dualentry config set-env prod + +# Set a custom API URL +dualentry config set-url https://my-staging.example.com +``` + +**Environment variables** (override config file): + +| Variable | Description | +|----------|-------------| +| `DUALENTRY_API_URL` | API base URL (overrides config) | +| `X_API_KEY` | API key (skips OAuth) | + +**Config file** is stored at `~/.dualentry/config.toml`. + +## Usage + +### Common options + +All `list` commands support: + +| Flag | Short | Description | +|------|-------|-------------| +| `--limit` | `-l` | Max items to return (default: 20) | +| `--offset` | | Offset for pagination | +| `--all` | `-a` | Fetch all pages | +| `--search` | `-s` | Free text search | +| `--status` | | Filter by status (draft, posted, archived) | +| `--start-date` | | Filter from date (YYYY-MM-DD) | +| `--end-date` | | Filter to date (YYYY-MM-DD) | +| `--format` | `-o` | Output format: `human` or `json` | + +### Examples + +```bash +# List invoices +dualentry invoices list + +# Get a specific invoice by number +dualentry invoices get 1001 + +# Search with filters +dualentry invoices list --status posted --start-date 2025-01-01 --format json + +# Fetch all pages +dualentry bills list --all + +# Create from JSON file +dualentry invoices create --file invoice.json +``` + +### Available resources + +**Money-in:** invoices, sales-orders, customer-payments, customer-credits, customer-prepayments, customer-prepayment-applications, customer-deposits, customer-refunds, cash-sales + +**Money-out:** bills, purchase-orders, vendor-payments, vendor-credits, vendor-prepayments, vendor-prepayment-applications, vendor-refunds, direct-expenses + +**Accounting:** accounts, journal-entries, bank-transfers, fixed-assets, depreciation-books + +**Entities:** customers, vendors, items, companies, classifications + +**Recurring:** recurring invoices, recurring bills, recurring journal-entries + +**Other:** contracts, budgets, workflows + +Each resource supports `list` and `get`. Most also support `create` and `update`. + +## Development + +### Development setup + +```bash +uv sync --extra dev +``` + +### Pre-commit hooks + +```bash +uv run pre-commit install +``` + +Hooks run `ruff check --fix` and `ruff format` on each commit. + +### Linting + +```bash +uv run ruff check . +uv run ruff format --check . +``` + +### Tests + +Unit tests (mocked, no API needed): + +```bash +uv run pytest +``` + +Integration tests (requires running API server): + +```bash +X_API_KEY=your_key DUALENTRY_API_URL=https://api-dev.dualentry.com uv run pytest tests/test_integration.py -v +``` + +With coverage: + +```bash +uv run pytest --cov=dualentry_cli --cov-report=term-missing +``` diff --git a/install.sh b/install.sh new file mode 100755 index 0000000..3be32f7 --- /dev/null +++ b/install.sh @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +set -euo pipefail + +# DualEntry CLI installer +# Usage: curl -sSL /install.sh | bash + +REPO="git+https://github.com/dualentry/dualentry-cli.git" +TOOL_NAME="dualentry-cli" + +echo "Installing DualEntry CLI..." + +# Prefer uv, fall back to pipx +if command -v uv &>/dev/null; then + echo "Using uv..." + uv tool install "$REPO" +elif command -v pipx &>/dev/null; then + echo "Using pipx..." + pipx install "$REPO" +else + echo "Error: requires uv or pipx" + echo "" + echo "Install uv: curl -LsSf https://astral.sh/uv/install.sh | sh" + echo "Install pipx: brew install pipx && pipx ensurepath" + exit 1 +fi + +echo "" +echo "Installed! Run: dualentry auth login" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c047573 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,173 @@ +[project] +name = "dualentry-cli" +version = "0.1.0" +description = "DualEntry accounting CLI" +requires-python = ">=3.11" +dependencies = [ + "typer>=0.12,<1.0", + "httpx>=0.27,<1.0", + "keyring>=25.0,<26.0", + "rich>=13.0,<14.0", +] + +[project.scripts] +dualentry = "dualentry_cli.main:app" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/dualentry_cli"] + +[tool.pytest.ini_options] +testpaths = ["tests"] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-cov>=6.0", + "pytest-mock>=3.14", + "respx>=0.21", + "ruff>=0.11.11", + "pre-commit>=4.2", +] + +[dependency-groups] +dev = [ + "pytest>=8.0", + "pytest-cov>=6.0", + "pytest-mock>=3.14", + "respx>=0.21", + "ruff>=0.11.11", + "pre-commit>=4.2", +] + +# ── Ruff ─────────────────────────────────────────────────────────────── + +[tool.ruff.format] +quote-style = "preserve" + +[tool.ruff] +line-length = 180 +target-version = "py311" +exclude = [".venv"] + +[tool.ruff.lint] +select = ["ALL"] + +ignore = [ + "ERA001", + "D100", + "D101", + "D102", + "D103", + "D104", + "D106", + "D107", + "TD002", + "RUF012", + "FIX002", + "TD003", + "D203", + "D211", + "D212", + "PD011", + "TRY301", + "COM812", + "ISC001", + "E501", + "ANN002", + "ANN003", + "ANN201", + "ANN001", + "ANN205", + "ANN202", + "ANN204", + "BLE001", + "B008", + "TRY002", + "B039", + "RUF034", + "RUF032", + "PLW1508", + "SIM115", + "RUF046", + + # CLI-specific: Typer uses bool args and deferred imports for circular dep avoidance + "FBT001", + "FBT002", + "FBT003", + "PLC0415", + "PLR0913", + "PLR2004", + "A002", + "TC003", + "C901", + "ARG001", + "PT006", + "D105", + "D205", + "D401", + "TRY400", + "EM101", + "EM102", + "TRY003", + "SLF001", + "UP007", + "S101", + "S110", + "T201", + "PLW0603", + "PLW2901", + "PLR0915", + "F841", + "SIM105", +] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = [ + "S101", + "S105", + "S106", + "PLR2004", + "INP001", + "PLR0913", + "ANN001", + "ANN003", + "ANN201", + "S311", +] + +[tool.ruff.lint.isort] +known-first-party = ["dualentry_cli"] +section-order = [ + "future", + "standard-library", + "third-party", + "first-party", + "local-folder", +] + +[tool.ruff.lint.pycodestyle] +max-doc-length = 120 + +[tool.ruff.lint.mccabe] +max-complexity = 10 + +# ── Coverage ─────────────────────────────────────────────────────────── + +[tool.coverage.run] +branch = true +relative_files = true +source = ["src/dualentry_cli"] +omit = ["*/tests/*"] + +[tool.coverage.report] +show_missing = true +exclude_also = [ + "def __repr__", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "if TYPE_CHECKING:", +] diff --git a/src/dualentry_cli/__init__.py b/src/dualentry_cli/__init__.py new file mode 100644 index 0000000..6bc7d1a --- /dev/null +++ b/src/dualentry_cli/__init__.py @@ -0,0 +1,3 @@ +"""DualEntry CLI - command-line interface for DualEntry accounting.""" + +__version__ = "0.1.0" diff --git a/src/dualentry_cli/auth.py b/src/dualentry_cli/auth.py new file mode 100644 index 0000000..07d21be --- /dev/null +++ b/src/dualentry_cli/auth.py @@ -0,0 +1,265 @@ +"""Authentication for DualEntry CLI - OAuth flow via MCP endpoints and credential storage.""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import secrets +import socket +import webbrowser +from enum import StrEnum +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from urllib.parse import parse_qs, urlencode, urlparse + +import httpx +import keyring +import typer + + +class CodeChallengeMethod(StrEnum): + S256 = "S256" + + +class GrantType(StrEnum): + AUTHORIZATION_CODE = "authorization_code" + REFRESH_TOKEN = "refresh_token" # noqa: S105 + + +class ResponseType(StrEnum): + CODE = "code" + + +class TokenEndpointAuthMethod(StrEnum): + NONE = "none" + + +_SERVICE_NAME = "dualentry-cli" +_KEY_NAME_ACCESS = "access_token" +_KEY_NAME_REFRESH = "refresh_token" +_KEY_NAME_API_KEY = "api_key" # legacy, still checked for migration + +_TOKEN_FILE = Path.home() / ".dualentry" / "tokens.json" + + +def _generate_pkce_pair() -> tuple[str, str]: + """Generate PKCE code_verifier and code_challenge per RFC 7636.""" + verifier = secrets.token_urlsafe(64) + digest = hashlib.sha256(verifier.encode()).digest() + challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii") + return verifier, challenge + + +# ── Token storage ──────────────────────────────────────────────────── + + +def store_tokens(access_token: str, refresh_token: str) -> None: + """Store OAuth tokens. Uses keyring with file fallback.""" + try: + keyring.set_password(_SERVICE_NAME, _KEY_NAME_ACCESS, access_token) + keyring.set_password(_SERVICE_NAME, _KEY_NAME_REFRESH, refresh_token) + except Exception: + # Fallback to file storage (e.g. CI, headless) + _TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True) + _TOKEN_FILE.write_text(json.dumps({"access_token": access_token, "refresh_token": refresh_token})) + _TOKEN_FILE.chmod(0o600) + + +def load_tokens() -> tuple[str | None, str | None]: + """Load OAuth tokens. Returns (access_token, refresh_token).""" + try: + access = keyring.get_password(_SERVICE_NAME, _KEY_NAME_ACCESS) + refresh = keyring.get_password(_SERVICE_NAME, _KEY_NAME_REFRESH) + if access and refresh: + return access, refresh + except Exception: + pass + # File fallback + if _TOKEN_FILE.exists(): + try: + data = json.loads(_TOKEN_FILE.read_text()) + return data.get("access_token"), data.get("refresh_token") + except (json.JSONDecodeError, OSError): + pass + return None, None + + +def load_api_key() -> str | None: + """Load legacy API key (for X_API_KEY env var compat check).""" + try: + return keyring.get_password(_SERVICE_NAME, _KEY_NAME_API_KEY) + except Exception: + return None + + +def clear_credentials() -> None: + """Clear all stored credentials.""" + for key in (_KEY_NAME_ACCESS, _KEY_NAME_REFRESH, _KEY_NAME_API_KEY): + try: + keyring.delete_password(_SERVICE_NAME, key) + except Exception: + pass + if _TOKEN_FILE.exists(): + try: + _TOKEN_FILE.unlink() + except OSError: + pass + + +# legacy alias +clear_api_key = clear_credentials + + +# ── MCP OAuth client registration ─────────────────────────────────── + + +def _register_client(mcp_url: str, redirect_uri: str) -> dict: + """Register as an OAuth client with the MCP server (dynamic client registration).""" + response = httpx.post( + f"{mcp_url}/register", + json={ + "client_name": "DualEntry CLI", + "redirect_uris": [redirect_uri], + "grant_types": [GrantType.AUTHORIZATION_CODE, GrantType.REFRESH_TOKEN], + "response_types": [ResponseType.CODE], + "token_endpoint_auth_method": TokenEndpointAuthMethod.NONE, + }, + timeout=30.0, + ) + response.raise_for_status() + return response.json() + + +# ── OAuth flow ─────────────────────────────────────────────────────── + + +def _start_authorize(mcp_url: str, client_id: str, redirect_uri: str, code_challenge: str, state: str) -> str: + """Build the authorization URL and return it (the MCP /authorize endpoint redirects to WorkOS).""" + params = { + "response_type": ResponseType.CODE, + "client_id": client_id, + "redirect_uri": redirect_uri, + "code_challenge": code_challenge, + "code_challenge_method": CodeChallengeMethod.S256, + "state": state, + } + return f"{mcp_url}/authorize?{urlencode(params)}" + + +def _exchange_token(mcp_url: str, client_id: str, code: str, code_verifier: str, redirect_uri: str) -> dict: + """Exchange authorization code for access/refresh tokens at MCP /token endpoint.""" + response = httpx.post( + f"{mcp_url}/token", + data={ + "grant_type": GrantType.AUTHORIZATION_CODE, + "client_id": client_id, + "code": code, + "code_verifier": code_verifier, + "redirect_uri": redirect_uri, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + response.raise_for_status() + return response.json() + + +def refresh_access_token(mcp_url: str, client_id: str, refresh_token: str) -> dict: + """Use refresh token to get a new access/refresh token pair.""" + response = httpx.post( + f"{mcp_url}/token", + data={ + "grant_type": GrantType.REFRESH_TOKEN, + "client_id": client_id, + "refresh_token": refresh_token, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + response.raise_for_status() + return response.json() + + +# ── Local callback server ──────────────────────────────────────────── + + +def _find_free_port() -> int: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +class _CallbackHandler(BaseHTTPRequestHandler): + code: str | None = None + state: str | None = None + + def do_GET(self): + parsed = urlparse(self.path) + if parsed.path != "/callback": + self.send_response(404) + self.end_headers() + return + params = parse_qs(parsed.query) + _CallbackHandler.code = params.get("code", [None])[0] + _CallbackHandler.state = params.get("state", [None])[0] + self.send_response(200) + self.send_header("Content-Type", "text/html") + self.end_headers() + self.wfile.write(b"

Login successful!

You can close this window and return to the terminal.

") + + def log_message(self, format, *args): + pass + + +# ── Main login flow ────────────────────────────────────────────────── + + +def run_login_flow(api_url: str) -> dict: + """ + Run the full OAuth login flow using MCP endpoints. + + Returns dict with access_token, refresh_token, and token metadata. + """ + mcp_url = f"{api_url.rstrip('/')}/mcp" + + port = _find_free_port() + redirect_uri = f"http://localhost:{port}/callback" + verifier, challenge = _generate_pkce_pair() + state = secrets.token_urlsafe(16) + + # Register as OAuth client + client_info = _register_client(mcp_url, redirect_uri) + client_id = client_info["client_id"] + + # Build authorize URL + auth_url = _start_authorize(mcp_url, client_id, redirect_uri, challenge, state) + + # Start local server and open browser + _CallbackHandler.code = None + _CallbackHandler.state = None + server = HTTPServer(("127.0.0.1", port), _CallbackHandler) + + typer.echo("Opening browser for login...") + typer.echo(f"If the browser doesn't open, visit: {auth_url}") + webbrowser.open(auth_url) + + server.handle_request() + server.server_close() + + if not _CallbackHandler.code: + typer.echo("No authorization code received.") + raise typer.Exit(code=1) + if _CallbackHandler.state != state: + typer.echo("State mismatch - possible CSRF attack.") + raise typer.Exit(code=1) + + # Exchange code for tokens + token_response = _exchange_token(mcp_url, client_id, _CallbackHandler.code, verifier, redirect_uri) + + return { + "access_token": token_response["access_token"], + "refresh_token": token_response.get("refresh_token", ""), + "expires_in": token_response.get("expires_in"), + "client_id": client_id, + } diff --git a/src/dualentry_cli/cli.py b/src/dualentry_cli/cli.py new file mode 100644 index 0000000..3ff86ec --- /dev/null +++ b/src/dualentry_cli/cli.py @@ -0,0 +1,25 @@ +"""Shared CLI utilities.""" + +import difflib + +import click +from typer.core import TyperGroup + + +class HelpfulGroup(TyperGroup): + """Typer group that shows help + suggestions instead of 'No such command'.""" + + def resolve_command(self, ctx, args): + try: + return super().resolve_command(ctx, args) + except click.UsageError: + cmd_name = args[0] if args else None + if cmd_name: + matches = difflib.get_close_matches(cmd_name, self.list_commands(ctx), n=3, cutoff=0.4) + if matches: + hint = ", ".join(f"'{m}'" for m in matches) + click.echo(f"Unknown command '{cmd_name}'. Did you mean: {hint}?\n", err=True) + else: + click.echo(f"Unknown command '{cmd_name}'.\n", err=True) + click.echo(ctx.get_help()) + ctx.exit(2) diff --git a/src/dualentry_cli/client.py b/src/dualentry_cli/client.py new file mode 100644 index 0000000..67bb89f --- /dev/null +++ b/src/dualentry_cli/client.py @@ -0,0 +1,116 @@ +"""HTTP client for the DualEntry public API.""" + +from __future__ import annotations + +import os +from typing import Any + +import httpx + + +class APIError(Exception): + def __init__(self, status_code: int, detail: str): + self.status_code = status_code + self.detail = detail + super().__init__(f"HTTP {status_code}: {detail}") + + +class DualEntryClient: + def __init__(self, api_url: str, *, access_token: str | None = None, refresh_token: str | None = None, client_id: str | None = None, api_key: str | None = None): + self._api_url = api_url.rstrip("/") + self._base_url = f"{self._api_url}/public/v2" + self._access_token = access_token + self._refresh_token = refresh_token + self._client_id = client_id + self._api_key = api_key + + headers = self._build_headers() + self._client = httpx.Client(base_url=self._base_url, headers=headers, timeout=30.0) + + def _build_headers(self) -> dict[str, str]: + if self._api_key: + return {"X-API-KEY": self._api_key} + if self._access_token: + return {"Authorization": f"Bearer {self._access_token}"} + return {} + + @classmethod + def from_env(cls, api_url: str) -> DualEntryClient: + api_key = os.environ.get("X_API_KEY", "") + if not api_key: + msg = "X_API_KEY environment variable is not set" + raise ValueError(msg) + return cls(api_url=api_url, api_key=api_key) + + def _try_refresh(self) -> bool: + """Attempt to refresh the access token. Returns True if successful.""" + if not self._refresh_token or not self._client_id: + return False + try: + from dualentry_cli.auth import refresh_access_token, store_tokens + + mcp_url = f"{self._api_url}/mcp" + token_response = refresh_access_token(mcp_url, self._client_id, self._refresh_token) + self._access_token = token_response["access_token"] + self._refresh_token = token_response.get("refresh_token", self._refresh_token) + store_tokens(self._access_token, self._refresh_token) + self._client.headers.update({"Authorization": f"Bearer {self._access_token}"}) + except Exception as exc: + import sys + + print(f"Token refresh failed: {exc}. Re-login with: dualentry auth login", file=sys.stderr) + return False + else: + return True + + def _handle_response(self, response: httpx.Response) -> dict: + if response.status_code >= 400: + try: + detail = response.json() + except Exception: + detail = response.text + raise APIError(response.status_code, str(detail)) + return response.json() + + def _request(self, method: str, path: str, **kwargs) -> dict: + response = self._client.request(method, path, **kwargs) + if response.status_code in (401, 403) and self._access_token and self._try_refresh(): + response = self._client.request(method, path, **kwargs) + return self._handle_response(response) + + def get(self, path: str, params: dict[str, Any] | None = None) -> dict: + return self._request("GET", path, params=params) + + def paginate(self, path: str, params: dict[str, Any] | None = None, page_size: int = 100, max_items: int | None = None) -> dict: + """Fetch all pages and return combined {items: [...], count: N}.""" + params = dict(params or {}) + params["limit"] = page_size + params["offset"] = 0 + all_items = [] + max_pages = 1000 # safety guard against infinite loops + + for _ in range(max_pages): + data = self.get(path, params=params) + items = data.get("items", []) + all_items.extend(items) + total = data.get("count", len(items)) + if max_items and len(all_items) >= max_items: + all_items = all_items[:max_items] + break + if len(all_items) >= total or not items: + break + params["offset"] += page_size + + return {"items": all_items, "count": len(all_items)} + + def post(self, path: str, json: dict[str, Any] | None = None) -> dict: + return self._request("POST", path, json=json) + + def put(self, path: str, json: dict[str, Any] | None = None) -> dict: + return self._request("PUT", path, json=json) + + def delete(self, path: str) -> dict: + return self._request("DELETE", path) + + def close(self): + self._client.close() diff --git a/src/dualentry_cli/commands/__init__.py b/src/dualentry_cli/commands/__init__.py new file mode 100644 index 0000000..62e354b --- /dev/null +++ b/src/dualentry_cli/commands/__init__.py @@ -0,0 +1,164 @@ +"""Command factory for DualEntry CLI resources.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import typer + +from dualentry_cli.cli import HelpfulGroup +from dualentry_cli.output import format_output + +# ── Shared option defaults ────────────────────────────────────────── + +Limit = typer.Option(20, "--limit", "-l", help="Max items to return") +Offset = typer.Option(0, "--offset", help="Offset for pagination") +AllPages = typer.Option(False, "--all", "-a", help="Fetch all pages") +Search = typer.Option(None, "--search", "-s", help="Free text search") +Status = typer.Option(None, "--status", help="Filter by status (draft, posted, archived)") +StartDate = typer.Option(None, "--start-date", help="Filter from date (YYYY-MM-DD)") +EndDate = typer.Option(None, "--end-date", help="Filter to date (YYYY-MM-DD)") +Format = typer.Option("human", "--format", "-o", help="Output format: human or json") + + +def _build_filter_params( + search: str | None = None, + status: str | None = None, + start_date: str | None = None, + end_date: str | None = None, +) -> dict: + """Build filter query params, omitting None values.""" + params: dict = {} + if search: + params["search"] = search + if status: + params["record_status"] = status + if start_date: + params["start_date"] = start_date + if end_date: + params["end_date"] = end_date + return params + + +def _do_list(client, path: str, resource: str, limit: int, offset: int, all_pages: bool, output: str, **filters): + """Shared list logic for all resources.""" + params = _build_filter_params(**filters) + if all_pages: + data = client.paginate(f"/{path}/", params=params) + else: + params.update({"limit": limit, "offset": offset}) + data = client.get(f"/{path}/", params=params) + format_output(data, resource=resource, fmt=output) + + +# ── Factory ───────────────────────────────────────────────────────── + + +def make_resource_app( + name: str, + resource: str, + path: str, + has_create: bool = True, + has_update: bool = True, + has_delete: bool = False, + has_number: bool = False, +) -> typer.Typer: + """Create a Typer app for a standard CRUD resource.""" + app = typer.Typer(help=f"Manage {name}", no_args_is_help=True, cls=HelpfulGroup) + + @app.command("list") + def list_cmd( + limit: int = Limit, + offset: int = Offset, + all_pages: bool = AllPages, + search: str | None = Search, + status: str | None = Status, + start_date: str | None = StartDate, + end_date: str | None = EndDate, + output: str = Format, + ): + from dualentry_cli.main import get_client + + client = get_client() + _do_list(client, path, resource, limit, offset, all_pages, output, search=search, status=status, start_date=start_date, end_date=end_date) + + list_cmd.__doc__ = f"List {name}." + + if has_number: + + @app.command("get") + def get_cmd_with_number( + number: int = typer.Argument(help="Record number"), + output: str = Format, + ): + from dualentry_cli.main import get_client + + client = get_client() + data = client.get(f"/{path}/{number}/") + format_output(data, resource=resource, fmt=output) + + get_cmd_with_number.__doc__ = f"Get a {resource} by number." + else: + + @app.command("get") + def get_cmd( + record_id: str = typer.Argument(help="Record ID"), + output: str = Format, + ): + from dualentry_cli.main import get_client + + client = get_client() + data = client.get(f"/{path}/{record_id}/") + format_output(data, resource=resource, fmt=output) + + get_cmd.__doc__ = f"Get a {resource} by ID." + + if has_create: + + @app.command("create") + def create_cmd( + file: Path = typer.Option(..., "--file", "-f", help="JSON file with record data"), + output: str = Format, + ): + from dualentry_cli.main import get_client + + payload = json.loads(file.read_text()) + client = get_client() + data = client.post(f"/{path}/", json=payload) + format_output(data, resource=resource, fmt=output) + + create_cmd.__doc__ = f"Create a {resource} from a JSON file." + + if has_update: + + @app.command("update") + def update_cmd( + record_id: str = typer.Argument(help="Record ID"), + file: Path = typer.Option(..., "--file", "-f", help="JSON file with update data"), + output: str = Format, + ): + from dualentry_cli.main import get_client + + payload = json.loads(file.read_text()) + client = get_client() + data = client.put(f"/{path}/{record_id}/", json=payload) + format_output(data, resource=resource, fmt=output) + + update_cmd.__doc__ = f"Update a {resource}." + + if has_delete: + + @app.command("delete") + def delete_cmd( + record_id: str = typer.Argument(help="Record ID"), + ): + from dualentry_cli.main import get_client + + client = get_client() + client.delete(f"/{path}/{record_id}/") + typer.echo(f"{resource.replace('-', ' ').title()} {record_id} deleted.") + + delete_cmd.__doc__ = f"Delete a {resource}." + + return app diff --git a/src/dualentry_cli/commands/accounts.py b/src/dualentry_cli/commands/accounts.py new file mode 100644 index 0000000..0066459 --- /dev/null +++ b/src/dualentry_cli/commands/accounts.py @@ -0,0 +1,39 @@ +"""Account commands.""" + +from __future__ import annotations + +import typer + +from dualentry_cli.cli import HelpfulGroup +from dualentry_cli.commands import AllPages, Format, Limit, Offset, Search, _do_list +from dualentry_cli.output import format_output + +app = typer.Typer(help="Manage accounts", no_args_is_help=True, cls=HelpfulGroup) + + +@app.command("list") +def list_accounts( + limit: int = Limit, + offset: int = Offset, + all_pages: bool = AllPages, + search: str | None = Search, + output: str = Format, +): + """List accounts.""" + from dualentry_cli.main import get_client + + client = get_client() + _do_list(client, "accounts", "account", limit, offset, all_pages, output, search=search) + + +@app.command("get") +def get_account( + account_id: int = typer.Argument(help="Account number"), + output: str = Format, +): + """Get an account by number.""" + from dualentry_cli.main import get_client + + client = get_client() + data = client.get(f"/accounts/{account_id}/") + format_output(data, resource="account", fmt=output) diff --git a/src/dualentry_cli/commands/bills.py b/src/dualentry_cli/commands/bills.py new file mode 100644 index 0000000..81583c1 --- /dev/null +++ b/src/dualentry_cli/commands/bills.py @@ -0,0 +1,59 @@ +"""Bill commands.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import typer + +from dualentry_cli.cli import HelpfulGroup +from dualentry_cli.commands import AllPages, EndDate, Format, Limit, Offset, Search, StartDate, Status, _do_list +from dualentry_cli.output import format_output + +app = typer.Typer(help="Manage bills", no_args_is_help=True, cls=HelpfulGroup) + + +@app.command("list") +def list_bills( + limit: int = Limit, + offset: int = Offset, + all_pages: bool = AllPages, + search: str | None = Search, + status: str | None = Status, + start_date: str | None = StartDate, + end_date: str | None = EndDate, + output: str = Format, +): + """List bills.""" + from dualentry_cli.main import get_client + + client = get_client() + _do_list(client, "bills", "bill", limit, offset, all_pages, output, search=search, status=status, start_date=start_date, end_date=end_date) + + +@app.command("get") +def get_bill( + number: int = typer.Argument(help="Bill number"), + output: str = Format, +): + """Get a bill by number.""" + from dualentry_cli.main import get_client + + client = get_client() + data = client.get(f"/bills/{number}/") + format_output(data, resource="bill", fmt=output) + + +@app.command("create") +def create_bill( + file: Path = typer.Option(..., "--file", "-f", help="JSON file with bill data"), + output: str = Format, +): + """Create a bill from a JSON file.""" + from dualentry_cli.main import get_client + + payload = json.loads(file.read_text()) + client = get_client() + data = client.post("/bills/", json=payload) + format_output(data, resource="bill", fmt=output) diff --git a/src/dualentry_cli/commands/invoices.py b/src/dualentry_cli/commands/invoices.py new file mode 100644 index 0000000..2e54eb8 --- /dev/null +++ b/src/dualentry_cli/commands/invoices.py @@ -0,0 +1,59 @@ +"""Invoice commands.""" + +from __future__ import annotations + +import json +from pathlib import Path + +import typer + +from dualentry_cli.cli import HelpfulGroup +from dualentry_cli.commands import AllPages, EndDate, Format, Limit, Offset, Search, StartDate, Status, _do_list +from dualentry_cli.output import format_output + +app = typer.Typer(help="Manage invoices", no_args_is_help=True, cls=HelpfulGroup) + + +@app.command("list") +def list_invoices( + limit: int = Limit, + offset: int = Offset, + all_pages: bool = AllPages, + search: str | None = Search, + status: str | None = Status, + start_date: str | None = StartDate, + end_date: str | None = EndDate, + output: str = Format, +): + """List invoices.""" + from dualentry_cli.main import get_client + + client = get_client() + _do_list(client, "invoices", "invoice", limit, offset, all_pages, output, search=search, status=status, start_date=start_date, end_date=end_date) + + +@app.command("get") +def get_invoice( + number: int = typer.Argument(help="Invoice number"), + output: str = Format, +): + """Get an invoice by number.""" + from dualentry_cli.main import get_client + + client = get_client() + data = client.get(f"/invoices/{number}/") + format_output(data, resource="invoice", fmt=output) + + +@app.command("create") +def create_invoice( + file: Path = typer.Option(..., "--file", "-f", help="JSON file with invoice data"), + output: str = Format, +): + """Create an invoice from a JSON file.""" + from dualentry_cli.main import get_client + + payload = json.loads(file.read_text()) + client = get_client() + data = client.post("/invoices/", json=payload) + format_output(data, resource="invoice", fmt=output) diff --git a/src/dualentry_cli/config.py b/src/dualentry_cli/config.py new file mode 100644 index 0000000..05e9117 --- /dev/null +++ b/src/dualentry_cli/config.py @@ -0,0 +1,77 @@ +"""Configuration management for DualEntry CLI.""" + +from __future__ import annotations + +import os +import tomllib +from pathlib import Path + +_DEFAULT_CONFIG_DIR = Path.home() / ".dualentry" +_CONFIG_FILENAME = "config.toml" + +ENVIRONMENTS = { + "prod": "https://api.dualentry.com", + "dev": "https://api-dev.dualentry.com", +} + + +class Config: + def __init__(self, config_dir: Path | None = None): + self._config_dir = config_dir or _DEFAULT_CONFIG_DIR + self._config_file = self._config_dir / _CONFIG_FILENAME + self.api_url: str = ENVIRONMENTS["prod"] + self.output: str = "table" + self.organization_id: int | None = None + self.user_email: str | None = None + self.client_id: str | None = None + self._load() + # Env var overrides config file + env_url = os.environ.get("DUALENTRY_API_URL") + if env_url: + self.api_url = env_url + + def _load(self): + if not self._config_file.exists(): + return + with self._config_file.open("rb") as f: + data = tomllib.load(f) + default = data.get("default", {}) + self.api_url = default.get("api_url", self.api_url) + self.output = default.get("output", self.output) + auth = data.get("auth", {}) + self.organization_id = auth.get("organization_id") + self.user_email = auth.get("user_email") + self.client_id = auth.get("client_id") + + @property + def env_name(self) -> str: + """Return the environment name based on the current api_url.""" + for name, url in ENVIRONMENTS.items(): + if self.api_url == url: + return name + return "custom" + + @staticmethod + def _escape_toml_string(value: str) -> str: + """Escape a value for safe inclusion in a TOML double-quoted string.""" + return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n").replace("\r", "\\r") + + def save(self): + self._config_dir.mkdir(parents=True, exist_ok=True) + lines = [ + "[default]", + f'api_url = "{self._escape_toml_string(self.api_url)}"', + f'output = "{self._escape_toml_string(self.output)}"', + "", + ] + has_auth = any(v is not None for v in (self.organization_id, self.user_email, self.client_id)) + if has_auth: + lines.append("[auth]") + if self.organization_id is not None: + lines.append(f"organization_id = {self.organization_id}") + if self.user_email is not None: + lines.append(f'user_email = "{self._escape_toml_string(self.user_email)}"') + if self.client_id is not None: + lines.append(f'client_id = "{self._escape_toml_string(self.client_id)}"') + lines.append("") + self._config_file.write_text("\n".join(lines)) diff --git a/src/dualentry_cli/main.py b/src/dualentry_cli/main.py new file mode 100644 index 0000000..348cef3 --- /dev/null +++ b/src/dualentry_cli/main.py @@ -0,0 +1,182 @@ +"""DualEntry CLI entry point.""" + +import os + +import typer + +from dualentry_cli.auth import clear_credentials, load_tokens, run_login_flow, store_tokens +from dualentry_cli.cli import HelpfulGroup +from dualentry_cli.commands import make_resource_app +from dualentry_cli.commands.accounts import app as accounts_app +from dualentry_cli.commands.bills import app as bills_app +from dualentry_cli.commands.invoices import app as invoices_app +from dualentry_cli.config import ENVIRONMENTS, Config + +app = typer.Typer(name="dualentry", help="DualEntry accounting CLI", no_args_is_help=True, cls=HelpfulGroup) +auth_app = typer.Typer(help="Authentication commands", no_args_is_help=True, cls=HelpfulGroup) +config_app = typer.Typer(help="Configuration commands", no_args_is_help=True, cls=HelpfulGroup) +app.add_typer(auth_app, name="auth") +app.add_typer(config_app, name="config") + +# Custom-formatted resources +app.add_typer(invoices_app, name="invoices") +app.add_typer(bills_app, name="bills") +app.add_typer(accounts_app, name="accounts") + +# Money-in +app.add_typer(make_resource_app("sales orders", "sales-order", "sales-orders", has_number=True), name="sales-orders") +app.add_typer(make_resource_app("customer payments", "customer-payment", "customer-payments", has_number=True), name="customer-payments") +app.add_typer(make_resource_app("customer credits", "customer-credit", "customer-credits", has_number=True), name="customer-credits") +app.add_typer(make_resource_app("customer prepayments", "customer-prepayment", "customer-prepayments", has_number=True), name="customer-prepayments") +app.add_typer( + make_resource_app("customer prepayment applications", "customer-prepayment-application", "customer-prepayment-applications", has_number=True), + name="customer-prepayment-applications", +) +app.add_typer(make_resource_app("customer deposits", "customer-deposit", "customer-deposits", has_number=True), name="customer-deposits") +app.add_typer(make_resource_app("customer refunds", "customer-refund", "customer-refunds", has_number=True), name="customer-refunds") +app.add_typer(make_resource_app("cash sales", "cash-sale", "cash-sales", has_number=True), name="cash-sales") + +# Money-out +app.add_typer(make_resource_app("purchase orders", "purchase-order", "purchase-orders", has_number=True), name="purchase-orders") +app.add_typer(make_resource_app("vendor payments", "vendor-payment", "vendor-payments", has_number=True), name="vendor-payments") +app.add_typer(make_resource_app("vendor credits", "vendor-credit", "vendor-credits", has_number=True), name="vendor-credits") +app.add_typer(make_resource_app("vendor prepayments", "vendor-prepayment", "vendor-prepayments", has_number=True), name="vendor-prepayments") +app.add_typer( + make_resource_app("vendor prepayment applications", "vendor-prepayment-application", "vendor-prepayment-applications", has_number=True), name="vendor-prepayment-applications" +) +app.add_typer(make_resource_app("vendor refunds", "vendor-refund", "vendor-refunds", has_number=True), name="vendor-refunds") +app.add_typer(make_resource_app("direct expenses", "direct-expense", "direct-expenses", has_number=True), name="direct-expenses") + +# Accounting +app.add_typer(make_resource_app("journal entries", "journal-entry", "journal-entries", has_number=True), name="journal-entries") +app.add_typer(make_resource_app("bank transfers", "bank-transfer", "bank-transfers", has_number=True), name="bank-transfers") +app.add_typer(make_resource_app("fixed assets", "fixed-asset", "fixed-assets", has_number=True), name="fixed-assets") +app.add_typer(make_resource_app("depreciation books", "depreciation-book", "depreciation-books"), name="depreciation-books") + +# Entities +app.add_typer(make_resource_app("customers", "customer", "customers"), name="customers") +app.add_typer(make_resource_app("vendors", "vendor", "vendors"), name="vendors") +app.add_typer(make_resource_app("items", "item", "items"), name="items") +app.add_typer(make_resource_app("companies", "company", "companies"), name="companies") +app.add_typer(make_resource_app("classifications", "classification", "classifications"), name="classifications") + +# Recurring +recurring_app = typer.Typer(help="Manage recurring records", no_args_is_help=True, cls=HelpfulGroup) +recurring_app.add_typer(make_resource_app("recurring invoices", "recurring-invoice", "recurring/invoices", has_delete=True), name="invoices") +recurring_app.add_typer(make_resource_app("recurring bills", "recurring-bill", "recurring/bills", has_delete=True), name="bills") +recurring_app.add_typer(make_resource_app("recurring journal entries", "recurring-journal-entry", "recurring/journal-entries", has_delete=True), name="journal-entries") +app.add_typer(recurring_app, name="recurring") + +# Other +app.add_typer(make_resource_app("contracts", "contract", "contracts"), name="contracts") +app.add_typer(make_resource_app("budgets", "budget", "budgets"), name="budgets") +app.add_typer(make_resource_app("workflows", "workflow", "workflows", has_create=False, has_update=False), name="workflows") + + +def version_callback(value: bool): + if value: + from dualentry_cli import __version__ + + typer.echo(f"dualentry-cli {__version__}") + raise typer.Exit + + +@app.callback() +def main(version: bool = typer.Option(False, "--version", "-v", help="Show version and exit.", callback=version_callback, is_eager=True)): + """DualEntry accounting CLI.""" + from dualentry_cli.updater import check_for_updates + + check_for_updates() + + +@auth_app.command() +def login(api_url: str = typer.Option(None, "--api-url", help="API base URL override")): + """Log in to DualEntry via browser.""" + config = Config() + url = api_url or config.api_url + result = run_login_flow(api_url=url) + store_tokens(result["access_token"], result["refresh_token"]) + config.client_id = result["client_id"] + config.save() + typer.echo("Logged in successfully.") + + +@auth_app.command() +def logout(): + """Log out and clear stored credentials.""" + clear_credentials() + typer.echo("Logged out.") + + +@auth_app.command() +def status(): + """Show current authentication status.""" + env_key = os.environ.get("X_API_KEY") + if env_key: + typer.echo("Authenticated via X_API_KEY environment variable") + return + access_token, refresh_token = load_tokens() + if not access_token: + typer.echo("Not logged in. Run: dualentry auth login") + raise typer.Exit(code=1) + config = Config() + typer.echo(f"API URL: {config.api_url}") + typer.echo("Authenticated via OAuth tokens") + if refresh_token: + typer.echo("Refresh token: present") + + +@config_app.command("show") +def config_show(): + """Show current configuration.""" + config = Config() + typer.echo(f"Environment: {config.env_name}") + typer.echo(f"API URL: {config.api_url}") + typer.echo(f"Output format: {config.output}") + if config.client_id: + typer.echo(f"OAuth client ID: {config.client_id}") + + +@config_app.command("set-env") +def config_set_env(env: str = typer.Argument(help=f"Environment name: {', '.join(ENVIRONMENTS)}")): + """Switch between environments (dev, prod).""" + if env not in ENVIRONMENTS: + typer.echo(f"Unknown environment '{env}'. Available: {', '.join(ENVIRONMENTS)}") + raise typer.Exit(code=1) + config = Config() + config.api_url = ENVIRONMENTS[env] + config.save() + typer.echo(f"Switched to {env} ({ENVIRONMENTS[env]})") + + +@config_app.command("set-url") +def config_set_url(url: str = typer.Argument(help="Custom API base URL")): + """Set a custom API URL.""" + config = Config() + config.api_url = url + config.save() + typer.echo(f"API URL set to {url}") + + +def get_client(): + """Get an authenticated DualEntryClient.""" + from dualentry_cli.client import DualEntryClient + + config = Config() + env_key = os.environ.get("X_API_KEY") + if env_key: + return DualEntryClient(api_url=config.api_url, api_key=env_key) + access_token, refresh_token = load_tokens() + if not access_token: + typer.echo("Not logged in. Run: dualentry auth login") + raise typer.Exit(code=1) + return DualEntryClient( + api_url=config.api_url, + access_token=access_token, + refresh_token=refresh_token, + client_id=config.client_id, + ) + + +if __name__ == "__main__": + app() diff --git a/src/dualentry_cli/output.py b/src/dualentry_cli/output.py new file mode 100644 index 0000000..2c6ed04 --- /dev/null +++ b/src/dualentry_cli/output.py @@ -0,0 +1,1132 @@ +"""Output formatting for DualEntry CLI.""" + +from __future__ import annotations + +import json + +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +console = Console() + +_format: str = "human" + + +def set_format(fmt: str) -> None: + global _format + _format = fmt + + +def get_format() -> str: + return _format + + +def format_output(data: dict, resource: str = "generic", fmt: str | None = None) -> None: + effective_fmt = fmt or _format + if effective_fmt == "json": + print(json.dumps(data, indent=2)) + return + + if "items" in data and "count" in data: + _print_list(data, resource) + return + + _print_detail(data, resource) + + +# ── Dispatcher ─────────────────────────────────────────────────────── + +# Map resource types to (list_fn, detail_fn) +_FORMATTERS: dict[str, tuple] = {} + + +def _register(resource: str, list_fn, detail_fn): + _FORMATTERS[resource] = (list_fn, detail_fn) + + +def _print_list(data: dict, resource: str) -> None: + items = data["items"] + if not items: + console.print("No results.") + return + + list_fn = _FORMATTERS.get(resource, (None, None))[0] + if list_fn: + list_fn(items) + else: + _print_generic_list(items) + + if "count" in data: + console.print(f"\n[dim]Showing {len(items)} of {data['count']}[/dim]") + + +def _print_detail(data: dict, resource: str) -> None: + detail_fn = _FORMATTERS.get(resource, (None, None))[1] + if detail_fn: + detail_fn(data) + else: + _print_generic_detail(data) + + +# ── Shared: Transaction list (records with #, Date, Counterparty, Amount) ── + + +def _transaction_list( + items: list[dict], + title: str, + counterparty_label: str, + counterparty_field: str, + show_due_date: bool = False, + show_paid: bool = False, + show_remaining: bool = False, + show_memo: bool = False, +): + table = Table(title=title, show_lines=False) + table.add_column("#", style="bold", justify="right") + table.add_column("Date", justify="center") + table.add_column("Company") + table.add_column(counterparty_label, min_width=16) + if show_due_date: + table.add_column("Due Date", justify="center") + if show_memo: + table.add_column("Memo", max_width=20) + table.add_column("Currency", justify="center") + table.add_column("Amount", justify="right", style="bold") + if show_paid: + table.add_column("Paid", justify="right") + table.add_column("Due", justify="right") + if show_remaining: + table.add_column("Remaining", justify="right") + table.add_column("Status") + + for r in items: + currency = r.get("currency_iso_4217_code", "") + row = [ + str(r.get("number", "")), + r.get("date", "-"), + r.get("company_name", "-"), + r.get(counterparty_field) or "-", + ] + if show_due_date: + row.append(r.get("due_date") or "-") + if show_memo: + memo = r.get("memo", "") or "" + row.append(memo[:20] + ("..." if len(memo) > 20 else "")) + row.append(currency) + row.append(_money(r.get("amount"), currency)) + if show_paid: + row.append(_money(r.get("paid_total"), currency)) + row.append(_money(r.get("amount_due"), currency)) + if show_remaining: + row.append(_money(r.get("remaining_amount"), currency)) + row.append(_status_badge(r.get("record_status", ""))) + + table.add_row(*row) + + console.print(table) + + +# ── Shared: Transaction detail (records with line items, totals) ───── + + +def _transaction_detail( + record: dict, + record_type: str, + counterparty_label: str, + counterparty_field: str, + due_color: str = "green", +): + currency = record.get("currency_iso_4217_code") or record.get("company_currency", "") + + header = Text() + header.append(record_type.upper(), style="bold") + header.append(f" #{record.get('number', '')}", style="bold cyan") + status = record.get("record_status", "") + if status: + header.append(f" {status.upper()}", style=_status_color(status)) + console.print(Panel(header, expand=False)) + + grid = Table.grid(padding=(0, 4)) + grid.add_column(min_width=30) + grid.add_column(min_width=30) + + left = Text() + left.append("Company: ", style="dim") + left.append(record.get("company_name", "-"), style="bold") + + right = Text() + right.append(f"{counterparty_label}: ", style="dim") + right.append(record.get(counterparty_field) or "-", style="bold") + addr = record.get("bill_to_address", "") or record.get("address", "") + if addr: + right.append(f"\n{addr}") + + grid.add_row(left, right) + console.print(grid) + console.print() + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("Date:", record.get("date", "-")) + if record.get("due_date"): + details.add_row("Due Date:", record["due_date"]) + if record.get("reference_number"): + details.add_row("Reference:", record["reference_number"]) + if record.get("term_name"): + details.add_row("Terms:", record["term_name"]) + if record.get("currency_iso_4217_code"): + details.add_row("Currency:", record["currency_iso_4217_code"]) + console.print(details) + console.print() + + items = record.get("items", []) + if items: + items_table = Table(show_lines=True, title="Line Items") + items_table.add_column("#", justify="right", style="dim", width=4) + items_table.add_column("Description", min_width=30) + items_table.add_column("Qty", justify="right", width=10) + items_table.add_column("Rate", justify="right", width=14) + items_table.add_column("Amount", justify="right", width=14, style="bold") + + for i, item in enumerate(items, 1): + qty = item.get("quantity", "1") + rate = item.get("rate", "0") + amount = _calc_line_amount(qty, rate) + desc = item.get("memo") or f"Item #{item.get('item_id', i)}" + items_table.add_row(str(i), desc, _fmt_decimal(qty), _money(rate, currency), _money(amount, currency)) + + console.print(items_table) + + console.print() + totals = Table.grid(padding=(0, 2)) + totals.add_column(min_width=40) + totals.add_column(justify="right", style="dim", min_width=14) + totals.add_column(justify="right", min_width=14) + + totals.add_row("", "Total:", _money(record.get("amount"), currency)) + paid = record.get("paid_total") + if paid and paid != "0.00": + totals.add_row("", "Paid:", f"-{_money(paid, currency)}") + amount_due = record.get("amount_due") + if amount_due is not None: + totals.add_row("", "─" * 14, "─" * 14) + totals.add_row("", Text("Amount Due:", style="bold"), Text(_money(amount_due, currency), style=f"bold {due_color}")) + remaining = record.get("remaining_amount") + if remaining is not None: + totals.add_row("", "─" * 14, "─" * 14) + totals.add_row("", Text("Remaining:", style="bold"), Text(_money(remaining, currency), style="bold")) + + console.print(totals) + + if record.get("memo"): + console.print(f"\n[dim]Memo:[/dim] {record['memo']}") + + +# ── Invoice ────────────────────────────────────────────────────────── + + +def _invoice_list(items): + _transaction_list(items, "Invoices", "Customer", "customer_name", show_due_date=True, show_paid=True) + + +def _invoice_detail(r): + _transaction_detail(r, "Invoice", "Customer", "customer_name", due_color="green") + + +_register("invoice", _invoice_list, _invoice_detail) + + +# ── Bill ───────────────────────────────────────────────────────────── + + +def _bill_list(items): + _transaction_list(items, "Bills", "Vendor", "vendor_name", show_due_date=True, show_paid=True) + + +def _bill_detail(r): + _transaction_detail(r, "Bill", "Vendor", "vendor_name", due_color="red") + + +_register("bill", _bill_list, _bill_detail) + + +# ── Sales Order ────────────────────────────────────────────────────── + + +def _sales_order_list(items): + _transaction_list(items, "Sales Orders", "Customer", "customer_name") + + +def _sales_order_detail(r): + _transaction_detail(r, "Sales Order", "Customer", "customer_name") + + +_register("sales-order", _sales_order_list, _sales_order_detail) + + +# ── Purchase Order ─────────────────────────────────────────────────── + + +def _purchase_order_list(items): + _transaction_list(items, "Purchase Orders", "Vendor", "vendor_name") + + +def _purchase_order_detail(r): + _transaction_detail(r, "Purchase Order", "Vendor", "vendor_name") + + +_register("purchase-order", _purchase_order_list, _purchase_order_detail) + + +# ── Cash Sale ──────────────────────────────────────────────────────── + + +def _cash_sale_list(items): + _transaction_list(items, "Cash Sales", "Customer", "customer_name") + + +def _cash_sale_detail(r): + _transaction_detail(r, "Cash Sale", "Customer", "customer_name") + + +_register("cash-sale", _cash_sale_list, _cash_sale_detail) + + +# ── Direct Expense ─────────────────────────────────────────────────── + + +def _direct_expense_list(items): + _transaction_list(items, "Direct Expenses", "Vendor", "vendor_name") + + +def _direct_expense_detail(r): + _transaction_detail(r, "Direct Expense", "Vendor", "vendor_name") + + +_register("direct-expense", _direct_expense_list, _direct_expense_detail) + + +# ── Customer Payments ──────────────────────────────────────────────── + + +def _customer_payment_list(items): + _transaction_list(items, "Customer Payments", "Customer", "customer_name", show_memo=True) + + +def _customer_payment_detail(r): + _transaction_detail(r, "Customer Payment", "Customer", "customer_name") + + +_register("customer-payment", _customer_payment_list, _customer_payment_detail) + + +# ── Customer Credits ───────────────────────────────────────────────── + + +def _customer_credit_list(items): + _transaction_list(items, "Customer Credits", "Customer", "customer_name", show_remaining=True) + + +def _customer_credit_detail(r): + _transaction_detail(r, "Customer Credit", "Customer", "customer_name") + + +_register("customer-credit", _customer_credit_list, _customer_credit_detail) + + +# ── Customer Prepayments ───────────────────────────────────────────── + + +def _customer_prepayment_list(items): + _transaction_list(items, "Customer Prepayments", "Customer", "customer_name", show_remaining=True) + + +def _customer_prepayment_detail(r): + _transaction_detail(r, "Customer Prepayment", "Customer", "customer_name") + + +_register("customer-prepayment", _customer_prepayment_list, _customer_prepayment_detail) + + +# ── Customer Prepayment Applications ───────────────────────────────── + + +def _customer_prepayment_app_list(items): + _transaction_list(items, "Customer Prepayment Applications", "Customer", "customer_name") + + +def _customer_prepayment_app_detail(r): + _transaction_detail(r, "Customer Prepayment Application", "Customer", "customer_name") + + +_register("customer-prepayment-application", _customer_prepayment_app_list, _customer_prepayment_app_detail) + + +# ── Customer Deposits ──────────────────────────────────────────────── + + +def _customer_deposit_list(items): + _transaction_list(items, "Customer Deposits", "Customer", "customer_name", show_memo=True) + + +def _customer_deposit_detail(r): + _transaction_detail(r, "Customer Deposit", "Customer", "customer_name") + + +_register("customer-deposit", _customer_deposit_list, _customer_deposit_detail) + + +# ── Customer Refunds ───────────────────────────────────────────────── + + +def _customer_refund_list(items): + _transaction_list(items, "Customer Refunds", "Customer", "customer_name") + + +def _customer_refund_detail(r): + _transaction_detail(r, "Customer Refund", "Customer", "customer_name") + + +_register("customer-refund", _customer_refund_list, _customer_refund_detail) + + +# ── Vendor Payments ────────────────────────────────────────────────── + + +def _vendor_payment_list(items): + _transaction_list(items, "Vendor Payments", "Vendor", "vendor_name", show_memo=True) + + +def _vendor_payment_detail(r): + _transaction_detail(r, "Vendor Payment", "Vendor", "vendor_name") + + +_register("vendor-payment", _vendor_payment_list, _vendor_payment_detail) + + +# ── Vendor Credits ─────────────────────────────────────────────────── + + +def _vendor_credit_list(items): + _transaction_list(items, "Vendor Credits", "Vendor", "vendor_name", show_remaining=True) + + +def _vendor_credit_detail(r): + _transaction_detail(r, "Vendor Credit", "Vendor", "vendor_name") + + +_register("vendor-credit", _vendor_credit_list, _vendor_credit_detail) + + +# ── Vendor Prepayments ─────────────────────────────────────────────── + + +def _vendor_prepayment_list(items): + _transaction_list(items, "Vendor Prepayments", "Vendor", "vendor_name", show_remaining=True) + + +def _vendor_prepayment_detail(r): + _transaction_detail(r, "Vendor Prepayment", "Vendor", "vendor_name") + + +_register("vendor-prepayment", _vendor_prepayment_list, _vendor_prepayment_detail) + + +# ── Vendor Prepayment Applications ─────────────────────────────────── + + +def _vendor_prepayment_app_list(items): + _transaction_list(items, "Vendor Prepayment Applications", "Vendor", "vendor_name") + + +def _vendor_prepayment_app_detail(r): + _transaction_detail(r, "Vendor Prepayment Application", "Vendor", "vendor_name") + + +_register("vendor-prepayment-application", _vendor_prepayment_app_list, _vendor_prepayment_app_detail) + + +# ── Vendor Refunds ─────────────────────────────────────────────────── + + +def _vendor_refund_list(items): + _transaction_list(items, "Vendor Refunds", "Vendor", "vendor_name") + + +def _vendor_refund_detail(r): + _transaction_detail(r, "Vendor Refund", "Vendor", "vendor_name") + + +_register("vendor-refund", _vendor_refund_list, _vendor_refund_detail) + + +# ── Journal Entry ──────────────────────────────────────────────────── + + +def _journal_entry_list(items): + _transaction_list(items, "Journal Entries", "Memo", "memo") + + +def _journal_entry_detail(r): + currency = r.get("currency_iso_4217_code") or r.get("company_currency", "") + + header = Text() + header.append("JOURNAL ENTRY", style="bold") + header.append(f" #{r.get('number', '')}", style="bold cyan") + status = r.get("record_status", "") + if status: + header.append(f" {status.upper()}", style=_status_color(status)) + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("Date:", r.get("date", "-")) + details.add_row("Company:", r.get("company_name", "-")) + details.add_row("Currency:", currency or "-") + if r.get("memo"): + details.add_row("Memo:", r["memo"]) + console.print(details) + console.print() + + items = r.get("items", []) + if items: + je_table = Table(show_lines=True, title="Entries") + je_table.add_column("#", justify="right", style="dim", width=4) + je_table.add_column("Account", min_width=25) + je_table.add_column("Description", min_width=20) + je_table.add_column("Debit", justify="right", width=14) + je_table.add_column("Credit", justify="right", width=14) + + for i, item in enumerate(items, 1): + je_table.add_row( + str(i), + str(item.get("account_name", item.get("account_id", ""))), + item.get("memo", ""), + _money(item.get("debit"), currency) if item.get("debit") else "", + _money(item.get("credit"), currency) if item.get("credit") else "", + ) + + console.print(je_table) + + +_register("journal-entry", _journal_entry_list, _journal_entry_detail) + + +# ── Bank Transfer ──────────────────────────────────────────────────── + + +def _bank_transfer_list(items): + table = Table(title="Bank Transfers", show_lines=False) + table.add_column("#", style="bold", justify="right") + table.add_column("Date", justify="center") + table.add_column("Company") + table.add_column("From Account", min_width=14) + table.add_column("Sent", justify="right") + table.add_column("To Account", min_width=14) + table.add_column("Received", justify="right") + table.add_column("Status") + + for r in items: + send_currency = r.get("credit_bank_account_currency", "") + recv_currency = r.get("debit_bank_account_currency", "") + table.add_row( + str(r.get("number", "")), + r.get("date", "-"), + r.get("company_name", "-"), + r.get("credit_bank_account_name", "-"), + _money(r.get("amount"), send_currency), + r.get("debit_bank_account_name", "-"), + _money(r.get("receiving_amount"), recv_currency), + _status_badge(r.get("record_status", "")), + ) + + console.print(table) + + +def _bank_transfer_detail(r): + header = Text() + header.append("BANK TRANSFER", style="bold") + header.append(f" #{r.get('number', '')}", style="bold cyan") + status = r.get("record_status", "") + if status: + header.append(f" {status.upper()}", style=_status_color(status)) + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=20) + details.add_column() + details.add_row("Date:", r.get("date", "-")) + details.add_row("Company:", r.get("company_name", "-")) + details.add_row("From:", r.get("credit_bank_account_name", "-")) + details.add_row("Sending Amount:", _money(r.get("amount"), r.get("credit_bank_account_currency", ""))) + details.add_row("To:", r.get("debit_bank_account_name", "-")) + details.add_row("Receiving Amount:", _money(r.get("receiving_amount"), r.get("debit_bank_account_currency", ""))) + if r.get("exchange_rate") and r["exchange_rate"] != "1": + details.add_row("Exchange Rate:", r["exchange_rate"]) + if r.get("memo"): + details.add_row("Memo:", r["memo"]) + console.print(details) + + +_register("bank-transfer", _bank_transfer_list, _bank_transfer_detail) + + +# ── Fixed Asset ────────────────────────────────────────────────────── + + +def _fixed_asset_list(items): + table = Table(title="Fixed Assets", show_lines=False) + table.add_column("#", style="bold", justify="right") + table.add_column("Name", min_width=20) + table.add_column("Company") + table.add_column("Purchase Date", justify="center") + table.add_column("Cost", justify="right", style="bold") + table.add_column("Status") + + for r in items: + table.add_row( + str(r.get("number", "")), + r.get("name", "-"), + r.get("company_name", "-"), + r.get("purchase_date", "-"), + _money(r.get("cost"), r.get("currency_iso_4217_code", "")), + _status_badge(r.get("record_status", r.get("status", ""))), + ) + + console.print(table) + + +def _fixed_asset_detail(r): + header = Text() + header.append("FIXED ASSET", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + status = r.get("record_status", r.get("status", "")) + if status: + header.append(f" {status.upper()}", style=_status_color(status)) + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=20) + details.add_column() + details.add_row("Number:", str(r.get("number", "-"))) + details.add_row("Company:", r.get("company_name", "-")) + details.add_row("Purchase Date:", r.get("purchase_date", "-")) + details.add_row("Cost:", _money(r.get("cost"), r.get("currency_iso_4217_code", ""))) + if r.get("serial_number"): + details.add_row("Serial Number:", r["serial_number"]) + if r.get("memo"): + details.add_row("Description:", r["memo"]) + console.print(details) + + +_register("fixed-asset", _fixed_asset_list, _fixed_asset_detail) + + +# ── Customer ───────────────────────────────────────────────────────── + + +def _customer_list(items): + table = Table(title="Customers", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Type") + table.add_column("Email") + table.add_column("Phone") + table.add_column("Status") + + for r in items: + table.add_row( + str(r.get("id", "")), + r.get("name", "-"), + r.get("customer_type", "-"), + r.get("email", "-"), + r.get("phone", "-"), + _status_badge(r.get("record_status", "")), + ) + + console.print(table) + + +def _customer_detail(r): + header = Text() + header.append("CUSTOMER", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Type:", r.get("customer_type", "-")) + if r.get("email"): + details.add_row("Email:", r["email"]) + if r.get("phone"): + details.add_row("Phone:", r["phone"]) + if r.get("website"): + details.add_row("Website:", r["website"]) + if r.get("parent_name"): + details.add_row("Parent:", r["parent_name"]) + console.print(details) + + +_register("customer", _customer_list, _customer_detail) + + +# ── Vendor ─────────────────────────────────────────────────────────── + + +def _vendor_list(items): + table = Table(title="Vendors", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Type") + table.add_column("Email") + table.add_column("Phone") + table.add_column("Status") + + for r in items: + table.add_row( + str(r.get("id", "")), + r.get("name", "-"), + r.get("vendor_type", "-"), + r.get("email", "-"), + r.get("phone", "-"), + _status_badge(r.get("record_status", "")), + ) + + console.print(table) + + +def _vendor_detail(r): + header = Text() + header.append("VENDOR", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Type:", r.get("vendor_type", "-")) + if r.get("email"): + details.add_row("Email:", r["email"]) + if r.get("phone"): + details.add_row("Phone:", r["phone"]) + if r.get("website"): + details.add_row("Website:", r["website"]) + console.print(details) + + +_register("vendor", _vendor_list, _vendor_detail) + + +# ── Item ───────────────────────────────────────────────────────────── + + +def _item_list(items): + table = Table(title="Items", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("SKU") + table.add_column("Type") + table.add_column("Expense Account") + table.add_column("Income Account") + + for r in items: + table.add_row( + str(r.get("id", "")), + r.get("name", "-"), + r.get("sku", "-"), + r.get("item_type", "-"), + str(r.get("expense_account_id", "-")), + str(r.get("income_account_id", "-")), + ) + + console.print(table) + + +def _item_detail(r): + header = Text() + header.append("ITEM", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=18) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("SKU:", r.get("sku", "-")) + details.add_row("Type:", r.get("item_type", "-")) + if r.get("expense_account_id"): + details.add_row("Expense Account:", str(r["expense_account_id"])) + if r.get("income_account_id"): + details.add_row("Income Account:", str(r["income_account_id"])) + if r.get("description"): + details.add_row("Description:", r["description"]) + console.print(details) + + +_register("item", _item_list, _item_detail) + + +# ── Account ────────────────────────────────────────────────────────── + + +def _account_list(items): + table = Table(title="Chart of Accounts", show_lines=False) + table.add_column("Number", justify="right", style="bold") + table.add_column("Name", min_width=28) + table.add_column("Type") + table.add_column("Description", max_width=30) + table.add_column("Active") + + for r in items: + active = "[green]active[/green]" if r.get("is_active", True) else "[dim]inactive[/dim]" + desc = r.get("description", "") or "" + table.add_row( + str(r.get("number", "")), + r.get("name", "-"), + r.get("account_type", "-"), + desc[:30] + ("..." if len(desc) > 30 else ""), + active, + ) + + console.print(table) + + +def _account_detail(r): + header = Text() + header.append("ACCOUNT", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("Number:", str(r.get("number", "-"))) + details.add_row("Type:", r.get("account_type", "-")) + details.add_row("Currency:", r.get("currency_iso_4217_code", "-")) + if r.get("description"): + details.add_row("Description:", r["description"]) + active = "Active" if r.get("is_active", True) else "Inactive" + details.add_row("Status:", active) + console.print(details) + + +_register("account", _account_list, _account_detail) + + +# ── Company ────────────────────────────────────────────────────────── + + +def _company_list(items): + table = Table(title="Companies", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Currency") + table.add_column("Country") + + for r in items: + table.add_row( + str(r.get("id", "")), + r.get("name", "-"), + r.get("currency_iso_4217_code", "-"), + r.get("country", "-"), + ) + + console.print(table) + + +def _company_detail(r): + header = Text() + header.append("COMPANY", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Currency:", r.get("currency_iso_4217_code", "-")) + if r.get("country"): + details.add_row("Country:", r["country"]) + console.print(details) + + +_register("company", _company_list, _company_detail) + + +# ── Contract ───────────────────────────────────────────────────────── + + +def _contract_list(items): + table = Table(title="Contracts", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("#", style="bold", justify="right") + table.add_column("Name", min_width=20) + table.add_column("Customer") + table.add_column("Start", justify="center") + table.add_column("End", justify="center") + table.add_column("Currency") + table.add_column("Status") + + for r in items: + table.add_row( + str(r.get("id", "")), + str(r.get("number", r.get("id", ""))), + r.get("name", "-"), + r.get("customer_name", "-"), + r.get("start_date", "-"), + r.get("end_date", "-"), + r.get("currency_iso_4217_code", "-"), + _status_badge(r.get("status", "")), + ) + + console.print(table) + + +def _contract_detail(r): + header = Text() + header.append("CONTRACT", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + status = r.get("status", "") + if status: + header.append(f" {status.upper()}", style=_status_color(status)) + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Customer:", r.get("customer_name", "-")) + details.add_row("Company:", r.get("company_name", "-")) + details.add_row("Start Date:", r.get("start_date", "-")) + details.add_row("End Date:", r.get("end_date", "-")) + details.add_row("Currency:", r.get("currency_iso_4217_code", "-")) + console.print(details) + + +_register("contract", _contract_list, _contract_detail) + + +# ── Budget ─────────────────────────────────────────────────────────── + + +def _budget_list(items): + table = Table(title="Budgets", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Company") + table.add_column("Start", justify="center") + table.add_column("End", justify="center") + + for r in items: + table.add_row( + str(r.get("id", "")), + r.get("name", "-"), + r.get("company_name", "-"), + r.get("start_date", "-"), + r.get("end_date", "-"), + ) + + console.print(table) + + +def _budget_detail(r): + header = Text() + header.append("BUDGET", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Company:", r.get("company_name", "-")) + details.add_row("Start:", r.get("start_date", "-")) + details.add_row("End:", r.get("end_date", "-")) + console.print(details) + + +_register("budget", _budget_list, _budget_detail) + + +# ── Classification ─────────────────────────────────────────────────── + + +def _classification_list(items): + table = Table(title="Classifications", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + + for r in items: + table.add_row(str(r.get("id", "")), r.get("name", "-")) + + console.print(table) + + +def _classification_detail(r): + header = Text() + header.append("CLASSIFICATION", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Name:", r.get("name", "-")) + console.print(details) + + +_register("classification", _classification_list, _classification_detail) + + +# ── Depreciation Book ──────────────────────────────────────────────── + + +def _depreciation_book_list(items): + table = Table(title="Depreciation Books", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Method") + + for r in items: + table.add_row(str(r.get("id", "")), r.get("name", "-"), r.get("method", "-")) + + console.print(table) + + +def _depreciation_book_detail(r): + header = Text() + header.append("DEPRECIATION BOOK", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Name:", r.get("name", "-")) + details.add_row("Method:", r.get("method", "-")) + console.print(details) + + +_register("depreciation-book", _depreciation_book_list, _depreciation_book_detail) + + +# ── Workflow ───────────────────────────────────────────────────────── + + +def _workflow_list(items): + table = Table(title="Workflows", show_lines=False) + table.add_column("ID", style="dim", justify="right") + table.add_column("Name", min_width=24, style="bold") + table.add_column("Status") + + for r in items: + table.add_row(str(r.get("id", "")), r.get("name", "-"), _status_badge(r.get("status", ""))) + + console.print(table) + + +def _workflow_detail(r): + header = Text() + header.append("WORKFLOW", style="bold") + header.append(f" {r.get('name', '')}", style="bold cyan") + console.print(Panel(header, expand=False)) + + details = Table.grid(padding=(0, 2)) + details.add_column(style="dim", min_width=16) + details.add_column() + details.add_row("ID:", str(r.get("id", "-"))) + details.add_row("Name:", r.get("name", "-")) + console.print(details) + + +_register("workflow", _workflow_list, _workflow_detail) + + +# ── Recurring records (use generic transaction pattern) ────────────── + +for _prefix in ("recurring-invoice", "recurring-bill", "recurring-journal-entry"): + _register(_prefix, None, None) # falls back to generic + + +# ── Generic fallback ───────────────────────────────────────────────── + + +def _print_generic_list(items: list[dict]) -> None: + if not items: + console.print("No results.") + return + table = Table() + # Pick useful columns: skip nested objects and long fields + columns = [k for k in items[0] if not isinstance(items[0][k], (dict, list))][:12] + for col in columns: + table.add_column(col) + for item in items: + table.add_row(*[str(item.get(col, ""))[:40] for col in columns]) + console.print(table) + + +def _print_generic_detail(item: dict) -> None: + table = Table(show_header=False) + table.add_column("Field", style="bold") + table.add_column("Value") + for key, value in item.items(): + if isinstance(value, (dict, list)): + value = json.dumps(value, indent=2) + table.add_row(key, str(value)[:100]) + console.print(table) + + +# ── Helpers ────────────────────────────────────────────────────────── + + +def _money(value, currency: str = "") -> str: + if value is None: + return "-" + try: + num = float(value) + except (ValueError, TypeError): + return str(value) + symbol = _currency_symbol(currency) + return f"{symbol}{num:,.2f}" + + +def _currency_symbol(code: str) -> str: + symbols = {"USD": "$", "EUR": "\u20ac", "GBP": "\u00a3", "CAD": "CA$", "AUD": "A$"} + return symbols.get(code, f"{code} " if code else "") + + +def _fmt_decimal(value) -> str: + try: + num = float(value) + except (ValueError, TypeError): + return str(value) + if num == int(num): + return str(int(num)) + return f"{num:.2f}" + + +def _calc_line_amount(qty, rate) -> str: + try: + return f"{float(qty) * float(rate):.2f}" + except (ValueError, TypeError): + return "0.00" + + +def _status_color(status: str) -> str: + return {"draft": "yellow", "posted": "green", "archived": "dim", "active": "green", "inactive": "dim"}.get(status, "white") + + +def _status_badge(status: str) -> str: + if not status: + return "-" + color = _status_color(status) + return f"[{color}]{status}[/{color}]" diff --git a/src/dualentry_cli/updater.py b/src/dualentry_cli/updater.py new file mode 100644 index 0000000..2cc34f3 --- /dev/null +++ b/src/dualentry_cli/updater.py @@ -0,0 +1,94 @@ +"""Auto-update checker for DualEntry CLI.""" + +from __future__ import annotations + +import json +import subprocess +import time +from pathlib import Path + +import typer + +from dualentry_cli import __version__ + +_CACHE_DIR = Path.home() / ".dualentry" +_UPDATE_CACHE = _CACHE_DIR / ".update_check.json" +_CHECK_INTERVAL = 86400 # 24 hours +_REPO = "git+https://github.com/dualentry/dualentry-cli.git" + + +def _read_cache() -> dict: + if not _UPDATE_CACHE.exists(): + return {} + try: + return json.loads(_UPDATE_CACHE.read_text()) + except (json.JSONDecodeError, OSError): + return {} + + +def _write_cache(data: dict) -> None: + _CACHE_DIR.mkdir(parents=True, exist_ok=True) + _UPDATE_CACHE.write_text(json.dumps(data)) + + +def _fetch_latest_version() -> str | None: + """Fetch latest version from GitHub by reading __init__.py from main branch.""" + try: + result = subprocess.run( + ["git", "ls-remote", "--refs", "https://github.com/dualentry/dualentry-cli.git", "refs/tags/v*"], # noqa: S607 + capture_output=True, + text=True, + timeout=5, + check=False, + ) + if result.returncode != 0 or not result.stdout.strip(): + return None + # Parse tags like "refs/tags/v0.2.0" and pick the latest + tags = [] + for line in result.stdout.strip().splitlines(): + ref = line.split("refs/tags/")[-1] + if ref.startswith("v"): + tags.append(ref[1:]) # strip "v" + if not tags: + return None + tags.sort(key=lambda v: [int(x) for x in v.split(".")], reverse=True) + return tags[0] + except (subprocess.TimeoutExpired, OSError, ValueError): + return None + + +def check_for_updates() -> None: + """Show update notice from last cached check, then refresh cache in background.""" + cache = _read_cache() + + # Show cached result immediately (no blocking) + cached_latest = cache.get("latest_version") + if cached_latest and cached_latest != __version__ and _is_newer(cached_latest, __version__): + typer.secho( + f"\nUpdate available: {__version__} → {cached_latest}. Run: uv tool upgrade dualentry-cli", + fg=typer.colors.YELLOW, + err=True, + ) + + # Refresh cache in background if stale + last_check = cache.get("last_check", 0) + if time.time() - last_check >= _CHECK_INTERVAL: + import threading + + threading.Thread(target=_refresh_update_cache, daemon=True).start() + + +def _refresh_update_cache() -> None: + """Fetch latest version and update the cache file (runs in background thread).""" + latest = _fetch_latest_version() + _write_cache({"last_check": time.time(), "latest_version": latest}) + + +def _is_newer(latest: str, current: str) -> bool: + try: + latest_parts = [int(x) for x in latest.split(".")] + current_parts = [int(x) for x in current.split(".")] + except ValueError: + return False + else: + return latest_parts > current_parts diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_auth.py b/tests/test_auth.py new file mode 100644 index 0000000..6080942 --- /dev/null +++ b/tests/test_auth.py @@ -0,0 +1,76 @@ +import base64 +import hashlib +from unittest.mock import patch + +import httpx +import respx + + +class TestPKCE: + def test_generate_pkce_pair(self): + from dualentry_cli.auth import _generate_pkce_pair + + verifier, challenge = _generate_pkce_pair() + assert len(verifier) >= 43 + expected = base64.urlsafe_b64encode(hashlib.sha256(verifier.encode()).digest()).rstrip(b"=").decode("ascii") + assert challenge == expected + + +class TestCredentialStorage: + def test_store_and_load_tokens(self): + from dualentry_cli.auth import load_tokens, store_tokens + + with patch("dualentry_cli.auth.keyring") as mock_keyring: + mock_keyring.get_password.side_effect = lambda _svc, key: {"access_token": "acc_123", "refresh_token": "ref_456"}.get(key) + store_tokens("acc_123", "ref_456") + assert mock_keyring.set_password.call_count == 2 + access, refresh = load_tokens() + assert access == "acc_123" + assert refresh == "ref_456" + + def test_clear_credentials(self): + from dualentry_cli.auth import clear_credentials + + with patch("dualentry_cli.auth.keyring") as mock_keyring: + clear_credentials() + assert mock_keyring.delete_password.call_count == 3 + + +class TestRegisterClient: + @respx.mock + def test_registers_oauth_client(self): + from dualentry_cli.auth import _register_client + + route = respx.post("https://api.dualentry.com/mcp/register").mock(return_value=httpx.Response(200, json={"client_id": "client_abc", "client_secret": ""})) + result = _register_client("https://api.dualentry.com/mcp", "http://localhost:9876/callback") + assert result["client_id"] == "client_abc" + assert route.called + + +class TestExchangeToken: + @respx.mock + def test_exchanges_code_for_tokens(self): + from dualentry_cli.auth import _exchange_token + + route = respx.post("https://api.dualentry.com/mcp/token").mock( + return_value=httpx.Response(200, json={"access_token": "acc_xyz", "refresh_token": "ref_xyz", "expires_in": 43200, "token_type": "Bearer"}) + ) + result = _exchange_token( + mcp_url="https://api.dualentry.com/mcp", client_id="client_abc", code="auth_code_123", code_verifier="test_verifier", redirect_uri="http://localhost:9876/callback" + ) + assert result["access_token"] == "acc_xyz" + assert result["refresh_token"] == "ref_xyz" + assert route.called + + +class TestRefreshToken: + @respx.mock + def test_refreshes_access_token(self): + from dualentry_cli.auth import refresh_access_token + + route = respx.post("https://api.dualentry.com/mcp/token").mock( + return_value=httpx.Response(200, json={"access_token": "acc_new", "refresh_token": "ref_new", "expires_in": 43200, "token_type": "Bearer"}) + ) + result = refresh_access_token(mcp_url="https://api.dualentry.com/mcp", client_id="client_abc", refresh_token="ref_old") + assert result["access_token"] == "acc_new" + assert route.called diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..7dc7e27 --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,46 @@ +import httpx +import pytest +import respx + + +class TestDualEntryClient: + def test_sets_api_key_header(self): + from dualentry_cli.client import DualEntryClient + + client = DualEntryClient(api_url="https://api.dualentry.com", api_key="org_live_xxxx_secret") + assert client._client.headers["X-API-KEY"] == "org_live_xxxx_secret" + + @respx.mock + def test_get_request(self): + from dualentry_cli.client import DualEntryClient + + route = respx.get("https://api.dualentry.com/public/v2/invoices/").mock(return_value=httpx.Response(200, json={"items": [], "count": 0})) + client = DualEntryClient(api_url="https://api.dualentry.com", api_key="test_key") + data = client.get("/invoices/") + assert data == {"items": [], "count": 0} + assert route.called + + @respx.mock + def test_post_request(self): + from dualentry_cli.client import DualEntryClient + + respx.post("https://api.dualentry.com/public/v2/invoices/").mock(return_value=httpx.Response(201, json={"id": 1, "number": "INV-001"})) + client = DualEntryClient(api_url="https://api.dualentry.com", api_key="test_key") + data = client.post("/invoices/", json={"customer_id": 1}) + assert data == {"id": 1, "number": "INV-001"} + + @respx.mock + def test_handles_error_response(self): + from dualentry_cli.client import APIError, DualEntryClient + + respx.get("https://api.dualentry.com/public/v2/invoices/").mock(return_value=httpx.Response(403, json={"success": False, "errors": {"__all__": ["Access denied"]}})) + client = DualEntryClient(api_url="https://api.dualentry.com", api_key="test_key") + with pytest.raises(APIError, match="403"): + client.get("/invoices/") + + def test_from_env_uses_api_key_env_var(self, monkeypatch): + from dualentry_cli.client import DualEntryClient + + monkeypatch.setenv("X_API_KEY", "env_key_123") + client = DualEntryClient.from_env(api_url="https://api.dualentry.com") + assert client._client.headers["X-API-KEY"] == "env_key_123" diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..a2d49e4 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,76 @@ +import json +from unittest.mock import MagicMock, patch + +import pytest +from typer.testing import CliRunner + +from dualentry_cli.main import app + +runner = CliRunner() + + +@pytest.fixture(autouse=True) +def mock_get_client(): + mock_client = MagicMock() + with patch("dualentry_cli.main.get_client", return_value=mock_client): + yield mock_client + + +class TestInvoiceCommands: + def test_invoices_list(self, mock_get_client): + mock_get_client.get.return_value = {"items": [{"id": 1, "number": "INV-001", "total": "100.00"}], "count": 1} + result = runner.invoke(app, ["invoices", "list"]) + assert result.exit_code == 0 + assert "INV" in result.output + mock_get_client.get.assert_called_once_with("/invoices/", params={"limit": 20, "offset": 0}) + + def test_invoices_list_with_pagination(self, mock_get_client): + mock_get_client.get.return_value = {"items": [], "count": 0} + result = runner.invoke(app, ["invoices", "list", "--limit", "50", "--offset", "10"]) + assert result.exit_code == 0 + mock_get_client.get.assert_called_once_with("/invoices/", params={"limit": 50, "offset": 10}) + + def test_invoices_list_json_output(self, mock_get_client): + mock_get_client.get.return_value = {"items": [], "count": 0} + result = runner.invoke(app, ["invoices", "list", "--format", "json"]) + assert result.exit_code == 0 + parsed = json.loads(result.output) + assert parsed == {"items": [], "count": 0} + + def test_invoices_get(self, mock_get_client): + mock_get_client.get.return_value = {"id": 1, "number": "INV-001", "total": "100.00"} + result = runner.invoke(app, ["invoices", "get", "1"]) + assert result.exit_code == 0 + assert "INV-001" in result.output + mock_get_client.get.assert_called_once_with("/invoices/1/") + + def test_invoices_create(self, mock_get_client, tmp_path): + invoice_data = {"customer_id": 1, "lines": []} + data_file = tmp_path / "invoice.json" + data_file.write_text(json.dumps(invoice_data)) + mock_get_client.post.return_value = {"id": 1, "number": "INV-001"} + result = runner.invoke(app, ["invoices", "create", "--file", str(data_file)]) + assert result.exit_code == 0 + mock_get_client.post.assert_called_once_with("/invoices/", json=invoice_data) + + +class TestBillCommands: + def test_bills_list(self, mock_get_client): + mock_get_client.get.return_value = {"items": [], "count": 0} + result = runner.invoke(app, ["bills", "list"]) + assert result.exit_code == 0 + mock_get_client.get.assert_called_once_with("/bills/", params={"limit": 20, "offset": 0}) + + +class TestAccountCommands: + def test_accounts_list(self, mock_get_client): + mock_get_client.get.return_value = {"items": [], "count": 0} + result = runner.invoke(app, ["accounts", "list"]) + assert result.exit_code == 0 + mock_get_client.get.assert_called_once_with("/accounts/", params={"limit": 20, "offset": 0}) + + def test_accounts_get(self, mock_get_client): + mock_get_client.get.return_value = {"id": 1, "name": "Cash", "number": "1000"} + result = runner.invoke(app, ["accounts", "get", "1"]) + assert result.exit_code == 0 + assert "Cash" in result.output diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..4fb7e6e --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,29 @@ +class TestConfig: + def test_default_config(self, tmp_path): + from dualentry_cli.config import Config + + config = Config(config_dir=tmp_path) + assert config.api_url == "https://api.dualentry.com" + assert config.output == "table" + + def test_load_config_from_file(self, tmp_path): + from dualentry_cli.config import Config + + config_file = tmp_path / "config.toml" + config_file.write_text('[default]\napi_url = "https://api-dev.dualentry.com"\noutput = "json"\n\n[auth]\norganization_id = 123\nuser_email = "test@example.com"\n') + config = Config(config_dir=tmp_path) + assert config.api_url == "https://api-dev.dualentry.com" + assert config.output == "json" + assert config.organization_id == 123 + assert config.user_email == "test@example.com" + + def test_save_config(self, tmp_path): + from dualentry_cli.config import Config + + config = Config(config_dir=tmp_path) + config.organization_id = 456 + config.user_email = "user@test.com" + config.save() + config2 = Config(config_dir=tmp_path) + assert config2.organization_id == 456 + assert config2.user_email == "user@test.com" diff --git a/tests/test_integration.py b/tests/test_integration.py new file mode 100644 index 0000000..21fdca4 --- /dev/null +++ b/tests/test_integration.py @@ -0,0 +1,218 @@ +""" +Integration tests against a live DualEntry API. + +Requires: + - X_API_KEY env var set + - API server running (default: http://localhost:8000) + +Run: + X_API_KEY=... pytest tests/test_integration.py -v + X_API_KEY=... pytest tests/test_integration.py -v -k invoices +""" + +from __future__ import annotations + +import os + +import pytest + +from dualentry_cli.client import DualEntryClient +from dualentry_cli.config import Config + +# ── Fixtures ──────────────────────────────────────────────────────── + + +@pytest.fixture(scope="session") +def client(): + api_key = os.environ.get("X_API_KEY") + if not api_key: + pytest.skip("X_API_KEY not set") + config = Config() + c = DualEntryClient(api_url=config.api_url, api_key=api_key) + yield c + c.close() + + +# ── Helpers ───────────────────────────────────────────────────────── + + +def assert_list(data: dict, min_count: int = 0): + """Assert data looks like a paginated list response.""" + assert "items" in data, f"Missing 'items' key, got: {list(data.keys())}" + assert "count" in data, f"Missing 'count' key, got: {list(data.keys())}" + assert isinstance(data["items"], list) + assert data["count"] >= min_count + + +def assert_record(data: dict, required_keys: list[str]): + """Assert data is a single record with expected keys.""" + for key in required_keys: + assert key in data, f"Missing key '{key}' in record. Keys: {list(data.keys())}" + + +# ── Numbered resources (get by number) ────────────────────────────── + +NUMBERED_RESOURCES = [ + ("invoices", ["number", "date", "currency_iso_4217_code"]), + ("bills", ["number", "date", "currency_iso_4217_code"]), + ("sales-orders", ["number", "date", "currency_iso_4217_code"]), + ("purchase-orders", ["number", "date", "currency_iso_4217_code"]), + ("customer-payments", ["number", "date", "currency_iso_4217_code"]), + ("customer-credits", ["number", "date", "currency_iso_4217_code"]), + ("customer-prepayments", ["number", "date", "currency_iso_4217_code"]), + ("customer-prepayment-applications", ["number", "date", "currency_iso_4217_code"]), + ("customer-deposits", ["number", "date", "currency_iso_4217_code"]), + ("customer-refunds", ["number", "date", "currency_iso_4217_code"]), + ("cash-sales", ["number", "date", "currency_iso_4217_code"]), + ("vendor-payments", ["number", "date", "currency_iso_4217_code"]), + ("vendor-credits", ["number", "date", "currency_iso_4217_code"]), + ("vendor-prepayments", ["number", "date", "currency_iso_4217_code"]), + ("vendor-prepayment-applications", ["number", "date", "currency_iso_4217_code"]), + ("vendor-refunds", ["number", "date", "currency_iso_4217_code"]), + ("direct-expenses", ["number", "date", "currency_iso_4217_code"]), + ("journal-entries", ["number", "date", "currency_iso_4217_code"]), + ("bank-transfers", ["number", "date"]), + ("fixed-assets", ["number", "name"]), +] + + +@pytest.mark.parametrize("path,detail_keys", NUMBERED_RESOURCES, ids=[r[0] for r in NUMBERED_RESOURCES]) +def test_numbered_resource_list(client: DualEntryClient, path: str, detail_keys: list[str]): + data = client.get(f"/{path}/", params={"limit": 2}) + assert_list(data) + + +@pytest.mark.parametrize("path,detail_keys", NUMBERED_RESOURCES, ids=[r[0] for r in NUMBERED_RESOURCES]) +def test_numbered_resource_get(client: DualEntryClient, path: str, detail_keys: list[str]): + # List first to get a valid number + data = client.get(f"/{path}/", params={"limit": 1}) + assert_list(data, min_count=1) + number = data["items"][0]["number"] + + detail = client.get(f"/{path}/{number}/") + assert_record(detail, detail_keys) + + +@pytest.mark.parametrize("path,detail_keys", NUMBERED_RESOURCES, ids=[r[0] for r in NUMBERED_RESOURCES]) +def test_numbered_resource_search(client: DualEntryClient, path: str, detail_keys: list[str]): + data = client.get(f"/{path}/", params={"limit": 2, "search": "test"}) + assert_list(data) + + +@pytest.mark.parametrize("path,detail_keys", NUMBERED_RESOURCES, ids=[r[0] for r in NUMBERED_RESOURCES]) +def test_numbered_resource_limit(client: DualEntryClient, path: str, detail_keys: list[str]): + data = client.get(f"/{path}/", params={"limit": 1}) + assert_list(data) + assert len(data["items"]) <= 1 + + +@pytest.mark.parametrize("path,detail_keys", NUMBERED_RESOURCES, ids=[r[0] for r in NUMBERED_RESOURCES]) +def test_numbered_resource_offset(client: DualEntryClient, path: str, detail_keys: list[str]): + all_data = client.get(f"/{path}/", params={"limit": 3}) + if all_data["count"] < 2: + pytest.skip("Not enough records to test offset") + offset_data = client.get(f"/{path}/", params={"limit": 2, "offset": 1}) + assert_list(offset_data) + # First item at offset=1 should match second item at offset=0 + assert offset_data["items"][0]["number"] == all_data["items"][1]["number"] + + +# ── ID-based resources ────────────────────────────────────────────── + +ID_RESOURCES = [ + ("customers", "id", ["id", "name"]), + ("vendors", "id", ["id", "name"]), + ("items", "id", ["id", "name"]), + ("companies", "id", ["id", "name"]), + ("classifications", "id", ["id", "name"]), + ("contracts", "id", ["id", "name"]), + ("budgets", "id", ["id", "name"]), + ("workflows", "id", ["id", "name"]), +] + + +@pytest.mark.parametrize("path,id_field,detail_keys", ID_RESOURCES, ids=[r[0] for r in ID_RESOURCES]) +def test_id_resource_list(client: DualEntryClient, path: str, id_field: str, detail_keys: list[str]): + data = client.get(f"/{path}/", params={"limit": 2}) + assert_list(data) + + +@pytest.mark.parametrize("path,id_field,detail_keys", ID_RESOURCES, ids=[r[0] for r in ID_RESOURCES]) +def test_id_resource_get(client: DualEntryClient, path: str, id_field: str, detail_keys: list[str]): + data = client.get(f"/{path}/", params={"limit": 1}) + assert_list(data, min_count=1) + record_id = data["items"][0][id_field] + + detail = client.get(f"/{path}/{record_id}/") + assert_record(detail, detail_keys) + + +# ── Accounts (uses account number in URL) ─────────────────────────── + + +def test_accounts_list(client: DualEntryClient): + data = client.get("/accounts/", params={"limit": 2}) + assert_list(data) + if data["items"]: + assert "number" in data["items"][0] + + +def test_accounts_get(client: DualEntryClient): + data = client.get("/accounts/", params={"limit": 1}) + assert_list(data, min_count=1) + account_number = data["items"][0]["number"] + detail = client.get(f"/accounts/{account_number}/") + assert_record(detail, ["number", "name", "account_type"]) + + +# ── Depreciation books (uses string book_code in URL) ─────────────── + + +def test_depreciation_books_list(client: DualEntryClient): + data = client.get("/depreciation-books/", params={"limit": 2}) + assert_list(data) + + +def test_depreciation_books_get(client: DualEntryClient): + data = client.get("/depreciation-books/", params={"limit": 1}) + assert_list(data, min_count=1) + code = data["items"][0].get("code") or data["items"][0].get("name", "").upper() + detail = client.get(f"/depreciation-books/{code}/") + assert_record(detail, ["name"]) + + +# ── Recurring resources ───────────────────────────────────────────── + +RECURRING_RESOURCES = [ + "recurring/invoices", + "recurring/bills", + "recurring/journal-entries", +] + + +@pytest.mark.parametrize("path", RECURRING_RESOURCES, ids=[r.replace("/", "-") for r in RECURRING_RESOURCES]) +def test_recurring_list(client: DualEntryClient, path: str): + data = client.get(f"/{path}/", params={"limit": 2}) + assert_list(data) + + +# ── Pagination ────────────────────────────────────────────────────── + + +def test_paginate(client: DualEntryClient): + """paginate() should fetch all pages and combine results.""" + data = client.paginate("/invoices/", page_size=5) + assert "items" in data + assert "count" in data + assert data["count"] == len(data["items"]) + # Should have fetched more than one page worth if enough data exists + if data["count"] > 5: + assert len(data["items"]) > 5 + + +def test_paginate_with_filters(client: DualEntryClient): + """paginate() should forward filter params.""" + data = client.paginate("/invoices/", params={"search": "nonexistent_xyz_12345"}, page_size=5) + assert_list(data) + # Unlikely to find anything with gibberish search + assert data["count"] == 0 diff --git a/tests/test_output.py b/tests/test_output.py new file mode 100644 index 0000000..1d4fb1a --- /dev/null +++ b/tests/test_output.py @@ -0,0 +1,28 @@ +import json + + +class TestFormatOutput: + def test_json_format(self, capsys): + from dualentry_cli.output import format_output + + data = {"items": [{"id": 1, "name": "Test"}], "count": 1} + format_output(data, fmt="json") + captured = capsys.readouterr() + assert json.loads(captured.out) == data + + def test_table_format_list(self, capsys): + from dualentry_cli.output import format_output + + data = {"items": [{"id": 1, "number": "INV-001", "total": "100.00"}, {"id": 2, "number": "INV-002", "total": "200.00"}], "count": 2} + format_output(data, fmt="table") + captured = capsys.readouterr() + assert "INV-001" in captured.out + assert "INV-002" in captured.out + + def test_table_format_single_item(self, capsys): + from dualentry_cli.output import format_output + + data = {"id": 1, "number": "INV-001", "total": "100.00"} + format_output(data, fmt="table") + captured = capsys.readouterr() + assert "INV-001" in captured.out