Skip to content

Commit a8e4464

Browse files
committed
feat: add lineage HTTP client and MCP tools DEV-165922
1 parent fec41b9 commit a8e4464

2 files changed

Lines changed: 317 additions & 0 deletions

File tree

pkg/clients/lineage_client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,40 @@ type LineageEntity struct {
1616
ParentId string `json:"parentId,omitempty"`
1717
}
1818

19+
// UnmarshalJSON handles both plain string values and JsonNullable-wrapped objects
20+
// for the DgcId and ParentId fields. The server may serialize JsonNullable<T> as
21+
// {"present": false, "undefined": true} when JsonNullableModule is not on the classpath.
22+
func (e *LineageEntity) UnmarshalJSON(data []byte) error {
23+
type lineageEntityAlias LineageEntity
24+
var raw struct {
25+
lineageEntityAlias
26+
DgcId json.RawMessage `json:"dgcId"`
27+
ParentId json.RawMessage `json:"parentId"`
28+
}
29+
if err := json.Unmarshal(data, &raw); err != nil {
30+
return err
31+
}
32+
*e = LineageEntity(raw.lineageEntityAlias)
33+
e.DgcId = extractJsonNullableString(raw.DgcId)
34+
e.ParentId = extractJsonNullableString(raw.ParentId)
35+
return nil
36+
}
37+
38+
// extractJsonNullableString extracts a string from either a plain JSON string
39+
// or a JsonNullable object. Returns empty string for null, undefined, or objects
40+
// where the value is not recoverable.
41+
func extractJsonNullableString(data json.RawMessage) string {
42+
if len(data) == 0 || string(data) == "null" {
43+
return ""
44+
}
45+
var s string
46+
if err := json.Unmarshal(data, &s); err == nil {
47+
return s
48+
}
49+
// JsonNullable object format — actual value is not serialized without the module
50+
return ""
51+
}
52+
1953
type LineageRelation struct {
2054
SourceEntityId string `json:"sourceEntityId"`
2155
TargetEntityId string `json:"targetEntityId"`

pkg/clients/lineage_client_test.go

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
package clients
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"path"
10+
"testing"
11+
)
12+
13+
// redirectClient rewrites requests to hit the test server instead of relative paths.
14+
type redirectClient struct {
15+
baseURL string
16+
next http.RoundTripper
17+
}
18+
19+
func (c *redirectClient) RoundTrip(req *http.Request) (*http.Response, error) {
20+
clone := req.Clone(req.Context())
21+
base, _ := url.Parse(c.baseURL)
22+
clone.URL.Scheme = base.Scheme
23+
clone.URL.Host = base.Host
24+
clone.URL.Path = path.Join(base.Path, req.URL.Path)
25+
clone.URL.RawQuery = req.URL.RawQuery
26+
return c.next.RoundTrip(clone)
27+
}
28+
29+
func newTestClient(server *httptest.Server) *http.Client {
30+
return &http.Client{Transport: &redirectClient{baseURL: server.URL, next: http.DefaultTransport}}
31+
}
32+
33+
func writeJSON(w http.ResponseWriter, status int, v any) {
34+
w.Header().Set("Content-Type", "application/json")
35+
w.WriteHeader(status)
36+
_ = json.NewEncoder(w).Encode(v)
37+
}
38+
39+
// --- LineageEntity.UnmarshalJSON ---
40+
41+
func TestLineageEntityUnmarshalJSON_plainStrings(t *testing.T) {
42+
data := []byte(`{"id":"1","name":"col","type":"column","dgcId":"550e8400-e29b-41d4-a716-446655440000","parentId":"42"}`)
43+
var e LineageEntity
44+
if err := json.Unmarshal(data, &e); err != nil {
45+
t.Fatalf("unexpected error: %v", err)
46+
}
47+
if e.DgcId != "550e8400-e29b-41d4-a716-446655440000" {
48+
t.Errorf("expected dgcId string, got %q", e.DgcId)
49+
}
50+
if e.ParentId != "42" {
51+
t.Errorf("expected parentId string, got %q", e.ParentId)
52+
}
53+
}
54+
55+
func TestLineageEntityUnmarshalJSON_jsonNullableObjects(t *testing.T) {
56+
// Simulates response from server without JsonNullableModule registered.
57+
data := []byte(`{"id":"32","name":"SALESFACT","type":"table","sourceIds":[],"dgcId":{"undefined":true,"present":false},"parentId":{"undefined":false,"present":true}}`)
58+
var e LineageEntity
59+
if err := json.Unmarshal(data, &e); err != nil {
60+
t.Fatalf("unexpected error: %v", err)
61+
}
62+
if e.Id != "32" {
63+
t.Errorf("expected id 32, got %q", e.Id)
64+
}
65+
if e.DgcId != "" {
66+
t.Errorf("expected empty dgcId, got %q", e.DgcId)
67+
}
68+
if e.ParentId != "" {
69+
t.Errorf("expected empty parentId, got %q", e.ParentId)
70+
}
71+
}
72+
73+
func TestLineageEntityUnmarshalJSON_nullFields(t *testing.T) {
74+
data := []byte(`{"id":"5","name":"t","type":"table","dgcId":null,"parentId":null}`)
75+
var e LineageEntity
76+
if err := json.Unmarshal(data, &e); err != nil {
77+
t.Fatalf("unexpected error: %v", err)
78+
}
79+
if e.DgcId != "" {
80+
t.Errorf("expected empty dgcId, got %q", e.DgcId)
81+
}
82+
if e.ParentId != "" {
83+
t.Errorf("expected empty parentId, got %q", e.ParentId)
84+
}
85+
}
86+
87+
func TestLineageEntityUnmarshalJSON_missingFields(t *testing.T) {
88+
data := []byte(`{"id":"7","name":"t","type":"table"}`)
89+
var e LineageEntity
90+
if err := json.Unmarshal(data, &e); err != nil {
91+
t.Fatalf("unexpected error: %v", err)
92+
}
93+
if e.DgcId != "" || e.ParentId != "" {
94+
t.Errorf("expected empty optional fields, got dgcId=%q parentId=%q", e.DgcId, e.ParentId)
95+
}
96+
}
97+
98+
// --- GetLineageEntity ---
99+
100+
func TestGetLineageEntity_RoutesCorrectly(t *testing.T) {
101+
var capturedPath string
102+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
103+
capturedPath = r.URL.Path
104+
writeJSON(w, http.StatusOK, map[string]any{"id": "entity-1", "name": "col1", "type": "Column"})
105+
}))
106+
defer server.Close()
107+
108+
_, err := GetLineageEntity(context.Background(), newTestClient(server), "entity-1")
109+
if err != nil {
110+
t.Fatalf("unexpected error: %v", err)
111+
}
112+
113+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1"
114+
if capturedPath != expected {
115+
t.Errorf("expected path %q, got %q", expected, capturedPath)
116+
}
117+
}
118+
119+
// --- GetLineageUpstream ---
120+
121+
func TestGetLineageUpstream_RoutesCorrectly(t *testing.T) {
122+
var capturedPath string
123+
var capturedQuery url.Values
124+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
125+
capturedPath = r.URL.Path
126+
capturedQuery = r.URL.Query()
127+
writeJSON(w, http.StatusOK, map[string]any{"relations": []any{}, "pagination": nil})
128+
}))
129+
defer server.Close()
130+
131+
_, err := GetLineageUpstream(context.Background(), newTestClient(server), "entity-1", "Column", 10, "cursor-abc")
132+
if err != nil {
133+
t.Fatalf("unexpected error: %v", err)
134+
}
135+
136+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1/upstream"
137+
if capturedPath != expected {
138+
t.Errorf("expected path %q, got %q", expected, capturedPath)
139+
}
140+
if capturedQuery.Get("entityType") != "Column" {
141+
t.Errorf("expected entityType=Column, got %q", capturedQuery.Get("entityType"))
142+
}
143+
if capturedQuery.Get("limit") != "10" {
144+
t.Errorf("expected limit=10, got %q", capturedQuery.Get("limit"))
145+
}
146+
if capturedQuery.Get("cursor") != "cursor-abc" {
147+
t.Errorf("expected cursor=cursor-abc, got %q", capturedQuery.Get("cursor"))
148+
}
149+
}
150+
151+
// --- GetLineageDownstream ---
152+
153+
func TestGetLineageDownstream_RoutesCorrectly(t *testing.T) {
154+
var capturedPath string
155+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
156+
capturedPath = r.URL.Path
157+
writeJSON(w, http.StatusOK, map[string]any{"relations": []any{}, "pagination": nil})
158+
}))
159+
defer server.Close()
160+
161+
_, err := GetLineageDownstream(context.Background(), newTestClient(server), "entity-2", "", 0, "")
162+
if err != nil {
163+
t.Fatalf("unexpected error: %v", err)
164+
}
165+
166+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-2/downstream"
167+
if capturedPath != expected {
168+
t.Errorf("expected path %q, got %q", expected, capturedPath)
169+
}
170+
}
171+
172+
// --- SearchLineageEntities ---
173+
174+
func TestSearchLineageEntities_RoutesCorrectly(t *testing.T) {
175+
var capturedPath string
176+
var capturedQuery url.Values
177+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
178+
capturedPath = r.URL.Path
179+
capturedQuery = r.URL.Query()
180+
writeJSON(w, http.StatusOK, map[string]any{"results": []any{}, "pagination": nil})
181+
}))
182+
defer server.Close()
183+
184+
_, err := SearchLineageEntities(context.Background(), newTestClient(server), "orders", "Table", "dgc-id-1", 5, "")
185+
if err != nil {
186+
t.Fatalf("unexpected error: %v", err)
187+
}
188+
189+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities"
190+
if capturedPath != expected {
191+
t.Errorf("expected path %q, got %q", expected, capturedPath)
192+
}
193+
if capturedQuery.Get("nameContains") != "orders" {
194+
t.Errorf("expected nameContains=orders, got %q", capturedQuery.Get("nameContains"))
195+
}
196+
if capturedQuery.Get("type") != "Table" {
197+
t.Errorf("expected type=Table, got %q", capturedQuery.Get("type"))
198+
}
199+
if capturedQuery.Get("dgcId") != "dgc-id-1" {
200+
t.Errorf("expected dgcId=dgc-id-1, got %q", capturedQuery.Get("dgcId"))
201+
}
202+
if capturedQuery.Get("limit") != "5" {
203+
t.Errorf("expected limit=5, got %q", capturedQuery.Get("limit"))
204+
}
205+
}
206+
207+
func TestSearchLineageEntities_JsonNullableObjects(t *testing.T) {
208+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
209+
// Simulate server without JsonNullableModule
210+
w.Header().Set("Content-Type", "application/json")
211+
w.WriteHeader(http.StatusOK)
212+
_, _ = w.Write([]byte(`{"results":[{"id":"32","name":"SALESFACT","type":"table","sourceIds":[],"dgcId":{"undefined":true,"present":false},"parentId":{"undefined":false,"present":true}}],"nextCursor":null}`))
213+
}))
214+
defer server.Close()
215+
216+
out, err := SearchLineageEntities(context.Background(), newTestClient(server), "SALES", "", "", 5, "")
217+
if err != nil {
218+
t.Fatalf("unexpected error: %v", err)
219+
}
220+
if len(out.Results) != 1 {
221+
t.Fatalf("expected 1 result, got %d", len(out.Results))
222+
}
223+
e := out.Results[0]
224+
if e.Id != "32" {
225+
t.Errorf("expected id 32, got %q", e.Id)
226+
}
227+
if e.DgcId != "" {
228+
t.Errorf("expected empty dgcId, got %q", e.DgcId)
229+
}
230+
}
231+
232+
// --- GetLineageTransformation ---
233+
234+
func TestGetLineageTransformation_RoutesCorrectly(t *testing.T) {
235+
var capturedPath string
236+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
237+
capturedPath = r.URL.Path
238+
writeJSON(w, http.StatusOK, map[string]any{"id": "transform-1", "name": "t1"})
239+
}))
240+
defer server.Close()
241+
242+
_, err := GetLineageTransformation(context.Background(), newTestClient(server), "transform-1")
243+
if err != nil {
244+
t.Fatalf("unexpected error: %v", err)
245+
}
246+
247+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/transformations/transform-1"
248+
if capturedPath != expected {
249+
t.Errorf("expected path %q, got %q", expected, capturedPath)
250+
}
251+
}
252+
253+
// --- SearchLineageTransformations ---
254+
255+
func TestSearchLineageTransformations_RoutesCorrectly(t *testing.T) {
256+
var capturedPath string
257+
var capturedQuery url.Values
258+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
259+
capturedPath = r.URL.Path
260+
capturedQuery = r.URL.Query()
261+
writeJSON(w, http.StatusOK, map[string]any{"results": []any{}, "pagination": nil})
262+
}))
263+
defer server.Close()
264+
265+
_, err := SearchLineageTransformations(context.Background(), newTestClient(server), "etl", 20, "next-cursor")
266+
if err != nil {
267+
t.Fatalf("unexpected error: %v", err)
268+
}
269+
270+
expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/transformations"
271+
if capturedPath != expected {
272+
t.Errorf("expected path %q, got %q", expected, capturedPath)
273+
}
274+
if capturedQuery.Get("nameContains") != "etl" {
275+
t.Errorf("expected nameContains=etl, got %q", capturedQuery.Get("nameContains"))
276+
}
277+
if capturedQuery.Get("limit") != "20" {
278+
t.Errorf("expected limit=20, got %q", capturedQuery.Get("limit"))
279+
}
280+
if capturedQuery.Get("cursor") != "next-cursor" {
281+
t.Errorf("expected cursor=next-cursor, got %q", capturedQuery.Get("cursor"))
282+
}
283+
}

0 commit comments

Comments
 (0)