diff --git a/sonic_data_client/client_test.go b/sonic_data_client/client_test.go index 9e32f651e..c38c5b411 100644 --- a/sonic_data_client/client_test.go +++ b/sonic_data_client/client_test.go @@ -2422,3 +2422,248 @@ func TestHandleTableDataBatchesOpRemove(t *testing.T) { t.Errorf("DbDelTable called %d times, want 0 (multi-key remove must batch)", delCalls) } } +func TestDbClientOnceRun(t *testing.T) { + cleanup := setupTestTarget2RedisDb(t) + defer cleanup() + ns := "" + rclient := Target2RedisDb[ns]["STATE_DB"] + rclient.HSet(context.Background(), "NEIGH_STATE_TABLE|10.0.0.57", "peerType", "e-BGP") + + t.Run("Success_ReturnsUpdateAndSync", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}} + c := DbClient{ + pathG2S: map[*gnmipb.Path][]tablePath{ + gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}}, + }, + } + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(1) + + go c.OnceRun(q, once, &wg, nil) + + once <- struct{}{} + wg.Wait() + + var gotUpdate, gotSync bool + for !q.Empty() { + items, _ := q.Get(1) + val := items[0].(Value) + if val.GetSyncResponse() { + gotSync = true + } else if val.GetVal() != nil { + gotUpdate = true + } + } + if !gotUpdate { + t.Errorf("expected update notification") + } + if !gotSync { + t.Errorf("expected sync response") + } + }) + + t.Run("ChannelClosed_ExitsEarly", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}} + c := DbClient{ + pathG2S: map[*gnmipb.Path][]tablePath{ + gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}}, + }, + } + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + + close(once) + go c.OnceRun(q, once, &wg, nil) + wg.Wait() + + if !q.Empty() { + t.Errorf("expected no items in queue when channel is closed") + } + }) + + t.Run("NoData_ReturnsSyncOnly", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.99"}}} + c := DbClient{ + pathG2S: map[*gnmipb.Path][]tablePath{ + gnmiPath: {{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.99", delimitor: "|"}}, + }, + } + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(1) + + go c.OnceRun(q, once, &wg, nil) + + once <- struct{}{} + wg.Wait() + + var gotSync bool + var gotUpdate bool + for !q.Empty() { + items, _ := q.Get(1) + val := items[0].(Value) + if val.GetSyncResponse() { + gotSync = true + } else if val.GetVal() != nil { + gotUpdate = true + } + } + if gotUpdate { + t.Errorf("did not expect update for non-existent key") + } + if !gotSync { + t.Errorf("expected sync response even with no data") + } + }) +} + +func TestMixedDbClientOnceRun(t *testing.T) { + mapkey := ":" + cleanup := setupMixedDbRedis(t, mapkey) + defer cleanup() + + ns := "" + rclient := Target2RedisDb[ns]["STATE_DB"] + rclient.HSet(context.Background(), "NEIGH_STATE_TABLE|10.0.0.57", "peerType", "e-BGP") + + t.Run("Success_ReturnsUpdateAndSync", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}} + tblPaths := []tablePath{{dbNamespace: ns, dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}} + + c := MixedDbClient{ + mapkey: mapkey, + encoding: gnmipb.Encoding_JSON_IETF, + paths: []*gnmipb.Path{gnmiPath}, + } + + patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) { + return tblPaths, nil + }) + defer patches.Reset() + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(1) + + go c.OnceRun(q, once, &wg, nil) + + once <- struct{}{} + wg.Wait() + + var gotUpdate, gotSync bool + for !q.Empty() { + items, _ := q.Get(1) + val := items[0].(Value) + if val.GetSyncResponse() { + gotSync = true + } else if val.GetVal() != nil { + gotUpdate = true + } + } + if !gotUpdate { + t.Errorf("expected update notification") + } + if !gotSync { + t.Errorf("expected sync response") + } + }) + + t.Run("ChannelClosed_ExitsEarly", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}} + + c := MixedDbClient{ + mapkey: mapkey, + encoding: gnmipb.Encoding_JSON_IETF, + paths: []*gnmipb.Path{gnmiPath}, + } + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + + close(once) + go c.OnceRun(q, once, &wg, nil) + wg.Wait() + + if !q.Empty() { + t.Errorf("expected no items in queue when channel is closed") + } + }) + + t.Run("GetDbtablePath_Error", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "BAD_TABLE"}}} + + c := MixedDbClient{ + mapkey: mapkey, + encoding: gnmipb.Encoding_JSON_IETF, + paths: []*gnmipb.Path{gnmiPath}, + } + + patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) { + return nil, fmt.Errorf("simulated getDbtablePath error") + }) + defer patches.Reset() + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(1) + + go c.OnceRun(q, once, &wg, nil) + + once <- struct{}{} + wg.Wait() + + // Should have a fatal message in the queue + if q.Empty() { + t.Fatalf("expected fatal message in queue") + } + items, _ := q.Get(1) + val := items[0].(Value) + if val.GetSyncResponse() { + t.Errorf("expected fatal error, not sync response") + } + }) + + t.Run("TableData2TypedValue_Error", func(t *testing.T) { + gnmiPath := &gnmipb.Path{Elem: []*gnmipb.PathElem{{Name: "NEIGH_STATE_TABLE"}, {Name: "10.0.0.57"}}} + // Use a tablePath with a missing redis client to trigger error + tblPaths := []tablePath{{dbNamespace: "nonexistent_ns", dbName: "STATE_DB", tableName: "NEIGH_STATE_TABLE", tableKey: "10.0.0.57", delimitor: "|"}} + + c := MixedDbClient{ + mapkey: mapkey, + encoding: gnmipb.Encoding_JSON_IETF, + paths: []*gnmipb.Path{gnmiPath}, + } + + patches := gomonkey.ApplyPrivateMethod(&c, "getDbtablePath", func(_ *MixedDbClient, _ *gnmipb.Path, _ *gnmipb.Path) ([]tablePath, error) { + return tblPaths, nil + }) + defer patches.Reset() + + q := queue.NewPriorityQueue(1, false) + once := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(1) + + go c.OnceRun(q, once, &wg, nil) + + once <- struct{}{} + wg.Wait() + + // Should have a fatal message in the queue + if q.Empty() { + t.Fatalf("expected fatal message in queue") + } + }) +} diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 58629d35e..b249b6d72 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -382,7 +382,45 @@ func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.W } func (c *DbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - return + c.w = w + defer c.w.Done() + c.q = q + c.channel = once + + _, more := <-c.channel + if !more { + log.V(1).Infof("%v once channel closed, exiting OnceRun routine", c) + return + } + + t1 := time.Now() + for gnmiPath, tblPaths := range c.pathG2S { + val, err, updateReceived := subscribeTableData2TypedValue(tblPaths, nil) + if err != nil { + log.V(2).Infof("OnceRun: Unable to create gnmi TypedValue due to err: %v", err) + putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err)) + return + } + if updateReceived { + spbv := &spb.Value{ + Prefix: c.prefix, + Path: gnmiPath, + Timestamp: time.Now().UnixNano(), + SyncResponse: false, + Val: val, + } + c.q.Put(Value{spbv}) + log.V(6).Infof("OnceRun: Added spbv #%v", spbv) + } + } + + c.q.Put(Value{ + &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + }, + }) + log.V(4).Infof("OnceRun: Sync done, total time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) } func (c *DbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { // wait sync for Get, not used for now diff --git a/sonic_data_client/mixed_db_client.go b/sonic_data_client/mixed_db_client.go index 8318f1989..eb302a8ea 100644 --- a/sonic_data_client/mixed_db_client.go +++ b/sonic_data_client/mixed_db_client.go @@ -1789,7 +1789,51 @@ func (c *MixedDbClient) Get(w *sync.WaitGroup) ([]*spb.Value, error) { } func (c *MixedDbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { - return + c.w = w + defer c.w.Done() + c.q = q + c.channel = once + + _, more := <-c.channel + if !more { + log.V(1).Infof("%v once channel closed, exiting OnceRun routine", c) + return + } + + t1 := time.Now() + for _, gnmiPath := range c.paths { + tblPaths, err := c.getDbtablePath(gnmiPath, nil) + if err != nil { + log.V(2).Infof("OnceRun: Unable to get table path due to err: %v", err) + putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err)) + return + } + val, err, updateReceived := c.tableData2TypedValue(tblPaths, nil) + if err != nil { + log.V(2).Infof("OnceRun: Unable to create gnmi TypedValue due to err: %v", err) + putFatalMsg(c.q, fmt.Sprintf("OnceRun error: %v", err)) + return + } + if updateReceived { + spbv := &spb.Value{ + Prefix: c.prefix, + Path: gnmiPath, + Timestamp: time.Now().UnixNano(), + SyncResponse: false, + Val: val, + } + c.q.Put(Value{spbv}) + log.V(6).Infof("OnceRun: Added spbv #%v", spbv) + } + } + + c.q.Put(Value{ + &spb.Value{ + Timestamp: time.Now().UnixNano(), + SyncResponse: true, + }, + }) + log.V(4).Infof("OnceRun: Sync done, total time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) } func (c *MixedDbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { diff --git a/test/test_gnmi_configdb.py b/test/test_gnmi_configdb.py index 5a60d7c84..73303c3d0 100644 --- a/test/test_gnmi_configdb.py +++ b/test/test_gnmi_configdb.py @@ -5,6 +5,7 @@ import threading, queue from utils import gnmi_set, gnmi_get, gnmi_dump, run_cmd from utils import gnmi_subscribe_poll, gnmi_subscribe_stream_sample, gnmi_subscribe_stream_onchange +from utils import gnmi_subscribe_once, gnmi_subscribe_once_multiple import pytest @@ -594,3 +595,74 @@ def worker(): ret, msg = result_queue.get() assert ret == 0, 'Fail to subscribe: ' + msg assert "bgp_asn" in msg, 'Invalid result: ' + msg + + +class TestGNMISubscribeOnce: + """Tests for Subscribe ONCE mode (OnceRun implementation).""" + + def test_gnmi_once_configdb_table(self): + """Subscribe ONCE on a CONFIG_DB table returns data and completes.""" + path = "/CONFIG_DB/localhost/DEVICE_METADATA" + ret, msg = gnmi_subscribe_once(path, timeout=10) + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + assert "bgp_asn" in msg, 'Expected DEVICE_METADATA content not found: ' + msg + + def test_gnmi_once_configdb_key(self): + """Subscribe ONCE on a specific CONFIG_DB key returns data and completes.""" + path = "/CONFIG_DB/localhost/DEVICE_METADATA/localhost" + ret, msg = gnmi_subscribe_once(path, timeout=10) + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + assert "bgp_asn" in msg, 'Expected field not found: ' + msg + + def test_gnmi_once_configdb_field(self): + """Subscribe ONCE on a specific CONFIG_DB field returns that field.""" + path = "/CONFIG_DB/localhost/DEVICE_METADATA/localhost/bgp_asn" + ret, msg = gnmi_subscribe_once(path, timeout=10) + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + assert "bgp_asn" in msg, 'Expected field not found: ' + msg + + def test_gnmi_once_invalid_table(self): + """Subscribe ONCE on a non-existent table returns error or empty.""" + path = "/CONFIG_DB/localhost/NONEXISTENT_TABLE_XYZ" + ret, msg = gnmi_subscribe_once(path, timeout=10) + # Should complete (not hang) regardless of whether table exists + # rc=124 means timeout which indicates OnceRun is broken + assert ret != 124, 'Subscribe ONCE timed out (OnceRun not implemented): ' + msg + + def test_gnmi_once_does_not_hang(self): + """Subscribe ONCE must complete within a reasonable time, not hang.""" + path = "/CONFIG_DB/localhost/DEVICE_METADATA/localhost" + start = time.time() + ret, msg = gnmi_subscribe_once(path, timeout=10) + elapsed = time.time() - start + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + # OnceRun should complete in well under 5 seconds + assert elapsed < 5, 'Subscribe ONCE took too long (%.1fs), possible hang' % elapsed + + def test_gnmi_once_multiple_paths(self): + """Subscribe ONCE with multiple paths returns data for all.""" + paths = [ + "/CONFIG_DB/localhost/DEVICE_METADATA/localhost", + "/CONFIG_DB/localhost/DEVICE_METADATA" + ] + ret, msg = gnmi_subscribe_once_multiple(paths, timeout=10) + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + assert "bgp_asn" in msg, 'DEVICE_METADATA not found: ' + msg + + def test_gnmi_once_returns_current_data(self): + """Subscribe ONCE returns the current value after a SET.""" + # Set a known value + update_path = '/sonic-db:CONFIG_DB/localhost/DEVICE_METADATA/localhost/bgp_asn' + cmd = r'redis-cli -n 4 hset "DEVICE_METADATA|localhost" bgp_asn 99999' + run_cmd(cmd) + time.sleep(0.5) + + # Subscribe ONCE should return the updated value + path = "/CONFIG_DB/localhost/DEVICE_METADATA/localhost" + ret, msg = gnmi_subscribe_once(path, timeout=10) + assert ret == 0, 'Subscribe ONCE failed (rc=%d): %s' % (ret, msg) + assert "99999" in msg, 'Updated value not reflected in ONCE response: ' + msg + + # Restore + cmd = r'redis-cli -n 4 hset "DEVICE_METADATA|localhost" bgp_asn 65100' + run_cmd(cmd) diff --git a/test/utils.py b/test/utils.py index 4d8500e1b..912daff99 100644 --- a/test/utils.py +++ b/test/utils.py @@ -198,6 +198,31 @@ def gnmi_subscribe_stream_onchange(gnmi_path, count, timeout): ret, msg = run_cmd(cmd) return ret, msg +def gnmi_subscribe_once(gnmi_path, timeout=30): + path = os.getcwd() + cmd = 'timeout %u ' % timeout + cmd += path + '/build/bin/gnmi_cli ' + cmd += '-client_types=gnmi -a 127.0.0.1:8080 -logtostderr -insecure ' + # Use sonic-db as default origin + cmd += '-origin=sonic-db ' + cmd += '-query_type=once ' + cmd += '-q %s' % (gnmi_path) + ret, msg = run_cmd(cmd) + return ret, msg + +def gnmi_subscribe_once_multiple(gnmi_paths, timeout=30): + path = os.getcwd() + cmd = 'timeout %u ' % timeout + cmd += path + '/build/bin/gnmi_cli ' + cmd += '-client_types=gnmi -a 127.0.0.1:8080 -logtostderr -insecure ' + # Use sonic-db as default origin + cmd += '-origin=sonic-db ' + cmd += '-query_type=once ' + for gnmi_path in gnmi_paths: + cmd += '-q %s ' % (gnmi_path) + ret, msg = run_cmd(cmd) + return ret, msg + def gnmi_dump(name): path = os.getcwd() cmd = 'sudo ' + path + '/build/bin/gnmi_dump'