Source code for aws.ec2.helpers

from helpers import get_param_id
from datetime import datetime


def ip_permission_opens_all_ports(ipp):
    """
    Report whether an EC2 security group IP permission opens all ports.

    A permission is considered to open all ports when any of the
    following holds:

    * ``IpProtocol`` is ``-1`` (all protocols). The AWS EC2 API reference
      states that such a rule allows traffic on all ports regardless of
      any port range, so the check does not require ``FromPort`` or
      ``ToPort`` to be present.
    * ``FromPort`` or ``ToPort`` is ``-1``.
    * The port range spans at least 1 through 65535.

    A permission without ``IpProtocol`` of ``-1`` that lacks either
    ``FromPort`` or ``ToPort`` yields False.

    :param ipp: a single IP permission mapping taken from a security
        group's ``IpPermissions`` list.
    :returns: True if the permission opens all ports, False otherwise.

    >>> ip_permission_opens_all_ports({'FromPort': 1, 'ToPort': 65535})
    True
    >>> ip_permission_opens_all_ports({'FromPort': 1, 'ToPort': 965535})
    True
    >>> ip_permission_opens_all_ports({'FromPort': -1, 'ToPort': 20})
    True
    >>> ip_permission_opens_all_ports({'FromPort': 20, 'ToPort': -1})
    True
    >>> ip_permission_opens_all_ports({'ToPort': -1})
    False
    >>> ip_permission_opens_all_ports({'IpProtocol': '-1'})
    True
    >>> ip_permission_opens_all_ports({'IpProtocol': 'tcp', 'FromPort': 22, 'ToPort': 22})
    False
    """
    # An all-protocols rule covers every port even when no port range is
    # attached to it, so it must be recognised before the key check below.
    if "IpProtocol" in ipp and ipp["IpProtocol"] in ("-1", -1):
        return True

    if "FromPort" not in ipp or "ToPort" not in ipp:
        return False

    from_port, to_port = ipp["FromPort"], ipp["ToPort"]

    # -1 indicates all ICMP/ICMPv6 codes
    if from_port == -1 or to_port == -1:
        return True

    return from_port <= 1 and to_port >= 65535
def ip_permission_cidr_allows_all_ips(ipp):
    """
    Report whether an EC2 security group IP permission has a CIDR range
    covering every address.

    The permission matches when any entry under ``IpRanges`` has
    ``CidrIp`` equal to ``0.0.0.0/0`` or any entry under ``Ipv6Ranges``
    has ``CidrIpv6`` equal to ``::/0``. Missing keys are treated as empty.

    :param ipp: a single IP permission mapping.
    :returns: True if an IPv4 or IPv6 range allows all IPs, else False.

    >>> ip_permission_cidr_allows_all_ips({'IpRanges': [{'CidrIp': '0.0.0.0/0'}]})
    True
    >>> ip_permission_cidr_allows_all_ips({'Ipv6Ranges': [{'CidrIpv6': '::/0'}]})
    True
    >>> ip_permission_cidr_allows_all_ips({'IpRanges': [{'CidrIp': '192.0.1.1/8'}]})
    False
    >>> ip_permission_cidr_allows_all_ips({'Ipv6Ranges': [{'CidrIpv6': '192.0.1.1/8'}]})
    False
    >>> ip_permission_cidr_allows_all_ips({})
    False
    """
    any_ipv4 = any(
        entry.get("CidrIp", "") == "0.0.0.0/0" for entry in ipp.get("IpRanges", [])
    )
    if any_ipv4:
        return True

    # IPv6 ranges are only consulted when no IPv4 range already matched.
    return any(
        entry.get("CidrIpv6", "") == "::/0" for entry in ipp.get("Ipv6Ranges", [])
    )
