From 5d22e855962efca176f767d2461963a4f8b5af9b Mon Sep 17 00:00:00 2001 From: Jussi Maki Date: Mon, 9 Feb 2026 11:33:56 +0100 Subject: [PATCH] statedb: Restore ability to use Changes with WriteTxn The index refactoring in cd27022265c847264dc331df102fe81279a56fd4 removed the ability to use ChangeIterator.Next with a WriteTxn targeting the table being observed. This was done to avoid having to hold onto the old instance of the tableEntry, but it broke a useful pattern for Changes(): the ability for a component A to write into a table and component B to be able to observe and "augment" the objects created by A. Restore this ability to by keeping a pointer to the old root in the write transaction. Before: BenchmarkDB_WriteTxn_CommitOnly_100Tables-8 1428603 838.6 ns/op 1112 B/op 5 allocs/op BenchmarkDB_WriteTxn_CommitOnly_1Table-8 2391542 503.3 ns/op 224 B/op 5 allocs/op BenchmarkDB_NewWriteTxn-8 2607277 458.1 ns/op 200 B/op 4 allocs/op BenchmarkDB_WriteTxnCommit100-8 1455978 823.8 ns/op 1096 B/op 5 allocs/op After: BenchmarkDB_WriteTxn_CommitOnly_100Tables-8 1239177 962.4 ns/op 1112 B/op 5 allocs/op BenchmarkDB_WriteTxn_CommitOnly_1Table-8 2332510 515.2 ns/op 224 B/op 5 allocs/op BenchmarkDB_NewWriteTxn-8 2566347 468.2 ns/op 200 B/op 4 allocs/op BenchmarkDB_WriteTxnCommit100-8 1452818 892.0 ns/op 1096 B/op 5 allocs/op No practical difference since we keep a pool for writeTxnHandle and thus don't really allocate more memory even though the writeTxnHandle is larger now. The impact this may have is to workloads that have a huge table and churn through all objects and now WriteTxn holds onto both the old root and the new root being prepared and thus do not allow GC to collect old objects. This however seems unlikely to be an issue since we do hold onto the old root via [DB] and only way for constructing lots of potentially garbage objects is to have two tables churning and have the two WriteTxn's hold onto "garbage" of the other table that is no longer reachable via [DB.ReadTxn]. Signed-off-by: Jussi Maki --- db.go | 7 ++++++- db_test.go | 23 +++++++++++++++++++++++ iterator.go | 2 +- read_txn.go | 5 +++++ types.go | 17 ++++++++++++++++- write_txn.go | 6 ++++++ 6 files changed, 57 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 6999076..1778812 100644 --- a/db.go +++ b/db.go @@ -205,7 +205,12 @@ func (db *DB) WriteTxn(tables ...TableMeta) WriteTxn { txn.smus.Lock() acquiredAt := time.Now() - txn.tableEntries = slices.Clone(*db.root.Load()) + txn.oldRoot = db.root.Load() + + // Clone the root. This new allocation will become the new root when + // we commit. + txn.tableEntries = slices.Clone(*txn.oldRoot) + txn.handle = db.handleName txn.acquiredAt = acquiredAt diff --git a/db_test.go b/db_test.go index 88b06ef..1c9b205 100644 --- a/db_test.go +++ b/db_test.go @@ -592,6 +592,29 @@ func TestDB_Changes(t *testing.T) { assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount") assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount") + + // Create another iterator and test observing changes using a WriteTxn + // that is mutating the table. This will observe the changes up to the + // point WriteTxn() was called, but not changes made in the WriteTxn. + wtxn = db.WriteTxn(table) + iter3, err := table.Changes(wtxn) + require.NoError(t, err, "failed to create ChangeIterator") + _, _, err = table.Insert(wtxn, &testObject{ID: 1}) + require.NoError(t, err, "Insert failed") + wtxn.Commit() + + wtxn = db.WriteTxn(table) + _, _, err = table.Insert(wtxn, &testObject{ID: 2}) + require.NoError(t, err, "Insert failed") + changes, _ = iter3.Next(wtxn) + // We don't observe the insert of ID 2 + count = 0 + for change := range changes { + require.EqualValues(t, 1, change.Object.ID) + count++ + } + require.Equal(t, 1, count) + wtxn.Abort() } func TestDB_Observable(t *testing.T) { diff --git a/iterator.go b/iterator.go index d908fdf..f83d461 100644 --- a/iterator.go +++ b/iterator.go @@ -166,7 +166,7 @@ type changeIterator[Obj any] struct { } func (it *changeIterator[Obj]) refresh(txn ReadTxn) { - tableEntry := txn.root()[it.table.tablePos()] + tableEntry := txn.committedRoot()[it.table.tablePos()] if it.iter != nil && tableEntry.locked { var obj Obj panic(fmt.Sprintf("Table[%T].Changes().Next() called with the target table locked. This is not supported.", obj)) diff --git a/read_txn.go b/read_txn.go index 11fe3b9..4a098b5 100644 --- a/read_txn.go +++ b/read_txn.go @@ -36,6 +36,11 @@ func (r *readTxn) root() dbRoot { return dbRoot(*r) } +// committedRoot implements ReadTxn. +func (r *readTxn) committedRoot() dbRoot { + return dbRoot(*r) +} + // WriteJSON marshals out the database as JSON into the given writer. // If tables are given then only these tables are written. func (r *readTxn) WriteJSON(w io.Writer, tables ...string) error { diff --git a/types.go b/types.go index 8237aae..cefc565 100644 --- a/types.go +++ b/types.go @@ -75,6 +75,10 @@ type Table[Obj any] interface { // // If an object is created and deleted before the observer has iterated // over the creation then only the deletion is seen. + // + // If [ChangeIterator.Next] is called with a [WriteTxn] targeting the + // table being observed then only the changes prior to that [WriteTxn] + // are observed. Changes(WriteTxn) (ChangeIterator[Obj], error) } @@ -108,7 +112,9 @@ type ChangeIterator[Obj any] interface { // The returned sequence is a single-use sequence and subsequent calls will return // an empty sequence. // - // Next will panic if called with a WriteTxn that has locked the target table. + // If Next is called with a [WriteTxn] targeting the table being observed then only + // the changes made prior to that [WriteTxn] are observed, e.g. we can only observe + // committed changes. Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{}) // Close the change iterator. Once all change iterators for a given table are closed @@ -254,8 +260,17 @@ type ReadTxn interface { indexReadTxn(meta TableMeta, indexPos int) (tableIndexReader, error) mustIndexReadTxn(meta TableMeta, indexPos int) tableIndexReader getTableEntry(meta TableMeta) *tableEntry + + // root returns the database root. If this is a WriteTxn it returns + // the current modified root. root() dbRoot + // committedRoot returns the committed database root. If this is a + // WriteTxn it returns the root snapshotted at the time the WriteTxn + // was constructed and thus does not reflect any changes made in the + // transaction. + committedRoot() dbRoot + // WriteJSON writes the contents of the database as JSON. WriteJSON(w io.Writer, tables ...string) error } diff --git a/write_txn.go b/write_txn.go index 091ac0d..cd87d9c 100644 --- a/write_txn.go +++ b/write_txn.go @@ -32,6 +32,7 @@ type writeTxnState struct { acquiredAt time.Time // the time at which the transaction acquired the locks duration atomic.Uint64 // the transaction duration after it finished + oldRoot *dbRoot // snapshot of the root at the time WriteTxn was called tableEntries []*tableEntry // table entries being modified numTxns int // number of index transactions opened smus internal.SortableMutexes // the (sorted) table locks @@ -44,6 +45,10 @@ func (txn *writeTxnState) unwrap() *writeTxnState { return txn } +func (txn *writeTxnState) committedRoot() dbRoot { + return *txn.oldRoot +} + func (txn *writeTxnState) root() dbRoot { return txn.tableEntries } @@ -286,6 +291,7 @@ func (txn *writeTxnState) delete(meta TableMeta, guardRevision Revision, data an // and returns it to the pool. func (handle *writeTxnHandle) returnToPool() { txn := handle.writeTxnState + txn.oldRoot = nil txn.tableEntries = nil txn.numTxns = 0 clear(txn.smus)