diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml index 68c686e..5191c9b 100755 --- a/apis/v1alpha1/ack-generate-metadata.yaml +++ b/apis/v1alpha1/ack-generate-metadata.yaml @@ -1,13 +1,13 @@ ack_generate_info: - build_date: "2023-05-15T23:40:33Z" + build_date: "2023-07-20T09:41:41Z" build_hash: 8f3ba427974fd6e769926778d54834eaee3b81a3 - go_version: go1.19 + go_version: go1.20.5 version: v0.26.1 -api_directory_checksum: 62a4051ba2ded255ad270b491703d3c14440b2c7 +api_directory_checksum: 7d367fcbd95521797ddaf41af226deeac4098ff3 api_version: v1alpha1 aws_sdk_go_version: v1.44.93 generator_config_info: - file_checksum: 0b7493aa8cdf19370936a973ed31804875d2dfba + file_checksum: eb06942b1bdc2a26a2c51d674d061b72385c2634 original_file_name: generator.yaml last_modification: reason: API generation diff --git a/apis/v1alpha1/generator.yaml b/apis/v1alpha1/generator.yaml index 89a127a..47b361e 100644 --- a/apis/v1alpha1/generator.yaml +++ b/apis/v1alpha1/generator.yaml @@ -23,6 +23,11 @@ resources: from: operation: UpdateTimeToLive path: TimeToLiveSpecification + ContinuousBackups: + is_required: false + from: + operation: UpdateContinuousBackups + path: PointInTimeRecoverySpecification AttributeDefinitions: compare: is_ignored: true @@ -134,4 +139,4 @@ resources: type: string - name: STATUS json_path: .status.backupStatus - type: string \ No newline at end of file + type: string diff --git a/apis/v1alpha1/table.go b/apis/v1alpha1/table.go index 45a667f..3524753 100644 --- a/apis/v1alpha1/table.go +++ b/apis/v1alpha1/table.go @@ -35,6 +35,8 @@ type TableSpec struct { // - PAY_PER_REQUEST - We recommend using PAY_PER_REQUEST for unpredictable // workloads. PAY_PER_REQUEST sets the billing mode to On-Demand Mode (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand). 
BillingMode *string `json:"billingMode,omitempty"` + // Represents the settings used to enable point in time recovery. + ContinuousBackups *PointInTimeRecoverySpecification `json:"continuousBackups,omitempty"` // One or more global secondary indexes (the maximum is 20) to be created on // the table. Each global secondary index in the array includes the following: // diff --git a/apis/v1alpha1/types.go b/apis/v1alpha1/types.go index 034e1ff..cc0987d 100644 --- a/apis/v1alpha1/types.go +++ b/apis/v1alpha1/types.go @@ -44,16 +44,29 @@ type AttributeDefinition struct { // Represents the auto scaling settings for a global table or global secondary // index. type AutoScalingSettingsDescription struct { - AutoScalingRoleARN *string `json:"autoScalingRoleARN,omitempty"` - MaximumUnits *int64 `json:"maximumUnits,omitempty"` - MinimumUnits *int64 `json:"minimumUnits,omitempty"` + AutoScalingDisabled *bool `json:"autoScalingDisabled,omitempty"` + AutoScalingRoleARN *string `json:"autoScalingRoleARN,omitempty"` + MaximumUnits *int64 `json:"maximumUnits,omitempty"` + MinimumUnits *int64 `json:"minimumUnits,omitempty"` } // Represents the auto scaling settings to be modified for a global table or // global secondary index. type AutoScalingSettingsUpdate struct { - MaximumUnits *int64 `json:"maximumUnits,omitempty"` - MinimumUnits *int64 `json:"minimumUnits,omitempty"` + AutoScalingDisabled *bool `json:"autoScalingDisabled,omitempty"` + MaximumUnits *int64 `json:"maximumUnits,omitempty"` + MinimumUnits *int64 `json:"minimumUnits,omitempty"` +} + +// Represents the properties of a target tracking scaling policy. +type AutoScalingTargetTrackingScalingPolicyConfigurationDescription struct { + DisableScaleIn *bool `json:"disableScaleIn,omitempty"` +} + +// Represents the settings of a target tracking scaling policy that will be +// modified. 
+type AutoScalingTargetTrackingScalingPolicyConfigurationUpdate struct { + DisableScaleIn *bool `json:"disableScaleIn,omitempty"` } // Contains the description of the backup created for the table. @@ -189,6 +202,30 @@ type Endpoint struct { CachePeriodInMinutes *int64 `json:"cachePeriodInMinutes,omitempty"` } +// Represents a condition to be compared with an attribute value. This condition +// can be used with DeleteItem, PutItem, or UpdateItem operations; if the comparison +// evaluates to true, the operation succeeds; if not, the operation fails. You +// can use ExpectedAttributeValue in one of two different ways: +// +// - Use AttributeValueList to specify one or more values to compare against +// an attribute. Use ComparisonOperator to specify how you want to perform +// the comparison. If the comparison evaluates to true, then the conditional +// operation succeeds. +// +// - Use Value to specify a value that DynamoDB will compare against an attribute. +// If the values match, then ExpectedAttributeValue evaluates to true and +// the conditional operation succeeds. Optionally, you can also set Exists +// to false, indicating that you do not expect to find the attribute value +// in the table. In this case, the conditional operation succeeds only if +// the comparison evaluates to false. +// +// Value and Exists are incompatible with AttributeValueList and ComparisonOperator. +// Note that if you use both sets of parameters at once, DynamoDB will return +// a ValidationException exception. +type ExpectedAttributeValue struct { + Exists *bool `json:"exists,omitempty"` +} + // Represents the properties of the exported table. type ExportDescription struct { ItemCount *int64 `json:"itemCount,omitempty"` @@ -375,6 +412,11 @@ type PointInTimeRecoveryDescription struct { LatestRestorableDateTime *metav1.Time `json:"latestRestorableDateTime,omitempty"` } +// Represents the settings used to enable point in time recovery. 
+type PointInTimeRecoverySpecification struct { + PointInTimeRecoveryEnabled *bool `json:"pointInTimeRecoveryEnabled,omitempty"` +} + // Represents attributes that are copied (projected) from the table into an // index. These are in addition to the primary key attributes and index key // attributes, which are automatically projected. diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 50f3730..cddbf3d 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -82,6 +82,11 @@ func (in *AttributeDefinition) DeepCopy() *AttributeDefinition { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AutoScalingSettingsDescription) DeepCopyInto(out *AutoScalingSettingsDescription) { *out = *in + if in.AutoScalingDisabled != nil { + in, out := &in.AutoScalingDisabled, &out.AutoScalingDisabled + *out = new(bool) + **out = **in + } if in.AutoScalingRoleARN != nil { in, out := &in.AutoScalingRoleARN, &out.AutoScalingRoleARN *out = new(string) @@ -112,6 +117,11 @@ func (in *AutoScalingSettingsDescription) DeepCopy() *AutoScalingSettingsDescrip // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *AutoScalingSettingsUpdate) DeepCopyInto(out *AutoScalingSettingsUpdate) { *out = *in + if in.AutoScalingDisabled != nil { + in, out := &in.AutoScalingDisabled, &out.AutoScalingDisabled + *out = new(bool) + **out = **in + } if in.MaximumUnits != nil { in, out := &in.MaximumUnits, &out.MaximumUnits *out = new(int64) @@ -134,6 +144,46 @@ func (in *AutoScalingSettingsUpdate) DeepCopy() *AutoScalingSettingsUpdate { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *AutoScalingTargetTrackingScalingPolicyConfigurationDescription) DeepCopyInto(out *AutoScalingTargetTrackingScalingPolicyConfigurationDescription) { + *out = *in + if in.DisableScaleIn != nil { + in, out := &in.DisableScaleIn, &out.DisableScaleIn + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AutoScalingTargetTrackingScalingPolicyConfigurationDescription. +func (in *AutoScalingTargetTrackingScalingPolicyConfigurationDescription) DeepCopy() *AutoScalingTargetTrackingScalingPolicyConfigurationDescription { + if in == nil { + return nil + } + out := new(AutoScalingTargetTrackingScalingPolicyConfigurationDescription) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *AutoScalingTargetTrackingScalingPolicyConfigurationUpdate) DeepCopyInto(out *AutoScalingTargetTrackingScalingPolicyConfigurationUpdate) { + *out = *in + if in.DisableScaleIn != nil { + in, out := &in.DisableScaleIn, &out.DisableScaleIn + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AutoScalingTargetTrackingScalingPolicyConfigurationUpdate. +func (in *AutoScalingTargetTrackingScalingPolicyConfigurationUpdate) DeepCopy() *AutoScalingTargetTrackingScalingPolicyConfigurationUpdate { + if in == nil { + return nil + } + out := new(AutoScalingTargetTrackingScalingPolicyConfigurationUpdate) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Backup) DeepCopyInto(out *Backup) { *out = *in @@ -754,6 +804,26 @@ func (in *Endpoint) DeepCopy() *Endpoint { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ExpectedAttributeValue) DeepCopyInto(out *ExpectedAttributeValue) { + *out = *in + if in.Exists != nil { + in, out := &in.Exists, &out.Exists + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExpectedAttributeValue. +func (in *ExpectedAttributeValue) DeepCopy() *ExpectedAttributeValue { + if in == nil { + return nil + } + out := new(ExpectedAttributeValue) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExportDescription) DeepCopyInto(out *ExportDescription) { *out = *in @@ -1479,6 +1549,26 @@ func (in *PointInTimeRecoveryDescription) DeepCopy() *PointInTimeRecoveryDescrip return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PointInTimeRecoverySpecification) DeepCopyInto(out *PointInTimeRecoverySpecification) { + *out = *in + if in.PointInTimeRecoveryEnabled != nil { + in, out := &in.PointInTimeRecoveryEnabled, &out.PointInTimeRecoveryEnabled + *out = new(bool) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PointInTimeRecoverySpecification. +func (in *PointInTimeRecoverySpecification) DeepCopy() *PointInTimeRecoverySpecification { + if in == nil { + return nil + } + out := new(PointInTimeRecoverySpecification) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Projection) DeepCopyInto(out *Projection) { *out = *in @@ -2597,6 +2687,11 @@ func (in *TableSpec) DeepCopyInto(out *TableSpec) { *out = new(string) **out = **in } + if in.ContinuousBackups != nil { + in, out := &in.ContinuousBackups, &out.ContinuousBackups + *out = new(PointInTimeRecoverySpecification) + (*in).DeepCopyInto(*out) + } if in.GlobalSecondaryIndexes != nil { in, out := &in.GlobalSecondaryIndexes, &out.GlobalSecondaryIndexes *out = make([]*GlobalSecondaryIndex, len(*in)) diff --git a/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml b/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml index a737928..2c9ef44 100644 --- a/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml +++ b/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml @@ -75,6 +75,13 @@ spec: workloads. PAY_PER_REQUEST sets the billing mode to On-Demand Mode (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)." type: string + continuousBackups: + description: Represents the settings used to enable point in time + recovery. + properties: + pointInTimeRecoveryEnabled: + type: boolean + type: object globalSecondaryIndexes: description: "One or more global secondary indexes (the maximum is 20) to be created on the table. 
Each global secondary index in the diff --git a/generator.yaml b/generator.yaml index 89a127a..47b361e 100644 --- a/generator.yaml +++ b/generator.yaml @@ -23,6 +23,11 @@ resources: from: operation: UpdateTimeToLive path: TimeToLiveSpecification + ContinuousBackups: + is_required: false + from: + operation: UpdateContinuousBackups + path: PointInTimeRecoverySpecification AttributeDefinitions: compare: is_ignored: true @@ -134,4 +139,4 @@ resources: type: string - name: STATUS json_path: .status.backupStatus - type: string \ No newline at end of file + type: string diff --git a/helm/crds/dynamodb.services.k8s.aws_tables.yaml b/helm/crds/dynamodb.services.k8s.aws_tables.yaml index 8e7cd24..8f4839a 100644 --- a/helm/crds/dynamodb.services.k8s.aws_tables.yaml +++ b/helm/crds/dynamodb.services.k8s.aws_tables.yaml @@ -75,6 +75,13 @@ spec: workloads. PAY_PER_REQUEST sets the billing mode to On-Demand Mode (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html#HowItWorks.OnDemand)." type: string + continuousBackups: + description: Represents the settings used to enable point in time + recovery. + properties: + pointInTimeRecoveryEnabled: + type: boolean + type: object globalSecondaryIndexes: description: "One or more global secondary indexes (the maximum is 20) to be created on the table. 
Each global secondary index in the diff --git a/pkg/resource/table/delta.go b/pkg/resource/table/delta.go index ab035b8..85e82fd 100644 --- a/pkg/resource/table/delta.go +++ b/pkg/resource/table/delta.go @@ -51,6 +51,17 @@ func newResourceDelta( delta.Add("Spec.BillingMode", a.ko.Spec.BillingMode, b.ko.Spec.BillingMode) } } + if ackcompare.HasNilDifference(a.ko.Spec.ContinuousBackups, b.ko.Spec.ContinuousBackups) { + delta.Add("Spec.ContinuousBackups", a.ko.Spec.ContinuousBackups, b.ko.Spec.ContinuousBackups) + } else if a.ko.Spec.ContinuousBackups != nil && b.ko.Spec.ContinuousBackups != nil { + if ackcompare.HasNilDifference(a.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled, b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled) { + delta.Add("Spec.ContinuousBackups.PointInTimeRecoveryEnabled", a.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled, b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled) + } else if a.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil && b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil { + if *a.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != *b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled { + delta.Add("Spec.ContinuousBackups.PointInTimeRecoveryEnabled", a.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled, b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled) + } + } + } if ackcompare.HasNilDifference(a.ko.Spec.ProvisionedThroughput, b.ko.Spec.ProvisionedThroughput) { delta.Add("Spec.ProvisionedThroughput", a.ko.Spec.ProvisionedThroughput, b.ko.Spec.ProvisionedThroughput) } else if a.ko.Spec.ProvisionedThroughput != nil && b.ko.Spec.ProvisionedThroughput != nil { diff --git a/pkg/resource/table/hooks.go b/pkg/resource/table/hooks.go index 5067165..38b1bcb 100644 --- a/pkg/resource/table/hooks.go +++ b/pkg/resource/table/hooks.go @@ -32,23 +32,36 @@ import ( ) var ( - ErrTableDeleting = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusDeleting) - 
ErrTableCreating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusCreating) - ErrTableUpdating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusUpdating) - ErrTableGSIsUpdating = fmt.Errorf("Table GSIs in '%v' state, cannot be modified or deleted", svcsdk.IndexStatusCreating) + ErrTableDeleting = fmt.Errorf( + "Table in '%v' state, cannot be modified or deleted", + svcsdk.TableStatusDeleting, + ) + ErrTableCreating = fmt.Errorf( + "Table in '%v' state, cannot be modified or deleted", + svcsdk.TableStatusCreating, + ) + ErrTableUpdating = fmt.Errorf( + "Table in '%v' state, cannot be modified or deleted", + svcsdk.TableStatusUpdating, + ) + ErrTableGSIsUpdating = fmt.Errorf( + "Table GSIs in '%v' state, cannot be modified or deleted", + svcsdk.IndexStatusCreating, + ) ) +// TerminalStatuses are the status strings that are terminal states for a +// DynamoDB table +var TerminalStatuses = []v1alpha1.TableStatus_SDK{ + v1alpha1.TableStatus_SDK_ARCHIVING, + v1alpha1.TableStatus_SDK_DELETING, +} + var ( - // TerminalStatuses are the status strings that are terminal states for a - // DynamoDB table - TerminalStatuses = []v1alpha1.TableStatus_SDK{ - v1alpha1.TableStatus_SDK_ARCHIVING, - v1alpha1.TableStatus_SDK_DELETING, - } + DefaultTTLEnabledValue = false + DefaultPITREnabledValue = false ) -var DefaultTTLEnabledValue = false - var ( requeueWaitWhileDeleting = ackrequeue.NeededAfter( ErrTableDeleting, @@ -124,7 +137,10 @@ func (rm *resourceManager) customUpdateTable( defer func(err error) { exit(err) }(err) if immutableFieldChanges := rm.getImmutableFieldChanges(delta); len(immutableFieldChanges) > 0 { - msg := fmt.Sprintf("Immutable Spec fields have been modified: %s", strings.Join(immutableFieldChanges, ",")) + msg := fmt.Sprintf( + "Immutable Spec fields have been modified: %s", + strings.Join(immutableFieldChanges, ","), + ) return nil, ackerr.NewTerminalError(fmt.Errorf(msg)) } @@ -187,6 +203,13 
@@ func (rm *resourceManager) customUpdateTable( } } + if delta.DifferentAt("Spec.ContinuousBackups") { + err = rm.syncContinuousBackup(ctx, desired) + if err != nil { + return nil, fmt.Errorf("cannot update table %v", err) + } + } + // We want to update fast fields first // Then attributes // then GSI @@ -202,7 +225,8 @@ func (rm *resourceManager) customUpdateTable( } case delta.DifferentAt("Spec.GlobalSecondaryIndexes") && delta.DifferentAt("Spec.AttributeDefinitions"): if err := rm.syncTableGlobalSecondaryIndexes(ctx, latest, desired); err != nil { - if awsErr, ok := ackerr.AWSError(err); ok && awsErr.Code() == "LimitExceededException" { + if awsErr, ok := ackerr.AWSError(err); ok && + awsErr.Code() == "LimitExceededException" { return nil, requeueWaitGSIReady } return nil, err @@ -257,13 +281,17 @@ func (rm *resourceManager) newUpdateTablePayload( input.ProvisionedThroughput = &svcsdk.ProvisionedThroughput{} if r.ko.Spec.ProvisionedThroughput != nil { if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil { - input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits) + input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64( + *r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits, + ) } else { input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0) } if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil { - input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits) + input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64( + *r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits, + ) } else { input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0) } @@ -277,8 +305,11 @@ func (rm *resourceManager) newUpdateTablePayload( StreamEnabled: aws.Bool(*r.ko.Spec.StreamSpecification.StreamEnabled), } // Only set streamViewType when streamSpefication is enabled and streamViewType is non-nil. 
- if *r.ko.Spec.StreamSpecification.StreamEnabled && r.ko.Spec.StreamSpecification.StreamViewType != nil { - input.StreamSpecification.StreamViewType = aws.String(*r.ko.Spec.StreamSpecification.StreamViewType) + if *r.ko.Spec.StreamSpecification.StreamEnabled && + r.ko.Spec.StreamSpecification.StreamViewType != nil { + input.StreamSpecification.StreamViewType = aws.String( + *r.ko.Spec.StreamSpecification.StreamViewType, + ) } } else { input.StreamSpecification = &svcsdk.StreamSpecification{ @@ -317,7 +348,9 @@ func (rm *resourceManager) syncTableSSESpecification( input.SSESpecification.SSEType = aws.String(*r.ko.Spec.SSESpecification.SSEType) } if r.ko.Spec.SSESpecification.KMSMasterKeyID != nil { - input.SSESpecification.KMSMasterKeyId = aws.String(*r.ko.Spec.SSESpecification.KMSMasterKeyID) + input.SSESpecification.KMSMasterKeyId = aws.String( + *r.ko.Spec.SSESpecification.KMSMasterKeyID, + ) } } } else { @@ -350,13 +383,17 @@ func (rm *resourceManager) syncTableProvisionedThroughput( } if r.ko.Spec.ProvisionedThroughput != nil { if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil { - input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits) + input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64( + *r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits, + ) } else { input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0) } if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil { - input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits) + input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64( + *r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits, + ) } else { input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0) } @@ -395,6 +432,12 @@ func (rm *resourceManager) setResourceAdditionalFields( ko.Spec.TimeToLive = ttlSpec } + if pitrSpec, err := rm.getResourcePointInTimeRecoveryWithContext(ctx, ko.Spec.TableName); err != nil { + 
return err + } else { + ko.Spec.ContinuousBackups = pitrSpec + } + return nil } @@ -403,11 +446,14 @@ func customPreCompare( a *resource, b *resource, ) { - if ackcompare.HasNilDifference(a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification) { if a.ko.Spec.SSESpecification != nil && b.ko.Spec.SSESpecification == nil { if *a.ko.Spec.SSESpecification.Enabled { - delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification) + delta.Add( + "Spec.SSESpecification", + a.ko.Spec.SSESpecification, + b.ko.Spec.SSESpecification, + ) } } else { delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification) @@ -447,7 +493,11 @@ func customPreCompare( } if len(a.ko.Spec.AttributeDefinitions) != len(b.ko.Spec.AttributeDefinitions) { - delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions) + delta.Add( + "Spec.AttributeDefinitions", + a.ko.Spec.AttributeDefinitions, + b.ko.Spec.AttributeDefinitions, + ) } else if a.ko.Spec.AttributeDefinitions != nil && b.ko.Spec.AttributeDefinitions != nil { if !equalAttributeDefinitions(a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions) { delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions) @@ -455,7 +505,11 @@ func customPreCompare( } if len(a.ko.Spec.GlobalSecondaryIndexes) != len(b.ko.Spec.GlobalSecondaryIndexes) { - delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes) + delta.Add( + "Spec.GlobalSecondaryIndexes", + a.ko.Spec.GlobalSecondaryIndexes, + b.ko.Spec.GlobalSecondaryIndexes, + ) } else if a.ko.Spec.GlobalSecondaryIndexes != nil && b.ko.Spec.GlobalSecondaryIndexes != nil { if !equalGlobalSecondaryIndexesArrays(a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes) { delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes) @@ -463,7 +517,11 @@ func 
customPreCompare( } if len(a.ko.Spec.LocalSecondaryIndexes) != len(b.ko.Spec.LocalSecondaryIndexes) { - delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes) + delta.Add( + "Spec.LocalSecondaryIndexes", + a.ko.Spec.LocalSecondaryIndexes, + b.ko.Spec.LocalSecondaryIndexes, + ) } else if a.ko.Spec.LocalSecondaryIndexes != nil && b.ko.Spec.LocalSecondaryIndexes != nil { if !equalLocalSecondaryIndexesArrays(a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes) { delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes) @@ -496,6 +554,12 @@ func customPreCompare( Enabled: &DefaultTTLEnabledValue, } } + if a.ko.Spec.ContinuousBackups == nil && b.ko.Spec.ContinuousBackups != nil && + b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil { + a.ko.Spec.ContinuousBackups = &v1alpha1.PointInTimeRecoverySpecification{ + PointInTimeRecoveryEnabled: &DefaultPITREnabledValue, + } + } } // equalAttributeDefinitions return whether two AttributeDefinition arrays are equal or not. @@ -614,7 +678,10 @@ func equalLocalSecondaryIndexes( if !equalStrings(a.Projection.ProjectionType, b.Projection.ProjectionType) { return false } - if !ackcompare.SliceStringPEqual(a.Projection.NonKeyAttributes, b.Projection.NonKeyAttributes) { + if !ackcompare.SliceStringPEqual( + a.Projection.NonKeyAttributes, + b.Projection.NonKeyAttributes, + ) { return false } } diff --git a/pkg/resource/table/hooks_continuous_backup.go b/pkg/resource/table/hooks_continuous_backup.go new file mode 100644 index 0000000..77180ff --- /dev/null +++ b/pkg/resource/table/hooks_continuous_backup.go @@ -0,0 +1,83 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. 
A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package table + +import ( + "context" + + ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log" + svcsdk "github.com/aws/aws-sdk-go/service/dynamodb" + + "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1" +) + +// syncContinuousBackup syncs the PointInTimeRecoverySpecification of the dynamodb table. +func (rm *resourceManager) syncContinuousBackup( + ctx context.Context, + desired *resource, +) (err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.syncContinuousBackup") + defer func(err error) { exit(err) }(err) + + pitrSpec := &svcsdk.PointInTimeRecoverySpecification{} + if desired.ko.Spec.ContinuousBackups != nil && + desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil { + pitrSpec.SetPointInTimeRecoveryEnabled( + *desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled, + ) + } + + _, err = rm.sdkapi.UpdateContinuousBackupsWithContext( + ctx, + &svcsdk.UpdateContinuousBackupsInput{ + TableName: desired.ko.Spec.TableName, + PointInTimeRecoverySpecification: pitrSpec, + }, + ) + rm.metrics.RecordAPICall("UPDATE", "UpdateContinuousBackups", err) + return err +} + +// getResourcePointInTimeRecoveryWithContext gets the PointInTimeRecoverySpecification of the dynamodb table. 
+func (rm *resourceManager) getResourcePointInTimeRecoveryWithContext( + ctx context.Context, + tableName *string, +) (*v1alpha1.PointInTimeRecoverySpecification, error) { + var err error + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.getResourcePointInTimeRecoveryWithContext") + defer func(err error) { exit(err) }(err) + + res, err := rm.sdkapi.DescribeContinuousBackupsWithContext( + ctx, + &svcsdk.DescribeContinuousBackupsInput{ + TableName: tableName, + }, + ) + + rm.metrics.RecordAPICall("GET", "DescribeContinuousBackups", err) + if err != nil { + return nil, err + } + + isEnabled := false + if res.ContinuousBackupsDescription != nil && res.ContinuousBackupsDescription.PointInTimeRecoveryDescription != nil && res.ContinuousBackupsDescription.PointInTimeRecoveryDescription.PointInTimeRecoveryStatus != nil { + isEnabled = *res.ContinuousBackupsDescription.PointInTimeRecoveryDescription.PointInTimeRecoveryStatus == svcsdk.PointInTimeRecoveryStatusEnabled + } + + return &v1alpha1.PointInTimeRecoverySpecification{ + PointInTimeRecoveryEnabled: &isEnabled, + }, nil +} diff --git a/test/e2e/requirements.txt b/test/e2e/requirements.txt index 1575c5a..8d94056 100644 --- a/test/e2e/requirements.txt +++ b/test/e2e/requirements.txt @@ -1 +1 @@ -acktest @ git+https://github.com/aws-controllers-k8s/test-infra.git@cee228cabae1c65599df8f87b0860d161f8093bf +acktest @ git+https://github.com/aws-controllers-k8s/test-infra.git@38ce32256cc2552ab54e190cc8a8618e93af9e0c diff --git a/test/e2e/table.py b/test/e2e/table.py index 5d5bf33..e4af7e3 100644 --- a/test/e2e/table.py +++ b/test/e2e/table.py @@ -72,6 +72,18 @@ def __call__(self, record: dict) -> bool: def ttl_on_attribute_matches(attr_name: str) -> TableMatchFunc: return TTLAttributeMatcher(attr_name) +class PITRMatcher: + def __init__(self, enabled: bool): + self.enabled = enabled + + def __call__(self, record: dict) -> bool: + pitr_enabled = get_point_in_time_recovery_enabled(record['TableName']) + if pitr_enabled is None: + return False + return pitr_enabled == self.enabled + +def point_in_time_recovery_matches(enabled: bool) -> TableMatchFunc: + return PITRMatcher(enabled) class
StreamSpecificationMatcher: def __init__(self, enabled: bool): @@ -230,4 +242,16 @@ def get_time_to_live(table_name): resp = c.describe_time_to_live(TableName=table_name) return resp['TimeToLiveDescription'] except c.exceptions.ResourceNotFoundException: - return None \ No newline at end of file + return None + +def get_point_in_time_recovery_enabled(table_name): + """Returns whether point in time recovery is enabled for the table with a supplied name. + + If no such Table exists, returns None. + """ + c = boto3.client('dynamodb', region_name=get_region()) + try: + resp = c.describe_continuous_backups(TableName=table_name) + return resp['ContinuousBackupsDescription']['PointInTimeRecoveryDescription']['PointInTimeRecoveryStatus'] == 'ENABLED' + except c.exceptions.ResourceNotFoundException: + return None diff --git a/test/e2e/tests/test_table.py b/test/e2e/tests/test_table.py index 4ef443f..a5974fe 100644 --- a/test/e2e/tests/test_table.py +++ b/test/e2e/tests/test_table.py @@ -265,6 +265,59 @@ def test_enable_ttl(self, table_lsi): ttl_status = ttl["TimeToLiveStatus"] assert ttl_status in ("ENABLED", "ENABLING") + def test_enable_point_in_time_recovery(self, table_lsi): + (ref, res) = table_lsi + + table_name = res["spec"]["tableName"] + + # Check DynamoDB Table exists + assert self.table_exists(table_name) + + # Get CR latest revision + cr = k8s.wait_resource_consumed_by_controller(ref) + + # Update PITR + updates = { + "spec": { + "continuousBackups": { + "pointInTimeRecoveryEnabled": True + } + } + } + + # Patch k8s resource + k8s.patch_custom_resource(ref, updates) + + table.wait_until( + table_name, + table.point_in_time_recovery_matches(True), + ) + + pitr_enabled = table.get_point_in_time_recovery_enabled(table_name) + assert pitr_enabled is not None + assert pitr_enabled + + # turn off pitr again and ensure it is disabled + updates = { + "spec": { + "continuousBackups": { + "pointInTimeRecoveryEnabled": False + } + } + } + + # Patch k8s resource + 
k8s.patch_custom_resource(ref, updates) + + table.wait_until( + table_name, + table.point_in_time_recovery_matches(False), + ) + + pitr_enabled = table.get_point_in_time_recovery_enabled(table_name) + assert pitr_enabled is not None + assert not pitr_enabled + def test_enable_stream_specification(self, table_lsi): (ref, res) = table_lsi @@ -706,4 +759,4 @@ def test_multi_updates(self, table_gsi): assert latestTable["ProvisionedThroughput"] is not None assert latestTable["ProvisionedThroughput"]["ReadCapacityUnits"] == 10 - assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10 \ No newline at end of file + assert latestTable["ProvisionedThroughput"]["WriteCapacityUnits"] == 10