Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dynamodb/table: Not recreate on PITR enable #29269

Merged
merged 13 commits into from
Feb 9, 2023
7 changes: 7 additions & 0 deletions .changelog/29269.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:bug
resource/aws_dynamodb_table: Avoid recreating table replicas when enabling PITR on them
```

```release-note:enhancement
resource/aws_dynamodb_table: Add `arn`, `stream_arn`, and `stream_label` attributes to `replica` to obtain this information for replicas
```
18 changes: 18 additions & 0 deletions GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ SVC_DIR ?= ./internal/service
TEST_COUNT ?= 1
ACCTEST_TIMEOUT ?= 180m
ACCTEST_PARALLELISM ?= 20
P ?= 20
GO_VER ?= go
SWEEP_TIMEOUT ?= 60m

Expand All @@ -14,10 +15,19 @@ ifneq ($(origin PKG), undefined)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(origin K), undefined)
PKG_NAME = internal/service/$(K)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(origin TESTS), undefined)
RUNARGS = -run='$(TESTS)'
endif

ifneq ($(origin T), undefined)
RUNARGS = -run='$(T)'
endif

ifneq ($(origin SWEEPERS), undefined)
SWEEPARGS = -sweep-run='$(SWEEPERS)'
endif
Expand Down Expand Up @@ -57,6 +67,10 @@ ifeq ($(PKG_NAME), internal/service/wavelength)
TEST = ./$(PKG_NAME)/...
endif

ifneq ($(P), 20)
ACCTEST_PARALLELISM = $(P)
endif

default: build

build: fmtcheck
Expand Down Expand Up @@ -101,6 +115,9 @@ testacc: fmtcheck
fi
TF_ACC=1 $(GO_VER) test ./$(PKG_NAME)/... -v -count $(TEST_COUNT) -parallel $(ACCTEST_PARALLELISM) $(RUNARGS) $(TESTARGS) -timeout $(ACCTEST_TIMEOUT)

t: fmtcheck
TF_ACC=1 $(GO_VER) test ./$(PKG_NAME)/... -v -count $(TEST_COUNT) -parallel $(ACCTEST_PARALLELISM) $(RUNARGS) $(TESTARGS) -timeout $(ACCTEST_TIMEOUT)

testacc-lint:
@echo "Checking acceptance tests with terrafmt"
find $(SVC_DIR) -type f -name '*_test.go' \
Expand Down Expand Up @@ -297,6 +314,7 @@ yamllint:
build \
gen \
sweep \
t \
test \
testacc \
testacc-lint \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/hashicorp/terraform-plugin-framework-timeouts v0.3.0
github.com/hashicorp/terraform-plugin-framework-validators v0.9.0
github.com/hashicorp/terraform-plugin-go v0.14.3
github.com/hashicorp/terraform-plugin-log v0.7.0
github.com/hashicorp/terraform-plugin-log v0.8.0
github.com/hashicorp/terraform-plugin-mux v0.8.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1
github.com/jmespath/go-jmespath v0.4.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ github.com/hashicorp/terraform-plugin-framework-validators v0.9.0 h1:LYz4bXh3t7b
github.com/hashicorp/terraform-plugin-framework-validators v0.9.0/go.mod h1:+BVERsnfdlhYR2YkXMBtPnmn9UsL19U3qUtSZ+Y/5MY=
github.com/hashicorp/terraform-plugin-go v0.14.3 h1:nlnJ1GXKdMwsC8g1Nh05tK2wsC3+3BL/DBBxFEki+j0=
github.com/hashicorp/terraform-plugin-go v0.14.3/go.mod h1:7ees7DMZ263q8wQ6E4RdIdR6nHHJtrdt4ogX5lPkX1A=
github.com/hashicorp/terraform-plugin-log v0.7.0 h1:SDxJUyT8TwN4l5b5/VkiTIaQgY6R+Y2BQ0sRZftGKQs=
github.com/hashicorp/terraform-plugin-log v0.7.0/go.mod h1:p4R1jWBXRTvL4odmEkFfDdhUjHf9zcs/BCoNHAc7IK4=
github.com/hashicorp/terraform-plugin-log v0.8.0 h1:pX2VQ/TGKu+UU1rCay0OlzosNKe4Nz1pepLXj95oyy0=
github.com/hashicorp/terraform-plugin-log v0.8.0/go.mod h1:1myFrhVsBLeylQzYYEV17VVjtG8oYPRFdaZs7xdW2xs=
github.com/hashicorp/terraform-plugin-mux v0.8.0 h1:WCTP66mZ+iIaIrCNJnjPEYnVjawTshnDJu12BcXK1EI=
github.com/hashicorp/terraform-plugin-mux v0.8.0/go.mod h1:vdW0daEi8Kd4RFJmet5Ot+SIVB/B8SwQVJiYKQwdCy8=
github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.1 h1:zHcMbxY0+rFO9gY99elV/XC/UnQVg7FhRCbj1i5b7vM=
Expand Down
167 changes: 143 additions & 24 deletions internal/service/dynamodb/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func ResourceTable() *schema.Resource {
Optional: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Computed: true,
},
"kms_key_arn": {
Type: schema.TypeString,
Optional: true,
Expand All @@ -282,6 +286,14 @@ func ResourceTable() *schema.Resource {
Required: true,
// update is equivalent of force a new *replica*, not table
},
"stream_arn": {
Type: schema.TypeString,
Computed: true,
},
"stream_label": {
Type: schema.TypeString,
Computed: true,
},
},
},
},
Expand Down Expand Up @@ -666,6 +678,10 @@ func resourceTableRead(ctx context.Context, d *schema.ResourceData, meta interfa
return create.DiagError(names.DynamoDB, create.ErrActionReading, ResNameTable, d.Id(), err)
}

if replicas, err = enrichReplicas(ctx, conn, aws.StringValue(table.TableArn), d.Id(), meta.(*conns.AWSClient).TerraformVersion, replicas); err != nil {
return create.DiagError(names.DynamoDB, create.ErrActionReading, ResNameTable, d.Id(), err)
}

replicas = addReplicaTagPropagates(d.Get("replica").(*schema.Set), replicas)
replicas = clearReplicaDefaultKeys(ctx, replicas, meta)

Expand Down Expand Up @@ -1099,9 +1115,14 @@ func createReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
},
}

// currently this update path is not needed because each replica argument is handled elsewhere or forces replacement:
// region_name can't be updated - new replica
// kms_key_arn can't be updated - remove/add replica
// propagate_tags - handled elsewhere
// point_in_time_recovery - handled elsewhere
// if provisioned_throughput_override or table_class_override were added, they could be updated here
if !create {
var replicaInput = &dynamodb.UpdateReplicationGroupMemberAction{}

if v, ok := tfMap["region_name"].(string); ok && v != "" {
replicaInput.RegionName = aws.String(v)
}
Expand Down Expand Up @@ -1145,6 +1166,10 @@ func createReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
_, err = conn.UpdateTableWithContext(ctx, input)
}

// An update that makes no changes (i.e., the same region_name and kms_key_arn
// as currently configured) throws an unhelpfully worded exception:
// ValidationException: One or more parameter values were invalid: KMSMasterKeyId must be specified for each replica.

if create && tfawserr.ErrMessageContains(err, "ValidationException", "already exist") {
return createReplicas(ctx, conn, tableName, tfList, tfVersion, false, timeout)
}
Expand Down Expand Up @@ -1283,49 +1308,95 @@ func updateReplica(ctx context.Context, d *schema.ResourceData, conn *dynamodb.D
o := oRaw.(*schema.Set)
n := nRaw.(*schema.Set)

removed := o.Difference(n).List()
added := n.Difference(o).List()

// 1. changing replica kms keys requires recreation of the replica, like ForceNew, but we don't want to ForceNew the *table*
// 2. also, updating PITR on an encrypted replica (one with a KMS key) requires recreation (a backup encrypted with a different key could not be restored)
removeRaw := o.Difference(n).List()
addRaw := n.Difference(o).List()

var removeFirst []interface{} // replicas to delete before recreating (like ForceNew without recreating table)
var toAdd []interface{}
var toRemove []interface{}

// first pass - add replicas that don't have corresponding remove entry
for _, a := range addRaw {
add := true
ma := a.(map[string]interface{})
for _, r := range removeRaw {
mr := r.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) {
add = false
break
}
}

// For true updates, don't remove and add, just update (i.e., keep in added
// but remove from removed)
for _, a := range added {
for j, r := range removed {
if add {
toAdd = append(toAdd, ma)
}
}

// second pass - remove replicas that don't have corresponding add entry
for _, r := range removeRaw {
remove := true
mr := r.(map[string]interface{})
for _, a := range addRaw {
ma := a.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) {
remove = false
break
}
}

if remove {
toRemove = append(toRemove, mr)
}
}

// third pass - for replicas that exist in both add and remove
// For true updates, don't remove and add, just update
for _, a := range addRaw {
ma := a.(map[string]interface{})
for _, r := range removeRaw {
mr := r.(map[string]interface{})

if ma["region_name"].(string) == mr["region_name"].(string) && (ma["kms_key_arn"].(string) != "" || mr["kms_key_arn"].(string) != "") {
removeFirst = append(removeFirst, removed[j])
removed = append(removed[:j], removed[j+1:]...)
if ma["region_name"].(string) != mr["region_name"].(string) {
continue
}

if ma["region_name"].(string) == mr["region_name"].(string) {
removed = append(removed[:j], removed[j+1:]...)
continue
// like "ForceNew" for the replica - KMS change
if ma["kms_key_arn"].(string) != mr["kms_key_arn"].(string) {
toRemove = append(toRemove, mr)
toAdd = append(toAdd, ma)
break
}

// just update PITR
if ma["point_in_time_recovery"].(bool) != mr["point_in_time_recovery"].(bool) {
if err := updatePITR(ctx, conn, d.Id(), ma["point_in_time_recovery"].(bool), ma["region_name"].(string), tfVersion, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replica (%s) point in time recovery: %w", ma["region_name"].(string), err)
}
break
}

// nothing changed, assuming propagate_tags changed so do nothing here
break
}
}

if len(removeFirst) > 0 { // like ForceNew but doesn't recreate the table
if len(removeFirst) > 0 { // mini ForceNew, recreates replica but doesn't recreate the table
if err := deleteReplicas(ctx, conn, d.Id(), removeFirst, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(added) > 0 {
if err := createReplicas(ctx, conn, d.Id(), added, tfVersion, true, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while creating: %w", err)
if len(toRemove) > 0 {
if err := deleteReplicas(ctx, conn, d.Id(), toRemove, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
}
}

if len(removed) > 0 {
if err := deleteReplicas(ctx, conn, d.Id(), removed, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("updating replicas, while deleting: %w", err)
if len(toAdd) > 0 {
if err := createReplicas(ctx, conn, d.Id(), toAdd, tfVersion, true, d.Timeout(schema.TimeoutCreate)); err != nil {
return fmt.Errorf("updating replicas, while creating: %w", err)
}
}

Expand Down Expand Up @@ -1501,10 +1572,18 @@ func deleteReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri

err := resource.RetryContext(ctx, updateTableTimeout, func() *resource.RetryError {
_, err := conn.UpdateTableWithContext(ctx, input)
notFoundRetries := 0
if err != nil {
if tfawserr.ErrCodeEquals(err, "ThrottlingException") {
return resource.RetryableError(err)
}
if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
notFoundRetries++
if notFoundRetries > 3 {
return resource.NonRetryableError(err)
}
return resource.RetryableError(err)
}
if tfawserr.ErrMessageContains(err, dynamodb.ErrCodeLimitExceededException, "can be created, updated, or deleted simultaneously") {
return resource.RetryableError(err)
}
Expand All @@ -1521,7 +1600,7 @@ func deleteReplicas(ctx context.Context, conn *dynamodb.DynamoDB, tableName stri
_, err = conn.UpdateTableWithContext(ctx, input)
}

if err != nil {
if err != nil && !tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return fmt.Errorf("deleting replica (%s): %w", regionName, err)
}

Expand Down Expand Up @@ -1570,6 +1649,25 @@ func replicaPITR(ctx context.Context, conn *dynamodb.DynamoDB, tableName string,
return enabled, nil
}

// replicaStream returns the latest stream ARN and stream label for a table
// replica in the given region. Stream details for a replica are only visible
// through a connection in the replica's own region, so a region-specific
// session is built here.
func replicaStream(ctx context.Context, conn *dynamodb.DynamoDB, tableName string, region string, tfVersion string) (string, string) {
	// No error is returned because this only populates "Computed"-only replica
	// attributes - any failure is tolerated and logged instead.
	sess, err := conns.NewSessionForRegion(&conn.Config, region, tfVersion)
	if err != nil {
		log.Printf("[WARN] Attempting to get replica (%s) stream information, ignoring encountered error: %s", tableName, err)
		return "", ""
	}

	regionConn := dynamodb.New(sess)

	table, err := FindTableByName(ctx, regionConn, tableName)
	if err != nil {
		log.Printf("[WARN] When attempting to get replica (%s) stream information, ignoring encountered error: %s", tableName, err)
		return "", ""
	}

	return aws.StringValue(table.LatestStreamArn), aws.StringValue(table.LatestStreamLabel)
}

func addReplicaPITRs(ctx context.Context, conn *dynamodb.DynamoDB, tableName string, tfVersion string, replicas []interface{}) ([]interface{}, error) {
// This non-standard approach is needed because PITR info for a replica
// must come from a region-specific connection.
Expand All @@ -1588,6 +1686,27 @@ func addReplicaPITRs(ctx context.Context, conn *dynamodb.DynamoDB, tableName str
return replicas, nil
}

// enrichReplicas adds Computed-only attributes to each replica in the set:
// the replica's own ARN (the table ARN rewritten for the replica's region)
// and its stream ARN/label. This non-standard approach is needed because
// stream information for a replica must come from a region-specific
// connection, not from DescribeTable on the main table.
func enrichReplicas(ctx context.Context, conn *dynamodb.DynamoDB, arn, tableName, tfVersion string, replicas []interface{}) ([]interface{}, error) {
	for i, replicaRaw := range replicas {
		replica := replicaRaw.(map[string]interface{})

		newARN, err := ARNForNewRegion(arn, replica["region_name"].(string))
		if err != nil {
			// %w (not %s) so callers can inspect the cause with errors.Is/As,
			// consistent with the file's other wrapped errors.
			return nil, fmt.Errorf("creating new-region ARN: %w", err)
		}
		replica["arn"] = newARN

		// Best effort: replicaStream logs and returns empty strings on
		// failure rather than erroring, since these are Computed-only values.
		streamARN, streamLabel := replicaStream(ctx, conn, tableName, replica["region_name"].(string), tfVersion)
		replica["stream_arn"] = streamARN
		replica["stream_label"] = streamLabel
		replicas[i] = replica
	}

	return replicas, nil
}

func addReplicaTagPropagates(configReplicas *schema.Set, replicas []interface{}) []interface{} {
if configReplicas.Len() == 0 {
return replicas
Expand Down
Loading