From 38c54690da357be84a44ddf15c24552c43b5dec6 Mon Sep 17 00:00:00 2001 From: Abiorh001 Date: Mon, 6 Apr 2026 08:41:54 +0100 Subject: [PATCH] Add persistent business compute seller demo --- examples/business-compute/README.md | 100 ++ examples/business-compute/app.py | 927 ++++++++++++++++++ examples/business-compute/compute_job.py | 78 ++ .../machine-commerce-settlement-design.pdf | Bin 0 -> 917 bytes .../policy-controlled-agent-finance.pdf | Bin 0 -> 937 bytes scripts/start_business_compute_demo.sh | 58 ++ 6 files changed, 1163 insertions(+) create mode 100644 examples/business-compute/README.md create mode 100644 examples/business-compute/app.py create mode 100644 examples/business-compute/compute_job.py create mode 100644 examples/business-compute/papers/machine-commerce-settlement-design.pdf create mode 100644 examples/business-compute/papers/policy-controlled-agent-finance.pdf create mode 100755 scripts/start_business_compute_demo.sh diff --git a/examples/business-compute/README.md b/examples/business-compute/README.md new file mode 100644 index 0000000..e888cc6 --- /dev/null +++ b/examples/business-compute/README.md @@ -0,0 +1,100 @@ +# Business Compute Demo + +A business-facing OmniClaw seller example. + +This example is not a CLI seller surface. The business runs its own web app and integrates OmniClaw directly through the seller backend APIs for: +- x402 payment requirements +- x402 payment verification +- Circle Gateway-backed settlement flow + +The app exposes paid products over HTTP: +- paid compute jobs +- paid compute sessions with credits +- paid research-paper PDFs + +## What It Demonstrates + +- buyer agent pays a real x402 URL +- seller business backend stays in control of the product surface +- unpaid access returns `402 Payment Required` +- paid access unlocks only after seller-side verification +- compute sessions, settlement summaries, and event logs persist through Redis + +## Run + +From the repo root: + +```bash +bash scripts/start_business_compute_demo.sh +``` + +Open in the browser: + +```text +http://127.0.0.1:8010 +``` + +The launcher prints the current buyer-pay URL base for the local Docker network. + +## Architecture + +Components: +- buyer OmniClaw server: `http://localhost:9090` +- seller OmniClaw server: `http://localhost:9091` +- business web app: `http://127.0.0.1:8010` +- business Redis state: `business-compute-redis` + +The browser uses `127.0.0.1:8010`, but the buyer agent pays the business app through its Docker-network URL, for example: + +```text +http://172.18.0.5:8010/compute?job=prime-count&size=1000 +``` + +## Example Buyer Prompts + +Direct compute: + +```text +pay for this url: http://172.18.0.5:8010/compute?job=prime-count&size=1000 +``` + +Compute session: + +```text +pay for this url: http://172.18.0.5:8010/compute/session?tier=starter +``` + +Research paper: + +```text +pay for this url: http://172.18.0.5:8010/papers/agentic-wallet-control-plane +``` + +Note: the `172.18.x.x` address can change on restart. Use the URL printed by the launcher or shown on the page. + +## Persistence + +The following business-app state is persisted in Redis: +- sessions +- session job history +- recent settlements +- seller event log +- revenue and delivery counters +- download counters + +This state survives business app restarts. + +## Seller Logs + +The launcher streams the business container logs. You will see seller-side proof such as: +- `402 Payment Required` +- `200 OK` after payment +- delivery events +- PDF download events + +## Product Surface + +This example is intentionally business-first. + +The business is not presented as another agent using `omniclaw-cli`. +The business owns the API surface, while OmniClaw provides the payment and control layer underneath. diff --git a/examples/business-compute/app.py b/examples/business-compute/app.py new file mode 100644 index 0000000..795e0ad --- /dev/null +++ b/examples/business-compute/app.py @@ -0,0 +1,927 @@ +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import os +import socket +import time +import uuid +from collections import deque +from dataclasses import asdict, dataclass +from math import isqrt +from pathlib import Path +from typing import Any +from urllib.parse import urlencode + +import httpx +import redis +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import FileResponse, HTMLResponse, JSONResponse + +SELLER_SERVER_URL = os.environ.get("SELLER_OMNICLAW_SERVER_URL", "http://localhost:9091") +SELLER_TOKEN = os.environ.get("SELLER_OMNICLAW_TOKEN", "seller-agent-token") +BUYER_SERVER_URL = os.environ.get("BUYER_OMNICLAW_SERVER_URL", "http://localhost:9090") +BUYER_TOKEN = os.environ.get("BUYER_OMNICLAW_TOKEN", "payment-agent-token") +APP_PORT = int(os.environ.get("BUSINESS_COMPUTE_PORT", "8010")) +PAPERS_DIR = Path(__file__).resolve().parent / "papers" +DOWNLOAD_SIGNING_SECRET = os.environ.get( + "BUSINESS_COMPUTE_DOWNLOAD_SECRET", "local-business-compute-demo-secret" +) +DOWNLOAD_TOKEN_TTL_SECONDS = int(os.environ.get("BUSINESS_COMPUTE_DOWNLOAD_TTL", "900")) +REDIS_URL = os.environ.get("BUSINESS_COMPUTE_REDIS_URL", "redis://business-compute-redis:6379/0") +REDIS_STATE_KEY = os.environ.get("BUSINESS_COMPUTE_REDIS_STATE_KEY", "business-compute-demo:state") + + +def default_buyer_base_url() -> str: + try: + ip = socket.gethostbyname(socket.gethostname()) + except OSError: + ip = "127.0.0.1" + return f"http://{ip}:{APP_PORT}" + + +BUYER_BASE_URL = os.environ.get("BUSINESS_COMPUTE_BUYER_BASE_URL", default_buyer_base_url()) + +app = FastAPI(title="OmniClaw Business Demo") +EVENTS: deque[dict[str, Any]] = deque(maxlen=120) +RECENT_SETTLEMENTS: deque[dict[str, Any]] = deque(maxlen=40) +SESSION_STORE: dict[str, dict[str, Any]] = {} +METRICS: dict[str, Any] = { + "revenue_usdc": 0.0, + "deliveries": 0, + "compute_runs": 0, + "paper_unlocks": 0, + "downloads": 0, + "sessions_created": 0, +} +REDIS_CLIENT: redis.Redis | None = None + + +@dataclass +class ComputeProduct: + kind: str + slug: str + label: str + price_usdc: str + description: str + job: str + size: int + + +@dataclass +class SessionProduct: + kind: str + slug: str + label: str + price_usdc: str + description: str + tier: str + credits: int + + +@dataclass +class PaperProduct: + kind: str + slug: str + label: str + price_usdc: str + description: str + title: str + abstract: str + filename: str + + +COMPUTE_PRODUCTS = [ + ComputeProduct( + kind="compute", + slug="prime-quick", + label="Quick prime scan", + price_usdc="0.01", + description="Counts primes up to 1,000.", + job="prime-count", + size=1000, + ), + ComputeProduct( + kind="compute", + slug="prime-research", + label="Research prime batch", + price_usdc="0.25", + description="Counts primes up to 70,000.", + job="prime-count", + size=70000, + ), + ComputeProduct( + kind="compute", + slug="fib-long", + label="Fibonacci long-run", + price_usdc="0.05", + description="Computes fibonacci(250).", + job="fib", + size=250, + ), +] + +PAPER_PRODUCTS = [ + PaperProduct( + kind="paper", + slug="agentic-wallet-control-plane", + label="Policy-Controlled Agent Finance", + price_usdc="0.03", + description="A concise paper on why wallets become policy systems in the agent era.", + title="Policy-Controlled Agent Finance", + abstract="A short research note on zero-trust financial execution, bounded authority, and why agentic commerce requires a control plane above settlement rails.", + filename="policy-controlled-agent-finance.pdf", + ), + PaperProduct( + kind="paper", + slug="machine-commerce-settlement", + label="Machine Commerce Settlement Design", + price_usdc="0.04", + description="A paper on buyer/seller settlement loops with x402 and batch settlement.", + title="Machine Commerce Settlement Design", + abstract="A short paper explaining why buyer usability depends on seller-side acceptance, verification, and batch settlement visibility.", + filename="machine-commerce-settlement-design.pdf", + ), +] + + +SESSION_PRODUCTS = [ + SessionProduct( + kind="session", + slug="compute-starter-session", + label="Compute starter session", + price_usdc="0.08", + description="Creates a short-lived compute session with 3 credits for queued jobs.", + tier="starter", + credits=3, + ), + SessionProduct( + kind="session", + slug="compute-research-session", + label="Compute research session", + price_usdc="0.20", + description="Creates a research session with 10 credits for larger compute jobs.", + tier="research", + credits=10, + ), +] + + +HTML = """ + + + + + OmniClaw Business Demo + + + +
+
+
Business Seller
+

