mozilla_schema_generator.glean_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import copy
import logging
from collections import defaultdict
from datetime import datetime
from functools import cache
from pathlib import Path
from typing import Any, Dict, List, Set

import yaml
from requests import HTTPError

from .config import Config
from .generic_ping import GenericPing
from .probes import GleanProbe
from .schema import Schema

ROOT_DIR = Path(__file__).parent
BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml"

logger = logging.getLogger(__name__)

SCHEMA_URL_TEMPLATE = (
    "https://raw.githubusercontent.com"
    "/mozilla-services/mozilla-pipeline-schemas"
    "/{branch}/schemas/glean/glean/"
)

SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json"

DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format(
    schema_type="glean", version=1
)


class GleanPing(GenericPing):
    """Schema generator for Glean application pings.

    Pulls metric, ping, and dependency information from the probe-info
    service and produces per-ping schemas derived from the upstream
    Glean JSON schema.
    """

    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )
    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"

    default_dependencies = ["glean-core"]

    # Tables that kept the pre-November-2021 (incorrect) struct layout for
    # the url/text/jwe/labeled_rate metric types; see bug 1737656.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(
        self, repo, version=1, use_metrics_blocklist=False, **kwargs
    ):  # TODO: Make env-url optional
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        self.version = version

        if use_metrics_blocklist:
            self.metric_blocklist = self.get_metric_blocklist()
        else:
            self.metric_blocklist = {}

        super().__init__(
            DEFAULT_SCHEMA_URL,
            DEFAULT_SCHEMA_URL,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some modifications
        to allow some workarounds for proper injection of metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
            metric1 = schema.get(
                ("properties", "metrics", "properties", metric_name)
            ).copy()
            metric1 = schema.set_schema_elem(
                ("properties", "metrics", "properties", metric_name + "2"),
                metric1,
            )

        return schema

    @cache
    def get_dependencies(self):
        """Return repository names for this app's library dependencies.

        Falls back to ``default_dependencies`` when the dependency endpoint
        is unavailable or resolves to no known repositories.
        """
        # Get all of the library dependencies for the application that
        # are also known about in the repositories file.

        # The dependencies are specified using library names, but we need to
        # map those back to the name of the repository in the repository file.
        try:
            dependencies = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            # Use the module logger (was: root-logger `logging.info`).
            logger.info(f"For {self.repo_name}, using default Glean dependencies")
            return self.default_dependencies

        dependency_library_names = list(dependencies.keys())

        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = []
        for name in dependency_library_names:
            if name in repos_by_dependency_name:
                dependencies.append(repos_by_dependency_name[name])

        if len(dependencies) == 0:
            logger.info(f"For {self.repo_name}, using default Glean dependencies")
            return self.default_dependencies

        logger.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
        return dependencies

    @staticmethod
    def remove_pings_from_metric(
        metric: Dict[str, Any], blocked_pings: List[str]
    ) -> Dict[str, Any]:
        """Remove the given pings from the metric's `send_in_pings` history.

        Only removes if the given metric has been removed from the source since a fixed date
        (2025-01-01). This allows metrics to be added back to the schema.
        """
        if (
            metric["in-source"]
            or len(blocked_pings) == 0
            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
            >= datetime(year=2025, month=1, day=1)
        ):
            return metric

        # NOTE: mutates the history entries in place and returns the same dict.
        for history_entry in metric["history"]:
            history_entry["send_in_pings"] = [
                p for p in history_entry["send_in_pings"] if p not in blocked_pings
            ]

        return metric

    def get_probes(self) -> List[GleanProbe]:
        """Return all metrics for the app and its dependencies as probes."""
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return ping definitions for the app and all of its dependencies."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        for dependency in self.get_dependencies():
            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
            ping_data.update(dependency_pings)
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return only the app's own ping definitions."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        return ping_data

    def _get_dependency_pings(self, dependency):
        """Return the ping definitions published by a single dependency."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings (app plus dependencies)."""
        # Returns a dict keys view, which satisfies set semantics for callers.
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """apply_default_metadata recurses down into dicts nested
        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
        ``ping_metadata``.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dct merged into ping_metadata
        :return: None
        """
        for k, v in default_metadata.items():
            if (
                k in ping_metadata
                and isinstance(ping_metadata[k], dict)
                and isinstance(v, dict)
            ):
                GleanPing.apply_default_metadata(ping_metadata[k], v)
            else:
                ping_metadata[k] = v

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return all ping data with repo default metadata applied to dependency pings."""
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1. Get repo and pipeline default metadata.
        repos = self.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2. Apply the default metadata to each dependency defined ping.

        # Apply app-level metadata to pings defined in dependencies
        app_metadata = current_repo.get("moz_pipeline_metadata", {})

        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"),
                    copy.deepcopy(default_metadata),
                )
                # app-level ping properties take priority over the app defaults
                metadata_override = app_metadata.get(dependency_ping["name"])
                if metadata_override is not None:
                    GleanPing.apply_default_metadata(
                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
                    )
            ping_data.update(dependency_pings)

        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return *metadata* with keys in the canonical deployed order."""
        desired_order_list = [
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "include_info_sections",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        ]
        reordered_metadata = {
            k: metadata[k] for k in desired_order_list if k in metadata
        }

        # re-order jwe-mappings
        desired_order_list = ["source_field_path", "decrypted_field_path"]
        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
        if jwe_mapping_metadata:
            reordered_jwe_mapping_metadata = []
            for mapping in jwe_mapping_metadata:
                reordered_jwe_mapping_metadata.append(
                    {k: mapping[k] for k in desired_order_list if k in mapping}
                )
            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end.
        # Preserve the source ordering for leftovers so output is deterministic
        # (a set difference would yield arbitrary key order).
        leftovers = {
            k: v for k, v in metadata.items() if k not in reordered_metadata
        }
        reordered_metadata = {**reordered_metadata, **leftovers}
        return reordered_metadata

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Return ping names mapped to their normalized pipeline metadata."""
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        for ping_name, ping_data in pings.items():
            metadata = ping_data.get("moz_pipeline_metadata")
            if not metadata:
                continue
            metadata["include_info_sections"] = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_client_id"] = self._is_field_included(
                ping_data, "include_client_id"
            )

            # While technically unnecessary, the dictionary elements are re-ordered to match the
            # currently deployed order and used to verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
        return pings

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Return each ping's most recent description from its history."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    @staticmethod
    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
        """Return false if the field exists and is false.

        If `consider_all_history` is False, then only check the latest value in the ping history.

        Otherwise, if the field is not found or true in one or more history entries,
        true is returned.
        """

        # Default to true if not specified.
        if "history" not in ping_data or len(ping_data["history"]) == 0:
            return True

        # Check if at some point in the past the field has already been deployed.
        # And if the caller of this method wants to consider this history of the field.
        # Keep them in the schema, even if the field has changed as
        # removing fields is currently not supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list
        if consider_all_history:
            ping_history = ping_data["history"]
        else:
            ping_history = [ping_data["history"][-1]]
        for history in ping_history:
            if field_name not in history or history[field_name]:
                return True

        # The ping was created with include_info_sections = False. The fields can be excluded.
        return False

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not require
        info sections as specified in the parsed ping info in probe scraper.
        """
        if not metadata["include_info_sections"]:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean-min", version=self.version
            )
        else:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean", version=self.version
            )

    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate one schema per ping, applying per-ping matchers and metadata."""
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping

                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]

    def get_app_name(self) -> str:
        """Get app name associated with the app id.

        e.g. org-mozilla-firefox -> fenix
        """
        apps = GleanPing._get_json(GleanPing.app_listings_url)
        # app id in app-listings has "." instead of "-" so using document_namespace
        app_name = [
            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
        ]
        return app_name[0] if len(app_name) > 0 else self.app_id

    @staticmethod
    def get_metric_blocklist():
        """Load the metric blocklist YAML (app/dependency -> ping -> metric names)."""
        with open(METRIC_BLOCKLIST, "r") as f:
            return yaml.safe_load(f)
class GleanPing(GenericPing):
    """Schema generator for Glean application pings.

    Pulls metric, ping, and dependency information from the probe-info
    service and produces per-ping schemas derived from the upstream
    Glean JSON schema.
    """

    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )
    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"

    default_dependencies = ["glean-core"]

    # Tables that kept the pre-November-2021 (incorrect) struct layout for
    # the url/text/jwe/labeled_rate metric types; see bug 1737656.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(
        self, repo, version=1, use_metrics_blocklist=False, **kwargs
    ):  # TODO: Make env-url optional
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        self.version = version

        if use_metrics_blocklist:
            self.metric_blocklist = self.get_metric_blocklist()
        else:
            self.metric_blocklist = {}

        super().__init__(
            DEFAULT_SCHEMA_URL,
            DEFAULT_SCHEMA_URL,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some modifications
        to allow some workarounds for proper injection of metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
            metric1 = schema.get(
                ("properties", "metrics", "properties", metric_name)
            ).copy()
            metric1 = schema.set_schema_elem(
                ("properties", "metrics", "properties", metric_name + "2"),
                metric1,
            )

        return schema

    @cache
    def get_dependencies(self):
        """Return repository names for this app's library dependencies.

        Falls back to ``default_dependencies`` when the dependency endpoint
        is unavailable or resolves to no known repositories.
        """
        # Get all of the library dependencies for the application that
        # are also known about in the repositories file.

        # The dependencies are specified using library names, but we need to
        # map those back to the name of the repository in the repository file.
        try:
            dependencies = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            # Use the module logger (was: root-logger `logging.info`).
            logger.info(f"For {self.repo_name}, using default Glean dependencies")
            return self.default_dependencies

        dependency_library_names = list(dependencies.keys())

        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = []
        for name in dependency_library_names:
            if name in repos_by_dependency_name:
                dependencies.append(repos_by_dependency_name[name])

        if len(dependencies) == 0:
            logger.info(f"For {self.repo_name}, using default Glean dependencies")
            return self.default_dependencies

        logger.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
        return dependencies

    @staticmethod
    def remove_pings_from_metric(
        metric: Dict[str, Any], blocked_pings: List[str]
    ) -> Dict[str, Any]:
        """Remove the given pings from the metric's `send_in_pings` history.

        Only removes if the given metric has been removed from the source since a fixed date
        (2025-01-01). This allows metrics to be added back to the schema.
        """
        if (
            metric["in-source"]
            or len(blocked_pings) == 0
            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
            >= datetime(year=2025, month=1, day=1)
        ):
            return metric

        # NOTE: mutates the history entries in place and returns the same dict.
        for history_entry in metric["history"]:
            history_entry["send_in_pings"] = [
                p for p in history_entry["send_in_pings"] if p not in blocked_pings
            ]

        return metric

    def get_probes(self) -> List[GleanProbe]:
        """Return all metrics for the app and its dependencies as probes."""
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return ping definitions for the app and all of its dependencies."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        for dependency in self.get_dependencies():
            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
            ping_data.update(dependency_pings)
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return only the app's own ping definitions."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        return ping_data

    def _get_dependency_pings(self, dependency):
        """Return the ping definitions published by a single dependency."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings (app plus dependencies)."""
        # Returns a dict keys view, which satisfies set semantics for callers.
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """apply_default_metadata recurses down into dicts nested
        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
        ``ping_metadata``.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dct merged into ping_metadata
        :return: None
        """
        for k, v in default_metadata.items():
            if (
                k in ping_metadata
                and isinstance(ping_metadata[k], dict)
                and isinstance(v, dict)
            ):
                GleanPing.apply_default_metadata(ping_metadata[k], v)
            else:
                ping_metadata[k] = v

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return all ping data with repo default metadata applied to dependency pings."""
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1. Get repo and pipeline default metadata.
        repos = self.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2. Apply the default metadata to each dependency defined ping.

        # Apply app-level metadata to pings defined in dependencies
        app_metadata = current_repo.get("moz_pipeline_metadata", {})

        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"),
                    copy.deepcopy(default_metadata),
                )
                # app-level ping properties take priority over the app defaults
                metadata_override = app_metadata.get(dependency_ping["name"])
                if metadata_override is not None:
                    GleanPing.apply_default_metadata(
                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
                    )
            ping_data.update(dependency_pings)

        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return *metadata* with keys in the canonical deployed order."""
        desired_order_list = [
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "include_info_sections",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        ]
        reordered_metadata = {
            k: metadata[k] for k in desired_order_list if k in metadata
        }

        # re-order jwe-mappings
        desired_order_list = ["source_field_path", "decrypted_field_path"]
        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
        if jwe_mapping_metadata:
            reordered_jwe_mapping_metadata = []
            for mapping in jwe_mapping_metadata:
                reordered_jwe_mapping_metadata.append(
                    {k: mapping[k] for k in desired_order_list if k in mapping}
                )
            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end.
        # Preserve the source ordering for leftovers so output is deterministic
        # (a set difference would yield arbitrary key order).
        leftovers = {
            k: v for k, v in metadata.items() if k not in reordered_metadata
        }
        reordered_metadata = {**reordered_metadata, **leftovers}
        return reordered_metadata

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Return ping names mapped to their normalized pipeline metadata."""
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        for ping_name, ping_data in pings.items():
            metadata = ping_data.get("moz_pipeline_metadata")
            if not metadata:
                continue
            metadata["include_info_sections"] = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_client_id"] = self._is_field_included(
                ping_data, "include_client_id"
            )

            # While technically unnecessary, the dictionary elements are re-ordered to match the
            # currently deployed order and used to verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
        return pings

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Return each ping's most recent description from its history."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    @staticmethod
    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
        """Return false if the field exists and is false.

        If `consider_all_history` is False, then only check the latest value in the ping history.

        Otherwise, if the field is not found or true in one or more history entries,
        true is returned.
        """

        # Default to true if not specified.
        if "history" not in ping_data or len(ping_data["history"]) == 0:
            return True

        # Check if at some point in the past the field has already been deployed.
        # And if the caller of this method wants to consider this history of the field.
        # Keep them in the schema, even if the field has changed as
        # removing fields is currently not supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list
        if consider_all_history:
            ping_history = ping_data["history"]
        else:
            ping_history = [ping_data["history"][-1]]
        for history in ping_history:
            if field_name not in history or history[field_name]:
                return True

        # The ping was created with include_info_sections = False. The fields can be excluded.
        return False

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not require
        info sections as specified in the parsed ping info in probe scraper.
        """
        if not metadata["include_info_sections"]:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean-min", version=self.version
            )
        else:
            self.schema_url = SCHEMA_URL_TEMPLATE.format(
                branch=self.branch_name
            ) + SCHEMA_VERSION_TEMPLATE.format(
                schema_type="glean", version=self.version
            )

    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate one schema per ping, applying per-ping matchers and metadata."""
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping

                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]

    def get_app_name(self) -> str:
        """Get app name associated with the app id.

        e.g. org-mozilla-firefox -> fenix
        """
        apps = GleanPing._get_json(GleanPing.app_listings_url)
        # app id in app-listings has "." instead of "-" so using document_namespace
        app_name = [
            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
        ]
        return app_name[0] if len(app_name) > 0 else self.app_id

    @staticmethod
    def get_metric_blocklist():
        """Load the metric blocklist YAML (app/dependency -> ping -> metric names)."""
        with open(METRIC_BLOCKLIST, "r") as f:
            return yaml.safe_load(f)
59 def __init__( 60 self, repo, version=1, use_metrics_blocklist=False, **kwargs 61 ): # TODO: Make env-url optional 62 self.repo = repo 63 self.repo_name = repo["name"] 64 self.app_id = repo["app_id"] 65 self.version = version 66 67 if use_metrics_blocklist: 68 self.metric_blocklist = self.get_metric_blocklist() 69 else: 70 self.metric_blocklist = {} 71 72 super().__init__( 73 DEFAULT_SCHEMA_URL, 74 DEFAULT_SCHEMA_URL, 75 self.probes_url_template.format(self.repo_name), 76 **kwargs, 77 )
79 def get_schema(self, generic_schema=False) -> Schema: 80 """ 81 Fetch schema via URL. 82 83 Unless *generic_schema* is set to true, this function makes some modifications 84 to allow some workarounds for proper injection of metrics. 85 """ 86 schema = super().get_schema() 87 if generic_schema: 88 return schema 89 90 # We need to inject placeholders for the url2, text2, etc. types as part 91 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 92 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 93 metric1 = schema.get( 94 ("properties", "metrics", "properties", metric_name) 95 ).copy() 96 metric1 = schema.set_schema_elem( 97 ("properties", "metrics", "properties", metric_name + "2"), 98 metric1, 99 ) 100 101 return schema
Fetch schema via URL.
Unless generic_schema is set to true, this function makes some modifications to allow some workarounds for proper injection of metrics.
103 @cache 104 def get_dependencies(self): 105 # Get all of the library dependencies for the application that 106 # are also known about in the repositories file. 107 108 # The dependencies are specified using library names, but we need to 109 # map those back to the name of the repository in the repository file. 110 try: 111 dependencies = self._get_json( 112 self.dependencies_url_template.format(self.repo_name) 113 ) 114 except HTTPError: 115 logging.info(f"For {self.repo_name}, using default Glean dependencies") 116 return self.default_dependencies 117 118 dependency_library_names = list(dependencies.keys()) 119 120 repos = GleanPing._get_json(GleanPing.repos_url) 121 repos_by_dependency_name = {} 122 for repo in repos: 123 for library_name in repo.get("library_names", []): 124 repos_by_dependency_name[library_name] = repo["name"] 125 126 dependencies = [] 127 for name in dependency_library_names: 128 if name in repos_by_dependency_name: 129 dependencies.append(repos_by_dependency_name[name]) 130 131 if len(dependencies) == 0: 132 logging.info(f"For {self.repo_name}, using default Glean dependencies") 133 return self.default_dependencies 134 135 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 136 return dependencies
138 @staticmethod 139 def remove_pings_from_metric( 140 metric: Dict[str, Any], blocked_pings: List[str] 141 ) -> Dict[str, Any]: 142 """Remove the given pings from the metric's `send_in_pings` history. 143 144 Only removes if the given metric has been removed from the source since a fixed date 145 (2025-01-01). This allows metrics to be added back to the schema. 146 """ 147 if ( 148 metric["in-source"] 149 or len(blocked_pings) == 0 150 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 151 >= datetime(year=2025, month=1, day=1) 152 ): 153 return metric 154 155 for history_entry in metric["history"]: 156 history_entry["send_in_pings"] = [ 157 p for p in history_entry["send_in_pings"] if p not in blocked_pings 158 ] 159 160 return metric
Remove the given pings from the metric's send_in_pings history.
Only removes if the given metric has been removed from the source since a fixed date (2025-01-01). This allows metrics to be added back to the schema.
    def get_probes(self) -> List[GleanProbe]:
        """Return all probes for this app and its library dependencies.

        Applies the metric blocklist per app and per dependency, and emits an
        extra probe entry for each historic type when a probe's type changed
        over time (Bug 1870317).
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            # Each dependency has its own section in the blocklist config.
            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                # Shallow copy is sufficient: only the top-level "type" key is
                # overwritten below.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed
250 @staticmethod 251 def apply_default_metadata(ping_metadata, default_metadata): 252 """apply_default_metadata recurses down into dicts nested 253 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 254 ``ping_metadata``. 255 :param ping_metadata: dict onto which the merge is executed 256 :param default_metadata: dct merged into ping_metadata 257 :return: None 258 """ 259 for k, v in default_metadata.items(): 260 if ( 261 k in ping_metadata 262 and isinstance(ping_metadata[k], dict) 263 and isinstance(default_metadata[k], dict) 264 ): 265 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 266 else: 267 ping_metadata[k] = default_metadata[k]
apply_default_metadata recurses down into dicts nested
to an arbitrary depth, updating keys. The default_metadata is merged into
ping_metadata.
Parameters
- ping_metadata: dict onto which the merge is executed
- default_metadata: dict merged into ping_metadata
Returns
None
307 @staticmethod 308 def reorder_metadata(metadata): 309 desired_order_list = [ 310 "bq_dataset_family", 311 "bq_table", 312 "bq_metadata_format", 313 "include_info_sections", 314 "submission_timestamp_granularity", 315 "expiration_policy", 316 "override_attributes", 317 "jwe_mappings", 318 ] 319 reordered_metadata = { 320 k: metadata[k] for k in desired_order_list if k in metadata 321 } 322 323 # re-order jwe-mappings 324 desired_order_list = ["source_field_path", "decrypted_field_path"] 325 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 326 if jwe_mapping_metadata: 327 reordered_jwe_mapping_metadata = [] 328 for mapping in jwe_mapping_metadata: 329 reordered_jwe_mapping_metadata.append( 330 {k: mapping[k] for k in desired_order_list if k in mapping} 331 ) 332 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 333 334 # future proofing, in case there are other fields added at the ping top level 335 # add them to the end. 336 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 337 reordered_metadata = {**reordered_metadata, **leftovers} 338 return reordered_metadata
340 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 341 pings = self._get_ping_data_and_dependencies_with_default_metadata() 342 for ping_name, ping_data in pings.items(): 343 metadata = ping_data.get("moz_pipeline_metadata") 344 if not metadata: 345 continue 346 metadata["include_info_sections"] = self._is_field_included( 347 ping_data, "include_info_sections", consider_all_history=False 348 ) 349 metadata["include_client_id"] = self._is_field_included( 350 ping_data, "include_client_id" 351 ) 352 353 # While technically unnecessary, the dictionary elements are re-ordered to match the 354 # currently deployed order and used to verify no difference in output. 355 pings[ping_name] = GleanPing.reorder_metadata(metadata) 356 return pings
395 def set_schema_url(self, metadata): 396 """ 397 Switch between the glean-min and glean schemas if the ping does not require 398 info sections as specified in the parsed ping info in probe scraper. 399 """ 400 if not metadata["include_info_sections"]: 401 self.schema_url = SCHEMA_URL_TEMPLATE.format( 402 branch=self.branch_name 403 ) + SCHEMA_VERSION_TEMPLATE.format( 404 schema_type="glean-min", version=self.version 405 ) 406 else: 407 self.schema_url = SCHEMA_URL_TEMPLATE.format( 408 branch=self.branch_name 409 ) + SCHEMA_VERSION_TEMPLATE.format( 410 schema_type="glean", version=self.version 411 )
Switch between the glean-min and glean schemas if the ping does not require info sections as specified in the parsed ping info in probe scraper.
    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate one schema per ping defined for this repository.

        :param config: base Config whose matchers are cloned per ping
        :param generic_schema: when True, use the unmodified generic glean schema
        :param blocked_distribution_pings: pings that must not receive
            ``*_distribution`` metrics (DENG-10606)
        :return: dict mapping config/table name to its Schema
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                # Restrict every matcher to the current ping.
                matcher.matcher["send_in_pings"]["contains"] = ping

                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas
479 @staticmethod 480 def get_repos(): 481 """ 482 Retrieve metadata for all non-library Glean repositories 483 """ 484 repos = GleanPing._get_json(GleanPing.repos_url) 485 return [repo for repo in repos if "library_names" not in repo]
Retrieve metadata for all non-library Glean repositories
487 def get_app_name(self) -> str: 488 """Get app name associated with the app id. 489 490 e.g. org-mozilla-firefox -> fenix 491 """ 492 apps = GleanPing._get_json(GleanPing.app_listings_url) 493 # app id in app-listings has "." instead of "-" so using document_namespace 494 app_name = [ 495 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 496 ] 497 return app_name[0] if len(app_name) > 0 else self.app_id
Get app name associated with the app id.
e.g. org-mozilla-firefox -> fenix