mozilla_schema_generator.generic_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List

import requests

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException

logger = logging.getLogger(__name__)


class GenericPing(object):
    """Generate full ping schemas from remote definitions.

    Downloads a base ping schema, an "environment" schema, and the probe
    definitions, then fills the probes into the schema. The result can
    optionally be split into several schemas that each stay under a
    maximum serialized size.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    extra_schema_key = "extra"
    # Directory for caching downloaded JSON; override via MSG_PROBE_CACHE_DIR.
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """Store the source URLs, substituting the mps branch where needed."""
        self.schema_url = schema_url.format(branch=mps_branch)
        self.env_url = env_url.format(branch=mps_branch)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Fetch and wrap the full ping schema."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Fetch and wrap the environment portion of the schema."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Fetch the probe definitions and wrap each one as a Probe."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self, config: Config, *, split: bool = None, max_size: int = None
    ) -> Dict[str, List[Schema]]:
        """Build the final schemas for this ping.

        :param config: maps probes into locations in the schema
        :param split: when True, split the output into multiple schemas that
            each fit under ``max_size``; defaults to False
        :param max_size: maximum allowed schema size; defaults to
            ``default_max_size``
        :return: mapping of config name to its list of generated schemas
        :raises SchemaException: if the environment, the base schema, or any
            generated schema exceeds ``max_size``
        """
        schema = self.get_schema()
        env = self.get_env()

        probes = self.get_probes()

        if split is None:
            split = False
        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        # TODO: Allow splits of extra schema, if necessary
        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        if split:
            configs = config.split()
        else:
            configs = [config]
            # When not splitting, probes are filled into the full schema.
            env = schema

        schemas = {
            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
        }

        if split:
            schemas[self.extra_schema_key] = self.make_extra_schema(
                schema, probes, configs
            )

        # Distinct loop variables so the outer `schema` is not shadowed.
        if any(
            s.get_size() > max_size
            for schema_list in schemas.values()
            for s in schema_list
        ):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schemas(
        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
    ) -> List[Schema]:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
        schemas = []

        # TODO: Should env be checked to be a subset of schema?
        final_schema = env.clone()
        for schema_key, probe in schema_elements:
            try:
                addtl_props = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtl_props = None

            probe_schema = Schema(probe.get_schema(addtl_props)).clone()

            # Start a fresh schema when adding this probe would exceed the cap.
            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
                schemas.append(final_schema)
                final_schema = env.clone()

            final_schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        # Remove all additionalProperties (#22)
        schemas.append(final_schema)
        for s in schemas:
            for key in config.get_match_keys():
                try:
                    s.delete_group_from_schema(
                        key + ("propertyNames",), propagate=False
                    )
                except KeyError:
                    pass

                try:
                    s.delete_group_from_schema(
                        key + ("additionalProperties",), propagate=True
                    )
                except KeyError:
                    pass

        return schemas

    @staticmethod
    def make_extra_schema(
        schema: Schema, probes: List[Probe], configs: List[Config]
    ) -> List[Schema]:
        """
        Given the list of probes and the configuration,
        return the schema that has everything but those sections that we
        filled in already.

        TODO: Split the extra schema, when needed
        (e.g. extra.0.schema.json, extra.1.schema.json)
        """
        schema = schema.clone()

        # Get the schema elements we already filled in for the other tables
        schema_elements = [
            schema_key
            for _config in configs
            for schema_key, _ in _config.get_schema_elements(probes)
        ]

        # Delete those from the schema
        for schema_key in schema_elements:
            schema.delete_group_from_schema(schema_key)

        return [schema]

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """Return True if a cached response for `url` exists on disk."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write `val` to the cache file for `url`, first writer wins."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            with open(cache_file, "x") as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Read the cached response text for `url`."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Download (or read from cache) the body of `url` as text.

        :raises ValueError: if the streamed response cannot be read
        :raises requests.HTTPError: on a non-2xx response
        """
        if GenericPing._present_in_cache(url):
            return GenericPing._retrieve_from_cache(url)

        headers = {}
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, set the cache-control header to force
            # google cloud cdn to bypass the cache
            headers["Cache-Control"] = "no-cache"

        r = requests.get(url, headers=headers, stream=True)
        r.raise_for_status()

        # Collect chunks and join once: avoids quadratic bytes concatenation.
        chunks = []

        try:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    chunks.append(chunk)
        except ValueError as e:
            raise ValueError("Could not parse " + url) from e

        final_json = b"".join(chunks).decode(
            r.encoding or GenericPing.default_encoding
        )
        GenericPing._add_to_cache(url, final_json)

        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Fetch `url` and decode the body as JSON.

        :raises JSONDecodeError: if the body is not valid JSON (logged first)
        """
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            # Use the module-level logger, not the root logging module.
            logger.error("Unable to process JSON for url: %s", url)
            raise
