Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ lazy val capybaraclaw = project
"org.jline" % "jline-terminal-jni" % "4.0.12",
"xyz.matthieucourt" %% "layoutz" % "0.7.0",
"com.github.alexarchambault" %% "case-app" % "2.1.0",
"org.xerial" % "sqlite-jdbc" % "3.53.0.0",
"org.flywaydb" % "flyway-core" % "12.4.0",
"lampepfl" %% "tacit" % tacitVersion,
("lampepfl" %% "tacit-library" % tacitLibraryVersion)
.excludeAll(ExclusionRule(organization = "*", name = "*")),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE TABLE sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
port TEXT NOT NULL,
thread TEXT NOT NULL
);

CREATE UNIQUE INDEX sessions_port_thread_idx ON sessions(port, thread);
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id INTEGER NOT NULL REFERENCES sessions(id),
role TEXT NOT NULL,
text TEXT NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE VIRTUAL TABLE messages_fts USING fts5(text, content='messages', content_rowid='id');

CREATE TRIGGER messages_ai AFTER INSERT ON messages BEGIN
INSERT INTO messages_fts(rowid, text) VALUES (new.id, new.text);
END;

CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN
INSERT INTO messages_fts(messages_fts, rowid, text)
VALUES ('delete', old.id, old.text);
END;

CREATE TRIGGER messages_au AFTER UPDATE ON messages BEGIN
INSERT INTO messages_fts(messages_fts, rowid, text)
VALUES ('delete', old.id, old.text);
INSERT INTO messages_fts(rowid, text) VALUES (new.id, new.text);
END;

INSERT INTO messages_fts(messages_fts) VALUES ('rebuild');
1 change: 1 addition & 0 deletions capybaraclaw/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

<logger name="com.slack.api" level="ERROR"/>
<logger name="org.glassfish.tyrus" level="ERROR"/>
<logger name="org.flywaydb" level="INFO"/>
<logger name="capybaraclaw" level="INFO"/>

<root level="WARN">
Expand Down
25 changes: 12 additions & 13 deletions capybaraclaw/src/main/scala/capybaraclaw/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package capybaraclaw

import caseapp.*

import capybaraclaw.gateway.{Gateway, JsonlContextProvider}
import capybaraclaw.gateway.Gateway
import capybaraclaw.gateway.port.Port
import capybaraclaw.gateway.port.cli.CliPort
import capybaraclaw.gateway.port.slack.{SlackBot, SlackPort}
import capybaraclaw.gateway.sqlite.SqliteContextProvider

import gears.async.{Async, Future}
import gears.async.default.given
Expand All @@ -14,6 +15,7 @@ import java.io.File

import language.experimental.captureChecking

import scala.util.Using
import scala.util.control.NonFatal

/** Entrypoint of Capybara Claw.
Expand Down Expand Up @@ -51,16 +53,14 @@ private object ClawMain extends CaseApp[CliOptions]:

printStartupInfo(workDir, options.enableSlack)

val contextProvider = JsonlContextProvider(workDirFile)

Async.blocking:
val slackPort: Option[SlackPort] =
if options.enableSlack then Some(SlackPort(SlackBot.fromEnv()))
else None

try
val cli = CliPort(workDirFile = workDirFile)
try
Using
.Manager: use =>
val contextProvider = use(SqliteContextProvider(workDirFile))
Async.blocking:
val slackPort: Option[SlackPort] =
if options.enableSlack then Some(use(SlackPort(SlackBot.fromEnv())))
else None
val cli = use(CliPort(workDirFile = workDirFile))
val ports: List[Port] = slackPort.toList :+ cli
val gateway = Gateway(workDir, ports, contextProvider)
println(s"Gateway ready. Ports: ${ports.map(_.id).mkString(", ")}.")
Expand All @@ -71,8 +71,7 @@ private object ClawMain extends CaseApp[CliOptions]:
try cliFuture.awaitResult
finally gateway.shutdown()
gateway.run()
finally cli.shutdown()
finally slackPort.foreach(_.shutdown())
.get

private final case class ClawCaseAppExit(code: Int)
extends RuntimeException(null, null, false, false)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package capybaraclaw.gateway

import tacit.agents.llm.endpoint.{Message, Role, Content}
import java.io.{File, FileWriter, BufferedWriter}
import tacit.agents.llm.endpoint.Message

/** Persistent transcript store, keyed by `ContextKey` so each (port, thread) has its
* own conversation history. Used by the Gateway to seed a fresh `ClawAgent` on first
Expand All @@ -13,68 +12,3 @@ trait ContextProvider:

/** Append a single message to this key's transcript. */
def append(key: ContextKey, msg: Message): Unit

