
Feature: Session-based conversation reuse for event-triggered automations #105

@malhotra5

Summary

Add support for reusing conversations across multiple events within a "session". This enables use cases like:

  • Slack threads: Multiple messages in the same thread should go to the same conversation
  • GitHub PRs: Multiple comments/events on the same PR should share context
  • Linear issues: Follow-up events on the same issue should maintain conversation history

Problem Statement

Currently, each event creates a new AutomationRun → new sandbox → new conversation. There is no mechanism to route related events to the same conversation. This means:

  • No context sharing between related events
  • Each event starts fresh without knowledge of previous interactions
  • Expensive: new sandbox for every event even if related

Proposed Solution

High-Level Flow

  1. Automations configure a session key expression - a JMESPath expression that extracts a session identifier from event payloads
  2. First event creates a session - an AutomationRun is created with a session_key, a sandbox is started, and session state is stored
  3. Subsequent events match an existing session - events with the same session_key are routed to the existing sandbox
  4. Events are queued on the agent-server - a new endpoint lets the automation service push events
  5. SDK polls for events - the user's SDK script decides when and how to process queued events
  6. Sandbox death is handled gracefully - configurable behavior: queue, restart, or drop

Why Event Queue (vs Direct Conversation Manipulation)

We considered directly calling agent-server conversation APIs (POST /api/conversations/{id}/events), but this approach is inflexible:

  • Custom SDK code: Users may ship custom tarballs with their own conversation logic, filters, or multiple conversations
  • State uncertainty: Even if the sandbox is alive, the SDK script might have exited or the conversation might be closed
  • No user control: The automation service would have to make assumptions about the conversation lifecycle

Instead, we push events to a queue that the SDK can consume, giving users full control over processing.

Detailed Design

Schema Changes

from typing import Literal

from pydantic import BaseModel, Field


class SessionConfig(BaseModel):
    """Configure session-based conversation reuse."""
    
    key_expr: str = Field(
        ...,
        description="JMESPath expression to extract session key from event payload"
    )
    
    idle_timeout_seconds: int = Field(
        default=300,
        ge=30,
        le=3600,
        description="SDK exits after this many seconds without events (prevents the sandbox from running indefinitely)"
    )
    
    session_timeout_seconds: int = Field(
        default=3600,
        ge=60,
        le=86400,
        description="Max total session lifetime"
    )
    
    on_sandbox_death: Literal["queue", "restart", "drop"] = Field(
        default="queue",
        description=(
            "Behavior when sandbox dies while events are pending:\n"
            "- queue: Store events, deliver to next sandbox\n"
            "- restart: Start new sandbox immediately with queued events\n"
            "- drop: Discard the event"
        )
    )


class EventTrigger(BaseModel):
    type: Literal["event"] = "event"
    source: str
    on: str | list[str]
    filter: str | None = None
    session: SessionConfig | None = None  # NEW

Database Models

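import uuid
from datetime import datetime

from sqlalchemy import JSON, ForeignKey, Index, String, Text
from sqlalchemy.orm import Mapped, mapped_column

# Base is assumed to be the service's existing declarative base class.


class AutomationSession(Base):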
    """Tracks active sessions for event routing."""
    
    __tablename__ = "automation_sessions"
    
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    automation_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("automations.id"))
    session_key: Mapped[str] = mapped_column(String(255), index=True)
    run_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("automation_runs.id"))
    
    # Sandbox state (for routing events)
    sandbox_id: Mapped[str] = mapped_column(String(255))
    agent_url: Mapped[str] = mapped_column(Text)
    agent_session_key: Mapped[str] = mapped_column(String(255))
    
    # Lifecycle
    status: Mapped[str]  # ACTIVE, EXPIRED, DEAD
    started_at: Mapped[datetime]
    expires_at: Mapped[datetime]
    last_event_at: Mapped[datetime]
    
    __table_args__ = (
        Index("ix_session_lookup", "automation_id", "session_key", "status"),
    )


class PendingSessionEvent(Base):
    """Events queued for dead/restarting sessions."""
    
    __tablename__ = "pending_session_events"
    
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    automation_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("automations.id"))
    session_key: Mapped[str] = mapped_column(String(255), index=True)
    event_payload: Mapped[dict] = mapped_column(JSON)
    created_at: Mapped[datetime]
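
To illustrate how the lifecycle columns above could drive the status field, here is a minimal sketch. The helper names `compute_expires_at` and `session_status` are hypothetical, and the rule that expiry takes precedence over sandbox death is an assumption, not something this proposal specifies.

```python
from datetime import datetime, timedelta, timezone


def compute_expires_at(started_at: datetime, session_timeout_seconds: int) -> datetime:
    """Hard deadline for a session, derived from session_timeout_seconds."""
    return started_at + timedelta(seconds=session_timeout_seconds)


def session_status(now: datetime, expires_at: datetime, sandbox_alive: bool) -> str:
    """Return ACTIVE, EXPIRED, or DEAD for a session row (assumed precedence)."""
    if now >= expires_at:
        return "EXPIRED"  # past the max total session lifetime
    if not sandbox_alive:
        return "DEAD"  # sandbox is gone before the deadline
    return "ACTIVE"


# Example: a session started at midnight UTC with the default 3600 s timeout.
started = datetime(2024, 1, 1, tzinfo=timezone.utc)
deadline = compute_expires_at(started, 3600)
status = session_status(started + timedelta(minutes=30), deadline, sandbox_alive=True)
```

A watchdog could apply a check like this periodically and update the status column, so the ix_session_lookup index only ever returns sessions that are still routable.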

Agent-Server API (New Endpoints)

POST /api/workspace/events
  Headers: X-Session-API-Key: ...
  Body: {"event_id": "...", "payload": {...}, "timestamp": ...}
  
GET /api/workspace/events
  Headers: X-Session-API-Key: ...
  Query: ?timeout=60 (optional, for long-polling)
  Returns: [{"event_id": "...", "payload": {...}, "timestamp": ...}, ...]

DELETE /api/workspace/events/{event_id}
  Headers: X-Session-API-Key: ...
  (Acknowledge event, removes from queue)
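
The intended semantics of the three endpoints can be sketched with an in-memory stand-in. This is an illustration under assumptions, not the agent-server implementation: the class and method names are hypothetical, deduplication by event_id is assumed, and long-polling is left out.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Any


@dataclass
class QueuedEvent:
    event_id: str
    payload: dict[str, Any]
    timestamp: float


class WorkspaceEventQueue:
    """In-memory stand-in for the queue behind /api/workspace/events."""

    def __init__(self) -> None:
        # Insertion order doubles as delivery order.
        self._events: "OrderedDict[str, QueuedEvent]" = OrderedDict()

    def push(self, event: QueuedEvent) -> bool:
        """POST: enqueue; returns False for a duplicate event_id (assumed)."""
        if event.event_id in self._events:
            return False
        self._events[event.event_id] = event
        return True

    def peek(self, max_count: int = 10) -> list[QueuedEvent]:
        """GET: return the oldest events without removing them."""
        return list(self._events.values())[:max_count]

    def ack(self, event_id: str) -> bool:
        """DELETE: remove an event once the consumer has handled it."""
        return self._events.pop(event_id, None) is not None
```

Because GET does not remove events, an SDK script that crashes before calling DELETE would see the same events again on its next poll, which gives at-least-once delivery.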

SDK Workspace Integration

class OpenHandsCloudWorkspace:
    def get_pending_events(
        self, 
        timeout: float = 0,  # 0 = non-blocking
        max_count: int = 10,
    ) -> list[AutomationEvent]:
        """
        Get pending automation events for this session.
        
        Args:
            timeout: Seconds to wait for events. 0 = return immediately.
            max_count: Maximum events to return.
            
        Returns:
            List of pending events, empty if none available within timeout.
        """
        
    def ack_event(self, event_id: str) -> None:
        """Acknowledge event (removes from queue)."""

Event Router Changes

async def receive_event(...):
    # ... existing matching logic ...

    for automation in matched_automations:
        trigger = automation.trigger
        session_config = trigger.get("session")
        session_key = None

        if session_config:
            raw_key = evaluate_jmespath(session_config["key_expr"], payload)
            # key_expr may yield a number (e.g. a PR number) or None; the
            # session_key column is a string, so normalize before lookup.
            session_key = None if raw_key is None else str(raw_key)

        if session_key is not None:
            active_session = await get_active_session(automation.id, session_key)

            if active_session and active_session.status == "ACTIVE":
                if await is_sandbox_alive(active_session.sandbox_id):
                    # Route to the existing session's event queue
                    await push_event_to_session(active_session, payload)
                else:
                    # Apply the configured on_sandbox_death behavior
                    await handle_sandbox_death(active_session, session_config, payload)
                continue

        # No session config, no usable key, or no active session: create a new run.
        # Passing session_key (an assumed parameter) lets later events match this run.
        run = await create_automation_run(
            automation, session, event_payload=payload, session_key=session_key
        )
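
The branching in the router can be isolated as a pure function, which makes the on_sandbox_death behavior easy to unit test. This is a sketch under assumptions: `decide_route` and its return labels are hypothetical names, not part of the automation service.

```python
from typing import Optional

SANDBOX_DEATH_POLICIES = ("queue", "restart", "drop")


def decide_route(
    session_configured: bool,
    session_key: Optional[str],
    has_active_session: bool,
    sandbox_alive: bool,
    on_sandbox_death: str = "queue",
) -> str:
    """Return one of: new_run, push, queue, restart, drop."""
    if not session_configured or session_key is None:
        return "new_run"  # session mode is off, or the key expression matched nothing
    if not has_active_session:
        return "new_run"  # first event for this session key
    if sandbox_alive:
        return "push"  # deliver to the existing sandbox's event queue
    if on_sandbox_death not in SANDBOX_DEATH_POLICIES:
        raise ValueError(f"unknown on_sandbox_death policy: {on_sandbox_death!r}")
    return on_sandbox_death  # sandbox died: apply the configured policy


# Example: a follow-up PR comment arrives after the sandbox has died.
action = decide_route(True, "42", has_active_session=True, sandbox_alive=False,
                      on_sandbox_death="restart")
```

Keeping the decision separate from the I/O (session lookup, liveness check, HTTP push) means each policy can be covered by a test without a database or a sandbox.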

Preset Script Changes

The preset scripts will gain an optional session mode:

# Session mode is on whenever the trigger carries a session config
# (SessionConfig has no separate "enabled" flag).
session_config = event_context.get("session")

# Process the triggering event; this step is the same in both modes.
conversation.send_message(USER_PROMPT)
conversation.run()

if session_config:
    # Session mode: keep polling for follow-up events until idle or session end
    idle_timeout = session_config.get("idle_timeout_seconds", 300)
    session_active = True

    while session_active:
        events = workspace.get_pending_events(timeout=idle_timeout)
        if not events:
            break  # Idle timeout reached, exit

        for event in events:
            if event.type == "session_end":
                workspace.ack_event(event.id)
                session_active = False  # a flag, since `return` is invalid at script top level
                break

            event_message = format_event_context(event.payload)
            conversation.send_message(event_message)
            conversation.run()
            # Ack only after processing so a crash leaves the event queued
            workspace.ack_event(event.id)
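
The polling loop can be exercised without a sandbox by substituting a test double for the workspace. The sketch below is illustrative only: `FakeEvent`, `FakeWorkspace`, and `run_session_loop` are hypothetical names, and the event fields `id`, `type`, and `payload` are assumed from the script above.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class FakeEvent:
    id: str
    type: str
    payload: dict


class FakeWorkspace:
    """Test double exposing get_pending_events() and ack_event()."""

    def __init__(self, batches):
        self._batches = deque(batches)
        self.acked = []

    def get_pending_events(self, timeout: float = 0, max_count: int = 10):
        # An empty list models "no events arrived within the timeout".
        return self._batches.popleft() if self._batches else []

    def ack_event(self, event_id: str) -> None:
        self.acked.append(event_id)


def run_session_loop(workspace, handle, idle_timeout: float = 300) -> str:
    """Poll until idle or a session_end event; returns the exit reason."""
    while True:
        events = workspace.get_pending_events(timeout=idle_timeout)
        if not events:
            return "idle_timeout"
        for event in events:
            if event.type == "session_end":
                workspace.ack_event(event.id)
                return "session_end"
            handle(event.payload)
            workspace.ack_event(event.id)  # ack only after handling


# Example: one message batch followed by an explicit session end.
handled = []
ws = FakeWorkspace([
    [FakeEvent("e1", "message", {"text": "hi"})],
    [FakeEvent("e2", "session_end", {})],
])
reason = run_session_loop(ws, handled.append)
```

Wrapping the loop in a function also sidesteps the problem of exiting from nested loops at script top level, since the function can simply return.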

Example Configuration

Slack Thread Reuse

{
  "trigger": {
    "type": "event",
    "source": "slack",
    "on": "message",
    "filter": "icontains(text, '@openhands')",
    "session": {
      "key_expr": "thread_ts || ts",
      "idle_timeout_seconds": 300,
      "on_sandbox_death": "queue"
    }
  }
}
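
To show why `thread_ts || ts` groups a thread's messages together, here is a plain-Python approximation of that expression. It is a sketch, not the service's evaluator: the real implementation would use a JMESPath library, the function names are hypothetical, and the sample timestamps are made up.

```python
from typing import Any, Optional


def _is_false_like(value: Any) -> bool:
    """JMESPath treats null, false, and empty strings/lists/objects as false."""
    return value is None or value is False or value == "" or value == [] or value == {}


def slack_session_key(payload: dict[str, Any]) -> Optional[str]:
    """Approximate `thread_ts || ts`: use thread_ts unless it is false-like."""
    for field in ("thread_ts", "ts"):
        value = payload.get(field)
        if not _is_false_like(value):
            return str(value)
    return None  # neither field present: no session key


# A root message has only ts; a reply carries the root's ts as thread_ts.
root = {"ts": "1700000000.000100", "text": "@openhands help"}
reply = {"ts": "1700000050.000200", "thread_ts": "1700000000.000100", "text": "more"}
same_session = slack_session_key(root) == slack_session_key(reply)
```

Both the root message and its replies resolve to the root's timestamp, so they share one session, while an unrelated top-level message gets its own key.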

GitHub PR Conversation

{
  "trigger": {
    "type": "event",
    "source": "github",
    "on": ["issue_comment.created", "pull_request.synchronize"],
    "filter": "icontains(comment.body, '@openhands')",
    "session": {
      "key_expr": "pull_request.number || issue.number",
      "idle_timeout_seconds": 600,
      "session_timeout_seconds": 7200,
      "on_sandbox_death": "restart"
    }
  }
}

Implementation Plan

| Component | Owner | Changes |
|---|---|---|
| Event queue API | Agent Server (SDK repo) | New endpoints: POST/GET/DELETE /api/workspace/events |
| Workspace methods | SDK | get_pending_events(), ack_event() |
| Session tracking | Automation Service | New AutomationSession table, session lifecycle |
| Event routing | Automation Service | Check active sessions, route events to queue |
| Preset scripts | Automation Service | Add optional session mode polling loop |
| Watchdog | Automation Service | Detect dead sessions, handle on_sandbox_death |

Open Questions

  1. Session-to-session handoff: If a session expires while processing, should we allow "continuing" in a new session with conversation history?
  2. Event ordering guarantees: Should we guarantee FIFO ordering of events within a session?
  3. Concurrent event processing: Should we allow the SDK to process multiple events concurrently, or enforce sequential processing?
  4. Session visibility in UI: How should active sessions be displayed in the automations frontend?

Related

  • Depends on agent-server changes in OpenHands/software-agent-sdk
  • May affect how completion callbacks work (need to track session state, not just run state)
