Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/connector_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (c *connectorCommand) saveConnector(pc *fisk.ParseContext) error {
if !exists {
connector, err = appCtx.Client.CreateConnector(c.id, result.Description, result.RuntimeId, convert.ConvertStepsFromSpec(result.Steps), c.opts.Timeout)
if err != nil {
color.Red("Could not save connector: %s", err)
color.Red("Could not save connector: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand All @@ -386,7 +386,7 @@ func (c *connectorCommand) saveConnector(pc *fisk.ParseContext) error {

connector, err = appCtx.Client.PatchConnector(c.id, string(b), c.opts.Timeout)
if err != nil {
color.Red("Could not save connector: %s", err)
color.Red("Could not save connector: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand Down
9 changes: 5 additions & 4 deletions cli/library_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/fatih/color"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/synadia-io/connect/client"
"github.com/synadia-io/connect/model"

"os"
Expand Down Expand Up @@ -61,7 +62,7 @@ func (c *libraryCommand) listRuntimes(pc *fisk.ParseContext) error {

runtimes, err := appCtx.Client.ListRuntimes(c.opts.Timeout)
if err != nil {
color.Red("Could not list runtimes: %s", err)
color.Red("Could not list runtimes: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand All @@ -85,7 +86,7 @@ func (c *libraryCommand) getRuntime(pc *fisk.ParseContext) error {

rt, err := appCtx.Client.GetRuntime(c.runtime, c.opts.Timeout)
if err != nil {
color.Red("Could not get runtime: %s", err)
color.Red("Could not get runtime: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand Down Expand Up @@ -126,7 +127,7 @@ func (c *libraryCommand) search(pc *fisk.ParseContext) error {

components, err := appCtx.Client.SearchComponents(filter, c.opts.Timeout)
if err != nil {
color.Red("Could not list components: %s", err)
color.Red("Could not list components: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand All @@ -146,7 +147,7 @@ func (c *libraryCommand) info(pc *fisk.ParseContext) error {

component, err := appCtx.Client.GetComponent(c.runtime, model.ComponentKind(c.kind), c.component, c.opts.Timeout)
if err != nil {
color.Red("Could not get component: %s", err)
color.Red("Could not get component: %s", client.ErrorMessage(err))
os.Exit(1)
}

Expand Down
16 changes: 8 additions & 8 deletions client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (c *connectorClient) ListConnectors(timeout time.Duration) ([]model.Connect
var resp model.ConnectorListResponse
gotResponse, err := c.t.RequestJson(c.subject("LIST"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to list connectors: %v", err)
return nil, fmt.Errorf("unable to list connectors: %w", err)
}

if !gotResponse {
Expand All @@ -46,7 +46,7 @@ func (c *connectorClient) GetConnector(name string, timeout time.Duration) (*mod
var resp model.ConnectorGetResponse
gotResponse, err := c.t.RequestJson(c.subject("GET"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to get connector: %v", err)
return nil, fmt.Errorf("unable to get connector: %w", err)
}

if !gotResponse {
Expand All @@ -64,7 +64,7 @@ func (c *connectorClient) GetConnectorStatus(name string, timeout time.Duration)
var resp model.ConnectorStatusResponse
gotResponse, err := c.t.RequestJson(c.subject("STATUS"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to get connector status: %v", err)
return nil, fmt.Errorf("unable to get connector status: %w", err)
}

if !gotResponse {
Expand All @@ -85,7 +85,7 @@ func (c *connectorClient) CreateConnector(id, description, runtimeId string, ste
var resp model.ConnectorCreateResponse
gotResponse, err := c.t.RequestJson(c.subject("CREATE"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to create connector: %v", err)
return nil, fmt.Errorf("unable to create connector: %w", err)
}

if !gotResponse {
Expand All @@ -104,7 +104,7 @@ func (c *connectorClient) PatchConnector(id string, patch string, timeout time.D
var resp model.ConnectorPatchResponse
gotResponse, err := c.t.RequestJson(c.subject("PATCH"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to patch connector: %v", err)
return nil, fmt.Errorf("unable to patch connector: %w", err)
}

if !gotResponse {
Expand All @@ -122,7 +122,7 @@ func (c *connectorClient) DeleteConnector(id string, timeout time.Duration) erro
var resp model.ConnectorDeleteResponse
_, err := c.t.RequestJson(c.subject("DELETE"), req, &resp, WithTimeout(timeout))
if err != nil {
return fmt.Errorf("unable to delete connector: %v", err)
return fmt.Errorf("unable to delete connector: %w", err)
}

return nil
Expand All @@ -136,7 +136,7 @@ func (c *connectorClient) ListConnectorInstances(id string, timeout time.Duratio
var resp model.ConnectorInstancesResponse
gotResponse, err := c.t.RequestJson(c.subject("INSTANCES"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to list connector instances: %v", err)
return nil, fmt.Errorf("unable to list connector instances: %w", err)
}

if !gotResponse {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (c *connectorClient) StopConnector(id string, timeout time.Duration) ([]mod
var resp model.ConnectorStopResponse
hasResponded, err := c.t.RequestJson(c.subject("STOP"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to stop connector: %v", err)
return nil, fmt.Errorf("unable to stop connector: %w", err)
}

if !hasResponded {
Expand Down
41 changes: 41 additions & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client

import (
"errors"
"fmt"
)

const GenericErrMsg = "an error occurred"

type ConnectServiceError struct {
Code int // nats service error code
Description string // friendly & safe error description
internalError error
id string
}

// Error implements the builtin/error interface and returns the full internal error
func (ce ConnectServiceError) Error() string {
return ce.internalError.Error()
}

// Body implements ClientError interface (control-plane) and returns a friendly & sanitized error
func (ce ConnectServiceError) Body() string {
return fmt.Sprintf("connect[%d]: %s", ce.Code, ce.Description)
}

// ID implements IdableClientError interface (control-plane)
func (ce ConnectServiceError) ID() string {
return ce.id
}

// ErrorMessage returns the user-friendly message from an error.
// If the error is a ConnectServiceError, returns Body().
// Otherwise returns err.Error().
func ErrorMessage(err error) string {
var svcErr *ConnectServiceError
if errors.As(err, &svcErr) {
return svcErr.Body()
}
return err.Error()
}
8 changes: 4 additions & 4 deletions client/library.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (c *libraryClient) ListRuntimes(timeout time.Duration) ([]model.RuntimeSumm
var resp model.RuntimeListResponse
gotResponse, err := c.t.RequestJson(c.subject(runtimes, "LIST"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to list runtimes: %v", err)
return nil, fmt.Errorf("unable to list runtimes: %w", err)
}

if !gotResponse {
Expand All @@ -44,7 +44,7 @@ func (c *libraryClient) GetRuntime(id string, timeout time.Duration) (*model.Run
var resp model.RuntimeGetResponse
gotResponse, err := c.t.RequestJson(c.subject(runtimes, "GET"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to get runtime: %v", err)
return nil, fmt.Errorf("unable to get runtime: %w", err)
}

if !gotResponse {
Expand All @@ -62,7 +62,7 @@ func (c *libraryClient) SearchComponents(filter *model.ComponentSearchFilter, ti
var resp model.ComponentSearchResponse
gotResponse, err := c.t.RequestJson(c.subject(components, "LIST"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to search components: %v", err)
return nil, fmt.Errorf("unable to search components: %w", err)
}

if !gotResponse {
Expand All @@ -82,7 +82,7 @@ func (c *libraryClient) GetComponent(runtimeId string, kind model.ComponentKind,
var resp model.ComponentGetResponse
gotResponse, err := c.t.RequestJson(c.subject(components, "GET"), req, &resp, WithTimeout(timeout))
if err != nil {
return nil, fmt.Errorf("unable to get component: %v", err)
return nil, fmt.Errorf("unable to get component: %w", err)
}

if !gotResponse {
Expand Down
37 changes: 23 additions & 14 deletions client/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package client
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"time"

"github.com/nats-io/nats-server/v2/server"
Expand Down Expand Up @@ -59,7 +62,7 @@ func (t *Transport) Account() string {
return t.account
}

func (t *Transport) Request(subject string, payload any, opts ...Opt) ([]byte, error) {
func (t *Transport) Request(subject string, payload any, opts ...Opt) ([]byte, *ConnectServiceError) {
options := DefaultRequestOpts()
for _, opt := range opts {
opt(options)
Expand All @@ -68,7 +71,7 @@ func (t *Transport) Request(subject string, payload any, opts ...Opt) ([]byte, e
// -- encode the Request
req, err := json.Marshal(payload)
if err != nil {
return nil, fmt.Errorf("unable to marshal Request: %v", err)
return nil, &ConnectServiceError{Code: http.StatusBadRequest, Description: fmt.Sprintf("unable to marshal Request: %v", err), internalError: fmt.Errorf("unable to marshal Request: %v", err)}
}

if t.trace {
Expand All @@ -77,13 +80,16 @@ func (t *Transport) Request(subject string, payload any, opts ...Opt) ([]byte, e

resp, err := t.nc.Request(subject, req, options.Timeout)
if err != nil {
return nil, fmt.Errorf("unable to get response: %v", err)
return nil, &ConnectServiceError{Code: http.StatusBadRequest, Description: "unable to get response", internalError: fmt.Errorf("unable to get response: %v", err)}
}

serviceErr := resp.Header.Get("Nats-Service-Error")
serviceErrCode := resp.Header.Get("Nats-Service-Error-Code")
if serviceErr != "" {
return nil, fmt.Errorf("%s (%s)", serviceErr, serviceErrCode)
serviceErrCode, err := strconv.Atoi(resp.Header.Get("Nats-Service-Error-Code"))
if err != nil {
return nil, &ConnectServiceError{Code: http.StatusBadRequest, Description: "unable to get response", internalError: fmt.Errorf("unable to get response: %v", err)}
}
return nil, &ConnectServiceError{Code: serviceErrCode, Description: serviceErr, internalError: errors.New(string(resp.Data))}
}

return resp.Data, nil
Expand All @@ -106,7 +112,7 @@ func (t *Transport) RequestJson(subject string, payload any, target any, opts ..
return true, nil
}

func (t *Transport) RequestList(subject string, payload any, h ResponseHandler, opts ...Opt) error {
func (t *Transport) RequestList(subject string, payload any, h ResponseHandler, opts ...Opt) *ConnectServiceError {
options := DefaultRequestOpts()
for _, opt := range opts {
opt(options)
Expand All @@ -119,36 +125,39 @@ func (t *Transport) RequestList(subject string, payload any, h ResponseHandler,

sub, err := t.nc.SubscribeSync(inb)
if err != nil {
return err
return &ConnectServiceError{Code: http.StatusInternalServerError, Description: "could not subscribe to inbox", internalError: err}
}

// -- encode the Request
req, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("unable to marshal Request: %v", err)
return &ConnectServiceError{Code: http.StatusBadRequest, Description: "json parsing error", internalError: fmt.Errorf("unable to marshal Request: %v", err)}
}

if t.trace {
fmt.Println(">>> ", subject, " [", string(req), "]")
}

if err := t.nc.PublishRequest(subject, inb, req); err != nil {
return fmt.Errorf("unable to publish Request: %v", err)
return &ConnectServiceError{Code: http.StatusInternalServerError, Description: GenericErrMsg, internalError: fmt.Errorf("unable to publish Request: %v", err)}
}
if err := t.nc.Flush(); err != nil {
return fmt.Errorf("unable to flush: %v", err)
return &ConnectServiceError{Code: http.StatusInternalServerError, Description: GenericErrMsg, internalError: fmt.Errorf("unable to flush: %v", err)}
}

for {
msg, err := sub.NextMsg(options.Timeout)
if err != nil {
return fmt.Errorf("unable to get response: %v", err)
return &ConnectServiceError{Code: http.StatusInternalServerError, Description: GenericErrMsg, internalError: fmt.Errorf("unable to get response: %v", err)}
}

serviceErr := msg.Header.Get("Nats-Service-Error")
serviceErrCode := msg.Header.Get("Nats-Service-Error-Code")
if serviceErr != "" {
return fmt.Errorf("%s (%s)", serviceErr, serviceErrCode)
serviceErrCode, err := strconv.Atoi(msg.Header.Get("Nats-Service-Error-Code"))
if err != nil {
return &ConnectServiceError{Code: http.StatusBadRequest, Description: GenericErrMsg, internalError: err}
}
return &ConnectServiceError{Code: serviceErrCode, Description: serviceErr, internalError: errors.New(string(msg.Data))}
}

hasMore := msg.Header.Get(HasMoreHeader) == "true"
Expand All @@ -157,7 +166,7 @@ func (t *Transport) RequestList(subject string, payload any, h ResponseHandler,
data = nil
}
if err := h(data, hasMore); err != nil {
return err
return &ConnectServiceError{Code: http.StatusInternalServerError, Description: GenericErrMsg, internalError: err}
}

if !hasMore {
Expand Down
Loading