From 484dd44abee9a6b177c060c99f80e6d6e361fd22 Mon Sep 17 00:00:00 2001 From: Chodavarapu Naga Viswanadha Avinash Date: Thu, 4 Sep 2025 17:06:15 +0530 Subject: [PATCH 1/2] feat: Add Lambda Web Adapter support and telemetry processing --- config/config.go | 7 +- main.go | 31 ++++- telemetry/lwa_payload.go | 251 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 282 insertions(+), 7 deletions(-) create mode 100644 telemetry/lwa_payload.go diff --git a/config/config.go b/config/config.go index 0198c6ee..505907e3 100644 --- a/config/config.go +++ b/config/config.go @@ -43,6 +43,7 @@ type Configuration struct { NewRelicHost string APMLambdaMode bool PreconnectEnabled bool + LambdaWebAdapter bool } func parseIgnoredExtensionChecks(nrIgnoreExtensionChecksOverride bool, nrIgnoreExtensionChecksStr string) map[string]bool { @@ -100,7 +101,7 @@ func ConfigurationFromEnvironment() *Configuration { nrHostStr, nrHostOverride := os.LookupEnv("NEW_RELIC_HOST") nrAPMModeStr, nrAPMModeOverride := os.LookupEnv("NEW_RELIC_APM_LAMBDA_MODE") metricEndpoint, meOverride := os.LookupEnv("NEW_RELIC_METRIC_ENDPOINT") - + isLambdaWebAdapterStr, lwaOverride := os.LookupEnv("NEW_RELIC_LAMBDA_WEB_ADAPTER") extensionEnabled := true if nrEnabledOverride { @@ -129,7 +130,9 @@ func ConfigurationFromEnvironment() *Configuration { if nrHostOverride { ret.NewRelicHost = nrHostStr } - + if lwaOverride && strings.ToLower(isLambdaWebAdapterStr) == "true" { + ret.LambdaWebAdapter = true + } ret.ClientTimeout = DefaultClientTimeout if ctOverride && clientTimeout != "" { clientTimeout, err := time.ParseDuration(clientTimeout) diff --git a/main.go b/main.go index c2be10a7..f95a55ca 100644 --- a/main.go +++ b/main.go @@ -183,7 +183,7 @@ func main() { eventCounter = mainAPMLoop(ctx, invocationClient, telemetryChan, logServer, conf, internalAPMApp) } else { // In non-APM mode, we process telemetry and platform logs - eventCounter = mainLoop(ctx, invocationClient, batch, telemetryChan, logServer, 
telemetryClient, extensionStartup) + eventCounter = mainLoop(ctx, invocationClient, batch, telemetryChan, logServer, telemetryClient, extensionStartup, conf) } util.Logf("New Relic Extension shutting down after %v events\n", eventCounter) @@ -293,7 +293,7 @@ func APMlogShipLoop(ctx context.Context, logServer *logserver.LogServer, telemet } // mainLoop repeatedly calls the /next api, and processes telemetry and platform logs. The timing is rather complicated. -func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, batch *telemetry.Batch, telemetryChan chan []byte, logServer *logserver.LogServer, telemetryClient *telemetry.Client, extensionStartup time.Time) int { +func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, batch *telemetry.Batch, telemetryChan chan []byte, logServer *logserver.LogServer, telemetryClient *telemetry.Client, extensionStartup time.Time, conf *config.Configuration) int { eventCounter := 0 probablyTimeout := false @@ -397,9 +397,30 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba // We received telemetry util.Debugf("Agent telemetry bytes: %s", base64.URLEncoding.EncodeToString(telemetryBytes)) - inv := batch.AddTelemetry(lastRequestId, telemetryBytes, true) - if inv == nil { - util.Logf("Failed to add telemetry for request %v", lastRequestId) + if conf.LambdaWebAdapter { + awsContext := telemetry.AWSLambdaContext{ + RequestID: lastRequestId, + ARN: invokedFunctionARN, + FunctionVersion: LambdaFunctionVersion, + } + finalPayload, err := telemetry.ProcessTelemetry(base64.URLEncoding.EncodeToString(telemetryBytes), awsContext) + if err != nil { + util.Logf("Error processing telemetry: %v", err) + inv := batch.AddTelemetry(lastRequestId, telemetryBytes, true) + if inv == nil { + util.Logf("Failed to add telemetry for request %v", lastRequestId) + } + } else { + inv := batch.AddTelemetry(lastRequestId, []byte(finalPayload), true) + if inv == nil { + util.Logf("Failed 
to add processed telemetry for request %v", lastRequestId) + } + } + } else { + inv := batch.AddTelemetry(lastRequestId, telemetryBytes, true) + if inv == nil { + util.Logf("Failed to add telemetry for request %v", lastRequestId) + } } // Opportunity for an aggressive harvest, in which case, we definitely want to wait for the HTTP POST diff --git a/telemetry/lwa_payload.go b/telemetry/lwa_payload.go new file mode 100644 index 00000000..aabef6b3 --- /dev/null +++ b/telemetry/lwa_payload.go @@ -0,0 +1,251 @@ +// Package telemetry provides tools to process and enrich telemetry payloads for Lambda Web Adapter. +// This package specifically handles telemetry processing when NEW_RELIC_LAMBDA_WEB_ADAPTER is enabled. +// +// Lambda Web Adapter (LWA) Processing: +// - Decodes base64-encoded telemetry from the Newrelic agent +// - Enriches telemetry data with AWS Lambda context (RequestID, ARN, Function Version, Cold Start) +// - Returns processed telemetry as raw JSON bytes instead of base64-encoded strings +// - Supports both protocol versions 1 and 2 from the Newrelic agent +package telemetry + +import ( + "bytes" + "compress/gzip" + "encoding/base64" + "encoding/json" + "fmt" + + "github.com/newrelic/newrelic-lambda-extension/util" +) + +// LambdaMetadata contains metadata about the Lambda execution environment. +type LambdaMetadata struct { + MetadataVersion int `json:"metadata_version"` + ARN string `json:"arn,omitempty"` + ProtocolVersion int `json:"protocol_version"` + ExecutionEnvironment string `json:"execution_environment,omitempty"` + AgentVersion string `json:"agent_version"` + AgentLanguage string `json:"agent_language"` + FunctionVersion string `json:"function_version,omitempty"` +} + +// LambdaData holds the various telemetry data points. 
+type LambdaData struct {
+	MetricData            []interface{} `json:"metric_data"`
+	CustomEventData       []interface{} `json:"custom_event_data"`
+	LogEventData          []interface{} `json:"log_event_data"`
+	AnalyticEventData     []interface{} `json:"analytic_event_data"` // enriched with AWS attributes by addAWSData
+	ErrorEventData        []interface{} `json:"error_event_data"`
+	ErrorData             []interface{} `json:"error_data"`
+	SpanEventData         []interface{} `json:"span_event_data"` // enriched with AWS attributes by addAWSData
+	UpdateLoadedModules   []interface{} `json:"update_loaded_modules"`
+	TransactionSampleData []interface{} `json:"transaction_sample_data"`
+}
+
+// LambdaRawData is the top-level structure for a Protocol v1 payload.
+// createTelemetryBytes also marshals every outgoing payload in this shape.
+type LambdaRawData struct {
+	LambdaMetadata LambdaMetadata `json:"metadata"`
+	LambdaData     LambdaData     `json:"data"`
+}
+
+// AWSLambdaContext provides the AWS-specific data to be added to telemetry payloads
+// when processing through Lambda Web Adapter.
+type AWSLambdaContext struct {
+	RequestID       string // written to events as "aws.requestId"
+	ARN             string // written to events as "aws.lambda.arn" and copied into the payload metadata
+	FunctionVersion string // written to events as "aws.lambda.functionVersion" and copied into the payload metadata
+	ColdStart       bool   // when true, events also receive "aws.lambda.coldStart": true
+}
+
+// ProcessTelemetry takes a raw telemetry string and AWS context, and returns enriched
+// telemetry data as JSON bytes for Lambda Web Adapter processing.
+// This is the main entry point for Lambda Web Adapter telemetry enrichment.
+//
+// telemetryBytes must be the base64 text of the agent payload (see getServerlessData).
+// Errors are returned wrapped rather than logged here, so the caller reports them once.
+func ProcessTelemetry(telemetryBytes string, awsContext AWSLambdaContext) ([]byte, error) {
+	rawData, lambdaData, protocolVersion, err := getServerlessData([]byte(telemetryBytes))
+	if err != nil {
+		return nil, fmt.Errorf("error decoding serverless data: %w", err)
+	}
+
+	if err := addAWSData(&lambdaData, awsContext); err != nil {
+		return nil, fmt.Errorf("error adding AWS data: %w", err)
+	}
+
+	finalPayload, err := createTelemetryBytes(lambdaData, rawData.LambdaMetadata, awsContext, protocolVersion, "NR_LAMBDA_MONITORING")
+	if err != nil {
+		return nil, fmt.Errorf("error creating final telemetry bytes: %w", err)
+	}
+
+	return finalPayload, nil
+}
+
+// getServerlessData decodes and parses base64-encoded telemetry data from the Newrelic agent
+// for processing in Lambda Web Adapter. Supports both protocol versions 1 and 2.
+// data may use either the standard or the URL-safe base64 alphabet, because the extension's
+// main loop encodes with base64.URLEncoding and StdEncoding alone rejects '-' and '_'.
+// Errors are returned wrapped rather than logged; the caller decides how to report them.
+func getServerlessData(data []byte) (LambdaRawData, LambdaData, int, error) {
+	decodedJSON, err := base64.StdEncoding.DecodeString(string(data))
+	if err != nil {
+		var urlErr error
+		if decodedJSON, urlErr = base64.URLEncoding.DecodeString(string(data)); urlErr != nil {
+			return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("failed to base64 decode payload: %w", err)
+		}
+	}
+
+	// The envelope is a JSON array; element 0 is the protocol version, element 2 the encoded data.
+	var rawPayload []json.RawMessage
+	if err := json.Unmarshal(decodedJSON, &rawPayload); err != nil {
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("failed to unmarshal JSON array: %w", err)
+	}
+	if len(rawPayload) < 3 {
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("invalid payload structure, expected at least 3 elements, got %d", len(rawPayload))
+	}
+
+	var protocolVersion int
+	if err := json.Unmarshal(rawPayload[0], &protocolVersion); err != nil {
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("failed to parse protocol version: %w", err)
+	}
+
+	var encodedPart string
+	if err := json.Unmarshal(rawPayload[2], &encodedPart); err != nil {
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("failed to parse encoded data part: %w", err)
+	}
+
+	uncompressedJSON, err := decodeUncompress(encodedPart)
+	if err != nil {
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("failed to decode and decompress data: %w", err)
+	}
+
+	switch protocolVersion {
+	case 1:
+		util.Debugf("Processing protocol version 1 for Lambda Web Adapter")
+		var result LambdaRawData
+		if err := json.Unmarshal(uncompressedJSON, &result); err != nil {
+			return LambdaRawData{}, LambdaData{}, 1, fmt.Errorf("unable to unmarshal JSON data into LambdaRawData: %w", err)
+		}
+		return result, result.LambdaData, 1, nil
+	case 2:
+		util.Debugf("Processing protocol version 2 for Lambda Web Adapter")
+		// NOTE(review): no metadata is recovered for version 2, so ProcessTelemetry forwards a
+		// zero-valued LambdaMetadata to createTelemetryBytes — confirm this is intended.
+		var result LambdaData
+		if err := json.Unmarshal(uncompressedJSON, &result); err != nil {
+			return LambdaRawData{}, LambdaData{}, 2, fmt.Errorf("unable to unmarshal JSON data into LambdaData: %w", err)
+		}
+		return LambdaRawData{}, result, 2, nil
+	default:
+		return LambdaRawData{}, LambdaData{}, 0, fmt.Errorf("unsupported protocol version: %d", protocolVersion)
+	}
+}
+
+// addAWSData writes the request ID, ARN and function version (plus the cold-start flag when
+// set) into the analytic and span events of lambdaData, modifying their attribute maps in place.
+func addAWSData(lambdaData *LambdaData, awsContext AWSLambdaContext) error {
+	util.Debugf("Adding AWS context data for Lambda Web Adapter: RequestID=%s, ARN=%s, FunctionVersion=%s, ColdStart=%t",
+		awsContext.RequestID, awsContext.ARN, awsContext.FunctionVersion, awsContext.ColdStart)
+
+	awsAttributes := map[string]interface{}{
+		"aws.requestId":              awsContext.RequestID,
+		"aws.lambda.arn":             awsContext.ARN,
+		"aws.lambda.functionVersion": awsContext.FunctionVersion,
+	}
+
+	if awsContext.ColdStart {
+		awsAttributes["aws.lambda.coldStart"] = true
+	}
+
+	if err := processEventData(lambdaData.AnalyticEventData, awsAttributes); err != nil {
+		return fmt.Errorf("failed to process analytic event data: %w", err)
+	}
+	if err := processEventData(lambdaData.SpanEventData, awsAttributes); err != nil {
+		return fmt.Errorf("failed to process span event data: %w", err)
+	}
+	return nil
+}
+
+// createTelemetryBytes creates the final telemetry payload as JSON bytes for Lambda Web Adapter.
+// Returns raw JSON bytes instead of base64-encoded string for direct consumption.
+// The result is the JSON array [protocolVersion, agentName, payload], where payload is the
+// base64 (standard alphabet) of the gzipped {"metadata": ..., "data": ...} object.
+// Errors are returned wrapped rather than logged; the caller decides how to report them.
+func createTelemetryBytes(data LambdaData, metadata LambdaMetadata, awsContext AWSLambdaContext, protocolVersion int, agentName string) ([]byte, error) {
+	util.Debugf("Creating telemetry bytes for Lambda Web Adapter - protocol version %d with agent %s", protocolVersion, agentName)
+
+	// metadata was passed by value, so these overrides do not touch the caller's copy.
+	metadata.ARN = awsContext.ARN
+	metadata.FunctionVersion = awsContext.FunctionVersion
+
+	rawData := LambdaRawData{
+		LambdaMetadata: metadata,
+		LambdaData:     data,
+	}
+
+	rawJSON, err := json.Marshal(rawData)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal raw data: %w", err)
+	}
+
+	var buf bytes.Buffer
+	gz := gzip.NewWriter(&buf)
+	if _, err := gz.Write(rawJSON); err != nil {
+		return nil, fmt.Errorf("failed to gzip data: %w", err)
+	}
+	if err := gz.Close(); err != nil { // Close must run before buf is read: it flushes pending data
+		return nil, fmt.Errorf("failed to close gzip writer: %w", err)
+	}
+
+	innerPayload := base64.StdEncoding.EncodeToString(buf.Bytes())
+
+	finalArray := []interface{}{
+		protocolVersion,
+		agentName,
+		innerPayload,
+	}
+
+	finalJSON, err := json.Marshal(finalArray)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal final array: %w", err)
+	}
+
+	util.Debugf("Successfully created Lambda Web Adapter telemetry JSON with length %d bytes", len(finalJSON))
+	return finalJSON, nil
+}
+
+// processEventData enriches event data with AWS attributes for Lambda Web Adapter telemetry.
+// This function adds AWS context information to analytic and span events.
+// eventData[2] must hold the event list; each event's third element is its attribute map.
+func processEventData(eventData []interface{}, attributes map[string]interface{}) error {
+	if len(eventData) < 3 {
+		util.Debugf("Event data has insufficient elements (%d), skipping processing", len(eventData))
+		return nil
+	}
+	eventsList, ok := eventData[2].([]interface{})
+	if !ok { // returned, not logged: the caller wraps and reports this error
+		return fmt.Errorf("expected events list at index 2 to be a []interface{}, but it was not")
+	}
+
+	processedEvents := 0
+	for _, eventEntry := range eventsList {
+		event, ok := eventEntry.([]interface{})
+		if !ok || len(event) < 3 {
+			util.Debugf("Skipping invalid event entry")
+			continue // malformed entries are skipped rather than failing the whole batch
+		}
+		agentAttributes, ok := event[2].(map[string]interface{})
+		if !ok {
+			util.Debugf("Skipping event with invalid attributes structure")
+			continue
+		}
+		for key, value := range attributes {
+			agentAttributes[key] = value // overwrites any existing attribute with the same key
+		}
+		processedEvents++
+	}
+
+	util.Debugf("Processed %d events with AWS attributes for Lambda Web Adapter", processedEvents)
+	return nil
+}
\ No newline at end of file From 2d624b41f627e13de3d64500546054a3b5247b02 Mon Sep 17 00:00:00 2001 From: Chodavarapu Naga Viswanadha Avinash Date: Fri, 5 Sep 2025 16:02:57 +0530 Subject: [PATCH 2/2] feat: Enhance telemetry processing with cold start detection --- main.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/main.go b/main.go index f95a55ca..49b8c45d 100644 --- a/main.go +++ b/main.go @@ -398,10 +398,15 @@ func mainLoop(ctx context.Context, invocationClient *client.InvocationClient, ba // We received telemetry util.Debugf("Agent telemetry bytes: %s", base64.URLEncoding.EncodeToString(telemetryBytes)) if conf.LambdaWebAdapter { + // Use event counter for reliable cold start detection + // First event in this container is always a cold start + isColdStart := eventCounter == 1 + awsContext := telemetry.AWSLambdaContext{ RequestID: lastRequestId, ARN: invokedFunctionARN, FunctionVersion: LambdaFunctionVersion, + ColdStart: isColdStart, } 
finalPayload, err := telemetry.ProcessTelemetry(base64.URLEncoding.EncodeToString(telemetryBytes), awsContext) if err != nil {