Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fargate/app-provisioner/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/aws/aws-sdk-go-v2/service/ssm v1.56.9
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6
github.com/google/uuid v1.3.0
github.com/pennsieve/pennsieve-go-core v1.13.0
github.com/pusher/pusher-http-go/v5 v5.1.1
github.com/stretchr/testify v1.9.0
Expand Down
2 changes: 2 additions & 0 deletions fargate/app-provisioner/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down
19 changes: 10 additions & 9 deletions fargate/app-provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
computeNodeUuid := os.Getenv("COMPUTE_NODE_UUID")

applicationsTable := os.Getenv("APPLICATIONS_TABLE")
deploymentsTable := os.Getenv(provisioner.DeploymentsTableNameKey)

// Initializing environment
cfg, err := config.LoadDefaultConfig(context.Background())
Expand All @@ -48,16 +49,16 @@ func main() {
accountId, action, env, utils.ExtractGitUrl(sourceUrl), storageId, utils.AppSlug(sourceUrl, computeNodeUuid))
dynamoDBClient := dynamodb.NewFromConfig(cfg)
applicationsStore := store_dynamodb.NewApplicationDatabaseStore(dynamoDBClient, applicationsTable)
statusManager := status.NewManager(applicationsStore, applicationUuid)
deploymentsStore := store_dynamodb.NewDeploymentsStore(dynamoDBClient, deploymentsTable)
statusManager := status.NewManager(applicationsStore, applicationUuid, deploymentsStore)

// deploymentId will only be present if this is not a DELETE. DELETE does not
// generate a Deployment record that needs to be updated.
// generate a Deployment record that needs to be updated. Instead, we must delete all
// deployment records for the application along with the application
var deploymentId string
if action == "CREATE" || action == "DEPLOY" {
deploymentsTable := os.Getenv(provisioner.DeploymentsTableNameKey)
deploymentId = os.Getenv(provisioner.DeploymentIdKey)
deploymentsStore := store_dynamodb.NewDeploymentsStore(dynamoDBClient, deploymentsTable)
statusManager = statusManager.WithDeployment(deploymentsStore, deploymentId)
statusManager = statusManager.WithDeploymentId(deploymentId)
}

