Skip to content

Commit

Permalink
[s3] move auth & cloudfront_signer config validation to init (#1302)
Browse files Browse the repository at this point in the history
  • Loading branch information
gerrod3 committed Sep 29, 2023
1 parent 635e170 commit d8a097d
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 33 deletions.
55 changes: 22 additions & 33 deletions storages/backends/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,24 @@ class S3Storage(CompressStorageMixin, BaseStorage):
"""

default_content_type = "application/octet-stream"
# If config provided in init, signature_version and addressing_style settings/args
# are ignored.
# If config provided in subclass, signature_version and addressing_style
# settings/args are ignored.
config = None

def __init__(self, **settings):
cloudfront_key_id = settings.pop("cloudfront_key_id", None)
cloudfront_key = settings.pop("cloudfront_key", None)
self.cloudfront_signer = settings.pop("cloudfront_signer", None)

super().__init__(**settings)

check_location(self)

if (self.access_key or self.secret_key) and self.session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and "
"AWS_S3_SECRET_ACCESS_KEY/secret_key"
)

self._bucket = None
self._connections = threading.local()

Expand All @@ -311,39 +317,21 @@ def __init__(self, **settings):
if self.transfer_config is None:
self.transfer_config = TransferConfig(use_threads=self.use_threads)

if cloudfront_key_id and cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
if not self.cloudfront_signer:
if self.cloudfront_key_id and self.cloudfront_key:
self.cloudfront_signer = self.get_cloudfront_signer(
self.cloudfront_key_id, self.cloudfront_key
)
elif bool(self.cloudfront_key_id) ^ bool(self.cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)

def get_cloudfront_signer(self, key_id, key):
return _cloud_front_signer_from_pem(key_id, key)

def get_default_settings(self):
cloudfront_key_id = setting("AWS_CLOUDFRONT_KEY_ID")
cloudfront_key = setting("AWS_CLOUDFRONT_KEY")
if bool(cloudfront_key_id) ^ bool(cloudfront_key):
raise ImproperlyConfigured(
"Both AWS_CLOUDFRONT_KEY_ID and AWS_CLOUDFRONT_KEY must be "
"provided together."
)

if cloudfront_key_id:
cloudfront_signer = self.get_cloudfront_signer(
cloudfront_key_id, cloudfront_key
)
else:
cloudfront_signer = None

s3_access_key_id = setting("AWS_S3_ACCESS_KEY_ID")
s3_secret_access_key = setting("AWS_S3_SECRET_ACCESS_KEY")
s3_session_profile = setting("AWS_S3_SESSION_PROFILE")
if (s3_access_key_id or s3_secret_access_key) and s3_session_profile:
raise ImproperlyConfigured(
"AWS_S3_SESSION_PROFILE should not be provided with "
"AWS_S3_ACCESS_KEY_ID and AWS_S3_SECRET_ACCESS_KEY"
)

return {
"access_key": setting("AWS_S3_ACCESS_KEY_ID", setting("AWS_ACCESS_KEY_ID")),
"secret_key": setting(
Expand All @@ -361,7 +349,8 @@ def get_default_settings(self):
"signature_version": setting("AWS_S3_SIGNATURE_VERSION"),
"location": setting("AWS_LOCATION", ""),
"custom_domain": setting("AWS_S3_CUSTOM_DOMAIN"),
"cloudfront_signer": cloudfront_signer,
"cloudfront_key_id": setting("AWS_CLOUDFRONT_KEY_ID"),
"cloudfront_key": setting("AWS_CLOUDFRONT_KEY"),
"addressing_style": setting("AWS_S3_ADDRESSING_STYLE"),
"file_name_charset": setting("AWS_S3_FILE_NAME_CHARSET", "utf-8"),
"gzip": setting("AWS_IS_GZIPPED", False),
Expand Down
95 changes: 95 additions & 0 deletions tests/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,101 @@ def test_transfer_config(self):
storage = s3.S3Storage()
self.assertFalse(storage.transfer_config.use_threads)

def test_cloudfront_config(self):
# Valid configs
storage = s3.S3Storage()
self.assertIsNone(storage.cloudfront_signer)

key_id = "test-id"
pem = dedent(
"""\
-----BEGIN RSA PRIVATE KEY-----
MIICWwIBAAKBgQCXVuwcMk+JmVSKuQ1K4dZx4Z1dEcRQgTlqvhAyljIpttXlZh2/
fD3GkJCiqfwEmo+cdNK/LFzRj/CX8Wz1z1lH2USONpG6sAkotkatCbejiItDu5y6
janGJHfuWXu6B/o9gwZylU1gIsePY3lLNk+r9QhXUO4jXw6zLJftVwKPhQIDAQAB
AoGAbpkRV9HUmoQ5al+uPSkp5HOy4s8XHpYxdbaMc8ubwSxiyJCF8OhE5RXE/Xso
N90UUox1b0xmUKfWddPzgvgTD/Ub7D6Ukf+nVWDX60tWgNxICAUHptGL3tWweaAy
H+0+vZ0TzvTt9r00vW0FzO7F8X9/Rs1ntDRLtF3RCCxdq0kCQQDHFu+t811lCvy/
67rMEKGvNsNNSTrzOrNr3PqUrCnOrzKazjFVjsKv5VzI/U+rXGYKWJsMpuCFiHZ3
DILUC09TAkEAwpm2S6MN6pzn9eY6pmhOxZ+GQGGRUkKZfC1GDxaRSRb8sKTjptYw
WSemJSxiDzdj3Po2hF0lbhkpJgUq6xnCxwJAZgHHfn5CLSJrDD7Q7/vZi/foK3JJ
BRTfl3Wa4pAvv5meuRjKyEakVBGV79lyd5+ZHNX3Y40hXunjoO3FHrZIxwJAdRzu
waxahrRxQOKSr20c4wAzWnGddIUSO9I/VHs/al5EKsbBHrnOlQkwizSfuwqZtfZ7
csNf8FeCFRiNELoLJwJAZxWBE2+8J9VW9AQ0SE7j4FyM/B8FvRhF5PLAAsw/OxHO
SxiFP7Ptdac1tm5H5zOqaqSHWphI19HNNilXKmxuCA==
-----END RSA PRIVATE KEY-----"""
).encode("ascii")

with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id, AWS_CLOUDFRONT_KEY=pem):
storage = s3.S3Storage()
self.assertIsNotNone(storage.cloudfront_signer)

storage = s3.S3Storage(cloudfront_key_id=key_id, cloudfront_key=pem)
self.assertIsNotNone(storage.cloudfront_signer)

cloudfront_signer = storage.get_cloudfront_signer(key_id, pem)
storage = s3.S3Storage(cloudfront_signer=cloudfront_signer)
self.assertIsNotNone(storage.cloudfront_signer)

with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id):
storage = s3.S3Storage(cloudfront_key=pem)
self.assertIsNotNone(storage.cloudfront_signer)

# Invalid configs
msg = (
"Both AWS_CLOUDFRONT_KEY_ID/cloudfront_key_id and "
"AWS_CLOUDFRONT_KEY/cloudfront_key must be provided together."
)
with override_settings(AWS_CLOUDFRONT_KEY_ID=key_id):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with override_settings(AWS_CLOUDFRONT_KEY=pem):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key_id=key_id)

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(cloudfront_key=pem)

def test_auth_config(self):
# Valid configs
with override_settings(
AWS_S3_ACCESS_KEY_ID="foo", AWS_S3_SECRET_ACCESS_KEY="boo"
):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

with override_settings(AWS_ACCESS_KEY_ID="foo", AWS_SECRET_ACCESS_KEY="boo"):
storage = s3.S3Storage()
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

storage = s3.S3Storage(access_key="foo", secret_key="boo")
self.assertEqual(storage.access_key, "foo")
self.assertEqual(storage.secret_key, "boo")

# Invalid configs
msg = (
"AWS_S3_SESSION_PROFILE/session_profile should not be provided with "
"AWS_S3_ACCESS_KEY_ID/access_key and AWS_S3_SECRET_ACCESS_KEY/secret_key"
)
with override_settings(
AWS_ACCESS_KEY_ID="foo",
AWS_SECRET_ACCESS_KEY="boo",
AWS_S3_SESSION_PROFILE="moo",
):
with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage()

with self.assertRaisesMessage(ImproperlyConfigured, msg):
storage = s3.S3Storage(
access_key="foo", secret_key="boo", session_profile="moo"
)


class S3StaticStorageTests(TestCase):
def setUp(self):
Expand Down

0 comments on commit d8a097d

Please sign in to comment.