-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Cloudfront Testing * Unit test * Added env variable
- Loading branch information
Showing
5 changed files
with
119 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
"""This module provides a class to generate signed URLs for AWS CloudFront using RSA keys. | ||
The `CloudFrontUrlSigner` class allows you to create and sign CloudFront URLs with optional custom policies. | ||
""" | ||
|
||
import json | ||
from datetime import datetime, timedelta | ||
from typing import Any, Dict, Optional | ||
|
||
from botocore.signers import CloudFrontSigner | ||
from cryptography.hazmat.primitives import hashes, serialization | ||
from cryptography.hazmat.primitives.asymmetric import padding | ||
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey | ||
|
||
|
||
class CloudFrontUrlSigner: | ||
"""A class to generate signed URLs for AWS CloudFront using RSA keys.""" | ||
|
||
def __init__(self, key_id: str, private_key_path: str) -> None: | ||
"""Initialize the CloudFrontUrlSigner with a key ID and the path to the private key file. | ||
:param key_id: The CloudFront key ID associated with the public key in your CloudFront key group. | ||
:param private_key_path: The path to the private key PEM file. | ||
""" | ||
self.key_id = key_id | ||
self.private_key_path = private_key_path | ||
self.cf_signer = CloudFrontSigner(key_id, self._rsa_signer) | ||
|
||
def _rsa_signer(self, message: bytes) -> bytes: | ||
"""RSA signer function that signs a message using the private key. | ||
Args: | ||
message: The message to be signed. | ||
Returns: | ||
bytes: The RSA signature of the message. | ||
Raises: | ||
ValueError: If the loaded key is not an RSA private key. | ||
""" | ||
with open(self.private_key_path, "rb") as key_file: | ||
private_key = serialization.load_pem_private_key( | ||
key_file.read(), | ||
password=None, | ||
) | ||
|
||
if not isinstance(private_key, RSAPrivateKey): | ||
raise ValueError("The provided key is not an RSA private key") | ||
|
||
return private_key.sign(message, padding.PKCS1v15(), hashes.SHA1()) # CloudFront requires SHA-1 | ||
|
||
def generate_presigned_url(self, url: str, policy: Optional[str] = None) -> str: | ||
"""Generate a presigned URL for CloudFront using an optional custom policy. | ||
:param url: The URL to sign. | ||
:param policy: (Optional) A custom policy for the URL. | ||
:return: The signed URL. | ||
""" | ||
return self.cf_signer.generate_presigned_url(url, policy=policy) | ||
|
||
def create_custom_policy(self, url: str, expire_days: float = 1, ip_range: Optional[str] = None) -> str: | ||
"""Create a custom policy for CloudFront signed URLs. | ||
:param url: The URL to be signed. | ||
:param expire_days: Number of days until the policy expires (can be fractional, e.g., 1/24 for one hour). | ||
:param ip_range: Optional IP range to restrict access (e.g., "203.0.113.0/24"). | ||
:return: The custom policy in JSON format. | ||
""" | ||
expiration_time = int((datetime.utcnow() + timedelta(days=expire_days)).timestamp()) | ||
policy: Dict[str, Any] = { | ||
"Statement": [ | ||
{ | ||
"Resource": url, | ||
"Condition": { | ||
"DateLessThan": {"AWS:EpochTime": expiration_time}, | ||
}, | ||
} | ||
] | ||
} | ||
if ip_range: | ||
policy["Statement"][0]["Condition"]["IpAddress"] = {"AWS:SourceIp": ip_range} | ||
|
||
return json.dumps(policy, separators=(",", ":")) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters