Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions migrations/versions/007_outbound_websocket_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
"""Add outbound_websocket_sources table.

Stores configuration for outbound WebSocket connections that the automation
service initiates and maintains. Events received over these connections are
dispatched through the same trigger-matching pipeline as inbound webhooks.

Supports two kinds (the value stored in the ``kind`` discriminator column):
- "GenericWebSocketSource": static wss:// URL with optional HTTP headers
- "SlackWebSocketSource": Slack Socket Mode via apps.connections.open (dynamic URL)

Revision ID: 007
Revises: 006
Create Date: 2026-05-21
"""

from collections.abc import Sequence

from alembic import op
from sqlalchemy import JSON, Boolean, Column, DateTime, Enum, String, Text, Uuid, text


# Revision identifiers read by Alembic: this migration is "007" and applies
# on top of "006" (see "Revision ID" / "Revises" in the module docstring).
revision: str = "007"
down_revision: str = "006"
# No branch labels and no cross-revision dependencies are declared.
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    """Create the ``outbound_websocket_sources`` table and its two indexes.

    The table holds one row per outbound WebSocket source. Columns fall into
    four groups: identity/ownership, JMESPath routing expressions,
    kind-specific connection settings, and runtime connection state.

    Two indexes are created afterwards:

    - ``ix_outbound_ws_sources_org_id``: non-unique, on ``org_id``.
    - ``ix_outbound_ws_sources_org_source``: unique, on ``(org_id, source)``,
      so a ``source`` value can appear only once per org.
    """
    op.create_table(
        "outbound_websocket_sources",
        # --- identity / ownership ---
        Column("id", Uuid, primary_key=True),
        Column("org_id", Uuid, nullable=False),
        Column("name", String(255), nullable=False),
        Column("source", String(100), nullable=False),
        Column("kind", String(50), nullable=False),
        Column("enabled", Boolean, nullable=False, server_default="true"),
        # --- JMESPath expressions ---
        Column(
            "event_key_expr",
            String(500),
            nullable=False,
            server_default="type",
        ),
        Column("payload_expr", String(500), nullable=True),
        Column("filter_expr", Text, nullable=True),
        # --- fields used only by the generic (static URL) kind ---
        Column("url", Text, nullable=True),
        Column("headers", JSON, nullable=True),
        # --- fields used only by the Slack kind ---
        Column("app_token", String(255), nullable=True),
        # --- runtime state ---
        Column(
            "status",
            # native_enum=False: stored as a 20-character string column
            # rather than a database-level ENUM type.
            Enum(
                "CONNECTING",
                "CONNECTED",
                "DISCONNECTED",
                "ERROR",
                name="websocketstatus",
                native_enum=False,
                length=20,
            ),
            nullable=False,
            server_default="DISCONNECTED",
        ),
        Column("status_detail", Text, nullable=True),
        Column("connected_at", DateTime(timezone=True), nullable=True),
        Column("last_event_at", DateTime(timezone=True), nullable=True),
        # --- bookkeeping timestamps (database-side defaults) ---
        Column(
            "created_at",
            DateTime(timezone=True),
            nullable=False,
            server_default=text("CURRENT_TIMESTAMP"),
        ),
        Column(
            "updated_at",
            DateTime(timezone=True),
            nullable=False,
            server_default=text("CURRENT_TIMESTAMP"),
        ),
    )
    op.create_index(
        "ix_outbound_ws_sources_org_id",
        "outbound_websocket_sources",
        ["org_id"],
    )
    op.create_index(
        "ix_outbound_ws_sources_org_source",
        "outbound_websocket_sources",
        ["org_id", "source"],
        unique=True,
    )


def downgrade() -> None:
    """Drop the ``outbound_websocket_sources`` table created by ``upgrade``."""
    table_name = "outbound_websocket_sources"
    op.drop_table(table_name)
12 changes: 12 additions & 0 deletions openhands/automation/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
from openhands.automation.preset_router import router as preset_router
from openhands.automation.router import router
from openhands.automation.scheduler import scheduler_loop
from openhands.automation.socket_manager import SocketManager
from openhands.automation.uploads import router as uploads_router
from openhands.automation.watchdog import watchdog_loop
from openhands.automation.webhook_router import router as webhook_router
from openhands.automation.websocket_source_router import (
router as websocket_source_router,
)


logger = logging.getLogger("automation.app")
Expand Down Expand Up @@ -151,11 +155,18 @@ async def lifespan(app: FastAPI):
app.state.watchdog_task = watchdog_task
logger.info("Background watchdog started")

# Socket manager: maintains outbound WebSocket connections
socket_manager = SocketManager(app.state.session_factory)
app.state.socket_manager = socket_manager
await socket_manager.start()
logger.info("Socket manager started")

yield

# Shutdown
logger.info("Shutting down background tasks...")
shutdown_event.set()
await socket_manager.stop()

# Wait for all tasks to exit gracefully
for task_name, task in [
Expand Down Expand Up @@ -224,6 +235,7 @@ def _create_app() -> FastAPI:
app.include_router(preset_router, prefix=_base_path)
app.include_router(event_router, prefix=_base_path)
app.include_router(webhook_router, prefix=_base_path)
app.include_router(websocket_source_router, prefix=_base_path)
app.include_router(router, prefix=_base_path)


Expand Down
131 changes: 131 additions & 0 deletions openhands/automation/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

from openhands.automation.utils import utcnow
from openhands.automation.utils.encrypted_fields import (
EncryptedJSONHeaders,
EncryptedString,
)


class WebSocketStatus(enum.Enum):
    """Connection state recorded for an outbound WebSocket source.

    Each member's value is the same string as its name.
    """

    # Member order is part of the enum's iteration order; keep it stable.
    CONNECTING = "CONNECTING"
    CONNECTED = "CONNECTED"
    DISCONNECTED = "DISCONNECTED"
    ERROR = "ERROR"


class Base(DeclarativeBase):
Expand Down Expand Up @@ -321,3 +334,121 @@ class CustomWebhook(Base):
__table_args__ = (
Index("ix_custom_webhooks_org_source", "org_id", "source", unique=True),
)


class OutboundWebSocketSource(Base):
    """An outbound WebSocket connection that receives events from an external service.

    Unlike CustomWebhook (where the external service connects to us), this model
    represents a connection WE initiate to an external service. A background
    SocketManager maintains the connection and dispatches received events through
    the same trigger-matching pipeline used by webhooks.

    Two kinds are supported, selected via the ``kind`` discriminator column:

    ``"GenericWebSocketSource"``
        Connects to a static ``wss://`` URL with optional HTTP headers.
        Suitable for any service that exposes a plain WebSocket endpoint.

    ``"SlackWebSocketSource"``
        Connects to Slack's Socket Mode API. Requires a Slack App-Level Token
        (``xapp-…``). The connection URL is fetched dynamically by calling
        ``apps.connections.open`` before each connect attempt; Slack-specific
        envelope ACKs are handled automatically.

    Event routing uses the same JMESPath machinery as webhooks:

    - ``event_key_expr`` extracts the event-type string that is matched against
      ``trigger.on`` patterns in automations (e.g. ``"payload.event.type"``
      yields ``"message"`` for Slack message events).
    - ``payload_expr`` unwraps outer envelopes before the payload is stored on
      the run and handed to ``trigger.filter`` evaluation (e.g. ``"payload.event"``
      for Slack strips the Socket Mode envelope).
    - ``filter_expr`` is a *connection-level* pre-filter: events that do not match
      are silently dropped before any automation matching occurs. Use this to
      avoid dispatching irrelevant high-volume events (e.g. bot messages).

    Both indexes are declared by explicit name in ``__table_args__`` so that the
    model metadata uses the same index names as the migration that creates
    this table.
    """

    __tablename__ = "outbound_websocket_sources"

    id: Mapped[uuid.UUID] = mapped_column(Uuid, primary_key=True, default=uuid.uuid4)

    # Indexed through the explicitly named Index in __table_args__ rather than
    # ``index=True``: an auto-generated index name would not match the
    # "ix_outbound_ws_sources_org_id" name the migration creates.
    org_id: Mapped[uuid.UUID] = mapped_column(Uuid, nullable=False)

    # Human-readable label
    name: Mapped[str] = mapped_column(String(255), nullable=False)

    # Slug used as the event ``source`` name in trigger matching and URLs.
    # Must be unique per org (enforced by the unique index below).
    source: Mapped[str] = mapped_column(String(100), nullable=False)

    # Discriminator: "GenericWebSocketSource" or "SlackWebSocketSource"
    kind: Mapped[str] = mapped_column(String(50), nullable=False)

    enabled: Mapped[bool] = mapped_column(default=True, nullable=False)

    # --- JMESPath expressions (common to all kinds) ---

    # Extracts the event-type key used for trigger.on pattern matching.
    # Defaults differ per kind and are set at the schema layer:
    #   GenericWebSocketSource → "type"
    #   SlackWebSocketSource   → "payload.event.type"
    event_key_expr: Mapped[str] = mapped_column(String(500), nullable=False)

    # Unwraps outer envelopes so the stored/filtered payload is the inner event.
    # None means pass the raw message through unchanged.
    #   SlackWebSocketSource default → "payload.event"
    payload_expr: Mapped[str | None] = mapped_column(String(500), nullable=True)

    # Connection-level pre-filter (JMESPath). Evaluated against the raw message
    # *before* payload unwrapping. Events that do not match are dropped silently.
    # None means accept all events.
    filter_expr: Mapped[str | None] = mapped_column(Text, nullable=True)

    # --- kind = "GenericWebSocketSource" fields ---

    # Static wss:// URL to connect to. Not encrypted (not a credential).
    url: Mapped[str | None] = mapped_column(Text, nullable=True)

    # HTTP headers for the WebSocket upgrade request.
    # Sensitive header values (Authorization, X-Api-Key, Cookie, etc.) are
    # encrypted at rest via EncryptedJSONHeaders using AUTOMATION_SECRET_KEY /
    # OH_SECRET_KEY. Non-sensitive headers are stored as-is.
    headers: Mapped[dict | None] = mapped_column(EncryptedJSONHeaders, nullable=True)

    # --- kind = "SlackWebSocketSource" fields ---

    # Slack App-Level Token (xapp-…). Required for Socket Mode.
    # Encrypted at rest via EncryptedString using AUTOMATION_SECRET_KEY /
    # OH_SECRET_KEY.
    app_token: Mapped[str | None] = mapped_column(EncryptedString(255), nullable=True)

    # --- Runtime state (managed by SocketManager, not by the API) ---

    # Stored as a 20-character string column (native_enum=False), not a
    # database-level ENUM type.
    status: Mapped[WebSocketStatus] = mapped_column(
        Enum(WebSocketStatus, native_enum=False, length=20),
        nullable=False,
        default=WebSocketStatus.DISCONNECTED,
    )
    status_detail: Mapped[str | None] = mapped_column(Text, nullable=True)
    connected_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    last_event_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    # --- Bookkeeping timestamps ---

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=text("CURRENT_TIMESTAMP"),
        nullable=False,
    )
    # ``onupdate`` refreshes the value from the application side on UPDATE.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=text("CURRENT_TIMESTAMP"),
        onupdate=utcnow,
        nullable=False,
    )

    __table_args__ = (
        Index("ix_outbound_ws_sources_org_id", "org_id"),
        Index("ix_outbound_ws_sources_org_source", "org_id", "source", unique=True),
    )
Loading
Loading