mozilla_schema_generator.glean_ping
1# -*- coding: utf-8 -*- 2 3# This Source Code Form is subject to the terms of the Mozilla Public 4# License, v. 2.0. If a copy of the MPL was not distributed with this 5# file, You can obtain one at http://mozilla.org/MPL/2.0/. 6 7import copy 8import logging 9from collections import defaultdict 10from datetime import datetime 11from functools import cache 12from pathlib import Path 13from typing import Any, Dict, List, Set 14 15import yaml 16from requests import HTTPError 17 18from .config import Config 19from .generic_ping import GenericPing 20from .probes import GleanProbe 21from .schema import Schema 22 23ROOT_DIR = Path(__file__).parent 24BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt" 25METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml" 26 27logger = logging.getLogger(__name__) 28 29SCHEMA_URL_TEMPLATE = ( 30 "https://raw.githubusercontent.com" 31 "/mozilla-services/mozilla-pipeline-schemas" 32 "/{branch}/schemas/glean/glean/" 33) 34 35SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json" 36 37DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format( 38 schema_type="glean", version=1 39) 40 41 42class GleanPing(GenericPing): 43 probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics" 44 ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings" 45 repos_url = GenericPing.probe_info_base_url + "/glean/repositories" 46 dependencies_url_template = ( 47 GenericPing.probe_info_base_url + "/glean/{}/dependencies" 48 ) 49 app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings" 50 51 default_dependencies = ["glean-core"] 52 53 with open(BUG_1737656_TXT, "r") as f: 54 bug_1737656_affected_tables = [ 55 line.strip() for line in f.readlines() if line.strip() 56 ] 57 58 def __init__( 59 self, repo, version=1, use_metrics_blocklist=False, **kwargs 60 ): # TODO: Make env-url optional 61 self.repo = repo 62 self.repo_name = repo["name"] 63 self.app_id = repo["app_id"] 64 self.version = version 65 66 if use_metrics_blocklist: 67 self.metric_blocklist = self.get_metric_blocklist() 68 else: 69 self.metric_blocklist = {} 70 71 super().__init__( 72 DEFAULT_SCHEMA_URL, 73 DEFAULT_SCHEMA_URL, 74 self.probes_url_template.format(self.repo_name), 75 **kwargs, 76 ) 77 78 def get_schema(self, generic_schema=False) -> Schema: 79 """ 80 Fetch schema via URL. 81 82 Unless *generic_schema* is set to true, this function makes some modifications 83 to allow some workarounds for proper injection of metrics. 84 """ 85 schema = super().get_schema() 86 if generic_schema: 87 return schema 88 89 # We need to inject placeholders for the url2, text2, etc. types as part 90 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 91 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 92 metric1 = schema.get( 93 ("properties", "metrics", "properties", metric_name) 94 ).copy() 95 metric1 = schema.set_schema_elem( 96 ("properties", "metrics", "properties", metric_name + "2"), 97 metric1, 98 ) 99 100 return schema 101 102 @cache 103 def get_dependencies(self): 104 # Get all of the library dependencies for the application that 105 # are also known about in the repositories file. 106 107 # The dependencies are specified using library names, but we need to 108 # map those back to the name of the repository in the repository file. 109 try: 110 dependencies = self._get_json( 111 self.dependencies_url_template.format(self.repo_name) 112 ) 113 except HTTPError: 114 logging.info(f"For {self.repo_name}, using default Glean dependencies") 115 return self.default_dependencies 116 117 dependency_library_names = list(dependencies.keys()) 118 119 repos = GleanPing._get_json(GleanPing.repos_url) 120 repos_by_dependency_name = {} 121 for repo in repos: 122 for library_name in repo.get("library_names", []): 123 repos_by_dependency_name[library_name] = repo["name"] 124 125 dependencies = [] 126 for name in dependency_library_names: 127 if name in repos_by_dependency_name: 128 dependencies.append(repos_by_dependency_name[name]) 129 130 if len(dependencies) == 0: 131 logging.info(f"For {self.repo_name}, using default Glean dependencies") 132 return self.default_dependencies 133 134 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 135 return dependencies 136 137 @staticmethod 138 def remove_pings_from_metric( 139 metric: Dict[str, Any], blocked_pings: List[str] 140 ) -> Dict[str, Any]: 141 """Remove the given pings from the metric's `send_in_pings` history. 142 143 Only removes if the given metric has been removed from the source since a fixed date 144 (2025-01-01). This allows metrics to be added back to the schema. 145 """ 146 if ( 147 metric["in-source"] 148 or len(blocked_pings) == 0 149 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 150 >= datetime(year=2025, month=1, day=1) 151 ): 152 return metric 153 154 for history_entry in metric["history"]: 155 history_entry["send_in_pings"] = [ 156 p for p in history_entry["send_in_pings"] if p not in blocked_pings 157 ] 158 159 return metric 160 161 def get_probes(self) -> List[GleanProbe]: 162 data = self._get_json(self.probes_url) 163 164 # blocklist needs to be applied here instead of generate_schema because it needs to be 165 # dependency-aware; metrics can move between app and library and still be in the schema 166 # turn blocklist into metric_name -> ping_types map 167 blocklist = defaultdict(list) 168 for ping_type, metric_names in self.metric_blocklist.get( 169 self.get_app_name(), {} 170 ).items(): 171 for metric_name in metric_names: 172 blocklist[metric_name].append(ping_type) 173 174 probes = [ 175 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 176 for name, defn in data.items() 177 ] 178 179 for dependency in self.get_dependencies(): 180 dependency_probes = self._get_json( 181 self.probes_url_template.format(dependency) 182 ) 183 184 dependency_blocklist = defaultdict(list) 185 for ping_type, metric_names in self.metric_blocklist.get( 186 dependency, {} 187 ).items(): 188 for metric_name in metric_names: 189 dependency_blocklist[metric_name].append(ping_type) 190 191 probes += [ 192 ( 193 name, 194 self.remove_pings_from_metric( 195 defn, dependency_blocklist.get(name, []) 196 ), 197 ) 198 for name, defn in dependency_probes.items() 199 ] 200 201 # A metric can be moved between an app and its dependencies or between dependencies while 202 # probe scraper keeps the history in each location, so both definitions are returned 203 # Merge the history per probe to take the latest definition while still being able to 204 # find metric type changes below 205 # Metrics are not merged if they are not sent in the same pings as they are disjoint 206 207 # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler) 208 # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure" 209 # normalize to "media_audio_init_failure". The transpiler picks one of the colliding 210 # descriptions non-deterministically. 211 def _normalize_name(name): 212 return name.replace(".", "_").replace("-", "_") 213 214 def _pings_in_history(defn): 215 return { 216 p 217 for h in defn[GleanProbe.history_key] 218 for p in h.get("send_in_pings", ["metrics"]) 219 } 220 221 def _latest_history_date(defn): 222 return max( 223 datetime.fromisoformat(h["dates"]["last"]) 224 for h in defn[GleanProbe.history_key] 225 ) 226 227 def _dedupe_sort_key(defn): 228 """Prefer the most recent definition, breaking ties by choosing the in-source metric.""" 229 return ( 230 _latest_history_date(defn), 231 defn.get(GleanProbe.in_source_key, False), 232 defn["name"], 233 ) 234 235 # Group probes that share a normalized name and whose pings intersect to combine 236 # moved metrics and metrics that only differ by "." vs "_" 237 grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list) 238 for name, defn in probes: 239 defn_pings = _pings_in_history(defn) 240 existing_groups = grouped_by_name[_normalize_name(name)] 241 matches = [ 242 group 243 for group in existing_groups 244 if any( 245 _pings_in_history(other_defn) & defn_pings for other_defn in group 246 ) 247 ] 248 if not matches: 249 existing_groups.append([defn]) 250 else: 251 merged_group = [defn] 252 for g in matches: 253 merged_group.extend(g) 254 existing_groups.remove(g) 255 existing_groups.append(merged_group) 256 257 # Take latest definition per group 258 deduped_probes: List[Any] = [] 259 for groups in grouped_by_name.values(): 260 for group in groups: 261 latest_defn = max(group, key=_dedupe_sort_key) 262 if len(group) > 1: 263 latest_defn = latest_defn.copy() 264 latest_defn[GleanProbe.history_key] = sorted( 265 (h for d in group for h in d[GleanProbe.history_key]), 266 key=lambda h: datetime.fromisoformat(h["dates"]["first"]), 267 ) 268 deduped_probes.append((latest_defn["name"], latest_defn)) 269 probes = deduped_probes 270 271 pings = self.get_pings() 272 273 processed = [] 274 for _id, defn in probes: 275 probe = GleanProbe(_id, defn, pings=pings) 276 processed.append(probe) 277 278 # Handling probe type changes (Bug 1870317) 279 probe_types = {hist["type"] for hist in defn[probe.history_key]} 280 if len(probe_types) > 1: 281 # The probe type changed at some point in history. 282 # Create schema entry for each type. 283 hist_defn = defn.copy() 284 285 # No new entry needs to be created for the current probe type 286 probe_types.remove(defn["type"]) 287 288 for hist in hist_defn[probe.history_key]: 289 # Create a new entry for a historic type 290 if hist["type"] in probe_types: 291 hist_defn["type"] = hist["type"] 292 probe = GleanProbe(_id, hist_defn, pings=pings) 293 processed.append(probe) 294 295 # Keep track of the types entries were already created for 296 probe_types.remove(hist["type"]) 297 298 return processed 299 300 def _get_ping_data(self) -> Dict[str, Dict]: 301 url = self.ping_url_template.format(self.repo_name) 302 ping_data = GleanPing._get_json(url) 303 for dependency in self.get_dependencies(): 304 dependency_pings = self._get_json(self.ping_url_template.format(dependency)) 305 ping_data.update(dependency_pings) 306 return ping_data 307 308 def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]: 309 url = self.ping_url_template.format(self.repo_name) 310 ping_data = GleanPing._get_json(url) 311 return ping_data 312 313 def _get_dependency_pings(self, dependency): 314 return self._get_json(self.ping_url_template.format(dependency)) 315 316 def get_pings(self) -> Set[str]: 317 return self._get_ping_data().keys() 318 319 @staticmethod 320 def apply_default_metadata(ping_metadata, default_metadata): 321 """apply_default_metadata recurses down into dicts nested 322 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 323 ``ping_metadata``. 324 :param ping_metadata: dict onto which the merge is executed 325 :param default_metadata: dct merged into ping_metadata 326 :return: None 327 """ 328 for k, v in default_metadata.items(): 329 if ( 330 k in ping_metadata 331 and isinstance(ping_metadata[k], dict) 332 and isinstance(default_metadata[k], dict) 333 ): 334 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 335 else: 336 ping_metadata[k] = default_metadata[k] 337 338 def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]: 339 # Get the ping data with the pipeline metadata 340 ping_data = self._get_ping_data_without_dependencies() 341 342 # The ping endpoint for the dependency pings does not include any repo defined 343 # moz_pipeline_metadata_defaults so they need to be applied here. 344 345 # 1. Get repo and pipeline default metadata. 346 repos = self.get_repos() 347 current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {}) 348 default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {}) 349 350 # 2. Apply the default metadata to each dependency defined ping. 351 352 # Apply app-level metadata to pings defined in dependencies 353 app_metadata = current_repo.get("moz_pipeline_metadata", {}) 354 355 for dependency in self.get_dependencies(): 356 dependency_pings = self._get_dependency_pings(dependency) 357 for dependency_ping in dependency_pings.values(): 358 # Although it is counter intuitive to apply the default metadata on top of the 359 # existing dependency ping metadata it does set the repo specific value for 360 # bq_dataset_family instead of using the dependency id for the bq_dataset_family 361 # value. 362 GleanPing.apply_default_metadata( 363 dependency_ping.get("moz_pipeline_metadata"), 364 copy.deepcopy(default_metadata), 365 ) 366 # app-level ping properties take priority over the app defaults 367 metadata_override = app_metadata.get(dependency_ping["name"]) 368 if metadata_override is not None: 369 GleanPing.apply_default_metadata( 370 dependency_ping.get("moz_pipeline_metadata"), metadata_override 371 ) 372 ping_data.update(dependency_pings) 373 374 return ping_data 375 376 @staticmethod 377 def reorder_metadata(metadata): 378 desired_order_list = [ 379 "bq_dataset_family", 380 "bq_table", 381 "bq_metadata_format", 382 "include_info_sections", 383 "submission_timestamp_granularity", 384 "expiration_policy", 385 "override_attributes", 386 "jwe_mappings", 387 ] 388 reordered_metadata = { 389 k: metadata[k] for k in desired_order_list if k in metadata 390 } 391 392 # re-order jwe-mappings 393 desired_order_list = ["source_field_path", "decrypted_field_path"] 394 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 395 if jwe_mapping_metadata: 396 reordered_jwe_mapping_metadata = [] 397 for mapping in jwe_mapping_metadata: 398 reordered_jwe_mapping_metadata.append( 399 {k: mapping[k] for k in desired_order_list if k in mapping} 400 ) 401 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 402 403 # future proofing, in case there are other fields added at the ping top level 404 # add them to the end. 405 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 406 reordered_metadata = {**reordered_metadata, **leftovers} 407 return reordered_metadata 408 409 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 410 pings = self._get_ping_data_and_dependencies_with_default_metadata() 411 for ping_name, ping_data in pings.items(): 412 metadata = ping_data.get("moz_pipeline_metadata") 413 if not metadata: 414 continue 415 metadata["include_info_sections"] = self._is_field_included( 416 ping_data, "include_info_sections", consider_all_history=False 417 ) 418 metadata["include_client_id"] = self._is_field_included( 419 ping_data, "include_client_id" 420 ) 421 422 # While technically unnecessary, the dictionary elements are re-ordered to match the 423 # currently deployed order and used to verify no difference in output. 424 pings[ping_name] = GleanPing.reorder_metadata(metadata) 425 return pings 426 427 def get_ping_descriptions(self) -> Dict[str, str]: 428 return { 429 k: v["history"][-1]["description"] for k, v in self._get_ping_data().items() 430 } 431 432 @staticmethod 433 def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool: 434 """Return false if the field exists and is false. 435 436 If `consider_all_history` is False, then only check the latest value in the ping history. 437 438 Otherwise, if the field is not found or true in one or more history entries, 439 true is returned. 440 """ 441 442 # Default to true if not specified. 443 if "history" not in ping_data or len(ping_data["history"]) == 0: 444 return True 445 446 # Check if at some point in the past the field has already been deployed. 447 # And if the caller of this method wants to consider this history of the field. 448 # Keep them in the schema, even if the field has changed as 449 # removing fields is currently not supported. 450 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105 451 # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10 452 ping_history: list 453 if consider_all_history: 454 ping_history = ping_data["history"] 455 else: 456 ping_history = [ping_data["history"][-1]] 457 for history in ping_history: 458 if field_name not in history or history[field_name]: 459 return True 460 461 # The ping was created with include_info_sections = False. The fields can be excluded. 462 return False 463 464 def set_schema_url(self, metadata): 465 """ 466 Switch between the glean-min and glean schemas if the ping does not require 467 info sections as specified in the parsed ping info in probe scraper. 468 """ 469 if not metadata["include_info_sections"]: 470 self.schema_url = SCHEMA_URL_TEMPLATE.format( 471 branch=self.branch_name 472 ) + SCHEMA_VERSION_TEMPLATE.format( 473 schema_type="glean-min", version=self.version 474 ) 475 else: 476 self.schema_url = SCHEMA_URL_TEMPLATE.format( 477 branch=self.branch_name 478 ) + SCHEMA_VERSION_TEMPLATE.format( 479 schema_type="glean", version=self.version 480 ) 481 482 def generate_schema( 483 self, 484 config, 485 generic_schema=False, 486 blocked_distribution_pings=("events", "baseline"), 487 ) -> Dict[str, Schema]: 488 pings = self.get_pings_and_pipeline_metadata() 489 schemas = {} 490 491 for ping, pipeline_meta in pings.items(): 492 matchers = { 493 loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items() 494 } 495 496 # Four newly introduced metric types were incorrectly deployed 497 # as repeated key/value structs in all Glean ping tables existing prior 498 # to November 2021. We maintain the incorrect fields for existing tables 499 # by disabling the associated matchers. 500 # Note that each of these types now has a "2" matcher ("text2", "url2", etc.) 501 # defined that will allow metrics of these types to be injected into proper 502 # structs. The gcp-ingestion repository includes logic to rewrite these 503 # metrics under the "2" names. 504 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 505 bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta) 506 if bq_identifier in self.bug_1737656_affected_tables: 507 matchers = { 508 loc: m 509 for loc, m in matchers.items() 510 if not m.matcher.get("bug_1737656_affected") 511 } 512 513 for matcher in matchers.values(): 514 matcher.matcher["send_in_pings"]["contains"] = ping 515 516 # temporarily block distributions from being added to events and baseline pings 517 # https://mozilla-hub.atlassian.net/browse/DENG-10606 518 if ( 519 blocked_distribution_pings 520 and ping in blocked_distribution_pings 521 and matcher.type.endswith("_distribution") 522 ): 523 matcher.matcher["send_in_pings"]["not_contains"] = ping 524 525 new_config = Config(ping, matchers=matchers) 526 527 defaults = {"mozPipelineMetadata": pipeline_meta} 528 529 # Adjust the schema path if the ping does not require info sections 530 self.set_schema_url(pipeline_meta) 531 if generic_schema: # Use the generic glean ping schema 532 schema = self.get_schema(generic_schema=True) 533 schema.schema.update(defaults) 534 schemas[new_config.name] = schema 535 else: 536 generated = super().generate_schema(new_config) 537 for schema in generated.values(): 538 # We want to override each individual key with assembled defaults, 539 # but keep values _inside_ them if they have been set in the schemas. 540 for key, value in defaults.items(): 541 if key not in schema.schema: 542 schema.schema[key] = {} 543 schema.schema[key].update(value) 544 schemas.update(generated) 545 546 return schemas 547 548 @staticmethod 549 def get_repos(): 550 """ 551 Retrieve metadata for all non-library Glean repositories 552 """ 553 repos = GleanPing._get_json(GleanPing.repos_url) 554 return [repo for repo in repos if "library_names" not in repo] 555 556 def get_app_name(self) -> str: 557 """Get app name associated with the app id. 558 559 e.g. org-mozilla-firefox -> fenix 560 """ 561 apps = GleanPing._get_json(GleanPing.app_listings_url) 562 # app id in app-listings has "." instead of "-" so using document_namespace 563 app_name = [ 564 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 565 ] 566 return app_name[0] if len(app_name) > 0 else self.app_id 567 568 @staticmethod 569 def get_metric_blocklist(): 570 with open(METRIC_BLOCKLIST, "r") as f: 571 return yaml.safe_load(f)
43class GleanPing(GenericPing): 44 probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics" 45 ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings" 46 repos_url = GenericPing.probe_info_base_url + "/glean/repositories" 47 dependencies_url_template = ( 48 GenericPing.probe_info_base_url + "/glean/{}/dependencies" 49 ) 50 app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings" 51 52 default_dependencies = ["glean-core"] 53 54 with open(BUG_1737656_TXT, "r") as f: 55 bug_1737656_affected_tables = [ 56 line.strip() for line in f.readlines() if line.strip() 57 ] 58 59 def __init__( 60 self, repo, version=1, use_metrics_blocklist=False, **kwargs 61 ): # TODO: Make env-url optional 62 self.repo = repo 63 self.repo_name = repo["name"] 64 self.app_id = repo["app_id"] 65 self.version = version 66 67 if use_metrics_blocklist: 68 self.metric_blocklist = self.get_metric_blocklist() 69 else: 70 self.metric_blocklist = {} 71 72 super().__init__( 73 DEFAULT_SCHEMA_URL, 74 DEFAULT_SCHEMA_URL, 75 self.probes_url_template.format(self.repo_name), 76 **kwargs, 77 ) 78 79 def get_schema(self, generic_schema=False) -> Schema: 80 """ 81 Fetch schema via URL. 82 83 Unless *generic_schema* is set to true, this function makes some modifications 84 to allow some workarounds for proper injection of metrics. 85 """ 86 schema = super().get_schema() 87 if generic_schema: 88 return schema 89 90 # We need to inject placeholders for the url2, text2, etc. types as part 91 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 92 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 93 metric1 = schema.get( 94 ("properties", "metrics", "properties", metric_name) 95 ).copy() 96 metric1 = schema.set_schema_elem( 97 ("properties", "metrics", "properties", metric_name + "2"), 98 metric1, 99 ) 100 101 return schema 102 103 @cache 104 def get_dependencies(self): 105 # Get all of the library dependencies for the application that 106 # are also known about in the repositories file. 107 108 # The dependencies are specified using library names, but we need to 109 # map those back to the name of the repository in the repository file. 110 try: 111 dependencies = self._get_json( 112 self.dependencies_url_template.format(self.repo_name) 113 ) 114 except HTTPError: 115 logging.info(f"For {self.repo_name}, using default Glean dependencies") 116 return self.default_dependencies 117 118 dependency_library_names = list(dependencies.keys()) 119 120 repos = GleanPing._get_json(GleanPing.repos_url) 121 repos_by_dependency_name = {} 122 for repo in repos: 123 for library_name in repo.get("library_names", []): 124 repos_by_dependency_name[library_name] = repo["name"] 125 126 dependencies = [] 127 for name in dependency_library_names: 128 if name in repos_by_dependency_name: 129 dependencies.append(repos_by_dependency_name[name]) 130 131 if len(dependencies) == 0: 132 logging.info(f"For {self.repo_name}, using default Glean dependencies") 133 return self.default_dependencies 134 135 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 136 return dependencies 137 138 @staticmethod 139 def remove_pings_from_metric( 140 metric: Dict[str, Any], blocked_pings: List[str] 141 ) -> Dict[str, Any]: 142 """Remove the given pings from the metric's `send_in_pings` history. 143 144 Only removes if the given metric has been removed from the source since a fixed date 145 (2025-01-01). This allows metrics to be added back to the schema. 146 """ 147 if ( 148 metric["in-source"] 149 or len(blocked_pings) == 0 150 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 151 >= datetime(year=2025, month=1, day=1) 152 ): 153 return metric 154 155 for history_entry in metric["history"]: 156 history_entry["send_in_pings"] = [ 157 p for p in history_entry["send_in_pings"] if p not in blocked_pings 158 ] 159 160 return metric 161 162 def get_probes(self) -> List[GleanProbe]: 163 data = self._get_json(self.probes_url) 164 165 # blocklist needs to be applied here instead of generate_schema because it needs to be 166 # dependency-aware; metrics can move between app and library and still be in the schema 167 # turn blocklist into metric_name -> ping_types map 168 blocklist = defaultdict(list) 169 for ping_type, metric_names in self.metric_blocklist.get( 170 self.get_app_name(), {} 171 ).items(): 172 for metric_name in metric_names: 173 blocklist[metric_name].append(ping_type) 174 175 probes = [ 176 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 177 for name, defn in data.items() 178 ] 179 180 for dependency in self.get_dependencies(): 181 dependency_probes = self._get_json( 182 self.probes_url_template.format(dependency) 183 ) 184 185 dependency_blocklist = defaultdict(list) 186 for ping_type, metric_names in self.metric_blocklist.get( 187 dependency, {} 188 ).items(): 189 for metric_name in metric_names: 190 dependency_blocklist[metric_name].append(ping_type) 191 192 probes += [ 193 ( 194 name, 195 self.remove_pings_from_metric( 196 defn, dependency_blocklist.get(name, []) 197 ), 198 ) 199 for name, defn in dependency_probes.items() 200 ] 201 202 # A metric can be moved between an app and its dependencies or between dependencies while 203 # probe scraper keeps the history in each location, so both definitions are returned 204 # Merge the history per probe to take the latest definition while still being able to 205 # find metric type changes below 206 # Metrics are not merged if they are not sent in the same pings as they are disjoint 207 208 # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler) 209 # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure" 210 # normalize to "media_audio_init_failure". The transpiler picks one of the colliding 211 # descriptions non-deterministically. 212 def _normalize_name(name): 213 return name.replace(".", "_").replace("-", "_") 214 215 def _pings_in_history(defn): 216 return { 217 p 218 for h in defn[GleanProbe.history_key] 219 for p in h.get("send_in_pings", ["metrics"]) 220 } 221 222 def _latest_history_date(defn): 223 return max( 224 datetime.fromisoformat(h["dates"]["last"]) 225 for h in defn[GleanProbe.history_key] 226 ) 227 228 def _dedupe_sort_key(defn): 229 """Prefer the most recent definition, breaking ties by choosing the in-source metric.""" 230 return ( 231 _latest_history_date(defn), 232 defn.get(GleanProbe.in_source_key, False), 233 defn["name"], 234 ) 235 236 # Group probes that share a normalized name and whose pings intersect to combine 237 # moved metrics and metrics that only differ by "." vs "_" 238 grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list) 239 for name, defn in probes: 240 defn_pings = _pings_in_history(defn) 241 existing_groups = grouped_by_name[_normalize_name(name)] 242 matches = [ 243 group 244 for group in existing_groups 245 if any( 246 _pings_in_history(other_defn) & defn_pings for other_defn in group 247 ) 248 ] 249 if not matches: 250 existing_groups.append([defn]) 251 else: 252 merged_group = [defn] 253 for g in matches: 254 merged_group.extend(g) 255 existing_groups.remove(g) 256 existing_groups.append(merged_group) 257 258 # Take latest definition per group 259 deduped_probes: List[Any] = [] 260 for groups in grouped_by_name.values(): 261 for group in groups: 262 latest_defn = max(group, key=_dedupe_sort_key) 263 if len(group) > 1: 264 latest_defn = latest_defn.copy() 265 latest_defn[GleanProbe.history_key] = sorted( 266 (h for d in group for h in d[GleanProbe.history_key]), 267 key=lambda h: datetime.fromisoformat(h["dates"]["first"]), 268 ) 269 deduped_probes.append((latest_defn["name"], latest_defn)) 270 probes = deduped_probes 271 272 pings = self.get_pings() 273 274 processed = [] 275 for _id, defn in probes: 276 probe = GleanProbe(_id, defn, pings=pings) 277 processed.append(probe) 278 279 # Handling probe type changes (Bug 1870317) 280 probe_types = {hist["type"] for hist in defn[probe.history_key]} 281 if len(probe_types) > 1: 282 # The probe type changed at some point in history. 283 # Create schema entry for each type. 284 hist_defn = defn.copy() 285 286 # No new entry needs to be created for the current probe type 287 probe_types.remove(defn["type"]) 288 289 for hist in hist_defn[probe.history_key]: 290 # Create a new entry for a historic type 291 if hist["type"] in probe_types: 292 hist_defn["type"] = hist["type"] 293 probe = GleanProbe(_id, hist_defn, pings=pings) 294 processed.append(probe) 295 296 # Keep track of the types entries were already created for 297 probe_types.remove(hist["type"]) 298 299 return processed 300 301 def _get_ping_data(self) -> Dict[str, Dict]: 302 url = self.ping_url_template.format(self.repo_name) 303 ping_data = GleanPing._get_json(url) 304 for dependency in self.get_dependencies(): 305 dependency_pings = self._get_json(self.ping_url_template.format(dependency)) 306 ping_data.update(dependency_pings) 307 return ping_data 308 309 def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]: 310 url = self.ping_url_template.format(self.repo_name) 311 ping_data = GleanPing._get_json(url) 312 return ping_data 313 314 def _get_dependency_pings(self, dependency): 315 return self._get_json(self.ping_url_template.format(dependency)) 316 317 def get_pings(self) -> Set[str]: 318 return self._get_ping_data().keys() 319 320 @staticmethod 321 def apply_default_metadata(ping_metadata, default_metadata): 322 """apply_default_metadata recurses down into dicts nested 323 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 324 ``ping_metadata``. 325 :param ping_metadata: dict onto which the merge is executed 326 :param default_metadata: dct merged into ping_metadata 327 :return: None 328 """ 329 for k, v in default_metadata.items(): 330 if ( 331 k in ping_metadata 332 and isinstance(ping_metadata[k], dict) 333 and isinstance(default_metadata[k], dict) 334 ): 335 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 336 else: 337 ping_metadata[k] = default_metadata[k] 338 339 def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]: 340 # Get the ping data with the pipeline metadata 341 ping_data = self._get_ping_data_without_dependencies() 342 343 # The ping endpoint for the dependency pings does not include any repo defined 344 # moz_pipeline_metadata_defaults so they need to be applied here. 345 346 # 1. Get repo and pipeline default metadata. 347 repos = self.get_repos() 348 current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {}) 349 default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {}) 350 351 # 2. Apply the default metadata to each dependency defined ping. 352 353 # Apply app-level metadata to pings defined in dependencies 354 app_metadata = current_repo.get("moz_pipeline_metadata", {}) 355 356 for dependency in self.get_dependencies(): 357 dependency_pings = self._get_dependency_pings(dependency) 358 for dependency_ping in dependency_pings.values(): 359 # Although it is counter intuitive to apply the default metadata on top of the 360 # existing dependency ping metadata it does set the repo specific value for 361 # bq_dataset_family instead of using the dependency id for the bq_dataset_family 362 # value. 363 GleanPing.apply_default_metadata( 364 dependency_ping.get("moz_pipeline_metadata"), 365 copy.deepcopy(default_metadata), 366 ) 367 # app-level ping properties take priority over the app defaults 368 metadata_override = app_metadata.get(dependency_ping["name"]) 369 if metadata_override is not None: 370 GleanPing.apply_default_metadata( 371 dependency_ping.get("moz_pipeline_metadata"), metadata_override 372 ) 373 ping_data.update(dependency_pings) 374 375 return ping_data 376 377 @staticmethod 378 def reorder_metadata(metadata): 379 desired_order_list = [ 380 "bq_dataset_family", 381 "bq_table", 382 "bq_metadata_format", 383 "include_info_sections", 384 "submission_timestamp_granularity", 385 "expiration_policy", 386 "override_attributes", 387 "jwe_mappings", 388 ] 389 reordered_metadata = { 390 k: metadata[k] for k in desired_order_list if k in metadata 391 } 392 393 # re-order jwe-mappings 394 desired_order_list = ["source_field_path", "decrypted_field_path"] 395 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 396 if jwe_mapping_metadata: 397 reordered_jwe_mapping_metadata = [] 398 for mapping in jwe_mapping_metadata: 399 reordered_jwe_mapping_metadata.append( 400 {k: mapping[k] for k in desired_order_list if k in mapping} 401 ) 402 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 403 404 # future proofing, in case there are other fields added at the ping top level 405 # add them to the end. 406 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 407 reordered_metadata = {**reordered_metadata, **leftovers} 408 return reordered_metadata 409 410 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 411 pings = self._get_ping_data_and_dependencies_with_default_metadata() 412 for ping_name, ping_data in pings.items(): 413 metadata = ping_data.get("moz_pipeline_metadata") 414 if not metadata: 415 continue 416 metadata["include_info_sections"] = self._is_field_included( 417 ping_data, "include_info_sections", consider_all_history=False 418 ) 419 metadata["include_client_id"] = self._is_field_included( 420 ping_data, "include_client_id" 421 ) 422 423 # While technically unnecessary, the dictionary elements are re-ordered to match the 424 # currently deployed order and used to verify no difference in output. 425 pings[ping_name] = GleanPing.reorder_metadata(metadata) 426 return pings 427 428 def get_ping_descriptions(self) -> Dict[str, str]: 429 return { 430 k: v["history"][-1]["description"] for k, v in self._get_ping_data().items() 431 } 432 433 @staticmethod 434 def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool: 435 """Return false if the field exists and is false. 436 437 If `consider_all_history` is False, then only check the latest value in the ping history. 438 439 Otherwise, if the field is not found or true in one or more history entries, 440 true is returned. 441 """ 442 443 # Default to true if not specified. 444 if "history" not in ping_data or len(ping_data["history"]) == 0: 445 return True 446 447 # Check if at some point in the past the field has already been deployed. 448 # And if the caller of this method wants to consider this history of the field. 449 # Keep them in the schema, even if the field has changed as 450 # removing fields is currently not supported. 451 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105 452 # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10 453 ping_history: list 454 if consider_all_history: 455 ping_history = ping_data["history"] 456 else: 457 ping_history = [ping_data["history"][-1]] 458 for history in ping_history: 459 if field_name not in history or history[field_name]: 460 return True 461 462 # The ping was created with include_info_sections = False. The fields can be excluded. 463 return False 464 465 def set_schema_url(self, metadata): 466 """ 467 Switch between the glean-min and glean schemas if the ping does not require 468 info sections as specified in the parsed ping info in probe scraper. 469 """ 470 if not metadata["include_info_sections"]: 471 self.schema_url = SCHEMA_URL_TEMPLATE.format( 472 branch=self.branch_name 473 ) + SCHEMA_VERSION_TEMPLATE.format( 474 schema_type="glean-min", version=self.version 475 ) 476 else: 477 self.schema_url = SCHEMA_URL_TEMPLATE.format( 478 branch=self.branch_name 479 ) + SCHEMA_VERSION_TEMPLATE.format( 480 schema_type="glean", version=self.version 481 ) 482 483 def generate_schema( 484 self, 485 config, 486 generic_schema=False, 487 blocked_distribution_pings=("events", "baseline"), 488 ) -> Dict[str, Schema]: 489 pings = self.get_pings_and_pipeline_metadata() 490 schemas = {} 491 492 for ping, pipeline_meta in pings.items(): 493 matchers = { 494 loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items() 495 } 496 497 # Four newly introduced metric types were incorrectly deployed 498 # as repeated key/value structs in all Glean ping tables existing prior 499 # to November 2021. We maintain the incorrect fields for existing tables 500 # by disabling the associated matchers. 501 # Note that each of these types now has a "2" matcher ("text2", "url2", etc.) 502 # defined that will allow metrics of these types to be injected into proper 503 # structs. The gcp-ingestion repository includes logic to rewrite these 504 # metrics under the "2" names. 505 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 506 bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta) 507 if bq_identifier in self.bug_1737656_affected_tables: 508 matchers = { 509 loc: m 510 for loc, m in matchers.items() 511 if not m.matcher.get("bug_1737656_affected") 512 } 513 514 for matcher in matchers.values(): 515 matcher.matcher["send_in_pings"]["contains"] = ping 516 517 # temporarily block distributions from being added to events and baseline pings 518 # https://mozilla-hub.atlassian.net/browse/DENG-10606 519 if ( 520 blocked_distribution_pings 521 and ping in blocked_distribution_pings 522 and matcher.type.endswith("_distribution") 523 ): 524 matcher.matcher["send_in_pings"]["not_contains"] = ping 525 526 new_config = Config(ping, matchers=matchers) 527 528 defaults = {"mozPipelineMetadata": pipeline_meta} 529 530 # Adjust the schema path if the ping does not require info sections 531 self.set_schema_url(pipeline_meta) 532 if generic_schema: # Use the generic glean ping schema 533 schema = self.get_schema(generic_schema=True) 534 schema.schema.update(defaults) 535 schemas[new_config.name] = schema 536 else: 537 generated = super().generate_schema(new_config) 538 for schema in generated.values(): 539 # We want to override each individual key with assembled defaults, 540 # but keep values _inside_ them if they have been set in the schemas. 541 for key, value in defaults.items(): 542 if key not in schema.schema: 543 schema.schema[key] = {} 544 schema.schema[key].update(value) 545 schemas.update(generated) 546 547 return schemas 548 549 @staticmethod 550 def get_repos(): 551 """ 552 Retrieve metadata for all non-library Glean repositories 553 """ 554 repos = GleanPing._get_json(GleanPing.repos_url) 555 return [repo for repo in repos if "library_names" not in repo] 556 557 def get_app_name(self) -> str: 558 """Get app name associated with the app id. 559 560 e.g. org-mozilla-firefox -> fenix 561 """ 562 apps = GleanPing._get_json(GleanPing.app_listings_url) 563 # app id in app-listings has "." instead of "-" so using document_namespace 564 app_name = [ 565 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 566 ] 567 return app_name[0] if len(app_name) > 0 else self.app_id 568 569 @staticmethod 570 def get_metric_blocklist(): 571 with open(METRIC_BLOCKLIST, "r") as f: 572 return yaml.safe_load(f)
59 def __init__( 60 self, repo, version=1, use_metrics_blocklist=False, **kwargs 61 ): # TODO: Make env-url optional 62 self.repo = repo 63 self.repo_name = repo["name"] 64 self.app_id = repo["app_id"] 65 self.version = version 66 67 if use_metrics_blocklist: 68 self.metric_blocklist = self.get_metric_blocklist() 69 else: 70 self.metric_blocklist = {} 71 72 super().__init__( 73 DEFAULT_SCHEMA_URL, 74 DEFAULT_SCHEMA_URL, 75 self.probes_url_template.format(self.repo_name), 76 **kwargs, 77 )
79 def get_schema(self, generic_schema=False) -> Schema: 80 """ 81 Fetch schema via URL. 82 83 Unless *generic_schema* is set to true, this function makes some modifications 84 to allow some workarounds for proper injection of metrics. 85 """ 86 schema = super().get_schema() 87 if generic_schema: 88 return schema 89 90 # We need to inject placeholders for the url2, text2, etc. types as part 91 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 92 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 93 metric1 = schema.get( 94 ("properties", "metrics", "properties", metric_name) 95 ).copy() 96 metric1 = schema.set_schema_elem( 97 ("properties", "metrics", "properties", metric_name + "2"), 98 metric1, 99 ) 100 101 return schema
Fetch schema via URL.
Unless generic_schema is set to true, this function makes some modifications to allow some workarounds for proper injection of metrics.
103 @cache 104 def get_dependencies(self): 105 # Get all of the library dependencies for the application that 106 # are also known about in the repositories file. 107 108 # The dependencies are specified using library names, but we need to 109 # map those back to the name of the repository in the repository file. 110 try: 111 dependencies = self._get_json( 112 self.dependencies_url_template.format(self.repo_name) 113 ) 114 except HTTPError: 115 logging.info(f"For {self.repo_name}, using default Glean dependencies") 116 return self.default_dependencies 117 118 dependency_library_names = list(dependencies.keys()) 119 120 repos = GleanPing._get_json(GleanPing.repos_url) 121 repos_by_dependency_name = {} 122 for repo in repos: 123 for library_name in repo.get("library_names", []): 124 repos_by_dependency_name[library_name] = repo["name"] 125 126 dependencies = [] 127 for name in dependency_library_names: 128 if name in repos_by_dependency_name: 129 dependencies.append(repos_by_dependency_name[name]) 130 131 if len(dependencies) == 0: 132 logging.info(f"For {self.repo_name}, using default Glean dependencies") 133 return self.default_dependencies 134 135 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 136 return dependencies
138 @staticmethod 139 def remove_pings_from_metric( 140 metric: Dict[str, Any], blocked_pings: List[str] 141 ) -> Dict[str, Any]: 142 """Remove the given pings from the metric's `send_in_pings` history. 143 144 Only removes if the given metric has been removed from the source since a fixed date 145 (2025-01-01). This allows metrics to be added back to the schema. 146 """ 147 if ( 148 metric["in-source"] 149 or len(blocked_pings) == 0 150 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 151 >= datetime(year=2025, month=1, day=1) 152 ): 153 return metric 154 155 for history_entry in metric["history"]: 156 history_entry["send_in_pings"] = [ 157 p for p in history_entry["send_in_pings"] if p not in blocked_pings 158 ] 159 160 return metric
Remove the given pings from the metric's send_in_pings history.
Only removes if the given metric has been removed from the source since a fixed date (2025-01-01). This allows metrics to be added back to the schema.
162 def get_probes(self) -> List[GleanProbe]: 163 data = self._get_json(self.probes_url) 164 165 # blocklist needs to be applied here instead of generate_schema because it needs to be 166 # dependency-aware; metrics can move between app and library and still be in the schema 167 # turn blocklist into metric_name -> ping_types map 168 blocklist = defaultdict(list) 169 for ping_type, metric_names in self.metric_blocklist.get( 170 self.get_app_name(), {} 171 ).items(): 172 for metric_name in metric_names: 173 blocklist[metric_name].append(ping_type) 174 175 probes = [ 176 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 177 for name, defn in data.items() 178 ] 179 180 for dependency in self.get_dependencies(): 181 dependency_probes = self._get_json( 182 self.probes_url_template.format(dependency) 183 ) 184 185 dependency_blocklist = defaultdict(list) 186 for ping_type, metric_names in self.metric_blocklist.get( 187 dependency, {} 188 ).items(): 189 for metric_name in metric_names: 190 dependency_blocklist[metric_name].append(ping_type) 191 192 probes += [ 193 ( 194 name, 195 self.remove_pings_from_metric( 196 defn, dependency_blocklist.get(name, []) 197 ), 198 ) 199 for name, defn in dependency_probes.items() 200 ] 201 202 # A metric can be moved between an app and its dependencies or between dependencies while 203 # probe scraper keeps the history in each location, so both definitions are returned 204 # Merge the history per probe to take the latest definition while still being able to 205 # find metric type changes below 206 # Metrics are not merged if they are not sent in the same pings as they are disjoint 207 208 # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler) 209 # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure" 210 # normalize to "media_audio_init_failure". The transpiler picks one of the colliding 211 # descriptions non-deterministically. 212 def _normalize_name(name): 213 return name.replace(".", "_").replace("-", "_") 214 215 def _pings_in_history(defn): 216 return { 217 p 218 for h in defn[GleanProbe.history_key] 219 for p in h.get("send_in_pings", ["metrics"]) 220 } 221 222 def _latest_history_date(defn): 223 return max( 224 datetime.fromisoformat(h["dates"]["last"]) 225 for h in defn[GleanProbe.history_key] 226 ) 227 228 def _dedupe_sort_key(defn): 229 """Prefer the most recent definition, breaking ties by choosing the in-source metric.""" 230 return ( 231 _latest_history_date(defn), 232 defn.get(GleanProbe.in_source_key, False), 233 defn["name"], 234 ) 235 236 # Group probes that share a normalized name and whose pings intersect to combine 237 # moved metrics and metrics that only differ by "." vs "_" 238 grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list) 239 for name, defn in probes: 240 defn_pings = _pings_in_history(defn) 241 existing_groups = grouped_by_name[_normalize_name(name)] 242 matches = [ 243 group 244 for group in existing_groups 245 if any( 246 _pings_in_history(other_defn) & defn_pings for other_defn in group 247 ) 248 ] 249 if not matches: 250 existing_groups.append([defn]) 251 else: 252 merged_group = [defn] 253 for g in matches: 254 merged_group.extend(g) 255 existing_groups.remove(g) 256 existing_groups.append(merged_group) 257 258 # Take latest definition per group 259 deduped_probes: List[Any] = [] 260 for groups in grouped_by_name.values(): 261 for group in groups: 262 latest_defn = max(group, key=_dedupe_sort_key) 263 if len(group) > 1: 264 latest_defn = latest_defn.copy() 265 latest_defn[GleanProbe.history_key] = sorted( 266 (h for d in group for h in d[GleanProbe.history_key]), 267 key=lambda h: datetime.fromisoformat(h["dates"]["first"]), 268 ) 269 deduped_probes.append((latest_defn["name"], latest_defn)) 270 probes = deduped_probes 271 272 pings = self.get_pings() 273 274 processed = [] 275 for _id, defn in probes: 276 probe = GleanProbe(_id, defn, pings=pings) 277 processed.append(probe) 278 279 # Handling probe type changes (Bug 1870317) 280 probe_types = {hist["type"] for hist in defn[probe.history_key]} 281 if len(probe_types) > 1: 282 # The probe type changed at some point in history. 283 # Create schema entry for each type. 284 hist_defn = defn.copy() 285 286 # No new entry needs to be created for the current probe type 287 probe_types.remove(defn["type"]) 288 289 for hist in hist_defn[probe.history_key]: 290 # Create a new entry for a historic type 291 if hist["type"] in probe_types: 292 hist_defn["type"] = hist["type"] 293 probe = GleanProbe(_id, hist_defn, pings=pings) 294 processed.append(probe) 295 296 # Keep track of the types entries were already created for 297 probe_types.remove(hist["type"]) 298 299 return processed
320 @staticmethod 321 def apply_default_metadata(ping_metadata, default_metadata): 322 """apply_default_metadata recurses down into dicts nested 323 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 324 ``ping_metadata``. 325 :param ping_metadata: dict onto which the merge is executed 326 :param default_metadata: dct merged into ping_metadata 327 :return: None 328 """ 329 for k, v in default_metadata.items(): 330 if ( 331 k in ping_metadata 332 and isinstance(ping_metadata[k], dict) 333 and isinstance(default_metadata[k], dict) 334 ): 335 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 336 else: 337 ping_metadata[k] = default_metadata[k]
apply_default_metadata recurses down into dicts nested
to an arbitrary depth, updating keys. The default_metadata is merged into
ping_metadata.
Parameters
- ping_metadata: dict onto which the merge is executed
- default_metadata: dct merged into ping_metadata
Returns
None
377 @staticmethod 378 def reorder_metadata(metadata): 379 desired_order_list = [ 380 "bq_dataset_family", 381 "bq_table", 382 "bq_metadata_format", 383 "include_info_sections", 384 "submission_timestamp_granularity", 385 "expiration_policy", 386 "override_attributes", 387 "jwe_mappings", 388 ] 389 reordered_metadata = { 390 k: metadata[k] for k in desired_order_list if k in metadata 391 } 392 393 # re-order jwe-mappings 394 desired_order_list = ["source_field_path", "decrypted_field_path"] 395 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 396 if jwe_mapping_metadata: 397 reordered_jwe_mapping_metadata = [] 398 for mapping in jwe_mapping_metadata: 399 reordered_jwe_mapping_metadata.append( 400 {k: mapping[k] for k in desired_order_list if k in mapping} 401 ) 402 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 403 404 # future proofing, in case there are other fields added at the ping top level 405 # add them to the end. 406 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 407 reordered_metadata = {**reordered_metadata, **leftovers} 408 return reordered_metadata
410 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 411 pings = self._get_ping_data_and_dependencies_with_default_metadata() 412 for ping_name, ping_data in pings.items(): 413 metadata = ping_data.get("moz_pipeline_metadata") 414 if not metadata: 415 continue 416 metadata["include_info_sections"] = self._is_field_included( 417 ping_data, "include_info_sections", consider_all_history=False 418 ) 419 metadata["include_client_id"] = self._is_field_included( 420 ping_data, "include_client_id" 421 ) 422 423 # While technically unnecessary, the dictionary elements are re-ordered to match the 424 # currently deployed order and used to verify no difference in output. 425 pings[ping_name] = GleanPing.reorder_metadata(metadata) 426 return pings
465 def set_schema_url(self, metadata): 466 """ 467 Switch between the glean-min and glean schemas if the ping does not require 468 info sections as specified in the parsed ping info in probe scraper. 469 """ 470 if not metadata["include_info_sections"]: 471 self.schema_url = SCHEMA_URL_TEMPLATE.format( 472 branch=self.branch_name 473 ) + SCHEMA_VERSION_TEMPLATE.format( 474 schema_type="glean-min", version=self.version 475 ) 476 else: 477 self.schema_url = SCHEMA_URL_TEMPLATE.format( 478 branch=self.branch_name 479 ) + SCHEMA_VERSION_TEMPLATE.format( 480 schema_type="glean", version=self.version 481 )
Switch between the glean-min and glean schemas if the ping does not require info sections as specified in the parsed ping info in probe scraper.
483 def generate_schema( 484 self, 485 config, 486 generic_schema=False, 487 blocked_distribution_pings=("events", "baseline"), 488 ) -> Dict[str, Schema]: 489 pings = self.get_pings_and_pipeline_metadata() 490 schemas = {} 491 492 for ping, pipeline_meta in pings.items(): 493 matchers = { 494 loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items() 495 } 496 497 # Four newly introduced metric types were incorrectly deployed 498 # as repeated key/value structs in all Glean ping tables existing prior 499 # to November 2021. We maintain the incorrect fields for existing tables 500 # by disabling the associated matchers. 501 # Note that each of these types now has a "2" matcher ("text2", "url2", etc.) 502 # defined that will allow metrics of these types to be injected into proper 503 # structs. The gcp-ingestion repository includes logic to rewrite these 504 # metrics under the "2" names. 505 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 506 bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta) 507 if bq_identifier in self.bug_1737656_affected_tables: 508 matchers = { 509 loc: m 510 for loc, m in matchers.items() 511 if not m.matcher.get("bug_1737656_affected") 512 } 513 514 for matcher in matchers.values(): 515 matcher.matcher["send_in_pings"]["contains"] = ping 516 517 # temporarily block distributions from being added to events and baseline pings 518 # https://mozilla-hub.atlassian.net/browse/DENG-10606 519 if ( 520 blocked_distribution_pings 521 and ping in blocked_distribution_pings 522 and matcher.type.endswith("_distribution") 523 ): 524 matcher.matcher["send_in_pings"]["not_contains"] = ping 525 526 new_config = Config(ping, matchers=matchers) 527 528 defaults = {"mozPipelineMetadata": pipeline_meta} 529 530 # Adjust the schema path if the ping does not require info sections 531 self.set_schema_url(pipeline_meta) 532 if generic_schema: # Use the generic glean ping schema 533 schema = self.get_schema(generic_schema=True) 534 schema.schema.update(defaults) 535 schemas[new_config.name] = schema 536 else: 537 generated = super().generate_schema(new_config) 538 for schema in generated.values(): 539 # We want to override each individual key with assembled defaults, 540 # but keep values _inside_ them if they have been set in the schemas. 541 for key, value in defaults.items(): 542 if key not in schema.schema: 543 schema.schema[key] = {} 544 schema.schema[key].update(value) 545 schemas.update(generated) 546 547 return schemas
549 @staticmethod 550 def get_repos(): 551 """ 552 Retrieve metadata for all non-library Glean repositories 553 """ 554 repos = GleanPing._get_json(GleanPing.repos_url) 555 return [repo for repo in repos if "library_names" not in repo]
Retrieve metadata for all non-library Glean repositories
557 def get_app_name(self) -> str: 558 """Get app name associated with the app id. 559 560 e.g. org-mozilla-firefox -> fenix 561 """ 562 apps = GleanPing._get_json(GleanPing.app_listings_url) 563 # app id in app-listings has "." instead of "-" so using document_namespace 564 app_name = [ 565 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 566 ] 567 return app_name[0] if len(app_name) > 0 else self.app_id
Get app name associated with the app id.
e.g. org-mozilla-firefox -> fenix