Skip to content

kosminus/ai-event-bus

Repository files navigation

AI Event Bus

A local-first AI control plane for Linux and macOS — an event-driven runtime that sits between your operating system and LLM agents via Ollama. It gives your machine ambient intelligence: agents that watch, decide, and act on your behalf.

Deep OS integration per platform: DBus + inotify + systemd + notify-send on Linux; unified logging (log stream) + NSWorkspace + AppleScript notifications + launchd on macOS. Runs entirely on your machine — no cloud, no API keys, no data leaves your box.

┌─────────────────────────────────────────────┐
│              USER (widget / CLI)             │
├─────────────────────────────────────────────┤
│           aiventbus control plane            │
│  events → routing → context → agents → acts  │
├─────────────────────────────────────────────┤
│   OS (Linux: DBus + systemd + inotify /      │
│        macOS: NSWorkspace + launchd + log)   │
├─────────────────────────────────────────────┤
│        Hardware (GPU, disk, network)          │
└─────────────────────────────────────────────┘

Why?

Existing AI agent frameworks (LangChain, CrewAI) are request/response. This is event-driven — agents are autonomous workers that react to the world:

  • System producers — clipboard monitor, file watcher, DBus listener, terminal monitor give the daemon eyes and ears
  • Structured agent output — LLMs return typed JSON with actions, not free text
  • Policy-gated execution — agents propose actions (shell commands, file ops, HTTP requests), a policy engine gates them (blocklist → allowlist → confirm)
  • Pluggable tool backends — extend agents with external tools (Playwright, MCP servers, custom APIs) via a simple ToolBackend interface
  • Chain reactions — agent output becomes a new event that triggers other agents
  • Priority lanes — user queries never wait behind background clipboard events
  • Knowledge store — durable key-value facts shared across all agents
  • Distilled long-term memory — searchable recalled experience separate from transcript history and canonical knowledge
  • Full traceability — trace_id on every causal chain from trigger to action
  • All local — runs on your machine via Ollama, free and private

Platform support

Platform Status Notes
Ubuntu/Debian Full support Primary target. DBus, inotify, notify-send, systemd
Arch/Fedora Should work Same Linux APIs, untested
macOS 12+ Supported log stream, NSWorkspace, pbpaste, osascript, launchd. Optional Swift helper (aibus install --build-helper) unlocks screen lock/unlock and app lifecycle capture. See docs/macos-notes.md.
Windows Not supported Different OS APIs — open to PRs

Capability coverage is reported honestly in the Producers tab (and at /api/v1/producers): each producer lists which OS primitives it can reach and why any are unavailable. System-wide notification content capture is Linux-only — modern macOS requires a system extension.

Hardware: Works on any machine with Ollama. Benefits from a GPU (NVIDIA recommended on Linux, Apple Silicon on macOS) for faster inference. Tested on RTX 5090 + RTX 6000 Pro with 70B+ models and on Apple M2 Pro.

Quick start

Prerequisites: Python 3.11+, Ollama running locally.

# Install
git clone <repo-url> && cd aiventbus
pip install -e .

# Run the daemon
python -m aiventbus

Want the daemon to autostart and survive reboots? Generate the systemd user unit (Linux) or launchd agent (macOS) with a single command:

aibus install                 # Linux: systemd user unit; macOS: LaunchAgent
aibus install --build-helper  # macOS only: also build + install the Swift sidecar

Both forms generate the service unit from the live sys.executable + resolved config/DB/log paths, so a daemon launched by systemd or launchd uses the same state as a CLI run from the repo. aibus uninstall removes it; aibus uninstall --purge also deletes the data/config/log dirs.

Debian package for the daemon

For Ubuntu/Debian distribution, build a self-contained .deb that bundles a PyInstaller-frozen runtime. No pip install, no virtualenv, no system Python required on the target — the only runtime dependency is libc6.

pip install pyinstaller          # maintainer-only dev dependency
aibus package-binary             # build dist/aiventbus-bundle/ (PyInstaller onedir)
aibus package-deb                # package the bundle as dist/aiventbus-daemon_<version>_<arch>.deb

aibus package-deb will invoke PyInstaller itself if no bundle exists; pass --reuse-bundle to skip the rebuild. The resulting package installs the bundle under /opt/aiventbus/ and symlinks /usr/bin/aiventbus and /usr/bin/aibus to a single dispatcher binary.

After installing the .deb, each user who wants the daemon running in their session should still run aibus install to create and enable their per-user systemd unit.