def ip_permission_grants_access_to_group_with_id(ipp, security_group_id):
    """
    Report whether an EC2 security group IP permission references the
    security group with the given ID.

    The permission matches when any entry under ``UserIdGroupPairs`` has
    a ``GroupId`` equal to ``security_group_id``.

    :param ipp: a single IP permission mapping.
    :param security_group_id: the group ID to look for.
    :returns: True if a matching group pair exists, False otherwise.

    >>> ip_permission_grants_access_to_group_with_id(
    ... {'UserIdGroupPairs': [{'GroupId': 'test-sgid'}]}, 'test-sgid')
    True
    >>> ip_permission_grants_access_to_group_with_id(
    ... {'UserIdGroupPairs': [{'GroupId': 'test-sgid'}]}, 'not-test-sgid')
    False
    >>> ip_permission_grants_access_to_group_with_id({}, 'test-sgid')
    False
    """
    group_pairs = ipp.get("UserIdGroupPairs", [])
    return any(
        pair.get("GroupId", None) == security_group_id for pair in group_pairs
    )
def ec2_security_group_opens_all_ports(ec2_security_group):
    """
    Report whether an EC2 security group has an inbound permission that
    opens all ports.

    Permissions whose ``IpProtocol`` is ``icmp`` or ``icmpv6`` are
    ignored. A group without an ``IpPermissions`` key yields False.

    :param ec2_security_group: a security group mapping.
    :returns: True if any non-ICMP permission opens all ports, else False.

    >>> ec2_security_group_opens_all_ports(
    ... {'IpPermissions': [{}, {'FromPort': -1,'ToPort': 65536}]})
    True
    >>> ec2_security_group_opens_all_ports(
    ... {'IpPermissions': [{}, {'IpProtocol': 'icmp', 'FromPort': -1,'ToPort': -1}]})
    False
    >>> ec2_security_group_opens_all_ports({})
    False
    """
    if "IpPermissions" not in ec2_security_group:
        return False

    # ICMP permissions use the port fields for type/code, so skip them.
    candidates = (
        permission
        for permission in ec2_security_group["IpPermissions"]
        if not (
            "IpProtocol" in permission
            and permission["IpProtocol"] in ("icmp", "icmpv6")
        )
    )
    return any(ip_permission_opens_all_ports(permission) for permission in candidates)
def ec2_security_group_opens_all_ports_to_self(ec2_security_group):
    """
    Report whether an EC2 security group has an inbound permission that
    opens all ports to the group itself.

    A permission matches when it opens all ports and one of its
    ``UserIdGroupPairs`` references the group's own ``GroupId``.
    Permissions whose ``IpProtocol`` is ``icmp`` or ``icmpv6`` are
    ignored. A group missing ``GroupId`` or ``IpPermissions`` yields False.

    :param ec2_security_group: a security group mapping.
    :returns: True if a non-ICMP permission opens all ports to the
        group's own ID, False otherwise.

    >>> ec2_security_group_opens_all_ports_to_self({
    ...     'GroupId': 'test-sgid',
    ...     'IpPermissions': [
    ...         {'FromPort': 1, 'ToPort': 65535, 'UserIdGroupPairs': [{'GroupId': 'test-sgid'}]},
    ... ]})
    True
    >>> ec2_security_group_opens_all_ports_to_self({
    ...     'GroupId': 'test-sgid',
    ...     'IpPermissions': [
    ...         {'IpProtocol': "icmp", 'FromPort': -1, 'ToPort': -1, 'UserIdGroupPairs': [{'GroupId': 'test-sgid'}]},
    ... ]})
    False
    >>> ec2_security_group_opens_all_ports_to_self({
    ...     'GroupId': 'test-sgid',
    ...     'IpPermissions': [
    ...         {'UserIdGroupPairs': [{'GroupId': 'test-sgid'}]},
    ... ]})
    False
    >>> ec2_security_group_opens_all_ports_to_self({'GroupId': 'test-sgid'})
    False
    >>> ec2_security_group_opens_all_ports_to_self({
    ...     'GroupId': 'test-sgid',
    ...     'IpPermissions': [
    ...         {'UserIdGroupPairs': []},
    ... ]})
    False
    >>> ec2_security_group_opens_all_ports_to_self({})
    False
    >>> ec2_security_group_opens_all_ports_to_self([])
    False
    """
    for required_key in ("GroupId", "IpPermissions"):
        if required_key not in ec2_security_group:
            return False

    own_group_id = ec2_security_group["GroupId"]

    for permission in ec2_security_group["IpPermissions"]:
        is_icmp = "IpProtocol" in permission and permission["IpProtocol"] in (
            "icmp",
            "icmpv6",
        )
        if is_icmp:
            continue
        if not ip_permission_opens_all_ports(permission):
            continue
        if ip_permission_grants_access_to_group_with_id(permission, own_group_id):
            return True

    return False
