diff --git a/AwsLambdaExtension/Makefile b/AwsLambdaExtension/Makefile new file mode 100644 index 00000000..2435198b --- /dev/null +++ b/AwsLambdaExtension/Makefile @@ -0,0 +1,17 @@ +# ----- Target used with SAM (Serverless Application Model) ----- +build-AwsLambdaExtension: + echo :build-AwsLambdaExtension + GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension main.go + chmod +x $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension + +# ----- Target illustrating manual steps required to create the extension layer ----- +buildAndDeployExtensionLayer: + echo :buildAndDeployExtensionLayer + rm -rf bin + GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go + chmod +x bin/extensions/AwsLambdaExtension + cd bin && zip -r extension.zip extensions + + aws lambda publish-layer-version \ + --layer-name "AwsLambdaExtension" \ + --zip-file "fileb://bin/extension.zip" diff --git a/AwsLambdaExtension/README.md b/AwsLambdaExtension/README.md new file mode 100644 index 00000000..d69255cb --- /dev/null +++ b/AwsLambdaExtension/README.md @@ -0,0 +1,72 @@ +# New Relic Telemetry API Extension in Go + +The provided code demonstrates how to get a basic Telemetry API extension written in Go up and running. + +This extension: +1. Registers the extension with Lambda Extensions API (see `extensionApi/client.go`) +2. Starts a local HTTP server to receive incoming telemetry events from the Telemetry API (see `telemetryApi/listener.go`) +3. Subscribes to the Telemetry API to start receiving incoming telemetry events (see `telemetryApi/client.go`) +4. Receives telemetry events, batches them, and dispatches to New Relic via POST requests (see `telemetryApi/dispatcher.go`). +Requires the license key set up as the environment variable. + +Note that step 4 is asynchronous in nature. The functions is thawed to process the incoming event, and new telemetry might arrive either +before or after dispatching existing telemetry. In case of the latter, the newly arrived telemetry will be kept in the telemetry queue and +dispatched when processing next event. Depending on buffering configuration you pass to the Telemetry API during subscription, you might get +either zero, one, or multiple requests from Telemetry API to the telemetry listener in a single function invocation. + +The code is heavily instrumented with logs so you'll be able to see the Telemetry API extension lifecycle messages as you're learning to +implement one. + +## Build package and dependencies + +To run this example, you will need to ensure that your build architecture matches that of the Lambda execution environment by compiling with +`GOOS=linux` and `GOARCH=amd64` if you are not running in a Linux environment. + +Building and saving package into a `bin/extensions` directory: +```bash +$ cd AwsLambdaExtension +$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go +$ chmod +x bin/extensions/AwsLambdaExtension +``` + +## Layer Setup Process +The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located. +You must include the `AwsLambdaExtension` binary. + +Creating zip package for the extension: +```bash +$ cd bin +$ zip -r extension.zip extensions/ +``` + +Publish a new layer using the `extension.zip` using below command. The output should provide you with a layer ARN. + +```bash +aws lambda publish-layer-version \ + --layer-name "AwsLambdaExtension" \ + --zip-file "fileb://extension.zip" +``` + +Note the `LayerVersionArn` that is produced in the output. eg. + +``` +LayerVersionArn: arn:aws:lambda::533243300146:layer:: +``` + +Or use `build.sh` script to build and deploy the extension. + +Add the newly created layer version to a Lambda function. + +```bash +aws lambda update-function-configuration + --function-name + --layers +``` + +## Function Invocation and Extension Execution + +Configure the extension by setting below environment variables + +* `LICENSE_KEY` - the key issueed to you when you registered an account with New Relic +* `DISPATCH_MIN_BATCH_SIZE` - optimize dispatching telemetry by telling the dispatcher how many log events you want it to batch. On function invoke the telemetry will be dispatched to `DISPATCH_POST_URI` only if number of log events collected so far is greater than `DISPATCH_MIN_BATCH_SIZE`. On function shutdown the telemetry will be dispatched to `DISPATCH_POST_URI` regardless of how many log events were collected so far. + diff --git a/AwsLambdaExtension/build-deploy.sh b/AwsLambdaExtension/build-deploy.sh new file mode 100755 index 00000000..a117425f --- /dev/null +++ b/AwsLambdaExtension/build-deploy.sh @@ -0,0 +1,15 @@ +if [ ! -d "bin" ]; then + mkdir bin +fi +if [ ! -d "bin/extensions" ]; then + mkdir bin/extensions +fi +GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go +chmod +x bin/extensions/AwsLambdaExtension +cd bin +zip -r extension.zip extensions/ +aws lambda publish-layer-version \ + --layer-name "AwsLambdaExtension" \ + --description "New Relic Lambda Extension" \ + --compatible-architectures "x86_64" \ + --zip-file "fileb://extension.zip" diff --git a/AwsLambdaExtension/extensionApi/client.go b/AwsLambdaExtension/extensionApi/client.go new file mode 100644 index 00000000..489f82a4 --- /dev/null +++ b/AwsLambdaExtension/extensionApi/client.go @@ -0,0 +1,222 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package extensionApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + + log "github.com/sirupsen/logrus" +) + +// RegisterResponse is the body of the response for /register +type RegisterResponse struct { + FunctionName string `json:"functionName"` + FunctionVersion string `json:"functionVersion"` + Handler string `json:"handler"` +} + +// NextEventResponse is the response for /event/next +type NextEventResponse struct { + EventType EventType `json:"eventType"` + DeadlineMs int64 `json:"deadlineMs"` + RequestID string `json:"requestId"` + InvokedFunctionArn string `json:"invokedFunctionArn"` + Tracing Tracing `json:"tracing"` +} + +// Tracing is part of the response for /event/next +type Tracing struct { + Type string `json:"type"` + Value string `json:"value"` +} + +// StatusResponse is the body of the response for /init/error and /exit/error +type StatusResponse struct { + Status string `json:"status"` +} + +// EventType represents the type of events recieved from /event/next +type EventType string + +const ( + // Function invocation event + Invoke EventType = "INVOKE" + + // Runtime environment shutdown event + Shutdown EventType = "SHUTDOWN" + + extensionNameHeader = "Lambda-Extension-Name" + extensionIdentiferHeader = "Lambda-Extension-Identifier" + extensionErrorType = "Lambda-Extension-Function-Error-Type" +) + +// Client is a simple client for the Lambda Extensions API +type Client struct { + httpClient *http.Client + baseUrl string + ExtensionID string + functionName string +} + +func (e *Client) GetFunctionName() string { + return e.functionName +} + +var l = log.WithFields(log.Fields{"pkg": "extensionApi"}) + +// Returns a Lambda Extensions API client +func NewClient() *Client { + baseUrl := fmt.Sprintf("http://%s/2020-01-01/extension", os.Getenv("AWS_LAMBDA_RUNTIME_API")) + return &Client{ + baseUrl: baseUrl, + httpClient: &http.Client{}, + } +} + +// Registers the extension with Extensions API +func (e *Client) Register(ctx context.Context, extensionName string) (string, error) { + + const action = "/register" + url := e.baseUrl + action + + l.Info("[client:Register] Registering using baseURL", e.baseUrl) + reqBody, err := json.Marshal(map[string]interface{}{ + "events": []EventType{Invoke, Shutdown}, + }) + if err != nil { + return "", err + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody)) + if err != nil { + return "", err + } + httpReq.Header.Set(extensionNameHeader, extensionName) + + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + l.Error("[client:Register] Registration failed", err) + return "", err + } + + if httpRes.StatusCode != 200 { + l.Error("[client:Register] Registration failed with statusCode ", httpReq.Response.StatusCode) + return "", fmt.Errorf("registration failed with status %s", httpRes.Status) + } + + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return "", err + } + + res := RegisterResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return "", err + } + + e.functionName = res.FunctionName + e.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader) + l.Info("[client:Register] Registration success with extensionId ", e.ExtensionID) + return e.ExtensionID, nil +} + +// Blocks while long polling for the next Lambda invoke or shutdown +func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) { + const action = "/event/next" + url := e.baseUrl + action + + httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := NextEventResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} + +// Reports an initialization error to the platform. Call it when you registered but failed to initialize +func (e *Client) InitError(errorType string) (*StatusResponse, error) { + const action = "/init/error" + url := e.baseUrl + action + + httpReq, err := http.NewRequest("POST", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpReq.Header.Set(extensionErrorType, errorType) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := StatusResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} + +// Reports an error to the platform before exiting. Call it when you encounter an unexpected failure +func (e *Client) ExitError(errorType string) (*StatusResponse, error) { + const action = "/exit/error" + url := e.baseUrl + action + + httpReq, err := http.NewRequest("POST", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpReq.Header.Set(extensionErrorType, errorType) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := StatusResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} diff --git a/AwsLambdaExtension/go.mod b/AwsLambdaExtension/go.mod new file mode 100644 index 00000000..55d579e3 --- /dev/null +++ b/AwsLambdaExtension/go.mod @@ -0,0 +1,16 @@ +module newrelic-lambda-extension/AwsLambdaExtension + +go 1.18 + +require ( + github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/google/uuid v1.3.0 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.0 +) + +require ( + github.com/aws/aws-sdk-go v1.44.176 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + golang.org/x/sys v0.1.0 // indirect +) diff --git a/AwsLambdaExtension/go.sum b/AwsLambdaExtension/go.sum new file mode 100644 index 00000000..f6fbde94 --- /dev/null +++ b/AwsLambdaExtension/go.sum @@ -0,0 +1,55 @@ +github.com/aws/aws-sdk-go v1.44.176 h1:mxcfI3IWHemX+5fEKt5uqIS/hdbaR7qzGfJYo5UyjJE= +github.com/aws/aws-sdk-go v1.44.176/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= +github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/AwsLambdaExtension/main.go b/AwsLambdaExtension/main.go new file mode 100644 index 00000000..b1b1780c --- /dev/null +++ b/AwsLambdaExtension/main.go @@ -0,0 +1,108 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +/** + +Notes: +- Because of the asynchronous nature of the system, it is possible that telemetry for one invoke will be + processed during the next invoke slice. Likewise, it is possible that telemetry for the last invoke will + be processed during the SHUTDOWN event. + +*/ + +package main + +import ( + "context" + "newrelic-lambda-extension/AwsLambdaExtension/extensionApi" + "newrelic-lambda-extension/AwsLambdaExtension/telemetryApi" + "os" + "os/signal" + "path" + "syscall" + + log "github.com/sirupsen/logrus" +) + +var l = log.WithFields(log.Fields{"pkg": "main"}) + +func main() { + l.Info("[main] Starting the Telemetry API extension") + extensionName := path.Base(os.Args[0]) + + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sigs + cancel() + l.Info("[main] Received", s) + l.Info("[main] Exiting") + }() + + // Step 1 - Register the extension with Extensions API + l.Info("[main] Registering extension") + extensionApiClient := extensionApi.NewClient() + extensionId, err := extensionApiClient.Register(ctx, extensionName) + if err != nil { + panic(err) + } + l.Info("[main] Registation success with extensionId", extensionId) + + // Step 2 - Start the local http listener which will receive data from Telemetry API + l.Info("[main] Starting the Telemetry listener") + telemetryListener := telemetryApi.NewTelemetryApiListener() + telemetryListenerUri, err := telemetryListener.Start() + if err != nil { + panic(err) + } + + // Step 3 - Subscribe the listener to Telemetry API + l.Info("[main] Subscribing to the Telemetry API") + telemetryApiClient := telemetryApi.NewClient() + _, err = telemetryApiClient.Subscribe(ctx, extensionId, telemetryListenerUri) + if err != nil { + panic(err) + } + l.Info("[main] Subscription success") + dispatcher := telemetryApi.NewDispatcher(extensionApiClient.GetFunctionName(), ctx) + + // Will block until invoke or shutdown event is received or cancelled via the context. + for { + select { + case <-ctx.Done(): + return + default: + l.Info("[main] Waiting for next event...") + + // This is a blocking action + res, err := extensionApiClient.NextEvent(ctx) + if err != nil { + l.Error("[main] Exiting. Error:", err) + return + } + + // Dispatching log events from previous invocations + dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, false) + + l.Info("[main] Received event") + + if res.EventType == extensionApi.Invoke { + handleInvoke(res) + } else if res.EventType == extensionApi.Shutdown { + // Dispatch all remaining telemetry, handle shutdown + dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, true) + handleShutdown(res) + return + } + } + } +} + +func handleInvoke(r *extensionApi.NextEventResponse) { + l.Info("[handleInvoke]") +} + +func handleShutdown(r *extensionApi.NextEventResponse) { + l.Info("[handleShutdown]") +} diff --git a/AwsLambdaExtension/telemetryApi/client.go b/AwsLambdaExtension/telemetryApi/client.go new file mode 100644 index 00000000..d601be13 --- /dev/null +++ b/AwsLambdaExtension/telemetryApi/client.go @@ -0,0 +1,198 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + + "net/http" + "os" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +const lambdaAgentIdentifierHeaderKey string = "Lambda-Extension-Identifier" + +var l = log.WithFields(log.Fields{"pkg": "telemetryApi"}) + +// The client used for subscribing to the Telemetry API +type Client struct { + httpClient *http.Client + baseUrl string +} + +func NewClient() *Client { + baseUrl := fmt.Sprintf("http://%s/2022-07-01/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API")) + return &Client{ + httpClient: &http.Client{}, + baseUrl: baseUrl, + } +} + +// Represents the type of log events in Lambda +type EventType string + +const ( + // Used to receive log events emitted by the platform + Platform EventType = "platform" + // Used to receive log events emitted by the function + Function EventType = "function" + // Used is to receive log events emitted by the extension + Extension EventType = "extension" +) + +// Configuration for receiving telemetry from the Telemetry API. +// Telemetry will be sent to your listener when one of the conditions below is met. +type BufferingCfg struct { + // Maximum number of log events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000) + MaxItems uint32 `json:"maxItems"` + // Maximum size in bytes of the log events to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576) + MaxBytes uint32 `json:"maxBytes"` + // Maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000) + TimeoutMS uint32 `json:"timeoutMs"` +} + +// URI is used to set the endpoint where the logs will be sent to +type URI string + +// HttpMethod represents the HTTP method used to receive logs from Logs API +type HttpMethod string + +const ( + // Receive log events via POST requests to the listener + HttpPost HttpMethod = "POST" + // Receive log events via PUT requests to the listener + HttpPut HttpMethod = "PUT" +) + +// Used to specify the protocol when subscribing to Telemetry API for HTTP +type HttpProtocol string + +const ( + HttpProto HttpProtocol = "HTTP" +) + +// Denotes what the content is encoded in +type HttpEncoding string + +const ( + JSON HttpEncoding = "JSON" +) + +// Configuration for listeners that would like to receive telemetry via HTTP +type Destination struct { + Protocol HttpProtocol `json:"protocol"` + URI URI `json:"URI"` + HttpMethod HttpMethod `json:"method"` + Encoding HttpEncoding `json:"encoding"` +} + +type SchemaVersion string + +const ( + SchemaVersion20220701 = "2022-07-01" + SchemaVersionLatest = SchemaVersion20220701 +) + +// Request body that is sent to the Telemetry API on subscribe +type SubscribeRequest struct { + SchemaVersion SchemaVersion `json:"schemaVersion"` + EventTypes []EventType `json:"types"` + BufferingCfg BufferingCfg `json:"buffering"` + Destination Destination `json:"destination"` +} + +// Response body that is received from the Telemetry API on subscribe +type SubscribeResponse struct { + body string +} + +// Subscribes to the Telemetry API to start receiving the log events +func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) { + eventTypes := []EventType{ + Platform, + Function, + Extension, + } + + bufferingConfig := BufferingCfg{ + MaxItems: 1000, + MaxBytes: 256 * 1024, + TimeoutMS: 1000, + } + + destination := Destination{ + Protocol: HttpProto, + HttpMethod: HttpPost, + Encoding: JSON, + URI: URI(listenerUri), + } + + data, err := json.Marshal( + &SubscribeRequest{ + SchemaVersion: SchemaVersionLatest, + EventTypes: eventTypes, + BufferingCfg: bufferingConfig, + Destination: destination, + }) + + if err != nil { + return nil, errors.WithMessage(err, "Failed to marshal SubscribeRequest") + } + + headers := make(map[string]string) + headers[lambdaAgentIdentifierHeaderKey] = extensionId + + l.Info("[client:Subscribe] Subscribing using baseUrl:", c.baseUrl) + resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseUrl, data, &headers) + if err != nil { + l.Error("[client:Subscribe] Subscription failed:", err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusAccepted { + l.Error("[client:Subscribe] Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?") + } else if resp.StatusCode != http.StatusOK { + l.Error("[client:Subscribe] Subscription failed") + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("%s failed: %d[%s]", c.baseUrl, resp.StatusCode, resp.Status) + } + + return nil, errors.Errorf("%s failed: %d[%s] %s", c.baseUrl, resp.StatusCode, resp.Status, string(body)) + } + + body, _ := ioutil.ReadAll(resp.Body) + l.Info("[client:Subscribe] Subscription success:", string(body)) + + return &SubscribeResponse{string(body)}, nil +} + +func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers *map[string]string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + contentType := "application/json" + req.Header.Set("Content-Type", contentType) + if headers != nil { + for k, v := range *headers { + req.Header.Set(k, v) + } + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/AwsLambdaExtension/telemetryApi/dispatcher.go b/AwsLambdaExtension/telemetryApi/dispatcher.go new file mode 100644 index 00000000..a948f63b --- /dev/null +++ b/AwsLambdaExtension/telemetryApi/dispatcher.go @@ -0,0 +1,59 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "context" + "net/http" + "os" + "strconv" + + "github.com/golang-collections/go-datastructures/queue" +) + +type Dispatcher struct { + httpClient *http.Client + licenseKey string + minBatchSize int64 + functionName string +} + +func NewDispatcher(functionName string, ctx context.Context) *Dispatcher { + var licenseKey string + licenseKey = os.Getenv("NEW_RELIC_LICENSE_KEY") + if len(licenseKey) == 0 { + licenseKey, _ = getNewRelicLicenseKey(ctx) + } + if len(licenseKey) == 0 { + panic("NEW_RELIC_LICENSE_KEY undefined") + } + + dispatchMinBatchSize, err := strconv.ParseInt(os.Getenv("DISPATCH_MIN_BATCH_SIZE"), 0, 16) + if err != nil { + dispatchMinBatchSize = 1 + } + + return &Dispatcher{ + httpClient: &http.Client{}, + licenseKey: licenseKey, + minBatchSize: dispatchMinBatchSize, + functionName: functionName, + } + +} + +func (d *Dispatcher) Dispatch(ctx context.Context, logEventsQueue *queue.Queue, force bool) { + if !logEventsQueue.Empty() && (force || logEventsQueue.Len() >= d.minBatchSize) { + l.Info("[dispatcher:Dispatch] Dispatching", logEventsQueue.Len(), "log events") + logEntries, _ := logEventsQueue.Get(logEventsQueue.Len()) + + err := sendDataToNR(ctx, logEntries, d) + if err != nil { + l.Error("[dispatcher:Dispatch] Failed to dispatch, returning to queue:", err) + for logEntry := range logEntries { + logEventsQueue.Put(logEntry) + } + } + } +} diff --git a/AwsLambdaExtension/telemetryApi/listener.go b/AwsLambdaExtension/telemetryApi/listener.go new file mode 100644 index 00000000..edebbcc4 --- /dev/null +++ b/AwsLambdaExtension/telemetryApi/listener.go @@ -0,0 +1,106 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "time" + + "github.com/golang-collections/go-datastructures/queue" +) + +const defaultListenerPort = "4323" +const initialQueueSize = 5 + +// Used to listen to the Telemetry API +type TelemetryApiListener struct { + httpServer *http.Server + // LogEventsQueue is a synchronous queue and is used to put the received log events to be dispatched later + LogEventsQueue *queue.Queue +} + +func NewTelemetryApiListener() *TelemetryApiListener { + return &TelemetryApiListener{ + httpServer: nil, + LogEventsQueue: queue.New(initialQueueSize), + } +} + +func listenOnAddress() string { + env_aws_local, ok := os.LookupEnv("AWS_SAM_LOCAL") + var addr string + if ok && env_aws_local == "true" { + addr = ":" + defaultListenerPort + } else { + addr = "sandbox:" + defaultListenerPort + } + + return addr +} + +// Starts the server in a goroutine where the log events will be sent +func (s *TelemetryApiListener) Start() (string, error) { + address := listenOnAddress() + l.Info("[listener:Start] Starting on address", address) + s.httpServer = &http.Server{Addr: address} + http.HandleFunc("/", s.http_handler) + go func() { + err := s.httpServer.ListenAndServe() + if err != http.ErrServerClosed { + l.Error("[listener:goroutine] Unexpected stop on Http Server:", err) + s.Shutdown() + } else { + l.Info("[listener:goroutine] Http Server closed:", err) + } + }() + return fmt.Sprintf("http://%s/", address), nil +} + +// http_handler handles the requests coming from the Telemetry API. +// Everytime Telemetry API sends log events, this function will read them from the response body +// and put into a synchronous queue to be dispatched later. +// Logging or printing besides the error cases below is not recommended if you have subscribed to +// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for +// the printed lines which may create an infinite loop. +func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + l.Error("[listener:http_handler] Error reading body:", err) + return + } + // Parse and put the log messages into the queue + var slice []LambdaTelemetryEvent + _ = json.Unmarshal(body, &slice) + + for _, el := range slice { + s.LogEventsQueue.Put(el) + } + + l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) + slice = nil +} + +// Terminates the HTTP server listening for logs +func (s *TelemetryApiListener) Shutdown() { + if s.httpServer != nil { + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + err := s.httpServer.Shutdown(ctx) + if err != nil { + l.Error("[listener:Shutdown] Failed to shutdown http server gracefully:", err) + } else { + s.httpServer = nil + } + } +} + +type LambdaTelemetryEvent struct { + Time string + Type string + Record any +} diff --git a/AwsLambdaExtension/telemetryApi/send_to_new_relic.go b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go new file mode 100644 index 00000000..330668bd --- /dev/null +++ b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go @@ -0,0 +1,286 @@ +package telemetryApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "os" + "path" + "reflect" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/secretsmanager" + "github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface" + + "github.com/google/uuid" +) + +const EXTENSION_LAMBDA_VERSION = "1.0" + +const ( + LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" + LogEndpointUS string = "https://log-api.newrelic.com/log/v1" + + MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" + MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" + + EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net/v1/accounts/" + EventsEndpointUS string = "https://insights-collector.newrelic.com/v1/accounts/" + + TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" + TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" +) + +var ( + sess = session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + })) + secrets secretsmanageriface.SecretsManagerAPI +) + +type licenseKeySecret struct { + LicenseKey string +} + +func init() { + secrets = secretsmanager.New(sess) +} + +func decodeLicenseKey(rawJson *string) (string, error) { + var lks licenseKeySecret + + err := json.Unmarshal([]byte(*rawJson), &lks) + if err != nil { + return "", err + } + if lks.LicenseKey == "" { + return "", fmt.Errorf("malformed license key secret; missing \"LicenseKey\" attribute") + } + + return lks.LicenseKey, nil +} + +func getNewRelicLicenseKey(ctx context.Context) (string, error) { + sId := "NEW_RELIC_LICENSE_KEY" + v := os.Getenv("NEW_RELIC_LICENSE_KEY_SECRET") + if len(v) > 0 { + sId = v + } + secretValueInput := secretsmanager.GetSecretValueInput{SecretId: &sId} + secretValueOutput, err := secrets.GetSecretValueWithContext(ctx, &secretValueInput) + if err != nil { + return "", err + } + return decodeLicenseKey(secretValueOutput.SecretString) +} + +func getEndpointURL(licenseKey string, typ string, EndpointOverride string) string { + if EndpointOverride != "" { + return EndpointOverride + } + switch typ { + case "logging": + if strings.HasPrefix(licenseKey, "eu") { + return LogEndpointEU + } else { + return LogEndpointUS + } + case "metrics": + if strings.HasPrefix(licenseKey, "eu") { + return MetricsEndpointEU + } else { + return MetricsEndpointUS + } + case "events": + if strings.HasPrefix(licenseKey, "eu") { + return EventsEndpointEU + } else { + return EventsEndpointUS + } + case "traces": + if strings.HasPrefix(licenseKey, "eu") { + return TracesEndpointEU + } else { + return TracesEndpointUS + } + } + return "" +} + +func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) error { + req, err := http.NewRequestWithContext(ctx, "POST", uri, bytes.NewBuffer(bodyBytes)) + if err != nil { + return err + } + // the headers might be different for different endpoints + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Api-Key", d.licenseKey) + if strings.Contains(uri, "trace") { + req.Header.Set("Data-Format", "newrelic") + req.Header.Set("Data-Format-Version", "1") + } + _, err = d.httpClient.Do(req) + + return err +} + +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { + + var lambda_name = d.functionName + var agent_name = path.Base(os.Args[0]) + + // NB "." is not allowed in NR eventType + var replacer = strings.NewReplacer(".", "_") + + data := make(map[string][]map[string]interface{}) + data["events"] = []map[string]interface{}{} + data["traces"] = []map[string]interface{}{} + data["logging"] = []map[string]interface{}{} + data["metrics"] = []map[string]interface{}{} + + // current logic - terminate processing on an error, can be changed later + for _, event := range logEntries { + msInt, err := time.Parse(time.RFC3339, event.(LambdaTelemetryEvent).Time) + if err != nil { + return err + } + // events + data["events"] = append(data["events"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "eventType": "AwsLambdaExtension", + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, + "lambda.logevent.type": replacer.Replace(event.(LambdaTelemetryEvent).Type), + }) + // logs + if event.(LambdaTelemetryEvent).Record != nil { + data["logging"] = append(data["logging"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "message": event.(LambdaTelemetryEvent).Record, + "attributes": map[string]map[string]string{ + "plugin" : { "type": "lambda extension"}, + "aws": { + "lambda.logevent.type": event.(LambdaTelemetryEvent).Type, + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, + }, + }, + }) + + if reflect.ValueOf(event.(LambdaTelemetryEvent).Record).Kind() == reflect.Map { + eventRecord := event.(LambdaTelemetryEvent).Record.(map[string]interface{}) + // metrics + rid := "" + if v, okk := eventRecord["requestId"].(string); okk { + rid = v + } + if val, ok := eventRecord["metrics"].(map[string]interface{}); ok { + for key := range val { + data["metrics"] = append(data["metrics"], map[string]interface{}{ + "name": "aws.telemetry.lambda_ext."+key, + "value": val[key], + "timestamp": msInt.UnixMilli(), + "attributes": map[string]interface{}{ + "lambda.logevent.type": event.(LambdaTelemetryEvent).Type, + "requestId": rid, + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, + }, + }) + } + } + // spans + if val, ok := eventRecord["spans"].([]interface{}); ok { + for _, span := range val { + attributes := make(map[string]interface{}) + attributes["event"] = event.(LambdaTelemetryEvent).Type + attributes["service.name"] = agent_name + var start time.Time + for key,v := range span.(map[string]interface{}) { + if key == "durationMs" { + attributes["duration.ms"] = v.(float64) + } else if key == "start" { + start, err = time.Parse(time.RFC3339, v.(string)) + if err != nil { + return err + } + } else { + attributes[key] = v.(string) + } + } + el := map[string]interface{}{ + "trace.id": rid, + "timestamp": start.UnixMilli(), + "id": (uuid.New()).String(), + "attributes": attributes, + } + data["traces"] = append(data["traces"], el) + } + } + } + } + } + // data ready + if len(data) > 0 { + // send logs + if len(data["logging"]) > 0 { + bodyBytes, _ := json.Marshal(data["logging"]) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "logging", ""), bodyBytes) + if er != nil { + return er + } + } + // send metrics + if len(data["metrics"]) > 0 { + var dataMet[]map[string][]map[string]interface{} + dataMet = append(dataMet, map[string][]map[string]interface{}{ + "metrics": data["metrics"], + }) + bodyBytes, _ := json.Marshal(dataMet) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "metrics", ""), bodyBytes) + if er != nil { + return er + } + } + // send events + if len(data["events"]) > 0 { + ACCOUNT_ID := os.Getenv("NEW_RELIC_ACCOUNT_ID") + if len(ACCOUNT_ID) > 0 { + bodyBytes, _ := json.Marshal(data["events"]) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events", "")+ACCOUNT_ID+"/events", bodyBytes) + if er != nil { + return er + } + } else { + l.Info("NEW_RELIC_ACCOUNT_ID is not set, therefore no events data sent") + } + } + // send traces + if len(data["traces"]) > 0 { + var dataTraces[]map[string]interface{} + dataTraces = append(dataTraces, map[string]interface{}{ + "common": map[string]map[string]string{ + "attributes": { + "host": "aws.amazon.com", + "service.name": lambda_name, + }, + }, + "spans": data["traces"], + }) + bodyBytes, _ := json.Marshal(dataTraces) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "traces", ""), bodyBytes) + if er != nil { + return er + } + } + } + + return nil // success +} diff --git a/AwsLambdaExtension/template.yaml b/AwsLambdaExtension/template.yaml new file mode 100644 index 00000000..6603f609 --- /dev/null +++ b/AwsLambdaExtension/template.yaml @@ -0,0 +1,26 @@ +AWSTemplateFormatVersion: 2010-09-09 +Transform: AWS::Serverless-2016-10-31 +Description: AwsLambdaExtension + +Parameters: + NEW_RELIC_LICENSE_KEY: + Type: String + Description: Your New Relic ingest key. + +Resources: + GoTelemetryApiExtensionLayer: + Type: AWS::Serverless::LayerVersion + Metadata: + BuildMethod: makefile + Properties: + LayerName: AwsLambdaExtension + ContentUri: . + LicenseInfo: MIT-0 + RetentionPolicy: Delete + + Environment: + Variables: + DISPATCH_MIN_BATCH_SIZE: 10 + DEBUG_LOGGING_ENABLED: True + LOGGING_ENABLED: True + NEW_RELIC_LICENSE_KEY: !Sub ${NEW_RELIC_LICENSE_KEY}