Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import (
type Bucket[T any] interface {
Get(ctx context.Context, key string) (Entry[T], error)
GetRevision(ctx context.Context, key string, rev uint64) (Entry[T], error)
GetLatestRevision(ctx context.Context, key string) (Entry[T], error)
Put(ctx context.Context, entry PutEntry[T]) (uint64, error)
Update(ctx context.Context, entry UpdateEntry[T]) (uint64, error)
Delete(ctx context.Context, key string, opts ...DeleteOption) error
History(ctx context.Context, key string, opts ...HistoryOption) ([]Entry[T], error)
Watch(ctx context.Context, match string, opts ...WatcherOption) (Watcher[T], error)
WatchAll(ctx context.Context, opts ...WatcherOption) (Watcher[T], error)
}
Expand Down Expand Up @@ -94,6 +96,20 @@ func (s *bucketImpl[T]) GetRevision(ctx context.Context, key string, rev uint64)
return s.get(raw)
}

func (s *bucketImpl[T]) GetLatestRevision(ctx context.Context, key string) (Entry[T], error) {
watcher, err := s.bucket.Watch(ctx, s.prefixed(key))
if err != nil {
return nil, err
}
defer watcher.Stop()

entry := <-watcher.Updates()
if entry == nil {
return nil, jetstream.ErrKeyNotFound
}
return s.get(entry)
}

