Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions AwsLambdaExtension/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# ----- Target used with SAM (Serverless Application Model) -----
build-AwsLambdaExtension:
echo :build-AwsLambdaExtension
GOOS=linux GOARCH=amd64 go build -o $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension main.go
chmod +x $(ARTIFACTS_DIR)/extensions/AwsLambdaExtension

# ----- Target illustrating manual steps required to create the extension layer -----
buildAndDeployExtensionLayer:
echo :buildAndDeployExtensionLayer
rm -rf bin
GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go
chmod +x bin/extensions/AwsLambdaExtension
cd bin && zip -r extension.zip extensions

aws lambda publish-layer-version \
--layer-name "AwsLambdaExtension" \
--zip-file "fileb://bin/extension.zip"
72 changes: 72 additions & 0 deletions AwsLambdaExtension/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# New Relic Telemetry API Extension in Go

The provided code demonstrates how to get a basic Telemetry API extension written in Go up and running.

This extension:
1. Registers the extension with Lambda Extensions API (see `extensionApi/client.go`)
2. Starts a local HTTP server to receive incoming telemetry events from the Telemetry API (see `telemetryApi/listener.go`)
3. Subscribes to the Telemetry API to start receiving incoming telemetry events (see `telemetryApi/client.go`)
4. Receives telemetry events, batches them, and dispatches to New Relic via POST requests (see `telemetryApi/dispatcher.go`).
Requires the license key set up as the environment variable.

Note that step 4 is asynchronous in nature. The functions is thawed to process the incoming event, and new telemetry might arrive either
before or after dispatching existing telemetry. In case of the latter, the newly arrived telemetry will be kept in the telemetry queue and
dispatched when processing next event. Depending on buffering configuration you pass to the Telemetry API during subscription, you might get
either zero, one, or multiple requests from Telemetry API to the telemetry listener in a single function invocation.

The code is heavily instrumented with logs so you'll be able to see the Telemetry API extension lifecycle messages as you're learning to
implement one.

## Build package and dependencies

To run this example, you will need to ensure that your build architecture matches that of the Lambda execution environment by compiling with
`GOOS=linux` and `GOARCH=amd64` if you are not running in a Linux environment.

Building and saving package into a `bin/extensions` directory:
```bash
$ cd AwsLambdaExtension
$ GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go
$ chmod +x bin/extensions/AwsLambdaExtension
```

## Layer Setup Process
The extensions .zip file should contain a root directory called `extensions/`, where the extension executables are located.
You must include the `AwsLambdaExtension` binary.

Creating zip package for the extension:
```bash
$ cd bin
$ zip -r extension.zip extensions/
```

Publish a new layer using the `extension.zip` using below command. The output should provide you with a layer ARN.

```bash
aws lambda publish-layer-version \
--layer-name "AwsLambdaExtension" \
--zip-file "fileb://extension.zip"
```

Note the `LayerVersionArn` that is produced in the output. eg.

```
LayerVersionArn: arn:aws:lambda:<region>:533243300146:layer:<layerName>:<layerVersion>
```

Or use `build.sh` script to build and deploy the extension.

Add the newly created layer version to a Lambda function.

```bash
aws lambda update-function-configuration
--function-name <your function name>
--layers <layer arn>
```

## Function Invocation and Extension Execution

Configure the extension by setting below environment variables

* `LICENSE_KEY` - the key issueed to you when you registered an account with New Relic
* `DISPATCH_MIN_BATCH_SIZE` - optimize dispatching telemetry by telling the dispatcher how many log events you want it to batch. On function invoke the telemetry will be dispatched to `DISPATCH_POST_URI` only if number of log events collected so far is greater than `DISPATCH_MIN_BATCH_SIZE`. On function shutdown the telemetry will be dispatched to `DISPATCH_POST_URI` regardless of how many log events were collected so far.

15 changes: 15 additions & 0 deletions AwsLambdaExtension/build-deploy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
if [ ! -d "bin" ]; then
mkdir bin
fi
if [ ! -d "bin/extensions" ]; then
mkdir bin/extensions
fi
GOOS=linux GOARCH=amd64 go build -o bin/extensions/AwsLambdaExtension main.go
chmod +x bin/extensions/AwsLambdaExtension
cd bin
zip -r extension.zip extensions/
aws lambda publish-layer-version \
--layer-name "AwsLambdaExtension" \
--description "New Relic Lambda Extension" \
--compatible-architectures "x86_64" \
--zip-file "fileb://extension.zip"
222 changes: 222 additions & 0 deletions AwsLambdaExtension/extensionApi/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

package extensionApi

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"

log "github.com/sirupsen/logrus"
)

// RegisterResponse is the body of the response for /register
type RegisterResponse struct {
FunctionName string `json:"functionName"`
FunctionVersion string `json:"functionVersion"`
Handler string `json:"handler"`
}

