diff --git a/db.go b/db.go index 6999076..1778812 100644 --- a/db.go +++ b/db.go @@ -205,7 +205,12 @@ func (db *DB) WriteTxn(tables ...TableMeta) WriteTxn { txn.smus.Lock() acquiredAt := time.Now() - txn.tableEntries = slices.Clone(*db.root.Load()) + txn.oldRoot = db.root.Load() + + // Clone the root. This new allocation will become the new root when + // we commit. + txn.tableEntries = slices.Clone(*txn.oldRoot) + txn.handle = db.handleName txn.acquiredAt = acquiredAt diff --git a/db_test.go b/db_test.go index 88b06ef..1c9b205 100644 --- a/db_test.go +++ b/db_test.go @@ -592,6 +592,29 @@ func TestDB_Changes(t *testing.T) { assert.EqualValues(t, 0, expvarInt(metrics.ObjectCountVar.Get("test")), "ObjectCount") assert.EqualValues(t, 0, expvarInt(metrics.GraveyardObjectCountVar.Get("test")), "GraveyardObjectCount") + + // Create another iterator and test observing changes using a WriteTxn + // that is mutating the table. This will observe the changes up to the + // point WriteTxn() was called, but not changes made in the WriteTxn. + wtxn = db.WriteTxn(table) + iter3, err := table.Changes(wtxn) + require.NoError(t, err, "failed to create ChangeIterator") + _, _, err = table.Insert(wtxn, &testObject{ID: 1}) + require.NoError(t, err, "Insert failed") + wtxn.Commit() + + wtxn = db.WriteTxn(table) + _, _, err = table.Insert(wtxn, &testObject{ID: 2}) + require.NoError(t, err, "Insert failed") + changes, _ = iter3.Next(wtxn) + // We don't observe the insert of ID 2 + count = 0 + for change := range changes { + require.EqualValues(t, 1, change.Object.ID) + count++ + } + require.Equal(t, 1, count) + wtxn.Abort() } func TestDB_Observable(t *testing.T) { diff --git a/iterator.go b/iterator.go index d908fdf..f83d461 100644 --- a/iterator.go +++ b/iterator.go @@ -166,7 +166,7 @@ type changeIterator[Obj any] struct { } func (it *changeIterator[Obj]) refresh(txn ReadTxn) { - tableEntry := txn.root()[it.table.tablePos()] + tableEntry := txn.committedRoot()[it.table.tablePos()] if it.iter != nil && tableEntry.locked { var obj Obj panic(fmt.Sprintf("Table[%T].Changes().Next() called with the target table locked. This is not supported.", obj)) diff --git a/read_txn.go b/read_txn.go index 11fe3b9..4a098b5 100644 --- a/read_txn.go +++ b/read_txn.go @@ -36,6 +36,11 @@ func (r *readTxn) root() dbRoot { return dbRoot(*r) } +// committedRoot implements ReadTxn. +func (r *readTxn) committedRoot() dbRoot { + return dbRoot(*r) +} + // WriteJSON marshals out the database as JSON into the given writer. // If tables are given then only these tables are written. func (r *readTxn) WriteJSON(w io.Writer, tables ...string) error { diff --git a/types.go b/types.go index 8237aae..cefc565 100644 --- a/types.go +++ b/types.go @@ -75,6 +75,10 @@ type Table[Obj any] interface { // // If an object is created and deleted before the observer has iterated // over the creation then only the deletion is seen. + // + // If [ChangeIterator.Next] is called with a [WriteTxn] targeting the + // table being observed then only the changes prior to that [WriteTxn] + // are observed. Changes(WriteTxn) (ChangeIterator[Obj], error) } @@ -108,7 +112,9 @@ type ChangeIterator[Obj any] interface { // The returned sequence is a single-use sequence and subsequent calls will return // an empty sequence. // - // Next will panic if called with a WriteTxn that has locked the target table. + // If Next is called with a [WriteTxn] targeting the table being observed then only + // the changes made prior to that [WriteTxn] are observed, e.g. we can only observe + // committed changes. Next(ReadTxn) (iter.Seq2[Change[Obj], Revision], <-chan struct{}) // Close the change iterator. Once all change iterators for a given table are closed @@ -254,8 +260,17 @@ type ReadTxn interface { indexReadTxn(meta TableMeta, indexPos int) (tableIndexReader, error) mustIndexReadTxn(meta TableMeta, indexPos int) tableIndexReader getTableEntry(meta TableMeta) *tableEntry + + // root returns the database root. If this is a WriteTxn it returns + // the current modified root. root() dbRoot + // committedRoot returns the committed database root. If this is a + // WriteTxn it returns the root snapshotted at the time the WriteTxn + // was constructed and thus does not reflect any changes made in the + // transaction. + committedRoot() dbRoot + // WriteJSON writes the contents of the database as JSON. WriteJSON(w io.Writer, tables ...string) error } diff --git a/write_txn.go b/write_txn.go index 091ac0d..cd87d9c 100644 --- a/write_txn.go +++ b/write_txn.go @@ -32,6 +32,7 @@ type writeTxnState struct { acquiredAt time.Time // the time at which the transaction acquired the locks duration atomic.Uint64 // the transaction duration after it finished + oldRoot *dbRoot // snapshot of the root at the time WriteTxn was called tableEntries []*tableEntry // table entries being modified numTxns int // number of index transactions opened smus internal.SortableMutexes // the (sorted) table locks @@ -44,6 +45,10 @@ func (txn *writeTxnState) unwrap() *writeTxnState { return txn } +func (txn *writeTxnState) committedRoot() dbRoot { + return *txn.oldRoot +} + func (txn *writeTxnState) root() dbRoot { return txn.tableEntries } @@ -286,6 +291,7 @@ func (txn *writeTxnState) delete(meta TableMeta, guardRevision Revision, data an // and returns it to the pool. func (handle *writeTxnHandle) returnToPool() { txn := handle.writeTxnState + txn.oldRoot = nil txn.tableEntries = nil txn.numTxns = 0 clear(txn.smus)