From a2f43d5c68bedc30e5ebe6f7fd5b2ed294f768cb Mon Sep 17 00:00:00 2001 From: rawbytedev <95090911+rawbytedev@users.noreply.github.com> Date: Mon, 15 Dec 2025 20:46:00 +0100 Subject: [PATCH 1/2] Refactor journal batch handling and improve documentation --- doc.md | 4 -- internal/journal/README.md | 85 ++++++++++++++++++++++++++++++ internal/journal/journal.go | 90 ++++++++++++++++---------------- internal/journal/journal_test.go | 16 +++--- internal/journal/types.go | 26 ++++++--- 5 files changed, 157 insertions(+), 64 deletions(-) create mode 100644 internal/journal/README.md diff --git a/doc.md b/doc.md index f904991..a964db5 100644 --- a/doc.md +++ b/doc.md @@ -17,7 +17,3 @@ SealBatch handles operations such as batching, generating merkletree, storing in and anchoring receipt on solana ## Usage - -- Improve Journaling -- Improve DB -- Add Benchmarking \ No newline at end of file diff --git a/internal/journal/README.md b/internal/journal/README.md new file mode 100644 index 0000000..2da7bdc --- /dev/null +++ b/internal/journal/README.md @@ -0,0 +1,85 @@ +# Journal + +On-chain we store: +batchCommitment = hash(rootHash || totalEntries || timestampRangeHash || version) + +- rootHash = Merkle root of all entries +- totalEntries = number of entries in batch +- timestampRangeHash = hash(startTimestamp || endTimestamp) +- version = protocol version + +Off-Chain Key-Value Database Schema + +1. 
Batch Registry (b:{batchCommitment}) + +```json +{ + "rootHash": "0xabc123...", + "totalEntries": 150, + "startTime": 1633023456000, + "endTime": 1633023556000, + "treeDepth": 8, + "leafFormat": "sha256(data || position)", + "commitment": "batchCommitment (on-chain hash)", + "status": "finalized", + "createdAt": 1633023557000 +} +``` + +2. Entry Storage (Content-Addressable) + +```t +Key: e:{dataHash} +Value: { + "data": "actual entry content", + "position": 3, + "batchCommitment": "batchCommitment", + "timestamp": 1633023456050, + "metadata": {} // optional additional data +} + +``` + +3. Position Index (Fast lookup by position) + +```t +Key: p:{batchCommitment}:{position:08d} +Value: dataHash // points to entry key +``` + +4. Merkle Proof Cache (Optimized for verification) (Not fully implemented) + +```t +Key: proof:{batchCommitment}:{position} +Value: { + "leafHash": "hash of entry at position", + "siblings": ["sibling1", "sibling2", ...], + "path": [0,1,0,...] // 0=left,1=right +} +``` + +5. Reverse Lookup (Find batch by entry) + +```t +Key: r:{dataHash} +Value: batchCommitment +``` + +Complete Key Structure + +```python +# All key prefixes in use +BATCH_PREFIX = "b:" +ENTRY_PREFIX = "e:" +POSITION_PREFIX = "p:" +PROOF_PREFIX = "proof:" +REVERSE_PREFIX = "r:" +TIME_PREFIX = "t:" + +# Example keys: +batch_key = f"b:{batchCommitment}" +entry_key = f"e:{dataHash}" +position_key = f"p:{batchCommitment}:{position:08d}" +proof_key = f"proof:{batchCommitment}:{position}" +reverse_key = f"r:{dataHash}" +``` diff --git a/internal/journal/journal.go b/internal/journal/journal.go index 194c608..f9c8f63 100644 --- a/internal/journal/journal.go +++ b/internal/journal/journal.go @@ -30,7 +30,6 @@ func NewJournalCache(ctx *context.Context) JournalStore { return &JournalCache{ctx: ctx, store: db} } - // called by main // testing // this is received by ai @@ -83,6 +82,12 @@ func (res *CommitResult) Encode() ([]byte, error) { func (res *CommitResult) Decode(data []byte) error 
{ return res.ctx.Encoder.Decode(data, res) } +func (res *Commitment) Encode() ([]byte, error) { + return res.ctx.Encoder.Encode(res) +} +func (res *Commitment) Decode(data []byte) error { + return res.ctx.Encoder.Decode(data, res) +} // set based on configuration // that means if config are changed during run @@ -187,42 +192,50 @@ func (j *JournalCache) BuildTree() error { return nil } -// this is related to commitresult needed to mint and anchor -// run after calling buildtree and before committing onto database -// needs len(j.post) j.treeroot timewindow -func (j *JournalCache) BatchInsert() (*CommitResult, error) { +// prepares a batch +func (j *JournalCache) Batch() (*CommitResult, error) { + if uint32(len(j.Post)) == 0 { + if j.commitRes != nil { + return j.commitRes, nil + } + return nil, nil + } batch := CommitResult{ ctx: j.ctx, Root: [32]byte(j.treeroot), Count: uint32(len(j.Post)), WindowsStart: j.Post[0].GetTimestamp(), WindowsEnd: j.Post[len(j.Post)-1].GetTimestamp(), + version: "v1", } - // encode root + count enc, err := batch.Encode() - fmt.Print("Logging encoded") - fmt.Print(enc) if err != nil { return &CommitResult{}, err } - // derive hash from both - // us it as id - batch.BatchID = hex.EncodeToString(j.hash(enc)) - newenc, err := batch.Encode() - j.batchid = batch.BatchID + batchcommitment := j.ctx.Hasher.Sum(enc) + batchdata := &Commitment{ + ctx: j.ctx, + Roothash: [32]byte(j.treeroot), + Count: uint32(len(j.Post)), + WindowsStart: j.Post[0].GetTimestamp(), + WindowsEnd: j.Post[len(j.Post)-1].GetTimestamp(), + commitment: batchcommitment, + } + data, err := batchdata.Encode() if err != nil { return &CommitResult{}, err } - return &batch, j.store.Put(j.ctx.Hasher.Sum([]byte(batch.BatchID)), newenc) + j.batchid = batchcommitment + batch.batchID = string(batchcommitment) + j.commitRes = &batch + return &batch, j.store.Put(fmt.Appendf(nil, "b:%x", batchcommitment), data) } -// only store post Entries -// entry are rehashed -// pattern -// chk:%s 
(checksum) -> PostEntry -// batch:%s (batchid) -> CommitResult -// seq:%s:%s (batchid) (n) -> checksum func (j *JournalCache) Commit() error { + _, err := j.Batch() + if err != nil { + return fmt.Errorf("%s", err) + } // j.Post get zerro when it goes low // we can't get size from it at that point // at this point seems like j contents get corrupted? need to investigate @@ -246,12 +259,15 @@ func (j *JournalCache) largeCommit() error { if err != nil { return err } - err = j.store.BatchPut(j.hash(fmt.Appendf(nil, "chk:%s", entry.GetID())), enc) + err = j.store.BatchPut(fmt.Appendf(nil, "e:%x", entry.GetID()), enc) if err != nil { return err } - //use j.batchid here to test bug/ replace with batchid below - err = j.store.BatchPut(j.hash(fmt.Appendf(nil, "seq:%s:%d", batchid, i)), []byte(entry.GetID())) + err = j.store.BatchPut(fmt.Appendf(nil, "p:%x:%d", batchid, i), []byte(entry.GetID())) + if err != nil { + return err + } + err = j.store.BatchPut(fmt.Appendf(nil, "r:%s", entry.GetID()), batchid) if err != nil { return err } @@ -259,7 +275,7 @@ func (j *JournalCache) largeCommit() error { // for testing the problem // replace size-1 with len(j.Post) // might be because it's a pointer? 
- // can be done after loop + // can be done after loop if i == size-1 { err = j.store.BatchPut(nil, nil) if err != nil { @@ -275,17 +291,16 @@ func (j *JournalCache) largeCommit() error { // hash it according to type before calling it func (j *JournalCache) Get(id string) ([]byte, error) { item, err := hex.DecodeString(id) - obj := j.ctx.Hasher.Sum(item) if err != nil { - return []byte{}, err + return nil, err } - return j.store.Get(obj) + return j.store.Get(item) } // clean everything / for now it can only clear func (j *JournalCache) RoolBack() { j.Post = j.Post[:0] - j.batchid = "" + j.batchid = j.batchid[:0] j.treeroot = nil } func (j *JournalCache) Close() error { @@ -295,25 +310,12 @@ func (j *JournalCache) Close() error { // small Format implementation func Format(s string, opts ...RetrieveOptions) string { - // do not support multiple options yet - // for future use - /* - if len(opts) == 1 { - switch opts[0] { - case Checksum: - d := hex.EncodeToString((fmt.Appendf(nil, "chk:%s", s))) - return d - default: - d := hex.EncodeToString((fmt.Appendf(nil, "chk:%s", s))) - return d - } - }*/ - return hex.EncodeToString(fmt.Appendf(nil, "chk:%s", s)) + return hex.EncodeToString(fmt.Appendf(nil, "e:%x", s)) } func FormatSeq(s string, n int) string { - return hex.EncodeToString(fmt.Appendf(nil, "seq:%s:%d", s, n)) + return hex.EncodeToString(fmt.Appendf(nil, "p:%x:%d", []byte(s), n)) } func FormatBatch(s string) string { - return hex.EncodeToString([]byte(s)) + return hex.EncodeToString(fmt.Appendf(nil, "b:%x", s)) } diff --git a/internal/journal/journal_test.go b/internal/journal/journal_test.go index 1e59637..00be091 100644 --- a/internal/journal/journal_test.go +++ b/internal/journal/journal_test.go @@ -121,6 +121,10 @@ func TestJournalInsert(t *testing.T) { if err != nil { t.Fatal(err) } + _, err = journal.Batch() + if err != nil { + t.Fatal(err) + } err = journal.Commit() if err != nil { t.Fatal(err) @@ -239,10 +243,6 @@ func TestJournalInsertGet(t *testing.T) 
{ } } -// error when trying to use in -// tempdir -// it has to do with how batcher handles write? -// The process cannot access the file because it is being used by another process. func TestBatchQuery(t *testing.T) { new := true if new { @@ -273,7 +273,7 @@ func TestBatchQuery(t *testing.T) { if err != nil { t.Fatal(err) } - com, err := journal.BatchInsert() + com, err := journal.Batch() if err != nil { t.Fatal(err) } @@ -301,7 +301,7 @@ func TestBatchQuery(t *testing.T) { for i := range com.Count { // retrieving checksum from batchID and at index i // the comparing with records - data, err := journal.Get(FormatSeq(com.BatchID, int(i))) + data, err := journal.Get(FormatSeq(com.batchID, int(i))) if err != nil { t.Log(err) t.Fatal("sequence while using Format for Seq") @@ -313,7 +313,7 @@ func TestBatchQuery(t *testing.T) { t.Log("Retrieving Batch") var v CommitResult v.ctx = &ctx - data, err := journal.Get(FormatBatch(com.BatchID)) + data, err := journal.Get(FormatBatch(com.batchID)) if err != nil { t.Fatal(err) } @@ -331,7 +331,7 @@ func TestBatchQuery(t *testing.T) { } else { // only peform query doesn't write to db _ = CommitResult{ - BatchID: "09dd1d47d7f0e5dfac278513a723b6d424558669feb014aecf5afce040c18211", + batchID: "09dd1d47d7f0e5dfac278513a723b6d424558669feb014aecf5afce040c18211", Root: [32]byte{89, 82, 203, 230, 157, 145, 229, 24, 119, 35, 162, 39, 108, 37, 209, 71, 3, 171, 242, 49, 6, 1, 84, 104, 252, 65, 22, 173, 7, 180, 233, 189}, Count: 0x3, } diff --git a/internal/journal/types.go b/internal/journal/types.go index c2a6b74..f69c1aa 100644 --- a/internal/journal/types.go +++ b/internal/journal/types.go @@ -21,16 +21,25 @@ type JournalStore interface { Entries() []JournalEntry BuildTree() error Get(id string) ([]byte, error) - BatchInsert() (*CommitResult, error) - Close() error // shutdows + Batch() (*CommitResult, error) // used for manual batch creation + Close() error // shutdows } type CommitResult struct { ctx *context.Context - BatchID string + 
batchID string Root [32]byte Count uint32 WindowsStart time.Time // first j.Port // assuming that it is ordered WindowsEnd time.Time // last j.Post + version string +} +type Commitment struct { + ctx *context.Context // anonymous + Roothash [32]byte + Count uint32 + WindowsStart time.Time + WindowsEnd time.Time + commitment []byte } // Default format when received / Unsafe @@ -79,9 +88,10 @@ func NewLocalStorage(ctx *context.Context) (database.StorageDB, error) { // this avoid having to recompute tree if something fails along the way type JournalCache struct { - ctx *context.Context - store database.StorageDB - Post []JournalEntry - treeroot []byte - batchid string + ctx *context.Context + store database.StorageDB + Post []JournalEntry + treeroot []byte + batchid []byte + commitRes *CommitResult } From 9a1490e2178a432e51103c29ebb4655eb26a85ba Mon Sep 17 00:00:00 2001 From: rawbytedev <95090911+rawbytedev@users.noreply.github.com> Date: Tue, 24 Feb 2026 00:45:10 +0100 Subject: [PATCH 2/2] - Switched over to zerokv - Converted anonymous variables - Renamed context to settings for clearer usage - Simplified commits Next steps: remove redundant structs such as Commitment and CommitResult; document implementations; add full support for context; add worker-based operations (related to journal, PUT, GET); rename functions and simplify implementation --- cmd/agent/main.go | 2 +- cmd/agent/tools/sealbatch.go | 2 +- go.mod | 6 +- go.sum | 2 + internal/journal/database/badger.go | 191 ---------------------------- internal/journal/database/db.go | 33 ----- internal/journal/database/errors.go | 12 -- internal/journal/database/pebble.go | 180 -------------------------- internal/journal/journal.go | 37 +++--- internal/journal/journal_test.go | 29 +++-- internal/journal/types.go | 33 ++--- {context => settings}/main.go | 10 +- 12 files changed, 61 insertions(+), 476 deletions(-) delete mode 100644 internal/journal/database/badger.go delete mode 100644 internal/journal/database/db.go 
delete mode 100644 internal/journal/database/errors.go delete mode 100644 internal/journal/database/pebble.go rename {context => settings}/main.go (86%) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 0f5ec1e..6a7b201 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -3,7 +3,7 @@ package main import ( "context" crosstracetools "crosstrace/cmd/agent/tools" - cont "crosstrace/context" + cont "crosstrace/settings" "crosstrace/internal/configs" "crosstrace/internal/crossmint" "crosstrace/internal/journal" diff --git a/cmd/agent/tools/sealbatch.go b/cmd/agent/tools/sealbatch.go index f6845df..43413d8 100644 --- a/cmd/agent/tools/sealbatch.go +++ b/cmd/agent/tools/sealbatch.go @@ -24,7 +24,7 @@ func (t *SealBatchTool) Call(ctx context.Context, input string) (string, error) if err != nil { return "", err } - batch, err := t.Journal.BatchInsert() + batch, err := t.Journal.Batch() if err != nil { return "", err } diff --git a/go.mod b/go.mod index fd64c5d..7bd25da 100644 --- a/go.mod +++ b/go.mod @@ -1,12 +1,14 @@ module crosstrace -go 1.24.3 +go 1.25.2 require ( github.com/dgraph-io/badger/v4 v4.8.0 github.com/ethereum/go-ethereum v1.16.3 ) +require github.com/rawbytedev/zerokv v1.0.101 + require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.2.0 // indirect @@ -27,7 +29,7 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/yargevad/filepathx v1.0.0 // indirect - gopkg.in/yaml.v3 v3.0.1 + gopkg.in/yaml.v3 v3.0.1 ) require ( diff --git a/go.sum b/go.sum index bb1cc28..7592882 100644 --- a/go.sum +++ b/go.sum @@ -269,6 +269,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/procfs v0.9.0/go.mod 
h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= +github.com/rawbytedev/zerokv v1.0.101 h1:btNmI23hM9q3uwHVt69u/0F+lR5csqY/HGziTZtREA4= +github.com/rawbytedev/zerokv v1.0.101/go.mod h1:gbRQiP2nU3iWTFUdQSfaJ9TCMqW8atFLP2EItdfgKzM= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= diff --git a/internal/journal/database/badger.go b/internal/journal/database/badger.go deleted file mode 100644 index 052714f..0000000 --- a/internal/journal/database/badger.go +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright (C) Enviora -// This file is part of go-enviora -// -// BadgerDB implementation for StorageDB interface. -// Provides efficient key-value storage for blocks, transactions, and other data. -// Supports batch operations and is optimized for concurrent access. -// -// Usage: -// db, err := NewBadgerdb(cfg) -// err = db.Put(key, value) -// value, err := db.Get(key) -// err = db.Del(key) -// err = db.Close() -// -// Batch operations: -// err = db.BatchPut(key, value) // enqueue -// err = db.BatchPut(nil, nil) // flush -// err = db.BatchDel(key) // enqueue delete -// err = db.BatchDel(nil) // flush - -package database - -import ( - dbconfig "crosstrace/internal/configs" - "errors" - - "github.com/dgraph-io/badger/v4" -) - -// badgerdb manages Database Insert/Deletion/Batch Operations. -// It only handles []byte keys and values. -type badgerdb struct { - batch *badger.WriteBatch - db *badger.DB -} - -// NewBadgerdb creates a new BadgerDB instance with the given config. -// Returns a StorageDB interface or an error if initialization fails. 
-func NewBadgerdb(cfg dbconfig.JournalConfig) (StorageDB, error) { - opts := badger.DefaultOptions(cfg.DBPath) - if cfg.LogSize != "" { - size, err := dbconfig.ParseSize(cfg.LogSize) - if err != nil { - return nil, err - } - opts = opts.WithValueLogFileSize(size) - } - - db, err := badger.Open(opts) - if err != nil { - return nil, err - } - batch := db.NewWriteBatch() - if batch == nil { - db.Close() - return nil, badger.ErrInvalidRequest - } - return &badgerdb{db: db, batch: batch}, nil -} - -// Put inserts or updates a key-value pair in the database. -func (b *badgerdb) Put(key []byte, data []byte) error { - if b.db == nil { - return badger.ErrInvalidRequest - } - if key == nil { - return ErrEmptydbKey - } - if data == nil { - return ErrEmptydbValue - } - return b.db.Update(func(txn *badger.Txn) error { - return txn.Set(key, data) - }) -} - -// Get retrieves the value for a given key. Returns an error if not found. -func (b *badgerdb) Get(key []byte) ([]byte, error) { - if b.db == nil { - return nil, badger.ErrInvalidRequest - } - if key == nil { - return nil, ErrEmptydbKey - } - var data []byte - err := b.db.View(func(txn *badger.Txn) error { - item, err := txn.Get(key) - if err != nil { - return err - } - return item.Value(func(val []byte) error { - data = append([]byte{}, val...) - return nil - }) - }) - return data, err -} - -// Del deletes a key-value pair from the database. -func (b *badgerdb) Del(key []byte) error { - if b.db == nil { - return badger.ErrInvalidRequest - } - if key == nil { - return ErrEmptydbKey - } - return b.db.Update(func(txn *badger.Txn) error { - return txn.Delete(key) - }) -} - -// BatchPut adds a key-value pair to the current batch. -// Returns error if batch operation fails. 
-func (b *badgerdb) BatchPut(key []byte, data []byte) error { - if b.batch == nil { - return badger.ErrInvalidRequest - } - if key != nil && data != nil { - err := b.batch.Set(key, data) - if err != nil { - b.batch.Cancel() - return err - } - } - if key == nil && data == nil { - if err := b.FlushBatch(); err != nil { - return err - } - return nil // this is important to avoid ErrEmptydbKey below - } - if key == nil { - return ErrEmptydbKey - } - if data == nil { - return ErrEmptydbValue - } - - return nil -} - -// BatchDel adds a delete operation to the current batch. -func (b *badgerdb) BatchDel(key []byte) error { - if b.batch == nil { - return badger.ErrInvalidRequest - } - if key == nil { - // flushing is done when nil key is passed - if err := b.FlushBatch(); err != nil { - return err - } - - } - if err := b.batch.Delete(key); err != nil { - b.batch.Cancel() - return err - } - return nil -} - -// FlushBatch flushes any pending batch operations. -func (b *badgerdb) FlushBatch() error { - if b.batch == nil { - return badger.ErrInvalidRequest - } - err := b.batch.Flush() - b.batch = nil // reset batch after flush - if err != nil { - return err - } - return nil -} - -// Close closes the database and releases all resources. -func (b *badgerdb) Close() error { - var errs []error - if b.batch != nil { - if err := b.FlushBatch(); err != nil { - b.batch.Cancel() - errs = append(errs, err) - } - } - if b.db != nil { - if err := b.db.Close(); err != nil { - errs = append(errs, err) - } - } - if len(errs) == 0 { - return nil - } - return errors.Join(errs...) -} diff --git a/internal/journal/database/db.go b/internal/journal/database/db.go deleted file mode 100644 index e805f3b..0000000 --- a/internal/journal/database/db.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright (C) Enviora -// This file is part of go-enviora -// -// StorageDB is a generic key-value database interface for blockchain storage backends. 
-// Implementations must provide efficient and safe access to persistent storage for blocks, transactions, and state. -// -// Methods: -// -// Put(key, data): Insert or update a key-value pair. -// Get(key): Retrieve the value for a given key. -// Del(key): Delete a key-value pair. -// BatchPut(key, data: Add a key-value pair to a batch; -// BatchDel(key: Add a delete operation to a batch; -// Close(): Close the database and release all resources. -package database - -// StorageDB defines the interface for a pluggable key-value store. -type StorageDB interface { - // Put inserts or updates a key-value pair in the database. - Put(key []byte, data []byte) error - // Get retrieves the value for a given key. Returns an error if not found. - Get(key []byte) ([]byte, error) - // Del deletes a key-value pair from the database. - Del(key []byte) error - // BatchPut adds a key-value pair to the current batch. - BatchPut(key []byte, data []byte) error - // BatchDel adds a delete operation to the current batch. - BatchDel(key []byte) error - // FlushBatch flushes any pending batch operations. - FlushBatch() error - // Close closes the database and releases all resources. 
- Close() error -} diff --git a/internal/journal/database/errors.go b/internal/journal/database/errors.go deleted file mode 100644 index 9641f55..0000000 --- a/internal/journal/database/errors.go +++ /dev/null @@ -1,12 +0,0 @@ -package database - -import "errors" - -var ( - /* those errors occurs only when either - of them is empty and the other is populated - */ - ErrEmptydbKey = errors.New("key is Empty") - ErrEmptydbValue = errors.New("value is Empty") - ErrInvalidDB = errors.New("invalid DB") -) diff --git a/internal/journal/database/pebble.go b/internal/journal/database/pebble.go deleted file mode 100644 index 4752eee..0000000 --- a/internal/journal/database/pebble.go +++ /dev/null @@ -1,180 +0,0 @@ -// Copyright (C) Enviora -// This file is part of go-enviora -// -// PebbleDB implementation for StorageDB interface. -// Provides efficient key-value storage for blocks, transactions, and other data. -// Supports batch operations and is optimized for concurrent access. -// -// Usage: -// db, err := NewPebbledb(cfg) -// err = db.Put(key, value) -// value, err := db.Get(key) -// err = db.Del(key) -// err = db.Close() -// -// Batch operations: -// err = db.BatchPut(key, value) // enqueue -// err = db.BatchPut(nil, nil) // flush -// err = db.BatchDel(key) // enqueue delete -// err = db.BatchDel(nil) // flush - -package database - -import ( - dbconfig "crosstrace/internal/configs" - "errors" - - "github.com/cockroachdb/pebble" -) - -// pebbledb manages Database Insert/Deletion/Batch Operations for Pebble. -// It only handles []byte keys and values. -type pebbledb struct { - batch *pebble.Batch - db *pebble.DB -} - -// NewPebbledb creates a new PebbleDB instance with the given config. -// Returns a StorageDB interface or an error if initialization fails. 
-func NewPebbledb(cfg dbconfig.JournalConfig) (StorageDB, error) { - opts := &pebble.Options{ - MaxOpenFiles: 5000, - BytesPerSync: 1 << 20, - WALBytesPerSync: 1 << 20, - } - if cfg.CacheSize != "" { - size, err := dbconfig.ParseSize(cfg.CacheSize) - if err != nil { - return nil, err - } - opts.Cache = pebble.NewCache(size) - } - db, err := pebble.Open(cfg.DBPath, opts) - if err != nil { - return nil, err - } - batch := db.NewBatch() - if batch == nil { - db.Close() - return nil, err - } - return &pebbledb{db: db, batch: batch}, nil -} - -// Put inserts or updates a key-value pair in the database. -func (p *pebbledb) Put(key []byte, data []byte) error { - if p.db == nil { - return pebble.ErrClosed - } - if key == nil { - return ErrEmptydbKey - } - if data == nil { - return ErrEmptydbValue - } - return p.db.Set(key, data, pebble.Sync) -} - -// Get retrieves the value for a given key. Returns an error if not found. -func (p *pebbledb) Get(key []byte) ([]byte, error) { - if p.db == nil { - return nil, pebble.ErrClosed - } - if key == nil { - return nil, ErrEmptydbKey - } - val, closer, err := p.db.Get(key) - if err != nil { - return nil, err - } - defer closer.Close() - return val, nil -} - -// BatchPut adds a key-value pair to the current batch. If key and data == nil commits -// since we use key = nil and data = nil before committing we'll have to make checks -func (p *pebbledb) BatchPut(key []byte, data []byte) error { - if p.batch == nil { - return pebble.ErrClosed - } - - if key != nil && data != nil { - if err := p.batch.Set(key, data, pebble.NoSync); err != nil { - p.batch.Close() - return err - } - } - // both key and data must be nil to commit - if key == nil && data == nil { - if err := p.FlushBatch(); err != nil { - return err - } - return nil // important to avoid ErrEmptydbKey below - } - if key == nil { - return ErrEmptydbKey - } - if data == nil { - return ErrEmptydbValue - } - return nil -} - -// BatchDel adds a delete operation to the current batch. 
-func (p *pebbledb) BatchDel(key []byte) error { - if p.batch == nil { - return pebble.ErrClosed - } - if key != nil { - if err := p.batch.Delete(key, pebble.NoSync); err != nil { - p.batch.Close() - return err - } - return nil - } else { - if err := p.FlushBatch(); err != nil { - return err - } - return nil - } -} - -// Del deletes a key-value pair from the database. -func (p *pebbledb) Del(key []byte) error { - if p.db == nil { - return pebble.ErrClosed - } - if key == nil { - return ErrEmptydbKey - } - return p.db.Delete(key, pebble.Sync) -} - -// flushBatch flushes any pending batch operations. -func (p *pebbledb) FlushBatch() error { - if p.batch == nil { - return pebble.ErrClosed - } - return p.batch.Commit(pebble.Sync) -} - -// Close closes the database and releases all resources. -func (p *pebbledb) Close() error { - var errs []error - if p.batch != nil { - if err := p.batch.Commit(pebble.Sync); err != nil { - errs = append(errs, err) - } - p.batch.Close() - } - if p.db != nil { - if err := p.db.Close(); err != nil { - errs = append(errs, err) - } - - } - if len(errs) == 0 { - return nil - } - return errors.Join(errs...) 
-} diff --git a/internal/journal/journal.go b/internal/journal/journal.go index f9c8f63..5a64115 100644 --- a/internal/journal/journal.go +++ b/internal/journal/journal.go @@ -1,8 +1,9 @@ package journal import ( - "crosstrace/context" + "context" mptree "crosstrace/internal/merkle" + "crosstrace/settings" "encoding/hex" "fmt" "strings" @@ -21,7 +22,7 @@ const ( // Entry point to Journalling -func NewJournalCache(ctx *context.Context) JournalStore { +func NewJournalCache(ctx *settings.Settings) JournalStore { db, err := NewLocalStorage(ctx) if err != nil { fmt.Print(err) @@ -34,10 +35,10 @@ func NewJournalCache(ctx *context.Context) JournalStore { // testing // this is received by ai // Will remove this -func NewPreEntry(ctx *context.Context, raw_msg string, sender_id string, source string, session_id string) *PreEntry { +func NewPreEntry(ctx *settings.Settings, raw_msg string, sender_id string, source string, session_id string) *PreEntry { return &PreEntry{ctx: ctx, raw_msg: raw_msg, sender_id: sender_id, source: source, session_id: session_id} } -func NewPostEntryWithCtx(ctx *context.Context) *PostEntry { +func NewPostEntryWithCtx(ctx *settings.Settings) *PostEntry { return &PostEntry{ctx: ctx} } @@ -96,7 +97,7 @@ func (res *Commitment) Decode(data []byte) error { // Handle Sanitization : add global error vars // change PreEntry/PostEntry to JournalEntry -func SanitizePreEntry(ctx *context.Context, pre *PreEntry) (JournalEntry, error) { +func SanitizePreEntry(ctx *settings.Settings, pre *PreEntry) (JournalEntry, error) { // size check if len(pre.raw_msg) > ctx.Journal.MaxMsgSize { @@ -219,23 +220,19 @@ func (j *JournalCache) Batch() (*CommitResult, error) { Count: uint32(len(j.Post)), WindowsStart: j.Post[0].GetTimestamp(), WindowsEnd: j.Post[len(j.Post)-1].GetTimestamp(), - commitment: batchcommitment, + Commitment: batchcommitment, } data, err := batchdata.Encode() if err != nil { return &CommitResult{}, err } j.batchid = batchcommitment - batch.batchID = 
string(batchcommitment) + batch.BatchID = string(batchcommitment) j.commitRes = &batch - return &batch, j.store.Put(fmt.Appendf(nil, "b:%x", batchcommitment), data) + return &batch, j.store.Put(context.Background(), fmt.Appendf(nil, "b:%x", batchcommitment), data) } func (j *JournalCache) Commit() error { - _, err := j.Batch() - if err != nil { - return fmt.Errorf("%s", err) - } // j.Post get zerro when it goes low // we can't get size from it at that point // at this point seems like j contents get corrupted? need to investigate @@ -254,30 +251,26 @@ func (j *JournalCache) hash(data []byte) []byte { func (j *JournalCache) largeCommit() error { size := len(j.Post) batchid := j.batchid + batch := j.store.Batch() for i, entry := range j.Post { enc, err := entry.Encode() if err != nil { return err } - err = j.store.BatchPut(fmt.Appendf(nil, "e:%x", entry.GetID()), enc) + err = batch.Put(fmt.Appendf(nil, "e:%x", entry.GetID()), enc) if err != nil { return err } - err = j.store.BatchPut(fmt.Appendf(nil, "p:%x:%d", batchid, i), []byte(entry.GetID())) + err = batch.Put(fmt.Appendf(nil, "p:%x:%d", batchid, i), []byte(entry.GetID())) if err != nil { return err } - err = j.store.BatchPut(fmt.Appendf(nil, "r:%s", entry.GetID()), batchid) + err = batch.Put(fmt.Appendf(nil, "r:%s", entry.GetID()), batchid) if err != nil { return err } - // once for last elem - // for testing the problem - // replace size-1 with len(j.Post) - // might be because it's a pointer? 
- // can be done after loop if i == size-1 { - err = j.store.BatchPut(nil, nil) + err = batch.Commit(context.TODO()) if err != nil { return err } @@ -294,7 +287,7 @@ func (j *JournalCache) Get(id string) ([]byte, error) { if err != nil { return nil, err } - return j.store.Get(item) + return j.store.Get(context.TODO(), item) } // clean everything / for now it can only clear diff --git a/internal/journal/journal_test.go b/internal/journal/journal_test.go index 00be091..db5f413 100644 --- a/internal/journal/journal_test.go +++ b/internal/journal/journal_test.go @@ -2,10 +2,10 @@ package journal import ( "bytes" - "crosstrace/context" "crosstrace/internal/configs" "crosstrace/internal/crypto" "crosstrace/internal/encoder" + "crosstrace/settings" "crypto/rand" "encoding/hex" "fmt" @@ -26,7 +26,7 @@ func NewJournalConfig() *configs.JournalConfig { } } -func GeneRandomPreEntry(ctx *context.Context) []*PreEntry { +func GeneRandomPreEntry(ctx *settings.Settings) []*PreEntry { var items []*PreEntry for range 7 { items = append(items, &PreEntry{ @@ -57,7 +57,7 @@ func GeneRandomPreEntry(ctx *context.Context) []*PreEntry { return items } -func GeneConstantPreEntry(ctx *context.Context) []*PreEntry { +func GeneConstantPreEntry(ctx *settings.Settings) []*PreEntry { var items []*PreEntry items = append(items, &PreEntry{ ctx: ctx, @@ -95,7 +95,7 @@ func GeneConstantPreEntry(ctx *context.Context) []*PreEntry { } func TestJournalInsert(t *testing.T) { cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := settings.Settings{Journal: *cfg} ctx.Encoder = encoder.NewEncoder(cfg.EncoderName) ctx.Hasher = crypto.NewHasher(cfg.HasherName) journal := NewJournalCache(&ctx) @@ -155,7 +155,7 @@ func TestJournalInsertGet(t *testing.T) { if new { cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := settings.Settings{Journal: *cfg} ctx.Encoder = encoder.NewEncoder(cfg.EncoderName) ctx.Hasher = crypto.NewHasher(cfg.HasherName) journal := 
NewJournalCache(&ctx) @@ -205,7 +205,7 @@ func TestJournalInsertGet(t *testing.T) { } } else { cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := settings.Settings{Journal: *cfg} ctx.Encoder = encoder.NewEncoder(cfg.EncoderName) ctx.Hasher = crypto.NewHasher(cfg.HasherName) journal := NewJournalCache(&ctx) @@ -247,7 +247,7 @@ func TestBatchQuery(t *testing.T) { new := true if new { cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := settings.Settings{Journal: *cfg} ctx.Encoder = encoder.NewEncoder(cfg.EncoderName) ctx.Hasher = crypto.NewHasher(cfg.HasherName) journal := NewJournalCache(&ctx) @@ -301,7 +301,7 @@ func TestBatchQuery(t *testing.T) { for i := range com.Count { // retrieving checksum from batchID and at index i // the comparing with records - data, err := journal.Get(FormatSeq(com.batchID, int(i))) + data, err := journal.Get(FormatSeq(com.BatchID, int(i))) if err != nil { t.Log(err) t.Fatal("sequence while using Format for Seq") @@ -311,17 +311,18 @@ func TestBatchQuery(t *testing.T) { } } t.Log("Retrieving Batch") - var v CommitResult + var v Commitment v.ctx = &ctx - data, err := journal.Get(FormatBatch(com.batchID)) + data, err := journal.Get(FormatBatch(com.BatchID)) if err != nil { t.Fatal(err) } + t.Log(data) err = v.Decode(data) if err != nil { t.Fatal(err) } - if v.Root != com.Root { + if v.Roothash != com.Root { t.Fatal("mismatch Roots") } err = journal.Close() @@ -331,12 +332,12 @@ func TestBatchQuery(t *testing.T) { } else { // only peform query doesn't write to db _ = CommitResult{ - batchID: "09dd1d47d7f0e5dfac278513a723b6d424558669feb014aecf5afce040c18211", + BatchID: "09dd1d47d7f0e5dfac278513a723b6d424558669feb014aecf5afce040c18211", Root: [32]byte{89, 82, 203, 230, 157, 145, 229, 24, 119, 35, 162, 39, 108, 37, 209, 71, 3, 171, 242, 49, 6, 1, 84, 104, 252, 65, 22, 173, 7, 180, 233, 189}, Count: 0x3, } cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := 
settings.Settings{Journal: *cfg} ctx.Encoder = encoder.NewEncoder(cfg.EncoderName) ctx.Hasher = crypto.NewHasher(cfg.HasherName) journal := NewJournalCache(&ctx) @@ -383,7 +384,7 @@ each seq represent an event stored in order seq:%s:%d -> id of event */ func TestFormatSeq(t *testing.T) { cfg := NewJournalConfig() - ctx := context.Context{Journal: *cfg} + ctx := settings.Settings{Journal: *cfg} ctx.Hasher = crypto.NewHasher(ctx.Journal.HasherName) s := hex.EncodeToString(ctx.Hasher.Sum(fmt.Appendf(nil, "seq:%s:%d", "12", 1))) b := FormatSeq("12", 1) // this only formats into seq:%s:%d diff --git a/internal/journal/types.go b/internal/journal/types.go index f69c1aa..c8f3145 100644 --- a/internal/journal/types.go +++ b/internal/journal/types.go @@ -1,9 +1,12 @@ package journal import ( - "crosstrace/context" - "crosstrace/internal/journal/database" + "crosstrace/settings" "time" + + "github.com/rawbytedev/zerokv" + "github.com/rawbytedev/zerokv/badgerdb" + "github.com/rawbytedev/zerokv/pebbledb" ) type JournalEntry interface { @@ -22,11 +25,11 @@ type JournalStore interface { BuildTree() error Get(id string) ([]byte, error) Batch() (*CommitResult, error) // used for manual batch creation - Close() error // shutdows + Close() error // shutdows } type CommitResult struct { - ctx *context.Context - batchID string + ctx *settings.Settings + BatchID string Root [32]byte Count uint32 WindowsStart time.Time // first j.Port // assuming that it is ordered @@ -34,17 +37,17 @@ type CommitResult struct { version string } type Commitment struct { - ctx *context.Context // anonymous + ctx *settings.Settings Roothash [32]byte Count uint32 WindowsStart time.Time WindowsEnd time.Time - commitment []byte + Commitment []byte } // Default format when received / Unsafe type PreEntry struct { - ctx *context.Context + ctx *settings.Settings sender_id string raw_msg string timestamp time.Time @@ -54,7 +57,7 @@ type PreEntry struct { // PostEntry is the sanitized event type PostEntry struct { 
- ctx *context.Context + ctx *settings.Settings SenderID string `json:"sender_id"` SessionID string `json:"session_id"` Timestamp time.Time `json:"timestamp"` @@ -73,14 +76,14 @@ type Event struct { comment string } -func NewLocalStorage(ctx *context.Context) (database.StorageDB, error) { +func NewLocalStorage(ctx *settings.Settings) (zerokv.Core, error) { switch ctx.Journal.DBName { case "badgerdb": - return database.NewBadgerdb(ctx.Journal) + return badgerdb.NewBadgerDB(badgerdb.Config{Dir: ctx.Journal.DBPath}) case "pebbledb": - return database.NewPebbledb(ctx.Journal) + return pebbledb.NewPebbleDB(pebbledb.Config{Dir: ctx.Journal.DBPath}) default: - return database.NewBadgerdb(ctx.Journal) + return badgerdb.NewBadgerDB(badgerdb.Config{Dir: ctx.Journal.DBPath}) } } @@ -88,8 +91,8 @@ func NewLocalStorage(ctx *context.Context) (database.StorageDB, error) { // this avoid having to recompute tree if something fails along the way type JournalCache struct { - ctx *context.Context - store database.StorageDB + ctx *settings.Settings + store zerokv.Core Post []JournalEntry treeroot []byte batchid []byte diff --git a/context/main.go b/settings/main.go similarity index 86% rename from context/main.go rename to settings/main.go index 1dceef6..f0cbca4 100644 --- a/context/main.go +++ b/settings/main.go @@ -1,4 +1,4 @@ -package context +package settings import ( "crosstrace/internal/configs" @@ -12,7 +12,7 @@ We have the main cfg that can be used to applied settings to other fields using ctx.Share() but we also allow users to configure it on their own if it's needed each field can be set with different configs and doesn't necessary needs to align with each other */ -type Context struct { +type Settings struct { Cfg configs.Configs // usually only this needs to be set Journal configs.JournalConfig Anchor configs.AnchorConfig @@ -22,12 +22,12 @@ type Context struct { Hasher crypto.Hasher } -func NewContext(cfg configs.Configs) *Context { - return &Context{Cfg: cfg} +func 
NewContext(cfg configs.Configs) *Settings { + return &Settings{Cfg: cfg} } // uses cfg and set the other configs -func (ctx *Context) Share() { +func (ctx *Settings) Share() { ctx.Journal = ctx.Cfg.Journal ctx.Anchor = ctx.Cfg.Anchor ctx.Batcher = ctx.Cfg.Batcher