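Running the Redshift UNLOAD command from AWS Step Functions through the Redshift Data API works well for long-running exports. The Data API call is asynchronous, so no Lambda function has to stay alive while the query runs (Lambda invocations are capped at 15 minutes). Instead, Step Functions submits the statement, polls until it completes, and is itself started on a schedule by a CloudWatch Events (Amazon EventBridge) rule.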
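- Prepare Redshift for the Data API and UNLOAD:
  - The Data API does not have to be switched on. Access to it is controlled by the caller's IAM permissions. The cluster itself does need an associated IAM role that can write to the target S3 bucket, because UNLOAD runs with the role named in its IAM_ROLE clause.
- Create an IAM Role for Step Functions:
  - This role needs permission to call the Redshift Data API.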
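  - Example IAM policy for the Redshift Data API. Because the workflow authenticates with DbUser (temporary credentials), the role also needs redshift:GetClusterCredentials for that database user and database:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "redshift-data:ExecuteStatement",
        "redshift-data:DescribeStatement",
        "redshift-data:GetStatementResult"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "redshift:GetClusterCredentials",
      "Resource": [
        "arn:aws:redshift:region:account-id:dbuser:your-cluster-identifier/your-db-user",
        "arn:aws:redshift:region:account-id:dbname:your-cluster-identifier/your-database-name"
      ]
    }
  ]
}
```
- Define the Step Functions Workflow: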
  - Create a state machine definition that runs the UNLOAD command and checks its status until it completes.
  - Example state machine definition in JSON (Amazon States Language):
```json
{
  "Comment": "State machine to UNLOAD data from Redshift",
  "StartAt": "StartUnload",
  "States": {
    "StartUnload": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "Parameters": {
        "ClusterIdentifier": "your-cluster-identifier",
        "Database": "your-database-name",
        "DbUser": "your-db-user",
        "Sql": "UNLOAD ('SELECT * FROM your_table') TO 's3://your-s3-bucket/prefix_' IAM_ROLE 'arn:aws:iam::account-id:role/yourRedshiftRole' ALLOWOVERWRITE PARALLEL OFF;",
        "StatementName": "StartUnload"
      },
      "ResultPath": "$.StartUnload",
      "Next": "CheckUnloadStatus"
    },
    "CheckUnloadStatus": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "GetUnloadStatus"
    },
    "GetUnloadStatus": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
      "Parameters": {
        "Id.$": "$.StartUnload.Id"
      },
      "ResultPath": "$.GetUnloadStatus",
      "Next": "IsUnloadComplete"
    },
    "IsUnloadComplete": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.GetUnloadStatus.Status",
          "StringEquals": "FINISHED",
          "Next": "Success"
        },
        {
          "Variable": "$.GetUnloadStatus.Status",
          "StringEquals": "FAILED",
          "Next": "Fail"
        },
        {
          "Variable": "$.GetUnloadStatus.Status",
          "StringEquals": "ABORTED",
          "Next": "Fail"
        }
      ],
      "Default": "CheckUnloadStatus"
    },
    "Success": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "Error": "UNLOADFailed",
      "Cause": "The UNLOAD command failed."
    }
  }
}
```
  - The two ResultPath fields matter: without them each task's output replaces the whole state, so the statement Id is lost after the first DescribeStatement call and the polling loop breaks on its second iteration. ABORTED is treated as a failure alongside FAILED so a cancelled statement does not loop until the execution times out.
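The state machine above is a submit-and-poll loop. To see that loop in plain code, or to test the SQL and IAM permissions from a workstation before deploying anything, it can be sketched in Python. This is a minimal sketch, not part of the workflow: it assumes a boto3 "redshift-data" client is passed in, and the function name run_unload is hypothetical. Each step is commented with the state it corresponds to.

```python
import time
from typing import Any, Callable, Dict


def run_unload(
    client: Any,
    sql: str,
    cluster_identifier: str,
    database: str,
    db_user: str,
    poll_seconds: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Submit sql through the Redshift Data API and poll until it ends.

    client is assumed to be a boto3 "redshift-data" client, or any object
    exposing execute_statement and describe_statement with the same shape.
    Returns the final describe_statement response on FINISHED and raises
    RuntimeError on FAILED or ABORTED.
    """
    # StartUnload: execute_statement returns at once with a statement Id.
    started = client.execute_statement(
        ClusterIdentifier=cluster_identifier,
        Database=database,
        DbUser=db_user,
        Sql=sql,
        StatementName="StartUnload",
    )
    statement_id = started["Id"]

    while True:
        # CheckUnloadStatus: wait before asking for the status.
        sleep(poll_seconds)
        # GetUnloadStatus: look the statement up by the saved Id.
        described = client.describe_statement(Id=statement_id)
        status = described["Status"]
        # IsUnloadComplete: stop on a terminal status, otherwise loop again.
        if status == "FINISHED":
            return described
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(f"UNLOAD {status}: {described.get('Error', '')}")
```

For example, run_unload(boto3.client("redshift-data"), sql, "your-cluster-identifier", "your-database-name", "your-db-user") returns the final DescribeStatement response or raises if the statement failed. Passing the client and the sleep function in keeps the loop testable without AWS access.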
- Deploy the State Machine Using CDK:
  - Create a new file redshift_unload_step_function_stack.py in your CDK v2 (Python) project and add the following code. It builds the same states as the JSON definition above:
```python
# redshift_unload_step_function_stack.py (AWS CDK v2: pip install aws-cdk-lib constructs)
from aws_cdk import (
    Duration,
    Stack,
    aws_events as events,
    aws_events_targets as targets,
    aws_iam as iam,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as tasks,
)
from constructs import Construct


class RedshiftUnloadStepFunctionStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # IAM role assumed by Step Functions; the CallAwsService tasks below
        # also add their own policy statements to it.
        step_function_role = iam.Role(
            self, "StepFunctionRole",
            assumed_by=iam.ServicePrincipal("states.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name("AmazonRedshiftDataFullAccess")
            ],
        )
        # The managed policy only grants GetClusterCredentials for a database user
        # named redshift_data_api_user, so allow the DbUser used below explicitly.
        step_function_role.add_to_policy(iam.PolicyStatement(
            actions=["redshift:GetClusterCredentials"],
            resources=[
                f"arn:aws:redshift:{self.region}:{self.account}:dbuser:your-cluster-identifier/your-db-user",
                f"arn:aws:redshift:{self.region}:{self.account}:dbname:your-cluster-identifier/your-database-name",
            ],
        ))

        # StartUnload: submit the UNLOAD; the response (with its Id) is kept at $.StartUnload.
        start_unload_task = tasks.CallAwsService(
            self, "StartUnload",
            service="redshiftdata",
            action="executeStatement",
            # The IAM prefix is redshift-data, which differs from the SDK service name.
            iam_action="redshift-data:ExecuteStatement",
            parameters={
                "ClusterIdentifier": "your-cluster-identifier",
                "Database": "your-database-name",
                "DbUser": "your-db-user",
                "Sql": "UNLOAD ('SELECT * FROM your_table') TO 's3://your-s3-bucket/prefix_' IAM_ROLE 'arn:aws:iam::account-id:role/yourRedshiftRole' ALLOWOVERWRITE PARALLEL OFF;",
                "StatementName": "StartUnload",
            },
            iam_resources=["*"],
            result_path="$.StartUnload",
        )

        # CheckUnloadStatus: pause between status checks.
        check_unload_status_task = sfn.Wait(
            self, "CheckUnloadStatus",
            time=sfn.WaitTime.duration(Duration.seconds(30)),
        )

        # GetUnloadStatus: describe the statement by the Id saved above.
        get_unload_status_task = tasks.CallAwsService(
            self, "GetUnloadStatus",
            service="redshiftdata",
            action="describeStatement",
            iam_action="redshift-data:DescribeStatement",
            parameters={"Id": sfn.JsonPath.string_at("$.StartUnload.Id")},
            iam_resources=["*"],
            result_path="$.GetUnloadStatus",
        )

        is_unload_complete_choice = sfn.Choice(self, "IsUnloadComplete")
        success_state = sfn.Succeed(self, "Success")
        fail_state = sfn.Fail(
            self, "Fail",
            error="UNLOADFailed",
            cause="The UNLOAD command failed.",
        )

        is_unload_complete_choice.when(
            sfn.Condition.string_equals("$.GetUnloadStatus.Status", "FINISHED"),
            success_state,
        ).when(
            sfn.Condition.or_(
                sfn.Condition.string_equals("$.GetUnloadStatus.Status", "FAILED"),
                sfn.Condition.string_equals("$.GetUnloadStatus.Status", "ABORTED"),
            ),
            fail_state,
        ).otherwise(
            # Still running: loop back to the Wait state. Pointing at the state itself
            # (not check.next(get)) avoids setting the Wait state's Next twice.
            check_unload_status_task
        )

        definition = (
            start_unload_task
            .next(check_unload_status_task)
            .next(get_unload_status_task)
            .next(is_unload_complete_choice)
        )

        state_machine = sfn.StateMachine(
            self, "RedshiftUnloadStateMachine",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            role=step_function_role,
            timeout=Duration.hours(1),
        )

        # CloudWatch Events (EventBridge) rule that starts the state machine every hour.
        rule = events.Rule(
            self, "ScheduleRule",
            schedule=events.Schedule.rate(Duration.hours(1)),
        )
        rule.add_target(targets.SfnStateMachine(state_machine))
```
- Update the app.py File:
  - Open app.py in your CDK project and instantiate the RedshiftUnloadStepFunctionStack class:
```python
# app.py
from aws_cdk import App

from redshift_unload_step_function_stack import RedshiftUnloadStepFunctionStack

app = App()
RedshiftUnloadStepFunctionStack(app, "RedshiftUnloadStepFunctionStack")
app.synth()
```
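Both the JSON definition and the stack hard-code the same UNLOAD string. If the inner query ever contains string literals, their single quotes have to be doubled inside UNLOAD ('...'). A small helper could build the statement in one place; build_unload_sql below is hypothetical (not part of CDK or the Data API), and passing its result into the stack would require adding a constructor parameter that the stack above does not have.

```python
def quote_sql_literal(text: str) -> str:
    """Wrap text in single quotes, doubling any embedded single quote."""
    return "'" + text.replace("'", "''") + "'"


def build_unload_sql(
    query: str,
    s3_prefix: str,
    iam_role_arn: str,
    parallel: bool = False,
    allow_overwrite: bool = True,
) -> str:
    """Build an UNLOAD statement like the one used in the workflow above."""
    options = []
    if allow_overwrite:
        options.append("ALLOWOVERWRITE")
    # PARALLEL OFF writes serially (up to 6.2 GB per file); ON writes one set per slice.
    options.append("PARALLEL ON" if parallel else "PARALLEL OFF")
    return (
        f"UNLOAD ({quote_sql_literal(query)}) "
        f"TO {quote_sql_literal(s3_prefix)} "
        f"IAM_ROLE {quote_sql_literal(iam_role_arn)} "
        + " ".join(options)
        + ";"
    )
```

Called with "SELECT * FROM your_table", "s3://your-s3-bucket/prefix_" and the role ARN from the examples, it produces exactly the Sql string used above; a query such as SELECT * FROM venue WHERE venuestate = 'NV' comes out with ''NV'' inside the UNLOAD parentheses.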
- Deploy the CDK Stack:
  - Make sure AWS credentials are available to the CDK CLI (for example through environment variables or a configured profile).
  - Bootstrap your AWS environment if you haven't done it before:
```shell
cdk bootstrap
```
  - Deploy the stack:
```shell
cdk deploy
```
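After cdk deploy you do not have to wait for the hourly rule to check that the workflow works. A minimal sketch for starting one execution by hand and waiting for its result, assuming a boto3 "stepfunctions" client and the state machine's ARN (visible in the Step Functions console; the stack above does not output it). The function name start_and_wait is hypothetical.

```python
import json
import time
from typing import Any, Callable, Dict, Optional


def start_and_wait(
    client: Any,
    state_machine_arn: str,
    execution_input: Optional[Dict[str, Any]] = None,
    poll_seconds: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Start one execution and return its final status string.

    client is assumed to be a boto3 "stepfunctions" client (or any object
    with start_execution and describe_execution of the same shape).
    """
    # The state machine ignores its input, so an empty JSON object is enough.
    started = client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(execution_input or {}),
    )
    execution_arn = started["executionArn"]

    while True:
        status = client.describe_execution(executionArn=execution_arn)["status"]
        # Anything other than RUNNING (SUCCEEDED, FAILED, TIMED_OUT, ABORTED) ends the wait.
        if status != "RUNNING":
            return status
        sleep(poll_seconds)
```

For example, start_and_wait(boto3.client("stepfunctions"), "arn:aws:states:...") returns "SUCCEEDED" when the UNLOAD finished. The scheduled rule keeps firing independently, so a manual run that overlaps a scheduled one writes to the same S3 prefix.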
This solution uses AWS CDK to create a Step Functions workflow that runs the UNLOAD command on Redshift through the Redshift Data API. A CloudWatch Events (EventBridge) rule starts the workflow on a schedule (every hour in this example). The state machine submits the UNLOAD, polls its status every 30 seconds, and ends in the Success state when the statement finishes or in the Fail state if it fails or is aborted; the one-hour execution timeout stops any run that takes longer.