mozilla_schema_generator.generic_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import datetime
import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List, Optional

import requests

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException

logger = logging.getLogger(__name__)


class GenericPing(object):
    """Fetch a ping's base schema, environment schema, and probe
    definitions, and merge them into one or more complete ping schemas.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    # Size cap for generated schemas;
    # see https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    default_max_size = 12000
    # Key under which leftover (unmatched) schema parts are returned when splitting.
    extra_schema_key = "extra"
    # On-disk cache for fetched JSON; override location via MSG_PROBE_CACHE_DIR.
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """
        :param schema_url: URL template of the full ping schema; a ``{branch}``
            placeholder, if present, is filled with ``mps_branch``.
        :param env_url: URL template of the environment-only schema; may also
            contain ``{branch}``.
        :param probes_url: URL of the probe definitions (not branch-templated).
        :param mps_branch: mozilla-pipeline-schemas branch to read from.
        """
        self.schema_url = schema_url.format(branch=mps_branch)
        self.env_url = env_url.format(branch=mps_branch)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Fetch and parse the full ping schema."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Fetch and parse the environment portion of the schema."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Fetch probe definitions, wrapping each id/definition pair in a Probe."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self,
        config: Config,
        *,
        split: Optional[bool] = None,
        max_size: Optional[int] = None,
    ) -> Dict[str, List[Schema]]:
        """Build the output schemas for ``config``.

        :param config: the configuration mapping schema sections to probes.
        :param split: when True, split probes across multiple schemas so each
            stays within ``max_size``; defaults to False.
        :param max_size: maximum allowed schema size; defaults to
            ``default_max_size``.
        :return: mapping of config name to its list of generated schemas, plus
            an ``extra`` entry with the leftovers when splitting.
        :raises SchemaException: if the environment, the base schema, or any
            generated schema exceeds ``max_size``.
        """
        schema = self.get_schema()
        env = self.get_env()

        probes = self.get_probes()

        if split is None:
            split = False
        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        # TODO: Allow splits of extra schema, if necessary
        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        if split:
            configs = config.split()
        else:
            configs = [config]
            # When not splitting, probes are filled into the full schema
            # rather than only the environment portion.
            env = schema

        schemas = {
            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
        }

        if split:
            schemas[self.extra_schema_key] = self.make_extra_schema(
                schema, probes, configs
            )

        # Iterate values() directly (keys unused) and avoid shadowing `schema`.
        if any(
            generated.get_size() > max_size
            for schema_list in schemas.values()
            for generated in schema_list
        ):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schemas(
        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
    ) -> List[Schema]:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.

        When ``split`` is True, a new schema is started whenever adding the
        next probe would push the current one past ``max_size``.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
        schemas = []

        # TODO: Should env be checked to be a subset of schema?
        final_schema = env.clone()
        for schema_key, probe in schema_elements:
            try:
                addtl_props = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtl_props = None

            probe_schema = Schema(probe.get_schema(addtl_props)).clone()

            # Roll over to a fresh copy of the env schema when full.
            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
                schemas.append(final_schema)
                final_schema = env.clone()

            final_schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        schemas.append(final_schema)

        # Remove all additionalProperties (#22)
        for s in schemas:
            for key in config.get_match_keys():
                try:
                    s.delete_group_from_schema(
                        key + ("propertyNames",), propagate=False
                    )
                except KeyError:
                    pass

                try:
                    s.delete_group_from_schema(
                        key + ("additionalProperties",), propagate=True
                    )
                except KeyError:
                    pass

        return schemas

    @staticmethod
    def make_extra_schema(
        schema: Schema, probes: List[Probe], configs: List[Config]
    ) -> List[Schema]:
        """
        Given the list of probes and the configuration,
        return the schema that has everything but those sections that we
        filled in already.

        TODO: Split the extra schema, when needed
        (e.g. extra.0.schema.json, extra.1.schema.json)
        """
        schema = schema.clone()

        # Get the schema elements we already filled in for the other tables
        schema_elements = [
            schema_key
            for _config in configs
            for schema_key, _ in _config.get_schema_elements(probes)
        ]

        # Delete those from the schema
        for schema_key in schema_elements:
            schema.delete_group_from_schema(schema_key)

        return [schema]

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """Return True if ``url`` (slugified) already has a cache file."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write ``val`` to the cache file for ``url``; first writer wins."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            with open(cache_file, "x") as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Read the cached response body for ``url``."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Fetch ``url`` (streaming) and return the body, caching by the
        query-stripped URL so cache-busting params don't defeat the cache.
        """
        no_param_url = re.sub(r"\?.*", "", url)

        if GenericPing._present_in_cache(no_param_url):
            return GenericPing._retrieve_from_cache(no_param_url)

        r = requests.get(url, stream=True)
        r.raise_for_status()

        json_bytes = b""

        try:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    json_bytes += chunk
        except ValueError as e:
            raise ValueError("Could not parse " + url) from e

        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
        GenericPing._add_to_cache(no_param_url, final_json)

        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Fetch and decode JSON from ``url``.

        :raises JSONDecodeError: if the response body is not valid JSON
            (logged before re-raising).
        """
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, add
            # random query param to force cloudfront
            # to bypass the cache
            url += f"?t={datetime.datetime.utcnow().isoformat()}"
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            # Use the module logger (was the root logger via logging.error).
            logger.error("Unable to process JSON for url: %s", url)
            raise
