Source code for gcp.client

import warnings

from apiclient.discovery import build as build_service
from apiclient.errors import HttpError

warnings.filterwarnings(
    "ignore", "Your application has authenticated using end user credentials"
)


[docs]def cache_key(project_id, version, product, subproduct, call="list", id_value="na"): """Returns the fullname (directory and filename) for a cached GCP API call. >>> cache_key("123", "v1", "compute", "firewalls") 'pytest_gcp/123/v1/compute/firewalls/list:na.json' >>> cache_key("123", "v1", "compute", "firewalls", "get", "321") 'pytest_gcp/123/v1/compute/firewalls/get:321.json' """ path = "/".join(["pytest_gcp", project_id, version, product, subproduct]) filename = ":".join([call, id_value]) + ".json" return f"{path}/{filename}"
[docs]def get_all_projects_in_folder(folder_id=None): if folder_id is None: return if not folder_id.startswith("folders/"): folder_id = "folders/" + folder_id crm = build_service("cloudresourcemanager", "v2") allFolders = get_all_folders_in_folder(crm, folder_id) crm.close() allFolders.append(folder_id) project_crm = build_service("cloudresourcemanager", "v1") allProjects = [] for folder in allFolders: projects = ( project_crm.projects().list(filter="parent.id:" + folder[8:]).execute() ) if projects: allProjects.append(projects) project_crm.close() # flatten and return return sum([p["projects"] for p in allProjects], [])
[docs]def get_all_folders_in_folder(crm, folder_id=None): allFolders = [] if folder_id is None: return allFolders folders = crm.folders().list(parent=folder_id).execute() if folders: for folder in folders["folders"]: allFolders.extend(get_all_folders_in_folder(crm, folder["name"])) else: allFolders = [folder_id] return allFolders
[docs]class GCPClient: def __init__(self, project_id, folder_id, cache, debug_calls, debug_cache, offline): self.cache = cache self.debug_calls = debug_calls self.debug_cache = debug_cache self.offline = offline self.project_list = [] if project_id is not None: self.project_list = [project_id] if folder_id is not None: self.project_list = [ p["projectId"] for p in get_all_projects_in_folder(folder_id) ]
[docs] def get_project_iam_policies(self): if self.offline: return [] service = self._service("cloudresourcemanager") policies = [] for project_id in self.project_list: try: resp = ( service.projects() .getIamPolicy(resource=project_id, body={}) .execute() ) policies.append(resp) except HttpError as e: if "has not been used in project" in e._get_reason(): continue raise e return policies
[docs] def get_project_container_config(self): if self.offline: return {} service = self._service("container") for project_id in self.project_list: try: # TODO : We may want to correlate the zone here with the corresponding clusters zone. request = ( service.projects() .locations() .getServerConfig( name="projects/{}/locations/us-west1".format(project_id) ) ) resp = request.execute() except HttpError as e: # This will be thrown if an API is disabled, so we will try the next project id if "has not been used in project" in e._get_reason(): continue raise e return resp return {}
[docs] def get( self, project_id, product, subproduct, id_key, id_value, version="v1", call_kwargs=None, ): if self.offline: result = {} else: result = self._get( project_id, product, subproduct, id_key, id_value, version ) return result
def _get( self, project_id, product, subproduct, id_key, id_value, version="v1", call_kwargs=None, ): ckey = cache_key(project_id, version, product, subproduct, "get", id_value) cached_result = self.cache.get(ckey, None) if cached_result is not None: print("found cached value for", ckey) return cached_result if call_kwargs is None: call_kwargs = {} call_kwargs["projectId"] = project_id call_kwargs[id_key] = id_value service = self._service(product, version) api_entity = getattr(service, subproduct.split(".")[0])() for entity in subproduct.split(".")[1:]: api_entity = getattr(api_entity, entity)() try: result = api_entity.get(**call_kwargs).execute() except HttpError as e: # This will be thrown if an API is disabled. if "has not been used in project" in e._get_reason(): return {} raise e result["projectId"] = project_id if self.debug_cache: print("setting cache value for", ckey) self.cache.set(ckey, result) return result
[docs] def list( self, product, subproduct, version="v1", results_key="items", call_kwargs=None ): """Public list func. See _list func docstring for more info""" if self.offline: results = [] else: if call_kwargs is not None: return list( self._list(product, subproduct, version, results_key, call_kwargs) ) results = [] for project_id in self.project_list: call_kwargs = {"project": project_id} results += list( self._list(product, subproduct, version, results_key, call_kwargs) ) return results
def _list( self, product, subproduct, version="v1", results_key="items", call_kwargs=None ): """ Internal function for calling .list() on some service's resource. Supports debug printing and caching of the response. If a service supports "zones", then loop through each zone to collect all resources from that service. An example of this is collecting all compute instances (compute.instances().list()). """ project_id = "-" if "project" in call_kwargs: project_id = call_kwargs["project"] if "projectId" in call_kwargs: project_id = call_kwargs["projectId"] call_id = project_id if call_kwargs is not None: call_id = "-".join(sum([x for x in call_kwargs.items()], ())) ckey = cache_key(call_id, version, product, subproduct) cached_result = self.cache.get(ckey, None) if cached_result is not None: print("found cached value for", ckey) return cached_result service = self._service(product, version) api_entity = getattr(service, subproduct.split(".")[0])() for entity in subproduct.split(".")[1:]: api_entity = getattr(api_entity, entity)() if self.debug_calls: print( "calling {}.{} for project {}".format(product, subproduct, project_id) ) if self._zone_aware(product, subproduct): results = [] for zone in self._list_zones(project_id): results = sum( [results], self._list_all_items( api_entity, {**call_kwargs, "zone": zone}, results_key ), ) else: results = self._list_all_items(api_entity, call_kwargs, results_key) # Append the project id to each resource for use in test metadata results = [{"projectId": project_id, **result} for result in results] if self.debug_cache: print("setting cache value for", ckey) self.cache.set(ckey, results) return results def _list_all_items(self, api_entity, call_kwargs, results_key): """Internal helper for dealing with pagination""" request = api_entity.list(**call_kwargs) items = [] while request is not None: try: resp = request.execute() except HttpError as e: # This will be thrown if an API is disabled. if "has not been used in project" in e._get_reason(): return [] raise e items = sum([items], resp.get(results_key, [])) try: request = api_entity.list_next(request, resp) except AttributeError: request = None return items def _service(self, product, version="v1"): """Internal helper around google client lib's build service func""" return build_service(product, version) def _zone_aware(self, product, subproduct): """ Internal helper for whether or not a product and subproduct take zones into account. Differing heavily from AWS, most GCP services do not take zones into account with API calls. """ if product == "compute" and subproduct == "instances": return True return False def _list_zones(self, project_id): """Internal helper for listing all zones""" try: response = ( self._service("compute") .zones() .list(project=project_id) .execute()["items"] ) except HttpError as e: # This will be thrown if an API is disabled. if "has not been used in project" in e._get_reason(): return [] raise e return [result["name"] for result in response]