def ec2_security_group_opens_all_ports_to_all(ec2_security_group):
    """
    Report whether an EC2 security group has an inbound permission that
    opens all ports to all IPs.

    A permission matches when it opens all ports and one of its CIDR
    ranges allows all IPs. Permissions whose ``IpProtocol`` is ``icmp``
    or ``icmpv6`` are ignored. A group without ``IpPermissions`` yields
    False.

    :param ec2_security_group: a security group mapping.
    :returns: True if a non-ICMP permission opens all ports to all IPs,
        False otherwise.

    >>> ec2_security_group_opens_all_ports_to_all({'IpPermissions': [
    ...     {'FromPort': -1,'ToPort': 65535,'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    ... ]})
    True
    >>> ec2_security_group_opens_all_ports_to_all({'IpPermissions': [
    ...     {'FromPort': 1,'ToPort': 65535,'Ipv6Ranges': [{'CidrIpv6': '::/0'}]}
    ... ]})
    True
    >>> ec2_security_group_opens_all_ports_to_all({'IpPermissions': [
    ...     {'IpProtocol': 'icmp','FromPort': -1,'ToPort': -1,'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    ... ]})
    False
    >>> ec2_security_group_opens_all_ports_to_all({'IpPermissions': []})
    False
    >>> ec2_security_group_opens_all_ports_to_all({})
    False
    >>> ec2_security_group_opens_all_ports_to_all([])
    False
    """
    if "IpPermissions" not in ec2_security_group:
        return False

    for permission in ec2_security_group["IpPermissions"]:
        is_icmp = "IpProtocol" in permission and permission["IpProtocol"] in (
            "icmp",
            "icmpv6",
        )
        if is_icmp:
            continue
        # The port check runs first; the CIDR check only when it passes.
        if not ip_permission_opens_all_ports(permission):
            continue
        if ip_permission_cidr_allows_all_ips(permission):
            return True

    return False
def ec2_security_group_opens_specific_ports_to_all(
    ec2_security_group, allowed_ports=None
):
    """
    Report whether an EC2 security group exposes a disallowed port range
    to all IPs.

    Only permissions with a CIDR range allowing all IPs are considered;
    permissions whose ``IpProtocol`` is ``icmp`` or ``icmpv6``, or that
    lack ``FromPort``/``ToPort``, are skipped. A single-port permission
    is accepted when the port is 80, 443, or listed in ``allowed_ports``.
    Any other world-open permission makes the function return True.

    :param ec2_security_group: a security group mapping.
    :param allowed_ports: optional collection of extra single ports that
        may be open to all IPs; defaults to none.
    :returns: True if a disallowed port or port range is open to all
        IPs, False otherwise.

    >>> ec2_security_group_opens_specific_ports_to_all({'IpPermissions': [
    ...     {'FromPort': 22,'ToPort': 22,'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    ... ]})
    True
    >>> ec2_security_group_opens_specific_ports_to_all({'IpPermissions': [
    ...     {'FromPort': 234,'ToPort': 432,'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    ... ]})
    True
    >>> ec2_security_group_opens_specific_ports_to_all({'IpPermissions': [
    ...     {'FromPort': 80,'ToPort': 80,'IpRanges': [{'CidrIp': '0.0.0.0/0'}]},
    ... ]})
    False
    >>> ec2_security_group_opens_specific_ports_to_all({'IpPermissions': []})
    False
    >>> ec2_security_group_opens_specific_ports_to_all({})
    False
    >>> ec2_security_group_opens_specific_ports_to_all([])
    False
    """
    extra_allowed = [] if allowed_ports is None else allowed_ports

    if "IpPermissions" not in ec2_security_group:
        return False

    for permission in ec2_security_group["IpPermissions"]:
        is_icmp = "IpProtocol" in permission and permission["IpProtocol"] in (
            "icmp",
            "icmpv6",
        )
        if is_icmp:
            continue
        if not ip_permission_cidr_allows_all_ips(permission):
            continue
        if not ("FromPort" in permission and "ToPort" in permission):
            continue

        first_port, last_port = permission["FromPort"], permission["ToPort"]
        is_single_port = first_port == last_port

        # Only an exact single-port rule can be exempted; ranges never are.
        if is_single_port and (first_port in [80, 443] or first_port in extra_allowed):
            continue

        return True

    return False
