diff --git a/README.md b/README.md index c41d9c1..3ed745e 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,12 @@ This Go-based MCP server acts as a bridge between AI applications and Collibra, - [`search_asset_keyword`](pkg/tools/search_asset_keyword.go) - Wildcard keyword search for assets - [`search_data_class`](pkg/tools/search_data_classes.go) - Search for data classes with filters - [`search_data_classification_match`](pkg/tools/search_data_classification_matches.go) - Search for associations between data classes and assets +- [`get_lineage_entity`](pkg/tools/get_lineage_entity.go) - Get metadata about a specific entity in the technical lineage graph +- [`get_lineage_upstream`](pkg/tools/get_lineage_upstream.go) - Get upstream technical lineage (sources) for a data entity +- [`get_lineage_downstream`](pkg/tools/get_lineage_downstream.go) - Get downstream technical lineage (consumers) for a data entity +- [`search_lineage_entities`](pkg/tools/search_lineage_entities.go) - Search for entities in the technical lineage graph +- [`get_lineage_transformation`](pkg/tools/get_lineage_transformation.go) - Get details and logic of a specific data transformation +- [`search_lineage_transformations`](pkg/tools/search_lineage_transformations.go) - Search for transformations in the technical lineage graph ## Quick Start diff --git a/SKILLS.md b/SKILLS.md index 2278d1b..8d4d74c 100644 --- a/SKILLS.md +++ b/SKILLS.md @@ -51,6 +51,22 @@ These tools walk the Collibra asset relation graph to answer lineage and semanti **`data_classification_match_remove`** — Remove a classification match. Requires `dgc.classify` + `dgc.catalog`. +### Technical Lineage + +These tools query the technical lineage graph — a map of all data objects and transformations across external systems, including unregistered assets, temporary tables, and source code. Unlike business lineage (which only covers assets in the Collibra Data Catalog), technical lineage covers the full physical data flow. + +**`search_lineage_entities`** — Search for data entities in the technical lineage graph by name, type, or DGC UUID. Use this as a starting point when you don't have an entity ID. Supports partial name matching and type filtering (e.g. `table`, `column`, `report`). Paginated. + +**`get_lineage_entity`** — Get full metadata for a specific lineage entity by ID: name, type, source systems, parent entity, and linked DGC identifier. Use after obtaining an entity ID from a search or lineage traversal. + +**`get_lineage_upstream`** — Get all upstream entities (sources) for a data entity, along with the transformations connecting them. Use to answer "where does this data come from?". Paginated. + +**`get_lineage_downstream`** — Get all downstream entities (consumers) for a data entity, along with the transformations connecting them. Use to answer "what depends on this data?" or "what is impacted if this changes?". Paginated. + +**`search_lineage_transformations`** — Search for transformations by name. Returns lightweight summaries. Use to discover ETL jobs or SQL queries by name. + +**`get_lineage_transformation`** — Get the full details of a transformation, including its SQL or script logic. Use after finding a transformation ID in an upstream/downstream result or search. + ### Data Contracts **`data_contract_list`** — List data contracts with cursor-based pagination. Filter by `manifestId`. Use this to find a contract's UUID. @@ -84,6 +100,17 @@ These tools walk the Collibra asset relation graph to answer lineage and semanti 1. `search_asset_keyword` to find the business term UUID 2. `get_business_term_data` → data attributes → columns → tables +### Trace upstream lineage for a data asset +1. `search_lineage_entities` with the asset name → get entity ID +2. `get_lineage_upstream` → relations with source entity IDs and transformation IDs +3. `get_lineage_entity` for any source entity to get its details +4. `get_lineage_transformation` for any transformation ID to see the logic + +### Perform impact analysis (downstream) +1. `search_lineage_entities` with the asset name → get entity ID +2. `get_lineage_downstream` → relations with consumer entity IDs +3. Follow up with `get_lineage_entity` for specific consumers as needed + ### Manage a data contract 1. `data_contract_list` to find the contract UUID 2. `data_contract_manifest_pull` to download, edit, then `data_contract_manifest_push` to update @@ -95,5 +122,5 @@ These tools walk the Collibra asset relation graph to answer lineage and semanti - **UUIDs are required for most tools.** When you only have a name, start with `search_asset_keyword` or the natural language discovery tools to get the UUID first. - **`data_assets_discover` vs `search_asset_keyword`**: Prefer `data_assets_discover` for open-ended semantic questions; prefer `search_asset_keyword` when you know the exact name or need to filter by type/community/domain. - **Permissions**: `data_assets_discover` and `business_glossary_discover` require the `dgc.ai-copilot` permission. Classification tools require `dgc.classify` + `dgc.catalog`. If a tool fails with a permission error, let the user know which permission is needed. -- **Pagination**: `search_asset_keyword`, `asset_types_list`, `data_class_search`, and `data_classification_match_search` use `limit`/`offset`. `data_contract_list` and `asset_details_get` (for relations) use cursor-based pagination — carry the cursor from the previous response. +- **Pagination**: `search_asset_keyword`, `asset_types_list`, `data_class_search`, and `data_classification_match_search` use `limit`/`offset`. `data_contract_list` and `asset_details_get` (for relations) use cursor-based pagination — carry the cursor from the previous response. Lineage tools (`search_lineage_entities`, `get_lineage_upstream`, `get_lineage_downstream`, `search_lineage_transformations`) also use cursor-based pagination. - **Error handling**: Validation errors are returned in the output `error` field (not as Go errors), so always check `error` and `success`/`found` fields in the response before using the data. diff --git a/pkg/clients/lineage_client.go b/pkg/clients/lineage_client.go new file mode 100644 index 0000000..c7f7d9c --- /dev/null +++ b/pkg/clients/lineage_client.go @@ -0,0 +1,340 @@ +package clients + +import ( + "context" + "encoding/json" + "fmt" + "net/http" +) + +type LineageEntity struct { + Id string `json:"id"` + Name string `json:"name"` + Type string `json:"type"` + SourceIds []string `json:"sourceIds,omitempty"` + DgcId string `json:"dgcId,omitempty"` + ParentId string `json:"parentId,omitempty"` +} + +// UnmarshalJSON handles both plain string values and JsonNullable-wrapped objects +// for the DgcId and ParentId fields. The server may serialize JsonNullable as +// {"present": false, "undefined": true} when JsonNullableModule is not on the classpath. +func (e *LineageEntity) UnmarshalJSON(data []byte) error { + type lineageEntityAlias LineageEntity + var raw struct { + lineageEntityAlias + DgcId json.RawMessage `json:"dgcId"` + ParentId json.RawMessage `json:"parentId"` + } + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + *e = LineageEntity(raw.lineageEntityAlias) + e.DgcId = extractJsonNullableString(raw.DgcId) + e.ParentId = extractJsonNullableString(raw.ParentId) + return nil +} + +// extractJsonNullableString extracts a string from either a plain JSON string +// or a JsonNullable object. Returns empty string for null, undefined, or objects +// where the value is not recoverable. +func extractJsonNullableString(data json.RawMessage) string { + if len(data) == 0 || string(data) == "null" { + return "" + } + var s string + if err := json.Unmarshal(data, &s); err == nil { + return s + } + // JsonNullable object format — actual value is not serialized without the module + return "" +} + +type LineageRelation struct { + SourceEntityId string `json:"sourceEntityId"` + TargetEntityId string `json:"targetEntityId"` + TransformationIds []string `json:"transformationIds"` +} + +type LineagePagination struct { + NextCursor string `json:"nextCursor,omitempty"` +} + +type LineageResponseWarning struct { + Code string `json:"code"` + Message string `json:"message"` +} + +type LineageTransformation struct { + Id string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + TransformationLogic string `json:"transformationLogic,omitempty"` +} + +type TransformationSummary struct { + Id string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` +} + +// --- API response types --- + +type lineageUpstreamDownstreamResponse struct { + Relations []LineageRelation `json:"relations"` + NextCursor string `json:"nextCursor,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` +} + +type lineageEntitiesResponse struct { + Results []LineageEntity `json:"results"` + NextCursor string `json:"nextCursor,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` +} + +type lineageTransformationsResponse struct { + Results []TransformationSummary `json:"results"` + NextCursor string `json:"nextCursor,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` +} + +// --- Output types --- + +type GetLineageEntityOutput struct { + Entity *LineageEntity `json:"entity,omitempty"` + Error string `json:"error,omitempty"` + Found bool `json:"found"` +} + +type GetLineageDirectionalOutput struct { + EntityId string `json:"entityId"` + Direction LineageDirection `json:"direction"` + Relations []LineageRelation `json:"relations"` + Pagination *LineagePagination `json:"pagination,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` + Error string `json:"error,omitempty"` +} + +type SearchLineageEntitiesOutput struct { + Results []LineageEntity `json:"results"` + Pagination *LineagePagination `json:"pagination,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` +} + +type GetLineageTransformationOutput struct { + Transformation *LineageTransformation `json:"transformation,omitempty"` + Error string `json:"error,omitempty"` + Found bool `json:"found"` +} + +type SearchLineageTransformationsOutput struct { + Results []TransformationSummary `json:"results"` + Pagination *LineagePagination `json:"pagination,omitempty"` + Warnings []LineageResponseWarning `json:"warnings,omitempty"` +} + +type LineageDirection string + +const ( + LineageDirectionUpstream LineageDirection = "upstream" + LineageDirectionDownstream LineageDirection = "downstream" + + // lineageDGCProxyPath is the path prefix targeting the lineage proxy on DGC + lineageDGCProxyPath = "/technical_lineage_resource" + + // lineageReadAPIPath is the API prefix for the lineage read API (LineageRead.yaml). + lineageReadAPIPath = "/rest/lineageGraphRead/v1" + + lineageAPIBasePath = lineageDGCProxyPath + lineageReadAPIPath +) + +// --- Query param structs --- + +type lineageDirectionalParams struct { + EntityType string `url:"entityType,omitempty"` + Limit int `url:"limit,omitempty"` + Cursor string `url:"cursor,omitempty"` +} + +type lineageSearchEntitiesParams struct { + NameContains string `url:"nameContains,omitempty"` + Type string `url:"type,omitempty"` + DgcId string `url:"dgcId,omitempty"` + Limit int `url:"limit,omitempty"` + Cursor string `url:"cursor,omitempty"` +} + +type lineageSearchTransformationsParams struct { + NameContains string `url:"nameContains,omitempty"` + Limit int `url:"limit,omitempty"` + Cursor string `url:"cursor,omitempty"` +} + +// --- Client functions --- + +func GetLineageEntity(ctx context.Context, collibraHttpClient *http.Client, entityId string) (*GetLineageEntityOutput, error) { + endpoint := fmt.Sprintf("%s/entities/%s", lineageAPIBasePath, entityId) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + body, err := executeRequest(collibraHttpClient, req) + if err != nil { + return &GetLineageEntityOutput{Found: false, Error: err.Error()}, nil + } + + var entity LineageEntity + if err := json.Unmarshal(body, &entity); err != nil { + return nil, fmt.Errorf("failed to parse entity response: %w", err) + } + + return &GetLineageEntityOutput{Entity: &entity, Found: true}, nil +} + +func GetLineageUpstream(ctx context.Context, collibraHttpClient *http.Client, entityId string, entityType string, limit int, cursor string) (*GetLineageDirectionalOutput, error) { + return getLineageDirectional(ctx, collibraHttpClient, entityId, LineageDirectionUpstream, entityType, limit, cursor) +} + +func GetLineageDownstream(ctx context.Context, collibraHttpClient *http.Client, entityId string, entityType string, limit int, cursor string) (*GetLineageDirectionalOutput, error) { + return getLineageDirectional(ctx, collibraHttpClient, entityId, LineageDirectionDownstream, entityType, limit, cursor) +} + +func getLineageDirectional(ctx context.Context, collibraHttpClient *http.Client, entityId string, direction LineageDirection, entityType string, limit int, cursor string) (*GetLineageDirectionalOutput, error) { + basePath := fmt.Sprintf("%s/entities/%s/%s", lineageAPIBasePath, entityId, direction) + + params := lineageDirectionalParams{ + EntityType: entityType, + Limit: limit, + Cursor: cursor, + } + + endpoint, err := buildUrl(basePath, params) + if err != nil { + return nil, fmt.Errorf("failed to build endpoint: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + body, err := executeRequest(collibraHttpClient, req) + if err != nil { + return &GetLineageDirectionalOutput{EntityId: entityId, Direction: direction, Relations: []LineageRelation{}, Error: err.Error()}, nil + } + + var resp lineageUpstreamDownstreamResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("failed to parse %s response: %w", direction, err) + } + + out := &GetLineageDirectionalOutput{ + EntityId: entityId, + Direction: direction, + Relations: resp.Relations, + Warnings: resp.Warnings, + } + if resp.NextCursor != "" { + out.Pagination = &LineagePagination{NextCursor: resp.NextCursor} + } + return out, nil +} + +func SearchLineageEntities(ctx context.Context, collibraHttpClient *http.Client, nameContains string, entityType string, dgcId string, limit int, cursor string) (*SearchLineageEntitiesOutput, error) { + params := lineageSearchEntitiesParams{ + NameContains: nameContains, + Type: entityType, + DgcId: dgcId, + Limit: limit, + Cursor: cursor, + } + + endpoint, err := buildUrl(lineageAPIBasePath+"/entities", params) + if err != nil { + return nil, fmt.Errorf("failed to build endpoint: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + body, err := executeRequest(collibraHttpClient, req) + if err != nil { + return nil, err + } + + var resp lineageEntitiesResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("failed to parse entities response: %w", err) + } + + out := &SearchLineageEntitiesOutput{ + Results: resp.Results, + Warnings: resp.Warnings, + } + if resp.NextCursor != "" { + out.Pagination = &LineagePagination{NextCursor: resp.NextCursor} + } + return out, nil +} + +func GetLineageTransformation(ctx context.Context, collibraHttpClient *http.Client, transformationId string) (*GetLineageTransformationOutput, error) { + endpoint := fmt.Sprintf("%s/transformations/%s", lineageAPIBasePath, transformationId) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + body, err := executeRequest(collibraHttpClient, req) + if err != nil { + return &GetLineageTransformationOutput{Found: false, Error: err.Error()}, nil + } + + var t LineageTransformation + if err := json.Unmarshal(body, &t); err != nil { + return nil, fmt.Errorf("failed to parse transformation response: %w", err) + } + + return &GetLineageTransformationOutput{Transformation: &t, Found: true}, nil +} + +func SearchLineageTransformations(ctx context.Context, collibraHttpClient *http.Client, nameContains string, limit int, cursor string) (*SearchLineageTransformationsOutput, error) { + params := lineageSearchTransformationsParams{ + NameContains: nameContains, + Limit: limit, + Cursor: cursor, + } + + endpoint, err := buildUrl(lineageAPIBasePath+"/transformations", params) + if err != nil { + return nil, fmt.Errorf("failed to build endpoint: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + body, err := executeRequest(collibraHttpClient, req) + if err != nil { + return nil, err + } + + var resp lineageTransformationsResponse + if err := json.Unmarshal(body, &resp); err != nil { + return nil, fmt.Errorf("failed to parse transformations response: %w", err) + } + + out := &SearchLineageTransformationsOutput{ + Results: resp.Results, + Warnings: resp.Warnings, + } + if resp.NextCursor != "" { + out.Pagination = &LineagePagination{NextCursor: resp.NextCursor} + } + return out, nil +} diff --git a/pkg/clients/lineage_client_test.go b/pkg/clients/lineage_client_test.go new file mode 100644 index 0000000..5f8c20b --- /dev/null +++ b/pkg/clients/lineage_client_test.go @@ -0,0 +1,352 @@ +package clients + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "net/url" + "path" + "testing" +) + +// redirectClient rewrites requests to hit the test server instead of relative paths. +type redirectClient struct { + baseURL string + next http.RoundTripper +} + +func (c *redirectClient) RoundTrip(req *http.Request) (*http.Response, error) { + clone := req.Clone(req.Context()) + base, _ := url.Parse(c.baseURL) + clone.URL.Scheme = base.Scheme + clone.URL.Host = base.Host + clone.URL.Path = path.Join(base.Path, req.URL.Path) + clone.URL.RawQuery = req.URL.RawQuery + return c.next.RoundTrip(clone) +} + +func newTestClient(server *httptest.Server) *http.Client { + return &http.Client{Transport: &redirectClient{baseURL: server.URL, next: http.DefaultTransport}} +} + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +// --- LineageEntity.UnmarshalJSON --- + +func TestLineageEntityUnmarshalJSON_plainStrings(t *testing.T) { + data := []byte(`{"id":"1","name":"col","type":"column","dgcId":"550e8400-e29b-41d4-a716-446655440000","parentId":"42"}`) + var e LineageEntity + if err := json.Unmarshal(data, &e); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if e.DgcId != "550e8400-e29b-41d4-a716-446655440000" { + t.Errorf("expected dgcId string, got %q", e.DgcId) + } + if e.ParentId != "42" { + t.Errorf("expected parentId string, got %q", e.ParentId) + } +} + +func TestLineageEntityUnmarshalJSON_jsonNullableObjects(t *testing.T) { + // Simulates response from server without JsonNullableModule registered. + data := []byte(`{"id":"32","name":"SALESFACT","type":"table","sourceIds":[],"dgcId":{"undefined":true,"present":false},"parentId":{"undefined":false,"present":true}}`) + var e LineageEntity + if err := json.Unmarshal(data, &e); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if e.Id != "32" { + t.Errorf("expected id 32, got %q", e.Id) + } + if e.DgcId != "" { + t.Errorf("expected empty dgcId, got %q", e.DgcId) + } + if e.ParentId != "" { + t.Errorf("expected empty parentId, got %q", e.ParentId) + } +} + +func TestLineageEntityUnmarshalJSON_nullFields(t *testing.T) { + data := []byte(`{"id":"5","name":"t","type":"table","dgcId":null,"parentId":null}`) + var e LineageEntity + if err := json.Unmarshal(data, &e); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if e.DgcId != "" { + t.Errorf("expected empty dgcId, got %q", e.DgcId) + } + if e.ParentId != "" { + t.Errorf("expected empty parentId, got %q", e.ParentId) + } +} + +func TestLineageEntityUnmarshalJSON_missingFields(t *testing.T) { + data := []byte(`{"id":"7","name":"t","type":"table"}`) + var e LineageEntity + if err := json.Unmarshal(data, &e); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if e.DgcId != "" || e.ParentId != "" { + t.Errorf("expected empty optional fields, got dgcId=%q parentId=%q", e.DgcId, e.ParentId) + } +} + +// --- GetLineageEntity --- + +func TestGetLineageEntity_RoutesCorrectly(t *testing.T) { + var capturedPath string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + writeJSON(w, http.StatusOK, map[string]any{"id": "entity-1", "name": "col1", "type": "Column"}) + })) + defer server.Close() + + _, err := GetLineageEntity(context.Background(), newTestClient(server), "entity-1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } +} + +// --- GetLineageUpstream --- + +func TestGetLineageUpstream_RoutesCorrectly(t *testing.T) { + var capturedPath string + var capturedQuery url.Values + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + capturedQuery = r.URL.Query() + writeJSON(w, http.StatusOK, map[string]any{"relations": []any{}}) + })) + defer server.Close() + + _, err := GetLineageUpstream(context.Background(), newTestClient(server), "entity-1", "Column", 10, "cursor-abc") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1/upstream" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } + if capturedQuery.Get("entityType") != "Column" { + t.Errorf("expected entityType=Column, got %q", capturedQuery.Get("entityType")) + } + if capturedQuery.Get("limit") != "10" { + t.Errorf("expected limit=10, got %q", capturedQuery.Get("limit")) + } + if capturedQuery.Get("cursor") != "cursor-abc" { + t.Errorf("expected cursor=cursor-abc, got %q", capturedQuery.Get("cursor")) + } +} + +// --- GetLineageDownstream --- + +func TestGetLineageDownstream_RoutesCorrectly(t *testing.T) { + var capturedPath string + var capturedQuery url.Values + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + capturedQuery = r.URL.Query() + writeJSON(w, http.StatusOK, map[string]any{"relations": []any{}}) + })) + defer server.Close() + + _, err := GetLineageDownstream(context.Background(), newTestClient(server), "entity-2", "Table", 5, "cursor-xyz") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-2/downstream" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } + if capturedQuery.Get("entityType") != "Table" { + t.Errorf("expected entityType=Table, got %q", capturedQuery.Get("entityType")) + } + if capturedQuery.Get("limit") != "5" { + t.Errorf("expected limit=5, got %q", capturedQuery.Get("limit")) + } + if capturedQuery.Get("cursor") != "cursor-xyz" { + t.Errorf("expected cursor=cursor-xyz, got %q", capturedQuery.Get("cursor")) + } +} + +func TestGetLineageDownstream_ErrorReturnsEmptyRelations(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "not found", http.StatusNotFound) + })) + defer server.Close() + + out, err := GetLineageDownstream(context.Background(), newTestClient(server), "entity-x", "", 0, "") + if err != nil { + t.Fatalf("unexpected hard error: %v", err) + } + if out.Error == "" { + t.Errorf("expected error message in output") + } + if out.Relations == nil { + t.Errorf("expected non-nil Relations slice on error, got nil") + } +} + +func TestGetLineageUpstream_ErrorReturnsEmptyRelations(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "not found", http.StatusNotFound) + })) + defer server.Close() + + out, err := GetLineageUpstream(context.Background(), newTestClient(server), "entity-x", "", 0, "") + if err != nil { + t.Fatalf("unexpected hard error: %v", err) + } + if out.Error == "" { + t.Errorf("expected error message in output") + } + if out.Relations == nil { + t.Errorf("expected non-nil Relations slice on error, got nil") + } +} + +func TestGetLineageDirectional_NoCursorInResponse(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "relations": []map[string]any{ + {"sourceEntityId": "a", "targetEntityId": "b", "transformationIds": []string{"t1"}}, + }, + }) + })) + defer server.Close() + + out, err := GetLineageDownstream(context.Background(), newTestClient(server), "a", "", 0, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if out.Pagination != nil { + t.Errorf("expected nil Pagination when server omits nextCursor, got %+v", out.Pagination) + } +} + +// --- SearchLineageEntities --- + +func TestSearchLineageEntities_RoutesCorrectly(t *testing.T) { + var capturedPath string + var capturedQuery url.Values + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + capturedQuery = r.URL.Query() + writeJSON(w, http.StatusOK, map[string]any{"results": []any{}}) + })) + defer server.Close() + + _, err := SearchLineageEntities(context.Background(), newTestClient(server), "orders", "Table", "dgc-id-1", 5, "cur-1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/entities" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } + if capturedQuery.Get("nameContains") != "orders" { + t.Errorf("expected nameContains=orders, got %q", capturedQuery.Get("nameContains")) + } + if capturedQuery.Get("type") != "Table" { + t.Errorf("expected type=Table, got %q", capturedQuery.Get("type")) + } + if capturedQuery.Get("dgcId") != "dgc-id-1" { + t.Errorf("expected dgcId=dgc-id-1, got %q", capturedQuery.Get("dgcId")) + } + if capturedQuery.Get("limit") != "5" { + t.Errorf("expected limit=5, got %q", capturedQuery.Get("limit")) + } + if capturedQuery.Get("cursor") != "cur-1" { + t.Errorf("expected cursor=cur-1, got %q", capturedQuery.Get("cursor")) + } +} + +func TestSearchLineageEntities_JsonNullableObjects(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate server without JsonNullableModule + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"results":[{"id":"32","name":"SALESFACT","type":"table","sourceIds":[],"dgcId":{"undefined":true,"present":false},"parentId":{"undefined":false,"present":true}}],"nextCursor":null}`)) + })) + defer server.Close() + + out, err := SearchLineageEntities(context.Background(), newTestClient(server), "SALES", "", "", 5, "") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(out.Results) != 1 { + t.Fatalf("expected 1 result, got %d", len(out.Results)) + } + e := out.Results[0] + if e.Id != "32" { + t.Errorf("expected id 32, got %q", e.Id) + } + if e.DgcId != "" { + t.Errorf("expected empty dgcId, got %q", e.DgcId) + } +} + +// --- GetLineageTransformation --- + +func TestGetLineageTransformation_RoutesCorrectly(t *testing.T) { + var capturedPath string + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + writeJSON(w, http.StatusOK, map[string]any{"id": "transform-1", "name": "t1"}) + })) + defer server.Close() + + _, err := GetLineageTransformation(context.Background(), newTestClient(server), "transform-1") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/transformations/transform-1" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } +} + +// --- SearchLineageTransformations --- + +func TestSearchLineageTransformations_RoutesCorrectly(t *testing.T) { + var capturedPath string + var capturedQuery url.Values + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + capturedPath = r.URL.Path + capturedQuery = r.URL.Query() + writeJSON(w, http.StatusOK, map[string]any{"results": []any{}}) + })) + defer server.Close() + + _, err := SearchLineageTransformations(context.Background(), newTestClient(server), "etl", 20, "next-cursor") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "/technical_lineage_resource/rest/lineageGraphRead/v1/transformations" + if capturedPath != expected { + t.Errorf("expected path %q, got %q", expected, capturedPath) + } + if capturedQuery.Get("nameContains") != "etl" { + t.Errorf("expected nameContains=etl, got %q", capturedQuery.Get("nameContains")) + } + if capturedQuery.Get("limit") != "20" { + t.Errorf("expected limit=20, got %q", capturedQuery.Get("limit")) + } + if capturedQuery.Get("cursor") != "next-cursor" { + t.Errorf("expected cursor=next-cursor, got %q", capturedQuery.Get("cursor")) + } +} diff --git a/pkg/tools/get_lineage_downstream.go b/pkg/tools/get_lineage_downstream.go new file mode 100644 index 0000000..7d2b492 --- /dev/null +++ b/pkg/tools/get_lineage_downstream.go @@ -0,0 +1,31 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type GetLineageDownstreamInput struct { + EntityId string `json:"entityId" jsonschema:"Required. ID of the entity to trace downstream from. Can be numeric string or DGC UUID."` + EntityType string `json:"entityType,omitempty" jsonschema:"Optional. Filter to only include entities of this type (e.g. 'table', 'report'). Useful when you only care about specific downstream asset types."` + Limit int `json:"limit,omitempty" jsonschema:"Optional. Max relations per page. Default: 20, Min: 1, Max: 100."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional. Pagination cursor from a previous response. Do not construct manually."` +} + +func NewGetLineageDownstreamTool(collibraClient *http.Client) *chip.Tool[GetLineageDownstreamInput, clients.GetLineageDirectionalOutput] { + return &chip.Tool[GetLineageDownstreamInput, clients.GetLineageDirectionalOutput]{ + Name: "get_lineage_downstream", + Description: "Get the downstream technical lineage graph for a data entity -- all direct and indirect consumer entities that are impacted by it, along with the transformations connecting them. This traces through all data objects across external systems (including unregistered assets, temporary tables, and source code), not just assets in the Collibra Data Catalog. Use this to answer \"What depends on this data?\" or \"If this table changes, what else is affected?\" Essential for impact analysis before modifying or deprecating a data asset. Results are paginated.", + Handler: handleGetLineageDownstream(collibraClient), + Permissions: []string{}, + } +} + +func handleGetLineageDownstream(collibraClient *http.Client) chip.ToolHandlerFunc[GetLineageDownstreamInput, clients.GetLineageDirectionalOutput] { + return func(ctx context.Context, input GetLineageDownstreamInput) (clients.GetLineageDirectionalOutput, error) { + return handleLineageDirectional(ctx, collibraClient, input.EntityId, input.EntityType, input.Limit, input.Cursor, clients.GetLineageDownstream) + } +} diff --git a/pkg/tools/get_lineage_downstream_test.go b/pkg/tools/get_lineage_downstream_test.go new file mode 100644 index 0000000..85ecf6a --- /dev/null +++ b/pkg/tools/get_lineage_downstream_test.go @@ -0,0 +1,103 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/clients" + "github.com/collibra/chip/pkg/tools" +) + +func TestGetLineageDownstream(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1/downstream", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "relations": []map[string]any{ + { + "sourceEntityId": "entity-1", + "targetEntityId": "target-1", + "transformationIds": []string{"transform-2"}, + }, + }, + "nextCursor": "cursor-xyz", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageDownstreamTool(client).Handler(t.Context(), tools.GetLineageDownstreamInput{ + EntityId: "entity-1", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error != "" { + t.Fatalf("Expected no error in output, got: %s", output.Error) + } + + if output.EntityId != "entity-1" { + t.Fatalf("Expected entityId 'entity-1', got: '%s'", output.EntityId) + } + + if output.Direction != clients.LineageDirectionDownstream { + t.Fatalf("Expected direction 'downstream', got: '%s'", output.Direction) + } + + if len(output.Relations) != 1 { + t.Fatalf("Expected 1 relation, got: %d", len(output.Relations)) + } + + relation := output.Relations[0] + if relation.TargetEntityId != "target-1" { + t.Fatalf("Expected targetEntityId 'target-1', got: '%s'", relation.TargetEntityId) + } + + if output.Pagination == nil || output.Pagination.NextCursor != "cursor-xyz" { + t.Fatalf("Expected nextCursor 'cursor-xyz'") + } +} + +func TestGetLineageDownstreamNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-unknown/downstream", JsonHandlerOut(func(r *http.Request) (int, string) { + return http.StatusNotFound, "entity not found" + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageDownstreamTool(client).Handler(t.Context(), tools.GetLineageDownstreamInput{ + EntityId: "entity-unknown", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } + + if output.Relations == nil { + t.Fatalf("Expected Relations to be a non-nil slice, got nil") + } +} + +func TestGetLineageDownstreamMissingId(t *testing.T) { + server := httptest.NewServer(http.NewServeMux()) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageDownstreamTool(client).Handler(t.Context(), tools.GetLineageDownstreamInput{}) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} diff --git a/pkg/tools/get_lineage_entity.go b/pkg/tools/get_lineage_entity.go new file mode 100644 index 0000000..9a08e1d --- /dev/null +++ b/pkg/tools/get_lineage_entity.go @@ -0,0 +1,37 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type GetLineageEntityInput struct { + EntityId string `json:"entityId" jsonschema:"Required. Unique identifier of the data entity. Can be a numeric string (e.g. '12345') or a DGC UUID (e.g. '550e8400-e29b-41d4-a716-446655440000')."` +} + +func NewGetLineageEntityTool(collibraClient *http.Client) *chip.Tool[GetLineageEntityInput, clients.GetLineageEntityOutput] { + return &chip.Tool[GetLineageEntityInput, clients.GetLineageEntityOutput]{ + Name: "get_lineage_entity", + Description: "Get detailed metadata about a specific data entity in the technical lineage graph. Technical lineage covers all data objects across external systems -- including source code, transformations, and temporary tables -- regardless of whether they are registered in Collibra (unlike business lineage, which only covers assets ingested into the Data Catalog). An entity represents any tracked data asset such as a table, column, file, report, API endpoint, or topic. Returns the entity's name, type, source systems, parent entity, and linked Data Governance Catalog (DGC) identifier. Use this when you have an entity ID from a lineage traversal, search result, or user input and need its full details.", + Handler: handleGetLineageEntity(collibraClient), + Permissions: []string{}, + } +} + +func handleGetLineageEntity(collibraClient *http.Client) chip.ToolHandlerFunc[GetLineageEntityInput, clients.GetLineageEntityOutput] { + return func(ctx context.Context, input GetLineageEntityInput) (clients.GetLineageEntityOutput, error) { + if input.EntityId == "" { + return clients.GetLineageEntityOutput{Found: false, Error: "entityId is required"}, nil + } + + result, err := clients.GetLineageEntity(ctx, collibraClient, input.EntityId) + if err != nil { + return clients.GetLineageEntityOutput{}, err + } + + return *result, nil + } +} diff --git a/pkg/tools/get_lineage_entity_test.go b/pkg/tools/get_lineage_entity_test.go new file mode 100644 index 0000000..ca8c84a --- /dev/null +++ b/pkg/tools/get_lineage_entity_test.go @@ -0,0 +1,93 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/clients" + "github.com/collibra/chip/pkg/tools" +) + +func TestGetLineageEntity(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1", JsonHandlerOut(func(r *http.Request) (int, clients.LineageEntity) { + return http.StatusOK, clients.LineageEntity{ + Id: "entity-1", + Name: "my_table", + Type: "table", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageEntityTool(client).Handler(t.Context(), tools.GetLineageEntityInput{ + EntityId: "entity-1", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !output.Found { + t.Fatalf("Expected entity to be found") + } + + if output.Entity.Id != "entity-1" { + t.Fatalf("Expected entity ID 'entity-1', got: '%s'", output.Entity.Id) + } + + if output.Entity.Name != "my_table" { + t.Fatalf("Expected entity name 'my_table', got: '%s'", output.Entity.Name) + } + + if output.Entity.Type != "table" { + t.Fatalf("Expected entity type 'table', got: '%s'", output.Entity.Type) + } +} + +func TestGetLineageEntityNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-unknown", JsonHandlerOut(func(r *http.Request) (int, string) { + return http.StatusNotFound, "entity not found" + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageEntityTool(client).Handler(t.Context(), tools.GetLineageEntityInput{ + EntityId: "entity-unknown", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Found { + t.Fatalf("Expected entity not to be found") + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} + +func TestGetLineageEntityMissingId(t *testing.T) { + server := httptest.NewServer(http.NewServeMux()) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageEntityTool(client).Handler(t.Context(), tools.GetLineageEntityInput{}) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Found { + t.Fatalf("Expected entity not to be found") + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} diff --git a/pkg/tools/get_lineage_transformation.go b/pkg/tools/get_lineage_transformation.go new file mode 100644 index 0000000..07ea994 --- /dev/null +++ b/pkg/tools/get_lineage_transformation.go @@ -0,0 +1,37 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type GetLineageTransformationInput struct { + TransformationId string `json:"transformationId" jsonschema:"Required. ID of the transformation to be fetched (e.g. '67890')."` +} + +func NewGetLineageTransformationTool(collibraClient *http.Client) *chip.Tool[GetLineageTransformationInput, clients.GetLineageTransformationOutput] { + return &chip.Tool[GetLineageTransformationInput, clients.GetLineageTransformationOutput]{ + Name: "get_lineage_transformation", + Description: "Get detailed information about a specific data transformation, including its SQL or script logic. A transformation represents a data processing activity (ETL job, SQL query, script, etc.) that connects source entities to target entities in the lineage graph. Use this when you found a transformation ID in an upstream/downstream lineage result and want to see what the transformation actually does -- the SQL query, script content, or processing logic.", + Handler: handleGetLineageTransformation(collibraClient), + Permissions: []string{}, + } +} + +func handleGetLineageTransformation(collibraClient *http.Client) chip.ToolHandlerFunc[GetLineageTransformationInput, clients.GetLineageTransformationOutput] { + return func(ctx context.Context, input GetLineageTransformationInput) (clients.GetLineageTransformationOutput, error) { + if input.TransformationId == "" { + return clients.GetLineageTransformationOutput{Found: false, Error: "transformationId is required"}, nil + } + + result, err := clients.GetLineageTransformation(ctx, collibraClient, input.TransformationId) + if err != nil { + return clients.GetLineageTransformationOutput{}, err + } + + return *result, nil + } +} diff --git a/pkg/tools/get_lineage_transformation_test.go b/pkg/tools/get_lineage_transformation_test.go new file mode 100644 index 0000000..ef59766 --- /dev/null +++ b/pkg/tools/get_lineage_transformation_test.go @@ -0,0 +1,93 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/tools" +) + +func TestGetLineageTransformation(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/transformations/transform-1", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "id": "transform-1", + "name": "etl_sales_daily", + "description": "Daily ETL for sales data", + "transformationLogic": "SELECT * FROM raw_sales WHERE date = CURRENT_DATE", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageTransformationTool(client).Handler(t.Context(), tools.GetLineageTransformationInput{ + TransformationId: "transform-1", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if !output.Found { + t.Fatalf("Expected transformation to be found") + } + + if output.Transformation.Id != "transform-1" { + t.Fatalf("Expected transformation ID 'transform-1', got: '%s'", output.Transformation.Id) + } + + if output.Transformation.Name != "etl_sales_daily" { + t.Fatalf("Expected transformation name 'etl_sales_daily', got: '%s'", output.Transformation.Name) + } + + if output.Transformation.TransformationLogic == "" { + t.Fatalf("Expected transformation logic to be present") + } +} + +func TestGetLineageTransformationNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/transformations/transform-unknown", JsonHandlerOut(func(r *http.Request) (int, string) { + return http.StatusNotFound, "transformation not found" + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageTransformationTool(client).Handler(t.Context(), tools.GetLineageTransformationInput{ + TransformationId: "transform-unknown", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Found { + t.Fatalf("Expected transformation not to be found") + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} + +func TestGetLineageTransformationMissingId(t *testing.T) { + server := httptest.NewServer(http.NewServeMux()) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageTransformationTool(client).Handler(t.Context(), tools.GetLineageTransformationInput{}) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Found { + t.Fatalf("Expected transformation not to be found") + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} diff --git a/pkg/tools/get_lineage_upstream.go b/pkg/tools/get_lineage_upstream.go new file mode 100644 index 0000000..6fa5ca0 --- /dev/null +++ b/pkg/tools/get_lineage_upstream.go @@ -0,0 +1,50 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type GetLineageUpstreamInput struct { + EntityId string `json:"entityId" jsonschema:"Required. ID of the entity to trace upstream from. Can be numeric string or DGC UUID."` + EntityType string `json:"entityType,omitempty" jsonschema:"Optional. Filter to only include entities of this type (e.g. 'table', 'column'). Useful when you only care about specific upstream asset types."` + Limit int `json:"limit,omitempty" jsonschema:"Optional. Max relations per page. Default: 20, Min: 1, Max: 100."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional. Pagination cursor from a previous response. Do not construct manually."` +} + +func NewGetLineageUpstreamTool(collibraClient *http.Client) *chip.Tool[GetLineageUpstreamInput, clients.GetLineageDirectionalOutput] { + return &chip.Tool[GetLineageUpstreamInput, clients.GetLineageDirectionalOutput]{ + Name: "get_lineage_upstream", + Description: "Get the upstream technical lineage graph for a data entity -- all direct and indirect source entities that feed data into it, along with the transformations connecting them. This traces through all data objects across external systems (including unregistered assets, temporary tables, and source code), not just assets in the Collibra Data Catalog. Use this to answer \"Where does this data come from?\" or \"What are the sources feeding this table?\" Each relation in the result connects a source entity to a target entity through one or more transformations. Results are paginated.", + Handler: handleGetLineageUpstream(collibraClient), + Permissions: []string{}, + } +} + +func handleGetLineageUpstream(collibraClient *http.Client) chip.ToolHandlerFunc[GetLineageUpstreamInput, clients.GetLineageDirectionalOutput] { + return func(ctx context.Context, input GetLineageUpstreamInput) (clients.GetLineageDirectionalOutput, error) { + return handleLineageDirectional(ctx, collibraClient, input.EntityId, input.EntityType, input.Limit, input.Cursor, clients.GetLineageUpstream) + } +} + +// handleLineageDirectional is a shared helper for the upstream and downstream tool handlers. +func handleLineageDirectional( + ctx context.Context, + collibraClient *http.Client, + entityId, entityType string, + limit int, + cursor string, + fetch func(context.Context, *http.Client, string, string, int, string) (*clients.GetLineageDirectionalOutput, error), +) (clients.GetLineageDirectionalOutput, error) { + if entityId == "" { + return clients.GetLineageDirectionalOutput{Error: "entityId is required"}, nil + } + result, err := fetch(ctx, collibraClient, entityId, entityType, limit, cursor) + if err != nil { + return clients.GetLineageDirectionalOutput{}, err + } + return *result, nil +} diff --git a/pkg/tools/get_lineage_upstream_test.go b/pkg/tools/get_lineage_upstream_test.go new file mode 100644 index 0000000..43b97b4 --- /dev/null +++ b/pkg/tools/get_lineage_upstream_test.go @@ -0,0 +1,103 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/clients" + "github.com/collibra/chip/pkg/tools" +) + +func TestGetLineageUpstream(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-1/upstream", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "relations": []map[string]any{ + { + "sourceEntityId": "source-1", + "targetEntityId": "entity-1", + "transformationIds": []string{"transform-1"}, + }, + }, + "nextCursor": "cursor-abc", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageUpstreamTool(client).Handler(t.Context(), tools.GetLineageUpstreamInput{ + EntityId: "entity-1", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error != "" { + t.Fatalf("Expected no error in output, got: %s", output.Error) + } + + if output.EntityId != "entity-1" { + t.Fatalf("Expected entityId 'entity-1', got: '%s'", output.EntityId) + } + + if output.Direction != clients.LineageDirectionUpstream { + t.Fatalf("Expected direction 'upstream', got: '%s'", output.Direction) + } + + if len(output.Relations) != 1 { + t.Fatalf("Expected 1 relation, got: %d", len(output.Relations)) + } + + relation := output.Relations[0] + if relation.SourceEntityId != "source-1" { + t.Fatalf("Expected sourceEntityId 'source-1', got: '%s'", relation.SourceEntityId) + } + + if output.Pagination == nil || output.Pagination.NextCursor != "cursor-abc" { + t.Fatalf("Expected nextCursor 'cursor-abc'") + } +} + +func TestGetLineageUpstreamNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities/entity-unknown/upstream", JsonHandlerOut(func(r *http.Request) (int, string) { + return http.StatusNotFound, "entity not found" + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageUpstreamTool(client).Handler(t.Context(), tools.GetLineageUpstreamInput{ + EntityId: "entity-unknown", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } + + if output.Relations == nil { + t.Fatalf("Expected Relations to be a non-nil slice, got nil") + } +} + +func TestGetLineageUpstreamMissingId(t *testing.T) { + server := httptest.NewServer(http.NewServeMux()) + defer server.Close() + + client := newClient(server) + output, err := tools.NewGetLineageUpstreamTool(client).Handler(t.Context(), tools.GetLineageUpstreamInput{}) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if output.Error == "" { + t.Fatalf("Expected an error message") + } +} diff --git a/pkg/tools/search_lineage_entities.go b/pkg/tools/search_lineage_entities.go new file mode 100644 index 0000000..11b5db6 --- /dev/null +++ b/pkg/tools/search_lineage_entities.go @@ -0,0 +1,37 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type SearchLineageEntitiesInput struct { + NameContains string `json:"nameContains,omitempty" jsonschema:"Optional. Partial match on entity name (case insensitive). Min: 1, Max: 256 chars. Example: 'sales'"` + Type string `json:"type,omitempty" jsonschema:"Optional. Exact match on entity type. Common types: table, column, file, report, apiEndpoint, topic. Example: 'table'"` + DgcId string `json:"dgcId,omitempty" jsonschema:"Optional. Filter by Data Governance Catalog UUID. Use to find the lineage entity linked to a specific Collibra catalog asset."` + Limit int `json:"limit,omitempty" jsonschema:"Optional. Max results per page. Default: 20, Min: 1, Max: 100."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional. Pagination cursor from a previous response. Do not construct manually."` +} + +func NewSearchLineageEntitiesTool(collibraClient *http.Client) *chip.Tool[SearchLineageEntitiesInput, clients.SearchLineageEntitiesOutput] { + return &chip.Tool[SearchLineageEntitiesInput, clients.SearchLineageEntitiesOutput]{ + Name: "search_lineage_entities", + Description: "Search for data entities in the technical lineage graph by name, type, or DGC identifier. Technical lineage covers all data objects across external systems -- including source code, transformations, and temporary tables -- regardless of whether they are registered in Collibra (unlike business lineage, which only covers assets ingested into the Data Catalog). Returns a paginated list of matching entities. This is typically the starting tool when you don't have a specific entity ID -- for example, to find all tables with \"sales\" in the name, or to find the lineage entity linked to a specific Collibra catalog asset via its DGC UUID. Supports partial name matching (case insensitive).", + Handler: handleSearchLineageEntities(collibraClient), + Permissions: []string{}, + } +} + +func handleSearchLineageEntities(collibraClient *http.Client) chip.ToolHandlerFunc[SearchLineageEntitiesInput, clients.SearchLineageEntitiesOutput] { + return func(ctx context.Context, input SearchLineageEntitiesInput) (clients.SearchLineageEntitiesOutput, error) { + result, err := clients.SearchLineageEntities(ctx, collibraClient, input.NameContains, input.Type, input.DgcId, input.Limit, input.Cursor) + if err != nil { + return clients.SearchLineageEntitiesOutput{}, err + } + + return *result, nil + } +} diff --git a/pkg/tools/search_lineage_entities_test.go b/pkg/tools/search_lineage_entities_test.go new file mode 100644 index 0000000..f388812 --- /dev/null +++ b/pkg/tools/search_lineage_entities_test.go @@ -0,0 +1,81 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/tools" +) + +func TestSearchLineageEntities(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "results": []map[string]any{ + { + "id": "entity-1", + "name": "sales_table", + "type": "table", + }, + }, + "nextCursor": "cursor-abc", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewSearchLineageEntitiesTool(client).Handler(t.Context(), tools.SearchLineageEntitiesInput{ + NameContains: "sales", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if len(output.Results) != 1 { + t.Fatalf("Expected 1 result, got: %d", len(output.Results)) + } + + entity := output.Results[0] + if entity.Id != "entity-1" { + t.Fatalf("Expected entity ID 'entity-1', got: '%s'", entity.Id) + } + + if entity.Name != "sales_table" { + t.Fatalf("Expected entity name 'sales_table', got: '%s'", entity.Name) + } + + if entity.Type != "table" { + t.Fatalf("Expected entity type 'table', got: '%s'", entity.Type) + } + + if output.Pagination == nil || output.Pagination.NextCursor != "cursor-abc" { + t.Fatalf("Expected nextCursor 'cursor-abc'") + } +} + +func TestSearchLineageEntitiesNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/entities", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "results": []map[string]any{}, + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewSearchLineageEntitiesTool(client).Handler(t.Context(), tools.SearchLineageEntitiesInput{ + NameContains: "nonexistent_table", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if len(output.Results) != 0 { + t.Fatalf("Expected 0 results, got: %d", len(output.Results)) + } +} diff --git a/pkg/tools/search_lineage_transformations.go b/pkg/tools/search_lineage_transformations.go new file mode 100644 index 0000000..e77669d --- /dev/null +++ b/pkg/tools/search_lineage_transformations.go @@ -0,0 +1,35 @@ +package tools + +import ( + "context" + "net/http" + + "github.com/collibra/chip/pkg/chip" + "github.com/collibra/chip/pkg/clients" +) + +type SearchLineageTransformationsInput struct { + NameContains string `json:"nameContains,omitempty" jsonschema:"Optional. Partial match on transformation name (case insensitive). Min: 1, Max: 256 chars. Example: 'etl'"` + Limit int `json:"limit,omitempty" jsonschema:"Optional. Max results per page. Default: 20, Min: 1, Max: 100."` + Cursor string `json:"cursor,omitempty" jsonschema:"Optional. Pagination cursor from a previous response. Do not construct manually."` +} + +func NewSearchLineageTransformationsTool(collibraClient *http.Client) *chip.Tool[SearchLineageTransformationsInput, clients.SearchLineageTransformationsOutput] { + return &chip.Tool[SearchLineageTransformationsInput, clients.SearchLineageTransformationsOutput]{ + Name: "search_lineage_transformations", + Description: "Search for transformations in the technical lineage graph by name. Returns a paginated list of matching transformation summaries. Use this to discover ETL jobs, SQL queries, or other processing activities without knowing their IDs. For example, find all transformations with \"etl\" or \"sales\" in the name. To see the full transformation logic (SQL/script), use get_lineage_transformation with the returned ID.", + Handler: handleSearchLineageTransformations(collibraClient), + Permissions: []string{}, + } +} + +func handleSearchLineageTransformations(collibraClient *http.Client) chip.ToolHandlerFunc[SearchLineageTransformationsInput, clients.SearchLineageTransformationsOutput] { + return func(ctx context.Context, input SearchLineageTransformationsInput) (clients.SearchLineageTransformationsOutput, error) { + result, err := clients.SearchLineageTransformations(ctx, collibraClient, input.NameContains, input.Limit, input.Cursor) + if err != nil { + return clients.SearchLineageTransformationsOutput{}, err + } + + return *result, nil + } +} diff --git a/pkg/tools/search_lineage_transformations_test.go b/pkg/tools/search_lineage_transformations_test.go new file mode 100644 index 0000000..d707b12 --- /dev/null +++ b/pkg/tools/search_lineage_transformations_test.go @@ -0,0 +1,77 @@ +package tools_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/collibra/chip/pkg/tools" +) + +func TestSearchLineageTransformations(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/transformations", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "results": []map[string]any{ + { + "id": "transform-1", + "name": "etl_sales_daily", + "description": "Daily ETL for sales data", + }, + }, + "nextCursor": "cursor-abc", + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewSearchLineageTransformationsTool(client).Handler(t.Context(), tools.SearchLineageTransformationsInput{ + NameContains: "etl", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if len(output.Results) != 1 { + t.Fatalf("Expected 1 result, got: %d", len(output.Results)) + } + + transformation := output.Results[0] + if transformation.Id != "transform-1" { + t.Fatalf("Expected transformation ID 'transform-1', got: '%s'", transformation.Id) + } + + if transformation.Name != "etl_sales_daily" { + t.Fatalf("Expected transformation name 'etl_sales_daily', got: '%s'", transformation.Name) + } + + if output.Pagination == nil || output.Pagination.NextCursor != "cursor-abc" { + t.Fatalf("Expected nextCursor 'cursor-abc'") + } +} + +func TestSearchLineageTransformationsNotFound(t *testing.T) { + handler := http.NewServeMux() + handler.Handle("/technical_lineage_resource/rest/lineageGraphRead/v1/transformations", JsonHandlerOut(func(r *http.Request) (int, map[string]any) { + return http.StatusOK, map[string]any{ + "results": []map[string]any{}, + } + })) + + server := httptest.NewServer(handler) + defer server.Close() + + client := newClient(server) + output, err := tools.NewSearchLineageTransformationsTool(client).Handler(t.Context(), tools.SearchLineageTransformationsInput{ + NameContains: "nonexistent_etl", + }) + if err != nil { + t.Fatalf("Expected no error, got: %v", err) + } + + if len(output.Results) != 0 { + t.Fatalf("Expected 0 results, got: %d", len(output.Results)) + } +} diff --git a/pkg/tools/tools_register.go b/pkg/tools/tools_register.go index 178c9dc..d84951c 100644 --- a/pkg/tools/tools_register.go +++ b/pkg/tools/tools_register.go @@ -23,6 +23,12 @@ func RegisterAll(server *chip.Server, client *http.Client, toolConfig *chip.Serv toolRegister(server, toolConfig, NewMeasureDataGetTool(client)) toolRegister(server, toolConfig, NewTableSemanticsGetTool(client)) toolRegister(server, toolConfig, NewBusinessTermDataGetTool(client)) + toolRegister(server, toolConfig, NewGetLineageEntityTool(client)) + toolRegister(server, toolConfig, NewGetLineageUpstreamTool(client)) + toolRegister(server, toolConfig, NewGetLineageDownstreamTool(client)) + toolRegister(server, toolConfig, NewSearchLineageEntitiesTool(client)) + toolRegister(server, toolConfig, NewGetLineageTransformationTool(client)) + toolRegister(server, toolConfig, NewSearchLineageTransformationsTool(client)) } func toolRegister[In, Out any](server *chip.Server, toolConfig *chip.ServerToolConfig, tool *chip.Tool[In, Out]) {