Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Final retries for msk cluster #9793

Merged
merged 2 commits into from
Aug 20, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,11 @@ func resourceAwsMskClusterCreate(d *schema.ResourceData, meta interface{}) error
}

func waitForMskClusterCreation(conn *kafka.Kafka, arn string) error {
return resource.Retry(60*time.Minute, func() *resource.RetryError {
out, err := conn.DescribeCluster(&kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
})
input := &kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
}
err := resource.Retry(60*time.Minute, func() *resource.RetryError {
out, err := conn.DescribeCluster(input)
if err != nil {
return resource.NonRetryableError(err)
}
Expand All @@ -265,6 +266,24 @@ func waitForMskClusterCreation(conn *kafka.Kafka, arn string) error {
}
return resource.RetryableError(fmt.Errorf("%q: cluster still creating", arn))
})
if isResourceTimeoutError(err) {
out, err := conn.DescribeCluster(input)
if err != nil {
return fmt.Errorf("Error describing MSK cluster state: %s", err)
}
if out.ClusterInfo != nil {
if aws.StringValue(out.ClusterInfo.State) == kafka.ClusterStateFailed {
return fmt.Errorf("Cluster creation failed with cluster state %q", kafka.ClusterStateFailed)
}
if aws.StringValue(out.ClusterInfo.State) == kafka.ClusterStateActive {
return nil
}
}
}
if err != nil {
return fmt.Errorf("Error waiting for MSK cluster creation: %s", err)
}
return nil
}

func resourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error {
Expand Down Expand Up @@ -597,10 +616,11 @@ func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error
}

func resourceAwsMskClusterDeleteWaiter(conn *kafka.Kafka, arn string) error {
return resource.Retry(60*time.Minute, func() *resource.RetryError {
_, err := conn.DescribeCluster(&kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
})
input := &kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
}
err := resource.Retry(60*time.Minute, func() *resource.RetryError {
_, err := conn.DescribeCluster(input)

if err != nil {
if isAWSErr(err, kafka.ErrCodeNotFoundException, "") {
Expand All @@ -611,6 +631,16 @@ func resourceAwsMskClusterDeleteWaiter(conn *kafka.Kafka, arn string) error {

return resource.RetryableError(fmt.Errorf("timeout while waiting for the cluster %q to be deleted", arn))
})
if isResourceTimeoutError(err) {
_, err = conn.DescribeCluster(input)
if isAWSErr(err, kafka.ErrCodeNotFoundException, "") {
return nil
}
}
if err != nil {
return fmt.Errorf("Error waiting for MSK cluster to be deleted: %s", err)
}
return nil
}

func mskClusterOperationRefreshFunc(conn *kafka.Kafka, clusterOperationARN string) resource.StateRefreshFunc {
Expand Down