def lambda_handler(event, _):
    """Report the combined status of the DynamoDB exports named in *event*.

    Each ARN in ``event["export_arns"]`` is looked up with ``describe_export``.
    The first export found in the ``FAILED`` state ends the check immediately.
    Otherwise the result is ``COMPLETED`` when every export reports
    ``COMPLETED`` and ``IN_PROGRESS`` when at least one does not.

    Args:
        event: Mapping with ``export_arns``, ``export_to_time`` and
            ``export_type``; the last two are passed through unchanged.
        _: Lambda context (unused).

    Returns:
        A dict holding ``status`` plus the three pass-through fields.
    """

    def build_result(status):
        # Built lazily so the event keys are only read when a result is
        # actually returned, exactly as each return statement did before.
        return {
            "status": status,
            "export_to_time": event["export_to_time"],
            "export_arns": event["export_arns"],
            "export_type": event["export_type"],
        }

    every_export_completed = True
    for export_arn in event["export_arns"]:
        description = ddb.describe_export(ExportArn=export_arn)["ExportDescription"]
        export_status = description["ExportStatus"]
        if export_status == "FAILED":
            return build_result("FAILED")
        if export_status != "COMPLETED":
            every_export_completed = False

    return build_result("COMPLETED" if every_export_completed else "IN_PROGRESS")
def _export_windows(from_time, to_time, max_window=timedelta(days=1)):
    """Split ``[from_time, to_time)`` into consecutive windows of at most *max_window*.

    Returns a list of ``(start, end)`` tuples. The list is empty when
    ``from_time`` is not earlier than ``to_time``.
    """
    if max_window <= timedelta(0):
        raise ValueError("max_window must be positive")

    windows = []
    start = from_time
    while start < to_time:
        end = min(start + max_window, to_time)
        windows.append((start, end))
        start = end
    return windows


def lambda_handler(_, __):
    """Start the DynamoDB export(s) needed to bring the S3 copy up to date.

    The end of the export range is the current UTC time truncated to the hour.
    If the SSM parameter holding the previous run time does not exist, a single
    full export is started. Otherwise incremental exports are started from the
    stored time (or the earliest restorable PITR time, whichever is later), one
    per window of at most 24 hours.

    Returns:
        A dict with ``export_to_time`` (ISO-8601 string), ``export_arns`` (list
        of started export ARNs, possibly empty) and ``export_type``.

    Raises:
        botocore.exceptions.ClientError: for any AWS failure other than the
            SSM parameter being absent.
    """
    to_time = datetime.now(timezone.utc).replace(microsecond=0, second=0, minute=0)

    # Only the SSM read can legitimately raise ParameterNotFound, so keep the
    # try body to that single call.
    try:
        last_run = ssm.get_parameter(Name=SSM_PARAM)["Parameter"]["Value"]
    except ClientError as e:
        if e.response["Error"]["Code"] != "ParameterNotFound":
            raise
        # No previous run recorded: take a full export.
        response = ddb_client.export_table_to_point_in_time(
            TableArn=ddb_table_arn,
            S3Bucket=bucket,
            S3SseAlgorithm="KMS",
            S3SseKmsKeyId=kms_key,
            ExportFormat="DYNAMODB_JSON",
            ExportType="FULL_EXPORT",
        )
        description = response["ExportDescription"]
        return {
            "export_to_time": to_time.isoformat(),
            "export_arns": [description["ExportArn"]],
            "export_type": description["ExportType"],
        }

    from_time = datetime.fromisoformat(last_run).replace(
        microsecond=0, second=0, minute=0
    )
    if from_time.tzinfo is None:
        # A naive value cannot be compared with the aware timestamps below.
        from_time = from_time.replace(tzinfo=timezone.utc)

    # An incremental export cannot start before the earliest restorable time.
    earliest_pitr = ddb_client.describe_continuous_backups(TableName=ddb_table_name)[
        "ContinuousBackupsDescription"
    ]["PointInTimeRecoveryDescription"]["EarliestRestorableDateTime"]
    from_time = max(from_time, earliest_pitr)

    # Set up front so the return value is well-defined even when there is
    # nothing to export (e.g. a second run within the same hour). Previously
    # export_type was only bound inside the loop and the return raised
    # UnboundLocalError in that case.
    export_type = "INCREMENTAL_EXPORT"
    export_arns = []

    # NOTE(review): DynamoDB documents a 15-minute minimum export window; a
    # short final window is possible when from_time comes from earliest_pitr,
    # which is not hour-aligned — confirm how that case should be handled.
    for window_start, window_end in _export_windows(from_time, to_time):
        response = ddb_client.export_table_to_point_in_time(
            TableArn=ddb_table_arn,
            S3Bucket=bucket,
            S3SseAlgorithm="KMS",
            S3SseKmsKeyId=kms_key,
            ExportFormat="DYNAMODB_JSON",
            ExportType=export_type,
            IncrementalExportSpecification={
                "ExportFromTime": window_start,
                "ExportToTime": window_end,
                "ExportViewType": "NEW_AND_OLD_IMAGES",
            },
        )
        export_arns.append(response["ExportDescription"]["ExportArn"])

    return {
        "export_to_time": to_time.isoformat(),
        "export_arns": export_arns,
        "export_type": export_type,
    }
"arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_export_trigger_function_name}", + "Payload.$": "$" + }, + "Next": "DynamoExportStatusCheck" + }, + "DynamoExportStatusCheck": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_export_poll_function_name}", + "Payload.$": "$" + }, + "Next": "Choice" + }, + "Choice": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.status", + "StringEquals": "COMPLETED", + "Next": "SSMPut" + }, + { + "Variable": "$.status", + "StringEquals": "IN_PROGRESS", + "Next": "WaitState" + }, + { + "Variable": "$.status", + "StringEquals": "FAILED", + "Next": "ExportFailure" + } + ], + "Default": "FailState" + }, + "FailState": { + "Type": "Fail", + "Error": "UnhandledStatus", + "Cause": "Status not recognised" + }, + "ExportFailure": { + "Type": "Fail", + "Error": "DynamoExportFailed", + "Cause": "DynamoDB Export Failed" + }, + "WaitState": { + "Type": "Wait", + "Seconds": 120, + "Next": "DynamoExportStatusCheck" + }, + "SSMPut": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "FunctionName": "${lambda_ssm_put_param_function_name}", + "Payload.$": "$" + }, + "Next": "GlueJobTrigger" + }, + "GlueJobTrigger": { + "Type": "Task", + "Resource": "arn:aws:states:::glue:startJobRun.sync", + "Parameters": { + "JobName": "${glue_job_name}", + "Arguments": { + "--SOURCE_BUCKET": "${dynamo_export_s3_bucket_name}", + "--TARGET_BUCKET": "${dynamo_export_processed_s3_bucket_name}", + "--DDB_TABLE_ARN": "${ddb_table_arn}", + "--GLUE_CRAWLER_NAME": "${glue_crawler_name}", + "--EXPORT_TYPE.$": "$.export_type" + } + }, + "End": true + } + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf new file mode 100644 
index 000000000..f18938d43 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_output_s3.tf @@ -0,0 +1,77 @@ +resource "aws_s3_bucket" "dynamodb_output" { + bucket = "${var.name_prefix}-${var.environment}-dynamo-output-bucket" +} + +# May need to restrict access to specific IAM roles/principals in future, but helps with testing for now. +data "aws_iam_policy_document" "dynamodb_output" { + statement { + sid = "HTTPSOnly" + effect = "Deny" + actions = ["s3:*"] + + principals { + type = "AWS" + identifiers = ["*"] + } + + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*" + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } +} + +resource "aws_s3_bucket_policy" "dynamodb_output" { + bucket = aws_s3_bucket.dynamodb_output.id + policy = data.aws_iam_policy_document.dynamodb_output.json +} + + +resource "aws_s3_bucket_server_side_encryption_configuration" "dynamodb_output" { + bucket = aws_s3_bucket.dynamodb_output.bucket + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = aws_kms_key.dynamo.arn + sse_algorithm = "aws:kms" + } + } +} + + +resource "aws_s3_bucket_public_access_block" "dynamodb_output_public_access_block" { + bucket = aws_s3_bucket.dynamodb_output.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_lifecycle_configuration" "dynamodb_output_lifecycle" { + bucket = aws_s3_bucket.dynamodb_output.id + + + rule { + id = "object-auto-delete-rule" + status = "Enabled" + filter {} + + expiration { + days = 2 + } + } +} + +resource "aws_s3_bucket_versioning" "dynamodb_output_versioning" { + bucket = aws_s3_bucket.dynamodb_output.id + versioning_configuration { + status = "Enabled" + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf 
b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf new file mode 100644 index 000000000..8af17df9c --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/dynamo_processed_output_s3.tf @@ -0,0 +1,76 @@ +resource "aws_s3_bucket" "dynamodb_output_processed" { + bucket = "${var.name_prefix}-${var.environment}-dynamo-output-processed-bucket" +} + +data "aws_iam_policy_document" "dynamodb_output_processed" { + statement { + sid = "HTTPSOnly" + effect = "Deny" + actions = ["s3:*"] + + principals { + type = "AWS" + identifiers = ["*"] + } + + resources = [ + aws_s3_bucket.dynamodb_output_processed.arn, + "${aws_s3_bucket.dynamodb_output_processed.arn}/*" + ] + + condition { + test = "Bool" + variable = "aws:SecureTransport" + values = ["false"] + } + } +} + +resource "aws_s3_bucket_policy" "dynamodb_output_processed" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + policy = data.aws_iam_policy_document.dynamodb_output_processed.json +} + + +resource "aws_s3_bucket_server_side_encryption_configuration" "dynamodb_output_processed" { + bucket = aws_s3_bucket.dynamodb_output_processed.bucket + + rule { + apply_server_side_encryption_by_default { + kms_master_key_id = aws_kms_key.dynamo_processed.arn + sse_algorithm = "aws:kms" + } + } +} + + +resource "aws_s3_bucket_public_access_block" "dynamodb_output_processed_public_access_block" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + + block_public_acls = true + block_public_policy = true + ignore_public_acls = true + restrict_public_buckets = true +} + +resource "aws_s3_bucket_lifecycle_configuration" "dynamodb_output_processed_lifecycle" { + bucket = aws_s3_bucket.dynamodb_output_processed.id + + + rule { + id = "object-auto-delete-rule" + status = "Enabled" + filter {} + + expiration { + days = 3 * 365 + } + } +} + +resource "aws_s3_bucket_versioning" "dynamodb_output_processed_versioning" { + bucket = 
aws_s3_bucket.dynamodb_output_processed.id + versioning_configuration { + status = "Enabled" + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf new file mode 100644 index 000000000..395e5feaa --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/glue.tf @@ -0,0 +1,178 @@ +data "aws_iam_policy_document" "glue_service_assume_role" { + statement { + actions = ["sts:AssumeRole"] + principals { + type = "Service" + identifiers = ["glue.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "glue_service_role" { + name = "${var.name_prefix}-dynamo-glue-service-role" + assume_role_policy = data.aws_iam_policy_document.glue_service_assume_role.json +} + +data "aws_iam_policy_document" "glue_service_policy" { + statement { + effect = "Allow" + actions = [ + "s3:PutObject", + "s3:GetObject", + "s3:GetObjectVersion", + "s3:GetBucketLocation", + "s3:DeleteObject", + "s3:ListBucket", + ] + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*", + aws_s3_bucket.dynamodb_output_processed.arn, + "${aws_s3_bucket.dynamodb_output_processed.arn}/*", + ] + } + + statement { + effect = "Allow" + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey" + ] + resources = [ + aws_kms_key.dynamo.arn, + aws_kms_key.dynamo_processed.arn, + ] + } + + statement { + actions = [ + "glue:GetDatabase", + "glue:GetDatabases", + "glue:GetTable", + "glue:GetTables", + "glue:CreateTable", + "glue:UpdateTable", + "glue:DeleteTable", + "glue:GetPartition", + "glue:GetPartitions", + "glue:CreatePartition", + "glue:BatchCreatePartition", + "glue:UpdatePartition", + ] + + resources = [ + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:catalog", + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:database/*", + "arn:aws:glue:eu-west-2:${data.aws_caller_identity.current.account_id}:table/*", + ] + + 
# Crawls the Delta table produced by the Glue job so it is queryable through
# the Glue Data Catalog.
resource "aws_glue_crawler" "log_crawler" {
  name          = "${var.name_prefix}-${var.environment}-dynamo-crawler"
  database_name = aws_glue_catalog_database.log_database.name
  role          = aws_iam_role.glue_service_role.name

  delta_target {
    # Must be the path src/glue_job.py writes and merges into
    # (processed/dynamo_export_pointers); the previous value pointed at
    # processed/dynamo_export_flags, which the job never writes.
    delta_tables              = ["s3://${aws_s3_bucket.dynamodb_output_processed.id}/processed/dynamo_export_pointers"]
    write_manifest            = false
    create_native_delta_table = true
  }

  schema_change_policy {
    delete_behavior = "LOG"
  }

  configuration = jsonencode({
    "Version" : 1.0,
    "Grouping" : {
      "TableGroupingPolicy" : "CombineCompatibleSchemas"
    }
  })
}
"glue_job" { + name = "${var.name_prefix}-dynamo-export-glue-job" + role_arn = aws_iam_role.glue_service_role.arn + description = "Export DynamoDB data to S3" + glue_version = "5.0" + worker_type = "G.1X" + timeout = 48 * 60 + max_retries = 0 + number_of_workers = 4 + + command { + name = "glueetl" + python_version = 3 + script_location = "s3://${aws_s3_bucket.dynamodb_output_processed.id}/glue_code/glue_job.py" + } + + default_arguments = { + "--enable-auto-scaling" = "true" + "--enable-continuous-cloudwatch-log" = "true" + "--enable-continuous-log-filter" = "true" + "--datalake-formats" = "delta" + "--enable-metrics" = "true" + "--enable-glue-datacatalog" = "true" + "--conf" = "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore --conf spark.jars.packages=io.delta:delta-core_2.3:3.3.0" + # "--SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME" = "/${var.environment}-blue/api_config/slack_service_alert_webhook" + "--ENVIRONMENT" = var.environment + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf new file mode 100644 index 000000000..6c2ac99d1 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/kms.tf @@ -0,0 +1,15 @@ +resource "aws_kms_key" "dynamo" { +} + +resource "aws_kms_alias" "dynamo" { + name = "alias/${var.name_prefix}-dynamo-output-bucket" + target_key_id = aws_kms_key.dynamo.key_id +} + +resource "aws_kms_key" "dynamo_processed" { +} + +resource "aws_kms_alias" "dynamo_processed" { + name = "alias/${var.name_prefix}-dynamo-processed-output-bucket" + target_key_id = aws_kms_key.dynamo_processed.key_id +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf 
b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf new file mode 100644 index 000000000..15d6a595d --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/lambda.tf @@ -0,0 +1,199 @@ +data "aws_dynamodb_table" "pointer_table" { + name = var.pointer_table_name +} + +data "aws_iam_policy_document" "lambda_assume_role" { + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "lambda_role" { + name = "${var.name_prefix}-dynamodb-export-lambda" + + assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json +} + +data "aws_iam_policy_document" "lambda_policy" { + statement { + effect = "Allow" + actions = [ + "dynamodb:ExportTableToPointInTime", + "dynamodb:DescribeExport", + "dynamodb:DescribeTable", + "dynamodb:DescribeContinuousBackups", + ] + resources = ["*"] + } + + statement { + effect = "Allow" + actions = [ + "ssm:GetParameter" + ] + resources = ["arn:aws:ssm:eu-west-2:${data.aws_caller_identity.current.account_id}:*"] + } + + statement { + effect = "Allow" + actions = [ + "s3:PutObject", + "s3:PutObjectAcl", + "s3:AbortMultipartUpload", + "s3:ListMultipartUploadParts" + ] + resources = [ + aws_s3_bucket.dynamodb_output.arn, + "${aws_s3_bucket.dynamodb_output.arn}/*" + ] + } + + statement { + effect = "Allow" + actions = [ + "kms:GenerateDataKey", + "kms:Encrypt", + "kms:Decrypt" + ] + resources = [aws_kms_key.dynamo.arn] + } + + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "lambda_policy" { + name = "${var.name_prefix}-dynamodb-export-lambda-policy" + role = aws_iam_role.lambda_role.id + + policy = data.aws_iam_policy_document.lambda_policy.json +} + +resource "aws_lambda_function" "dynamo_export_trigger" { + function_name = 
"${var.name_prefix}-dynamo-export-trigger" + role = aws_iam_role.lambda_role.arn + handler = "src.lambdas.dynamo_export_trigger.dynamo_export_trigger.lambda_handler" + runtime = "python3.13" + timeout = 30 + + filename = "${path.module}/../../../../dist/dynamo_export_trigger.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/dynamo_export_trigger.zip") + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + DDB_TABLE_NAME = var.pointer_table_name + } + } +} + +resource "aws_lambda_function" "dynamo_export_poll" { + function_name = "${var.name_prefix}-dynamo-export-poll" + role = aws_iam_role.lambda_role.arn + handler = "src.lambdas.dynamo_export_poll.dynamo_export_poll.lambda_handler" + runtime = "python3.13" + timeout = 30 + + filename = "${path.module}/../../../../dist/dynamo_export_poll.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/dynamo_export_poll.zip") + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + } + } +} + +data "aws_iam_policy_document" "ssm_put_param_assume_role" { + statement { + actions = ["sts:AssumeRole"] + + principals { + type = "Service" + identifiers = ["lambda.amazonaws.com"] + } + } +} + +resource "aws_iam_role" "ssm_put_param_role" { + name = "${var.name_prefix}-ssm-put-param-lambda" + + assume_role_policy = data.aws_iam_policy_document.ssm_put_param_assume_role.json +} + +data "aws_iam_policy_document" "ssm_put_param_policy" { + statement { + effect = "Allow" + actions = [ + "ssm:PutParameter" + ] + resources = ["arn:aws:ssm:eu-west-2:${data.aws_caller_identity.current.account_id}:*"] 
+ } + + statement { + effect = "Allow" + actions = [ + "logs:CreateLogGroup", + "logs:CreateLogStream", + "logs:PutLogEvents" + ] + resources = ["*"] + } +} + +resource "aws_iam_role_policy" "ssm_put_param_policy" { + name = "${var.name_prefix}-ssm-put-param-lambda-policy" + role = aws_iam_role.ssm_put_param_role.id + + policy = data.aws_iam_policy_document.ssm_put_param_policy.json +} + +resource "aws_lambda_function" "ssm_put_param" { + function_name = "${var.name_prefix}-ssm-put-param" + role = aws_iam_role.ssm_put_param_role.arn + handler = "src.lambdas.ssm_put_param.ssm_put_param.lambda_handler" + runtime = "python3.13" + timeout = 30 + + filename = "${path.module}/../../../../dist/ssm_put_param.zip" + source_code_hash = filebase64sha256("${path.module}/../../../../dist/ssm_put_param.zip") + + logging_config { + log_format = "JSON" + } + + environment { + variables = { + BUCKET = aws_s3_bucket.dynamodb_output.id + DDB_TABLE_ARN = data.aws_dynamodb_table.pointer_table.arn + KMS_KEY = aws_kms_key.dynamo.key_id + ENVIRONMENT = var.environment + } + } +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf new file mode 100644 index 000000000..8fc4b38cc --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/main.tf @@ -0,0 +1 @@ +data "aws_caller_identity" "current" {} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf new file mode 100644 index 000000000..af4ad2b56 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/outputs.tf @@ -0,0 +1,12 @@ +output "dynamo_processed_bucket_arn" { + value = aws_s3_bucket.dynamodb_output_processed.arn + description = "The ARN of the S3 bucket used for processed DynamoDB data." 
# Job arguments, resolved from the Glue job's default arguments and from the
# Arguments block of the Step Functions GlueJobTrigger state.
# getResolvedOptions fails the job when a listed argument was not supplied.
# SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME is therefore not listed: the job's
# default arguments have it commented out, the state machine does not pass it,
# and the Slack notification code that would read it is disabled.
# TODO: add it back here together with the default argument when Slack
# alerting is enabled.
ARGS = getResolvedOptions(
    sys.argv,
    [
        "SOURCE_BUCKET",
        "DDB_TABLE_ARN",
        "TARGET_BUCKET",
        "GLUE_CRAWLER_NAME",
        "EXPORT_TYPE",
        "ENVIRONMENT",
    ],
)
def process_full_export(df: DataFrame) -> None:
    """Write a full-export DataFrame to the pointers Delta table and crawl it.

    Keeps the columns from ``KEYS + MAP_KEYS`` that are present in *df*,
    replaces ``nhs_number`` with its SHA-256 hash, and adds the derived
    ``test_patient`` and ``date`` columns. The result is appended to
    ``processed/dynamo_export_pointers`` in the target bucket, partitioned by
    ``date``, and the Glue crawler is then run.

    Args:
        df: Full-export records with one column per table attribute.
    """
    fields = [field for field in KEYS + MAP_KEYS if field in df.columns]

    # Timestamp used for the `date` partition: updated_on when set, otherwise
    # created_on. The previous code read `last_updated`, which is not one of
    # the attributes listed in KEYS/MAP_KEYS.
    # NOTE(review): confirm updated_on/created_on are the intended source.
    record_timestamp = lit(None)
    for name in ("created_on", "updated_on"):
        # Columns are optional here for the same reason `fields` is filtered.
        if name in df.columns:
            record_timestamp = when(col(name).isNotNull(), col(name)).otherwise(
                record_timestamp
            )

    df = df.withColumnRenamed("nhs_number", "nhs_number_raw")
    df = (
        df.withColumn(
            "test_patient",
            when(
                col("nhs_number_raw").startswith("9")
                | col("nhs_number_raw").startswith("5"),
                True,
            ).otherwise(False),
        )
        .withColumn("nhs_number", hash_nhs(col("nhs_number_raw")))
        .withColumn("date", record_timestamp.cast("date"))
        .select(*fields, *DERIVED_KEYS)
    )

    df.write.format("delta").mode("append").partitionBy("date").save(
        f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_pointers"
    )

    run_crawler(ARGS["GLUE_CRAWLER_NAME"])
# Columns that identify a single item. They are used to de-duplicate changes
# and to match rows in the Delta table.
# NOTE(review): assumes (pk, sk) is the primary key of the exported table —
# confirm against the table definition. The previous code keyed on
# `nhs_number` + `path`, but `path` is not in KEYS and was never selected.
_MERGE_KEYS = ("pk", "sk")


def _last_changed():
    """Return a column expression: ``updated_on`` when set, else ``created_on``."""
    return when(col("updated_on").isNotNull(), col("updated_on")).otherwise(
        col("created_on")
    )


def _merge_condition(source_alias: str) -> str:
    """Build the Delta merge condition matching target ``t`` on ``_MERGE_KEYS``."""
    return " AND ".join(f"t.{key} = {source_alias}.{key}" for key in _MERGE_KEYS)


def safe_select(df: DataFrame, field: str, dtype: str, image: str) -> col:
    """Select ``<image>.<field>.<dtype>`` aliased as *field*, or a null column.

    The nested path is probed with ``df.select`` first. If that raises, the
    field is treated as absent from the image and a null literal with the same
    alias is returned instead.
    """
    path = f"{image}.{field}.{dtype}"
    try:
        df.select(path)
    except Exception:
        logger.info(
            f"Field {field} of type {dtype} is missing in {image}. Returning null for this field."
        )
        return lit(None).alias(field)
    return col(path).alias(field)


def process_incremental_export(df: DataFrame) -> None:
    """Apply an incremental export to the pointers Delta table and crawl it.

    Each record is classified as INSERT, MODIFY or REMOVE from the presence of
    ``NewImage`` / ``OldImage``. Inserts and modifications are reduced to the
    most recently changed row per key and merged into the table; removals are
    then deleted from it.

    Args:
        df: Incremental-export records.
    """
    df = (
        validate_df_schema(df)
        .withColumn(
            "eventName",
            when(col("NewImage").isNotNull() & col("OldImage").isNull(), "INSERT")
            .when(col("NewImage").isNotNull() & col("OldImage").isNotNull(), "MODIFY")
            .when(col("NewImage").isNull() & col("OldImage").isNotNull(), "REMOVE")
            .otherwise(None),
        )
        .cache()
    )

    # Newest change first within each key, so row number 1 is the row to keep.
    # Only columns produced by the select below may be referenced here.
    latest_first = (
        Window.partitionBy(*_MERGE_KEYS)
        .orderBy(_last_changed().desc())
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    upserted = (
        df.filter(col("eventName").isin("INSERT", "MODIFY"))
        .select(
            *[safe_select(df, field, "S", "NewImage") for field in KEYS],
            *[safe_select(df, field, "M", "NewImage") for field in MAP_KEYS],
        )
        .withColumn("rn", row_number().over(latest_first))
        .filter(col("rn") == 1)
        .drop("rn")
        .cache()
    )
    logger.info(f"Upserted DataFrame has {upserted.count()} rows.")

    deleted = (
        df.filter(col("eventName") == "REMOVE")
        .select(*[safe_select(df, key, "S", "OldImage") for key in _MERGE_KEYS])
        .dropDuplicates(subset=list(_MERGE_KEYS))
        .cache()
    )
    logger.info(f"Deleted DataFrame has {deleted.count()} rows.")

    delta_table = DeltaTable.forPath(
        session,
        f"s3://{ARGS['TARGET_BUCKET']}/processed/dynamo_export_pointers",
    )

    # NOTE(review): upserts run before deletes, so if one key is both removed
    # and re-created in the data read by a single run, the delete wins —
    # confirm this is acceptable.
    if not upserted.isEmpty():
        run_upsert_logic(delta_table, upserted, KEYS + MAP_KEYS)

    if not deleted.isEmpty():
        run_delete_logic(delta_table, deleted)

    run_crawler(ARGS["GLUE_CRAWLER_NAME"])


def run_delete_logic(delta_table: DeltaTable, deleted: DataFrame) -> None:
    """Delete the rows of *delta_table* whose key appears in *deleted*.

    Args:
        delta_table: Target Delta table.
        deleted: One row per removed item, carrying the ``_MERGE_KEYS`` columns.
    """
    logger.info("Running delete logic for removed records.")
    delta_table.alias("t").merge(
        deleted.alias("d"),
        _merge_condition("d"),
    ).whenMatchedDelete().execute()


def run_upsert_logic(
    delta_table: DeltaTable, upserted: DataFrame, fields: list[str]
) -> None:
    """Merge inserted and modified records into *delta_table*.

    ``nhs_number`` is replaced by its SHA-256 hash and the derived
    ``test_patient`` and ``date`` columns are added before the merge. Rows
    matching on the key columns are updated; all others are inserted.

    Args:
        delta_table: Target Delta table.
        upserted: One row per item, with the raw columns named in *fields*.
        fields: Column names to keep alongside ``DERIVED_KEYS``.
    """
    logger.info("Running upsert logic for inserted/modified records.")
    upserted = upserted.withColumnRenamed("nhs_number", "nhs_number_raw")
    upserted = (
        upserted.withColumn(
            "test_patient",
            when(
                col("nhs_number_raw").startswith("9")
                | col("nhs_number_raw").startswith("5"),
                True,
            ).otherwise(False),
        )
        .withColumn("nhs_number", hash_nhs(col("nhs_number_raw")))
        # `last_updated` is not among the selected columns; derive the date
        # from updated_on/created_on instead.
        .withColumn("date", _last_changed().cast("date"))
        .select(*fields, *DERIVED_KEYS)
    )

    delta_table.alias("t").merge(
        upserted.alias("u"),
        _merge_condition("u"),
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
"INCREMENTAL_EXPORT": + logger.info("Starting incremental export processing.") + df = ( + glue_context.create_dynamic_frame.from_options( + connection_type="dynamodb", + connection_options={ + "dynamodb.export": "s3", + "dynamodb.tableArn": ARGS["DDB_TABLE_ARN"], + "dynamodb.s3.bucket": ARGS["SOURCE_BUCKET"], + "dynamodb.s3.prefix": "AWSDynamoDB/data/", + }, + ) + .toDF() + .dropDuplicates() + ) + + if df.isEmpty(): + return + + process_incremental_export(df) + + +if __name__ == "__main__": + try: + main() + except Exception as e: + # send_slack_notification( + # webhook_url=get_ssm_parameter(ARGS["SLACK_WEBHOOK_URL_SSM_PARAMETER_NAME"]), + # exception=e, + # ) + raise diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf new file mode 100644 index 000000000..07ee74f2b --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/step_function.tf @@ -0,0 +1,61 @@ +data "aws_iam_policy_document" "step_functions_assume_role" { + statement { + effect = "Allow" + principals { + type = "Service" + identifiers = ["states.amazonaws.com"] + } + actions = ["sts:AssumeRole"] + } +} + +resource "aws_iam_role" "step_functions_role" { + name = "${var.name_prefix}-dynamodb-export-sf-role" + assume_role_policy = data.aws_iam_policy_document.step_functions_assume_role.json +} + +data "aws_iam_policy_document" "step_functions_policy" { + statement { + effect = "Allow" + actions = [ + "lambda:InvokeFunction", + "lambda:ListFunctions" + ] + resources = [ + aws_lambda_function.dynamo_export_trigger.arn, + aws_lambda_function.dynamo_export_poll.arn, + aws_lambda_function.ssm_put_param.arn + ] + } + + statement { + effect = "Allow" + actions = [ + "glue:StartJobRun", + "glue:GetJobRun" + ] + resources = [aws_glue_job.glue_job.arn] + } +} + +resource "aws_iam_role_policy" "step_functions_policy" { + name = "${var.name_prefix}-dynamodb-export-sf-policy" + 
role = aws_iam_role.step_functions_role.id + policy = data.aws_iam_policy_document.step_functions_policy.json +} + +resource "aws_sfn_state_machine" "dynamo_export" { + name = "${var.name_prefix}-dynamodb-export-sf" + role_arn = aws_iam_role.step_functions_role.arn + + definition = templatefile("${path.module}/dynamo_export_step_function_definition.json", { + lambda_export_trigger_function_name = aws_lambda_function.dynamo_export_trigger.function_name + lambda_export_poll_function_name = aws_lambda_function.dynamo_export_poll.function_name + lambda_ssm_put_param_function_name = aws_lambda_function.ssm_put_param.function_name + dynamo_export_s3_bucket_name = aws_s3_bucket.dynamodb_output.id + dynamo_export_processed_s3_bucket_name = aws_s3_bucket.dynamodb_output_processed.id + ddb_table_arn = data.aws_dynamodb_table.pointer_table.arn + glue_job_name = aws_glue_job.glue_job.name + glue_crawler_name = aws_glue_crawler.log_crawler.name + }) +} diff --git a/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf new file mode 100644 index 000000000..6c0b14575 --- /dev/null +++ b/terraform/account-wide-infrastructure/modules/dynamo_export/variables.tf @@ -0,0 +1,24 @@ +variable "name_prefix" { + type = string + description = "The prefix to apply to all resources in the module" +} + +variable "environment" { + type = string + description = "Environment in use" +} + +variable "pointer_table_name" { + type = string + description = "patient_flags_datastore table name" +} + +variable "asset_bucket" { + type = string + description = "Name of the bucket that holds lambda zips" +} + +variable "asset_version" { + type = string + description = "Version for the lambda zips to use during deployment" +}