From 4f552fd7616609945f0efc84d78d6431e721fa52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Warcho=C5=82?= Date: Thu, 7 May 2026 13:24:27 +0200 Subject: [PATCH] feat: persist conversation context in SQLite instead of JSONL --- build.sbt | 2 + .../db/migration/V1__create_sessions.sql | 7 + .../db/migration/V2__create_messages.sql | 6 + .../db/migration/V3__create_messages_fts.sql | 18 ++ capybaraclaw/src/main/resources/logback.xml | 1 + .../src/main/scala/capybaraclaw/Main.scala | 25 ++- .../gateway/ContextProvider.scala | 68 +------ .../capybaraclaw/gateway/port/Port.scala | 4 +- .../sqlite/SqliteContextProvider.scala | 163 +++++++++++++++ .../gateway/sqlite/SqliteJdbc.scala | 27 +++ .../gateway/sqlite/SqliteReaderPool.scala | 52 +++++ .../gateway/SqliteContextProviderSuite.scala | 188 ++++++++++++++++++ docs/design.md | 5 +- 13 files changed, 483 insertions(+), 83 deletions(-) create mode 100644 capybaraclaw/src/main/resources/db/migration/V1__create_sessions.sql create mode 100644 capybaraclaw/src/main/resources/db/migration/V2__create_messages.sql create mode 100644 capybaraclaw/src/main/resources/db/migration/V3__create_messages_fts.sql create mode 100644 capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteContextProvider.scala create mode 100644 capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteJdbc.scala create mode 100644 capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteReaderPool.scala create mode 100644 capybaraclaw/src/test/scala/capybaraclaw/gateway/SqliteContextProviderSuite.scala diff --git a/build.sbt b/build.sbt index f957c65..316cd65 100644 --- a/build.sbt +++ b/build.sbt @@ -83,6 +83,8 @@ lazy val capybaraclaw = project "org.jline" % "jline-terminal-jni" % "4.0.12", "xyz.matthieucourt" %% "layoutz" % "0.7.0", "com.github.alexarchambault" %% "case-app" % "2.1.0", + "org.xerial" % "sqlite-jdbc" % "3.53.0.0", + "org.flywaydb" % "flyway-core" % "12.4.0", "lampepfl" %% "tacit" % tacitVersion, ("lampepfl" %% "tacit-library" % tacitLibraryVersion) .excludeAll(ExclusionRule(organization = "*", name = "*")), diff --git a/capybaraclaw/src/main/resources/db/migration/V1__create_sessions.sql b/capybaraclaw/src/main/resources/db/migration/V1__create_sessions.sql new file mode 100644 index 0000000..dc08b2d --- /dev/null +++ b/capybaraclaw/src/main/resources/db/migration/V1__create_sessions.sql @@ -0,0 +1,7 @@ +CREATE TABLE sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + port TEXT NOT NULL, + thread TEXT NOT NULL +); + +CREATE UNIQUE INDEX sessions_port_thread_idx ON sessions(port, thread); diff --git a/capybaraclaw/src/main/resources/db/migration/V2__create_messages.sql b/capybaraclaw/src/main/resources/db/migration/V2__create_messages.sql new file mode 100644 index 0000000..cf387a1 --- /dev/null +++ b/capybaraclaw/src/main/resources/db/migration/V2__create_messages.sql @@ -0,0 +1,6 @@ +CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id INTEGER NOT NULL REFERENCES sessions(id), + role TEXT NOT NULL, + text TEXT NOT NULL +); diff --git a/capybaraclaw/src/main/resources/db/migration/V3__create_messages_fts.sql b/capybaraclaw/src/main/resources/db/migration/V3__create_messages_fts.sql new file mode 100644 index 0000000..a463ba6 --- /dev/null +++ b/capybaraclaw/src/main/resources/db/migration/V3__create_messages_fts.sql @@ -0,0 +1,18 @@ +CREATE VIRTUAL TABLE messages_fts USING fts5(text, content='messages', content_rowid='id'); + +CREATE TRIGGER messages_ai AFTER INSERT ON messages BEGIN + INSERT INTO messages_fts(rowid, text) VALUES (new.id, new.text); +END; + +CREATE TRIGGER messages_ad AFTER DELETE ON messages BEGIN + INSERT INTO messages_fts(messages_fts, rowid, text) + VALUES ('delete', old.id, old.text); +END; + +CREATE TRIGGER messages_au AFTER UPDATE ON messages BEGIN + INSERT INTO messages_fts(messages_fts, rowid, text) + VALUES ('delete', old.id, old.text); + INSERT INTO messages_fts(rowid, text) VALUES (new.id, new.text); +END; + +INSERT INTO messages_fts(messages_fts) VALUES ('rebuild'); diff --git a/capybaraclaw/src/main/resources/logback.xml b/capybaraclaw/src/main/resources/logback.xml index 626ee33..0babe7b 100644 --- a/capybaraclaw/src/main/resources/logback.xml +++ b/capybaraclaw/src/main/resources/logback.xml @@ -17,6 +17,7 @@ + diff --git a/capybaraclaw/src/main/scala/capybaraclaw/Main.scala b/capybaraclaw/src/main/scala/capybaraclaw/Main.scala index 0b0e71b..c71839d 100644 --- a/capybaraclaw/src/main/scala/capybaraclaw/Main.scala +++ b/capybaraclaw/src/main/scala/capybaraclaw/Main.scala @@ -2,10 +2,11 @@ package capybaraclaw import caseapp.* -import capybaraclaw.gateway.{Gateway, JsonlContextProvider} +import capybaraclaw.gateway.Gateway import capybaraclaw.gateway.port.Port import capybaraclaw.gateway.port.cli.CliPort import capybaraclaw.gateway.port.slack.{SlackBot, SlackPort} +import capybaraclaw.gateway.sqlite.SqliteContextProvider import gears.async.{Async, Future} import gears.async.default.given @@ -14,6 +15,7 @@ import java.io.File import language.experimental.captureChecking +import scala.util.Using import scala.util.control.NonFatal /** Entrypoint of Capybara Claw. @@ -51,16 +53,14 @@ private object ClawMain extends CaseApp[CliOptions]: printStartupInfo(workDir, options.enableSlack) - val contextProvider = JsonlContextProvider(workDirFile) - - Async.blocking: - val slackPort: Option[SlackPort] = - if options.enableSlack then Some(SlackPort(SlackBot.fromEnv())) - else None - - try - val cli = CliPort(workDirFile = workDirFile) - try + Using + .Manager: use => + val contextProvider = use(SqliteContextProvider(workDirFile)) + Async.blocking: + val slackPort: Option[SlackPort] = + if options.enableSlack then Some(use(SlackPort(SlackBot.fromEnv()))) + else None + val cli = use(CliPort(workDirFile = workDirFile)) val ports: List[Port] = slackPort.toList :+ cli val gateway = Gateway(workDir, ports, contextProvider) println(s"Gateway ready. Ports: ${ports.map(_.id).mkString(", ")}.") @@ -71,8 +71,7 @@ private object ClawMain extends CaseApp[CliOptions]: try cliFuture.awaitResult finally gateway.shutdown() gateway.run() - finally cli.shutdown() - finally slackPort.foreach(_.shutdown()) + .get private final case class ClawCaseAppExit(code: Int) extends RuntimeException(null, null, false, false) diff --git a/capybaraclaw/src/main/scala/capybaraclaw/gateway/ContextProvider.scala b/capybaraclaw/src/main/scala/capybaraclaw/gateway/ContextProvider.scala index 993dbab..ec30212 100644 --- a/capybaraclaw/src/main/scala/capybaraclaw/gateway/ContextProvider.scala +++ b/capybaraclaw/src/main/scala/capybaraclaw/gateway/ContextProvider.scala @@ -1,7 +1,6 @@ package capybaraclaw.gateway -import tacit.agents.llm.endpoint.{Message, Role, Content} -import java.io.{File, FileWriter, BufferedWriter} +import tacit.agents.llm.endpoint.Message /** Persistent transcript store, keyed by `ContextKey` so each (port, thread) has its * own conversation history. Used by the Gateway to seed a fresh `ClawAgent` on first @@ -13,68 +12,3 @@ trait ContextProvider: /** Append a single message to this key's transcript. */ def append(key: ContextKey, msg: Message): Unit - -/** JSONL-on-disk `ContextProvider`. - * - * Layout: `{baseDir}/.claw/history/{port}/{sanitize(thread)}.jsonl`. Each line is a - * JSON object: `{"role": "user|assistant", "text": "..."}`. - * - * v1 limitation: only `Role.User` / `Role.Assistant` messages with pure `Content.Text` - * are persisted. Tool-use and tool-result content is mid-turn scaffolding and is NOT - * kept — the LLM reconstructs it as needed on future turns. Thinking content is also - * dropped. - */ -class JsonlContextProvider(baseDir: File) extends ContextProvider: - - def load(key: ContextKey): List[Message] = - val f = fileFor(key) - if !f.exists() then return Nil - val src = scala.io.Source.fromFile(f) - try - src.getLines().flatMap(decode).toList - finally src.close() - - def append(key: ContextKey, msg: Message): Unit = - encode(msg) match - case None => - () // not persistable (tool-use, tool-result, thinking-only, etc.) - case Some(line) => - val f = fileFor(key) - val parent = f.getParentFile - if parent != null && !parent.exists() then parent.mkdirs() - val bw = BufferedWriter(FileWriter(f, /*append=*/ true)) - try - bw.write(line) - bw.newLine() - finally bw.close() - - private def fileFor(key: ContextKey): File = - val historyRoot = File(File(baseDir, ".claw"), "history") - File(historyRoot, s"${key.port}/${sanitize(key.thread)}.jsonl") - - private def sanitize(s: String): String = - // Slack thread ids look like "C0123456/1234567890.123456"; preserve readability - // but keep filenames safe on all platforms. - s.map: - case c if c.isLetterOrDigit => c - case c @ ('.' | '-' | '_') => c - case _ => '_' - - private def encode(m: Message): Option[String] = - val roleStr = m.role match - case Role.User => "user" - case Role.Assistant => "assistant" - case Role.System => return None // system seed comes from the agent config - val text = m.content.collect { case Content.Text(t) => t }.mkString - if text.isEmpty then None - else Some(ujson.write(ujson.Obj("role" -> roleStr, "text" -> text))) - - private def decode(line: String): Option[Message] = - if line.trim.isEmpty then return None - val obj = ujson.read(line) - val role = obj.obj.get("role").map(_.str).getOrElse("") - val text = obj.obj.get("text").map(_.str).getOrElse("") - role match - case "user" => Some(Message.user(text)) - case "assistant" => Some(Message.assistant(text)) - case _ => None diff --git a/capybaraclaw/src/main/scala/capybaraclaw/gateway/port/Port.scala b/capybaraclaw/src/main/scala/capybaraclaw/gateway/port/Port.scala index f65169c..2836185 100644 --- a/capybaraclaw/src/main/scala/capybaraclaw/gateway/port/Port.scala +++ b/capybaraclaw/src/main/scala/capybaraclaw/gateway/port/Port.scala @@ -9,7 +9,7 @@ import gears.async.ReadableChannel * send replies back. The Gateway pumps `incoming` into per-thread agent runners and * calls `send` when a runner produces a reply for this port's threads. */ -trait Port: +trait Port extends AutoCloseable: /** Unique identifier for this port, used as `Origin.port` on inbound messages. */ def id: String @@ -30,3 +30,5 @@ trait Port: /** Release any resources (network connections, background listeners). */ def shutdown(): Unit + + override def close(): Unit = shutdown() diff --git a/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteContextProvider.scala b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteContextProvider.scala new file mode 100644 index 0000000..1d195c7 --- /dev/null +++ b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteContextProvider.scala @@ -0,0 +1,163 @@ +package capybaraclaw.gateway.sqlite + +import capybaraclaw.gateway.{ContextKey, ContextProvider} +import org.flywaydb.core.Flyway +import tacit.agents.llm.endpoint.{Content, Message, Role} + +import java.io.File +import java.nio.file.Files +import java.sql.{Connection, DriverManager} +import scala.util.Using + +/** SQLite-backed session store. + * + * `.claw/state.db` is the single source of truth. Schema is versioned by + * Flyway (`flyway_schema_history`); see `db/migration/V*.sql` resources. + * + * Concurrency model: one persistent writer connection + a small pool of + * read-only connections. + */ +class SqliteContextProvider(baseDir: File) + extends ContextProvider + with AutoCloseable: + + private val dbFile = File(File(baseDir, ".claw"), "state.db") + private val writeLock = Object() + + runMigrations() + private val writer: Connection = openWriter() + private val readers: SqliteReaderPool = + try SqliteReaderPool(dbFile, size = 2) + catch + case e: Throwable => + bestEffort: + writer.close() + throw e + + def load(key: ContextKey): List[Message] = + readers.withReader: reader => + findSession(reader, key) match + case None => Nil + case Some(sessionId) => selectMessages(reader, sessionId) + + def append(key: ContextKey, msg: Message): Unit = + persistableText(msg).foreach: (role, text) => + writeLock.synchronized: + inTransaction: + val sessionId = findOrCreateSession(writer, key) + insertMessage(writer, sessionId, role, text) + + def close(): Unit = + writeLock.synchronized: + readers.close() + if !writer.isClosed then writer.close() + + private def runMigrations(): Unit = + Files.createDirectories(dbFile.toPath.getParent) + val _ = Flyway + .configure() + .dataSource(s"jdbc:sqlite:${dbFile.getPath}", "", "") + .locations("classpath:db/migration") + .load() + .migrate() + + private def openWriter(): Connection = + val c = DriverManager.getConnection(s"jdbc:sqlite:${dbFile.getPath}") + try + SqliteJdbc.execute(c, "PRAGMA journal_mode = WAL") + SqliteJdbc.execute(c, "PRAGMA busy_timeout = 5000") + SqliteJdbc.execute(c, "PRAGMA foreign_keys = ON") + SqliteJdbc.execute(c, "PRAGMA journal_size_limit = 67108864") /* 64 MiB */ + c + catch + case e: Throwable => + bestEffort: + c.close() + throw e + + private def findSession(conn: Connection, key: ContextKey): Option[Long] = + SqliteJdbc.withStatement( + conn, + "SELECT id FROM sessions WHERE port = ? AND thread = ?" + ): stmt => + stmt.setString(1, key.port) + stmt.setString(2, key.thread) + SqliteJdbc.withResultSet(stmt.executeQuery()): rs => + if rs.next() then Some(rs.getLong("id")) else None + + private def findOrCreateSession(conn: Connection, key: ContextKey): Long = + val sql = + """INSERT INTO sessions(port, thread) VALUES (?, ?) + |ON CONFLICT(port, thread) DO UPDATE SET port = sessions.port + |RETURNING id""".stripMargin + SqliteJdbc.withStatement(conn, sql): stmt => + stmt.setString(1, key.port) + stmt.setString(2, key.thread) + SqliteJdbc.withResultSet(stmt.executeQuery()): rs => + if rs.next() then rs.getLong("id") + else throw IllegalStateException("upsert returned no session id") + + private def selectMessages( + conn: Connection, + sessionId: Long + ): List[Message] = + val sql = + """SELECT role, text + |FROM messages + |WHERE session_id = ? + |ORDER BY id ASC""".stripMargin + SqliteJdbc.withStatement(conn, sql): stmt => + stmt.setLong(1, sessionId) + SqliteJdbc.withResultSet(stmt.executeQuery()): rs => + Iterator + .continually(rs.next()) + .takeWhile(identity) + .flatMap: _ => + rs.getString("role") match + case "user" => Some(Message.user(rs.getString("text"))) + case "assistant" => Some(Message.assistant(rs.getString("text"))) + case _ => None + .toList + + private def insertMessage( + conn: Connection, + sessionId: Long, + role: String, + text: String + ): Unit = + val sql = + "INSERT INTO messages(session_id, role, text) VALUES (?, ?, ?)" + SqliteJdbc.withStatement(conn, sql): stmt => + stmt.setLong(1, sessionId) + stmt.setString(2, role) + stmt.setString(3, text) + stmt.executeUpdate() + () + + private def persistableText(msg: Message): Option[(String, String)] = + val role = msg.role match + case Role.User => Some("user") + case Role.Assistant => Some("assistant") + case Role.System => None + val text = msg.content.collect { case Content.Text(t) => t }.mkString + role.filter(_ => text.nonEmpty).map(_ -> text) + + private def inTransaction[A](body: => A): A = + Using.resource(WriteTransaction(writer)): tx => + val result = body + tx.commit() + result + private final class WriteTransaction(conn: Connection) extends AutoCloseable: + private val previousAutoCommit = conn.getAutoCommit + conn.setAutoCommit(false) + + def commit(): Unit = + conn.commit() + conn.setAutoCommit(previousAutoCommit) + + override def close(): Unit = + if !conn.getAutoCommit then + bestEffort: + conn.rollback() + bestEffort: + conn.setAutoCommit(previousAutoCommit) diff --git a/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteJdbc.scala b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteJdbc.scala new file mode 100644 index 0000000..cb41b27 --- /dev/null +++ b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteJdbc.scala @@ -0,0 +1,27 @@ +package capybaraclaw.gateway.sqlite + +import java.sql.{Connection, PreparedStatement, ResultSet} + +private[sqlite] def bestEffort(body: => Unit): Unit = + try body + catch case _: Throwable => () + +private[sqlite] object SqliteJdbc: + + def execute(conn: Connection, sql: String): Unit = + val stmt = conn.createStatement() + try + stmt.execute(sql) + () + finally stmt.close() + + def withStatement[A](conn: Connection, sql: String)( + body: PreparedStatement => A + ): A = + val stmt = conn.prepareStatement(sql) + try body(stmt) + finally stmt.close() + + def withResultSet[A](rs: ResultSet)(body: ResultSet => A): A = + try body(rs) + finally rs.close() diff --git a/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteReaderPool.scala b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteReaderPool.scala new file mode 100644 index 0000000..ccc0de1 --- /dev/null +++ b/capybaraclaw/src/main/scala/capybaraclaw/gateway/sqlite/SqliteReaderPool.scala @@ -0,0 +1,52 @@ +package capybaraclaw.gateway.sqlite + +import java.io.File +import java.sql.{Connection, DriverManager} +import java.util.concurrent.ArrayBlockingQueue + +private[sqlite] class SqliteReaderPool(dbFile: File, size: Int) + extends AutoCloseable: + + private val (allConnections, available) = initialize() + + def withReader[A](body: Connection => A): A = + val conn = available.take() + if conn.isClosed then + bestEffort: + available.put(conn) + throw IllegalStateException("reader pool closed") + try body(conn) + finally available.put(conn) + + override def close(): Unit = + allConnections.foreach: c => + bestEffort: + c.close() + + private def initialize(): (List[Connection], ArrayBlockingQueue[Connection]) = + val queue = ArrayBlockingQueue[Connection](size) + val opened = scala.collection.mutable.ListBuffer[Connection]() + try + (1 to size).foreach: _ => + val c = openReader() + opened += c + queue.put(c) + (opened.toList, queue) + catch + case e: Throwable => + opened.foreach: c => + bestEffort: + c.close() + throw e + + private def openReader(): Connection = + val c = DriverManager.getConnection(s"jdbc:sqlite:${dbFile.getPath}") + try + SqliteJdbc.execute(c, "PRAGMA busy_timeout = 5000") + SqliteJdbc.execute(c, "PRAGMA query_only = 1") + c + catch + case e: Throwable => + bestEffort: + c.close() + throw e diff --git a/capybaraclaw/src/test/scala/capybaraclaw/gateway/SqliteContextProviderSuite.scala b/capybaraclaw/src/test/scala/capybaraclaw/gateway/SqliteContextProviderSuite.scala new file mode 100644 index 0000000..4df06d9 --- /dev/null +++ b/capybaraclaw/src/test/scala/capybaraclaw/gateway/SqliteContextProviderSuite.scala @@ -0,0 +1,188 @@ +package capybaraclaw.gateway + +import capybaraclaw.gateway.sqlite.SqliteContextProvider +import tacit.agents.llm.endpoint.{Content, Message, Role} + +import java.nio.file.{Files, Path} +import java.sql.{Connection, DriverManager, ResultSet} + +class SqliteContextProviderSuite extends munit.FunSuite: + + test("load on a fresh key returns empty without creating a session"): + withProvider(): (provider, dbPath) => + val key = ContextKey("cli", "default") + assertEquals(provider.load(key), Nil) + + withConnection(dbPath): conn => + assertEquals(queryInt(conn, "SELECT COUNT(*) FROM sessions"), 0) + + test("append then load preserves message order"): + withProvider(): (provider, _) => + val key = ContextKey("cli", "default") + provider.append(key, Message.user("first")) + provider.append(key, Message.assistant("second")) + provider.append(key, Message.user("third")) + + assertEquals( + provider.load(key).map(m => m.role -> m.text), + List( + Role.User -> "first", + Role.Assistant -> "second", + Role.User -> "third" + ) + ) + + test("keeps separate histories for different port and thread pairs"): + withProvider(): (provider, _) => + val cli = ContextKey("cli", "same-thread") + val slack = ContextKey("slack", "same-thread") + val otherThread = ContextKey("cli", "other-thread") + + provider.append(cli, Message.user("cli message")) + provider.append(slack, Message.user("slack message")) + provider.append(otherThread, Message.assistant("other message")) + + assertEquals(provider.load(cli).map(_.text), List("cli message")) + assertEquals(provider.load(slack).map(_.text), List("slack message")) + assertEquals( + provider.load(otherThread).map(_.text), + List("other message") + ) + + test("persists only user and assistant text messages"): + withProvider(): (provider, _) => + val key = ContextKey("cli", "default") + val skipped = List( + Message.system("system prompt"), + Message(Role.Assistant, List(Content.Thinking("private thought"))), + Message(Role.Assistant, List(Content.ToolUse("id", "tool", "{}"))), + Message.toolResult("id", "tool output"), + Message(Role.User, List(Content.Text(""))) + ) + + skipped.foreach(provider.append(key, _)) + provider.append(key, Message.user("visible user")) + provider.append(key, Message.assistant("visible assistant")) + + assertEquals( + provider.load(key).map(m => m.role -> m.text), + List( + Role.User -> "visible user", + Role.Assistant -> "visible assistant" + ) + ) + + test("messages FTS matches persisted text"): + withProvider(): (provider, dbPath) => + val key = ContextKey("cli", "default") + provider.append( + key, + Message.user("sqlite can search capybara transcripts") + ) + provider.append(key, Message.assistant("ordinary response")) + + withConnection(dbPath): conn => + val matches = queryPreparedRows( + conn, + """SELECT m.text + |FROM messages_fts + |JOIN messages m ON m.id = messages_fts.rowid + |WHERE messages_fts MATCH ? + |ORDER BY m.id ASC""".stripMargin, + List("capybara") + ) + assertEquals( + matches, + List(List("sqlite can search capybara transcripts")) + ) + + test("operations after close fail fast instead of hanging"): + val dir = Files.createTempDirectory("claw-after-close") + val provider = SqliteContextProvider(dir.toFile) + provider.close() + intercept[IllegalStateException]: + provider.load(ContextKey("cli", "default")) + intercept[Exception]: + provider.append(ContextKey("cli", "default"), Message.user("hi")) + + test("close is idempotent"): + val dir = Files.createTempDirectory("claw-close-twice") + val provider = SqliteContextProvider(dir.toFile) + provider.close() + provider.close() // must not throw + + test( + "concurrent providers in one workdir do not deadlock or violate uniqueness" + ): + val dir = Files.createTempDirectory("claw-multi-instance") + val providerA = SqliteContextProvider(dir.toFile) + try + val providerB = SqliteContextProvider(dir.toFile) + try + val key = ContextKey("cli", "shared") + + val threadA = new Thread(() => + (1 to 5).foreach: i => + providerA.append(key, Message.user(s"a-$i")) + ) + val threadB = new Thread(() => + (1 to 5).foreach: i => + providerB.append(key, Message.user(s"b-$i")) + ) + + threadA.start() + threadB.start() + threadA.join() + threadB.join() + + val combined = providerA.load(key).map(_.text).toSet + assertEquals( + combined, + (1 to 5).flatMap(i => List(s"a-$i", s"b-$i")).toSet + ) + finally providerB.close() + finally providerA.close() + + private def withProvider( + )(body: (SqliteContextProvider, Path) => Unit): Unit = + val dir = Files.createTempDirectory("claw-sqlite-provider") + val dbPath = dir.resolve(".claw").resolve("state.db") + val provider = SqliteContextProvider(dir.toFile) + try body(provider, dbPath) + finally provider.close() + + private def withConnection[A](dbPath: Path)(body: Connection => A): A = + val conn = DriverManager.getConnection(s"jdbc:sqlite:${dbPath.toString}") + try body(conn) + finally conn.close() + + private def queryInt(conn: Connection, sql: String): Int = + queryRows(conn, sql).head.head.toInt + + private def queryRows(conn: Connection, sql: String): List[List[String]] = + val stmt = conn.prepareStatement(sql) + try collectRows(stmt.executeQuery()) + finally stmt.close() + + private def queryPreparedRows( + conn: Connection, + sql: String, + values: List[String] + ): List[List[String]] = + val stmt = conn.prepareStatement(sql) + try + values.zipWithIndex.foreach: (value, index) => + stmt.setString(index + 1, value) + collectRows(stmt.executeQuery()) + finally stmt.close() + + private def collectRows(rs: ResultSet): List[List[String]] = + try + val columnCount = rs.getMetaData.getColumnCount + Iterator + .continually(rs.next()) + .takeWhile(identity) + .map: _ => + (1 to columnCount).map(index => rs.getString(index)).toList + .toList + finally rs.close() diff --git a/docs/design.md b/docs/design.md index 85e0288..966e149 100644 --- a/docs/design.md +++ b/docs/design.md @@ -117,7 +117,8 @@ class Gateway(workDir: String, ports: List[Port], contextProvider: ContextProvid def shutdown(): Unit ``` -Mid-turn messages are forwarded to the active `AgentRun` via `steer`. `JsonlContextProvider` persists transcripts to `.claw/history/{port}/{thread}.jsonl`. +Mid-turn messages are forwarded to the active `AgentRun` via `steer`. `SqliteContextProvider` persists session-native transcripts to `.claw/state.db`. The database stores one row per `(port, thread)` in `sessions`, ordered text turns in `messages`, and an FTS5 index in `messages_fts`; only user and assistant text content is persisted. Existing `.claw/history/**/*.jsonl` files are archival and are not read by the gateway. -Current ports: `SlackPort` (in `port.slack`) and `CliPort` (in `port.cli`). +Schema is managed by Flyway; migrations live as plain SQL under `capybaraclaw/src/main/resources/db/migration/V*.sql` and are applied automatically on startup. The provider holds **one writer connection** for all writes plus a **pool of read-only connections** so reads do not serialise behind the writer. Writes are serialised via `writeLock`; reads run unsynchronised on pool connections in WAL mode. +Current ports: `SlackPort` (in `port.slack`) and `CliPort` (in `port.cli`).