Mini AWS + Research Library powered by OmniClaw

+
This server is the business surface. It exposes paid compute and paid research-paper products over HTTP, uses OmniClaw directly for seller-side x402 verification and Circle Gateway settlement, and only unlocks the product after payment.
+
+
+
+
+
+
+

Business products

+
+
+
+

Buyer usage

+
Use the exact paid URL below inside Telegram/OpenClaw. The local test button runs the buyer through OmniClaw directly.
+
+

+        
+
+
+
+

Seller event log

+
402 first, then 200 after seller-side verification and settlement.
+
+
+
+

Business settlements

+
No settlements yet.
+
+
+

Last buyer result

+
No local buyer run yet.
+
+
+
+
+ + +""" + + +def connect_redis() -> redis.Redis | None: + try: + client = redis.Redis.from_url(REDIS_URL, decode_responses=True) + client.ping() + return client + except Exception: + return None + + +def serialize_state() -> dict[str, Any]: + return { + "events": list(EVENTS), + "recent_settlements": list(RECENT_SETTLEMENTS), + "sessions": SESSION_STORE, + "metrics": METRICS, + } + + +def load_state() -> None: + global REDIS_CLIENT + REDIS_CLIENT = connect_redis() + if REDIS_CLIENT is None: + return + raw = REDIS_CLIENT.get(REDIS_STATE_KEY) + if not raw: + return + data = json.loads(raw) + EVENTS.clear() + EVENTS.extend(data.get("events", [])) + RECENT_SETTLEMENTS.clear() + RECENT_SETTLEMENTS.extend(data.get("recent_settlements", [])) + SESSION_STORE.clear() + SESSION_STORE.update(data.get("sessions", {})) + METRICS.update(data.get("metrics", {})) + + +def persist_state() -> None: + if REDIS_CLIENT is None: + return + REDIS_CLIENT.set(REDIS_STATE_KEY, json.dumps(serialize_state())) + + +def log_event(stage: str, message: str, level: str = "info") -> None: + EVENTS.appendleft( + { + "time": time.strftime("%H:%M:%S"), + "stage": stage, + "message": message, + "level": level, + } + ) + persist_state() + + +def record_settlement( + kind: str, label: str, amount: str, payer: str, tx_hash: str, resource: str +) -> None: + METRICS["revenue_usdc"] += float(amount) + METRICS["deliveries"] += 1 + if kind == "compute": + METRICS["compute_runs"] += 1 + elif kind == "paper": + METRICS["paper_unlocks"] += 1 + elif kind == "session": + METRICS["sessions_created"] += 1 + RECENT_SETTLEMENTS.appendleft( + { + "time": time.strftime("%H:%M:%S"), + "kind": kind, + "label": label, + "amount_usdc": f"{float(amount):.2f}", + "payer": payer or "unknown", + "transaction": tx_hash, + "resource": resource, + } + ) + persist_state() + + +def build_summary() -> dict[str, Any]: + return { + "revenue_usdc": f"{METRICS['revenue_usdc']:.2f}", + "deliveries": METRICS["deliveries"], + "compute_runs": METRICS["compute_runs"], + "paper_unlocks": METRICS["paper_unlocks"], + "downloads": METRICS["downloads"], + "sessions_created": METRICS["sessions_created"], + "recent_settlements": list(RECENT_SETTLEMENTS), + "active_sessions": len(SESSION_STORE), + } + + +def create_session(tier: str, credits: int, payer: str, tx_hash: str) -> dict[str, Any]: + session_id = str(uuid.uuid4()) + session = { + "session_id": session_id, + "tier": tier, + "credits_total": credits, + "credits_remaining": credits, + "payer": payer or "unknown", + "created_at": time.strftime("%Y-%m-%dT%H:%M:%S"), + "transaction": tx_hash, + "jobs": [], + } + SESSION_STORE[session_id] = session + persist_state() + return session + + +def run_session_job(session_id: str, job: str, size: int) -> dict[str, Any]: + session = SESSION_STORE.get(session_id) + if not session: + raise HTTPException(status_code=404, detail="session not found") + if session["credits_remaining"] < 1: + raise HTTPException(status_code=402, detail="session has no credits remaining") + result = build_compute_result(job, size, session["payer"], "0.00", session["transaction"]) + session["credits_remaining"] -= 1 + session["jobs"].append( + { + "job": job, + "size": size, + "ran_at": time.strftime("%H:%M:%S"), + "output": result["output"], + } + ) + METRICS["compute_runs"] += 1 + persist_state() + return { + "session_id": session_id, + "tier": session["tier"], + "credits_remaining": session["credits_remaining"], + "job_result": result, + } + + +def _download_payload(filename: str, tx_hash: str, expires: int) -> str: + return f"{filename}:{tx_hash}:{expires}" + + +def sign_download_token(filename: str, tx_hash: str, expires: int | None = None) -> str: + if expires is None: + expires = int(time.time()) + DOWNLOAD_TOKEN_TTL_SECONDS + payload = _download_payload(filename, tx_hash, expires) + sig = hmac.new(DOWNLOAD_SIGNING_SECRET.encode(), payload.encode(), hashlib.sha256).hexdigest() + token = {"filename": filename, "tx": tx_hash, "exp": expires, "sig": sig} + return base64.urlsafe_b64encode(json.dumps(token).encode()).decode() + + +def verify_download_token(token: str, filename: str) -> None: + try: + data = json.loads(base64.urlsafe_b64decode(token.encode()).decode()) + except Exception as exc: + raise HTTPException(status_code=403, detail="invalid download token") from exc + expected_filename = data.get("filename") + tx_hash = data.get("tx", "") + exp = int(data.get("exp", 0)) + sig = data.get("sig", "") + if expected_filename != filename: + raise HTTPException(status_code=403, detail="download token filename mismatch") + if exp < int(time.time()): + raise HTTPException(status_code=403, detail="download token expired") + payload = _download_payload(filename, tx_hash, exp) + expected_sig = hmac.new( + DOWNLOAD_SIGNING_SECRET.encode(), payload.encode(), hashlib.sha256 + ).hexdigest() + if not hmac.compare_digest(sig, expected_sig): + raise HTTPException(status_code=403, detail="invalid download token signature") + + +def payment_response_header(verify_data: dict[str, Any]) -> str: + return base64.b64encode( + json.dumps( + { + "success": True, + "transaction": verify_data.get("transaction", ""), + "network": "", + "payer": verify_data.get("sender", ""), + } + ).encode() + ).decode() + + +def ensure_sample_pdf(path: Path, title: str, subtitle: str, body: list[str]) -> None: + if path.exists(): + return + path.parent.mkdir(parents=True, exist_ok=True) + lines = [title, subtitle, ""] + body + escaped = [] + for raw in lines: + raw = raw.replace("\\", "\\\\").replace("(", "\\(").replace(")", "\\)") + escaped.append(raw) + content_lines = ["BT", "/F1 18 Tf", "72 760 Td", f"({escaped[0]}) Tj"] + content_lines += ["0 -26 Td", "/F1 12 Tf", f"({escaped[1]}) Tj"] + y_step = -20 + for line in escaped[2:]: + content_lines += [f"0 {y_step} Td", f"({line}) Tj"] + content_lines.append("ET") + stream = "\n".join(content_lines).encode("latin-1", errors="replace") + objs = [] + objs.append(b"1 0 obj<< /Type /Catalog /Pages 2 0 R >>endobj\n") + objs.append(b"2 0 obj<< /Type /Pages /Kids [3 0 R] /Count 1 >>endobj\n") + objs.append( + b"3 0 obj<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Contents 4 0 R /Resources << /Font << /F1 5 0 R >> >> >>endobj\n" + ) + objs.append( + b"4 0 obj<< /Length " + + str(len(stream)).encode() + + b" >>stream\n" + + stream + + b"\nendstream\nendobj\n" + ) + objs.append(b"5 0 obj<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>endobj\n") + pdf = bytearray(b"%PDF-1.4\n") + offsets = [0] + for obj in objs: + offsets.append(len(pdf)) + pdf.extend(obj) + xref = len(pdf) + pdf.extend(f"xref\n0 {len(offsets)}\n".encode()) + pdf.extend(b"0000000000 65535 f \n") + for off in offsets[1:]: + pdf.extend(f"{off:010d} 00000 n \n".encode()) + pdf.extend( + f"trailer<< /Size {len(offsets)} /Root 1 0 R >>\nstartxref\n{xref}\n%%EOF\n".encode() + ) + path.write_bytes(pdf) + + +for paper in PAPER_PRODUCTS: + ensure_sample_pdf( + PAPERS_DIR / paper.filename, + paper.title, + "OmniClaw business demo paper", + [ + paper.abstract, + "", + "Paid access is unlocked only after OmniClaw verification and Circle settlement.", + ], + ) + + +async def seller_post(path: str, payload: dict[str, Any]) -> httpx.Response: + async with httpx.AsyncClient(timeout=30.0) as client: + return await client.post( + f"{SELLER_SERVER_URL}{path}", + headers={"Authorization": f"Bearer {SELLER_TOKEN}"}, + json=payload, + ) + + +async def buyer_post(path: str, payload: dict[str, Any]) -> httpx.Response: + async with httpx.AsyncClient(timeout=60.0) as client: + return await client.post( + f"{BUYER_SERVER_URL}{path}", + headers={"Authorization": f"Bearer {BUYER_TOKEN}"}, + json=payload, + ) + + +def build_compute_result( + job: str, size: int, payer: str, amount: str, tx_hash: str +) -> dict[str, Any]: + if job == "prime-count": + if size < 10 or size > 500000: + raise ValueError("size must be between 10 and 500000 for prime-count") + output = {"prime_count": prime_count(size)} + elif job == "fib": + if size < 1 or size > 5000: + raise ValueError("size must be between 1 and 5000 for fib") + output = {"fib": str(fib(size))} + else: + raise ValueError(f"unsupported job: {job}") + return { + "service": "mini-aws-compute", + "job": job, + "input": {"size": size}, + "output": output, + "paid_by": payer, + "amount_usdc": amount, + "settlement_tx": tx_hash, + } + + +def prime_count(limit: int) -> int: + if limit < 2: + return 0 + sieve = bytearray(b"\x01") * (limit + 1) + sieve[0:2] = b"\x00\x00" + for n in range(2, isqrt(limit) + 1): + if sieve[n]: + start = n * n + step = n + sieve[start : limit + 1 : step] = b"\x00" * (((limit - start) // step) + 1) + return int(sum(sieve)) + + +def fib(n: int) -> int: + a, b = 0, 1 + for _ in range(n): + a, b = b, a + b + return a + + +def find_compute(slug: str) -> ComputeProduct: + for product in COMPUTE_PRODUCTS: + if product.slug == slug: + return product + raise KeyError(slug) + + +def find_paper(slug: str) -> PaperProduct: + for paper in PAPER_PRODUCTS: + if paper.slug == slug: + return paper + raise KeyError(slug) + + +async def requirements_response(resource: str, price: str) -> JSONResponse: + resp = await seller_post( + "/api/v1/x402/requirements", {"amount": f"${price}", "resource": resource} + ) + req_data = resp.json() + return JSONResponse( + status_code=req_data.get("status_code", 402), + content=req_data.get("detail", {}), + headers=req_data.get("headers", {}), + ) + + +async def verify_or_402( + request: Request, resource: str, price: str, label: str +) -> dict[str, Any] | JSONResponse: + sig_header = request.headers.get("payment-signature") or request.headers.get( + "PAYMENT-SIGNATURE" + ) + if not sig_header: + log_event("payment-required", f"Unpaid request for {label} -> 402", "warn") + return await requirements_response(resource, price) + log_event( + "verify", f"Payment signature received for {label}; verifying via OmniClaw seller backend" + ) + verify = await seller_post( + "/api/v1/x402/verify", + { + "signature": sig_header, + "amount": price, + "sender": request.headers.get("x-forwarded-for", ""), + "resource": resource, + }, + ) + verify_data = verify.json() + if verify.status_code >= 400 or not verify_data.get("valid"): + log_event("verify", f"Verification failed for {label}", "warn") + return await requirements_response(resource, price) + return verify_data + + +@app.on_event("startup") +async def startup() -> None: + load_state() + log_event("boot", "Business seller booted. Waiting for buyer traffic.") + + +@app.get("/", response_class=HTMLResponse) +async def home() -> str: + return HTML + + +@app.get("/api/catalog") +async def catalog(request: Request) -> dict[str, Any]: + base_url = str(request.base_url).rstrip("/") + products: list[dict[str, Any]] = [] + for product in COMPUTE_PRODUCTS: + query = urlencode({"job": product.job, "size": product.size}) + browser_url = f"{base_url}/compute?{query}" + pay_url = f"{BUYER_BASE_URL}/compute?{query}" + products.append( + { + **asdict(product), + "browser_url": browser_url, + "pay_url": pay_url, + "badges": ["compute", f"job={product.job}", f"size={product.size}"], + } + ) + for session in SESSION_PRODUCTS: + browser_url = f"{base_url}/compute/session?tier={session.tier}" + pay_url = f"{BUYER_BASE_URL}/compute/session?tier={session.tier}" + products.append( + { + **asdict(session), + "browser_url": browser_url, + "pay_url": pay_url, + "badges": ["session", session.tier, f"credits={session.credits}"], + } + ) + for paper in PAPER_PRODUCTS: + browser_url = f"{base_url}/papers/{paper.slug}" + pay_url = f"{BUYER_BASE_URL}/papers/{paper.slug}" + products.append( + { + **asdict(paper), + "browser_url": browser_url, + "pay_url": pay_url, + "badges": ["paper", "pdf", paper.title], + } + ) + return { + "seller_server": SELLER_SERVER_URL, + "buyer_server": BUYER_SERVER_URL, + "public_base_url": base_url, + "buyer_base_url": BUYER_BASE_URL, + "products": products, + } + + +@app.get("/api/events") +async def events() -> dict[str, Any]: + return {"events": list(EVENTS)} + + +@app.get("/api/summary") +async def summary() -> dict[str, Any]: + return build_summary() + + +@app.post("/api/demo/pay") +async def demo_pay(payload: dict[str, Any]) -> JSONResponse: + url = payload["pay_url"] + log_event("buyer", f"Buyer initiated payment for {url}") + resp = await buyer_post("/api/v1/x402/pay", {"url": url, "method": "GET"}) + data = resp.json() + outcome = data.get("status", "unknown") + log_event( + "buyer", + f"Buyer payment {outcome} for {payload['label']}", + "good" if data.get("success") else "warn", + ) + return JSONResponse(status_code=resp.status_code, content=data) + + +@app.get("/compute") +async def compute(request: Request) -> JSONResponse: + params = request.query_params + job = (params.get("job") or "prime-count").strip().lower() + size = int((params.get("size") or "1000").strip()) + price = "0.10" + label = f"compute job={job} size={size}" + for product in COMPUTE_PRODUCTS: + if product.job == job and product.size == size: + price = product.price_usdc + label = product.label + break + resource = str(request.url) + verified = await verify_or_402(request, resource, price, label) + if isinstance(verified, JSONResponse): + return verified + result = build_compute_result( + job, size, verified.get("sender") or "unknown", price, verified.get("transaction") or "" + ) + record_settlement( + "compute", + label, + price, + verified.get("sender") or "unknown", + verified.get("transaction") or "", + resource, + ) + log_event( + "delivery", + f"Delivered compute result for {label}; tx {verified.get('transaction', '')}", + "good", + ) + return JSONResponse( + status_code=200, + content=result, + headers={"PAYMENT-RESPONSE": payment_response_header(verified)}, + ) + + +@app.get("/compute/session") +async def compute_session(request: Request) -> JSONResponse: + tier = (request.query_params.get("tier") or "starter").strip().lower() + product = next((p for p in SESSION_PRODUCTS if p.tier == tier), None) + if product is None: + raise HTTPException(status_code=404, detail="session tier not found") + resource = str(request.url) + verified = await verify_or_402(request, resource, product.price_usdc, product.label) + if isinstance(verified, JSONResponse): + return verified + session = create_session( + product.tier, + product.credits, + verified.get("sender") or "unknown", + verified.get("transaction") or "", + ) + record_settlement( + "session", + product.label, + product.price_usdc, + verified.get("sender") or "unknown", + verified.get("transaction") or "", + resource, + ) + log_event("delivery", f"Created {product.label}; session {session['session_id']}", "good") + return JSONResponse( + status_code=200, + content={ + "service": "mini-aws-compute", + "product": product.label, + "tier": product.tier, + "session_id": session["session_id"], + "credits_total": product.credits, + "credits_remaining": product.credits, + "submit_url": f"{BUYER_BASE_URL}/compute/jobs/{session['session_id']}?job=prime-count&size=5000", + "status_url": f"{BUYER_BASE_URL}/compute/sessions/{session['session_id']}", + "paid_by": verified.get("sender") or "unknown", + "amount_usdc": product.price_usdc, + "settlement_tx": verified.get("transaction") or "", + }, + headers={"PAYMENT-RESPONSE": payment_response_header(verified)}, + ) + + +@app.get("/compute/sessions/{session_id}") +async def compute_session_status(session_id: str) -> JSONResponse: + session = SESSION_STORE.get(session_id) + if not session: + raise HTTPException(status_code=404, detail="session not found") + return JSONResponse(status_code=200, content=session) + + +@app.get("/compute/jobs/{session_id}") +async def compute_session_job(session_id: str, request: Request) -> JSONResponse: + job = (request.query_params.get("job") or "prime-count").strip().lower() + size = int((request.query_params.get("size") or "5000").strip()) + result = run_session_job(session_id, job, size) + log_event("delivery", f"Ran session job {job} size={size} for session {session_id}", "good") + return JSONResponse(status_code=200, content=result) + + +@app.get("/papers/{slug}") +async def paper(slug: str, request: Request) -> JSONResponse: + paper = find_paper(slug) + resource = str(request.url) + verified = await verify_or_402(request, resource, paper.price_usdc, paper.label) + if isinstance(verified, JSONResponse): + return verified + download_token = sign_download_token(paper.filename, verified.get("transaction") or "") + download_url = f"{BUYER_BASE_URL}/downloads/{paper.filename}?token={download_token}" + result = { + "service": "research-library", + "product": paper.title, + "abstract": paper.abstract, + "download_url": download_url, + "format": "pdf", + "paid_by": verified.get("sender") or "unknown", + "amount_usdc": paper.price_usdc, + "settlement_tx": verified.get("transaction") or "", + } + record_settlement( + "paper", + paper.title, + paper.price_usdc, + verified.get("sender") or "unknown", + verified.get("transaction") or "", + resource, + ) + log_event( + "delivery", f"Unlocked paper {paper.title}; tx {verified.get('transaction', '')}", "good" + ) + return JSONResponse( + status_code=200, + content=result, + headers={"PAYMENT-RESPONSE": payment_response_header(verified)}, + ) + + +@app.get("/downloads/{filename}") +async def download_pdf(filename: str, token: str | None = None) -> FileResponse: + if not token: + raise HTTPException(status_code=403, detail="download token required") + verify_download_token(token, filename) + path = PAPERS_DIR / filename + if not path.exists(): + raise HTTPException(status_code=404, detail="file not found") + METRICS["downloads"] += 1 + persist_state() + log_event("download", f"PDF downloaded: {filename}") + return FileResponse(path, media_type="application/pdf", filename=filename) diff --git a/examples/business-compute/compute_job.py b/examples/business-compute/compute_job.py new file mode 100644 index 0000000..47f98e0 --- /dev/null +++ b/examples/business-compute/compute_job.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import json +import os +from math import isqrt +from urllib.parse import parse_qs + + +def prime_count(limit: int) -> int: + if limit < 2: + return 0 + sieve = bytearray(b"\x01") * (limit + 1) + sieve[0:2] = b"\x00\x00" + for n in range(2, isqrt(limit) + 1): + if sieve[n]: + start = n * n + step = n + sieve[start : limit + 1 : step] = b"\x00" * (((limit - start) // step) + 1) + return int(sum(sieve)) + + +def fib(n: int) -> int: + a, b = 0, 1 + for _ in range(n): + a, b = b, a + b + return a + + +def main() -> None: + query = parse_qs(os.environ.get("OMNICLAW_REQUEST_QUERY", ""), keep_blank_values=True) + job = (query.get("job", ["prime-count"])[0] or "prime-count").strip().lower() + size_raw = (query.get("size", ["50000"])[0] or "50000").strip() + payer = os.environ.get("OMNICLAW_PAYER_ADDRESS", "unknown") + tx_hash = os.environ.get("OMNICLAW_TX_HASH", "") + amount = os.environ.get("OMNICLAW_AMOUNT_USD", "") + + try: + size = int(size_raw) + except ValueError as err: + print(json.dumps({"error": f"invalid size: {size_raw}"})) + raise SystemExit(2) from err + + if job == "prime-count": + if size < 10 or size > 500000: + print(json.dumps({"error": "size must be between 10 and 500000 for prime-count"})) + raise SystemExit(2) + result = { + "service": "mini-aws-compute", + "job": job, + "input": {"size": size}, + "output": {"prime_count": prime_count(size)}, + "paid_by": payer, + "amount_usdc": amount, + "settlement_tx": tx_hash, + } + elif job == "fib": + if size < 1 or size > 5000: + print(json.dumps({"error": "size must be between 1 and 5000 for fib"})) + raise SystemExit(2) + value = str(fib(size)) + result = { + "service": "mini-aws-compute", + "job": job, + "input": {"n": size}, + "output": {"fib": value}, + "paid_by": payer, + "amount_usdc": amount, + "settlement_tx": tx_hash, + } + else: + print(json.dumps({"error": f"unsupported job: {job}"})) + raise SystemExit(2) + + print(json.dumps(result)) + + +if __name__ == "__main__": + main() diff --git a/examples/business-compute/papers/machine-commerce-settlement-design.pdf b/examples/business-compute/papers/machine-commerce-settlement-design.pdf new file mode 100644 index 0000000000000000000000000000000000000000..3d4a7d5864b287d3a416a54fedc5f61788369752 GIT binary patch literal 917 zcmZWo!EWL(5WV{=-lbAkDh&xuXr&5ip+zh0Rz)SZY7ZtEz}>`-Vu!+heaA^fDD}yn zdGp?k-uPniFdu~1HzLFkX7zS|k03o%gkUC}EX)SMLT<<+;`|FH6Vf@ugnxd4cc|dE z$}L_;9{iU%OrssbzRV}Sm3fJp7}#k9Pn0V;HG8}c8J>g@#&=Q6rX9m9Zah7L7qX_Q zGj7gzJvYqIUP8nzU@~FhUfWlFYX2dj9D`oBF)9+ zlgzeC6K19?iPf+q=L#x)!w+QDM*l$iCWaV9i7#kLB2RMpTxvBd!6^0RY;|k-r#+EV73F*XsleRLb(HzR_uy}(Xdn$R1d7m2{Oy5 za#Ck>g&oyutupD9(N~Z<$4a_v%lP9gJ7rZzczxE)*>fRP-jee?%A(PQ$^KE!^2Gsi z?YOt&;HSE+>`c{JK_|MM*kdaE2%Qt3E9Oo7vMa5_!LnJoc0v7`h6tw8QkVHf#f}_L z*?YdPY0cIoVt71A;&>EejXrjVHVfUy;^bQ_4Ec}vbApSQ={%Ms<38qUsn`*}mrM1Z cknq|XEl_miaTB$-RY|%=)tD-iyY7ZtEz+GdrwnIz5e#ZepOLK~6 z-n{obGw#g3PrKRGKx9Z^=P$RnNXp%YkW6Hd+AfjIoJ7IcfwWb<(%V>70j5HN}L-`_7bbCcaw+y|=aPhDu;i5r9VpU#qLYN%XXN~VeR5zKkZLD z_2j9UhjwY>>SjpB($hZkgY=g8;_|cyZ;lo`u|lNB4@I8$b1ZO*-Js3FoML(LC6;CU yZsPX=*JGyhSaCf(#X{^TxwxxO>YY&VDq9=h$^MB6AEXPd&0(I4PUrq%D*gj7N)adk literal 0 HcmV?d00001 diff --git a/scripts/start_business_compute_demo.sh b/scripts/start_business_compute_demo.sh new file mode 100755 index 0000000..a94705f --- /dev/null +++ b/scripts/start_business_compute_demo.sh @@ -0,0 +1,58 @@ +#!/usr/bin/env bash +set -euo pipefail +ROOT=$(cd "$(dirname "$0")/.." && pwd) +cd "$ROOT" + +if [[ -f .env ]]; then + set -a + source .env + set +a +fi + +bash scripts/start_local_economy.sh >/dev/null + +export PAYMENT_AGENT_POLICY_FILE="${PAYMENT_AGENT_POLICY_FILE:-$ROOT/.runtime/payment-agent.policy.runtime.json}" +export SELLER_AGENT_POLICY_FILE="${SELLER_AGENT_POLICY_FILE:-$ROOT/.runtime/seller-agent.policy.runtime.json}" + +docker rm -f omniclaw-business-compute-demo >/dev/null 2>&1 || true +docker rm -f business-compute-redis >/dev/null 2>&1 || true +docker run -d --name business-compute-redis --network omniclaw-buyer_default redis:7-alpine >/dev/null + +docker run -d \ + --name omniclaw-business-compute-demo \ + --network omniclaw-buyer_default \ + -p 8010:8010 \ + -v "$ROOT:/workspace" \ + -w /workspace \ + -e SELLER_OMNICLAW_SERVER_URL="http://seller-agent:9091" \ + -e SELLER_OMNICLAW_TOKEN="seller-agent-token" \ + -e BUYER_OMNICLAW_SERVER_URL="http://payment-agent:9090" \ + -e BUYER_OMNICLAW_TOKEN="payment-agent-token" \ + -e BUSINESS_COMPUTE_PORT="8010" \ + -e BUSINESS_COMPUTE_REDIS_URL="redis://business-compute-redis:6379/0" \ + -e UV_PROJECT_ENVIRONMENT="/tmp/omniclaw-business-demo-venv" \ + omniclaw-agent:local \ + sh -lc 'PYTHONPATH=/workspace/src:/workspace uvx --from uvicorn --with fastapi[standard] --with httpx --with redis uvicorn examples.business-compute.app:app --host 0.0.0.0 --port 8010' >/dev/null + +docker network connect omniclaw-seller_default omniclaw-business-compute-demo >/dev/null 2>&1 || true + +BUSINESS_IP=$(docker inspect omniclaw-business-compute-demo --format '{{with index .NetworkSettings.Networks "omniclaw-buyer_default"}}{{.IPAddress}}{{end}}') +python3 - </dev/null + +printf 'Business compute demo: http://127.0.0.1:8010\n' +printf 'Buyer pay URL: http://%s:8010/compute?job=prime-count&size=1000\n' "$BUSINESS_IP" +printf 'Business container logs: docker logs -f omniclaw-business-compute-demo\n' + +exec docker logs -f omniclaw-business-compute-demo