Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

Commit

Permalink
feat: Add S3 Bucket fetch speed improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
spangenberg committed May 4, 2022
1 parent ded9828 commit 3ed0f71
Showing 1 changed file with 108 additions and 84 deletions.
192 changes: 108 additions & 84 deletions resources/services/s3/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ import (
"github.com/cloudquery/cq-provider-sdk/provider/schema"
)

// fetchS3BucketsPoolSize describes the amount of go routines that resolve the S3 buckets
const fetchS3BucketsPoolSize = 10

func S3Buckets() *schema.Table {
return &schema.Table{
Name: "aws_s3_buckets",
Description: "An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3)",
Resolver: fetchS3Buckets,
Multiplex: client.AccountMultiplex,
IgnoreError: client.IgnoreAccessDeniedServiceDisabled,
DeleteFilter: client.DeleteAccountFilter,
PostResourceResolver: resolveS3BucketsAttributes,
Options: schema.TableCreationOptions{PrimaryKeys: []string{"account_id", "name"}},
Name: "aws_s3_buckets",
Description: "An Amazon S3 bucket is a public cloud storage resource available in Amazon Web Services' (AWS) Simple Storage Service (S3)",
Resolver: fetchS3Buckets,
Multiplex: client.AccountMultiplex,
IgnoreError: client.IgnoreAccessDeniedServiceDisabled,
DeleteFilter: client.DeleteAccountFilter,
Options: schema.TableCreationOptions{PrimaryKeys: []string{"account_id", "name"}},
Columns: []schema.Column{
{
Name: "account_id",
Expand Down Expand Up @@ -451,27 +453,46 @@ func S3Buckets() *schema.Table {
// ====================================================================================================================
// Table Resolver Functions
// ====================================================================================================================

func fetchS3Buckets(ctx context.Context, meta schema.ClientMeta, parent *schema.Resource, res chan<- interface{}) error {
svc := meta.(*client.Client).Services().S3
response, err := svc.ListBuckets(ctx, nil)
if err != nil {
return diag.WrapError(err)
}
wb := make([]*WrappedBucket, len(response.Buckets))
for i, b := range response.Buckets {
wb[i] = &WrappedBucket{b, nil, nil}

buckets := make(chan types.Bucket, len(response.Buckets))
errs := make(chan error, len(response.Buckets))
for i := 0; i < fetchS3BucketsPoolSize; i++ {
go fetchS3BucketsWorker(ctx, meta, buckets, errs, res)
}
for _, bucket := range response.Buckets {
buckets <- bucket
}
close(buckets)

res <- wb
return nil
var diags diag.Diagnostics
for i := 0; i < len(response.Buckets); i++ {
diags = diags.Add(<-errs)
}
return diags
}
func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource) error {

func fetchS3BucketsWorker(ctx context.Context, meta schema.ClientMeta, buckets <-chan types.Bucket, err chan<- error, res chan<- interface{}) {
for bucket := range buckets {
wb := &WrappedBucket{Bucket: bucket}
e := resolveS3BucketsAttributes(ctx, meta, wb)
res <- wb
err <- e
}
}

func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket) error {
log := meta.Logger()
r := resource.Item.(*WrappedBucket)
log.Debug("fetching bucket attributes", "bucket", aws.ToString(r.Name))
log.Debug("fetching bucket attributes", "bucket", aws.ToString(resource.Name))
c := meta.(*client.Client)
mgr := c.Services().S3Manager
output, err := mgr.GetBucketRegion(ctx, *r.Name)
output, err := mgr.GetBucketRegion(ctx, *resource.Name)
if err != nil {
if c.IsNotFoundError(err) {
return nil
Expand All @@ -483,37 +504,35 @@ func resolveS3BucketsAttributes(ctx context.Context, meta schema.ClientMeta, res
// This is a weird corner case by AWS API https://github.com/aws/aws-sdk-net/issues/323#issuecomment-196584538
bucketRegion = output
}
if err := resource.Set("region", bucketRegion); err != nil {
return err
}
if err := resolveBucketLogging(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
resource.Region = bucketRegion
if err = resolveBucketLogging(ctx, meta, resource, bucketRegion); err != nil {
if c.IsNotFoundError(err) {
return nil
}
return err
}

if err := resolveBucketPolicy(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketPolicy(ctx, meta, resource, bucketRegion); err != nil {
return err
}

if err := resolveBucketVersioning(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketVersioning(ctx, meta, resource, bucketRegion); err != nil {
return err
}

if err := resolveBucketPublicAccessBlock(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketPublicAccessBlock(ctx, meta, resource, bucketRegion); err != nil {
return err
}

if err := resolveBucketReplication(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketReplication(ctx, meta, resource, bucketRegion); err != nil {
return err
}

if err := resolveBucketTagging(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketTagging(ctx, meta, resource, bucketRegion); err != nil {
return err
}

if err := resolveBucketOwnershipControls(ctx, meta, resource, *r.Name, bucketRegion); err != nil {
if err = resolveBucketOwnershipControls(ctx, meta, resource, bucketRegion); err != nil {
return err
}

Expand All @@ -527,6 +546,9 @@ func fetchS3BucketGrants(ctx context.Context, meta schema.ClientMeta, parent *sc
options.Region = parent.Get("region").(string)
})
if err != nil {
if client.IsAWSError(err, "NoSuchBucket") {
return nil
}
return diag.WrapError(err)
}
res <- aclOutput.Grants
Expand All @@ -540,7 +562,7 @@ func fetchS3BucketCorsRules(ctx context.Context, meta schema.ClientMeta, parent
options.Region = parent.Get("region").(string)
})
if err != nil {
if client.IsAWSError(err, "NoSuchCORSConfiguration") {
if client.IsAWSError(err, "NoSuchCORSConfiguration", "NoSuchBucket") {
return nil
}
return err
Expand Down Expand Up @@ -640,84 +662,89 @@ func resolveS3BucketLifecycleTransitions(ctx context.Context, meta schema.Client

type WrappedBucket struct {
types.Bucket
ReplicationRole *string
ReplicationRules []types.ReplicationRule
ReplicationRole *string
ReplicationRules []types.ReplicationRule
Region string
LoggingTargetBucket *string
LoggingTargetPrefix *string
Policy *string
VersioningStatus types.BucketVersioningStatus
VersioningMfaDelete types.MFADeleteStatus
BlockPublicAcls bool
BlockPublicPolicy bool
IgnorePublicAcls bool
RestrictPublicBuckets bool
Tags *string
OwnershipControls []string
}

func resolveBucketLogging(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketLogging(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
svc := meta.(*client.Client).Services().S3
loggingOutput, err := svc.GetBucketLogging(ctx, &s3.GetBucketLoggingInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
loggingOutput, err := svc.GetBucketLogging(ctx, &s3.GetBucketLoggingInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})
if err != nil {
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketLogging", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketLogging", "bucket", resource.Name, "err", err)
return nil
}
return err
}
if loggingOutput.LoggingEnabled == nil {
return nil
}
if err := resource.Set("logging_target_bucket", loggingOutput.LoggingEnabled.TargetBucket); err != nil {
return err
}
if err := resource.Set("logging_target_prefix", loggingOutput.LoggingEnabled.TargetPrefix); err != nil {
return err
}
resource.LoggingTargetBucket = loggingOutput.LoggingEnabled.TargetBucket
resource.LoggingTargetPrefix = loggingOutput.LoggingEnabled.TargetPrefix
return nil
}

func resolveBucketPolicy(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketPolicy(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3
policyOutput, err := svc.GetBucketPolicy(ctx, &s3.GetBucketPolicyInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
policyOutput, err := svc.GetBucketPolicy(ctx, &s3.GetBucketPolicyInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})
// check if we got an error but its access denied we can continue
if err != nil {
// if we got an error and its not a NoSuchBucketError, return err
// if we got an error, and it's not a NoSuchBucketError, return err
if client.IsAWSError(err, "NoSuchBucketPolicy") {
return nil
}
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketPolicy", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketPolicy", "bucket", resource.Name, "err", err)
return nil
}
return err
}
if policyOutput == nil {
return nil
}
return resource.Set("policy", policyOutput.Policy)
resource.Policy = policyOutput.Policy
return nil
}

func resolveBucketVersioning(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketVersioning(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3
versioningOutput, err := svc.GetBucketVersioning(ctx, &s3.GetBucketVersioningInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
versioningOutput, err := svc.GetBucketVersioning(ctx, &s3.GetBucketVersioningInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})
if err != nil {
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketVersioning", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketVersioning", "bucket", resource.Name, "err", err)
return nil
}
return err
}
if err := resource.Set("versioning_status", versioningOutput.Status); err != nil {
return err
}
if err := resource.Set("versioning_mfa_delete", versioningOutput.MFADelete); err != nil {
return err
}
resource.VersioningStatus = versioningOutput.Status
resource.VersioningMfaDelete = versioningOutput.MFADelete
return nil
}

func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3
publicAccessOutput, err := svc.GetPublicAccessBlock(ctx, &s3.GetPublicAccessBlockInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
publicAccessOutput, err := svc.GetPublicAccessBlock(ctx, &s3.GetPublicAccessBlockInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})
if err != nil {
Expand All @@ -726,30 +753,22 @@ func resolveBucketPublicAccessBlock(ctx context.Context, meta schema.ClientMeta,
return nil
}
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetPublicAccessBlock", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetPublicAccessBlock", "bucket", resource.Name, "err", err)
return nil
}
return err
}
if err := resource.Set("block_public_acls", publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicAcls); err != nil {
return err
}
if err := resource.Set("block_public_policy", publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicPolicy); err != nil {
return err
}
if err := resource.Set("ignore_public_acls", publicAccessOutput.PublicAccessBlockConfiguration.IgnorePublicAcls); err != nil {
return err
}
if err := resource.Set("restrict_public_buckets", publicAccessOutput.PublicAccessBlockConfiguration.RestrictPublicBuckets); err != nil {
return err
}
resource.BlockPublicAcls = publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicAcls
resource.BlockPublicPolicy = publicAccessOutput.PublicAccessBlockConfiguration.BlockPublicPolicy
resource.IgnorePublicAcls = publicAccessOutput.PublicAccessBlockConfiguration.IgnorePublicAcls
resource.RestrictPublicBuckets = publicAccessOutput.PublicAccessBlockConfiguration.RestrictPublicBuckets
return nil
}

func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3
replicationOutput, err := svc.GetBucketReplication(ctx, &s3.GetBucketReplicationInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
replicationOutput, err := svc.GetBucketReplication(ctx, &s3.GetBucketReplicationInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})

Expand All @@ -759,26 +778,23 @@ func resolveBucketReplication(ctx context.Context, meta schema.ClientMeta, resou
return nil
}
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketReplication", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketReplication", "bucket", resource.Name, "err", err)
return nil
}
return err
}
if replicationOutput.ReplicationConfiguration == nil {
return nil
}
if err := resource.Set("replication_role", replicationOutput.ReplicationConfiguration.Role); err != nil {
return err
}
// We set this here for fetchReplicationRules to get and insert
resource.Item.(*WrappedBucket).ReplicationRules = replicationOutput.ReplicationConfiguration.Rules
resource.ReplicationRole = replicationOutput.ReplicationConfiguration.Role
resource.ReplicationRules = replicationOutput.ReplicationConfiguration.Rules
return nil
}

func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3
taggingOutput, err := svc.GetBucketTagging(ctx, &s3.GetBucketTaggingInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
taggingOutput, err := svc.GetBucketTagging(ctx, &s3.GetBucketTaggingInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})
if err != nil {
Expand All @@ -787,7 +803,7 @@ func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource
return nil
}
if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketTagging", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketTagging", "bucket", resource.Name, "err", err)
return nil
}
return err
Expand All @@ -799,14 +815,21 @@ func resolveBucketTagging(ctx context.Context, meta schema.ClientMeta, resource
for _, t := range taggingOutput.TagSet {
tags[*t.Key] = t.Value
}
return resource.Set("tags", tags)

b, err := json.Marshal(tags)
if err != nil {
return err
}
t := string(b)
resource.Tags = &t
return nil
}

func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, resource *schema.Resource, bucketName, bucketRegion string) error {
func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta, resource *WrappedBucket, bucketRegion string) error {
c := meta.(*client.Client)
svc := c.Services().S3

getBucketOwnershipControlOutput, err := svc.GetBucketOwnershipControls(ctx, &s3.GetBucketOwnershipControlsInput{Bucket: aws.String(bucketName)}, func(options *s3.Options) {
getBucketOwnershipControlOutput, err := svc.GetBucketOwnershipControls(ctx, &s3.GetBucketOwnershipControlsInput{Bucket: resource.Name}, func(options *s3.Options) {
options.Region = bucketRegion
})

Expand All @@ -817,7 +840,7 @@ func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta,
}

if client.IgnoreAccessDeniedServiceDisabled(err) {
meta.Logger().Warn("received access denied on GetBucketOwnershipControls", "bucket", bucketName, "err", err)
meta.Logger().Warn("received access denied on GetBucketOwnershipControls", "bucket", resource.Name, "err", err)
return nil
}

Expand All @@ -840,5 +863,6 @@ func resolveBucketOwnershipControls(ctx context.Context, meta schema.ClientMeta,
stringArray = append(stringArray, string(ownershipControlRule.ObjectOwnership))
}

return resource.Set("ownership_controls", stringArray)
resource.OwnershipControls = stringArray
return nil
}

0 comments on commit 3ed0f71

Please sign in to comment.