mozilla_schema_generator.generic_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException

logger = logging.getLogger(__name__)

# Shared session: https requests are retried up to 3 times (with backoff) when
# the server answers 502, 503 or 504.
_http_session = requests.Session()
_http_session.mount(
    "https://",
    HTTPAdapter(
        max_retries=Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
    ),
)


class GenericPing(object):
    """Generate a schema for a ping from a base schema, an env schema and probes.

    The three inputs are fetched as JSON from the URLs given to the
    constructor. Every fetched document is cached on disk under ``cache_dir``,
    keyed by a slug of its URL.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    # Seconds passed as ``timeout`` to every HTTP request; without a timeout a
    # stalled server would block schema generation indefinitely.
    default_timeout = 60
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """Store the source URLs.

        ``schema_url`` and ``env_url`` are formatted with ``branch=mps_branch``;
        ``probes_url`` is stored unchanged.
        """
        self.branch_name = mps_branch
        self.schema_url = schema_url.format(branch=self.branch_name)
        self.env_url = env_url.format(branch=self.branch_name)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Return the JSON document at ``schema_url`` wrapped in a ``Schema``."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Return the JSON document at ``env_url`` wrapped in a ``Schema``."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Return one ``Probe`` per (id, definition) item of the probes JSON."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self, config: Config, *, max_size: int = None
    ) -> Dict[str, Schema]:
        """Build the schema for ``config`` and enforce the size limit.

        :param config: selects where each probe is placed in the schema.
        :param max_size: size limit compared against ``Schema.get_size()``;
            ``default_max_size`` is used when it is ``None``.
        :return: a one-entry mapping from ``config.name`` to the filled schema.
        :raises SchemaException: if the env or base schema is not strictly
            smaller than ``max_size``, or a generated schema is larger than it.
        """
        schema = self.get_schema()
        env = self.get_env()

        probes = self.get_probes()

        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}

        if any(generated.get_size() > max_size for generated in schemas.values()):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schema(
        env: Schema, probes: List[Probe], config: Config, max_size: int
    ) -> Schema:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.

        ``env`` itself is not modified; the probes are written into a clone.
        ``max_size`` is accepted but not used by this implementation.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])

        schema = env.clone()
        for schema_key, probe in schema_elements:
            try:
                addtlProps = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtlProps = None

            probe_schema = Schema(probe.get_schema(addtlProps)).clone()

            schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        # Remove all additionalProperties (#22)
        for key in config.get_match_keys():
            try:
                schema.delete_group_from_schema(
                    key + ("propertyNames",), propagate=False
                )
            except KeyError:
                pass

            try:
                schema.delete_group_from_schema(
                    key + ("additionalProperties",), propagate=True
                )
            except KeyError:
                pass

        return schema

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """Return True if a cache file exists for ``url``."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write ``val`` to the cache file for ``url`` unless one already exists."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            # Explicit encoding: the platform default may be unable to encode
            # characters that were decoded from the HTTP response.
            with open(cache_file, "x", encoding=GenericPing.default_encoding) as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Return the cached text for ``url``."""
        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        return cache_file.read_text(encoding=GenericPing.default_encoding)

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Return the body at ``url`` as text, from the cache when present.

        On a cache miss the document is downloaded, added to the cache and
        returned. ``raise_for_status`` raises for HTTP error responses.
        """
        if GenericPing._present_in_cache(url):
            return GenericPing._retrieve_from_cache(url)

        headers = {}
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, set the cache-control header to force
            # google cloud cdn to bypass the cache
            headers["Cache-Control"] = "no-cache"

        r = _http_session.get(
            url, headers=headers, timeout=GenericPing.default_timeout
        )
        r.raise_for_status()

        final_json = r.content.decode(r.encoding or GenericPing.default_encoding)
        GenericPing._add_to_cache(url, final_json)

        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Return the parsed JSON document for ``url``.

        A ``JSONDecodeError`` is logged on the module logger and re-raised.
        """
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            logger.error("Unable to process JSON for url: %s", url)
            raise
logger =
<Logger mozilla_schema_generator.generic_ping (WARNING)>
class
GenericPing:
class GenericPing(object):
    """Generate a schema for a ping from a base schema, an env schema and probes.

    The three inputs are fetched as JSON from the URLs given to the
    constructor. Every fetched document is cached on disk under ``cache_dir``,
    keyed by a slug of its URL.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    # Seconds passed as ``timeout`` to every HTTP request; without a timeout a
    # stalled server would block schema generation indefinitely.
    default_timeout = 60
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """Store the source URLs.

        ``schema_url`` and ``env_url`` are formatted with ``branch=mps_branch``;
        ``probes_url`` is stored unchanged.
        """
        self.branch_name = mps_branch
        self.schema_url = schema_url.format(branch=self.branch_name)
        self.env_url = env_url.format(branch=self.branch_name)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Return the JSON document at ``schema_url`` wrapped in a ``Schema``."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Return the JSON document at ``env_url`` wrapped in a ``Schema``."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Return one ``Probe`` per (id, definition) item of the probes JSON."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self, config: Config, *, max_size: int = None
    ) -> Dict[str, Schema]:
        """Build the schema for ``config`` and enforce the size limit.

        :param config: selects where each probe is placed in the schema.
        :param max_size: size limit compared against ``Schema.get_size()``;
            ``default_max_size`` is used when it is ``None``.
        :return: a one-entry mapping from ``config.name`` to the filled schema.
        :raises SchemaException: if the env or base schema is not strictly
            smaller than ``max_size``, or a generated schema is larger than it.
        """
        schema = self.get_schema()
        env = self.get_env()

        probes = self.get_probes()

        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}

        if any(generated.get_size() > max_size for generated in schemas.values()):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schema(
        env: Schema, probes: List[Probe], config: Config, max_size: int
    ) -> Schema:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.

        ``env`` itself is not modified; the probes are written into a clone.
        ``max_size`` is accepted but not used by this implementation.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])

        schema = env.clone()
        for schema_key, probe in schema_elements:
            try:
                addtlProps = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtlProps = None

            probe_schema = Schema(probe.get_schema(addtlProps)).clone()

            schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        # Remove all additionalProperties (#22)
        for key in config.get_match_keys():
            try:
                schema.delete_group_from_schema(
                    key + ("propertyNames",), propagate=False
                )
            except KeyError:
                pass

            try:
                schema.delete_group_from_schema(
                    key + ("additionalProperties",), propagate=True
                )
            except KeyError:
                pass

        return schema

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """Return True if a cache file exists for ``url``."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write ``val`` to the cache file for ``url`` unless one already exists."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            # Explicit encoding: the platform default may be unable to encode
            # characters that were decoded from the HTTP response.
            with open(cache_file, "x", encoding=GenericPing.default_encoding) as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Return the cached text for ``url``."""
        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        return cache_file.read_text(encoding=GenericPing.default_encoding)

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Return the body at ``url`` as text, from the cache when present.

        On a cache miss the document is downloaded, added to the cache and
        returned. ``raise_for_status`` raises for HTTP error responses.
        """
        if GenericPing._present_in_cache(url):
            return GenericPing._retrieve_from_cache(url)

        headers = {}
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, set the cache-control header to force
            # google cloud cdn to bypass the cache
            headers["Cache-Control"] = "no-cache"

        r = _http_session.get(
            url, headers=headers, timeout=GenericPing.default_timeout
        )
        r.raise_for_status()

        final_json = r.content.decode(r.encoding or GenericPing.default_encoding)
        GenericPing._add_to_cache(url, final_json)

        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Return the parsed JSON document for ``url``.

        A ``JSONDecodeError`` is logged on the module logger and re-raised.
        """
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            logger.error("Unable to process JSON for url: %s", url)
            raise