def ec2_instance_test_id(ec2_instance):
    """
    Return the test id for an EC2 instance.

    :param ec2_instance: an instance mapping with an ``InstanceId`` key.
    :returns: the ``InstanceId`` value formatted as a string, or None
        when ``ec2_instance`` is not a dict.
    """
    if not isinstance(ec2_instance, dict):
        return None
    return "{0[InstanceId]}".format(ec2_instance)
def ec2_security_group_test_id(ec2_security_group):
    """
    Return the test id for an EC2 security group.

    :param ec2_security_group: a security group mapping with ``GroupId``
        and ``GroupName`` keys.
    :returns: the group ID and group name separated by a space, or None
        when ``ec2_security_group`` is not a dict.
    """
    if not isinstance(ec2_security_group, dict):
        return None
    return "{0[GroupId]} {0[GroupName]}".format(ec2_security_group)
def ec2_address_id(ec2_address):
    """
    Return the id for an Elastic IP address record.

    Delegates to ``get_param_id`` using the record's ``PublicIp`` key.

    :param ec2_address: the address record passed through to
        ``get_param_id``.
    :returns: whatever ``get_param_id`` returns for the ``PublicIp`` key.
    """
    id_key = "PublicIp"
    return get_param_id(ec2_address, id_key)
def is_ebs_volume_encrypted(ebs):
    """
    Return the ``Encrypted`` value of an EBS volume.

    :param ebs: an EBS volume mapping.
    :returns: the value stored under ``Encrypted``.
    :raises KeyError: if the ``Encrypted`` key is missing.
    :raises TypeError: if ``ebs`` is not subscriptable.

    >>> is_ebs_volume_encrypted({'Encrypted': True})
    True
    >>> is_ebs_volume_encrypted({'Encrypted': False})
    False
    >>> is_ebs_volume_encrypted({})
    Traceback (most recent call last):
    ...
    KeyError: 'Encrypted'
    >>> is_ebs_volume_encrypted(0)
    Traceback (most recent call last):
    ...
    TypeError: 'int' object is not subscriptable
    >>> is_ebs_volume_encrypted(None)
    Traceback (most recent call last):
    ...
    TypeError: 'NoneType' object is not subscriptable
    """
    encrypted = ebs["Encrypted"]
    return encrypted
def is_ebs_volume_piops(ebs):
    """
    Report whether an EBS volume's type is provisioned IOPS.

    The volume counts as provisioned IOPS when its ``VolumeType`` starts
    with ``io``.

    :param ebs: an EBS volume mapping.
    :returns: True if ``VolumeType`` starts with ``io``, False otherwise.
    :raises KeyError: if the ``VolumeType`` key is missing.
    :raises TypeError: if ``ebs`` is not subscriptable.

    >>> is_ebs_volume_piops({'VolumeType': 'io1'})
    True
    >>> is_ebs_volume_piops({'VolumeType': 'standard'})
    False
    >>> is_ebs_volume_piops({})
    Traceback (most recent call last):
    ...
    KeyError: 'VolumeType'
    >>> is_ebs_volume_piops(0)
    Traceback (most recent call last):
    ...
    TypeError: 'int' object is not subscriptable
    >>> is_ebs_volume_piops(None)
    Traceback (most recent call last):
    ...
    TypeError: 'NoneType' object is not subscriptable
    """
    volume_type = ebs["VolumeType"]
    return volume_type.startswith("io")
def is_ebs_snapshot_public(ebs_snapshot):
    """
    Report whether an EBS snapshot's ``CreateVolumePermissions`` allow
    public volume creation.

    The snapshot is public when any permission entry has ``Group`` equal
    to ``all``. A missing ``CreateVolumePermissions`` key is treated as
    an empty list.

    :param ebs_snapshot: an EBS snapshot mapping.
    :returns: True if any permission grants the ``all`` group, else False.

    >>> is_ebs_snapshot_public({'CreateVolumePermissions':[{'Group': 'all'}]})
    True
    >>> is_ebs_snapshot_public({'CreateVolumePermissions':[{'Group': ''}]})
    False
    >>> is_ebs_snapshot_public({'CreateVolumePermissions':[{'foo': 'bar'}]})
    False
    >>> is_ebs_snapshot_public({'CreateVolumePermissions':[]})
    False
    >>> is_ebs_snapshot_public({})
    False
    """
    permissions = ebs_snapshot.get("CreateVolumePermissions", [])
    return any(entry.get("Group", "") == "all" for entry in permissions)
