From 4850ebd5c826d9e9089c621f3a18b810f51e855e Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Wed, 23 Nov 2022 02:10:21 +0000 Subject: [PATCH 01/11] go-telemetry-api-extension init --- go-telemetry-api-extension/Makefile | 17 ++ go-telemetry-api-extension/README.md | 72 ++++++ go-telemetry-api-extension/build-deploy.sh | 13 ++ .../extensionApi/client.go | 215 ++++++++++++++++++ go-telemetry-api-extension/go.mod | 10 + go-telemetry-api-extension/go.sum | 15 ++ go-telemetry-api-extension/main.go | 109 +++++++++ .../telemetryApi/client.go | 198 ++++++++++++++++ .../telemetryApi/dispatcher.go | 62 +++++ .../telemetryApi/listener.go | 101 ++++++++ .../telemetryApi/send_to_new_relic.go | 26 +++ go-telemetry-api-extension/template.yaml | 18 ++ 12 files changed, 856 insertions(+) create mode 100644 go-telemetry-api-extension/Makefile create mode 100644 go-telemetry-api-extension/README.md create mode 100755 go-telemetry-api-extension/build-deploy.sh create mode 100644 go-telemetry-api-extension/extensionApi/client.go create mode 100644 go-telemetry-api-extension/go.mod create mode 100644 go-telemetry-api-extension/go.sum create mode 100644 go-telemetry-api-extension/main.go create mode 100644 go-telemetry-api-extension/telemetryApi/client.go create mode 100644 go-telemetry-api-extension/telemetryApi/dispatcher.go create mode 100644 go-telemetry-api-extension/telemetryApi/listener.go create mode 100644 go-telemetry-api-extension/telemetryApi/send_to_new_relic.go create mode 100644 go-telemetry-api-extension/template.yaml diff --git a/go-telemetry-api-extension/Makefile b/go-telemetry-api-extension/Makefile new file mode 100644 index 00000000..8b80815c --- /dev/null +++ b/go-telemetry-api-extension/Makefile @@ -0,0 +1,17 @@ +# ----- Target used with SAM (Serverless Application Model) ----- +build-GoTelemetryApiExtensionLayer: + echo :build-GoTelemetryApiExtensionLayer + GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/go-telemetry-api-extension main.go + chmod +x $(ARTIFACTS_DIR)/extensions/go-telemetry-api-extension + +# ----- Target illustrating manual steps required to create the extension layer ----- +buildAndDeployExtensionLayer: + echo :buildAndDeployExtensionLayer + rm -rf bin + GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go + chmod +x bin/extensions/go-telemetry-api-extension + cd bin && zip -r extension.zip extensions + + aws lambda publish-layer-version \ + --layer-name "go-telemetry-api-extension-layer" \ + --zip-file "fileb://bin/extension.zip" diff --git a/go-telemetry-api-extension/README.md b/go-telemetry-api-extension/README.md new file mode 100644 index 00000000..94fb3f7c --- /dev/null +++ b/go-telemetry-api-extension/README.md @@ -0,0 +1,72 @@ +# New Relic Telemetry API Extension in Go + +The provided code demonstrates how to get a basic Telemetry API extension written in Go up and running. + +This extension: +1. Registers the extension with Lambda Extensions API (see `extensionApi/client.go`) +2. Starts a local HTTP server to receive incoming telemetry events from the Telemetry API (see `telemetryApi/listener.go`) +3. Subscribes to the Telemetry API to start receiving incoming telemetry events (see `telemetryApi/client.go`) +4. Receives telemetry events, batches them, and dispatches to New Relic via POST requests (see `telemetryApi/dispatcher.go`). +Requires the license key set up as the environment variable. + +Note that step 4 is asynchronous in nature. The functions is thawed to process the incoming event, and new telemetry might arrive either +before or after dispatching existing telemetry. In case of the latter, the newly arrived telemetry will be kept in the telemetry queue and +dispatched when processing next event. Depending on buffering configuration you pass to the Telemetry API during subscription, you might get +either zero, one, or multiple requests from Telemetry API to the telemetry listener in a single function invocation. + +The code is heavily instrumented with logs so you'll be able to see the Telemetry API extension lifecycle messages as you're learning to +implement one. + +## Build package and dependencies + +To run this example, you will need to ensure that your build architecture matches that of the Lambda execution environment by compiling with +`GOOS=linux` and `GOARCH=amd64` if you are not running in a Linux environment. + +Building and saving package into a `bin/extensions` directory: +```bash +$ cd go-telemetry-api-extension +$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go +$ chmod +x bin/extensions/go-telemetry-api-extension +``` + +## Layer Setup Process +The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located. +You must include the `go-telemetry-api-extension` binary. + +Creating zip package for the extension: +```bash +$ cd bin +$ zip -r extension.zip extensions/ +``` + +Publish a new layer using the `extension.zip` using below command. The output should provide you with a layer ARN. + +```bash +aws lambda publish-layer-version \ + --layer-name "go-telemetry-api-extension" \ + --zip-file "fileb://extension.zip" +``` + +Note the `LayerVersionArn` that is produced in the output. eg. + +``` +LayerVersionArn: arn:aws:lambda::123456789012:layer::1 +``` + +Or use `build.sh` script to build and deploy the extension. + +Add the newly created layer version to a Lambda function. + +```bash +aws lambda update-function-configuration + --function-name + --layers +``` + +## Function Invocation and Extension Execution + +Configure the extension by setting below environment variables + +* `LICENSE_KEY` - the key issueed to you when you registered an account with New Relic +* `DISPATCH_MIN_BATCH_SIZE` - optimize dispatching telemetry by telling the dispatcher how many log events you want it to batch. On function invoke the telemetry will be dispatched to `DISPATCH_POST_URI` only if number of log events collected so far is greater than `DISPATCH_MIN_BATCH_SIZE`. On function shutdown the telemetry will be dispatched to `DISPATCH_POST_URI` regardless of how many log events were collected so far. + diff --git a/go-telemetry-api-extension/build-deploy.sh b/go-telemetry-api-extension/build-deploy.sh new file mode 100755 index 00000000..a64a4b43 --- /dev/null +++ b/go-telemetry-api-extension/build-deploy.sh @@ -0,0 +1,13 @@ +if [ ! -d "bin" ]; then + mkdir bin +fi +if [ ! -d "bin/extensions" ]; then + mkdir bin/extensions +fi +GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go +chmod +x bin/extensions/go-telemetry-api-extension +cd bin +zip -r extension.zip extensions/ +aws lambda publish-layer-version \ + --layer-name "go-telemetry-api-extension" \ + --zip-file "fileb://extension.zip" diff --git a/go-telemetry-api-extension/extensionApi/client.go b/go-telemetry-api-extension/extensionApi/client.go new file mode 100644 index 00000000..a336ad7f --- /dev/null +++ b/go-telemetry-api-extension/extensionApi/client.go @@ -0,0 +1,215 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package extensionApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + + log "github.com/sirupsen/logrus" +) + +// RegisterResponse is the body of the response for /register +type RegisterResponse struct { + FunctionName string `json:"functionName"` + FunctionVersion string `json:"functionVersion"` + Handler string `json:"handler"` +} + +// NextEventResponse is the response for /event/next +type NextEventResponse struct { + EventType EventType `json:"eventType"` + DeadlineMs int64 `json:"deadlineMs"` + RequestID string `json:"requestId"` + InvokedFunctionArn string `json:"invokedFunctionArn"` + Tracing Tracing `json:"tracing"` +} + +// Tracing is part of the response for /event/next +type Tracing struct { + Type string `json:"type"` + Value string `json:"value"` +} + +// StatusResponse is the body of the response for /init/error and /exit/error +type StatusResponse struct { + Status string `json:"status"` +} + +// EventType represents the type of events recieved from /event/next +type EventType string + +const ( + // Function invocation event + Invoke EventType = "INVOKE" + + // Runtime environment shutdown event + Shutdown EventType = "SHUTDOWN" + + extensionNameHeader = "Lambda-Extension-Name" + extensionIdentiferHeader = "Lambda-Extension-Identifier" + extensionErrorType = "Lambda-Extension-Function-Error-Type" +) + +// Client is a simple client for the Lambda Extensions API +type Client struct { + httpClient *http.Client + baseUrl string + ExtensionID string +} + +var l = log.WithFields(log.Fields{"pkg": "extensionApi"}) + +// Returns a Lambda Extensions API client +func NewClient() *Client { + baseUrl := fmt.Sprintf("http://%s/2020-01-01/extension", os.Getenv("AWS_LAMBDA_RUNTIME_API")) + return &Client{ + baseUrl: baseUrl, + httpClient: &http.Client{}, + } +} + +// Registers the extension with Extensions API +func (e *Client) Register(ctx context.Context, extensionName string) (string, error) { + const action = "/register" + url := e.baseUrl + action + + l.Info("[client:Register] Registering using baseURL", e.baseUrl) + reqBody, err := json.Marshal(map[string]interface{}{ + "events": []EventType{Invoke, Shutdown}, + }) + if err != nil { + return "", err + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody)) + if err != nil { + return "", err + } + httpReq.Header.Set(extensionNameHeader, extensionName) + + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + l.Error("[client:Register] Registration failed", err) + return "", err + } + + if httpRes.StatusCode != 200 { + l.Error("[client:Register] Registration failed with statusCode ", httpReq.Response.StatusCode) + return "", fmt.Errorf("registration failed with status %s", httpRes.Status) + } + + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return "", err + } + + res := RegisterResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return "", err + } + + e.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader) + l.Info("[client:Register] Registration success with extensionId ", e.ExtensionID) + return e.ExtensionID, nil +} + +// Blocks while long polling for the next Lambda invoke or shutdown +func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) { + const action = "/event/next" + url := e.baseUrl + action + + httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := NextEventResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} + +// Reports an initialization error to the platform. Call it when you registered but failed to initialize +func (e *Client) InitError(errorType string) (*StatusResponse, error) { + const action = "/init/error" + url := e.baseUrl + action + + httpReq, err := http.NewRequest("POST", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpReq.Header.Set(extensionErrorType, errorType) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := StatusResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} + +// Reports an error to the platform before exiting. Call it when you encounter an unexpected failure +func (e *Client) ExitError(errorType string) (*StatusResponse, error) { + const action = "/exit/error" + url := e.baseUrl + action + + httpReq, err := http.NewRequest("POST", url, nil) + if err != nil { + return nil, err + } + httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID) + httpReq.Header.Set(extensionErrorType, errorType) + httpRes, err := e.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpRes.StatusCode != 200 { + return nil, fmt.Errorf("request failed with status %s", httpRes.Status) + } + defer httpRes.Body.Close() + body, err := ioutil.ReadAll(httpRes.Body) + if err != nil { + return nil, err + } + res := StatusResponse{} + err = json.Unmarshal(body, &res) + if err != nil { + return nil, err + } + return &res, nil +} diff --git a/go-telemetry-api-extension/go.mod b/go-telemetry-api-extension/go.mod new file mode 100644 index 00000000..1b8d1ed3 --- /dev/null +++ b/go-telemetry-api-extension/go.mod @@ -0,0 +1,10 @@ +module newrelic-lambda-extension/go-telemetry-api-extension + +go 1.18 + +require ( + github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go-telemetry-api-extension/go.sum b/go-telemetry-api-extension/go.sum new file mode 100644 index 00000000..47e4990e --- /dev/null +++ b/go-telemetry-api-extension/go.sum @@ -0,0 +1,15 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= +github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go-telemetry-api-extension/main.go b/go-telemetry-api-extension/main.go new file mode 100644 index 00000000..a8bd61a0 --- /dev/null +++ b/go-telemetry-api-extension/main.go @@ -0,0 +1,109 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +/** + +Notes: +- Because of the asynchronous nature of the system, it is possible that telemetry for one invoke will be + processed during the next invoke slice. Likewise, it is possible that telemetry for the last invoke will + be processed during the SHUTDOWN event. + +*/ + +package main + +import ( + "newrelic-lambda-extension/go-telemetry-api-extension/extensionApi" + "newrelic-lambda-extension/go-telemetry-api-extension/telemetryApi" + "context" + "os" + "os/signal" + "path" + "syscall" + + log "github.com/sirupsen/logrus" +) + +var l = log.WithFields(log.Fields{"pkg": "main"}) + +func main() { + l.Info("[main] Starting the Telemetry API extension") + extensionName := path.Base(os.Args[0]) + + ctx, cancel := context.WithCancel(context.Background()) + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT) + go func() { + s := <-sigs + cancel() + l.Info("[main] Received", s) + l.Info("[main] Exiting") + }() + + // Step 1 - Register the extension with Extensions API + l.Info("[main] Registering extension") + extensionApiClient := extensionApi.NewClient() + extensionId, err := extensionApiClient.Register(ctx, extensionName) + if err != nil { + panic(err) + } + l.Info("[main] Registation success with extensionId", extensionId) + + // Step 2 - Start the local http listener which will receive data from Telemetry API + l.Info("[main] Starting the Telemetry listener") + telemetryListener := telemetryApi.NewTelemetryApiListener() + telemetryListenerUri, err := telemetryListener.Start() + if err != nil { + panic(err) + } + + // Step 3 - Subscribe the listener to Telemetry API + l.Info("[main] Subscribing to the Telemetry API") + telemetryApiClient := telemetryApi.NewClient() + _, err = telemetryApiClient.Subscribe(ctx, extensionId, telemetryListenerUri) + if err != nil { + panic(err) + } + l.Info("[main] Subscription success") + + dispatcher := telemetryApi.NewDispatcher() + + // Will block until invoke or shutdown event is received or cancelled via the context. + for { + select { + case <-ctx.Done(): + return + default: + l.Info("[main] Waiting for next event...") + + // This is a blocking action + res, err := extensionApiClient.NextEvent(ctx) + if err != nil { + l.Error("[main] Exiting. Error:", err) + return + } + + // Dispatching log events from previous invocations + dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, false) + + l.Info("[main] Received event") + + if res.EventType == extensionApi.Invoke { + handleInvoke(res) + } else if res.EventType == extensionApi.Shutdown { + // Dispatch all remaining telemetry, handle shutdown + dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, true) + handleShutdown(res) + return + } + } + } +} + +func handleInvoke(r *extensionApi.NextEventResponse) { + l.Info("[handleInvoke]") +} + +func handleShutdown(r *extensionApi.NextEventResponse) { + l.Info("[handleShutdown]") +} diff --git a/go-telemetry-api-extension/telemetryApi/client.go b/go-telemetry-api-extension/telemetryApi/client.go new file mode 100644 index 00000000..5b7ff301 --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/client.go @@ -0,0 +1,198 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + + "net/http" + "os" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +const lambdaAgentIdentifierHeaderKey string = "Lambda-Extension-Identifier" + +var l = log.WithFields(log.Fields{"pkg": "telemetryApi"}) + +// The client used for subscribing to the Telemetry API +type Client struct { + httpClient *http.Client + baseUrl string +} + +func NewClient() *Client { + baseUrl := fmt.Sprintf("http://%s/2022-07-01/telemetry", os.Getenv("AWS_LAMBDA_RUNTIME_API")) + return &Client{ + httpClient: &http.Client{}, + baseUrl: baseUrl, + } +} + +// Represents the type of log events in Lambda +type EventType string + +const ( + // Used to receive log events emitted by the platform + Platform EventType = "platform" + // Used to receive log events emitted by the function + Function EventType = "function" + // Used is to receive log events emitted by the extension + Extension EventType = "extension" +) + +// Configuration for receiving telemetry from the Telemetry API. +// Telemetry will be sent to your listener when one of the conditions below is met. +type BufferingCfg struct { + // Maximum number of log events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000) + MaxItems uint32 `json:"maxItems"` + // Maximum size in bytes of the log events to be buffered in memory. (default: 262144, minimum: 262144, maximum: 1048576) + MaxBytes uint32 `json:"maxBytes"` + // Maximum time (in milliseconds) for a batch to be buffered. (default: 1000, minimum: 100, maximum: 30000) + TimeoutMS uint32 `json:"timeoutMs"` +} + +// URI is used to set the endpoint where the logs will be sent to +type URI string + +// HttpMethod represents the HTTP method used to receive logs from Logs API +type HttpMethod string + +const ( + // Receive log events via POST requests to the listener + HttpPost HttpMethod = "POST" + // Receive log events via PUT requests to the listener + HttpPut HttpMethod = "PUT" +) + +// Used to specify the protocol when subscribing to Telemetry API for HTTP +type HttpProtocol string + +const ( + HttpProto HttpProtocol = "HTTP" +) + +// Denotes what the content is encoded in +type HttpEncoding string + +const ( + JSON HttpEncoding = "JSON" +) + +// Configuration for listeners that would like to receive telemetry via HTTP +type Destination struct { + Protocol HttpProtocol `json:"protocol"` + URI URI `json:"URI"` + HttpMethod HttpMethod `json:"method"` + Encoding HttpEncoding `json:"encoding"` +} + +type SchemaVersion string + +const ( + SchemaVersion20220701 = "2022-07-01" + SchemaVersionLatest = SchemaVersion20220701 +) + +// Request body that is sent to the Telemetry API on subscribe +type SubscribeRequest struct { + SchemaVersion SchemaVersion `json:"schemaVersion"` + EventTypes []EventType `json:"types"` + BufferingCfg BufferingCfg `json:"buffering"` + Destination Destination `json:"destination"` +} + +// Response body that is received from the Telemetry API on subscribe +type SubscribeResponse struct { + body string +} + +// Subscribes to the Telemetry API to start receiving the log events +func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) { + eventTypes := []EventType{ + Platform, + // Function, + // Extension, + } + + bufferingConfig := BufferingCfg{ + MaxItems: 1000, + MaxBytes: 256 * 1024, + TimeoutMS: 1000, + } + + destination := Destination{ + Protocol: HttpProto, + HttpMethod: HttpPost, + Encoding: JSON, + URI: URI(listenerUri), + } + + data, err := json.Marshal( + &SubscribeRequest{ + SchemaVersion: SchemaVersionLatest, + EventTypes: eventTypes, + BufferingCfg: bufferingConfig, + Destination: destination, + }) + + if err != nil { + return nil, errors.WithMessage(err, "Failed to marshal SubscribeRequest") + } + + headers := make(map[string]string) + headers[lambdaAgentIdentifierHeaderKey] = extensionId + + l.Info("[client:Subscribe] Subscribing using baseUrl:", c.baseUrl) + resp, err := httpPutWithHeaders(ctx, c.httpClient, c.baseUrl, data, &headers) + if err != nil { + l.Error("[client:Subscribe] Subscription failed:", err) + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusAccepted { + l.Error("[client:Subscribe] Subscription failed. Logs API is not supported! Is this extension running in a local sandbox?") + } else if resp.StatusCode != http.StatusOK { + l.Error("[client:Subscribe] Subscription failed") + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Errorf("%s failed: %d[%s]", c.baseUrl, resp.StatusCode, resp.Status) + } + + return nil, errors.Errorf("%s failed: %d[%s] %s", c.baseUrl, resp.StatusCode, resp.Status, string(body)) + } + + body, _ := ioutil.ReadAll(resp.Body) + l.Info("[client:Subscribe] Subscription success:", string(body)) + + return &SubscribeResponse{string(body)}, nil +} + +func httpPutWithHeaders(ctx context.Context, client *http.Client, url string, data []byte, headers *map[string]string) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + contentType := "application/json" + req.Header.Set("Content-Type", contentType) + if headers != nil { + for k, v := range *headers { + req.Header.Set(k, v) + } + } + + resp, err := client.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} diff --git a/go-telemetry-api-extension/telemetryApi/dispatcher.go b/go-telemetry-api-extension/telemetryApi/dispatcher.go new file mode 100644 index 00000000..99867f2d --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/dispatcher.go @@ -0,0 +1,62 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "context" + "net/http" + "os" + "strconv" + + "errors" + + "github.com/golang-collections/go-datastructures/queue" +) + +type Dispatcher struct { + httpClient *http.Client + postUri string + licenseKey string + minBatchSize int64 +} + +func NewDispatcher() *Dispatcher { + dispatchPostUri := os.Getenv("DISPATCH_POST_URI") + if len(dispatchPostUri) == 0 { + panic("dispatchPostUri undefined") + } + + licenseKey := os.Getenv("LICENSE_KEY") + if len(licenseKey) == 0 { + panic("licenseKey undefined") + } + + dispatchMinBatchSize, err := strconv.ParseInt(os.Getenv("DISPATCH_MIN_BATCH_SIZE"), 0, 16) + if err != nil { + dispatchMinBatchSize = 1 + } + + return &Dispatcher{ + httpClient: &http.Client{}, + postUri: dispatchPostUri, + licenseKey: licenseKey, + minBatchSize: dispatchMinBatchSize, + } + +} + +func (d *Dispatcher) Dispatch(ctx context.Context, logEventsQueue *queue.Queue, force bool) { + if !logEventsQueue.Empty() && (force || logEventsQueue.Len() >= d.minBatchSize) { + l.Info("[dispatcher:Dispatch] Dispatching", logEventsQueue.Len(), "log events") + logEntries, _ := logEventsQueue.Get(logEventsQueue.Len()) + + err = sendDataToNR(ctx, logEntries, d) + if err != nil { + l.Error("[dispatcher:Dispatch] Failed to dispatch, returning to queue:", err) + for logEntry := range logEntries { + logEventsQueue.Put(logEntry) + } + } + } +} diff --git a/go-telemetry-api-extension/telemetryApi/listener.go b/go-telemetry-api-extension/telemetryApi/listener.go new file mode 100644 index 00000000..69f98b71 --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/listener.go @@ -0,0 +1,101 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: MIT-0 + +package telemetryApi + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "time" + + "github.com/golang-collections/go-datastructures/queue" +) + +const defaultListenerPort = "4323" +const initialQueueSize = 5 + +// Used to listen to the Telemetry API +type TelemetryApiListener struct { + httpServer *http.Server + // LogEventsQueue is a synchronous queue and is used to put the received log events to be dispatched later + LogEventsQueue *queue.Queue +} + +func NewTelemetryApiListener() *TelemetryApiListener { + return &TelemetryApiListener{ + httpServer: nil, + LogEventsQueue: queue.New(initialQueueSize), + } +} + +func listenOnAddress() string { + env_aws_local, ok := os.LookupEnv("AWS_SAM_LOCAL") + var addr string + if ok && env_aws_local == "true" { + addr = ":" + defaultListenerPort + } else { + addr = "sandbox:" + defaultListenerPort + } + + return addr +} + +// Starts the server in a goroutine where the log events will be sent +func (s *TelemetryApiListener) Start() (string, error) { + address := listenOnAddress() + l.Info("[listener:Start] Starting on address", address) + s.httpServer = &http.Server{Addr: address} + http.HandleFunc("/", s.http_handler) + go func() { + err := s.httpServer.ListenAndServe() + if err != http.ErrServerClosed { + l.Error("[listener:goroutine] Unexpected stop on Http Server:", err) + s.Shutdown() + } else { + l.Info("[listener:goroutine] Http Server closed:", err) + } + }() + return fmt.Sprintf("http://%s/", address), nil +} + +// http_handler handles the requests coming from the Telemetry API. +// Everytime Telemetry API sends log events, this function will read them from the response body +// and put into a synchronous queue to be dispatched later. +// Logging or printing besides the error cases below is not recommended if you have subscribed to +// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for +// the printed lines which may create an infinite loop. +func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + l.Error("[listener:http_handler] Error reading body:", err) + return + } + + // Parse and put the log messages into the queue + var slice []interface{} + _ = json.Unmarshal(body, &slice) + + for _, el := range slice { + s.LogEventsQueue.Put(el) + } + + l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) + slice = nil +} + +// Terminates the HTTP server listening for logs +func (s *TelemetryApiListener) Shutdown() { + if s.httpServer != nil { + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + err := s.httpServer.Shutdown(ctx) + if err != nil { + l.Error("[listener:Shutdown] Failed to shutdown http server gracefully:", err) + } else { + s.httpServer = nil + } + } +} diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go new file mode 100644 index 00000000..426adb63 --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -0,0 +1,26 @@ +package telemetryApi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "errors" + "net/http" + +// "github.com/pkg/errors" +) + +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) (error) { + + bodyBytes, _ := json.Marshal(map[string]string{"message": fmt.Sprintf("%v", logEntries)}) + req, err := http.NewRequestWithContext(ctx, "POST", d.postUri, bytes.NewBuffer(bodyBytes)) + if err != nil { + panic(err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Api-Key", d.licenseKey) + _, err = d.httpClient.Do(req) + + return err +} diff --git a/go-telemetry-api-extension/template.yaml b/go-telemetry-api-extension/template.yaml new file mode 100644 index 00000000..c241961f --- /dev/null +++ b/go-telemetry-api-extension/template.yaml @@ -0,0 +1,18 @@ +AWSTemplateFormatVersion: 2010-09-09 +Transform: AWS::Serverless-2016-10-31 +Description: go-telemetry-api-extension + +Resources: + GoTelemetryApiExtensionLayer: + Type: AWS::Serverless::LayerVersion + Metadata: + BuildMethod: makefile + Properties: + LayerName: go-telemetry-api-extension-layer + ContentUri: . + LicenseInfo: MIT-0 + RetentionPolicy: Delete + + Environment: + Variables: + DISPATCH_MIN_BATCH_SIZE: 10 From 280bd7ba188ebae11a0d76141f2e219a9945f97d Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Wed, 23 Nov 2022 15:37:13 -0500 Subject: [PATCH 02/11] log json serialization --- go-telemetry-api-extension/go.mod | 9 +- go-telemetry-api-extension/go.sum | 4 + .../telemetryApi/dispatcher.go | 8 +- .../telemetryApi/jsonEncode.go | 190 ++++++++++++++++++ .../telemetryApi/jsonUtil.go | 104 ++++++++++ .../telemetryApi/jsonWriter.go | 64 ++++++ .../telemetryApi/new-relic-logs.go | 102 ++++++++++ .../telemetryApi/send_to_new_relic.go | 21 +- 8 files changed, 489 insertions(+), 13 deletions(-) create mode 100644 go-telemetry-api-extension/telemetryApi/jsonEncode.go create mode 100644 go-telemetry-api-extension/telemetryApi/jsonUtil.go create mode 100644 go-telemetry-api-extension/telemetryApi/jsonWriter.go create mode 100644 go-telemetry-api-extension/telemetryApi/new-relic-logs.go diff --git a/go-telemetry-api-extension/go.mod b/go-telemetry-api-extension/go.mod index 1b8d1ed3..aec2d487 100644 --- a/go-telemetry-api-extension/go.mod +++ b/go-telemetry-api-extension/go.mod @@ -3,8 +3,9 @@ module newrelic-lambda-extension/go-telemetry-api-extension go 1.18 require ( - github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/sirupsen/logrus v1.9.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.0 ) + +require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect diff --git a/go-telemetry-api-extension/go.sum b/go-telemetry-api-extension/go.sum index 47e4990e..d65581a4 100644 --- a/go-telemetry-api-extension/go.sum +++ b/go-telemetry-api-extension/go.sum @@ -1,15 +1,19 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go-telemetry-api-extension/telemetryApi/dispatcher.go b/go-telemetry-api-extension/telemetryApi/dispatcher.go index 99867f2d..1496cbf1 100644 --- a/go-telemetry-api-extension/telemetryApi/dispatcher.go +++ b/go-telemetry-api-extension/telemetryApi/dispatcher.go @@ -9,15 +9,13 @@ import ( "os" "strconv" - "errors" - "github.com/golang-collections/go-datastructures/queue" ) type Dispatcher struct { httpClient *http.Client postUri string - licenseKey string + licenseKey string minBatchSize int64 } @@ -40,7 +38,7 @@ func NewDispatcher() *Dispatcher { return &Dispatcher{ httpClient: &http.Client{}, postUri: dispatchPostUri, - licenseKey: licenseKey, + licenseKey: licenseKey, minBatchSize: dispatchMinBatchSize, } @@ -51,7 +49,7 @@ func (d *Dispatcher) Dispatch(ctx context.Context, logEventsQueue *queue.Queue, l.Info("[dispatcher:Dispatch] Dispatching", logEventsQueue.Len(), "log events") logEntries, _ := logEventsQueue.Get(logEventsQueue.Len()) - err = sendDataToNR(ctx, logEntries, d) + err := sendDataToNR(ctx, logEntries, d) if err != nil { l.Error("[dispatcher:Dispatch] Failed to dispatch, returning to queue:", err) for logEntry := range logEntries { diff --git a/go-telemetry-api-extension/telemetryApi/jsonEncode.go b/go-telemetry-api-extension/telemetryApi/jsonEncode.go new file mode 100644 index 00000000..cff22f8a --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/jsonEncode.go @@ -0,0 +1,190 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package jsonx extends the encoding/json package to encode JSON +// incrementally and without requiring reflection. +package telemetryApi + +import ( + "bytes" + "encoding/json" + "math" + "reflect" + "strconv" + "unicode/utf8" +) + +var hex = "0123456789abcdef" + +// AppendString escapes s appends it to buf. +func AppendString(buf *bytes.Buffer, s string) { + buf.WriteByte('"') + start := 0 + for i := 0; i < len(s); { + if b := s[i]; b < utf8.RuneSelf { + if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' { + i++ + continue + } + if start < i { + buf.WriteString(s[start:i]) + } + switch b { + case '\\', '"': + buf.WriteByte('\\') + buf.WriteByte(b) + case '\n': + buf.WriteByte('\\') + buf.WriteByte('n') + case '\r': + buf.WriteByte('\\') + buf.WriteByte('r') + case '\t': + buf.WriteByte('\\') + buf.WriteByte('t') + default: + // This encodes bytes < 0x20 except for \n and \r, + // as well as <, > and &. The latter are escaped because they + // can lead to security holes when user-controlled strings + // are rendered into JSON and served to some browsers. + buf.WriteString(`\u00`) + buf.WriteByte(hex[b>>4]) + buf.WriteByte(hex[b&0xF]) + } + i++ + start = i + continue + } + c, size := utf8.DecodeRuneInString(s[i:]) + if c == utf8.RuneError && size == 1 { + if start < i { + buf.WriteString(s[start:i]) + } + buf.WriteString(`\ufffd`) + i += size + start = i + continue + } + // U+2028 is LINE SEPARATOR. + // U+2029 is PARAGRAPH SEPARATOR. + // They are both technically valid characters in JSON strings, + // but don't work in JSONP, which has to be evaluated as JavaScript, + // and can lead to security holes there. It is valid JSON to + // escape them, so we do so unconditionally. + // See http://timelessrepo.com/json-isnt-a-javascript-subset for discussion. + if c == '\u2028' || c == '\u2029' { + if start < i { + buf.WriteString(s[start:i]) + } + buf.WriteString(`\u202`) + buf.WriteByte(hex[c&0xF]) + i += size + start = i + continue + } + i += size + } + if start < len(s) { + buf.WriteString(s[start:]) + } + buf.WriteByte('"') +} + +// AppendStringArray appends an array of string literals to buf. +func AppendStringArray(buf *bytes.Buffer, a ...string) { + buf.WriteByte('[') + for i, s := range a { + if i > 0 { + buf.WriteByte(',') + } + AppendString(buf, s) + } + buf.WriteByte(']') +} + +// AppendFloat appends a numeric literal representing the value to buf. +func AppendFloat(buf *bytes.Buffer, x float64) error { + var scratch [64]byte + + if math.IsInf(x, 0) || math.IsNaN(x) { + return &json.UnsupportedValueError{ + Value: reflect.ValueOf(x), + Str: strconv.FormatFloat(x, 'g', -1, 64), + } + } + + buf.Write(strconv.AppendFloat(scratch[:0], x, 'g', -1, 64)) + return nil +} + +// AppendFloat32 appends a numeric literal representingthe value to buf. +func AppendFloat32(buf *bytes.Buffer, x float32) error { + var scratch [64]byte + x64 := float64(x) + + if math.IsInf(x64, 0) || math.IsNaN(x64) { + return &json.UnsupportedValueError{ + Value: reflect.ValueOf(x64), + Str: strconv.FormatFloat(x64, 'g', -1, 32), + } + } + + buf.Write(strconv.AppendFloat(scratch[:0], x64, 'g', -1, 32)) + return nil +} + +// AppendFloatArray appends an array of numeric literals to buf. +func AppendFloatArray(buf *bytes.Buffer, a ...float64) error { + buf.WriteByte('[') + for i, x := range a { + if i > 0 { + buf.WriteByte(',') + } + if err := AppendFloat(buf, x); err != nil { + return err + } + } + buf.WriteByte(']') + return nil +} + +// AppendInt appends a numeric literal representing the value to buf. +func AppendInt(buf *bytes.Buffer, x int64) { + var scratch [64]byte + buf.Write(strconv.AppendInt(scratch[:0], x, 10)) +} + +// AppendIntArray appends an array of numeric literals to buf. +func AppendIntArray(buf *bytes.Buffer, a ...int64) { + var scratch [64]byte + + buf.WriteByte('[') + for i, x := range a { + if i > 0 { + buf.WriteByte(',') + } + buf.Write(strconv.AppendInt(scratch[:0], x, 10)) + } + buf.WriteByte(']') +} + +// AppendUint appends a numeric literal representing the value to buf. +func AppendUint(buf *bytes.Buffer, x uint64) { + var scratch [64]byte + buf.Write(strconv.AppendUint(scratch[:0], x, 10)) +} + +// AppendUintArray appends an array of numeric literals to buf. +func AppendUintArray(buf *bytes.Buffer, a ...uint64) { + var scratch [64]byte + + buf.WriteByte('[') + for i, x := range a { + if i > 0 { + buf.WriteByte(',') + } + buf.Write(strconv.AppendUint(scratch[:0], x, 10)) + } + buf.WriteByte(']') +} diff --git a/go-telemetry-api-extension/telemetryApi/jsonUtil.go b/go-telemetry-api-extension/telemetryApi/jsonUtil.go new file mode 100644 index 00000000..6e555776 --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/jsonUtil.go @@ -0,0 +1,104 @@ +package telemetryApi + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" +) + +// jsonString assists in logging JSON: Based on the formatter used to log +// Context contents, the contents could be marshalled as JSON or just printed +// directly. +type jsonString string + +// MarshalJSON returns the jsonString unmodified without any escaping. +func (js jsonString) MarshalJSON() ([]byte, error) { + if "" == js { + return []byte("null"), nil + } + return []byte(js), nil +} + +func removeFirstSegment(name string) string { + idx := strings.Index(name, "/") + if -1 == idx { + return name + } + return name[idx+1:] +} + +func timeToIntMillis(t time.Time) int64 { + return t.UnixNano() / (1000 * 1000) +} + +func timeToFloatMilliseconds(t time.Time) float64 { + return float64(t.UnixNano()) / float64(1000*1000) +} + +// compactJSONString removes the whitespace from a JSON string. This function +// will panic if the string provided is not valid JSON. Thus is must only be +// used in testing code! +func compactJSONString(js string) string { + buf := new(bytes.Buffer) + if err := json.Compact(buf, []byte(js)); err != nil { + panic(fmt.Errorf("unable to compact JSON: %v", err)) + } + return buf.String() +} + +// getContentLengthFromHeader gets the content length from a HTTP header, or -1 +// if no content length is available. +func getContentLengthFromHeader(h http.Header) int64 { + if cl := h.Get("Content-Length"); cl != "" { + if contentLength, err := strconv.ParseInt(cl, 10, 64); err == nil { + return contentLength + } + } + + return -1 +} + +// stringLengthByteLimit truncates strings using a byte-limit boundary and +// avoids terminating in the middle of a multibyte character. +func stringLengthByteLimit(str string, byteLimit int) string { + if len(str) <= byteLimit { + return str + } + + limitIndex := 0 + for pos := range str { + if pos > byteLimit { + break + } + limitIndex = pos + } + return str[0:limitIndex] +} + +func timeFromUnixMilliseconds(millis uint64) time.Time { + secs := int64(millis) / 1000 + msecsRemaining := int64(millis) % 1000 + nsecsRemaining := msecsRemaining * (1000 * 1000) + return time.Unix(secs, nsecsRemaining) +} + +// timeToUnixMilliseconds converts a time into a Unix timestamp in millisecond +// units. +func timeToUnixMilliseconds(tm time.Time) uint64 { + return uint64(tm.UnixNano()) / uint64(1000*1000) +} + +// minorVersion takes a given version string and returns only the major and +// minor portions of it. If the input is malformed, it returns the input +// untouched. +func minorVersion(v string) string { + split := strings.SplitN(v, ".", 3) + if len(split) < 2 { + return v + } + return split[0] + "." + split[1] +} diff --git a/go-telemetry-api-extension/telemetryApi/jsonWriter.go b/go-telemetry-api-extension/telemetryApi/jsonWriter.go new file mode 100644 index 00000000..4381d16a --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/jsonWriter.go @@ -0,0 +1,64 @@ +package telemetryApi + +import ( + "bytes" +) + +type jsonWriter interface { + WriteJSON(buf *bytes.Buffer) +} + +type jsonFieldsWriter struct { + buf *bytes.Buffer + needsComma bool +} + +func (w *jsonFieldsWriter) addKey(key string) { + if w.needsComma { + w.buf.WriteByte(',') + } else { + w.needsComma = true + } + // defensively assume that the key needs escaping: + AppendString(w.buf, key) + w.buf.WriteByte(':') +} + +func (w *jsonFieldsWriter) stringField(key string, val string) { + w.addKey(key) + AppendString(w.buf, val) +} + +func (w *jsonFieldsWriter) intField(key string, val int64) { + w.addKey(key) + AppendInt(w.buf, val) +} + +func (w *jsonFieldsWriter) floatField(key string, val float64) { + w.addKey(key) + AppendFloat(w.buf, val) +} + +func (w *jsonFieldsWriter) float32Field(key string, val float32) { + w.addKey(key) + AppendFloat32(w.buf, val) +} + +func (w *jsonFieldsWriter) boolField(key string, val bool) { + w.addKey(key) + if val { + w.buf.WriteString("true") + } else { + w.buf.WriteString("false") + } +} + +func (w *jsonFieldsWriter) rawField(key string, val jsonString) { + w.addKey(key) + w.buf.WriteString(string(val)) +} + +func (w *jsonFieldsWriter) writerField(key string, val jsonWriter) { + w.addKey(key) + val.WriteJSON(w.buf) +} diff --git a/go-telemetry-api-extension/telemetryApi/new-relic-logs.go b/go-telemetry-api-extension/telemetryApi/new-relic-logs.go new file mode 100644 index 00000000..d2a8e5f8 --- /dev/null +++ b/go-telemetry-api-extension/telemetryApi/new-relic-logs.go @@ -0,0 +1,102 @@ +package telemetryApi + +import ( + "bytes" +) + +const ( + // LogLevelFieldName is the name of the log level field in New Relic logging JSON + LogLevelFieldName = "level" + + // LogMessageFieldName is the name of the log message field in New Relic logging JSON + LogMessageFieldName = "message" + + // LogTimestampFieldName is the name of the timestamp field in New Relic logging JSON + LogTimestampFieldName = "timestamp" + + // LogSpanIDFieldName is the name of the span ID field in the New Relic logging JSON + LogSpanIDFieldName = "span.id" + + // LogTraceIDFieldName is the name of the trace ID field in the New Relic logging JSON + LogTraceIDFieldName = "trace.id" + + // LogSeverityUnknown is the value the log severity should be set to if no log severity is known + LogSeverityUnknown = "UNKNOWN" + + // JSON Attribute Constants + HostnameAttributeKey = "hostname" + EntityNameAttributeKey = "entity.name" + entityGUIDAttributeKey = "entity.guid" + + MaxLogLength = 32768 +) + +type LogPayload struct { + *bytes.Buffer + done bool +} + +// NewLogLine creates an object for processing a single log line and sending it to New Relic +func NewLogPayload(commonAttributes map[string]string) *LogPayload { + buf := bytes.NewBuffer([]byte{}) + buf.WriteByte('[') + buf.WriteByte('{') + buf.WriteString(`"common":`) + buf.WriteByte('{') + buf.WriteString(`"attributes":`) + buf.WriteByte('{') + + for name, value := range commonAttributes { + name = "\"" + name + "\":" + buf.WriteString(name) + AppendString(buf, value) + buf.WriteByte(',') + } + buf.WriteByte('}') + buf.WriteByte('}') + buf.WriteByte(',') + buf.WriteString(`"logs":`) + buf.WriteByte('[') + + return &LogPayload{Buffer: buf} +} + +// AddLogLine prepares a Log Event JSON object in the format expected by the collector. +// Timestamp must be unix millisecond time +func (buf *LogPayload) AddLogLine(Timestamp int64, Level, Message string) { + if buf.done { + return + } + + if Level == "" { + Level = LogSeverityUnknown + } + + if len(Message) > MaxLogLength { + Message = Message[:MaxLogLength] + } + + w := jsonFieldsWriter{buf: buf.Buffer} + buf.WriteByte('{') + w.stringField(LogLevelFieldName, Level) + w.stringField(LogMessageFieldName, Message) + + w.needsComma = false + buf.WriteByte(',') + w.intField(LogTimestampFieldName, Timestamp) + buf.WriteByte('}') +} + +func (buf *LogPayload) Marshal() []byte { + if buf.done { + return buf.Bytes() + } + + // prevent Duplication of JSON closure + buf.done = true + + buf.WriteByte(']') + buf.WriteByte('}') + buf.WriteByte(']') + return buf.Bytes() +} diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index 426adb63..e08a76af 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -5,13 +5,26 @@ import ( "context" "encoding/json" "fmt" - "errors" - "net/http" -// "github.com/pkg/errors" + "net/http" ) -func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) (error) { +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { + + /* + logAttributes := map[string]string{} + logAttributes[HostnameAttributeKey] = "a host name" + logAttributes["mykey"] = "my value" + + payload := NewLogPayload(logAttributes) + + for _, logLine := range logEntries{ + // do some processing and add line to payload + payload.AddLogLine(time.Now().UnixMilli(), "debug", "message") + } + + bodyBytes := payload.Marshal() + */ bodyBytes, _ := json.Marshal(map[string]string{"message": fmt.Sprintf("%v", logEntries)}) req, err := http.NewRequestWithContext(ctx, "POST", d.postUri, bytes.NewBuffer(bodyBytes)) From f8bf420343c08382bd6fe70b8a149b57b56fafb9 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Thu, 1 Dec 2022 01:53:41 +0000 Subject: [PATCH 03/11] send_to_new_relic translated from python --- .../extensionApi/client.go | 3 + go-telemetry-api-extension/go.mod | 5 +- go-telemetry-api-extension/go.sum | 2 + go-telemetry-api-extension/main.go | 3 +- .../telemetryApi/client.go | 4 +- .../telemetryApi/dispatcher.go | 4 +- .../telemetryApi/listener.go | 1 - .../telemetryApi/send_to_new_relic.go | 222 ++++++++++++++++-- 8 files changed, 221 insertions(+), 23 deletions(-) diff --git a/go-telemetry-api-extension/extensionApi/client.go b/go-telemetry-api-extension/extensionApi/client.go index a336ad7f..442cfd73 100644 --- a/go-telemetry-api-extension/extensionApi/client.go +++ b/go-telemetry-api-extension/extensionApi/client.go @@ -62,6 +62,7 @@ type Client struct { httpClient *http.Client baseUrl string ExtensionID string + functionName string } var l = log.WithFields(log.Fields{"pkg": "extensionApi"}) @@ -77,6 +78,7 @@ func NewClient() *Client { // Registers the extension with Extensions API func (e *Client) Register(ctx context.Context, extensionName string) (string, error) { + const action = "/register" url := e.baseUrl + action @@ -117,6 +119,7 @@ func (e *Client) Register(ctx context.Context, extensionName string) (string, er return "", err } + e.functionName = res.FunctionName e.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader) l.Info("[client:Register] Registration success with extensionId ", e.ExtensionID) return e.ExtensionID, nil diff --git a/go-telemetry-api-extension/go.mod b/go-telemetry-api-extension/go.mod index aec2d487..925d2dc7 100644 --- a/go-telemetry-api-extension/go.mod +++ b/go-telemetry-api-extension/go.mod @@ -8,4 +8,7 @@ require ( github.com/sirupsen/logrus v1.9.0 ) -require golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +require ( + github.com/google/uuid v1.3.0 // indirect + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/go-telemetry-api-extension/go.sum b/go-telemetry-api-extension/go.sum index d65581a4..d6d3e285 100644 --- a/go-telemetry-api-extension/go.sum +++ b/go-telemetry-api-extension/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/go-telemetry-api-extension/main.go b/go-telemetry-api-extension/main.go index a8bd61a0..92e9899e 100644 --- a/go-telemetry-api-extension/main.go +++ b/go-telemetry-api-extension/main.go @@ -65,8 +65,7 @@ func main() { panic(err) } l.Info("[main] Subscription success") - - dispatcher := telemetryApi.NewDispatcher() + dispatcher := telemetryApi.NewDispatcher(extensionApiClient.functionName) // Will block until invoke or shutdown event is received or cancelled via the context. for { diff --git a/go-telemetry-api-extension/telemetryApi/client.go b/go-telemetry-api-extension/telemetryApi/client.go index 5b7ff301..d601be13 100644 --- a/go-telemetry-api-extension/telemetryApi/client.go +++ b/go-telemetry-api-extension/telemetryApi/client.go @@ -117,8 +117,8 @@ type SubscribeResponse struct { func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) { eventTypes := []EventType{ Platform, - // Function, - // Extension, + Function, + Extension, } bufferingConfig := BufferingCfg{ diff --git a/go-telemetry-api-extension/telemetryApi/dispatcher.go b/go-telemetry-api-extension/telemetryApi/dispatcher.go index 1496cbf1..1d0b3916 100644 --- a/go-telemetry-api-extension/telemetryApi/dispatcher.go +++ b/go-telemetry-api-extension/telemetryApi/dispatcher.go @@ -17,9 +17,10 @@ type Dispatcher struct { postUri string licenseKey string minBatchSize int64 + functionName string } -func NewDispatcher() *Dispatcher { +func NewDispatcher(functionName string) *Dispatcher { dispatchPostUri := os.Getenv("DISPATCH_POST_URI") if len(dispatchPostUri) == 0 { panic("dispatchPostUri undefined") @@ -40,6 +41,7 @@ func NewDispatcher() *Dispatcher { postUri: dispatchPostUri, licenseKey: licenseKey, minBatchSize: dispatchMinBatchSize, + functionName: functionName, } } diff --git a/go-telemetry-api-extension/telemetryApi/listener.go b/go-telemetry-api-extension/telemetryApi/listener.go index 69f98b71..eb60ee9c 100644 --- a/go-telemetry-api-extension/telemetryApi/listener.go +++ b/go-telemetry-api-extension/telemetryApi/listener.go @@ -74,7 +74,6 @@ func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Reque l.Error("[listener:http_handler] Error reading body:", err) return } - // Parse and put the log messages into the queue var slice []interface{} _ = json.Unmarshal(body, &slice) diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index e08a76af..00894b17 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -5,35 +5,225 @@ import ( "context" "encoding/json" "fmt" - + "os" + "path" + "reflect" + "strings" "net/http" + "strconv" + "github.com/google/uuid" ) -func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { +const ( + LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" + LogEndpointUS string = "https://log-api.newrelic.com/log/v1" - /* - logAttributes := map[string]string{} - logAttributes[HostnameAttributeKey] = "a host name" - logAttributes["mykey"] = "my value" + MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" + MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" - payload := NewLogPayload(logAttributes) + EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" + EventsEndpointUS string = "https://insights-collector.newrelic.com" - for _, logLine := range logEntries{ - // do some processing and add line to payload - payload.AddLogLine(time.Now().UnixMilli(), "debug", "message") - } + TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" + TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" +) - bodyBytes := payload.Marshal() - */ +func getEndpointURL(licenseKey string, typ string, EndpointOverride string) string { + if EndpointOverride != "" { + return EndpointOverride + } + switch typ { + case "logging": + if strings.HasPrefix(licenseKey, "eu") { + return LogEndpointEU + } else { + return LogEndpointUS + } + case "metrics": + if strings.HasPrefix(licenseKey, "eu") { + return MetricsEndpointEU + } else { + return MetricsEndpointUS + } + case "events": + if strings.HasPrefix(licenseKey, "eu") { + return EventsEndpointEU + } else { + return EventsEndpointUS + } + case "traces": + if strings.HasPrefix(licenseKey, "eu") { + return TracesEndpointEU + } else { + return TracesEndpointUS + } + } + return "" +} - bodyBytes, _ := json.Marshal(map[string]string{"message": fmt.Sprintf("%v", logEntries)}) - req, err := http.NewRequestWithContext(ctx, "POST", d.postUri, bytes.NewBuffer(bodyBytes)) +func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) error { + req, err := http.NewRequestWithContext(ctx, "POST", uri, bytes.NewBuffer(bodyBytes)) if err != nil { - panic(err) + return err } +// the headers might be different for different endpoints req.Header.Set("Content-Type", "application/json") req.Header.Set("Api-Key", d.licenseKey) _, err = d.httpClient.Do(req) return err } + +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { + + // will be replaced later + var lambda_name = "---" +// should be as below +// var lambda_name = d.functionName + var agent_name = path.Base(os.Args[0]) + + // NB "." is not allowed in NR eventType + var replacer = strings.NewReplacer(".", "_") + + data := make(map[string][]string) +// data := make(map[string][]interface{}) + data["events"] = []string{} + data["traces"] = []string{} + data["logging"] = []string{} + data["metrics"] = []string{} + + for _, event := range logEntries { +// do some processing and add line to payload +// payload.AddLogLine(time.Now().UnixMilli(), "debug", "message") + + msInt, err := strconv.ParseInt(event["time"], 10, 64) + if err != nil { + return err + } + // events + data["events"] := append(data["events"], fmt.Sprintf("%v",`{ + "timestamp": time.UnixMilli(msInt) + "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) + }`)) + + // logging + if val, ok := event["record"]; ok { + if len(val) > 0 { + data["logging"] := append(data["logging"], fmt.Sprintf("%v",`{ + "timestamp": time.UnixMilli(msInt), + "message": event["record"], + "attributes": { + "aws": { + "event": event["type"], + "lambda": lambda_name + } + } + }`)) + } + } + // metrics + if reflect.ValueOf(event["record"]).Kind() == reflect.Map && val, ok := event["record"]["metrics"]; ok { + mts := [...]string{} + for key := range val { + mts := appand(mts, fmt.Sprintf("%v",`{ + "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, + "value": event["record"]["metrics"][key] + }`)) + } + rid := "" + if val, ok := event["record"]["requestId"]; ok { + rid = val + } + data["metrics"] := append(data["metrics"], fmt.Sprintf("%v",`{ + "common" : { + "timestamp": time.UnixMilli(msInt), + "attributes": { + "event": event["type"], + "requestId": rid, + "extension": agent_name + } + }, + "metrics": mts + }`)) + } + // spans + if (reflect.ValueOf(event["record"]).Kind() == reflect.Map) && val, ok := event["record"]["spans"]; ok { + spans := [...]string{} + for span := range val { + el := `{ + "trace.id": event["record"]["requestId"], + "id": uuid.New().String(), + "attributes": { + "event": event["type"], + "service.name": agent_name + } + }` + start, err := strconv.ParseInt(event["start"], 10, 64) + if err != nil { + return err + } + for key := range span { + if (key == "durationMs") { + el["attributes"]["duration.ms"] := span[key] + } else if (key =="start") { + el["timestamp"] := time.UnixMilli(start) + } else { + el["attributes"][key] := span[key] + } + } + data["traces"] := append(data["traces"], fmt.Sprintf("%v",el)) + } + } + } + // data ready + if (len(data) > 0) { +// send logs + if (len(data["logging"]) > 0) { +//bodyBytes := payload.Marshal() +//bodyBytes, _ := json.Marshal(map[string]string{"message": fmt.Sprintf("%v", logEntries)}) + dt := NewLogPayload(`{ + "common": { + "attributes": { + "aws": { + "logType": "aws/lambda-ext", + "function": lambda_name, + "extension": agent_name + } + } + }, + "logs": data["logging"] + }`) + bodyBytes := dt.Marshal() +fmt.Println(reflect.TypeOf(bodyBytes)) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"logging"), bodyBytes) + } +// send metrics + if (len(data["metrics"]) > 0) { + for payload := range data["metrics"] { + bodyBytes := NewLogPayload(payload).Marshal() + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"metrics"), bodyBytes) + } + } +// send events + if (len(data["events"]) > 0) { + bodyBytes := NewLogPayload(data["events"]).Marshal() + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"events"), bodyBytes) + } +// send traces + if (len(data["traces"]) > 0) { + dt := NewLogPayload(`{ + "common": { + "attributes": { + "host": "aws.amazon.com", + "service.name": lambda_name + } + }, + "spans": data["traces"] + }`) + bodyBytes := dt.Marshal() + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"traces"), bodyBytes) + } + } + + return err // if one of the sents failed, it'd be nice to know which +} From d744654aff95f4608062d793af2fce8fae8c91e3 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Fri, 2 Dec 2022 19:40:52 +0000 Subject: [PATCH 04/11] env vars added to the template --- go-telemetry-api-extension/template.yaml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/go-telemetry-api-extension/template.yaml b/go-telemetry-api-extension/template.yaml index c241961f..5f3754e9 100644 --- a/go-telemetry-api-extension/template.yaml +++ b/go-telemetry-api-extension/template.yaml @@ -2,6 +2,11 @@ AWSTemplateFormatVersion: 2010-09-09 Transform: AWS::Serverless-2016-10-31 Description: go-telemetry-api-extension +Parameters: + LICENSE_KEY: + Type: String + Description: Your New Relic ingest key. + Resources: GoTelemetryApiExtensionLayer: Type: AWS::Serverless::LayerVersion @@ -16,3 +21,6 @@ Resources: Environment: Variables: DISPATCH_MIN_BATCH_SIZE: 10 + DEBUG_LOGGING_ENABLED: True + LOGGING_ENABLED: True + NEW_RELIC_LICENSE_KEY: !Sub ${LICENSE_KEY} From b34804f4f65c18b322eebffc1e66bf85efef0975 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Mon, 5 Dec 2022 19:52:29 +0000 Subject: [PATCH 05/11] tmp --- .../telemetryApi/send_to_new_relic.go | 80 ++++++++++--------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index 00894b17..4d5326ce 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -11,6 +11,7 @@ import ( "strings" "net/http" "strconv" + "time" "github.com/google/uuid" ) @@ -76,7 +77,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { - // will be replaced later +// will be replaced later var lambda_name = "---" // should be as below // var lambda_name = d.functionName @@ -85,32 +86,38 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) // NB "." is not allowed in NR eventType var replacer = strings.NewReplacer(".", "_") - data := make(map[string][]string) -// data := make(map[string][]interface{}) - data["events"] = []string{} - data["traces"] = []string{} - data["logging"] = []string{} - data["metrics"] = []string{} + data := make(map[string][]interface{}) + data["events"] := []map[string]interface{}{} + data["traces"] := []map[string]interface{}{} + data["logging"] := []map[string]interface{}{} + data["metrics"] := []map[string]interface{}{} - for _, event := range logEntries { -// do some processing and add line to payload -// payload.AddLogLine(time.Now().UnixMilli(), "debug", "message") + var bb map[string]any + var event map[string]any - msInt, err := strconv.ParseInt(event["time"], 10, 64) + for _, ev := range logEntries { + json.Unmarshal([]byte(ev), &event) + msInt, err := time.Parse(time.RFC3339, event["time"]) if err != nil { return err } // events - data["events"] := append(data["events"], fmt.Sprintf("%v",`{ - "timestamp": time.UnixMilli(msInt) + json.Unmarshal([]byte(`{ + "timestamp": msInt.UnixMilli() + "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) + }`), &bb) + data["events"] := append(data["events"], bb) +/* + data["events"] := append(data["events"], map[string]interface{}(`{ + "timestamp": msInt.UnixMilli() "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) }`)) - +*/ // logging if val, ok := event["record"]; ok { if len(val) > 0 { - data["logging"] := append(data["logging"], fmt.Sprintf("%v",`{ - "timestamp": time.UnixMilli(msInt), + data["logging"] := append(data["logging"], map[string]interface{}(`{ + "timestamp": msInt.UnixMilli(), "message": event["record"], "attributes": { "aws": { @@ -123,9 +130,9 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) } // metrics if reflect.ValueOf(event["record"]).Kind() == reflect.Map && val, ok := event["record"]["metrics"]; ok { - mts := [...]string{} + mts := []map[string]interface{}{} for key := range val { - mts := appand(mts, fmt.Sprintf("%v",`{ + mts := appand(mts, map[string]interface{}(`{ "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, "value": event["record"]["metrics"][key] }`)) @@ -134,9 +141,9 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) if val, ok := event["record"]["requestId"]; ok { rid = val } - data["metrics"] := append(data["metrics"], fmt.Sprintf("%v",`{ + data["metrics"] := append(data["metrics"], map[string]interface{}(`{ "common" : { - "timestamp": time.UnixMilli(msInt), + "timestamp": msInt.UnixMilli(), "attributes": { "event": event["type"], "requestId": rid, @@ -158,7 +165,7 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) "service.name": agent_name } }` - start, err := strconv.ParseInt(event["start"], 10, 64) + start, err := time.Parse(time.RFC3339, event["time"]) if err != nil { return err } @@ -166,22 +173,21 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) if (key == "durationMs") { el["attributes"]["duration.ms"] := span[key] } else if (key =="start") { - el["timestamp"] := time.UnixMilli(start) + el["timestamp"] := start.UnixMilli() } else { el["attributes"][key] := span[key] } } - data["traces"] := append(data["traces"], fmt.Sprintf("%v",el)) + data["traces"] := append(data["traces"], el) } } } // data ready if (len(data) > 0) { -// send logs + // send logs if (len(data["logging"]) > 0) { -//bodyBytes := payload.Marshal() -//bodyBytes, _ := json.Marshal(map[string]string{"message": fmt.Sprintf("%v", logEntries)}) - dt := NewLogPayload(`{ +// bodyBytes, _ := json.Marshal(map[string]interface{}(`{ + bodyBytes := `{ "common": { "attributes": { "aws": { @@ -192,26 +198,25 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) } }, "logs": data["logging"] - }`) - bodyBytes := dt.Marshal() -fmt.Println(reflect.TypeOf(bodyBytes)) + }` +// }`)) err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"logging"), bodyBytes) } -// send metrics + // send metrics if (len(data["metrics"]) > 0) { for payload := range data["metrics"] { - bodyBytes := NewLogPayload(payload).Marshal() + bodyBytes, _ := json.Marshal(payload) err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"metrics"), bodyBytes) } } -// send events + // send events if (len(data["events"]) > 0) { - bodyBytes := NewLogPayload(data["events"]).Marshal() + bodyBytes, _ := json.Marshal(data["events"]) err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"events"), bodyBytes) } -// send traces + // send traces if (len(data["traces"]) > 0) { - dt := NewLogPayload(`{ + bodyBytes, _ := json.Marshal(map[string]interface{}(`{ "common": { "attributes": { "host": "aws.amazon.com", @@ -219,8 +224,7 @@ fmt.Println(reflect.TypeOf(bodyBytes)) } }, "spans": data["traces"] - }`) - bodyBytes := dt.Marshal() + }`)) err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"traces"), bodyBytes) } } From 709a10ada4ad7174dca72b1ce1de3797e6f3ceaf Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Mon, 5 Dec 2022 16:01:33 -0500 Subject: [PATCH 06/11] collab changes 12/5 --- .../extensionApi/client.go | 10 +- go-telemetry-api-extension/main.go | 4 +- .../telemetryApi/listener.go | 8 +- .../telemetryApi/send_to_new_relic.go | 280 ++++++++++-------- 4 files changed, 166 insertions(+), 136 deletions(-) diff --git a/go-telemetry-api-extension/extensionApi/client.go b/go-telemetry-api-extension/extensionApi/client.go index 442cfd73..489f82a4 100644 --- a/go-telemetry-api-extension/extensionApi/client.go +++ b/go-telemetry-api-extension/extensionApi/client.go @@ -59,12 +59,16 @@ const ( // Client is a simple client for the Lambda Extensions API type Client struct { - httpClient *http.Client - baseUrl string - ExtensionID string + httpClient *http.Client + baseUrl string + ExtensionID string functionName string } +func (e *Client) GetFunctionName() string { + return e.functionName +} + var l = log.WithFields(log.Fields{"pkg": "extensionApi"}) // Returns a Lambda Extensions API client diff --git a/go-telemetry-api-extension/main.go b/go-telemetry-api-extension/main.go index 92e9899e..132e3ef7 100644 --- a/go-telemetry-api-extension/main.go +++ b/go-telemetry-api-extension/main.go @@ -13,9 +13,9 @@ Notes: package main import ( + "context" "newrelic-lambda-extension/go-telemetry-api-extension/extensionApi" "newrelic-lambda-extension/go-telemetry-api-extension/telemetryApi" - "context" "os" "os/signal" "path" @@ -65,7 +65,7 @@ func main() { panic(err) } l.Info("[main] Subscription success") - dispatcher := telemetryApi.NewDispatcher(extensionApiClient.functionName) + dispatcher := telemetryApi.NewDispatcher(extensionApiClient.GetFunctionName()) // Will block until invoke or shutdown event is received or cancelled via the context. for { diff --git a/go-telemetry-api-extension/telemetryApi/listener.go b/go-telemetry-api-extension/telemetryApi/listener.go index eb60ee9c..edebbcc4 100644 --- a/go-telemetry-api-extension/telemetryApi/listener.go +++ b/go-telemetry-api-extension/telemetryApi/listener.go @@ -75,7 +75,7 @@ func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Reque return } // Parse and put the log messages into the queue - var slice []interface{} + var slice []LambdaTelemetryEvent _ = json.Unmarshal(body, &slice) for _, el := range slice { @@ -98,3 +98,9 @@ func (s *TelemetryApiListener) Shutdown() { } } } + +type LambdaTelemetryEvent struct { + Time string + Type string + Record any +} diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index 4d5326ce..eef2bc3b 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -4,60 +4,57 @@ import ( "bytes" "context" "encoding/json" - "fmt" + "net/http" "os" "path" "reflect" "strings" - "net/http" - "strconv" "time" - "github.com/google/uuid" ) const ( - LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" - LogEndpointUS string = "https://log-api.newrelic.com/log/v1" + LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" + LogEndpointUS string = "https://log-api.newrelic.com/log/v1" - MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" - MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" + MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" + MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" - EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" - EventsEndpointUS string = "https://insights-collector.newrelic.com" + EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" + EventsEndpointUS string = "https://insights-collector.newrelic.com" - TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" - TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" + TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" + TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" ) func getEndpointURL(licenseKey string, typ string, EndpointOverride string) string { - if EndpointOverride != "" { - return EndpointOverride - } + if EndpointOverride != "" { + return EndpointOverride + } switch typ { - case "logging": - if strings.HasPrefix(licenseKey, "eu") { - return LogEndpointEU - } else { - return LogEndpointUS - } - case "metrics": - if strings.HasPrefix(licenseKey, "eu") { - return MetricsEndpointEU - } else { - return MetricsEndpointUS - } - case "events": - if strings.HasPrefix(licenseKey, "eu") { - return EventsEndpointEU - } else { - return EventsEndpointUS - } - case "traces": - if strings.HasPrefix(licenseKey, "eu") { - return TracesEndpointEU - } else { - return TracesEndpointUS - } + case "logging": + if strings.HasPrefix(licenseKey, "eu") { + return LogEndpointEU + } else { + return LogEndpointUS + } + case "metrics": + if strings.HasPrefix(licenseKey, "eu") { + return MetricsEndpointEU + } else { + return MetricsEndpointUS + } + case "events": + if strings.HasPrefix(licenseKey, "eu") { + return EventsEndpointEU + } else { + return EventsEndpointUS + } + case "traces": + if strings.HasPrefix(licenseKey, "eu") { + return TracesEndpointEU + } else { + return TracesEndpointUS + } } return "" } @@ -67,7 +64,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) if err != nil { return err } -// the headers might be different for different endpoints + // the headers might be different for different endpoints req.Header.Set("Content-Type", "application/json") req.Header.Set("Api-Key", d.licenseKey) _, err = d.httpClient.Do(req) @@ -75,29 +72,29 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) return err } -func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { +func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dispatcher) error { -// will be replaced later - var lambda_name = "---" -// should be as below -// var lambda_name = d.functionName + // will be replaced later + var lambda_name = "---" + // should be as below + // var lambda_name = d.functionName var agent_name = path.Base(os.Args[0]) // NB "." is not allowed in NR eventType var replacer = strings.NewReplacer(".", "_") - data := make(map[string][]interface{}) - data["events"] := []map[string]interface{}{} - data["traces"] := []map[string]interface{}{} - data["logging"] := []map[string]interface{}{} - data["metrics"] := []map[string]interface{}{} + data := make(map[string][]map[string]interface{}) + data["events"] = []map[string]interface{}{} + data["traces"] = []map[string]interface{}{} + data["logging"] = []map[string]interface{}{} + data["metrics"] = []map[string]interface{}{} var bb map[string]any - var event map[string]any + // var event map[string]any - for _, ev := range logEntries { - json.Unmarshal([]byte(ev), &event) - msInt, err := time.Parse(time.RFC3339, event["time"]) + for _, event := range logEntries { + //json.Unmarshal([]byte(ev), &event) + msInt, err := time.Parse(time.RFC3339, event.Time) if err != nil { return err } @@ -106,87 +103,110 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) "timestamp": msInt.UnixMilli() "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) }`), &bb) - data["events"] := append(data["events"], bb) -/* - data["events"] := append(data["events"], map[string]interface{}(`{ - "timestamp": msInt.UnixMilli() - "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) - }`)) -*/ - // logging - if val, ok := event["record"]; ok { - if len(val) > 0 { - data["logging"] := append(data["logging"], map[string]interface{}(`{ - "timestamp": msInt.UnixMilli(), - "message": event["record"], - "attributes": { - "aws": { - "event": event["type"], - "lambda": lambda_name - } - } - }`)) - } + data["events"] = append(data["events"], bb) + + data["events"] = append(data["events"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "eventType": "Lambda_Ext_" + replacer.Replace(event.Type), + }) + + switch event.Type { + case "platform.iniStart": + + case "platform.iniRuntimeDone": + + case "platform.iniReport": + + case "platform.start": + + case "platform.runtimeDone": + + case "platform.report": + + case "platform.extension": + + case "platform.telemetrySubscription": + + case "platform.logsDropped": + + } + + if event.Record != nil { + data["logging"] = append(data["logging"], map[string]interface{}{ + "timestamp": msInt.UnixMilli(), + "message": event.Record, + "attributes": map[string]map[string]string{ + "aws": { + "event": event.Type, + "lambda": lambda_name, + }, + }, + }) } + // metrics - if reflect.ValueOf(event["record"]).Kind() == reflect.Map && val, ok := event["record"]["metrics"]; ok { - mts := []map[string]interface{}{} - for key := range val { - mts := appand(mts, map[string]interface{}(`{ - "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, - "value": event["record"]["metrics"][key] + if event.Record != nil { + if val, ok := event.Record["metrics"]; ok { + mts := []map[string]interface{}{} + for key := range val { + mts := appand(mts, map[string]interface{}(`{ + "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, + "value": event["record"]["metrics"][key] + }`)) + } + rid := "" + if val, ok := event["record"]["requestId"]; ok { + rid = val + } + data["metrics"] = append(data["metrics"], map[string]interface{}(`{ + "common" : { + "timestamp": msInt.UnixMilli(), + "attributes": { + "event": event["type"], + "requestId": rid, + "extension": agent_name + } + }, + "metrics": mts }`)) } - rid := "" - if val, ok := event["record"]["requestId"]; ok { - rid = val - } - data["metrics"] := append(data["metrics"], map[string]interface{}(`{ - "common" : { - "timestamp": msInt.UnixMilli(), - "attributes": { - "event": event["type"], - "requestId": rid, - "extension": agent_name - } - }, - "metrics": mts - }`)) } // spans - if (reflect.ValueOf(event["record"]).Kind() == reflect.Map) && val, ok := event["record"]["spans"]; ok { - spans := [...]string{} - for span := range val { - el := `{ - "trace.id": event["record"]["requestId"], - "id": uuid.New().String(), - "attributes": { - "event": event["type"], - "service.name": agent_name + if reflect.ValueOf(event["record"]).Kind() == reflect.Map { + if val, ok := event["record"]["spans"]; ok { + spans := [...]string{} + for span := range val { + el := `{ + "trace.id": event["record"]["requestId"], + "id": uuid.New().String(), + "attributes": { + "event": event["type"], + "service.name": agent_name + } + }` + start, err := time.Parse(time.RFC3339, event["time"]) + if err != nil { + return err + } + for key := range span { + if key == "durationMs" { + el["attributes"]["duration.ms"] = span[key] + } else if key == "start" { + el["timestamp"] = start.UnixMilli() + } else { + el["attributes"][key] = span[key] } - }` - start, err := time.Parse(time.RFC3339, event["time"]) - if err != nil { - return err - } - for key := range span { - if (key == "durationMs") { - el["attributes"]["duration.ms"] := span[key] - } else if (key =="start") { - el["timestamp"] := start.UnixMilli() - } else { - el["attributes"][key] := span[key] } + data["traces"] = append(data["traces"], el) } - data["traces"] := append(data["traces"], el) } } } // data ready - if (len(data) > 0) { + if len(data) > 0 { // send logs - if (len(data["logging"]) > 0) { -// bodyBytes, _ := json.Marshal(map[string]interface{}(`{ + if len(data["logging"]) > 0 { + // bodyBytes, _ := json.Marshal(map[string]interface{}(`{ bodyBytes := `{ "common": { "attributes": { @@ -199,23 +219,23 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) }, "logs": data["logging"] }` -// }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"logging"), bodyBytes) + // }`)) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "logging"), bodyBytes) } // send metrics - if (len(data["metrics"]) > 0) { + if len(data["metrics"]) > 0 { for payload := range data["metrics"] { bodyBytes, _ := json.Marshal(payload) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"metrics"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "metrics"), bodyBytes) } } // send events - if (len(data["events"]) > 0) { + if len(data["events"]) > 0 { bodyBytes, _ := json.Marshal(data["events"]) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"events"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events"), bodyBytes) } // send traces - if (len(data["traces"]) > 0) { + if len(data["traces"]) > 0 { bodyBytes, _ := json.Marshal(map[string]interface{}(`{ "common": { "attributes": { @@ -225,9 +245,9 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) }, "spans": data["traces"] }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey,"traces"), bodyBytes) + err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "traces"), bodyBytes) } } - return err // if one of the sents failed, it'd be nice to know which + return err // if one of the sents failed, it'd be nice to know which } From 98f01300ea39a64b53effec1946f2739fd0faded Mon Sep 17 00:00:00 2001 From: Emilio Garcia Date: Mon, 5 Dec 2022 16:07:15 -0500 Subject: [PATCH 07/11] interface fix --- .../telemetryApi/send_to_new_relic.go | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index eef2bc3b..05c0713a 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -72,7 +72,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) return err } -func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dispatcher) error { +func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { // will be replaced later var lambda_name = "---" @@ -94,7 +94,7 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis for _, event := range logEntries { //json.Unmarshal([]byte(ev), &event) - msInt, err := time.Parse(time.RFC3339, event.Time) + msInt, err := time.Parse(time.RFC3339, event.(LambdaTelemetryEvent).Time) if err != nil { return err } @@ -107,10 +107,10 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis data["events"] = append(data["events"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "eventType": "Lambda_Ext_" + replacer.Replace(event.Type), + "eventType": "Lambda_Ext_" + replacer.Replace(event.(LambdaTelemetryEvent).Type), }) - switch event.Type { + switch event.(LambdaTelemetryEvent).Type { case "platform.iniStart": case "platform.iniRuntimeDone": @@ -131,13 +131,13 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis } - if event.Record != nil { + if event.(LambdaTelemetryEvent).Record != nil { data["logging"] = append(data["logging"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "message": event.Record, + "message": event.(LambdaTelemetryEvent).Record, "attributes": map[string]map[string]string{ "aws": { - "event": event.Type, + "event": event.(LambdaTelemetryEvent).Type, "lambda": lambda_name, }, }, @@ -145,11 +145,11 @@ func sendDataToNR(ctx context.Context, logEntries []LambdaTelemetryEvent, d *Dis } // metrics - if event.Record != nil { - if val, ok := event.Record["metrics"]; ok { + if event.(LambdaTelemetryEvent).Record != nil { + if val, ok := event.(LambdaTelemetryEvent).Record["metrics"]; ok { mts := []map[string]interface{}{} for key := range val { - mts := appand(mts, map[string]interface{}(`{ + mts := append(mts, map[string]interface{}(`{ "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, "value": event["record"]["metrics"][key] }`)) From 11113ef041d4dcd5842b62da916305d6d3642162 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Fri, 9 Dec 2022 01:19:00 +0000 Subject: [PATCH 08/11] 4 data types sent to New Relic --- .../telemetryApi/jsonEncode.go | 190 ---------------- .../telemetryApi/jsonUtil.go | 104 --------- .../telemetryApi/jsonWriter.go | 64 ------ .../telemetryApi/new-relic-logs.go | 102 --------- .../telemetryApi/send_to_new_relic.go | 208 ++++++++---------- 5 files changed, 96 insertions(+), 572 deletions(-) delete mode 100644 go-telemetry-api-extension/telemetryApi/jsonEncode.go delete mode 100644 go-telemetry-api-extension/telemetryApi/jsonUtil.go delete mode 100644 go-telemetry-api-extension/telemetryApi/jsonWriter.go delete mode 100644 go-telemetry-api-extension/telemetryApi/new-relic-logs.go diff --git a/go-telemetry-api-extension/telemetryApi/jsonEncode.go b/go-telemetry-api-extension/telemetryApi/jsonEncode.go deleted file mode 100644 index cff22f8a..00000000 --- a/go-telemetry-api-extension/telemetryApi/jsonEncode.go +++ /dev/null @@ -1,190 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package jsonx extends the encoding/json package to encode JSON -// incrementally and without requiring reflection. -package telemetryApi - -import ( - "bytes" - "encoding/json" - "math" - "reflect" - "strconv" - "unicode/utf8" -) - -var hex = "0123456789abcdef" - -// AppendString escapes s appends it to buf. -func AppendString(buf *bytes.Buffer, s string) { - buf.WriteByte('"') - start := 0 - for i := 0; i < len(s); { - if b := s[i]; b < utf8.RuneSelf { - if 0x20 <= b && b != '\\' && b != '"' && b != '<' && b != '>' && b != '&' { - i++ - continue - } - if start < i { - buf.WriteString(s[start:i]) - } - switch b { - case '\\', '"': - buf.WriteByte('\\') - buf.WriteByte(b) - case '\n': - buf.WriteByte('\\') - buf.WriteByte('n') - case '\r': - buf.WriteByte('\\') - buf.WriteByte('r') - case '\t': - buf.WriteByte('\\') - buf.WriteByte('t') - default: - // This encodes bytes < 0x20 except for \n and \r, - // as well as <, > and &. The latter are escaped because they - // can lead to security holes when user-controlled strings - // are rendered into JSON and served to some browsers. - buf.WriteString(`\u00`) - buf.WriteByte(hex[b>>4]) - buf.WriteByte(hex[b&0xF]) - } - i++ - start = i - continue - } - c, size := utf8.DecodeRuneInString(s[i:]) - if c == utf8.RuneError && size == 1 { - if start < i { - buf.WriteString(s[start:i]) - } - buf.WriteString(`\ufffd`) - i += size - start = i - continue - } - // U+2028 is LINE SEPARATOR. - // U+2029 is PARAGRAPH SEPARATOR. - // They are both technically valid characters in JSON strings, - // but don't work in JSONP, which has to be evaluated as JavaScript, - // and can lead to security holes there. It is valid JSON to - // escape them, so we do so unconditionally. - // See http://timelessrepo.com/json-isnt-a-javascript-subset for discussion. - if c == '\u2028' || c == '\u2029' { - if start < i { - buf.WriteString(s[start:i]) - } - buf.WriteString(`\u202`) - buf.WriteByte(hex[c&0xF]) - i += size - start = i - continue - } - i += size - } - if start < len(s) { - buf.WriteString(s[start:]) - } - buf.WriteByte('"') -} - -// AppendStringArray appends an array of string literals to buf. -func AppendStringArray(buf *bytes.Buffer, a ...string) { - buf.WriteByte('[') - for i, s := range a { - if i > 0 { - buf.WriteByte(',') - } - AppendString(buf, s) - } - buf.WriteByte(']') -} - -// AppendFloat appends a numeric literal representing the value to buf. -func AppendFloat(buf *bytes.Buffer, x float64) error { - var scratch [64]byte - - if math.IsInf(x, 0) || math.IsNaN(x) { - return &json.UnsupportedValueError{ - Value: reflect.ValueOf(x), - Str: strconv.FormatFloat(x, 'g', -1, 64), - } - } - - buf.Write(strconv.AppendFloat(scratch[:0], x, 'g', -1, 64)) - return nil -} - -// AppendFloat32 appends a numeric literal representingthe value to buf. -func AppendFloat32(buf *bytes.Buffer, x float32) error { - var scratch [64]byte - x64 := float64(x) - - if math.IsInf(x64, 0) || math.IsNaN(x64) { - return &json.UnsupportedValueError{ - Value: reflect.ValueOf(x64), - Str: strconv.FormatFloat(x64, 'g', -1, 32), - } - } - - buf.Write(strconv.AppendFloat(scratch[:0], x64, 'g', -1, 32)) - return nil -} - -// AppendFloatArray appends an array of numeric literals to buf. -func AppendFloatArray(buf *bytes.Buffer, a ...float64) error { - buf.WriteByte('[') - for i, x := range a { - if i > 0 { - buf.WriteByte(',') - } - if err := AppendFloat(buf, x); err != nil { - return err - } - } - buf.WriteByte(']') - return nil -} - -// AppendInt appends a numeric literal representing the value to buf. -func AppendInt(buf *bytes.Buffer, x int64) { - var scratch [64]byte - buf.Write(strconv.AppendInt(scratch[:0], x, 10)) -} - -// AppendIntArray appends an array of numeric literals to buf. -func AppendIntArray(buf *bytes.Buffer, a ...int64) { - var scratch [64]byte - - buf.WriteByte('[') - for i, x := range a { - if i > 0 { - buf.WriteByte(',') - } - buf.Write(strconv.AppendInt(scratch[:0], x, 10)) - } - buf.WriteByte(']') -} - -// AppendUint appends a numeric literal representing the value to buf. -func AppendUint(buf *bytes.Buffer, x uint64) { - var scratch [64]byte - buf.Write(strconv.AppendUint(scratch[:0], x, 10)) -} - -// AppendUintArray appends an array of numeric literals to buf. -func AppendUintArray(buf *bytes.Buffer, a ...uint64) { - var scratch [64]byte - - buf.WriteByte('[') - for i, x := range a { - if i > 0 { - buf.WriteByte(',') - } - buf.Write(strconv.AppendUint(scratch[:0], x, 10)) - } - buf.WriteByte(']') -} diff --git a/go-telemetry-api-extension/telemetryApi/jsonUtil.go b/go-telemetry-api-extension/telemetryApi/jsonUtil.go deleted file mode 100644 index 6e555776..00000000 --- a/go-telemetry-api-extension/telemetryApi/jsonUtil.go +++ /dev/null @@ -1,104 +0,0 @@ -package telemetryApi - -import ( - "bytes" - "encoding/json" - "fmt" - "net/http" - "strconv" - "strings" - "time" -) - -// jsonString assists in logging JSON: Based on the formatter used to log -// Context contents, the contents could be marshalled as JSON or just printed -// directly. -type jsonString string - -// MarshalJSON returns the jsonString unmodified without any escaping. -func (js jsonString) MarshalJSON() ([]byte, error) { - if "" == js { - return []byte("null"), nil - } - return []byte(js), nil -} - -func removeFirstSegment(name string) string { - idx := strings.Index(name, "/") - if -1 == idx { - return name - } - return name[idx+1:] -} - -func timeToIntMillis(t time.Time) int64 { - return t.UnixNano() / (1000 * 1000) -} - -func timeToFloatMilliseconds(t time.Time) float64 { - return float64(t.UnixNano()) / float64(1000*1000) -} - -// compactJSONString removes the whitespace from a JSON string. This function -// will panic if the string provided is not valid JSON. Thus is must only be -// used in testing code! -func compactJSONString(js string) string { - buf := new(bytes.Buffer) - if err := json.Compact(buf, []byte(js)); err != nil { - panic(fmt.Errorf("unable to compact JSON: %v", err)) - } - return buf.String() -} - -// getContentLengthFromHeader gets the content length from a HTTP header, or -1 -// if no content length is available. -func getContentLengthFromHeader(h http.Header) int64 { - if cl := h.Get("Content-Length"); cl != "" { - if contentLength, err := strconv.ParseInt(cl, 10, 64); err == nil { - return contentLength - } - } - - return -1 -} - -// stringLengthByteLimit truncates strings using a byte-limit boundary and -// avoids terminating in the middle of a multibyte character. -func stringLengthByteLimit(str string, byteLimit int) string { - if len(str) <= byteLimit { - return str - } - - limitIndex := 0 - for pos := range str { - if pos > byteLimit { - break - } - limitIndex = pos - } - return str[0:limitIndex] -} - -func timeFromUnixMilliseconds(millis uint64) time.Time { - secs := int64(millis) / 1000 - msecsRemaining := int64(millis) % 1000 - nsecsRemaining := msecsRemaining * (1000 * 1000) - return time.Unix(secs, nsecsRemaining) -} - -// timeToUnixMilliseconds converts a time into a Unix timestamp in millisecond -// units. -func timeToUnixMilliseconds(tm time.Time) uint64 { - return uint64(tm.UnixNano()) / uint64(1000*1000) -} - -// minorVersion takes a given version string and returns only the major and -// minor portions of it. If the input is malformed, it returns the input -// untouched. -func minorVersion(v string) string { - split := strings.SplitN(v, ".", 3) - if len(split) < 2 { - return v - } - return split[0] + "." + split[1] -} diff --git a/go-telemetry-api-extension/telemetryApi/jsonWriter.go b/go-telemetry-api-extension/telemetryApi/jsonWriter.go deleted file mode 100644 index 4381d16a..00000000 --- a/go-telemetry-api-extension/telemetryApi/jsonWriter.go +++ /dev/null @@ -1,64 +0,0 @@ -package telemetryApi - -import ( - "bytes" -) - -type jsonWriter interface { - WriteJSON(buf *bytes.Buffer) -} - -type jsonFieldsWriter struct { - buf *bytes.Buffer - needsComma bool -} - -func (w *jsonFieldsWriter) addKey(key string) { - if w.needsComma { - w.buf.WriteByte(',') - } else { - w.needsComma = true - } - // defensively assume that the key needs escaping: - AppendString(w.buf, key) - w.buf.WriteByte(':') -} - -func (w *jsonFieldsWriter) stringField(key string, val string) { - w.addKey(key) - AppendString(w.buf, val) -} - -func (w *jsonFieldsWriter) intField(key string, val int64) { - w.addKey(key) - AppendInt(w.buf, val) -} - -func (w *jsonFieldsWriter) floatField(key string, val float64) { - w.addKey(key) - AppendFloat(w.buf, val) -} - -func (w *jsonFieldsWriter) float32Field(key string, val float32) { - w.addKey(key) - AppendFloat32(w.buf, val) -} - -func (w *jsonFieldsWriter) boolField(key string, val bool) { - w.addKey(key) - if val { - w.buf.WriteString("true") - } else { - w.buf.WriteString("false") - } -} - -func (w *jsonFieldsWriter) rawField(key string, val jsonString) { - w.addKey(key) - w.buf.WriteString(string(val)) -} - -func (w *jsonFieldsWriter) writerField(key string, val jsonWriter) { - w.addKey(key) - val.WriteJSON(w.buf) -} diff --git a/go-telemetry-api-extension/telemetryApi/new-relic-logs.go b/go-telemetry-api-extension/telemetryApi/new-relic-logs.go deleted file mode 100644 index d2a8e5f8..00000000 --- a/go-telemetry-api-extension/telemetryApi/new-relic-logs.go +++ /dev/null @@ -1,102 +0,0 @@ -package telemetryApi - -import ( - "bytes" -) - -const ( - // LogLevelFieldName is the name of the log level field in New Relic logging JSON - LogLevelFieldName = "level" - - // LogMessageFieldName is the name of the log message field in New Relic logging JSON - LogMessageFieldName = "message" - - // LogTimestampFieldName is the name of the timestamp field in New Relic logging JSON - LogTimestampFieldName = "timestamp" - - // LogSpanIDFieldName is the name of the span ID field in the New Relic logging JSON - LogSpanIDFieldName = "span.id" - - // LogTraceIDFieldName is the name of the trace ID field in the New Relic logging JSON - LogTraceIDFieldName = "trace.id" - - // LogSeverityUnknown is the value the log severity should be set to if no log severity is known - LogSeverityUnknown = "UNKNOWN" - - // JSON Attribute Constants - HostnameAttributeKey = "hostname" - EntityNameAttributeKey = "entity.name" - entityGUIDAttributeKey = "entity.guid" - - MaxLogLength = 32768 -) - -type LogPayload struct { - *bytes.Buffer - done bool -} - -// NewLogLine creates an object for processing a single log line and sending it to New Relic -func NewLogPayload(commonAttributes map[string]string) *LogPayload { - buf := bytes.NewBuffer([]byte{}) - buf.WriteByte('[') - buf.WriteByte('{') - buf.WriteString(`"common":`) - buf.WriteByte('{') - buf.WriteString(`"attributes":`) - buf.WriteByte('{') - - for name, value := range commonAttributes { - name = "\"" + name + "\":" - buf.WriteString(name) - AppendString(buf, value) - buf.WriteByte(',') - } - buf.WriteByte('}') - buf.WriteByte('}') - buf.WriteByte(',') - buf.WriteString(`"logs":`) - buf.WriteByte('[') - - return &LogPayload{Buffer: buf} -} - -// AddLogLine prepares a Log Event JSON object in the format expected by the collector. -// Timestamp must be unix millisecond time -func (buf *LogPayload) AddLogLine(Timestamp int64, Level, Message string) { - if buf.done { - return - } - - if Level == "" { - Level = LogSeverityUnknown - } - - if len(Message) > MaxLogLength { - Message = Message[:MaxLogLength] - } - - w := jsonFieldsWriter{buf: buf.Buffer} - buf.WriteByte('{') - w.stringField(LogLevelFieldName, Level) - w.stringField(LogMessageFieldName, Message) - - w.needsComma = false - buf.WriteByte(',') - w.intField(LogTimestampFieldName, Timestamp) - buf.WriteByte('}') -} - -func (buf *LogPayload) Marshal() []byte { - if buf.done { - return buf.Bytes() - } - - // prevent Duplication of JSON closure - buf.done = true - - buf.WriteByte(']') - buf.WriteByte('}') - buf.WriteByte(']') - return buf.Bytes() -} diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go index 05c0713a..1585416b 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go @@ -10,8 +10,11 @@ import ( "reflect" "strings" "time" + "github.com/google/uuid" ) +const EXTENSION_LAMBDA_VERSION = "1.0" + const ( LogEndpointEU string = "https://log-api.eu.newrelic.com/log/v1" LogEndpointUS string = "https://log-api.newrelic.com/log/v1" @@ -19,8 +22,8 @@ const ( MetricsEndpointEU string = "https://metric-api.eu.newrelic.com/metric/v1" MetricsEndpointUS string = "https://metric-api.newrelic.com/metric/v1" - EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net" - EventsEndpointUS string = "https://insights-collector.newrelic.com" + EventsEndpointEU string = "https://insights-collector.eu01.nr-data.net/v1/accounts/" + EventsEndpointUS string = "https://insights-collector.newrelic.com/v1/accounts/" TracesEndpointEU string = "https://trace-api.eu.newrelic.com/trace/v1" TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" @@ -67,6 +70,10 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) // the headers might be different for different endpoints req.Header.Set("Content-Type", "application/json") req.Header.Set("Api-Key", d.licenseKey) + if strings.Contains(uri, "trace") { + req.Header.Set("Data-Format", "newrelic") + req.Header.Set("Data-Format-Version", "1") + } _, err = d.httpClient.Do(req) return err @@ -74,10 +81,7 @@ func sendBatch(ctx context.Context, d *Dispatcher, uri string, bodyBytes []byte) func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) error { - // will be replaced later - var lambda_name = "---" - // should be as below - // var lambda_name = d.functionName + var lambda_name = d.functionName var agent_name = path.Base(os.Args[0]) // NB "." is not allowed in NR eventType @@ -89,165 +93,145 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) data["logging"] = []map[string]interface{}{} data["metrics"] = []map[string]interface{}{} - var bb map[string]any - // var event map[string]any - + // current logic - terminate processing on an error, can be changed later for _, event := range logEntries { - //json.Unmarshal([]byte(ev), &event) msInt, err := time.Parse(time.RFC3339, event.(LambdaTelemetryEvent).Time) if err != nil { return err } // events - json.Unmarshal([]byte(`{ - "timestamp": msInt.UnixMilli() - "eventType": "Lambda_Ext_"+ replacer.Replace(event["type"]) - }`), &bb) - data["events"] = append(data["events"], bb) - data["events"] = append(data["events"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "eventType": "Lambda_Ext_" + replacer.Replace(event.(LambdaTelemetryEvent).Type), + "eventType": "AwsLambdaExtensionGo", + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, + "lambda.logevent.type": replacer.Replace(event.(LambdaTelemetryEvent).Type), }) - - switch event.(LambdaTelemetryEvent).Type { - case "platform.iniStart": - - case "platform.iniRuntimeDone": - - case "platform.iniReport": - - case "platform.start": - - case "platform.runtimeDone": - - case "platform.report": - - case "platform.extension": - - case "platform.telemetrySubscription": - - case "platform.logsDropped": - - } - + // logs if event.(LambdaTelemetryEvent).Record != nil { data["logging"] = append(data["logging"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), "message": event.(LambdaTelemetryEvent).Record, "attributes": map[string]map[string]string{ + "plugin" : { "type": "lambda extension"}, "aws": { - "event": event.(LambdaTelemetryEvent).Type, - "lambda": lambda_name, + "lambda.logevent.type": event.(LambdaTelemetryEvent).Type, + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, }, }, }) - } + if reflect.ValueOf(event.(LambdaTelemetryEvent).Record).Kind() == reflect.Map { + eventRecord := event.(LambdaTelemetryEvent).Record.(map[string]interface{}) // metrics - if event.(LambdaTelemetryEvent).Record != nil { - if val, ok := event.(LambdaTelemetryEvent).Record["metrics"]; ok { - mts := []map[string]interface{}{} + rid := "" + if v, okk := eventRecord["requestId"].(string); okk { + rid = v + } + if val, ok := eventRecord["metrics"].(map[string]interface{}); ok { for key := range val { - mts := append(mts, map[string]interface{}(`{ - "name": "aws.telemetry.lambda_ext."+lambda_name+"."+key, - "value": event["record"]["metrics"][key] - }`)) - } - rid := "" - if val, ok := event["record"]["requestId"]; ok { - rid = val - } - data["metrics"] = append(data["metrics"], map[string]interface{}(`{ - "common" : { + data["metrics"] = append(data["metrics"], map[string]interface{}{ + "name": "aws.telemetry.lambda_ext."+key, + "value": val[key], "timestamp": msInt.UnixMilli(), - "attributes": { - "event": event["type"], + "attributes": map[string]interface{}{ + "lambda.logevent.type": event.(LambdaTelemetryEvent).Type, "requestId": rid, - "extension": agent_name - } - }, - "metrics": mts - }`)) + "extension.name": agent_name, + "extension.version": EXTENSION_LAMBDA_VERSION, + "lambda.name": lambda_name, + }, + }) + } } - } // spans - if reflect.ValueOf(event["record"]).Kind() == reflect.Map { - if val, ok := event["record"]["spans"]; ok { - spans := [...]string{} - for span := range val { - el := `{ - "trace.id": event["record"]["requestId"], - "id": uuid.New().String(), - "attributes": { - "event": event["type"], - "service.name": agent_name - } - }` - start, err := time.Parse(time.RFC3339, event["time"]) - if err != nil { - return err - } - for key := range span { + if val, ok := eventRecord["spans"].([]interface{}); ok { + for _, span := range val { + attributes := make(map[string]interface{}) + attributes["event"] = event.(LambdaTelemetryEvent).Type + attributes["service.name"] = agent_name + var start time.Time + for key,v := range span.(map[string]interface{}) { if key == "durationMs" { - el["attributes"]["duration.ms"] = span[key] + attributes["duration.ms"] = v.(float64) } else if key == "start" { - el["timestamp"] = start.UnixMilli() + start, err = time.Parse(time.RFC3339, v.(string)) + if err != nil { + return err + } } else { - el["attributes"][key] = span[key] + attributes[key] = v.(string) } } + el := map[string]interface{}{ + "trace.id": rid, + "timestamp": start.UnixMilli(), + "id": (uuid.New()).String(), + "attributes": attributes, + } data["traces"] = append(data["traces"], el) } } + } } } // data ready if len(data) > 0 { // send logs if len(data["logging"]) > 0 { - // bodyBytes, _ := json.Marshal(map[string]interface{}(`{ - bodyBytes := `{ - "common": { - "attributes": { - "aws": { - "logType": "aws/lambda-ext", - "function": lambda_name, - "extension": agent_name - } - } - }, - "logs": data["logging"] - }` - // }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "logging"), bodyBytes) + bodyBytes, _ := json.Marshal(data["logging"]) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "logging", ""), bodyBytes) + if er != nil { + return er + } } // send metrics if len(data["metrics"]) > 0 { - for payload := range data["metrics"] { - bodyBytes, _ := json.Marshal(payload) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "metrics"), bodyBytes) + var dataMet[]map[string][]map[string]interface{} + dataMet = append(dataMet, map[string][]map[string]interface{}{ + "metrics": data["metrics"], + }) + bodyBytes, _ := json.Marshal(dataMet) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "metrics", ""), bodyBytes) + if er != nil { + return er } } // send events if len(data["events"]) > 0 { - bodyBytes, _ := json.Marshal(data["events"]) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events"), bodyBytes) + ACCOUNT_ID := os.Getenv("ACCOUNT_ID") + if len(ACCOUNT_ID) > 0 { + bodyBytes, _ := json.Marshal(data["events"]) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events", "")+ACCOUNT_ID+"/events", bodyBytes) + if er != nil { + return er + } + } else { + l.Info("ACCOUNT_ID is not set, therefore no events data sent") + } } // send traces if len(data["traces"]) > 0 { - bodyBytes, _ := json.Marshal(map[string]interface{}(`{ - "common": { + var dataTraces[]map[string]interface{} + dataTraces = append(dataTraces, map[string]interface{}{ + "common": map[string]map[string]string{ "attributes": { "host": "aws.amazon.com", - "service.name": lambda_name - } + "service.name": lambda_name, + }, }, - "spans": data["traces"] - }`)) - err := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "traces"), bodyBytes) + "spans": data["traces"], + }) + bodyBytes, _ := json.Marshal(dataTraces) + er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "traces", ""), bodyBytes) + if er != nil { + return er + } } } - return err // if one of the sents failed, it'd be nice to know which + return nil // success } From 9a17b0c03d7ca8370967b7186c3989664b73dec9 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Wed, 21 Dec 2022 01:54:04 +0000 Subject: [PATCH 09/11] go-telemetry-api-extension -> AwsLambdaExtension --- .../Makefile | 14 +++++++------- .../README.md | 12 ++++++------ AwsLambdaExtension/build-deploy.sh | 15 +++++++++++++++ .../extensionApi/client.go | 0 .../go.mod | 2 +- .../go.sum | 0 .../main.go | 4 ++-- .../telemetryApi/client.go | 0 .../telemetryApi/dispatcher.go | 11 ++--------- .../telemetryApi/listener.go | 0 .../telemetryApi/send_to_new_relic.go | 4 ++-- .../template.yaml | 8 ++++---- go-telemetry-api-extension/build-deploy.sh | 13 ------------- 13 files changed, 39 insertions(+), 44 deletions(-) rename {go-telemetry-api-extension => AwsLambdaExtension}/Makefile (52%) rename {go-telemetry-api-extension => AwsLambdaExtension}/README.md (89%) create mode 100755 AwsLambdaExtension/build-deploy.sh rename {go-telemetry-api-extension => AwsLambdaExtension}/extensionApi/client.go (100%) rename {go-telemetry-api-extension => AwsLambdaExtension}/go.mod (82%) rename {go-telemetry-api-extension => AwsLambdaExtension}/go.sum (100%) rename {go-telemetry-api-extension => AwsLambdaExtension}/main.go (95%) rename {go-telemetry-api-extension => AwsLambdaExtension}/telemetryApi/client.go (100%) rename {go-telemetry-api-extension => AwsLambdaExtension}/telemetryApi/dispatcher.go (83%) rename {go-telemetry-api-extension => AwsLambdaExtension}/telemetryApi/listener.go (100%) rename {go-telemetry-api-extension => AwsLambdaExtension}/telemetryApi/send_to_new_relic.go (98%) rename {go-telemetry-api-extension => AwsLambdaExtension}/template.yaml (76%) delete mode 100755 go-telemetry-api-extension/build-deploy.sh diff --git a/go-telemetry-api-extension/Makefile b/AwsLambdaExtension/Makefile similarity index 52% rename from go-telemetry-api-extension/Makefile rename to AwsLambdaExtension/Makefile index 8b80815c..2435198b 100644 --- a/go-telemetry-api-extension/Makefile +++ b/AwsLambdaExtension/Makefile @@ -1,17 +1,17 @@ # ----- Target used with SAM (Serverless Application Model) ----- -build-GoTelemetryApiExtensionLayer: - echo :build-GoTelemetryApiExtensionLayer - GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/go-telemetry-api-extension main.go - chmod +x $(ARTIFACTS_DIR)/extensions/go-telemetry-api-extension +build-AwsLambdaExtension: + echo :build-AwsLambdaExtension + GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension main.go + chmod +x $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension # ----- Target illustrating manual steps required to create the extension layer ----- buildAndDeployExtensionLayer: echo :buildAndDeployExtensionLayer rm -rf bin - GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go - chmod +x bin/extensions/go-telemetry-api-extension + GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go + chmod +x bin/extensions/AwsLambdaExtension cd bin && zip -r extension.zip extensions aws lambda publish-layer-version \ - --layer-name "go-telemetry-api-extension-layer" \ + --layer-name "AwsLambdaExtension" \ --zip-file "fileb://bin/extension.zip" diff --git a/go-telemetry-api-extension/README.md b/AwsLambdaExtension/README.md similarity index 89% rename from go-telemetry-api-extension/README.md rename to AwsLambdaExtension/README.md index 94fb3f7c..d69255cb 100644 --- a/go-telemetry-api-extension/README.md +++ b/AwsLambdaExtension/README.md @@ -24,14 +24,14 @@ To run this example, you will need to ensure that your build architecture matche Building and saving package into a `bin/extensions` directory: ```bash -$ cd go-telemetry-api-extension -$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go -$ chmod +x bin/extensions/go-telemetry-api-extension +$ cd AwsLambdaExtension +$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go +$ chmod +x bin/extensions/AwsLambdaExtension ``` ## Layer Setup Process The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located. -You must include the `go-telemetry-api-extension` binary. +You must include the `AwsLambdaExtension` binary. Creating zip package for the extension: ```bash @@ -43,14 +43,14 @@ Publish a new layer using the `extension.zip` using below command. The output sh ```bash aws lambda publish-layer-version \ - --layer-name "go-telemetry-api-extension" \ + --layer-name "AwsLambdaExtension" \ --zip-file "fileb://extension.zip" ``` Note the `LayerVersionArn` that is produced in the output. eg. ``` -LayerVersionArn: arn:aws:lambda::123456789012:layer::1 +LayerVersionArn: arn:aws:lambda::533243300146:layer:: ``` Or use `build.sh` script to build and deploy the extension. diff --git a/AwsLambdaExtension/build-deploy.sh b/AwsLambdaExtension/build-deploy.sh new file mode 100755 index 00000000..a117425f --- /dev/null +++ b/AwsLambdaExtension/build-deploy.sh @@ -0,0 +1,15 @@ +if [ ! -d "bin" ]; then + mkdir bin +fi +if [ ! -d "bin/extensions" ]; then + mkdir bin/extensions +fi +GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go +chmod +x bin/extensions/AwsLambdaExtension +cd bin +zip -r extension.zip extensions/ +aws lambda publish-layer-version \ + --layer-name "AwsLambdaExtension" \ + --description "New Relic Lambda Extension" \ + --compatible-architectures "x86_64" \ + --zip-file "fileb://extension.zip" diff --git a/go-telemetry-api-extension/extensionApi/client.go b/AwsLambdaExtension/extensionApi/client.go similarity index 100% rename from go-telemetry-api-extension/extensionApi/client.go rename to AwsLambdaExtension/extensionApi/client.go diff --git a/go-telemetry-api-extension/go.mod b/AwsLambdaExtension/go.mod similarity index 82% rename from go-telemetry-api-extension/go.mod rename to AwsLambdaExtension/go.mod index 925d2dc7..018b8126 100644 --- a/go-telemetry-api-extension/go.mod +++ b/AwsLambdaExtension/go.mod @@ -1,4 +1,4 @@ -module newrelic-lambda-extension/go-telemetry-api-extension +module newrelic-lambda-extension/AwsLambdaExtension go 1.18 diff --git a/go-telemetry-api-extension/go.sum b/AwsLambdaExtension/go.sum similarity index 100% rename from go-telemetry-api-extension/go.sum rename to AwsLambdaExtension/go.sum diff --git a/go-telemetry-api-extension/main.go b/AwsLambdaExtension/main.go similarity index 95% rename from go-telemetry-api-extension/main.go rename to AwsLambdaExtension/main.go index 132e3ef7..27019771 100644 --- a/go-telemetry-api-extension/main.go +++ b/AwsLambdaExtension/main.go @@ -14,8 +14,8 @@ package main import ( "context" - "newrelic-lambda-extension/go-telemetry-api-extension/extensionApi" - "newrelic-lambda-extension/go-telemetry-api-extension/telemetryApi" + "newrelic-lambda-extension/AwsLambdaExtension/extensionApi" + "newrelic-lambda-extension/AwsLambdaExtension/telemetryApi" "os" "os/signal" "path" diff --git a/go-telemetry-api-extension/telemetryApi/client.go b/AwsLambdaExtension/telemetryApi/client.go similarity index 100% rename from go-telemetry-api-extension/telemetryApi/client.go rename to AwsLambdaExtension/telemetryApi/client.go diff --git a/go-telemetry-api-extension/telemetryApi/dispatcher.go b/AwsLambdaExtension/telemetryApi/dispatcher.go similarity index 83% rename from go-telemetry-api-extension/telemetryApi/dispatcher.go rename to AwsLambdaExtension/telemetryApi/dispatcher.go index 1d0b3916..9500e980 100644 --- a/go-telemetry-api-extension/telemetryApi/dispatcher.go +++ b/AwsLambdaExtension/telemetryApi/dispatcher.go @@ -14,21 +14,15 @@ import ( type Dispatcher struct { httpClient *http.Client - postUri string licenseKey string minBatchSize int64 functionName string } func NewDispatcher(functionName string) *Dispatcher { - dispatchPostUri := os.Getenv("DISPATCH_POST_URI") - if len(dispatchPostUri) == 0 { - panic("dispatchPostUri undefined") - } - - licenseKey := os.Getenv("LICENSE_KEY") + licenseKey := os.Getenv("NEW_RELIC_LICENSE_KEY") if len(licenseKey) == 0 { - panic("licenseKey undefined") + panic("NEW_RELIC_LICENSE_KEY undefined") } dispatchMinBatchSize, err := strconv.ParseInt(os.Getenv("DISPATCH_MIN_BATCH_SIZE"), 0, 16) @@ -38,7 +32,6 @@ func NewDispatcher(functionName string) *Dispatcher { return &Dispatcher{ httpClient: &http.Client{}, - postUri: dispatchPostUri, licenseKey: licenseKey, minBatchSize: dispatchMinBatchSize, functionName: functionName, diff --git a/go-telemetry-api-extension/telemetryApi/listener.go b/AwsLambdaExtension/telemetryApi/listener.go similarity index 100% rename from go-telemetry-api-extension/telemetryApi/listener.go rename to AwsLambdaExtension/telemetryApi/listener.go diff --git a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go similarity index 98% rename from go-telemetry-api-extension/telemetryApi/send_to_new_relic.go rename to AwsLambdaExtension/telemetryApi/send_to_new_relic.go index 1585416b..2c7e83f9 100644 --- a/go-telemetry-api-extension/telemetryApi/send_to_new_relic.go +++ b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go @@ -202,7 +202,7 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) } // send events if len(data["events"]) > 0 { - ACCOUNT_ID := os.Getenv("ACCOUNT_ID") + ACCOUNT_ID := os.Getenv("NEW_RELIC_ACCOUNT_ID") if len(ACCOUNT_ID) > 0 { bodyBytes, _ := json.Marshal(data["events"]) er := sendBatch(ctx, d, getEndpointURL(d.licenseKey, "events", "")+ACCOUNT_ID+"/events", bodyBytes) @@ -210,7 +210,7 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) return er } } else { - l.Info("ACCOUNT_ID is not set, therefore no events data sent") + l.Info("NEW_RELIC_ACCOUNT_ID is not set, therefore no events data sent") } } // send traces diff --git a/go-telemetry-api-extension/template.yaml b/AwsLambdaExtension/template.yaml similarity index 76% rename from go-telemetry-api-extension/template.yaml rename to AwsLambdaExtension/template.yaml index 5f3754e9..6603f609 100644 --- a/go-telemetry-api-extension/template.yaml +++ b/AwsLambdaExtension/template.yaml @@ -1,9 +1,9 @@ AWSTemplateFormatVersion: 2010-09-09 Transform: AWS::Serverless-2016-10-31 -Description: go-telemetry-api-extension +Description: AwsLambdaExtension Parameters: - LICENSE_KEY: + NEW_RELIC_LICENSE_KEY: Type: String Description: Your New Relic ingest key. @@ -13,7 +13,7 @@ Resources: Metadata: BuildMethod: makefile Properties: - LayerName: go-telemetry-api-extension-layer + LayerName: AwsLambdaExtension ContentUri: . LicenseInfo: MIT-0 RetentionPolicy: Delete @@ -23,4 +23,4 @@ Resources: DISPATCH_MIN_BATCH_SIZE: 10 DEBUG_LOGGING_ENABLED: True LOGGING_ENABLED: True - NEW_RELIC_LICENSE_KEY: !Sub ${LICENSE_KEY} + NEW_RELIC_LICENSE_KEY: !Sub ${NEW_RELIC_LICENSE_KEY} diff --git a/go-telemetry-api-extension/build-deploy.sh b/go-telemetry-api-extension/build-deploy.sh deleted file mode 100755 index a64a4b43..00000000 --- a/go-telemetry-api-extension/build-deploy.sh +++ /dev/null @@ -1,13 +0,0 @@ -if [ ! -d "bin" ]; then - mkdir bin -fi -if [ ! -d "bin/extensions" ]; then - mkdir bin/extensions -fi -GOOS=linux GOARCH=amd64 go build -o bin/extensions/go-telemetry-api-extension main.go -chmod +x bin/extensions/go-telemetry-api-extension -cd bin -zip -r extension.zip extensions/ -aws lambda publish-layer-version \ - --layer-name "go-telemetry-api-extension" \ - --zip-file "fileb://extension.zip" From 11f034220825e96fd33ce210be28ca72f56c8da8 Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Wed, 21 Dec 2022 22:19:17 +0000 Subject: [PATCH 10/11] AwsLambdaExtensionGo -> AwsLambdaExtension --- AwsLambdaExtension/telemetryApi/send_to_new_relic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AwsLambdaExtension/telemetryApi/send_to_new_relic.go b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go index 2c7e83f9..dba7dcaa 100644 --- a/AwsLambdaExtension/telemetryApi/send_to_new_relic.go +++ b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go @@ -102,7 +102,7 @@ func sendDataToNR(ctx context.Context, logEntries []interface{}, d *Dispatcher) // events data["events"] = append(data["events"], map[string]interface{}{ "timestamp": msInt.UnixMilli(), - "eventType": "AwsLambdaExtensionGo", + "eventType": "AwsLambdaExtension", "extension.name": agent_name, "extension.version": EXTENSION_LAMBDA_VERSION, "lambda.name": lambda_name, From a8d077b81f98ec0d9c80a37e012e57964c8967fc Mon Sep 17 00:00:00 2001 From: Olga Kirillova Date: Mon, 9 Jan 2023 23:05:39 +0000 Subject: [PATCH 11/11] support of Secrets Manager --- AwsLambdaExtension/go.mod | 6 ++- AwsLambdaExtension/go.sum | 34 +++++++++++++ AwsLambdaExtension/main.go | 2 +- AwsLambdaExtension/telemetryApi/dispatcher.go | 8 ++- .../telemetryApi/send_to_new_relic.go | 49 +++++++++++++++++++ 5 files changed, 94 insertions(+), 5 deletions(-) diff --git a/AwsLambdaExtension/go.mod b/AwsLambdaExtension/go.mod index 018b8126..55d579e3 100644 --- a/AwsLambdaExtension/go.mod +++ b/AwsLambdaExtension/go.mod @@ -4,11 +4,13 @@ go 1.18 require ( github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/google/uuid v1.3.0 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 ) require ( - github.com/google/uuid v1.3.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + github.com/aws/aws-sdk-go v1.44.176 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + golang.org/x/sys v0.1.0 // indirect ) diff --git a/AwsLambdaExtension/go.sum b/AwsLambdaExtension/go.sum index d6d3e285..f6fbde94 100644 --- a/AwsLambdaExtension/go.sum +++ b/AwsLambdaExtension/go.sum @@ -1,3 +1,5 @@ +github.com/aws/aws-sdk-go v1.44.176 h1:mxcfI3IWHemX+5fEKt5uqIS/hdbaR7qzGfJYo5UyjJE= +github.com/aws/aws-sdk-go v1.44.176/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -5,6 +7,9 @@ github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb2 github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -14,8 +19,37 @@ github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVs github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/AwsLambdaExtension/main.go b/AwsLambdaExtension/main.go index 27019771..b1b1780c 100644 --- a/AwsLambdaExtension/main.go +++ b/AwsLambdaExtension/main.go @@ -65,7 +65,7 @@ func main() { panic(err) } l.Info("[main] Subscription success") - dispatcher := telemetryApi.NewDispatcher(extensionApiClient.GetFunctionName()) + dispatcher := telemetryApi.NewDispatcher(extensionApiClient.GetFunctionName(), ctx) // Will block until invoke or shutdown event is received or cancelled via the context. for { diff --git a/AwsLambdaExtension/telemetryApi/dispatcher.go b/AwsLambdaExtension/telemetryApi/dispatcher.go index 9500e980..a948f63b 100644 --- a/AwsLambdaExtension/telemetryApi/dispatcher.go +++ b/AwsLambdaExtension/telemetryApi/dispatcher.go @@ -19,8 +19,12 @@ type Dispatcher struct { functionName string } -func NewDispatcher(functionName string) *Dispatcher { - licenseKey := os.Getenv("NEW_RELIC_LICENSE_KEY") +func NewDispatcher(functionName string, ctx context.Context) *Dispatcher { + var licenseKey string + licenseKey = os.Getenv("NEW_RELIC_LICENSE_KEY") + if len(licenseKey) == 0 { + licenseKey, _ = getNewRelicLicenseKey(ctx) + } if len(licenseKey) == 0 { panic("NEW_RELIC_LICENSE_KEY undefined") } diff --git a/AwsLambdaExtension/telemetryApi/send_to_new_relic.go b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go index dba7dcaa..330668bd 100644 --- a/AwsLambdaExtension/telemetryApi/send_to_new_relic.go +++ b/AwsLambdaExtension/telemetryApi/send_to_new_relic.go @@ -4,12 +4,18 @@ import ( "bytes" "context" "encoding/json" + "fmt" "net/http" "os" "path" "reflect" "strings" "time" + + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/secretsmanager" + "github.com/aws/aws-sdk-go/service/secretsmanager/secretsmanageriface" + "github.com/google/uuid" ) @@ -29,6 +35,49 @@ const ( TracesEndpointUS string = "https://trace-api.newrelic.com/trace/v1" ) +var ( + sess = session.Must(session.NewSessionWithOptions(session.Options{ + SharedConfigState: session.SharedConfigEnable, + })) + secrets secretsmanageriface.SecretsManagerAPI +) + +type licenseKeySecret struct { + LicenseKey string +} + +func init() { + secrets = secretsmanager.New(sess) +} + +func decodeLicenseKey(rawJson *string) (string, error) { + var lks licenseKeySecret + + err := json.Unmarshal([]byte(*rawJson), &lks) + if err != nil { + return "", err + } + if lks.LicenseKey == "" { + return "", fmt.Errorf("malformed license key secret; missing \"LicenseKey\" attribute") + } + + return lks.LicenseKey, nil +} + +func getNewRelicLicenseKey(ctx context.Context) (string, error) { + sId := "NEW_RELIC_LICENSE_KEY" + v := os.Getenv("NEW_RELIC_LICENSE_KEY_SECRET") + if len(v) > 0 { + sId = v + } + secretValueInput := secretsmanager.GetSecretValueInput{SecretId: &sId} + secretValueOutput, err := secrets.GetSecretValueWithContext(ctx, &secretValueInput) + if err != nil { + return "", err + } + return decodeLicenseKey(secretValueOutput.SecretString) +} + func getEndpointURL(licenseKey string, typ string, EndpointOverride string) string { if EndpointOverride != "" { return EndpointOverride