// use pusher if we can get the config
Expand All @@ -76,7 +77,7 @@ func main() {
log.Fatal(err)
}
case "DELETE":
if err := Delete(ctx, applicationUuid, appProvisioner, applicationsStore); err != nil {
if err := Delete(ctx, applicationUuid, appProvisioner, statusManager); err != nil {
statusManager.UpdateApplicationStatus(ctx, err.Error(), true)
log.Fatal(err)
}
Expand Down Expand Up @@ -207,15 +208,15 @@ func Deploy(ctx context.Context, applicationUuid string, deploymentId string, so
return nil
}

func Delete(ctx context.Context, applicationUuid string, appProvisioner provisioner.Provisioner, applicationsStore store_dynamodb.DynamoDBStore) error {
func Delete(ctx context.Context, applicationUuid string, appProvisioner provisioner.Provisioner, statusManager *status.Manager) error {
log.Println("Deleting", applicationUuid)

if err := appProvisioner.Delete(ctx); err != nil {
return fmt.Errorf("error deleting infrastructure: :%w", err)
}

if err := applicationsStore.Delete(ctx, applicationUuid); err != nil {
return fmt.Errorf("error deleting application from store: %w", err)
if err := statusManager.ApplicationDelete(ctx); err != nil {
return err
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ type ApplicationStatusEvent struct {
Source string `json:"source"`
}

const ApplicationDeletionEventName = "application_deletion_event"

type ApplicationDeletionEvent struct {
ApplicationId string `json:"application_id"`
Time time.Time `json:"time"`
Source string `json:"source"`
}

func ApplicationStatusChannel(applicationUuid string) string {
return fmt.Sprintf("application-%s", applicationUuid)
}
36 changes: 31 additions & 5 deletions fargate/app-provisioner/provisioner/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ type Manager struct {
DeploymentId string
}

func NewManager(applicationsStore store_dynamodb.DynamoDBStore, applicationId string) *Manager {
return &Manager{HandlerName: "AppProvisioner", ApplicationsStore: applicationsStore, ApplicationId: applicationId}
func NewManager(applicationsStore store_dynamodb.DynamoDBStore, applicationId string, deploymentsStore *store_dynamodb.DeploymentsStore) *Manager {
return &Manager{HandlerName: "AppProvisioner", ApplicationsStore: applicationsStore, ApplicationId: applicationId, DeploymentsStore: deploymentsStore}
}

func (m *Manager) WithDeployment(deploymentsStore *store_dynamodb.DeploymentsStore, deploymentId string) *Manager {
m.DeploymentsStore = deploymentsStore
func (m *Manager) WithDeploymentId(deploymentId string) *Manager {
m.DeploymentId = deploymentId
return m
}
Expand All @@ -49,7 +48,7 @@ func (m *Manager) SetErrorStatus(ctx context.Context, err error) {
if appStoreErr := m.ApplicationsStore.UpdateStatus(ctx, msg, m.ApplicationId); appStoreErr != nil {
log.Printf("warning: error updating applications table with error: %s: %s\n", msg, appStoreErr.Error())
}
if m.DeploymentsStore != nil {
if len(m.DeploymentId) > 0 {
if deployStoreErr := m.DeploymentsStore.SetErroredFlag(ctx, m.ApplicationId, m.DeploymentId); deployStoreErr != nil {
log.Printf("warning: error setting errored on deployments table: %s\n", deployStoreErr.Error())
}
Expand All @@ -70,6 +69,17 @@ func (m *Manager) ApplicationCreateUpdate(ctx context.Context, application store
return m.ApplicationsStore.Update(ctx, application, m.ApplicationId)
}

func (m *Manager) ApplicationDelete(ctx context.Context) error {
if err := m.ApplicationsStore.Delete(ctx, m.ApplicationId); err != nil {
return fmt.Errorf("error deleting application %s from store: %w", m.ApplicationId, err)
}
if err := m.DeploymentsStore.DeleteApplicationDeployments(ctx, m.ApplicationId); err != nil {
log.Printf("warning: error deleting deployments for application %s: %s\n", m.ApplicationId, err.Error())
}
m.sendApplicationDeletionEvent()
return nil
}

func (m *Manager) sendApplicationStatusEvent(status string, isErrorStatus bool) {
if m.Pusher == nil {
log.Printf("warning: no Pusher client configured")
Expand All @@ -88,3 +98,19 @@ func (m *Manager) sendApplicationStatusEvent(status string, isErrorStatus bool)
log.Printf("warning: error updating pusher application channel %s with status: %s: %s\n", channel, status, err.Error())
}
}

func (m *Manager) sendApplicationDeletionEvent() {
if m.Pusher == nil {
log.Printf("warning: no Pusher client configured")
return
}
channel := events.ApplicationStatusChannel(m.ApplicationId)
event := events.ApplicationDeletionEvent{
ApplicationId: m.ApplicationId,
Time: time.Now().UTC(),
Source: m.HandlerName,
}
if err := m.Pusher.Trigger(channel, events.ApplicationDeletionEventName, event); err != nil {
log.Printf("warning: error sending deletion event to pusher application channel %s: %s\n", channel, err.Error())
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package store_dynamodb

const DeploymentIdField = "deploymentId"
const DeploymentApplicationIdField = "applicationId"

type DeploymentKey struct {
ApplicationId string `dynamodbav:"applicationId"`
DeploymentId string `dynamodbav:"deploymentId"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
Expand All @@ -13,6 +14,8 @@ import (
// DynamoDB client methods used by DeploymentsStore
type DeploymentsTableAPI interface {
UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
Query(ctx context.Context, params *dynamodb.QueryInput, optFns ...func(options *dynamodb.Options)) (*dynamodb.QueryOutput, error)
}

type DeploymentsStore struct {
Expand Down Expand Up @@ -50,3 +53,52 @@ func (s *DeploymentsStore) SetErroredFlag(ctx context.Context, applicationId str

return nil
}

func (s *DeploymentsStore) DeleteApplicationDeployments(ctx context.Context, applicationId string) error {
expressions, err := expression.NewBuilder().
WithKeyCondition(expression.KeyEqual(
expression.Key(DeploymentApplicationIdField), expression.Value(applicationId))).
WithProjection(expression.NamesList(expression.Name(DeploymentIdField), expression.Name(DeploymentApplicationIdField))).
Build()
if err != nil {
return fmt.Errorf("error building key condition for query to delete deployments of application %s: %w", applicationId, err)
}
queryIn := &dynamodb.QueryInput{
TableName: aws.String(s.tableName),
ExpressionAttributeNames: expressions.Names(),
ExpressionAttributeValues: expressions.Values(),
KeyConditionExpression: expressions.KeyCondition(),
ProjectionExpression: expressions.Projection(),
Limit: aws.Int32(25), // 25 is max number of items that can be deleted in a batch delete
}

for doQuery, page := true, 1; doQuery; doQuery, page = len(queryIn.ExclusiveStartKey) > 0, page+1 {
// Get a batch of items
queryOut, err := s.api.Query(ctx, queryIn)
if err != nil {
return fmt.Errorf("error getting page %d of deployments to delete for application %s: %w", page, applicationId, err)
}
// Delete this batch of items
deleteBatch := batchDeletes(s.tableName, queryOut.Items)
for len(deleteBatch) > 0 {
batchWriteOut, err := s.api.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{RequestItems: deleteBatch})
if err != nil {
return fmt.Errorf("error deleting page %d of deployments for application %s: %w", page, applicationId, err)
}
deleteBatch = batchWriteOut.UnprocessedItems
}
queryIn.ExclusiveStartKey = queryOut.LastEvaluatedKey
}

return nil
}

func batchDeletes(tableName string, items []map[string]types.AttributeValue) map[string][]types.WriteRequest {
var deleteBatch []types.WriteRequest
for _, item := range items {
deleteBatch = append(deleteBatch, types.WriteRequest{
DeleteRequest: &types.DeleteRequest{Key: item},
})
}
return map[string][]types.WriteRequest{tableName: deleteBatch}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package store_dynamodb

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

type ArgCaptureDeploymentsTableAPI struct {
// Don't save pointers to inputs; make defensive copies instead
UpdateItemInput dynamodb.UpdateItemInput
BatchWriteItemInputs []dynamodb.BatchWriteItemInput
// If there are > UnprocessedItemThreshold DeleteRequests in a BatchWriteItemInput
// this mock API will return the remainder in the UnprocessedItems field of the corresponding BatchWriteItemOutput
UnprocessedItemThreshold int

// Query may be paginated, so one DeleteApplicationDeployments call may result in multiple Query calls.
// Set QueryOutputs before calling DeleteApplicationDeployments to control how many times Query is called.
QueryOutputs []*dynamodb.QueryOutput
currentQueryOutputIndex int
// Check QueryInputs after calling DeleteApplicationDeployments to see if we sent the correct inputs.
QueryInputs []dynamodb.QueryInput
}

func (a *ArgCaptureDeploymentsTableAPI) BatchWriteItem(_ context.Context, params *dynamodb.BatchWriteItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error) {
a.BatchWriteItemInputs = append(a.BatchWriteItemInputs, *params)
unprocessed := map[string][]types.WriteRequest{}
for k, v := range params.RequestItems {
if len(v) > a.UnprocessedItemThreshold {
unprocessed[k] = v[a.UnprocessedItemThreshold:]
}
}
return &dynamodb.BatchWriteItemOutput{UnprocessedItems: unprocessed}, nil
}

func (a *ArgCaptureDeploymentsTableAPI) UpdateItem(_ context.Context, params *dynamodb.UpdateItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) {
a.UpdateItemInput = *params
return &dynamodb.UpdateItemOutput{}, nil
}

func (a *ArgCaptureDeploymentsTableAPI) Query(_ context.Context, params *dynamodb.QueryInput, _ ...func(options *dynamodb.Options)) (*dynamodb.QueryOutput, error) {
a.QueryInputs = append(a.QueryInputs, *params)
if a.currentQueryOutputIndex < len(a.QueryOutputs) {
output := a.QueryOutputs[a.currentQueryOutputIndex]
a.currentQueryOutputIndex++
return output, nil
}
return nil, fmt.Errorf("query called too many times! Expected %d calls", len(a.QueryOutputs))

}

func TestDeploymentsStore_DeleteApplicationDeployments(t *testing.T) {
unprocessedItemThreshold := 3
argCaptureAPI := &ArgCaptureDeploymentsTableAPI{UnprocessedItemThreshold: unprocessedItemThreshold}
tableName := uuid.NewString()
applicationId := uuid.NewString()
expectedDeploymentItems := []map[string]types.AttributeValue{
deploymentKeyItem(applicationId, uuid.NewString()),
deploymentKeyItem(applicationId, uuid.NewString()),
deploymentKeyItem(applicationId, uuid.NewString()),
deploymentKeyItem(applicationId, uuid.NewString()),
deploymentKeyItem(applicationId, uuid.NewString()),
deploymentKeyItem(applicationId, uuid.NewString()),
}
argCaptureAPI.QueryOutputs = []*dynamodb.QueryOutput{
{
Count: 5,
Items: expectedDeploymentItems[:5],
LastEvaluatedKey: expectedDeploymentItems[4],
ScannedCount: 5,
},
{
Count: 1,
Items: []map[string]types.AttributeValue{expectedDeploymentItems[5]},
LastEvaluatedKey: nil,
ScannedCount: 1,
},
}
store := NewDeploymentsStore(argCaptureAPI, tableName)
err := store.DeleteApplicationDeployments(context.Background(), applicationId)
require.NoError(t, err)

// Verify Query Inputs
for i := range argCaptureAPI.QueryInputs {
input := argCaptureAPI.QueryInputs[i]
assert.Equal(t, tableName, aws.ToString(input.TableName))
assert.Equal(t, int32(25), aws.ToInt32(input.Limit))
if i == 0 {
assert.Empty(t, input.ExclusiveStartKey)
} else {
assert.Equal(t, argCaptureAPI.QueryOutputs[i-1].LastEvaluatedKey, input.ExclusiveStartKey)
}
// Names
assert.Len(t, input.ExpressionAttributeNames, 2)
var deploymentIdNameKey, appIdNameKey string
for k, v := range input.ExpressionAttributeNames {
switch v {
case DeploymentApplicationIdField:
appIdNameKey = k
case DeploymentIdField:
deploymentIdNameKey = k
default:
assert.Fail(t, "unexpected value in ExpressionAttributeNames", v)

}
}
assert.NotEmpty(t, appIdNameKey)
assert.NotEmpty(t, deploymentIdNameKey)

//Values
assert.Len(t, input.ExpressionAttributeValues, 1)
var appIdValueKey string
for k, v := range input.ExpressionAttributeValues {
if assert.Equal(t, &types.AttributeValueMemberS{Value: applicationId}, v) {
appIdValueKey = k
}
}
assert.NotEmpty(t, appIdValueKey)

//Key expression
assert.Equal(t, fmt.Sprintf("%s = %s", appIdNameKey, appIdValueKey), aws.ToString(input.KeyConditionExpression))

//Projection expression
assert.Equal(t, fmt.Sprintf("%s, %s", deploymentIdNameKey, appIdNameKey), aws.ToString(input.ProjectionExpression))
}

// Verify BatchWrite Inputs
// The two query outputs will turn into three batch write inputs because of how we have
// UnprocessedItemThreshold configured on the mock
expectedDeleteRequests := [][]map[string]types.AttributeValue{
// First batch write will attempt to delete all 5 of the items returned by first QueryOutput
expectedDeploymentItems[:5],
// There will be a second batch write to process the unprocessed items from the first
// batch write request
expectedDeploymentItems[unprocessedItemThreshold : len(expectedDeploymentItems)-1],
// Final batch write in response to second QueryOutput
{expectedDeploymentItems[len(expectedDeploymentItems)-1]},
}
assert.Len(t, argCaptureAPI.BatchWriteItemInputs, len(expectedDeleteRequests))
for i := range argCaptureAPI.BatchWriteItemInputs {
input := argCaptureAPI.BatchWriteItemInputs[i]
assert.Len(t, input.RequestItems, 1)
assert.Contains(t, input.RequestItems, tableName)
writeRequests := input.RequestItems[tableName]
assert.Len(t, writeRequests, len(expectedDeleteRequests[i]))
for j := 0; j < len(expectedDeleteRequests[i]); j++ {
assert.Nil(t, writeRequests[j].PutRequest)
assert.Equal(t, expectedDeleteRequests[i][j], writeRequests[j].DeleteRequest.Key)
}
}

}

func deploymentKeyItem(applicationId, deploymentId string) map[string]types.AttributeValue {
return map[string]types.AttributeValue{
DeploymentApplicationIdField: &types.AttributeValueMemberS{Value: applicationId},
DeploymentIdField: &types.AttributeValueMemberS{Value: deploymentId},
}
}
Loading