diff --git a/src/main.go b/src/main.go index 2db7275..244ee94 100644 --- a/src/main.go +++ b/src/main.go @@ -45,6 +45,14 @@ func handlerWithArgs(ctx context.Context, event unmarshal.Event, nrClient util.N log.Fatalf("error creating s3 client: %v", err) } err = s3.GetLogsFromS3Event(ctx, event.S3Event, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory) + case unmarshal.SNS: + log.Debugf("processing sns event: %v", event.SNSEvent) + var s3Client s3.ObjectClient + s3Client, err = s3.NewS3Client(ctx) + if err != nil { + log.Fatalf("error creating s3 client: %v", err) + } + err = s3.GetLogsFromSNSEvent(ctx, event.SNSEvent, awsConfiguration, channel, s3Client, s3.DefaultReaderFactory) default: log.Error("unable to process unknown event type. Supported event types are cloudwatch and s3") return nil diff --git a/src/s3/s3.go b/src/s3/s3.go index 454aaa6..d98c544 100644 --- a/src/s3/s3.go +++ b/src/s3/s3.go @@ -6,7 +6,9 @@ import ( "compress/bzip2" "compress/gzip" "context" + jsonpkg "encoding/json" "io" + "net/url" "os" "regexp" "strings" @@ -62,6 +64,73 @@ func GetLogsFromS3Event(ctx context.Context, s3Event events.S3Event, awsConfigur return nil } +// GetLogsFromSNSEvent batches logs from SNS into DetailedJson format and sends them to the specified channel. +// It returns an error if there is a problem retrieving or sending the logs. +func GetLogsFromSNSEvent(ctx context.Context, snsEvent events.SNSEvent, awsConfiguration util.AWSConfiguration, channel chan common.DetailedLogsBatch, s3Client ObjectClient, readerFactory ReaderFactory) error { + for _, record := range snsEvent.Records { + + // Unmarshal the Message field into a json array + var messageData struct { + Records []struct { + S3 struct { + Bucket struct { + Name string `json:"name"` + } + Object struct { + Key string `json:"key"` + } + } + } + } + + err := jsonpkg.Unmarshal([]byte(record.SNS.Message), &messageData) + if err != nil { + log.Errorf("failed to unmarshal SNS message: %v", err) + continue + } + + if len(messageData.Records) != 0 { + for _, msg := range messageData.Records { + log.Debugf("processing sns event message: %v", msg) + + // When S3 events come via SNS, the object key is URL-encoded. + // We need to decode it before we can use it to fetch the object. + decodedKey, err := url.QueryUnescape(msg.S3.Object.Key) + if err != nil { + log.Errorf("failed to URL decode S3 object key from SNS message: %v", err) + continue + } + + // The Following are the common attributes for all log messages. + // New Relic uses these common attributes to generate Unique Entity ID. + attributes := common.LogAttributes{ + "aws.accountId": awsConfiguration.AccountID, + "logBucketName": msg.S3.Bucket.Name, + "logObjectKey": decodedKey, // Use the decoded key + "aws.realm": awsConfiguration.Realm, + "aws.region": awsConfiguration.Region, + "instrumentation.provider": common.InstrumentationProvider, + "instrumentation.name": common.InstrumentationName, + "instrumentation.version": common.InstrumentationVersion, + } + + if err := util.AddCustomMetaData(os.Getenv(common.CustomMetaData), attributes); err != nil { + log.Errorf("failed to add custom metadata %v", err) + return err + } + + if err := buildMeltLogsFromS3Bucket(ctx, msg.S3.Bucket.Name, decodedKey, channel, attributes, s3Client, readerFactory); err != nil { + return err + } + } + } else { + log.Debugf("SNS event Message field contains no records") + } + } + + return nil +} + // fetchS3Reader fetches an S3 object from the specified bucket and returns an io.ReadCloser for reading its contents. // It returns the io.ReadCloser and any error encountered during the operation. func fetchS3Reader(ctx context.Context, bucketName string, objectName string, s3Client ObjectClient) (io.ReadCloser, error) { diff --git a/src/unmarshal/unmarshal.go b/src/unmarshal/unmarshal.go index 17c3886..be0f885 100644 --- a/src/unmarshal/unmarshal.go +++ b/src/unmarshal/unmarshal.go @@ -12,6 +12,7 @@ import ( const ( CLOUDWATCH = "cloudwatch" // CLOUDWATCH represents the event type for CloudWatch logs. S3 = "s3" // S3 represents the event type for S3 events. + SNS = "sns" // SNS represents the event type for SNS events. ) var log = logger.NewLogrusLogger(logger.WithDebugLevel()) @@ -21,6 +22,7 @@ type Event struct { EventType string // EventType represents the type of the event. CloudwatchLogsData events.CloudwatchLogsData // CloudwatchLogsData represents the CloudWatch logs data. S3Event events.S3Event // S3Event represents the S3 event data. + SNSEvent events.SNSEvent // SNSEvent represents the SNS event data. } // UnmarshalJSON unmarshals the JSON data into the Event struct. @@ -49,6 +51,15 @@ func (event *Event) UnmarshalJSON(data []byte) error { return err } + //Try to unmarshal the event as SNSEvent + var snsEvent events.SNSEvent + err = json.Unmarshal(data, &snsEvent) + if err == nil && len(snsEvent.Records) != 0 && snsEvent.Records[0].EventSource == "aws:sns" { + event.EventType = SNS + event.SNSEvent = snsEvent + + return err + } return nil }