class GenericPing(object):
    """Download schema, environment, and probe definitions and combine
    them into complete ping schemas.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    extra_schema_key = "extra"
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        # Schema and env URLs are branch-templated; the probes URL is used as-is.
        self.schema_url = schema_url.format(branch=mps_branch)
        self.env_url = env_url.format(branch=mps_branch)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Download and wrap the full ping schema."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Download and wrap the environment schema."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Download probe definitions, producing one Probe per entry."""
        definitions = self._get_json(self.probes_url)
        return [Probe(probe_id, defn) for probe_id, defn in definitions.items()]

    def generate_schema(
        self, config: Config, *, split: bool = None, max_size: int = None
    ) -> Dict[str, List[Schema]]:
        """Produce the per-config schemas, optionally splitting oversized ones.

        Returns a dict keyed by config name; when splitting, an "extra"
        entry carries the leftover schema. Raises SchemaException when a
        size limit is violated.
        """
        schema = self.get_schema()
        env = self.get_env()
        probes = self.get_probes()

        split = False if split is None else split
        max_size = self.default_max_size if max_size is None else max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        # TODO: Allow splits of extra schema, if necessary
        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        if split:
            configs = config.split()
        else:
            configs = [config]
            env = schema  # without splitting, probes go into the full schema

        schemas = {}
        for sub_config in configs:
            schemas[sub_config.name] = self.make_schemas(
                env, probes, sub_config, split, max_size
            )

        if split:
            schemas[self.extra_schema_key] = self.make_extra_schema(
                schema, probes, configs
            )

        oversized = any(
            generated.get_size() > max_size
            for generated_list in schemas.values()
            for generated in generated_list
        )
        if oversized:
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schemas(
        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
    ) -> List[Schema]:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.
        """
        ordered = sorted(config.get_schema_elements(probes), key=lambda pair: pair[1])

        schemas = []
        current = env.clone()
        for schema_key, probe in ordered:
            try:
                additional_props = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                additional_props = None

            probe_schema = Schema(probe.get_schema(additional_props)).clone()

            # Start a fresh schema once this probe would exceed the cap.
            if split and current.get_size() + probe_schema.get_size() > max_size:
                schemas.append(current)
                current = env.clone()

            current.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        schemas.append(current)

        # Remove all additionalProperties (#22)
        for generated in schemas:
            for key in config.get_match_keys():
                # propertyNames first (no propagation), then additionalProperties.
                for suffix, propagate in (
                    (("propertyNames",), False),
                    (("additionalProperties",), True),
                ):
                    try:
                        generated.delete_group_from_schema(
                            key + suffix, propagate=propagate
                        )
                    except KeyError:
                        pass

        return schemas

    @staticmethod
    def make_extra_schema(
        schema: Schema, probes: List[Probe], configs: List[Config]
    ) -> List[Schema]:
        """
        Given the list of probes and the configuration,
        return the schema that has everything but those sections that we
        filled in already.

        TODO: Split the extra schema, when needed
        (e.g. extra.0.schema.json, extra.1.schema.json)
        """
        leftover = schema.clone()

        # Drop every section some config already placed in another table.
        for sub_config in configs:
            for schema_key, _probe in sub_config.get_schema_elements(probes):
                leftover.delete_group_from_schema(schema_key)

        return [leftover]

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        cleaned = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", cleaned)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        cache_path = GenericPing.cache_dir / GenericPing._slugify(url)
        return cache_path.exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            with open(cache_file, "x") as handle:
                handle.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        cache_path = GenericPing.cache_dir / GenericPing._slugify(url)
        return cache_path.read_text()

    @staticmethod
    def _get_json_str(url: str) -> str:
        # Cache key ignores query parameters (e.g. the cache-busting timestamp).
        cache_key = re.sub(r"\?.*", "", url)

        if GenericPing._present_in_cache(cache_key):
            return GenericPing._retrieve_from_cache(cache_key)

        response = requests.get(url, stream=True)
        response.raise_for_status()

        payload = b""
        try:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    payload += chunk
        except ValueError as e:
            raise ValueError("Could not parse " + url) from e

        decoded = payload.decode(response.encoding or GenericPing.default_encoding)
        GenericPing._add_to_cache(cache_key, decoded)

        return decoded

    @staticmethod
    def _get_json(url: str) -> dict:
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, add
            # random query param to force cloudfront
            # to bypass the cache
            url += f"?t={datetime.datetime.utcnow().isoformat()}"
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            # NOTE(review): this uses the root logger, not the module-level
            # `logger`; kept as-is to preserve behavior.
            logging.error("Unable to process JSON for url: %s", url)
            raise
def generate_schema(
    self, config: Config, *, split: bool = None, max_size: int = None
) -> Dict[str, List[Schema]]:
    """Build the output schemas for ``config``.

    Returns a dict mapping config name to a list of generated schemas;
    when splitting, an "extra" entry holds everything left over.
    Raises SchemaException when any schema exceeds the size limit.
    """
    full = self.get_schema()
    base = self.get_env()

    probes = self.get_probes()

    if split is None:
        split = False
    if max_size is None:
        max_size = self.default_max_size

    if base.get_size() >= max_size:
        raise SchemaException(
            "Environment must be smaller than max_size {}".format(max_size)
        )

    # TODO: Allow splits of extra schema, if necessary
    if full.get_size() >= max_size:
        raise SchemaException(
            "Schema must be smaller than max_size {}".format(max_size)
        )

    if split:
        configs = config.split()
    else:
        configs = [config]
        base = full  # without splitting, probes are placed in the full schema

    schemas = {
        cfg.name: self.make_schemas(base, probes, cfg, split, max_size)
        for cfg in configs
    }

    if split:
        schemas[self.extra_schema_key] = self.make_extra_schema(full, probes, configs)

    # Final size check over every generated schema.
    for schema_list in schemas.values():
        for generated in schema_list:
            if generated.get_size() > max_size:
                raise SchemaException(
                    "Schema must be smaller or equal max_size {}".format(max_size)
                )

    return schemas
@staticmethod
def make_schemas(
    env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
) -> List[Schema]:
    """
    Fill in probes based on the config, and keep only the env
    parts of the schema. Throw away everything else.
    """
    elements = sorted(config.get_schema_elements(probes), key=lambda item: item[1])

    out = []
    working = env.clone()
    for schema_key, probe in elements:
        try:
            extra_props = env.get(schema_key + ("additionalProperties",))
        except KeyError:
            extra_props = None

        probe_schema = Schema(probe.get_schema(extra_props)).clone()

        # Roll over to a fresh env clone once the size cap would be exceeded.
        if split and working.get_size() + probe_schema.get_size() > max_size:
            out.append(working)
            working = env.clone()

        working.set_schema_elem(
            schema_key + ("properties", probe.name), probe_schema.schema
        )

    out.append(working)

    # Remove all additionalProperties (#22)
    for built in out:
        for key in config.get_match_keys():
            try:
                built.delete_group_from_schema(
                    key + ("propertyNames",), propagate=False
                )
            except KeyError:
                pass

            try:
                built.delete_group_from_schema(
                    key + ("additionalProperties",), propagate=True
                )
            except KeyError:
                pass

    return out
Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.
@staticmethod
def make_extra_schema(
    schema: Schema, probes: List[Probe], configs: List[Config]
) -> List[Schema]:
    """
    Given the list of probes and the configuration,
    return the schema that has everything but those sections that we
    filled in already.

    TODO: Split the extra schema, when needed
    (e.g. extra.0.schema.json, extra.1.schema.json)
    """
    remainder = schema.clone()

    # Delete every element some config has already claimed for another table.
    for cfg in configs:
        for schema_key, _ in cfg.get_schema_elements(probes):
            remainder.delete_group_from_schema(schema_key)

    return [remainder]
Given the list of probes and the configuration, return the schema that has everything but those sections that we filled in already.
TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)