/** JSONL-on-disk `ContextProvider`.
*
* Layout: `{baseDir}/.claw/history/{port}/{sanitize(thread)}.jsonl`. Each line is a
* JSON object: `{"role": "user|assistant", "text": "..."}`.
*
* v1 limitation: only `Role.User` / `Role.Assistant` messages with pure `Content.Text`
* are persisted. Tool-use and tool-result content is mid-turn scaffolding and is NOT
* kept — the LLM reconstructs it as needed on future turns. Thinking content is also
* dropped.
*/
class JsonlContextProvider(baseDir: File) extends ContextProvider:

def load(key: ContextKey): List[Message] =
val f = fileFor(key)
if !f.exists() then return Nil
val src = scala.io.Source.fromFile(f)
try
src.getLines().flatMap(decode).toList
finally src.close()

def append(key: ContextKey, msg: Message): Unit =
encode(msg) match
case None =>
() // not persistable (tool-use, tool-result, thinking-only, etc.)
case Some(line) =>
val f = fileFor(key)
val parent = f.getParentFile
if parent != null && !parent.exists() then parent.mkdirs()
val bw = BufferedWriter(FileWriter(f, /*append=*/ true))
try
bw.write(line)
bw.newLine()
finally bw.close()

private def fileFor(key: ContextKey): File =
val historyRoot = File(File(baseDir, ".claw"), "history")
File(historyRoot, s"${key.port}/${sanitize(key.thread)}.jsonl")

private def sanitize(s: String): String =
// Slack thread ids look like "C0123456/1234567890.123456"; preserve readability
// but keep filenames safe on all platforms.
s.map:
case c if c.isLetterOrDigit => c
case c @ ('.' | '-' | '_') => c
case _ => '_'

private def encode(m: Message): Option[String] =
val roleStr = m.role match
case Role.User => "user"
case Role.Assistant => "assistant"
case Role.System => return None // system seed comes from the agent config
val text = m.content.collect { case Content.Text(t) => t }.mkString
if text.isEmpty then None
else Some(ujson.write(ujson.Obj("role" -> roleStr, "text" -> text)))

private def decode(line: String): Option[Message] =
if line.trim.isEmpty then return None
val obj = ujson.read(line)
val role = obj.obj.get("role").map(_.str).getOrElse("")
val text = obj.obj.get("text").map(_.str).getOrElse("")
role match
case "user" => Some(Message.user(text))
case "assistant" => Some(Message.assistant(text))
case _ => None
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import gears.async.ReadableChannel
* send replies back. The Gateway pumps `incoming` into per-thread agent runners and
* calls `send` when a runner produces a reply for this port's threads.
*/
trait Port:
trait Port extends AutoCloseable:
/** Unique identifier for this port, used as `Origin.port` on inbound messages. */
def id: String

Expand All @@ -30,3 +30,5 @@ trait Port:

/** Release any resources (network connections, background listeners). */
def shutdown(): Unit

override def close(): Unit = shutdown()
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package capybaraclaw.gateway.sqlite

import capybaraclaw.gateway.{ContextKey, ContextProvider}
import org.flywaydb.core.Flyway
import tacit.agents.llm.endpoint.{Content, Message, Role}

import java.io.File
import java.nio.file.Files
import java.sql.{Connection, DriverManager}
import scala.util.Using

/** SQLite-backed session store.
*
* `.claw/state.db` is the single source of truth. Schema is versioned by
* Flyway (`flyway_schema_history`); see `db/migration/V*.sql` resources.
*
* Concurrency model: one persistent writer connection + a small pool of
* read-only connections.
*/
class SqliteContextProvider(baseDir: File)
extends ContextProvider
with AutoCloseable:

private val dbFile = File(File(baseDir, ".claw"), "state.db")
private val writeLock = Object()

runMigrations()
private val writer: Connection = openWriter()
private val readers: SqliteReaderPool =
try SqliteReaderPool(dbFile, size = 2)
catch
case e: Throwable =>
bestEffort:
writer.close()
throw e

def load(key: ContextKey): List[Message] =
readers.withReader: reader =>
findSession(reader, key) match
case None => Nil
case Some(sessionId) => selectMessages(reader, sessionId)

def append(key: ContextKey, msg: Message): Unit =
persistableText(msg).foreach: (role, text) =>
writeLock.synchronized:
inTransaction:
val sessionId = findOrCreateSession(writer, key)
insertMessage(writer, sessionId, role, text)

