Feature/sqs#360
Open
sliu008 wants to merge 20 commits into
Open
Conversation
jamesfwood
approved these changes
Jun 15, 2026
Contributor
There was a problem hiding this comment.
Pull request overview
This PR migrates the CNM (Collection Notification Message) → granule loading flow from direct Lambda invocation to an SQS-based, decoupled architecture, and updates both Terraform and application code to support queue-triggered processing and DLQs.
Changes:
- Added CNM and granule SQS queues + DLQs and wired Lambda event source mappings (SQS → Lambdas).
- Updated SNS CNM response delivery to target SQS instead of invoking the CNM Lambda directly.
- Refactored
cnm_handlerto enqueue granule-load work items to SQS; updated granule handler to accept SQS event shapes; added requester-pays args to S3 downloads; addedaiohttpdependency.
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
terraform/hydrocron-sqs.tf |
Introduces CNM + granule SQS queues, DLQs, queue policy for SNS→SQS, and event source mappings. |
terraform/hydrocron-sns.tf |
Switches CNM SNS subscription protocol from lambda to sqs (raw delivery enabled). |
terraform/hydrocron-lambda.tf |
Updates CNM lambda env var to GRANULE_QUEUE_URL; removes direct invoke permission from CNM to granule lambda. |
terraform/hydrocron-iam.tf |
Adds SQS send policy for CNM lambda; adds SQS execution managed policy for granule loader role; removes SNS→Lambda permission. |
hydrocron/db/load_data.py |
Updates handlers for SQS event shapes; CNM handler now sends granule-load messages to SQS. |
hydrocron/db/io/swot_shp.py |
Adds RequestPayer=requester to S3 download_file calls. |
pyproject.toml |
Adds aiohttp dependency. |
CHANGELOG.md |
Documents the SQS migration and requester-pays change (wording needs cleanup). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+98
to
+102
| if 'Records' in event: | ||
| record = json.loads(event['Records'][0]['body']) | ||
| body = record['body'] | ||
| else: | ||
| body = event['body'] |
Comment on lines
173
to
+211
| @@ -186,19 +192,23 @@ def cnm_handler(event, _): | |||
| else: | |||
| raise MissingTable(f"Error: Cannot load granule: {granule_uri}") | |||
|
|
|||
| event2 = ('{"body": {"granule_path": "' + granule_uri | |||
| + '","table_name": "' + table_name | |||
| + '","track_table": "' + track_table | |||
| + '","checksum": "' + checksum | |||
| + '","revisionDate": "' + revision_date | |||
| + '","load_benchmarking_data": "' + load_benchmarking_data + '"}}') | |||
|
|
|||
| logging.info("Invoking granule load lambda with event json %s", str(event2)) | |||
|
|
|||
| lambda_client.invoke( | |||
| FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'], | |||
| InvocationType='Event', | |||
| Payload=event2) | |||
| msg_body = json.dumps({ | |||
| "body": { | |||
| "granule_path": granule_uri, | |||
| "table_name": table_name, | |||
| "track_table": track_table, | |||
| "checksum": checksum, | |||
| "revisionDate": revision_date, | |||
| "load_benchmarking_data": load_benchmarking_data | |||
| } | |||
| }) | |||
|
|
|||
| logging.info("Sending granule load message to SQS: %s", msg_body) | |||
|
|
|||
| sqs_client.send_message( | |||
| QueueUrl=queue_url, | |||
| MessageBody=msg_body | |||
| ) | |||
Comment on lines
49
to
54
| if filepath.startswith('s3'): | ||
| bucket_name, key = filepath.replace("s3://", "").split("/", 1) | ||
| logging.info("Opening granule %s from bucket %s", key, bucket_name) | ||
|
|
||
| s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file) | ||
| s3_resource.Bucket(bucket_name).download_file(key, lambda_temp_file, ExtraArgs={"RequestPayer": "requester"}) | ||
|
|
Comment on lines
29
to
33
| moto = "^5.0.9" | ||
| vcrpy = "^8.0.0" | ||
| python-cmr = "^0.13.0" | ||
| aiohttp = "3.13.5" | ||
|
|
Contributor
Author
There was a problem hiding this comment.
aiohttp latest breaks another library so im pinning it to the newest one possible
Contributor
|
Hi @sliu008 , @jamesfwood , do we have an issue to track that activity ? |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This pull request migrates the CNM (Collection Notification Message) and granule loading workflow from using direct Lambda invocations via SNS to a decoupled, queue-based architecture using Amazon SQS. This improves reliability, scalability, and error handling for message processing between components. The changes update both the application logic and Terraform infrastructure to support this new design.
Key changes include:
Infrastructure: Migration to SQS-based Messaging
terraform/hydrocron-sqs.tf)hydrocron_cnm_queue) instead of directly invoking the Lambda. (terraform/hydrocron-sns.tf)terraform/hydrocron-lambda.tf,terraform/hydrocron-iam.tf) [1] [2] [3] [4] [5]Application Logic: Decoupling and SQS Integration
hydrocron/db/load_data.py) [1] [2] [3]RequestPayer: requesterargument for compatibility with requester-pays buckets. (hydrocron/db/io/swot_shp.py) [1] [2]Dependency Updates
aiohttpas a new dependency in the Python project configuration. (pyproject.toml)