class GenericPing(object):
    """Generate full ping schemas from remote definitions.

    Downloads a base ping schema, an "environment" schema, and the probe
    definitions, then fills the probes into the schema. The result can
    optionally be split into several schemas that each stay under a
    maximum serialized size.
    """

    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
    default_encoding = "utf-8"
    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
    extra_schema_key = "extra"
    # Directory for caching downloaded JSON; override via MSG_PROBE_CACHE_DIR.
    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))

    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
        """Store the source URLs, substituting the mps branch where needed."""
        self.schema_url = schema_url.format(branch=mps_branch)
        self.env_url = env_url.format(branch=mps_branch)
        self.probes_url = probes_url

    def get_schema(self) -> Schema:
        """Fetch and wrap the full ping schema."""
        return Schema(self._get_json(self.schema_url))

    def get_env(self) -> Schema:
        """Fetch and wrap the environment portion of the schema."""
        return Schema(self._get_json(self.env_url))

    def get_probes(self) -> List[Probe]:
        """Fetch the probe definitions and wrap each one as a Probe."""
        return [
            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
        ]

    def generate_schema(
        self, config: Config, *, split: bool = None, max_size: int = None
    ) -> Dict[str, List[Schema]]:
        """Build the final schemas for this ping.

        :param config: maps probes into locations in the schema
        :param split: when True, split the output into multiple schemas that
            each fit under ``max_size``; defaults to False
        :param max_size: maximum allowed schema size; defaults to
            ``default_max_size``
        :return: mapping of config name to its list of generated schemas
        :raises SchemaException: if the environment, the base schema, or any
            generated schema exceeds ``max_size``
        """
        schema = self.get_schema()
        env = self.get_env()

        probes = self.get_probes()

        if split is None:
            split = False
        if max_size is None:
            max_size = self.default_max_size

        if env.get_size() >= max_size:
            raise SchemaException(
                "Environment must be smaller than max_size {}".format(max_size)
            )

        # TODO: Allow splits of extra schema, if necessary
        if schema.get_size() >= max_size:
            raise SchemaException(
                "Schema must be smaller than max_size {}".format(max_size)
            )

        if split:
            configs = config.split()
        else:
            configs = [config]
            # When not splitting, probes are filled into the full schema.
            env = schema

        schemas = {
            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
        }

        if split:
            schemas[self.extra_schema_key] = self.make_extra_schema(
                schema, probes, configs
            )

        # Distinct loop variables so the outer `schema` is not shadowed.
        if any(
            s.get_size() > max_size
            for schema_list in schemas.values()
            for s in schema_list
        ):
            raise SchemaException(
                "Schema must be smaller or equal max_size {}".format(max_size)
            )

        return schemas

    @staticmethod
    def make_schemas(
        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
    ) -> List[Schema]:
        """
        Fill in probes based on the config, and keep only the env
        parts of the schema. Throw away everything else.
        """
        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
        schemas = []

        # TODO: Should env be checked to be a subset of schema?
        final_schema = env.clone()
        for schema_key, probe in schema_elements:
            try:
                addtl_props = env.get(schema_key + ("additionalProperties",))
            except KeyError:
                addtl_props = None

            probe_schema = Schema(probe.get_schema(addtl_props)).clone()

            # Start a fresh schema when adding this probe would exceed the cap.
            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
                schemas.append(final_schema)
                final_schema = env.clone()

            final_schema.set_schema_elem(
                schema_key + ("properties", probe.name), probe_schema.schema
            )

        # Remove all additionalProperties (#22)
        schemas.append(final_schema)
        for s in schemas:
            for key in config.get_match_keys():
                try:
                    s.delete_group_from_schema(
                        key + ("propertyNames",), propagate=False
                    )
                except KeyError:
                    pass

                try:
                    s.delete_group_from_schema(
                        key + ("additionalProperties",), propagate=True
                    )
                except KeyError:
                    pass

        return schemas

    @staticmethod
    def make_extra_schema(
        schema: Schema, probes: List[Probe], configs: List[Config]
    ) -> List[Schema]:
        """
        Given the list of probes and the configuration,
        return the schema that has everything but those sections that we
        filled in already.

        TODO: Split the extra schema, when needed
        (e.g. extra.0.schema.json, extra.1.schema.json)
        """
        schema = schema.clone()

        # Get the schema elements we already filled in for the other tables
        schema_elements = [
            schema_key
            for _config in configs
            for schema_key, _ in _config.get_schema_elements(probes)
        ]

        # Delete those from the schema
        for schema_key in schema_elements:
            schema.delete_group_from_schema(schema_key)

        return [schema]

    @staticmethod
    def _slugify(text: str) -> str:
        """Get a valid slug from an arbitrary string"""
        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
        return re.sub(r"[-\s]+", "-", value)

    @staticmethod
    def _present_in_cache(url: str) -> bool:
        """Return True if a cached response for `url` exists on disk."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()

    @staticmethod
    def _add_to_cache(url: str, val: str):
        """Write `val` to the cache file for `url`, first writer wins."""
        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)

        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
        # protect against multiple writers to the cache:
        # https://github.com/mozilla/mozilla-schema-generator/pull/210
        try:
            with open(cache_file, "x") as f:
                f.write(val)
        except FileExistsError:
            pass

    @staticmethod
    def _retrieve_from_cache(url: str) -> str:
        """Read the cached response text for `url`."""
        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()

    @staticmethod
    def _get_json_str(url: str) -> str:
        """Download (or read from cache) the body of `url` as text.

        :raises ValueError: if the streamed response cannot be read
        :raises requests.HTTPError: on a non-2xx response
        """
        if GenericPing._present_in_cache(url):
            return GenericPing._retrieve_from_cache(url)

        headers = {}
        if url.startswith(GenericPing.probe_info_base_url):
            # For probe-info-service requests, set the cache-control header to force
            # google cloud cdn to bypass the cache
            headers["Cache-Control"] = "no-cache"

        r = requests.get(url, headers=headers, stream=True)
        r.raise_for_status()

        # Collect chunks and join once: avoids quadratic bytes concatenation.
        chunks = []

        try:
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    chunks.append(chunk)
        except ValueError as e:
            raise ValueError("Could not parse " + url) from e

        final_json = b"".join(chunks).decode(
            r.encoding or GenericPing.default_encoding
        )
        GenericPing._add_to_cache(url, final_json)

        return final_json

    @staticmethod
    def _get_json(url: str) -> dict:
        """Fetch `url` and decode the body as JSON.

        :raises JSONDecodeError: if the body is not valid JSON (logged first)
        """
        try:
            return json.loads(GenericPing._get_json_str(url))
        except JSONDecodeError:
            # Use the module-level logger, not the root logging module.
            logger.error("Unable to process JSON for url: %s", url)
            raise
def generate_schema(
    self, config: Config, *, split: bool = None, max_size: int = None
) -> Dict[str, List[Schema]]:
    """Build the final schemas for this ping.

    :param config: maps probes into locations in the schema
    :param split: when True, split the output into multiple schemas that
        each fit under ``max_size``; defaults to False
    :param max_size: maximum allowed schema size; defaults to
        ``default_max_size``
    :return: mapping of config name to its list of generated schemas
    :raises SchemaException: if the environment, the base schema, or any
        generated schema exceeds ``max_size``
    """
    schema = self.get_schema()
    env = self.get_env()

    probes = self.get_probes()

    if split is None:
        split = False
    if max_size is None:
        max_size = self.default_max_size

    if env.get_size() >= max_size:
        raise SchemaException(
            "Environment must be smaller than max_size {}".format(max_size)
        )

    # TODO: Allow splits of extra schema, if necessary
    if schema.get_size() >= max_size:
        raise SchemaException(
            "Schema must be smaller than max_size {}".format(max_size)
        )

    if split:
        configs = config.split()
    else:
        configs = [config]
        # When not splitting, probes are filled into the full schema.
        env = schema

    schemas = {
        c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
    }

    if split:
        schemas[self.extra_schema_key] = self.make_extra_schema(
            schema, probes, configs
        )

    # Distinct loop variables so the outer `schema` is not shadowed;
    # .values() since the dict keys are unused here.
    if any(
        s.get_size() > max_size
        for schema_list in schemas.values()
        for s in schema_list
    ):
        raise SchemaException(
            "Schema must be smaller or equal max_size {}".format(max_size)
        )

    return schemas
@staticmethod
def make_schemas(
    env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
) -> List[Schema]:
    """Insert the config's probes into copies of the env schema.

    Probes are added in sorted order; when `split` is set, a new copy of
    the env schema is started whenever adding the next probe would push
    the current one past `max_size`. Returns the list of filled schemas
    with their matched additionalProperties/propertyNames removed (#22).
    """
    ordered = sorted(config.get_schema_elements(probes), key=lambda el: el[1])

    # TODO: Should env be checked to be a subset of schema?
    result = []
    current = env.clone()
    for schema_key, probe in ordered:
        try:
            additional_props = env.get(schema_key + ("additionalProperties",))
        except KeyError:
            additional_props = None

        probe_schema = Schema(probe.get_schema(additional_props)).clone()

        # Roll over to a fresh env copy if this probe would exceed the cap.
        if split and current.get_size() + probe_schema.get_size() > max_size:
            result.append(current)
            current = env.clone()

        current.set_schema_elem(
            schema_key + ("properties", probe.name), probe_schema.schema
        )
    result.append(current)

    # Strip propertyNames and additionalProperties from matched groups (#22).
    for out_schema in result:
        for key in config.get_match_keys():
            for suffix, propagate in (
                ("propertyNames", False),
                ("additionalProperties", True),
            ):
                try:
                    out_schema.delete_group_from_schema(
                        key + (suffix,), propagate=propagate
                    )
                except KeyError:
                    pass

    return result
Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.
@staticmethod
def make_extra_schema(
    schema: Schema, probes: List[Probe], configs: List[Config]
) -> List[Schema]:
    """Return the leftover schema: everything not already filled in.

    Clones the input schema, then removes every schema element that any
    of the given configs has already claimed for its own table.

    TODO: Split the extra schema, when needed
    (e.g. extra.0.schema.json, extra.1.schema.json)
    """
    remainder = schema.clone()

    # Delete every element a config already placed in another table.
    for cfg in configs:
        for schema_key, _probe in cfg.get_schema_elements(probes):
            remainder.delete_group_from_schema(schema_key)

    return [remainder]
Given the list of probes and the configuration, return the schema that has everything but those sections that we filled in already.
TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)