Build caveat: PyInstaller bundles are tied to the builder's architecture, glibc version, and Python version. Build in a clean container that matches your target distro (e.g. ubuntu:22.04 for maximum compatibility with 22.04+).

Open http://localhost:8420 for the dashboard. The web UI now includes a Memories tab for searching and deleting distilled long-term memory. API docs at http://localhost:8420/docs.

CLI

# Ask a question (publishes user.query, waits for agent response)
aibus query "what files were modified in the last 10 minutes?"

# Check status
aibus status

# List recent events
aibus events --topic clipboard.text --limit 20

# Manage pending actions
aibus approve <action_id>
aibus deny <action_id>

# Knowledge store
aibus knowledge list --prefix system.
aibus knowledge set user.pref.editor vscode
aibus knowledge get system.gpu

# Long-term memory
aibus memory list --scope user
aibus memory add --kind semantic --scope global --content "This machine runs Ollama locally"
aibus memory delete <memory_id>

Desktop Widget

A lightweight Tauri app (17MB) that connects to the running daemon:

cd widget
cargo tauri dev    # development
cargo tauri build  # production (.deb + AppImage)

Features: chat input with Ctrl+Space hotkey, tabbed activity feed (All/Files/Security/Approvals), action approval buttons, system tray icon, desktop notifications for critical events.

Usage

1. Create an agent

curl -X POST http://localhost:8420/api/v1/agents \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "general assistant",
    "model": "gemma4:latest",
    "description": "General-purpose AI assistant",
    "system_prompt": "You are a helpful AI assistant. Answer concisely."
  }'

2. Create a routing rule

curl -X POST http://localhost:8420/api/v1/routing-rules \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "user queries",
    "topic_pattern": "user.*",
    "consumer_id": "agent_general-assistant"
  }'

3. Ask a question

aibus query "why is my machine slow?"

Or publish any event:

curl -X POST http://localhost:8420/api/v1/events \
  -H 'Content-Type: application/json' \
  -d '{
    "topic": "log.error",
    "payload": {"message": "401 Unauthorized", "path": "/admin"},
    "priority": "high",
    "semantic_type": "security.auth_failure"
  }'

4. Classifier fallback

If no routing rule matches, an optional LLM classifier routes the event to the best agent automatically. Enable in config.yaml:

classifier:
  enabled: true
  model: "gemma4:latest"

OS integration

aiventbus hooks into the OS to observe and act. All OS-specific plumbing lives behind a single aiventbus.platform boundary; the rest of the code asks for capabilities and topics, not OS names.

Integration Linux backend macOS backend
Clipboard xclip / wl-paste polling pbpaste polling
File system inotify via watchfiles FSEvents via watchfiles
System log journalctl -f -o json log stream --style=ndjson --predicate …
Screen lock / unlock DBus org.freedesktop.login1.Session NSDistributedNotificationCenter (via Swift sidecar)
App launch / quit / activate Not exposed (inconsistent across DEs) NSWorkspace.notificationCenter (via Swift sidecar)
Inbound notifications DBus org.freedesktop.Notifications (Monitor) Not supported without a system extension
Outbound notifications notify-send osascript -e 'display notification …'
App launching xdg-open open
Terminal bash/zsh history polling + preexec hook (portable) same
Shell / file / HTTP / tools portable (asyncio.subprocess, pathlib, httpx, ToolBackend) same

Producers are registered once against capabilities, not OSes. On macOS, when the optional Swift helper isn't installed, the desktop_events producer reports session_state and app_lifecycle as unavailable with a concrete reason ("macOS helper not installed — run: aibus install --build-helper") rather than silently no-opping.

Event schema

Events use a tiered schema. Only topic and payload are required:

{
  "topic": "log.error",
  "payload": {"message": "401 Unauthorized"},

  "priority": "high",
  "semantic_type": "security.auth_failure",
  "dedupe_key": "auth-401-/admin",
  "trace_id": "tr_abc123def456",
  "parent_event": "evt_abc123",
  "output_topic": "agent.security.response",
  "context_refs": ["evt_deploy_xyz"],
  "memory_scope": "security-agent"
}

Agent output format

Agents return structured JSON with action proposals:

{
  "type": "analysis",
  "summary": "Repeated 401s suggest credential stuffing from 10.0.0.1",
  "confidence": 0.92,
  "proposed_actions": [
    {
      "action_type": "shell_exec",
      "command": "last -i | grep 10.0.0.1"
    },
    {
      "action_type": "notify",
      "title": "Security Alert",
      "message": "Possible credential stuffing attack detected"
    },
    {
      "action_type": "set_knowledge",
      "key": "security.last_alert",
      "value": "credential stuffing from 10.0.0.1"
    }
  ]
}

