From 29fb0ff81981d81752f04167f1a265ee2e121014 Mon Sep 17 00:00:00 2001 From: Himanshu Rai Date: Mon, 9 Jun 2025 16:53:39 +0530 Subject: [PATCH 1/3] code changes for supporting sns events --- src/main.go | 8 +++++ src/s3/s3.go | 60 ++++++++++++++++++++++++++++++++++++++ src/unmarshal/unmarshal.go | 11 +++++++ 3 files changed, 79 insertions(+) diff --git a/src/main.go b/src/main.go index 2db7275..244ee94 100644 --- a/src/main.go +++ b/src/main.go @@ -45,6 +45,14 @@ func handlerWithArgs(ctx context.Context, event unmarshal.Event, nrClient util.N log.Fatalf("error creating s3 client: %v", err) } err = s3.GetLogsFromS3Event(ctx, event.S3Event, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory) + case unmarshal.SNS: + log.Debugf("processing sns event: %v", event.SNSEvent) + var s3Client s3.ObjectClient + s3Client, err = s3.NewS3Client(ctx) + if err != nil { + log.Fatalf("error creating s3 client: %v", err) + } + err = s3.GetLogsFromSNSEvent(ctx, event.SNSEvent, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory) default: log.Error("unable to process unknown event type. Supported event types are cloudwatch and s3") return nil diff --git a/src/s3/s3.go b/src/s3/s3.go index 454aaa6..9f939fb 100644 --- a/src/s3/s3.go +++ b/src/s3/s3.go @@ -6,6 +6,7 @@ import ( "compress/bzip2" "compress/gzip" "context" + jsonpkg "encoding/json" "io" "os" "regexp" @@ -62,6 +63,65 @@ func GetLogsFromS3Event(ctx context.Context, s3Event events.S3Event, awsConfigur return nil } +// GetLogsFromSNSEvent batches logs from SNS into DetailedJson format and sends them to the specified channel. +// It returns an error if there is a problem retrieving or sending the logs. +func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfiguration util.AWSConfiguration, channel chan common.DetailedLogsBatch, s3Client ObjectClient, readerFactory ReaderFactory) error { + for _, record := range snsEvent.Records { + + // Unmarshal the Message field into a json array + var messageData struct { + Records []struct { + S3 struct { + Bucket struct { + Name string `json:"name"` + } + Object struct { + Key string `json:"key"` + } + } + } + } + + err := jsonpkg.Unmarshal([]byte(record.SNS.Message), &messageData) + if err != nil { + log.Errorf("failed to unmarshal SNS message: %v", err) + continue + } + + if len(messageData.Records) != 0 { + for _, msg := range messageData.Records { + log.Debugf("processing sns event message: %v", msg) + + // The Following are the common attributes for all log messages. + // New Relic uses these common attributes to generate Unique Entity ID. + attributes := common.LogAttributes{ + "aws.accountId": awsConfiguration.AccountID, + "logBucketName": msg.S3.Bucket.Name, + "logObjectKey": msg.S3.Object.Key, + "aws.realm": awsConfiguration.Realm, + "aws.region": awsConfiguration.Region, + "instrumentation.provider": common.InstrumentationProvider, + "instrumentation.name": common.InstrumentationName, + "instrumentation.version": common.InstrumentationVersion, + } + + if err := util.AddCustomMetaData(os.Getenv(common.CustomMetaData), attributes); err != nil { + log.Errorf("failed to add custom metadata %v", err) + return err + } + + if err := buildMeltLogsFromS3Bucket(ctx, msg.S3.Bucket.Name, msg.S3.Object.Key, channel, attributes, s3Client, readerFactory); err != nil { + return err + } + } + } else { + log.Debugf("SNS event Message field contains no records") + } + } + + return nil +} + // fetchS3Reader fetches an S3 object from the specified bucket and returns an io.ReadCloser for reading its contents. // It returns the io.ReadCloser and any error encountered during the operation. func fetchS3Reader(ctx context.Context, bucketName string, objectName string, s3Client ObjectClient) (io.ReadCloser, error) { diff --git a/src/unmarshal/unmarshal.go b/src/unmarshal/unmarshal.go index 17c3886..be0f885 100644 --- a/src/unmarshal/unmarshal.go +++ b/src/unmarshal/unmarshal.go @@ -12,6 +12,7 @@ import ( const ( CLOUDWATCH = "cloudwatch" // CLOUDWATCH represents the event type for CloudWatch logs. S3 = "s3" // S3 represents the event type for S3 events. + SNS = "sns" // SNS represents the event type for SNS events. ) var log = logger.NewLogrusLogger(logger.WithDebugLevel()) @@ -21,6 +22,7 @@ type Event struct { EventType string // EventType represents the type of the event. CloudwatchLogsData events.CloudwatchLogsData // CloudwatchLogsData represents the CloudWatch logs data. S3Event events.S3Event // S3Event represents the S3 event data. + SNSEvent events.SNSEvent // SNSEvent represents the SNS event data. } // UnmarshalJSON unmarshals the JSON data into the Event struct. @@ -49,6 +51,15 @@ func (event *Event) UnmarshalJSON(data []byte) error { return err } + //Try to unmarshal the event as SNSEvent + var snsEvent events.SNSEvent + err = json.Unmarshal(data, &snsEvent) + if err == nil && len(snsEvent.Records) != 0 && snsEvent.Records[0].EventSource == "aws:sns" { + event.EventType = SNS + event.SNSEvent = snsEvent + + return err + } return nil } From e798a604b8b829f0e1300b2cb13ce3761b3a0c92 Mon Sep 17 00:00:00 2001 From: Rajeev Kumar Date: Mon, 9 Jun 2025 21:50:00 +0530 Subject: [PATCH 2/3] decoding S3 object key from the URL for SNS event paylaod --- src/s3/s3.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/s3/s3.go b/src/s3/s3.go index 9f939fb..1e906cb 100644 --- a/src/s3/s3.go +++ b/src/s3/s3.go @@ -8,6 +8,7 @@ import ( "context" jsonpkg "encoding/json" "io" + "net/url" "os" "regexp" "strings" @@ -92,12 +93,14 @@ func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfi for _, msg := range messageData.Records { log.Debugf("processing sns event message: %v", msg) + decodedKey, err := url.QueryUnescape(msg.S3.Object.Key) + if err != nil { // The Following are the common attributes for all log messages. // New Relic uses these common attributes to generate Unique Entity ID. attributes := common.LogAttributes{ "aws.accountId": awsConfiguration.AccountID, "logBucketName": msg.S3.Bucket.Name, - "logObjectKey": msg.S3.Object.Key, + "logObjectKey": decodedKey, // Use the decoded key "aws.realm": awsConfiguration.Realm, "aws.region": awsConfiguration.Region, "instrumentation.provider": common.InstrumentationProvider, @@ -110,7 +113,7 @@ func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfi return err } - if err := buildMeltLogsFromS3Bucket(ctx, msg.S3.Bucket.Name, msg.S3.Object.Key, channel, attributes, s3Client, readerFactory); err != nil { + if err := buildMeltLogsFromS3Bucket(ctx, msg.S3.Bucket.Name, decodedKey, channel, attributes, s3Client, readerFactory); err != nil { return err } } From 7ac54934cc00772f235a9cfd9a9bd36a3c841379 Mon Sep 17 00:00:00 2001 From: Rajeev Kumar Date: Mon, 9 Jun 2025 22:15:30 +0530 Subject: [PATCH 3/3] Update s3.go --- src/s3/s3.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/s3/s3.go b/src/s3/s3.go index 1e906cb..d98c544 100644 --- a/src/s3/s3.go +++ b/src/s3/s3.go @@ -93,8 +93,14 @@ func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfi for _, msg := range messageData.Records { log.Debugf("processing sns event message: %v", msg) + // When S3 events come via SNS, the object key is URL-encoded. + // We need to decode it before we can use it to fetch the object. decodedKey, err := url.QueryUnescape(msg.S3.Object.Key) if err != nil { + log.Errorf("failed to URL decode S3 object key from SNS message: %v", err) + continue + } + // The Following are the common attributes for all log messages. // New Relic uses these common attributes to generate Unique Entity ID. attributes := common.LogAttributes{