Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions sonic_data_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2422,3 +2422,248 @@ func TestHandleTableDataBatchesOpRemove(t *testing.T) {
t.Errorf("DbDelTable called %d times, want 0 (multi-key remove must batch)", delCalls)
}
}
func TestDbClientOnceRun(t *testing.T) {
cleanup := setupTestTarget2RedisDb(t)
defer cleanup()
ns := ""
rclient := Target2RedisDb[ns]["STATE_DB"]
rclient.HSet(context.Background(), "NEIGH_STATE_TABLE|10.0.0.57", "peerType", "e-BGP")

t.Run("Success_ReturnsUpdateAndSync", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}}
c := DbClient{
pathG2S: map[*gnmipb.Path][]tablePath{
gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}},
},
}

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(1)

go c.OnceRun(q, once, &wg, nil)

once <- struct{}{}
wg.Wait()

var gotUpdate, gotSync bool
for !q.Empty() {
items, _ := q.Get(1)
val := items[0].(Value)
if val.GetSyncResponse() {
gotSync = true
} else if val.GetVal() != nil {
gotUpdate = true
}
}
if !gotUpdate {
t.Errorf("expected update notification")
}
if !gotSync {
t.Errorf("expected sync response")
}
})

t.Run("ChannelClosed_ExitsEarly", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}}
c := DbClient{
pathG2S: map[*gnmipb.Path][]tablePath{
gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}},
},
}

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)

close(once)
go c.OnceRun(q, once, &wg, nil)
wg.Wait()

if !q.Empty() {
t.Errorf("expected no items in queue when channel is closed")
}
})

t.Run("NoData_ReturnsSyncOnly", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.99"}}}
c := DbClient{
pathG2S: map[*gnmipb.Path][]tablePath{
gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.99", delimitor: "|"}},
},
}

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(1)

go c.OnceRun(q, once, &wg, nil)

once <- struct{}{}
wg.Wait()

var gotSync bool
var gotUpdate bool
for !q.Empty() {
items, _ := q.Get(1)
val := items[0].(Value)
if val.GetSyncResponse() {
gotSync = true
} else if val.GetVal() != nil {
gotUpdate = true
}
}
if gotUpdate {
t.Errorf("did not expect update for non-existent key")
}
if !gotSync {
t.Errorf("expected sync response even with no data")
}
})
}

func TestMixedDbClientOnceRun(t *testing.T) {
mapkey := ":"
cleanup := setupMixedDbRedis(t, mapkey)
defer cleanup()

ns := ""
rclient := Target2RedisDb[ns]["STATE_DB"]
rclient.HSet(context.Background(), "NEIGH_STATE_TABLE|10.0.0.57", "peerType", "e-BGP")

t.Run("Success_ReturnsUpdateAndSync", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}}
tblPaths := []tablePath{{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}}

c := MixedDbClient{
mapkey: mapkey,
encoding: gnmipb.Encoding_JSON_IETF,
paths: []*gnmipb.Path{gnmiPath},
}

patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) {
return tblPaths, nil
})
defer patches.Reset()

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(1)

go c.OnceRun(q, once, &wg, nil)

once <- struct{}{}
wg.Wait()

var gotUpdate, gotSync bool
for !q.Empty() {
items, _ := q.Get(1)
val := items[0].(Value)
if val.GetSyncResponse() {
gotSync = true
} else if val.GetVal() != nil {
gotUpdate = true
}
}
if !gotUpdate {
t.Errorf("expected update notification")
}
if !gotSync {
t.Errorf("expected sync response")
}
})

t.Run("ChannelClosed_ExitsEarly", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}}

c := MixedDbClient{
mapkey: mapkey,
encoding: gnmipb.Encoding_JSON_IETF,
paths: []*gnmipb.Path{gnmiPath},
}

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{})
var wg sync.WaitGroup
wg.Add(1)

close(once)
go c.OnceRun(q, once, &wg, nil)
wg.Wait()

if !q.Empty() {
t.Errorf("expected no items in queue when channel is closed")
}
})

t.Run("GetDbtablePath_Error", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "BAD_TABLE"}}}

c := MixedDbClient{
mapkey: mapkey,
encoding: gnmipb.Encoding_JSON_IETF,
paths: []*gnmipb.Path{gnmiPath},
}

patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) {
return nil, fmt.Errorf("simulated getDbtablePath error")
})
defer patches.Reset()

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(1)

go c.OnceRun(q, once, &wg, nil)

once <- struct{}{}
wg.Wait()

// Should have a fatal message in the queue
if q.Empty() {
t.Fatalf("expected fatal message in queue")
}
items, _ := q.Get(1)
val := items[0].(Value)
if val.GetSyncResponse() {
t.Errorf("expected fatal error, not sync response")
}
})

t.Run("TableData2TypedValue_Error", func(t *testing.T) {
gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}}
// Use a tablePath with a missing redis client to trigger error
tblPaths := []tablePath{{dbNamespace: "nonexistent_ns", dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}}

c := MixedDbClient{
mapkey: mapkey,
encoding: gnmipb.Encoding_JSON_IETF,
paths: []*gnmipb.Path{gnmiPath},
}

patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) {
return tblPaths, nil
})
defer patches.Reset()

q := queue.NewPriorityQueue(1, false)
once := make(chan struct{}, 1)
var wg sync.WaitGroup
wg.Add(1)

go c.OnceRun(q, once, &wg, nil)

once <- struct{}{}
wg.Wait()

// Should have a fatal message in the queue
if q.Empty() {
t.Fatalf("expected fatal message in queue")
}
})
}
40 changes: 39 additions & 1 deletion sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,45 @@ func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.W
}

func (c *DbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
return
c.w = w
defer c.w.Done()
c.q = q
c.channel = once

_, more := <-c.channel
if !more {
log.V(1).Infof("%v once channel closed, exiting OnceRun routine", c)
return
}

t1 := time.Now()
for gnmiPath, tblPaths := range c.pathG2S {
val, err, updateReceived := subscribeTableData2TypedValue(tblPaths, nil)
if err != nil {
log.V(2).Infof("OnceRun: Unable to create gnmi TypedValue due to err: %v", err)
putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err))
return
}
if updateReceived {
spbv := &spb.Value{
Prefix: c.prefix,
Path: gnmiPath,
Timestamp: time.Now().UnixNano(),
SyncResponse: false,
Val: val,
}
c.q.Put(Value{spbv})
log.V(6).Infof("OnceRun: Added spbv #%v", spbv)
}
}

c.q.Put(Value{
&spb.Value{
Timestamp: time.Now().UnixNano(),
SyncResponse: true,
},
})
log.V(4).Infof("OnceRun: Sync done, total time taken: %v ms", int64(time.Since(t1)/time.Millisecond))
}
func (c *DbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) {
// wait sync for Get, not used for now
Expand Down
46 changes: 45 additions & 1 deletion sonic_data_client/mixed_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1789,7 +1789,51 @@ func (c *MixedDbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) {
}

func (c *MixedDbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
return
c.w = w
defer c.w.Done()
c.q = q
c.channel = once

_, more := <-c.channel
if !more {
log.V(1).Infof("%v once channel closed, exiting OnceRun routine", c)
return
}

t1 := time.Now()
for _, gnmiPath := range c.paths {
tblPaths, err := c.getDbtablePath(gnmiPath, nil)
if err != nil {
log.V(2).Infof("OnceRun: Unable to get table path due to err: %v", err)
putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err))
return
}
val, err, updateReceived := c.tableData2TypedValue(tblPaths, nil)
if err != nil {
log.V(2).Infof("OnceRun: Unable to create gnmi TypedValue due to err: %v", err)
putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err))
return
}
if updateReceived {
spbv := &spb.Value{
Prefix: c.prefix,
Path: gnmiPath,
Timestamp: time.Now().UnixNano(),
SyncResponse: false,
Val: val,
}
c.q.Put(Value{spbv})
log.V(6).Infof("OnceRun: Added spbv #%v", spbv)
}
}

c.q.Put(Value{
&spb.Value{
Timestamp: time.Now().UnixNano(),
SyncResponse: true,
},
})
log.V(4).Infof("OnceRun: Sync done, total time taken: %v ms", int64(time.Since(t1)/time.Millisecond))
}

func (c *MixedDbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) {
Expand Down
Loading
Loading