def
generate_schema( self, config: mozilla_schema_generator.config.Config, *, max_size: int = None) -> Dict[str, mozilla_schema_generator.schema.Schema]:
def generate_schema(
    self, config: Config, *, max_size: int = None
) -> Dict[str, Schema]:
    """Produce the schema for ``config``, rejecting anything over the limit.

    The base schema, env schema and probes are fetched (in that order)
    through ``get_schema``, ``get_env`` and ``get_probes``. When ``max_size``
    is ``None`` the class attribute ``default_max_size`` is used instead.

    Returns a one-entry dict mapping ``config.name`` to the schema built by
    ``make_schema``. Raises ``SchemaException`` when the env or base schema
    is not strictly smaller than the limit, or when a generated schema is
    larger than the limit.
    """
    base_schema = self.get_schema()
    environment = self.get_env()
    probe_list = self.get_probes()

    # Fall back to the class-wide limit when the caller gave none.
    max_size = self.default_max_size if max_size is None else max_size

    # The inputs must leave room for probes, hence the strict comparison.
    if not environment.get_size() < max_size:
        raise SchemaException(
            "Environment must be smaller than max_size {}".format(max_size)
        )

    if not base_schema.get_size() < max_size:
        raise SchemaException(
            "Schema must be smaller than max_size {}".format(max_size)
        )

    generated = self.make_schema(base_schema, probe_list, config, max_size)
    schemas = {config.name: generated}

    # A finished schema may be exactly max_size, but no larger.
    for candidate in schemas.values():
        if candidate.get_size() > max_size:
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

    return schemas
@staticmethod
def
make_schema( env: mozilla_schema_generator.schema.Schema, probes: List[mozilla_schema_generator.probes.Probe], config: mozilla_schema_generator.config.Config, max_size: int) -> mozilla_schema_generator.schema.Schema:
@staticmethod
def make_schema(
    env: Schema, probes: List[Probe], config: Config, max_size: int
) -> Schema:
    """
    Return a clone of ``env`` with every probe selected by ``config`` added.

    Each probe's schema is stored under
    ``schema_key + ("properties", probe.name)``. Afterwards the
    ``propertyNames`` and ``additionalProperties`` groups are deleted for
    every match key of the config. ``env`` is only read, and ``max_size``
    is not used here.
    """
    placements = list(config.get_schema_elements(probes))
    placements.sort(key=lambda pair: pair[1])

    result = env.clone()
    for location, probe in placements:
        # Pass the env's additionalProperties at this location, if any,
        # to the probe when it builds its schema.
        try:
            additional_props = env.get(location + ("additionalProperties",))
        except KeyError:
            additional_props = None

        built = Schema(probe.get_schema(additional_props)).clone()
        target = location + ("properties", probe.name)
        result.set_schema_elem(target, built.schema)

    # Remove all additionalProperties (#22)
    groups_to_drop = (("propertyNames", False), ("additionalProperties", True))
    for match_key in config.get_match_keys():
        for group_name, should_propagate in groups_to_drop:
            try:
                result.delete_group_from_schema(
                    match_key + (group_name,), propagate=should_propagate
                )
            except KeyError:
                # The group is absent for this key; nothing to delete.
                pass

    return result
Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.