diff --git a/bucket/bucket.go b/bucket/bucket.go index 8102aca..7f7b3c2 100644 --- a/bucket/bucket.go +++ b/bucket/bucket.go @@ -13,9 +13,11 @@ import ( type Bucket[T any] interface { Get(ctx context.Context, key string) (Entry[T], error) GetRevision(ctx context.Context, key string, rev uint64) (Entry[T], error) + GetLatestRevision(ctx context.Context, key string) (Entry[T], error) Put(ctx context.Context, entry PutEntry[T]) (uint64, error) Update(ctx context.Context, entry UpdateEntry[T]) (uint64, error) Delete(ctx context.Context, key string, opts ...DeleteOption) error + History(ctx context.Context, key string, opts ...HistoryOption) ([]Entry[T], error) Watch(ctx context.Context, match string, opts ...WatcherOption) (Watcher[T], error) WatchAll(ctx context.Context, opts ...WatcherOption) (Watcher[T], error) } @@ -94,6 +96,20 @@ func (s *bucketImpl[T]) GetRevision(ctx context.Context, key string, rev uint64) return s.get(raw) } +func (s *bucketImpl[T]) GetLatestRevision(ctx context.Context, key string) (Entry[T], error) { + watcher, err := s.bucket.Watch(ctx, s.prefixed(key)) + if err != nil { + return nil, err + } + defer watcher.Stop() + + entry := <-watcher.Updates() + if entry == nil { + return nil, jetstream.ErrKeyNotFound + } + return s.get(entry) +} + func (s *bucketImpl[T]) Put(ctx context.Context, entry PutEntry[T]) (uint64, error) { b, err := encodeBucketEntryHeader(entry.Header(), entry.Value()) if err != nil { @@ -128,6 +144,23 @@ func (s *bucketImpl[T]) Delete(ctx context.Context, key string, opts ...DeleteOp return s.bucket.Delete(ctx, s.prefixed(key), opts...) } +type HistoryOption = jetstream.WatchOpt + +func (s *bucketImpl[T]) History(ctx context.Context, key string, opts ...HistoryOption) ([]Entry[T], error) { + rawEntries, err := s.bucket.History(ctx, s.prefixed(key), opts...) + if err != nil { + return nil, err + } + entries := make([]Entry[T], len(rawEntries)) + for i, raw := range rawEntries { + entries[i], err = s.get(raw) + if err != nil { + return nil, err + } + } + return entries, nil +} + func (s *bucketImpl[T]) Watch(ctx context.Context, match string, opts ...WatcherOption) (Watcher[T], error) { params := bucketWatcherParams{} for _, opt := range opts { diff --git a/bucket/bucket_test.go b/bucket/bucket_test.go index fbd7db0..d0cfa40 100644 --- a/bucket/bucket_test.go +++ b/bucket/bucket_test.go @@ -117,6 +117,65 @@ func TestBucket_GetRevision(t *testing.T) { assert.Equal(t, testModel{Name: "balooney"}, *v.Value()) } +func TestBucket_GetLatestRevision(t *testing.T) { + const value = "----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"balooney"}` + key := "parson.had.a.dog" + + t.Run("key not found", func(t *testing.T) { + // Create a mock watcher with updates channel + nw := jetstreammock.NewKeyWatcher(t) + updates := make(chan jetstream.KeyValueEntry, 1) + updates <- nil // Simulate no revisions + nw.EXPECT().Updates().Return(updates).Once() + nw.EXPECT().Stop().Once().Return(nil) + + // Set up the KeyValue mock to return our mock watcher + nb := jetstreammock.NewKeyValue(t) + nb.EXPECT().Watch(mock.Anything, key).Return(nw, nil).Once() + + // Create the bucket and test GetLatestRevision + b := bucket.NewBucket[testModel](nb) + v, err := b.GetLatestRevision(t.Context(), key) + + // Verify results + require.ErrorIs(t, err, jetstream.ErrKeyNotFound) + require.Nil(t, v) + }) + + t.Run("key exists", func(t *testing.T) { + // Create a mock KeyValueEntry + ne := jetstreammock.NewKeyValueEntry(t) + ne.EXPECT().Key().Return(key).Once() + ne.EXPECT().Operation().Return(jetstream.KeyValuePut).Once() + ne.EXPECT().Value().Return([]byte(value)).Once() + ne.EXPECT().Revision().Return(1).Once() + + // Create a mock watcher with updates channel + nw := jetstreammock.NewKeyWatcher(t) + + // Set up the updates channel to return our mock entry + updates := make(chan jetstream.KeyValueEntry, 1) + updates <- ne + nw.EXPECT().Updates().Return(updates).Once() + nw.EXPECT().Stop().Once().Return(nil) + + // Set up the KeyValue mock to return our mock watcher + nb := jetstreammock.NewKeyValue(t) + nb.EXPECT().Watch(mock.Anything, key).Return(nw, nil).Once() + + // Create the bucket and test GetLatestRevision + b := bucket.NewBucket[testModel](nb) + v, err := b.GetLatestRevision(t.Context(), key) + + // Verify results + require.NoError(t, err) + require.NotNil(t, v) + assert.Equal(t, key, v.Key()) + assert.Equal(t, testModel{Name: "balooney"}, *v.Value()) + assert.Equal(t, uint64(1), v.Revision()) + }) +} + func TestBucket_Put(t *testing.T) { const expect = "----\r\nContent-Type: application/json\r\nX-Breed: shavka\r\n\r\n" + `{"name":"balooney"}` e := testPutUpdateEntryImpl{ @@ -165,3 +224,76 @@ func TestBucket_Delete(t *testing.T) { require.NoError(t, err) } + +func TestBucket_History(t *testing.T) { + t.Run("get history entries", func(t *testing.T) { + // Define prefix and keys + prefix := "parson" + unprefixedKey := "had.a.dog" + prefixedKey := prefix + "." + unprefixedKey + + // Create multiple mock entries representing history revisions + ne1 := jetstreammock.NewKeyValueEntry(t) + ne1.EXPECT().Key().Return(prefixedKey).Once() + ne1.EXPECT().Operation().Return(jetstream.KeyValuePut).Once() + ne1.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"spot"}`)).Once() + ne1.EXPECT().Revision().Return(uint64(1)).Once() + + ne2 := jetstreammock.NewKeyValueEntry(t) + ne2.EXPECT().Key().Return(prefixedKey).Once() + ne2.EXPECT().Operation().Return(jetstream.KeyValuePut).Once() + ne2.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"rover"}`)).Once() + ne2.EXPECT().Revision().Return(uint64(2)).Once() + + ne3 := jetstreammock.NewKeyValueEntry(t) + ne3.EXPECT().Key().Return(prefixedKey).Once() + ne3.EXPECT().Operation().Return(jetstream.KeyValuePut).Once() + ne3.EXPECT().Value().Return([]byte("----\r\nContent-Type: application/json\r\n\r\n" + `{"name":"balooney"}`)).Once() + ne3.EXPECT().Revision().Return(uint64(3)).Once() + + // Create a slice of history entries + historyEntries := []jetstream.KeyValueEntry{ne1, ne2, ne3} + + // Set up the KeyValue mock to return our history entries + nb := jetstreammock.NewKeyValue(t) + nb.EXPECT().History(mock.Anything, prefixedKey, mock.Anything).Return(historyEntries, nil).Once() + + // Create the bucket with prefix and test History + b := bucket.NewBucket[testModel](nb, bucket.BucketKeyPrefix(prefix)) + entries, err := b.History(t.Context(), unprefixedKey) + + // Verify results + require.NoError(t, err) + require.Len(t, entries, 3) + + // Verify the entries are in the correct order (oldest to newest) + // and that keys are properly deprefixed + assert.Equal(t, unprefixedKey, entries[0].Key()) + assert.Equal(t, "spot", entries[0].Value().Name) + assert.Equal(t, uint64(1), entries[0].Revision()) + + assert.Equal(t, unprefixedKey, entries[1].Key()) + assert.Equal(t, "rover", entries[1].Value().Name) + assert.Equal(t, uint64(2), entries[1].Revision()) + + assert.Equal(t, unprefixedKey, entries[2].Key()) + assert.Equal(t, "balooney", entries[2].Value().Name) + assert.Equal(t, uint64(3), entries[2].Revision()) + }) + + t.Run("empty history", func(t *testing.T) { + key := "no-history-key" + + // Set up the KeyValue mock to return empty history + nb := jetstreammock.NewKeyValue(t) + nb.EXPECT().History(mock.Anything, key, mock.Anything).Return([]jetstream.KeyValueEntry{}, nil).Once() + + // Create the bucket and test History + b := bucket.NewBucket[testModel](nb) + entries, err := b.History(t.Context(), key) + + // Verify results + require.NoError(t, err) + assert.Empty(t, entries) + }) +}