def ec2_instance_missing_tag_names(ec2_instance, required_tag_names):
    """
    Return the required tag names that an EC2 instance does not carry.

    Tag entries without a ``Key`` are ignored, and a missing ``Tags``
    key is treated as an empty list.

    :param ec2_instance: an instance mapping, optionally with ``Tags``.
    :param required_tag_names: a set-like collection of tag names that
        supports subtraction of a set (the doctests use a frozenset).
    :returns: ``required_tag_names`` minus the tag names present on the
        instance.

    >>> ec2_instance_missing_tag_names({'Tags': [{'Key': 'Name'}]}, frozenset(['Name']))
    frozenset()
    >>> ec2_instance_missing_tag_names({
    ... 'InstanceId': 'iid', 'Tags': [{'Key': 'Bar'}]}, frozenset(['Name']))
    frozenset({'Name'})
    """
    present_names = {
        tag["Key"] for tag in ec2_instance.get("Tags", []) if "Key" in tag
    }
    return required_tag_names - present_names
def ebs_volume_attached_to_instance(ebs, volume_created_days_ago=90):
    """
    Check that an EBS volume is not an old volume in the ``available``
    state.

    Returns False only when the volume was created more than
    ``volume_created_days_ago`` days ago and its ``State`` is
    ``available``. Every other volume yields True. ``State`` is only
    read for volumes older than the threshold.

    :param ebs: an EBS volume mapping with a ``CreateTime`` datetime and,
        for volumes past the threshold, a ``State``.
    :param volume_created_days_ago: age threshold in whole days.
    :returns: False for an old ``available`` volume, True otherwise.

    >>> from datetime import datetime
    >>> from datetime import timezone
    >>> ebs_volume_attached_to_instance({"CreateTime": datetime.fromisoformat("2020-09-11T19:45:22.116+00:00"), "State": "in-use"})
    True
    >>> ebs_volume_attached_to_instance({"CreateTime": datetime.fromisoformat("2000-09-11T19:45:22.116+00:00"), "State": "in-use"})
    True
    >>> ebs_volume_attached_to_instance({"CreateTime": datetime.now(timezone.utc), "State": "available"})
    True
    >>> ebs_volume_attached_to_instance({"CreateTime": datetime.fromisoformat("2000-09-11T19:45:22.116+00:00"), "State": "available"})
    False
    """
    created_at = ebs["CreateTime"]
    # Use the creation time's own tzinfo so the subtraction is valid for
    # both aware and naive datetimes.
    age_in_days = (datetime.now(tz=created_at.tzinfo) - created_at).days
    past_threshold = age_in_days > volume_created_days_ago
    return not (past_threshold and ebs["State"] == "available")
def ebs_snapshot_not_too_old(snapshot, snapshot_started_days_ago=365):
    """
    Check that an EBS snapshot was started fewer than
    ``snapshot_started_days_ago`` days ago.

    :param snapshot: an EBS snapshot mapping with a ``StartTime``
        datetime.
    :param snapshot_started_days_ago: age limit in whole days.
    :returns: True if the snapshot's age in whole days is below the
        limit, False otherwise.

    >>> from datetime import datetime
    >>> from datetime import timezone
    >>> ebs_snapshot_not_too_old({"StartTime": datetime.now(timezone.utc)})
    True
    >>> ebs_snapshot_not_too_old({"StartTime": datetime.fromisoformat("2019-09-11T19:45:22.116+00:00")})
    False
    """
    started_at = snapshot["StartTime"]
    # Use the start time's own tzinfo so the subtraction is valid for
    # both aware and naive datetimes.
    elapsed = datetime.now(tz=started_at.tzinfo) - started_at
    return elapsed.days < snapshot_started_days_ago