// NextEventResponse is the response for /event/next
type NextEventResponse struct {
EventType EventType `json:"eventType"`
DeadlineMs int64 `json:"deadlineMs"`
RequestID string `json:"requestId"`
InvokedFunctionArn string `json:"invokedFunctionArn"`
Tracing Tracing `json:"tracing"`
}

// Tracing is part of the response for /event/next
type Tracing struct {
Type string `json:"type"`
Value string `json:"value"`
}

// StatusResponse is the body of the response for /init/error and /exit/error
type StatusResponse struct {
Status string `json:"status"`
}

// EventType represents the type of events recieved from /event/next
type EventType string

const (
// Function invocation event
Invoke EventType = "INVOKE"

// Runtime environment shutdown event
Shutdown EventType = "SHUTDOWN"

extensionNameHeader = "Lambda-Extension-Name"
extensionIdentiferHeader = "Lambda-Extension-Identifier"
extensionErrorType = "Lambda-Extension-Function-Error-Type"
)

// Client is a simple client for the Lambda Extensions API
type Client struct {
httpClient *http.Client
baseUrl string
ExtensionID string
functionName string
}

func (e *Client) GetFunctionName() string {
return e.functionName
}

var l = log.WithFields(log.Fields{"pkg": "extensionApi"})

// Returns a Lambda Extensions API client
func NewClient() *Client {
baseUrl := fmt.Sprintf("http://%s/2020-01-01/extension", os.Getenv("AWS_LAMBDA_RUNTIME_API"))
return &Client{
baseUrl: baseUrl,
httpClient: &http.Client{},
}
}

// Registers the extension with Extensions API
func (e *Client) Register(ctx context.Context, extensionName string) (string, error) {

const action = "/register"
url := e.baseUrl + action

l.Info("[client:Register] Registering using baseURL", e.baseUrl)
reqBody, err := json.Marshal(map[string]interface{}{
"events": []EventType{Invoke, Shutdown},
})
if err != nil {
return "", err
}

httpReq, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
if err != nil {
return "", err
}
httpReq.Header.Set(extensionNameHeader, extensionName)

httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
l.Error("[client:Register] Registration failed", err)
return "", err
}

if httpRes.StatusCode != 200 {
l.Error("[client:Register] Registration failed with statusCode ", httpReq.Response.StatusCode)
return "", fmt.Errorf("registration failed with status %s", httpRes.Status)
}

defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return "", err
}

res := RegisterResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return "", err
}

e.functionName = res.FunctionName
e.ExtensionID = httpRes.Header.Get(extensionIdentiferHeader)
l.Info("[client:Register] Registration success with extensionId ", e.ExtensionID)
return e.ExtensionID, nil
}

// Blocks while long polling for the next Lambda invoke or shutdown
func (e *Client) NextEvent(ctx context.Context) (*NextEventResponse, error) {
const action = "/event/next"
url := e.baseUrl + action

httpReq, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID)
httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRes.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return nil, err
}
res := NextEventResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return nil, err
}
return &res, nil
}

// Reports an initialization error to the platform. Call it when you registered but failed to initialize
func (e *Client) InitError(errorType string) (*StatusResponse, error) {
const action = "/init/error"
url := e.baseUrl + action

httpReq, err := http.NewRequest("POST", url, nil)
if err != nil {
return nil, err
}
httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID)
httpReq.Header.Set(extensionErrorType, errorType)
httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRes.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return nil, err
}
res := StatusResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return nil, err
}
return &res, nil
}

// Reports an error to the platform before exiting. Call it when you encounter an unexpected failure
func (e *Client) ExitError(errorType string) (*StatusResponse, error) {
const action = "/exit/error"
url := e.baseUrl + action

httpReq, err := http.NewRequest("POST", url, nil)
if err != nil {
return nil, err
}
httpReq.Header.Set(extensionIdentiferHeader, e.ExtensionID)
httpReq.Header.Set(extensionErrorType, errorType)
httpRes, err := e.httpClient.Do(httpReq)
if err != nil {
return nil, err
}
if httpRes.StatusCode != 200 {
return nil, fmt.Errorf("request failed with status %s", httpRes.Status)
}
defer httpRes.Body.Close()
body, err := ioutil.ReadAll(httpRes.Body)
if err != nil {
return nil, err
}
res := StatusResponse{}
err = json.Unmarshal(body, &res)
if err != nil {
return nil, err
}
return &res, nil
}
16 changes: 16 additions & 0 deletions AwsLambdaExtension/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module newrelic-lambda-extension/AwsLambdaExtension

go 1.18

require (
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/google/uuid v1.3.0
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
)

require (
github.com/aws/aws-sdk-go v1.44.176 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
golang.org/x/sys v0.1.0 // indirect
)
Loading