Source code for aws.rds.helpers
from datetime import datetime
from helpers import get_param_id
[docs]def is_rds_db_snapshot_attr_public_access(rds_db_snapshot_attribute):
"""
Checks whether a RDS snapshot attribute is:
{
"AttributeName": "restore",
"AttributeValues": ["random_aws_account_id", "any"]
}
>>> is_rds_db_snapshot_attr_public_access({"AttributeName": "restore", "AttributeValues": ["any"]})
True
>>> is_rds_db_snapshot_attr_public_access({"AttributeName": "restore", "AttributeValues": ["aws_account_id"]})
False
>>> is_rds_db_snapshot_attr_public_access({"AttributeName": "restore", "AttributeValues": []})
False
>>> is_rds_db_snapshot_attr_public_access({"AttributeName": "blorg", "AttributeValues": ["any"]})
False
>>> is_rds_db_snapshot_attr_public_access([])
Traceback (most recent call last):
...
TypeError: list indices must be integers or slices, not str
>>> is_rds_db_snapshot_attr_public_access(0)
Traceback (most recent call last):
...
TypeError: 'int' object is not subscriptable
>>> is_rds_db_snapshot_attr_public_access(None)
Traceback (most recent call last):
...
TypeError: 'NoneType' object is not subscriptable
"""
return (
rds_db_snapshot_attribute["AttributeName"] == "restore"
and "any" in rds_db_snapshot_attribute["AttributeValues"]
)
[docs]def does_rds_db_security_group_grant_public_access(sg):
"""
Checks an RDS instance for a DB security group with CIDRIP 0.0.0.0/0
>>> does_rds_db_security_group_grant_public_access(
... {"IPRanges": [{"CIDRIP": "127.0.0.1/32", "Status": "authorized"},
... {"CIDRIP": "0.0.0.0/0", "Status": "authorized"}]})
True
>>> does_rds_db_security_group_grant_public_access({"IPRanges": []})
False
"""
return any(
ipr["CIDRIP"] == "0.0.0.0/0" and ipr["Status"] == "authorized"
for ipr in sg["IPRanges"]
)
[docs]def does_vpc_security_group_grant_public_access(sg):
"""
Checks an RDS instance for a VPC security groups with ingress permission ipv4 range 0.0.0.0/0 or ipv6 range :::/0
>>> does_vpc_security_group_grant_public_access(
... {'IpPermissions': [{'Ipv6Ranges': [], 'IpRanges': [{'CidrIp': '0.0.0.0/0'}]}]})
True
>>> does_vpc_security_group_grant_public_access(
... {'IpPermissions': [{'Ipv6Ranges': [], 'IpRanges': []}]})
False
>>> does_vpc_security_group_grant_public_access(
... {'IpPermissions': [{'Ipv6Ranges': [], 'IpRanges': [{'CidrIp': '192.168.1.0/0'}]}]})
False
"""
public_ipv4 = any(
ipr["CidrIp"] == "0.0.0.0/0"
for ipp in sg["IpPermissions"]
for ipr in ipp["IpRanges"]
)
public_ipv6 = any(
ipr["CidrIpv6"] == "::/0"
for ipp in sg["IpPermissions"]
for ipr in ipp["Ipv6Ranges"]
)
return public_ipv4 or public_ipv6
[docs]def is_rds_db_instance_encrypted(rds_db_instance):
"""
Checks the RDS instance 'StorageEncrypted' value.
>>> is_rds_db_instance_encrypted({'StorageEncrypted': True})
True
>>> is_rds_db_instance_encrypted({'StorageEncrypted': False})
False
>>> is_rds_db_instance_encrypted({})
Traceback (most recent call last):
...
KeyError: 'StorageEncrypted'
>>> is_rds_db_instance_encrypted(0)
Traceback (most recent call last):
...
TypeError: 'int' object is not subscriptable
>>> is_rds_db_instance_encrypted(None)
Traceback (most recent call last):
...
TypeError: 'NoneType' object is not subscriptable
"""
return bool(rds_db_instance["StorageEncrypted"])
[docs]def is_rds_db_snapshot_encrypted(rds_db_snapshot):
"""
Checks the RDS snapshot 'Encrypted' value.
>>> is_rds_db_snapshot_encrypted({'Encrypted': True})
True
>>> is_rds_db_snapshot_encrypted({'Encrypted': False})
False
>>> is_rds_db_snapshot_encrypted({})
Traceback (most recent call last):
...
KeyError: 'Encrypted'
>>> is_rds_db_snapshot_encrypted(0)
Traceback (most recent call last):
...
TypeError: 'int' object is not subscriptable
>>> is_rds_db_snapshot_encrypted(None)
Traceback (most recent call last):
...
TypeError: 'NoneType' object is not subscriptable
"""
return bool(rds_db_snapshot["Encrypted"])
[docs]def get_db_instance_id(db_instance):
return get_param_id(db_instance, "DBInstanceIdentifier")
[docs]def get_db_snapshot_arn(snapshot):
return get_param_id(snapshot, "DBSnapshotArn")
[docs]def get_db_security_group_arn(sg):
return get_param_id(sg, "DBSecurityGroupArn")
[docs]def get_rds_resource_id(resource):
if isinstance(resource, dict) and "DBInstanceIdentifier" in resource:
return get_db_instance_id(resource)
if isinstance(resource, dict) and "DBSnapshotArn" in resource:
return get_db_snapshot_arn(resource)
if isinstance(resource, dict) and "DBSecurityGroupArn" in resource:
return get_db_security_group_arn(resource)
if isinstance(resource, dict) and "AttributeName" in resource:
return get_param_id(resource, "AttributeName")
if isinstance(resource, list):
if len(resource) == 0:
return "empty"
return get_rds_resource_id(resource[0])
return None
[docs]def rds_db_snapshot_not_too_old(snapshot, snapshot_created_days_ago=365):
"""
Check a rds snapshot is created "snapshot_created_days_ago".
>>> from datetime import datetime
>>> from datetime import timezone
>>> rds_db_snapshot_not_too_old({"SnapshotCreateTime": datetime.now(timezone.utc)})
True
>>> rds_db_snapshot_not_too_old({"SnapshotCreateTime": datetime.fromisoformat("2019-09-11T19:45:22.116+00:00")})
False
"""
create_time = snapshot["SnapshotCreateTime"]
now = datetime.now(tz=create_time.tzinfo)
if (now - create_time).days < snapshot_created_days_ago:
return True
else:
return False