Built-in action types: emit_event, log, alert, notify, shell_exec, file_read, file_write, file_delete, open_app, http_request, set_knowledge, get_knowledge, tool_call.

Actions go through the policy engine: auto-approved (safe commands, HTTP requests), confirm (needs user approval — shell, file writes, tool calls), or deny (blocked patterns like rm -rf /, sudo).

Agent prompts are generated dynamically — they list only the action types actually available (including any registered tool backends). If an agent proposes an unknown action type, the bus emits a system.unknown_action event instead of failing silently.

Architecture

┌─────────────── PRODUCERS ─────────────────┐
│  clipboard, file_watcher, terminal,        │
│  system_log, desktop_events, webhook,      │
│  cron, manual (API/UI)                     │
└──────────────┬─────────────────────────────┘
               ▼
        ┌──────────────┐
        │   EVENT BUS   │  SQLite persistence, dedupe, chain limits
        └──────┬───────┘
               ▼
      ┌────────────────────┐
      │   PRIORITY ROUTER   │  3 lanes: interactive / critical / ambient
      │                      │  Static rules + classifier fallback
      └────────┬────────────┘
               ▼
      ┌────────────────────┐
      │  CONTEXT ENGINE     │  Recalled experience + transcript memory + pinned facts + knowledge store
      │                      │  Token-bounded prompt assembly
      └────────┬────────────┘
               ▼
      ┌────────────────────┐
      │    LLM AGENTS       │  Ollama streaming, structured output
      └────────┬────────────┘
               ▼
      ┌────────────────────┐
      │   POLICY ENGINE     │  Blocklist → allowlist → trust modes
      └────────┬────────────┘
               ▼
      ┌────────────────────┐
      │     EXECUTOR        │  Shell, filesystem, HTTP, notifications
      │                      │  + pluggable tool backends
      └────────┬────────────┘
               ▼
        back to EVENT BUS (chain reactions)

Tool backends

Agents can call external tools through the pluggable ToolBackend system. Tool backends register with the executor and are automatically advertised in agent prompts — the LLM sees what tools are available and how to call them.

Writing a tool backend

from aiventbus.core.tools import ToolBackend, ToolMethod

class PlaywrightBackend(ToolBackend):
    @property
    def name(self):
        return "playwright"

    @property
    def description(self):
        return "Browser automation — navigate pages, extract text, take screenshots"

    def methods(self):
        return [
            ToolMethod("goto", "Navigate to a URL", {"url": "full URL string"}),
            ToolMethod("get_text", "Extract text from the page", {"selector": "CSS selector"}),
            ToolMethod("screenshot", "Take a screenshot", {"path": "output file path"}),
        ]

    async def call(self, method, params):
        if method == "goto":
            page = await self.browser.new_page()
            await page.goto(params["url"])
            return {"status": "ok", "title": await page.title()}
        # ... handle other methods

Registering a backend

Register tool backends in main.py (or via a future plugin API):

executor.tool_registry.register(PlaywrightBackend())

How agents use tools

The agent's system prompt automatically includes registered tools. Agents propose tool_call actions:

{
  "action_type": "tool_call",
  "tool": "playwright",
  "method": "goto",
  "params": {"url": "https://weather.example.com/bucharest"}
}

Tool calls go through the policy engine (confirm by default — requires user approval). Override in config:

policy:
  trust_overrides:
    tool_call: "auto"   # auto-approve all tool calls

Built-in: HTTP requests

The http_request action type is built-in — no tool backend needed. Agents can fetch external data directly:

{
  "action_type": "http_request",
  "url": "https://api.weather.com/v1/current?city=bucharest",
  "method": "GET"
}

Configuration

Optional config.yaml in project root:

server:
  host: "0.0.0.0"
  port: 8420

ollama:
  base_url: "http://localhost:11434"
  default_model: "gemma4:latest"

bus:
  dedupe_window_seconds: 60
  max_fan_out: 3
  max_chain_depth: 10
  max_chain_budget: 20

producers:
  clipboard_enabled: true
  file_watcher_enabled: false
  file_watcher_paths: ["~/Downloads", "~/Documents"]
  dbus_enabled: false
  terminal_monitor_enabled: false

tools:
  http_request_enabled: true
  http_request_timeout: 30         # seconds
  http_request_max_size: 1048576   # 1MB response cap

classifier:
  enabled: false
  model: "gemma4:latest"

policy:
  trust_overrides: {}
  shell_timeout_seconds: 30

