mozilla_schema_generator.glean_ping
1# -*- coding: utf-8 -*- 2 3# This Source Code Form is subject to the terms of the Mozilla Public 4# License, v. 2.0. If a copy of the MPL was not distributed with this 5# file, You can obtain one at http://mozilla.org/MPL/2.0/. 6 7import copy 8import logging 9from collections import defaultdict 10from datetime import datetime 11from pathlib import Path 12from typing import Any, Dict, List, Set 13 14import yaml 15from requests import HTTPError 16 17from .config import Config 18from .generic_ping import GenericPing 19from .probes import GleanProbe 20from .schema import Schema 21 22ROOT_DIR = Path(__file__).parent 23BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt" 24METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml" 25 26logger = logging.getLogger(__name__) 27 28SCHEMA_URL_TEMPLATE = ( 29 "https://raw.githubusercontent.com" 30 "/mozilla-services/mozilla-pipeline-schemas" 31 "/{branch}/schemas/glean/glean/" 32) 33 34SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json" 35 36DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format( 37 schema_type="glean", version=1 38) 39 40 41class GleanPing(GenericPing): 42 probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics" 43 ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings" 44 repos_url = GenericPing.probe_info_base_url + "/glean/repositories" 45 dependencies_url_template = ( 46 GenericPing.probe_info_base_url + "/glean/{}/dependencies" 47 ) 48 app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings" 49 50 default_dependencies = ["glean-core"] 51 52 with open(BUG_1737656_TXT, "r") as f: 53 bug_1737656_affected_tables = [ 54 line.strip() for line in f.readlines() if line.strip() 55 ] 56 57 def __init__( 58 self, repo, version=1, use_metrics_blocklist=False, **kwargs 59 ): # TODO: Make env-url optional 60 self.repo = repo 61 self.repo_name = repo["name"] 62 self.app_id = repo["app_id"] 63 self.version = 
version 64 65 if use_metrics_blocklist: 66 self.metric_blocklist = self.get_metric_blocklist() 67 else: 68 self.metric_blocklist = {} 69 70 super().__init__( 71 DEFAULT_SCHEMA_URL, 72 DEFAULT_SCHEMA_URL, 73 self.probes_url_template.format(self.repo_name), 74 **kwargs, 75 ) 76 77 def get_schema(self, generic_schema=False) -> Schema: 78 """ 79 Fetch schema via URL. 80 81 Unless *generic_schema* is set to true, this function makes some modifications 82 to allow some workarounds for proper injection of metrics. 83 """ 84 schema = super().get_schema() 85 if generic_schema: 86 return schema 87 88 # We need to inject placeholders for the url2, text2, etc. types as part 89 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 90 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 91 metric1 = schema.get( 92 ("properties", "metrics", "properties", metric_name) 93 ).copy() 94 metric1 = schema.set_schema_elem( 95 ("properties", "metrics", "properties", metric_name + "2"), 96 metric1, 97 ) 98 99 return schema 100 101 def get_dependencies(self): 102 # Get all of the library dependencies for the application that 103 # are also known about in the repositories file. 104 105 # The dependencies are specified using library names, but we need to 106 # map those back to the name of the repository in the repository file. 
107 try: 108 dependencies = self._get_json( 109 self.dependencies_url_template.format(self.repo_name) 110 ) 111 except HTTPError: 112 logging.info(f"For {self.repo_name}, using default Glean dependencies") 113 return self.default_dependencies 114 115 dependency_library_names = list(dependencies.keys()) 116 117 repos = GleanPing._get_json(GleanPing.repos_url) 118 repos_by_dependency_name = {} 119 for repo in repos: 120 for library_name in repo.get("library_names", []): 121 repos_by_dependency_name[library_name] = repo["name"] 122 123 dependencies = [] 124 for name in dependency_library_names: 125 if name in repos_by_dependency_name: 126 dependencies.append(repos_by_dependency_name[name]) 127 128 if len(dependencies) == 0: 129 logging.info(f"For {self.repo_name}, using default Glean dependencies") 130 return self.default_dependencies 131 132 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 133 return dependencies 134 135 @staticmethod 136 def remove_pings_from_metric( 137 metric: Dict[str, Any], blocked_pings: List[str] 138 ) -> Dict[str, Any]: 139 """Remove the given pings from the metric's `send_in_pings` history. 140 141 Only removes if the given metric has been removed from the source since a fixed date 142 (2025-01-01). This allows metrics to be added back to the schema. 
143 """ 144 if ( 145 metric["in-source"] 146 or len(blocked_pings) == 0 147 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 148 >= datetime(year=2025, month=1, day=1) 149 ): 150 return metric 151 152 for history_entry in metric["history"]: 153 history_entry["send_in_pings"] = [ 154 p for p in history_entry["send_in_pings"] if p not in blocked_pings 155 ] 156 157 return metric 158 159 def get_probes(self) -> List[GleanProbe]: 160 data = self._get_json(self.probes_url) 161 162 # blocklist needs to be applied here instead of generate_schema because it needs to be 163 # dependency-aware; metrics can move between app and library and still be in the schema 164 # turn blocklist into metric_name -> ping_types map 165 blocklist = defaultdict(list) 166 for ping_type, metric_names in self.metric_blocklist.get( 167 self.get_app_name(), {} 168 ).items(): 169 for metric_name in metric_names: 170 blocklist[metric_name].append(ping_type) 171 172 probes = [ 173 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 174 for name, defn in data.items() 175 ] 176 177 for dependency in self.get_dependencies(): 178 dependency_probes = self._get_json( 179 self.probes_url_template.format(dependency) 180 ) 181 182 dependency_blocklist = defaultdict(list) 183 for ping_type, metric_names in self.metric_blocklist.get( 184 dependency, {} 185 ).items(): 186 for metric_name in metric_names: 187 dependency_blocklist[metric_name].append(ping_type) 188 189 probes += [ 190 ( 191 name, 192 self.remove_pings_from_metric( 193 defn, dependency_blocklist.get(name, []) 194 ), 195 ) 196 for name, defn in dependency_probes.items() 197 ] 198 199 pings = self.get_pings() 200 201 processed = [] 202 for _id, defn in probes: 203 probe = GleanProbe(_id, defn, pings=pings) 204 processed.append(probe) 205 206 # Handling probe type changes (Bug 1870317) 207 probe_types = {hist["type"] for hist in defn[probe.history_key]} 208 if len(probe_types) > 1: 209 # The probe type changed at some 
point in history. 210 # Create schema entry for each type. 211 hist_defn = defn.copy() 212 213 # No new entry needs to be created for the current probe type 214 probe_types.remove(defn["type"]) 215 216 for hist in hist_defn[probe.history_key]: 217 # Create a new entry for a historic type 218 if hist["type"] in probe_types: 219 hist_defn["type"] = hist["type"] 220 probe = GleanProbe(_id, hist_defn, pings=pings) 221 processed.append(probe) 222 223 # Keep track of the types entries were already created for 224 probe_types.remove(hist["type"]) 225 226 return processed 227 228 def _get_ping_data(self) -> Dict[str, Dict]: 229 url = self.ping_url_template.format(self.repo_name) 230 ping_data = GleanPing._get_json(url) 231 for dependency in self.get_dependencies(): 232 dependency_pings = self._get_json(self.ping_url_template.format(dependency)) 233 ping_data.update(dependency_pings) 234 return ping_data 235 236 def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]: 237 url = self.ping_url_template.format(self.repo_name) 238 ping_data = GleanPing._get_json(url) 239 return ping_data 240 241 def _get_dependency_pings(self, dependency): 242 return self._get_json(self.ping_url_template.format(dependency)) 243 244 def get_pings(self) -> Set[str]: 245 return self._get_ping_data().keys() 246 247 @staticmethod 248 def apply_default_metadata(ping_metadata, default_metadata): 249 """apply_default_metadata recurses down into dicts nested 250 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 251 ``ping_metadata``. 
252 :param ping_metadata: dict onto which the merge is executed 253 :param default_metadata: dct merged into ping_metadata 254 :return: None 255 """ 256 for k, v in default_metadata.items(): 257 if ( 258 k in ping_metadata 259 and isinstance(ping_metadata[k], dict) 260 and isinstance(default_metadata[k], dict) 261 ): 262 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 263 else: 264 ping_metadata[k] = default_metadata[k] 265 266 def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]: 267 # Get the ping data with the pipeline metadata 268 ping_data = self._get_ping_data_without_dependencies() 269 270 # The ping endpoint for the dependency pings does not include any repo defined 271 # moz_pipeline_metadata_defaults so they need to be applied here. 272 273 # 1. Get repo and pipeline default metadata. 274 repos = self.get_repos() 275 current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {}) 276 default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {}) 277 278 # 2. Apply the default metadata to each dependency defined ping. 279 280 # Apply app-level metadata to pings defined in dependencies 281 app_metadata = current_repo.get("moz_pipeline_metadata", {}) 282 283 for dependency in self.get_dependencies(): 284 dependency_pings = self._get_dependency_pings(dependency) 285 for dependency_ping in dependency_pings.values(): 286 # Although it is counter intuitive to apply the default metadata on top of the 287 # existing dependency ping metadata it does set the repo specific value for 288 # bq_dataset_family instead of using the dependency id for the bq_dataset_family 289 # value. 
290 GleanPing.apply_default_metadata( 291 dependency_ping.get("moz_pipeline_metadata"), 292 copy.deepcopy(default_metadata), 293 ) 294 # app-level ping properties take priority over the app defaults 295 metadata_override = app_metadata.get(dependency_ping["name"]) 296 if metadata_override is not None: 297 GleanPing.apply_default_metadata( 298 dependency_ping.get("moz_pipeline_metadata"), metadata_override 299 ) 300 ping_data.update(dependency_pings) 301 302 return ping_data 303 304 @staticmethod 305 def reorder_metadata(metadata): 306 desired_order_list = [ 307 "bq_dataset_family", 308 "bq_table", 309 "bq_metadata_format", 310 "include_info_sections", 311 "submission_timestamp_granularity", 312 "expiration_policy", 313 "override_attributes", 314 "jwe_mappings", 315 ] 316 reordered_metadata = { 317 k: metadata[k] for k in desired_order_list if k in metadata 318 } 319 320 # re-order jwe-mappings 321 desired_order_list = ["source_field_path", "decrypted_field_path"] 322 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 323 if jwe_mapping_metadata: 324 reordered_jwe_mapping_metadata = [] 325 for mapping in jwe_mapping_metadata: 326 reordered_jwe_mapping_metadata.append( 327 {k: mapping[k] for k in desired_order_list if k in mapping} 328 ) 329 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 330 331 # future proofing, in case there are other fields added at the ping top level 332 # add them to the end. 
333 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 334 reordered_metadata = {**reordered_metadata, **leftovers} 335 return reordered_metadata 336 337 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 338 pings = self._get_ping_data_and_dependencies_with_default_metadata() 339 for ping_name, ping_data in pings.items(): 340 metadata = ping_data.get("moz_pipeline_metadata") 341 if not metadata: 342 continue 343 metadata["include_info_sections"] = self._is_field_included( 344 ping_data, "include_info_sections", consider_all_history=False 345 ) 346 metadata["include_client_id"] = self._is_field_included( 347 ping_data, "include_client_id" 348 ) 349 350 # While technically unnecessary, the dictionary elements are re-ordered to match the 351 # currently deployed order and used to verify no difference in output. 352 pings[ping_name] = GleanPing.reorder_metadata(metadata) 353 return pings 354 355 def get_ping_descriptions(self) -> Dict[str, str]: 356 return { 357 k: v["history"][-1]["description"] for k, v in self._get_ping_data().items() 358 } 359 360 @staticmethod 361 def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool: 362 """Return false if the field exists and is false. 363 364 If `consider_all_history` is False, then only check the latest value in the ping history. 365 366 Otherwise, if the field is not found or true in one or more history entries, 367 true is returned. 368 """ 369 370 # Default to true if not specified. 371 if "history" not in ping_data or len(ping_data["history"]) == 0: 372 return True 373 374 # Check if at some point in the past the field has already been deployed. 375 # And if the caller of this method wants to consider this history of the field. 376 # Keep them in the schema, even if the field has changed as 377 # removing fields is currently not supported. 
378 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105 379 # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10 380 ping_history: list 381 if consider_all_history: 382 ping_history = ping_data["history"] 383 else: 384 ping_history = [ping_data["history"][-1]] 385 for history in ping_history: 386 if field_name not in history or history[field_name]: 387 return True 388 389 # The ping was created with include_info_sections = False. The fields can be excluded. 390 return False 391 392 def set_schema_url(self, metadata): 393 """ 394 Switch between the glean-min and glean schemas if the ping does not require 395 info sections as specified in the parsed ping info in probe scraper. 396 """ 397 if not metadata["include_info_sections"]: 398 self.schema_url = SCHEMA_URL_TEMPLATE.format( 399 branch=self.branch_name 400 ) + SCHEMA_VERSION_TEMPLATE.format( 401 schema_type="glean-min", version=self.version 402 ) 403 else: 404 self.schema_url = SCHEMA_URL_TEMPLATE.format( 405 branch=self.branch_name 406 ) + SCHEMA_VERSION_TEMPLATE.format( 407 schema_type="glean", version=self.version 408 ) 409 410 def generate_schema(self, config, generic_schema=False) -> Dict[str, Schema]: 411 pings = self.get_pings_and_pipeline_metadata() 412 schemas = {} 413 414 for ping, pipeline_meta in pings.items(): 415 matchers = { 416 loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items() 417 } 418 419 # Four newly introduced metric types were incorrectly deployed 420 # as repeated key/value structs in all Glean ping tables existing prior 421 # to November 2021. We maintain the incorrect fields for existing tables 422 # by disabling the associated matchers. 423 # Note that each of these types now has a "2" matcher ("text2", "url2", etc.) 424 # defined that will allow metrics of these types to be injected into proper 425 # structs. The gcp-ingestion repository includes logic to rewrite these 426 # metrics under the "2" names. 
427 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 428 bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta) 429 if bq_identifier in self.bug_1737656_affected_tables: 430 matchers = { 431 loc: m 432 for loc, m in matchers.items() 433 if not m.matcher.get("bug_1737656_affected") 434 } 435 436 for matcher in matchers.values(): 437 matcher.matcher["send_in_pings"]["contains"] = ping 438 new_config = Config(ping, matchers=matchers) 439 440 defaults = {"mozPipelineMetadata": pipeline_meta} 441 442 # Adjust the schema path if the ping does not require info sections 443 self.set_schema_url(pipeline_meta) 444 if generic_schema: # Use the generic glean ping schema 445 schema = self.get_schema(generic_schema=True) 446 schema.schema.update(defaults) 447 schemas[new_config.name] = schema 448 else: 449 generated = super().generate_schema(new_config) 450 for schema in generated.values(): 451 # We want to override each individual key with assembled defaults, 452 # but keep values _inside_ them if they have been set in the schemas. 453 for key, value in defaults.items(): 454 if key not in schema.schema: 455 schema.schema[key] = {} 456 schema.schema[key].update(value) 457 schemas.update(generated) 458 459 return schemas 460 461 @staticmethod 462 def get_repos(): 463 """ 464 Retrieve metadata for all non-library Glean repositories 465 """ 466 repos = GleanPing._get_json(GleanPing.repos_url) 467 return [repo for repo in repos if "library_names" not in repo] 468 469 def get_app_name(self) -> str: 470 """Get app name associated with the app id. 471 472 e.g. org-mozilla-firefox -> fenix 473 """ 474 apps = GleanPing._get_json(GleanPing.app_listings_url) 475 # app id in app-listings has "." 
instead of "-" so using document_namespace 476 app_name = [ 477 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 478 ] 479 return app_name[0] if len(app_name) > 0 else self.app_id 480 481 @staticmethod 482 def get_metric_blocklist(): 483 with open(METRIC_BLOCKLIST, "r") as f: 484 return yaml.safe_load(f)
42class GleanPing(GenericPing): 43 probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics" 44 ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings" 45 repos_url = GenericPing.probe_info_base_url + "/glean/repositories" 46 dependencies_url_template = ( 47 GenericPing.probe_info_base_url + "/glean/{}/dependencies" 48 ) 49 app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings" 50 51 default_dependencies = ["glean-core"] 52 53 with open(BUG_1737656_TXT, "r") as f: 54 bug_1737656_affected_tables = [ 55 line.strip() for line in f.readlines() if line.strip() 56 ] 57 58 def __init__( 59 self, repo, version=1, use_metrics_blocklist=False, **kwargs 60 ): # TODO: Make env-url optional 61 self.repo = repo 62 self.repo_name = repo["name"] 63 self.app_id = repo["app_id"] 64 self.version = version 65 66 if use_metrics_blocklist: 67 self.metric_blocklist = self.get_metric_blocklist() 68 else: 69 self.metric_blocklist = {} 70 71 super().__init__( 72 DEFAULT_SCHEMA_URL, 73 DEFAULT_SCHEMA_URL, 74 self.probes_url_template.format(self.repo_name), 75 **kwargs, 76 ) 77 78 def get_schema(self, generic_schema=False) -> Schema: 79 """ 80 Fetch schema via URL. 81 82 Unless *generic_schema* is set to true, this function makes some modifications 83 to allow some workarounds for proper injection of metrics. 84 """ 85 schema = super().get_schema() 86 if generic_schema: 87 return schema 88 89 # We need to inject placeholders for the url2, text2, etc. 
types as part 90 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 91 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 92 metric1 = schema.get( 93 ("properties", "metrics", "properties", metric_name) 94 ).copy() 95 metric1 = schema.set_schema_elem( 96 ("properties", "metrics", "properties", metric_name + "2"), 97 metric1, 98 ) 99 100 return schema 101 102 def get_dependencies(self): 103 # Get all of the library dependencies for the application that 104 # are also known about in the repositories file. 105 106 # The dependencies are specified using library names, but we need to 107 # map those back to the name of the repository in the repository file. 108 try: 109 dependencies = self._get_json( 110 self.dependencies_url_template.format(self.repo_name) 111 ) 112 except HTTPError: 113 logging.info(f"For {self.repo_name}, using default Glean dependencies") 114 return self.default_dependencies 115 116 dependency_library_names = list(dependencies.keys()) 117 118 repos = GleanPing._get_json(GleanPing.repos_url) 119 repos_by_dependency_name = {} 120 for repo in repos: 121 for library_name in repo.get("library_names", []): 122 repos_by_dependency_name[library_name] = repo["name"] 123 124 dependencies = [] 125 for name in dependency_library_names: 126 if name in repos_by_dependency_name: 127 dependencies.append(repos_by_dependency_name[name]) 128 129 if len(dependencies) == 0: 130 logging.info(f"For {self.repo_name}, using default Glean dependencies") 131 return self.default_dependencies 132 133 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 134 return dependencies 135 136 @staticmethod 137 def remove_pings_from_metric( 138 metric: Dict[str, Any], blocked_pings: List[str] 139 ) -> Dict[str, Any]: 140 """Remove the given pings from the metric's `send_in_pings` history. 141 142 Only removes if the given metric has been removed from the source since a fixed date 143 (2025-01-01). 
This allows metrics to be added back to the schema. 144 """ 145 if ( 146 metric["in-source"] 147 or len(blocked_pings) == 0 148 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 149 >= datetime(year=2025, month=1, day=1) 150 ): 151 return metric 152 153 for history_entry in metric["history"]: 154 history_entry["send_in_pings"] = [ 155 p for p in history_entry["send_in_pings"] if p not in blocked_pings 156 ] 157 158 return metric 159 160 def get_probes(self) -> List[GleanProbe]: 161 data = self._get_json(self.probes_url) 162 163 # blocklist needs to be applied here instead of generate_schema because it needs to be 164 # dependency-aware; metrics can move between app and library and still be in the schema 165 # turn blocklist into metric_name -> ping_types map 166 blocklist = defaultdict(list) 167 for ping_type, metric_names in self.metric_blocklist.get( 168 self.get_app_name(), {} 169 ).items(): 170 for metric_name in metric_names: 171 blocklist[metric_name].append(ping_type) 172 173 probes = [ 174 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 175 for name, defn in data.items() 176 ] 177 178 for dependency in self.get_dependencies(): 179 dependency_probes = self._get_json( 180 self.probes_url_template.format(dependency) 181 ) 182 183 dependency_blocklist = defaultdict(list) 184 for ping_type, metric_names in self.metric_blocklist.get( 185 dependency, {} 186 ).items(): 187 for metric_name in metric_names: 188 dependency_blocklist[metric_name].append(ping_type) 189 190 probes += [ 191 ( 192 name, 193 self.remove_pings_from_metric( 194 defn, dependency_blocklist.get(name, []) 195 ), 196 ) 197 for name, defn in dependency_probes.items() 198 ] 199 200 pings = self.get_pings() 201 202 processed = [] 203 for _id, defn in probes: 204 probe = GleanProbe(_id, defn, pings=pings) 205 processed.append(probe) 206 207 # Handling probe type changes (Bug 1870317) 208 probe_types = {hist["type"] for hist in defn[probe.history_key]} 209 if 
len(probe_types) > 1: 210 # The probe type changed at some point in history. 211 # Create schema entry for each type. 212 hist_defn = defn.copy() 213 214 # No new entry needs to be created for the current probe type 215 probe_types.remove(defn["type"]) 216 217 for hist in hist_defn[probe.history_key]: 218 # Create a new entry for a historic type 219 if hist["type"] in probe_types: 220 hist_defn["type"] = hist["type"] 221 probe = GleanProbe(_id, hist_defn, pings=pings) 222 processed.append(probe) 223 224 # Keep track of the types entries were already created for 225 probe_types.remove(hist["type"]) 226 227 return processed 228 229 def _get_ping_data(self) -> Dict[str, Dict]: 230 url = self.ping_url_template.format(self.repo_name) 231 ping_data = GleanPing._get_json(url) 232 for dependency in self.get_dependencies(): 233 dependency_pings = self._get_json(self.ping_url_template.format(dependency)) 234 ping_data.update(dependency_pings) 235 return ping_data 236 237 def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]: 238 url = self.ping_url_template.format(self.repo_name) 239 ping_data = GleanPing._get_json(url) 240 return ping_data 241 242 def _get_dependency_pings(self, dependency): 243 return self._get_json(self.ping_url_template.format(dependency)) 244 245 def get_pings(self) -> Set[str]: 246 return self._get_ping_data().keys() 247 248 @staticmethod 249 def apply_default_metadata(ping_metadata, default_metadata): 250 """apply_default_metadata recurses down into dicts nested 251 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 252 ``ping_metadata``. 
253 :param ping_metadata: dict onto which the merge is executed 254 :param default_metadata: dct merged into ping_metadata 255 :return: None 256 """ 257 for k, v in default_metadata.items(): 258 if ( 259 k in ping_metadata 260 and isinstance(ping_metadata[k], dict) 261 and isinstance(default_metadata[k], dict) 262 ): 263 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 264 else: 265 ping_metadata[k] = default_metadata[k] 266 267 def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]: 268 # Get the ping data with the pipeline metadata 269 ping_data = self._get_ping_data_without_dependencies() 270 271 # The ping endpoint for the dependency pings does not include any repo defined 272 # moz_pipeline_metadata_defaults so they need to be applied here. 273 274 # 1. Get repo and pipeline default metadata. 275 repos = self.get_repos() 276 current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {}) 277 default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {}) 278 279 # 2. Apply the default metadata to each dependency defined ping. 280 281 # Apply app-level metadata to pings defined in dependencies 282 app_metadata = current_repo.get("moz_pipeline_metadata", {}) 283 284 for dependency in self.get_dependencies(): 285 dependency_pings = self._get_dependency_pings(dependency) 286 for dependency_ping in dependency_pings.values(): 287 # Although it is counter intuitive to apply the default metadata on top of the 288 # existing dependency ping metadata it does set the repo specific value for 289 # bq_dataset_family instead of using the dependency id for the bq_dataset_family 290 # value. 
291 GleanPing.apply_default_metadata( 292 dependency_ping.get("moz_pipeline_metadata"), 293 copy.deepcopy(default_metadata), 294 ) 295 # app-level ping properties take priority over the app defaults 296 metadata_override = app_metadata.get(dependency_ping["name"]) 297 if metadata_override is not None: 298 GleanPing.apply_default_metadata( 299 dependency_ping.get("moz_pipeline_metadata"), metadata_override 300 ) 301 ping_data.update(dependency_pings) 302 303 return ping_data 304 305 @staticmethod 306 def reorder_metadata(metadata): 307 desired_order_list = [ 308 "bq_dataset_family", 309 "bq_table", 310 "bq_metadata_format", 311 "include_info_sections", 312 "submission_timestamp_granularity", 313 "expiration_policy", 314 "override_attributes", 315 "jwe_mappings", 316 ] 317 reordered_metadata = { 318 k: metadata[k] for k in desired_order_list if k in metadata 319 } 320 321 # re-order jwe-mappings 322 desired_order_list = ["source_field_path", "decrypted_field_path"] 323 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 324 if jwe_mapping_metadata: 325 reordered_jwe_mapping_metadata = [] 326 for mapping in jwe_mapping_metadata: 327 reordered_jwe_mapping_metadata.append( 328 {k: mapping[k] for k in desired_order_list if k in mapping} 329 ) 330 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 331 332 # future proofing, in case there are other fields added at the ping top level 333 # add them to the end. 
334 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 335 reordered_metadata = {**reordered_metadata, **leftovers} 336 return reordered_metadata 337 338 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 339 pings = self._get_ping_data_and_dependencies_with_default_metadata() 340 for ping_name, ping_data in pings.items(): 341 metadata = ping_data.get("moz_pipeline_metadata") 342 if not metadata: 343 continue 344 metadata["include_info_sections"] = self._is_field_included( 345 ping_data, "include_info_sections", consider_all_history=False 346 ) 347 metadata["include_client_id"] = self._is_field_included( 348 ping_data, "include_client_id" 349 ) 350 351 # While technically unnecessary, the dictionary elements are re-ordered to match the 352 # currently deployed order and used to verify no difference in output. 353 pings[ping_name] = GleanPing.reorder_metadata(metadata) 354 return pings 355 356 def get_ping_descriptions(self) -> Dict[str, str]: 357 return { 358 k: v["history"][-1]["description"] for k, v in self._get_ping_data().items() 359 } 360 361 @staticmethod 362 def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool: 363 """Return false if the field exists and is false. 364 365 If `consider_all_history` is False, then only check the latest value in the ping history. 366 367 Otherwise, if the field is not found or true in one or more history entries, 368 true is returned. 369 """ 370 371 # Default to true if not specified. 372 if "history" not in ping_data or len(ping_data["history"]) == 0: 373 return True 374 375 # Check if at some point in the past the field has already been deployed. 376 # And if the caller of this method wants to consider this history of the field. 377 # Keep them in the schema, even if the field has changed as 378 # removing fields is currently not supported. 
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list
        if consider_all_history:
            ping_history = ping_data["history"]
        else:
            # Only the most recent history entry is consulted.
            ping_history = [ping_data["history"][-1]]
        # A missing field, or a truthy field value, counts as "included".
        for history in ping_history:
            if field_name not in history or history[field_name]:
                return True

        # The ping was created with include_info_sections = False. The fields can be excluded.
        return False

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not require
        info sections as specified in the parsed ping info in probe scraper.
        """
        if not metadata["include_info_sections"]:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean-min", version=self.version
            )
        else:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean", version=self.version
            )

    def generate_schema(self, config, generic_schema=False) -> Dict[str, Schema]:
        """Generate one schema per ping of this repository.

        Each ping's schema carries its pipeline metadata under
        ``mozPipelineMetadata``. When *generic_schema* is True, the generic
        glean ping schema is used for every ping instead of generating one
        from the probes.
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            # Clone the matchers scoped to this ping's table group.
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            # Only inject metrics that are sent in this specific ping.
            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        # Library entries carry a "library_names" key; applications do not.
        return [repo for repo in repos if "library_names" not in repo]

    def get_app_name(self) -> str:
        """Get app name associated with the app id.

        e.g. org-mozilla-firefox -> fenix
        """
        apps = GleanPing._get_json(GleanPing.app_listings_url)
        # app id in app-listings has "." instead of "-" so using document_namespace
        app_name = [
            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
        ]
        return app_name[0] if len(app_name) > 0 else self.app_id

    @staticmethod
    def get_metric_blocklist():
        """Load the metric blocklist from the bundled YAML config file."""
        with open(METRIC_BLOCKLIST, "r") as f:
            return yaml.safe_load(f)
58 def __init__( 59 self, repo, version=1, use_metrics_blocklist=False, **kwargs 60 ): # TODO: Make env-url optional 61 self.repo = repo 62 self.repo_name = repo["name"] 63 self.app_id = repo["app_id"] 64 self.version = version 65 66 if use_metrics_blocklist: 67 self.metric_blocklist = self.get_metric_blocklist() 68 else: 69 self.metric_blocklist = {} 70 71 super().__init__( 72 DEFAULT_SCHEMA_URL, 73 DEFAULT_SCHEMA_URL, 74 self.probes_url_template.format(self.repo_name), 75 **kwargs, 76 )
78 def get_schema(self, generic_schema=False) -> Schema: 79 """ 80 Fetch schema via URL. 81 82 Unless *generic_schema* is set to true, this function makes some modifications 83 to allow some workarounds for proper injection of metrics. 84 """ 85 schema = super().get_schema() 86 if generic_schema: 87 return schema 88 89 # We need to inject placeholders for the url2, text2, etc. types as part 90 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 91 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 92 metric1 = schema.get( 93 ("properties", "metrics", "properties", metric_name) 94 ).copy() 95 metric1 = schema.set_schema_elem( 96 ("properties", "metrics", "properties", metric_name + "2"), 97 metric1, 98 ) 99 100 return schema
Fetch schema via URL.
Unless generic_schema is set to True, this function modifies the fetched schema to work around issues with proper injection of metrics.
102 def get_dependencies(self): 103 # Get all of the library dependencies for the application that 104 # are also known about in the repositories file. 105 106 # The dependencies are specified using library names, but we need to 107 # map those back to the name of the repository in the repository file. 108 try: 109 dependencies = self._get_json( 110 self.dependencies_url_template.format(self.repo_name) 111 ) 112 except HTTPError: 113 logging.info(f"For {self.repo_name}, using default Glean dependencies") 114 return self.default_dependencies 115 116 dependency_library_names = list(dependencies.keys()) 117 118 repos = GleanPing._get_json(GleanPing.repos_url) 119 repos_by_dependency_name = {} 120 for repo in repos: 121 for library_name in repo.get("library_names", []): 122 repos_by_dependency_name[library_name] = repo["name"] 123 124 dependencies = [] 125 for name in dependency_library_names: 126 if name in repos_by_dependency_name: 127 dependencies.append(repos_by_dependency_name[name]) 128 129 if len(dependencies) == 0: 130 logging.info(f"For {self.repo_name}, using default Glean dependencies") 131 return self.default_dependencies 132 133 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 134 return dependencies
136 @staticmethod 137 def remove_pings_from_metric( 138 metric: Dict[str, Any], blocked_pings: List[str] 139 ) -> Dict[str, Any]: 140 """Remove the given pings from the metric's `send_in_pings` history. 141 142 Only removes if the given metric has been removed from the source since a fixed date 143 (2025-01-01). This allows metrics to be added back to the schema. 144 """ 145 if ( 146 metric["in-source"] 147 or len(blocked_pings) == 0 148 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 149 >= datetime(year=2025, month=1, day=1) 150 ): 151 return metric 152 153 for history_entry in metric["history"]: 154 history_entry["send_in_pings"] = [ 155 p for p in history_entry["send_in_pings"] if p not in blocked_pings 156 ] 157 158 return metric
Remove the given pings from the metric's send_in_pings history.
Pings are only removed when the metric is no longer in the source and was last seen before a fixed date (2025-01-01). This allows metrics to be added back to the schema.
    def get_probes(self) -> List[GleanProbe]:
        """Return all probes for this app, including its library dependencies.

        Metrics named in the blocklist have the blocked ping types stripped
        from their ``send_in_pings`` history (see remove_pings_from_metric).
        When a probe's type changed over its history, one probe entry is
        emitted per distinct historical type (Bug 1870317).
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            # The blocklist is keyed by dependency name for library metrics.
            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                # NOTE(review): dict.copy() is shallow and hist_defn["type"] is
                # mutated across iterations while the same dict is handed to
                # several GleanProbe constructors — presumably GleanProbe copies
                # what it needs in __init__; confirm before refactoring.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed
248 @staticmethod 249 def apply_default_metadata(ping_metadata, default_metadata): 250 """apply_default_metadata recurses down into dicts nested 251 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 252 ``ping_metadata``. 253 :param ping_metadata: dict onto which the merge is executed 254 :param default_metadata: dct merged into ping_metadata 255 :return: None 256 """ 257 for k, v in default_metadata.items(): 258 if ( 259 k in ping_metadata 260 and isinstance(ping_metadata[k], dict) 261 and isinstance(default_metadata[k], dict) 262 ): 263 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 264 else: 265 ping_metadata[k] = default_metadata[k]
apply_default_metadata recurses down into dicts nested
to an arbitrary depth, updating keys. The default_metadata is merged into
ping_metadata.
Parameters
- ping_metadata: dict onto which the merge is executed
- default_metadata: dict merged into ping_metadata
Returns
None
305 @staticmethod 306 def reorder_metadata(metadata): 307 desired_order_list = [ 308 "bq_dataset_family", 309 "bq_table", 310 "bq_metadata_format", 311 "include_info_sections", 312 "submission_timestamp_granularity", 313 "expiration_policy", 314 "override_attributes", 315 "jwe_mappings", 316 ] 317 reordered_metadata = { 318 k: metadata[k] for k in desired_order_list if k in metadata 319 } 320 321 # re-order jwe-mappings 322 desired_order_list = ["source_field_path", "decrypted_field_path"] 323 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 324 if jwe_mapping_metadata: 325 reordered_jwe_mapping_metadata = [] 326 for mapping in jwe_mapping_metadata: 327 reordered_jwe_mapping_metadata.append( 328 {k: mapping[k] for k in desired_order_list if k in mapping} 329 ) 330 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 331 332 # future proofing, in case there are other fields added at the ping top level 333 # add them to the end. 334 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 335 reordered_metadata = {**reordered_metadata, **leftovers} 336 return reordered_metadata
338 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 339 pings = self._get_ping_data_and_dependencies_with_default_metadata() 340 for ping_name, ping_data in pings.items(): 341 metadata = ping_data.get("moz_pipeline_metadata") 342 if not metadata: 343 continue 344 metadata["include_info_sections"] = self._is_field_included( 345 ping_data, "include_info_sections", consider_all_history=False 346 ) 347 metadata["include_client_id"] = self._is_field_included( 348 ping_data, "include_client_id" 349 ) 350 351 # While technically unnecessary, the dictionary elements are re-ordered to match the 352 # currently deployed order and used to verify no difference in output. 353 pings[ping_name] = GleanPing.reorder_metadata(metadata) 354 return pings
393 def set_schema_url(self, metadata): 394 """ 395 Switch between the glean-min and glean schemas if the ping does not require 396 info sections as specified in the parsed ping info in probe scraper. 397 """ 398 if not metadata["include_info_sections"]: 399 self.schema_url = SCHEMA_URL_TEMPLATE.format( 400 branch=self.branch_name 401 ) + SCHEMA_VERSION_TEMPLATE.format( 402 schema_type="glean-min", version=self.version 403 ) 404 else: 405 self.schema_url = SCHEMA_URL_TEMPLATE.format( 406 branch=self.branch_name 407 ) + SCHEMA_VERSION_TEMPLATE.format( 408 schema_type="glean", version=self.version 409 )
Switch between the glean-min and glean schemas: the minimal schema is used when the ping does not require the info sections, as recorded in the ping info parsed by probe scraper.
    def generate_schema(self, config, generic_schema=False) -> Dict[str, Schema]:
        """Generate one schema per ping of this repository.

        Each ping's schema carries its pipeline metadata under
        ``mozPipelineMetadata``. When *generic_schema* is True, the generic
        glean ping schema is used for every ping instead of generating one
        from the probes.

        :param config: probe-matcher Config, cloned per ping
        :param generic_schema: use the unmodified generic glean schema
        :return: mapping of ping/config name -> Schema
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            # Clone the matchers scoped to this ping's table group.
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            # Only inject metrics that are sent in this specific ping.
            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas
462 @staticmethod 463 def get_repos(): 464 """ 465 Retrieve metadata for all non-library Glean repositories 466 """ 467 repos = GleanPing._get_json(GleanPing.repos_url) 468 return [repo for repo in repos if "library_names" not in repo]
Retrieve metadata for all non-library Glean repositories
470 def get_app_name(self) -> str: 471 """Get app name associated with the app id. 472 473 e.g. org-mozilla-firefox -> fenix 474 """ 475 apps = GleanPing._get_json(GleanPing.app_listings_url) 476 # app id in app-listings has "." instead of "-" so using document_namespace 477 app_name = [ 478 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 479 ] 480 return app_name[0] if len(app_name) > 0 else self.app_id
Get app name associated with the app id.
e.g. org-mozilla-firefox -> fenix