def close(): Unit =
writeLock.synchronized:
readers.close()
if !writer.isClosed then writer.close()

private def runMigrations(): Unit =
Files.createDirectories(dbFile.toPath.getParent)
val _ = Flyway
.configure()
.dataSource(s"jdbc:sqlite:${dbFile.getPath}", "", "")
.locations("classpath:db/migration")
.load()
.migrate()

private def openWriter(): Connection =
val c = DriverManager.getConnection(s"jdbc:sqlite:${dbFile.getPath}")
try
SqliteJdbc.execute(c, "PRAGMA journal_mode = WAL")
SqliteJdbc.execute(c, "PRAGMA busy_timeout = 5000")
SqliteJdbc.execute(c, "PRAGMA foreign_keys = ON")
SqliteJdbc.execute(c, "PRAGMA journal_size_limit = 67108864") /* 64 MiB */
c
catch
case e: Throwable =>
bestEffort:
c.close()
throw e

private def findSession(conn: Connection, key: ContextKey): Option[Long] =
SqliteJdbc.withStatement(
conn,
"SELECT id FROM sessions WHERE port = ? AND thread = ?"
): stmt =>
stmt.setString(1, key.port)
stmt.setString(2, key.thread)
SqliteJdbc.withResultSet(stmt.executeQuery()): rs =>
if rs.next() then Some(rs.getLong("id")) else None

private def findOrCreateSession(conn: Connection, key: ContextKey): Long =
val sql =
"""INSERT INTO sessions(port, thread) VALUES (?, ?)
|ON CONFLICT(port, thread) DO UPDATE SET port = sessions.port
|RETURNING id""".stripMargin
SqliteJdbc.withStatement(conn, sql): stmt =>
stmt.setString(1, key.port)
stmt.setString(2, key.thread)
SqliteJdbc.withResultSet(stmt.executeQuery()): rs =>
if rs.next() then rs.getLong("id")
else throw IllegalStateException("upsert returned no session id")

private def selectMessages(
conn: Connection,
sessionId: Long
): List[Message] =
val sql =
"""SELECT role, text
|FROM messages
|WHERE session_id = ?
|ORDER BY id ASC""".stripMargin
SqliteJdbc.withStatement(conn, sql): stmt =>
stmt.setLong(1, sessionId)
SqliteJdbc.withResultSet(stmt.executeQuery()): rs =>
Iterator
.continually(rs.next())
.takeWhile(identity)
.flatMap: _ =>
rs.getString("role") match
case "user" => Some(Message.user(rs.getString("text")))
case "assistant" => Some(Message.assistant(rs.getString("text")))
case _ => None
.toList

private def insertMessage(
conn: Connection,
sessionId: Long,
role: String,
text: String
): Unit =
val sql =
"INSERT INTO messages(session_id, role, text) VALUES (?, ?, ?)"
SqliteJdbc.withStatement(conn, sql): stmt =>
stmt.setLong(1, sessionId)
stmt.setString(2, role)
stmt.setString(3, text)
stmt.executeUpdate()
()

private def persistableText(msg: Message): Option[(String, String)] =
val role = msg.role match
case Role.User => Some("user")
case Role.Assistant => Some("assistant")
case Role.System => None
val text = msg.content.collect { case Content.Text(t) => t }.mkString
role.filter(_ => text.nonEmpty).map(_ -> text)

private def inTransaction[A](body: => A): A =
Using.resource(WriteTransaction(writer)): tx =>
val result = body
tx.commit()
result
private final class WriteTransaction(conn: Connection) extends AutoCloseable:
private val previousAutoCommit = conn.getAutoCommit
conn.setAutoCommit(false)

def commit(): Unit =
conn.commit()
conn.setAutoCommit(previousAutoCommit)

override def close(): Unit =
if !conn.getAutoCommit then
bestEffort:
conn.rollback()
bestEffort:
conn.setAutoCommit(previousAutoCommit)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package capybaraclaw.gateway.sqlite

import java.sql.{Connection, PreparedStatement, ResultSet}

private[sqlite] def bestEffort(body: => Unit): Unit =
try body
catch case _: Throwable => ()

private[sqlite] object SqliteJdbc:

def execute(conn: Connection, sql: String): Unit =
val stmt = conn.createStatement()
try
stmt.execute(sql)
()
finally stmt.close()

def withStatement[A](conn: Connection, sql: String)(
body: PreparedStatement => A
): A =
val stmt = conn.prepareStatement(sql)
try body(stmt)
finally stmt.close()

def withResultSet[A](rs: ResultSet)(body: ResultSet => A): A =
try body(rs)
finally rs.close()
Loading