lanes:
  interactive_prefixes: ["user."]
  critical_prefixes: ["security.", "system.failure"]

telemetry:
  queue_depth_sample_interval_seconds: 5.0

Observability

Prometheus metrics are always exposed at http://localhost:8420/metrics (plaintext, OpenMetrics-compatible). Scrape with any Prometheus-compatible collector.

Metric families:

Family Type Labels What it measures
aiventbus_http_requests_total counter method, path, status HTTP traffic against the daemon
aiventbus_http_request_duration_seconds histogram method, path HTTP request latency
aiventbus_events_published_total counter topic, source Events accepted onto the bus
aiventbus_events_deduped_total counter topic Events dropped by the dedupe window
aiventbus_events_chain_limit_total counter reason (depth|budget) Chain-reaction guardrail rejections
aiventbus_event_publish_duration_seconds histogram outcome Latency of EventBus.publish
aiventbus_routing_decisions_total counter result Matched / unmatched routing outcomes
aiventbus_routing_duration_seconds histogram result Latency of routing decisions
aiventbus_assignments_created_total counter agent_id, lane Assignments handed to agents
aiventbus_assignment_state_transitions_total counter agent_id, state claimed / completed / failed / retried
aiventbus_assignment_queue_depth gauge lane Pending assignments per priority lane
aiventbus_agent_runs_total counter agent_id, model, result Assignment processing outcomes
aiventbus_agent_run_duration_seconds histogram agent_id, model, result Latency of one assignment run
aiventbus_llm_requests_total counter agent_id, model, result Ollama completion calls
aiventbus_llm_request_duration_seconds histogram agent_id, model LLM wall time
aiventbus_llm_parse_failures_total counter agent_id, model Unparseable structured output
aiventbus_llm_tokens_total counter agent_id, model, kind (prompt|eval) Tokens reported by Ollama
aiventbus_action_executions_total counter agent_id, action_type, result Executor outcomes
aiventbus_action_execution_duration_seconds histogram action_type, result Executor wall time
aiventbus_producer_events_emitted_total counter producer Per-producer emit rate
aiventbus_system_events_total counter topic Internal system.* events (parse_failure, agent_failure, chain_limit, ...)
aiventbus_classifier_fallbacks_total counter result Classifier invocations for unmatched events

Queue depth is sampled in the background (default 5s, tunable via telemetry.queue_depth_sample_interval_seconds) so scrapes don't hit the DB. The /metrics endpoint is always on — gate it at the network layer (firewall / reverse proxy) if you don't want it exposed.

API reference

Method Endpoint Description
POST /api/v1/events Publish event
GET /api/v1/events List events (filter by topic, status)
GET /api/v1/events/:id Event detail
GET /api/v1/events/:id/chain Full event chain
GET /api/v1/events/:id/assignments Assignments for event
GET /api/v1/events/:id/responses Agent responses for event
GET /api/v1/events/trace/:trace_id All events in a trace
POST /api/v1/agents Create agent
GET /api/v1/agents List agents
POST /api/v1/agents/:id/enable Enable agent
POST /api/v1/agents/:id/disable Disable agent
GET /api/v1/agents/:id/memory Agent transcript memory + pinned facts
GET /api/v1/memories List/search distilled long-term memories
POST /api/v1/memories Create long-term memory
GET /api/v1/memories/:id Get long-term memory
PATCH /api/v1/memories/:id Update long-term memory importance
DELETE /api/v1/memories/:id Delete long-term memory
POST /api/v1/routing-rules Create routing rule
GET /api/v1/routing-rules List rules
GET /api/v1/actions/pending List pending actions
GET /api/v1/actions/history All actions (pending + resolved)
GET /api/v1/actions/:id Action detail
POST /api/v1/actions/:id/approve Approve and execute action
POST /api/v1/actions/:id/deny Deny action
GET /api/v1/knowledge List knowledge (with prefix filter)
PUT /api/v1/knowledge/:key Set knowledge entry
GET /api/v1/knowledge/:key Get knowledge entry
GET /api/v1/topics Topic stats
GET /api/v1/system/status Health check
GET /metrics Prometheus metrics (plaintext exposition)
WS /ws WebSocket (real-time events + agent streaming)

License

MIT

About

AI Event Bus is the only local-first, event-driven AI runtime that makes your Mac or Linux machine autonomously intelligent — monitoring OS events in real time, reasoning with local LLMs, and taking policy-gated actions. Unlike cloud AI assistants, nothing leaves your machine.

Topics

Resources

Stars

Watchers

Forks

Packages

 
 
 

Contributors