func (s *bucketImpl[T]) Put(ctx context.Context, entry PutEntry[T]) (uint64, error) {
b, err := encodeBucketEntryHeader(entry.Header(), entry.Value())
if err != nil {
Expand Down Expand Up @@ -128,6 +144,23 @@ func (s *bucketImpl[T]) Delete(ctx context.Context, key string, opts ...DeleteOp
return s.bucket.Delete(ctx, s.prefixed(key), opts...)
}

type HistoryOption = jetstream.WatchOpt

func (s *bucketImpl[T]) History(ctx context.Context, key string, opts ...HistoryOption) ([]Entry[T], error) {
rawEntries, err := s.bucket.History(ctx, s.prefixed(key), opts...)
if err != nil {
return nil, err
}
entries := make([]Entry[T], len(rawEntries))
for i, raw := range rawEntries {
entries[i], err = s.get(raw)
if err != nil {
return nil, err
}
}
return entries, nil
}

func (s *bucketImpl[T]) Watch(ctx context.Context, match string, opts ...WatcherOption) (Watcher[T], error) {
params := bucketWatcherParams{}
for _, opt := range opts {
Expand Down
132 changes: 132 additions & 0 deletions bucket/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,65 @@ func TestBucket_GetRevision(t *testing.T) {
assert.Equal(t, testModel{Name: "balooney"}, *v.Value())
}

func TestBucket_GetLatestRevision(t *testing.T) {
const value = "----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"balooney"}`
key := "parson.had.a.dog"

t.Run("key not found", func(t *testing.T) {
// Create a mock watcher with updates channel
nw := jetstreammock.NewKeyWatcher(t)
updates := make(chan jetstream.KeyValueEntry, 1)
updates <- nil // Simulate no revisions
nw.EXPECT().Updates().Return(updates).Once()
nw.EXPECT().Stop().Once().Return(nil)

// Set up the KeyValue mock to return our mock watcher
nb := jetstreammock.NewKeyValue(t)
nb.EXPECT().Watch(mock.Anything, key).Return(nw, nil).Once()

// Create the bucket and test GetLatestRevision
b := bucket.NewBucket[testModel](nb)
v, err := b.GetLatestRevision(t.Context(), key)

// Verify results
require.ErrorIs(t, err, jetstream.ErrKeyNotFound)
require.Nil(t, v)
})

t.Run("key exists", func(t *testing.T) {
// Create a mock KeyValueEntry
ne := jetstreammock.NewKeyValueEntry(t)
ne.EXPECT().Key().Return(key).Once()
ne.EXPECT().Operation().Return(jetstream.KeyValuePut).Once()
ne.EXPECT().Value().Return([]byte(value)).Once()
ne.EXPECT().Revision().Return(1).Once()

// Create a mock watcher with updates channel
nw := jetstreammock.NewKeyWatcher(t)

// Set up the updates channel to return our mock entry
updates := make(chan jetstream.KeyValueEntry, 1)
updates <- ne
nw.EXPECT().Updates().Return(updates).Once()
nw.EXPECT().Stop().Once().Return(nil)

// Set up the KeyValue mock to return our mock watcher
nb := jetstreammock.NewKeyValue(t)
nb.EXPECT().Watch(mock.Anything, key).Return(nw, nil).Once()

// Create the bucket and test GetLatestRevision
b := bucket.NewBucket[testModel](nb)
v, err := b.GetLatestRevision(t.Context(), key)

// Verify results
require.NoError(t, err)
require.NotNil(t, v)
assert.Equal(t, key, v.Key())
assert.Equal(t, testModel{Name: "balooney"}, *v.Value())
assert.Equal(t, uint64(1), v.Revision())
})
}

func TestBucket_Put(t *testing.T) {
const expect = "----\r\nContent-Type: application/json\r\nX-Breed: shavka\r\n\r\n" + `{"name":"balooney"}`
e := testPutUpdateEntryImpl{
Expand Down Expand Up @@ -165,3 +224,76 @@ func TestBucket_Delete(t *testing.T) {

require.NoError(t, err)
}

func TestBucket_History(t *testing.T) {
t.Run("get history entries", func(t *testing.T) {
// Define prefix and keys
prefix := "parson"
unprefixedKey := "had.a.dog"
prefixedKey := prefix + "." + unprefixedKey

// Create multiple mock entries representing history revisions
ne1 := jetstreammock.NewKeyValueEntry(t)
ne1.EXPECT().Key().Return(prefixedKey).Once()
ne1.EXPECT().Operation().Return(jetstream.KeyValuePut).Once()
ne1.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"spot"}`)).Once()
ne1.EXPECT().Revision().Return(uint64(1)).Once()

ne2 := jetstreammock.NewKeyValueEntry(t)
ne2.EXPECT().Key().Return(prefixedKey).Once()
ne2.EXPECT().Operation().Return(jetstream.KeyValuePut).Once()
ne2.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"rover"}`)).Once()
ne2.EXPECT().Revision().Return(uint64(2)).Once()

ne3 := jetstreammock.NewKeyValueEntry(t)
ne3.EXPECT().Key().Return(prefixedKey).Once()
ne3.EXPECT().Operation().Return(jetstream.KeyValuePut).Once()
ne3.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"balooney"}`)).Once()
ne3.EXPECT().Revision().Return(uint64(3)).Once()

// Create a slice of history entries
historyEntries := []jetstream.KeyValueEntry{ne1, ne2, ne3}

// Set up the KeyValue mock to return our history entries
nb := jetstreammock.NewKeyValue(t)
nb.EXPECT().History(mock.Anything, prefixedKey, mock.Anything).Return(historyEntries, nil).Once()

// Create the bucket with prefix and test History
b := bucket.NewBucket[testModel](nb, bucket.BucketKeyPrefix(prefix))
entries, err := b.History(t.Context(), unprefixedKey)

// Verify results
require.NoError(t, err)
require.Len(t, entries, 3)

// Verify the entries are in the correct order (oldest to newest)
// and that keys are properly deprefixed
assert.Equal(t, unprefixedKey, entries[0].Key())
assert.Equal(t, "spot", entries[0].Value().Name)
assert.Equal(t, uint64(1), entries[0].Revision())

assert.Equal(t, unprefixedKey, entries[1].Key())
assert.Equal(t, "rover", entries[1].Value().Name)
assert.Equal(t, uint64(2), entries[1].Revision())

assert.Equal(t, unprefixedKey, entries[2].Key())
assert.Equal(t, "balooney", entries[2].Value().Name)
assert.Equal(t, uint64(3), entries[2].Revision())
})

t.Run("empty history", func(t *testing.T) {
key := "no-history-key"

// Set up the KeyValue mock to return empty history
nb := jetstreammock.NewKeyValue(t)
nb.EXPECT().History(mock.Anything, key, mock.Anything).Return([]jetstream.KeyValueEntry{}, nil).Once()

// Create the bucket and test History
b := bucket.NewBucket[testModel](nb)
entries, err := b.History(t.Context(), key)

// Verify results
require.NoError(t, err)
assert.Empty(t, entries)
})
}