mozilla_schema_generator.glean_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import copy
import logging
from collections import defaultdict
from datetime import datetime
from functools import cache
from pathlib import Path
from typing import Any, Dict, List, Set

import yaml
from requests import HTTPError

from .config import Config
from .generic_ping import GenericPing
from .probes import GleanProbe
from .schema import Schema

ROOT_DIR = Path(__file__).parent
BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml"

logger = logging.getLogger(__name__)

SCHEMA_URL_TEMPLATE = (
    "https://raw.githubusercontent.com"
    "/mozilla-services/mozilla-pipeline-schemas"
    "/{branch}/schemas/glean/glean/"
)

SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json"

# NOTE: the "{branch}" placeholder is intentionally left unformatted here.
DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format(
    schema_type="glean", version=1
)


class GleanPing(GenericPing):
    """Schema generation for the pings of a single Glean application.

    Metric, ping, dependency and repository metadata are fetched from the
    endpoints built on ``GenericPing.probe_info_base_url``.
    """

    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )
    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"

    # Used whenever the dependencies of an application cannot be resolved.
    default_dependencies = ["glean-core"]

    # "<bq_dataset_family>.<bq_table>" identifiers of the tables affected by
    # https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 (one per line).
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(
        self, repo, version=1, use_metrics_blocklist=False, **kwargs
    ):  # TODO: Make env-url optional
        """Create the ping generator for one repository entry.

        :param repo: repository description; must provide ``"name"`` and ``"app_id"``
        :param version: schema version used when building the schema file name
        :param use_metrics_blocklist: when true, load the metric blocklist from
            ``METRIC_BLOCKLIST``; otherwise no metric is blocked
        :param kwargs: forwarded unchanged to ``GenericPing.__init__``
        """
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        self.version = version

        if use_metrics_blocklist:
            self.metric_blocklist = self.get_metric_blocklist()
        else:
            self.metric_blocklist = {}

        super().__init__(
            DEFAULT_SCHEMA_URL,
            DEFAULT_SCHEMA_URL,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some modifications
        to allow some workarounds for proper injection of metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
            placeholder = schema.get(
                ("properties", "metrics", "properties", metric_name)
            ).copy()
            schema.set_schema_elem(
                ("properties", "metrics", "properties", metric_name + "2"),
                placeholder,
            )

        return schema

    # NOTE(review): ``cache`` on an instance method keys on ``self`` and keeps
    # every instance alive for the lifetime of the cache (ruff B019). It is
    # kept as-is so the existing caching behaviour and ``cache_clear`` remain.
    @cache
    def get_dependencies(self):
        """Return the repository names of this application's Glean dependencies.

        Falls back to ``default_dependencies`` when the dependencies endpoint
        raises ``HTTPError`` or when none of the reported libraries is known
        in the repositories file.
        """
        # Get all of the library dependencies for the application that
        # are also known about in the repositories file.

        # The dependencies are specified using library names, but we need to
        # map those back to the name of the repository in the repository file.
        try:
            dependencies = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        dependency_library_names = list(dependencies.keys())

        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = [
            repos_by_dependency_name[name]
            for name in dependency_library_names
            if name in repos_by_dependency_name
        ]

        if not dependencies:
            logger.info("For %s, using default Glean dependencies", self.repo_name)
            return self.default_dependencies

        logger.info(
            "For %s, found Glean dependencies: %s", self.repo_name, dependencies
        )
        return dependencies

    @staticmethod
    def remove_pings_from_metric(
        metric: Dict[str, Any], blocked_pings: List[str]
    ) -> Dict[str, Any]:
        """Remove the given pings from the metric's `send_in_pings` history.

        Only removes if the given metric has been removed from the source since a fixed date
        (2025-01-01). This allows metrics to be added back to the schema.

        The metric is modified in place and also returned.
        """
        if (
            metric["in-source"]
            or not blocked_pings
            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
            >= datetime(year=2025, month=1, day=1)
        ):
            return metric

        for history_entry in metric["history"]:
            history_entry["send_in_pings"] = [
                p for p in history_entry["send_in_pings"] if p not in blocked_pings
            ]

        return metric

    def _blocked_pings_by_metric(self, blocklist_key: str) -> Dict[str, List[str]]:
        """Invert the blocklist entry for *blocklist_key* into metric name -> ping types."""
        blocked: Dict[str, List[str]] = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            blocklist_key, {}
        ).items():
            for metric_name in metric_names:
                blocked[metric_name].append(ping_type)
        return blocked

    def get_probes(self) -> List[GleanProbe]:
        """Return the probes of the application and of all its dependencies.

        Definitions sharing a name and at least one ping are merged into one
        probe carrying the combined history. A probe whose type changed over
        its history yields one additional probe per historic type.
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = self._blocked_pings_by_metric(self.get_app_name())

        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )
            dependency_blocklist = self._blocked_pings_by_metric(dependency)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        # A metric can be moved between an app and its dependencies or between dependencies while
        # probe scraper keeps the history in each location, so both definitions are returned
        # Merge the history per probe to take the latest definition while still being able to
        # find metric type changes below
        # Metrics are not merged if they are not sent in the same pings as they are disjoint
        def _pings_in_history(defn):
            return {
                p
                for h in defn[GleanProbe.history_key]
                for p in h.get("send_in_pings", ["metrics"])
            }

        def _latest_history_date(defn):
            return max(
                datetime.fromisoformat(h["dates"]["last"])
                for h in defn[GleanProbe.history_key]
            )

        # Group same name probes whose pings intersect to combine moved metrics
        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
        for name, defn in probes:
            defn_pings = _pings_in_history(defn)
            existing_groups = grouped_by_name[name]
            matches = [
                group
                for group in existing_groups
                if any(_pings_in_history(member) & defn_pings for member in group)
            ]
            if not matches:
                existing_groups.append([defn])
            else:
                # The new definition may bridge several previously disjoint groups
                merged_group = [defn]
                for g in matches:
                    merged_group.extend(g)
                    existing_groups.remove(g)
                existing_groups.append(merged_group)

        # Take latest definition per group
        deduped_probes: List[Any] = []
        for name, groups in grouped_by_name.items():
            for group in groups:
                latest_defn = max(group, key=_latest_history_date)
                if len(group) > 1:
                    # Copy so the fetched definition is not given the merged history
                    latest_defn = latest_defn.copy()
                    latest_defn[GleanProbe.history_key] = sorted(
                        (h for d in group for h in d[GleanProbe.history_key]),
                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
                    )
                deduped_probes.append((name, latest_defn))
        probes = deduped_probes

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        # Each historic probe gets its own definition dict so that
                        # a later type override cannot alter an earlier probe.
                        hist_defn = {**defn, "type": hist["type"]}
                        processed.append(GleanProbe(_id, hist_defn, pings=pings))

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return the ping data of the application updated with its dependencies' pings."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        for dependency in self.get_dependencies():
            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
            ping_data.update(dependency_pings)
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return the ping data defined by the application itself."""
        url = self.ping_url_template.format(self.repo_name)
        ping_data = GleanPing._get_json(url)
        return ping_data

    def _get_dependency_pings(self, dependency):
        """Return the ping data defined by the given dependency."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings of the application and its dependencies."""
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """apply_default_metadata recurses down into dicts nested
        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
        ``ping_metadata``.

        Values from ``default_metadata`` replace those in ``ping_metadata`` unless
        both values are dicts, in which case they are merged recursively.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dict merged into ping_metadata
        :return: None
        """
        for key, default_value in default_metadata.items():
            if (
                key in ping_metadata
                and isinstance(ping_metadata[key], dict)
                and isinstance(default_value, dict)
            ):
                GleanPing.apply_default_metadata(ping_metadata[key], default_value)
            else:
                ping_metadata[key] = default_value

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return all ping data with repo metadata applied to dependency-defined pings."""
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1.  Get repo and pipeline default metadata.
        repos = self.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2.  Apply the default metadata to each dependency defined ping.

        # Apply app-level metadata to pings defined in dependencies
        app_metadata = current_repo.get("moz_pipeline_metadata", {})

        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"),
                    copy.deepcopy(default_metadata),
                )
                # app-level ping properties take priority over the app defaults
                metadata_override = app_metadata.get(dependency_ping["name"])
                if metadata_override is not None:
                    # Copied, like the defaults above, so that the ping metadata
                    # does not share nested objects with the repository entry.
                    GleanPing.apply_default_metadata(
                        dependency_ping.get("moz_pipeline_metadata"),
                        copy.deepcopy(metadata_override),
                    )
            ping_data.update(dependency_pings)

        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return a copy of *metadata* with its keys in the deployed order.

        Known keys come first in a fixed order, each ``jwe_mappings`` entry is
        reduced to its two known keys in a fixed order, and any other top-level
        key follows in the order it appears in *metadata*.
        """
        desired_order_list = [
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "include_info_sections",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        ]
        reordered_metadata = {
            k: metadata[k] for k in desired_order_list if k in metadata
        }

        # re-order jwe-mappings
        desired_order_list = ["source_field_path", "decrypted_field_path"]
        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
        if jwe_mapping_metadata:
            reordered_metadata["jwe_mappings"] = [
                {k: mapping[k] for k in desired_order_list if k in mapping}
                for mapping in jwe_mapping_metadata
            ]

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end.
        # The input order is kept on purpose: iterating a set difference would
        # make the order of these keys vary between interpreter runs.
        leftovers = {
            k: v for k, v in metadata.items() if k not in reordered_metadata
        }
        return {**reordered_metadata, **leftovers}

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Return, per ping, its pipeline metadata in the deployed key order.

        Pings without ``moz_pipeline_metadata`` are left untouched.
        """
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        for ping_name, ping_data in pings.items():
            metadata = ping_data.get("moz_pipeline_metadata")
            if not metadata:
                continue
            metadata["include_info_sections"] = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_client_id"] = self._is_field_included(
                ping_data, "include_client_id"
            )

            # While technically unnecessary, the dictionary elements are re-ordered to match the
            # currently deployed order and used to verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
        return pings

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Return the description from the last history entry of every ping."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    @staticmethod
    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
        """Return false if the field exists and is false.

        If `consider_all_history` is False, then only check the latest value in the ping history.

        Otherwise, if the field is not found or true in one or more history entries,
        true is returned.
        """

        # Default to true if not specified.
        if not ping_data.get("history"):
            return True

        # Check if at some point in the past the field has already been deployed.
        # And if the caller of this method wants to consider this history of the field.
        # Keep them in the schema, even if the field has changed as
        # removing fields is currently not supported.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
        ping_history: list
        if consider_all_history:
            ping_history = ping_data["history"]
        else:
            ping_history = [ping_data["history"][-1]]

        # The field is only excluded when every considered entry sets it to a false value.
        return any(
            field_name not in history or history[field_name]
            for history in ping_history
        )

    def set_schema_url(self, metadata):
        """
        Switch between the glean-min and glean schemas if the ping does not require
        info sections as specified in the parsed ping info in probe scraper.
        """
        schema_type = "glean" if metadata["include_info_sections"] else "glean-min"
        self.schema_url = SCHEMA_URL_TEMPLATE.format(
            branch=self.branch_name
        ) + SCHEMA_VERSION_TEMPLATE.format(
            schema_type=schema_type, version=self.version
        )

    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate the schemas of every ping of the application.

        :param config: configuration whose matchers are cloned for every ping
        :param generic_schema: when true, use the unmodified Glean schema for
            each ping instead of generating one from the probes
        :param blocked_distribution_pings: pings whose ``*_distribution``
            matchers are additionally given a ``not_contains`` rule for the ping
        :return: the schemas keyed by name
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping

                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]

    def get_app_name(self) -> str:
        """Get app name associated with the app id.

        e.g. org-mozilla-firefox -> fenix

        Falls back to the app id when no app listing matches.
        """
        apps = GleanPing._get_json(GleanPing.app_listings_url)
        # app id in app-listings has "." instead of "-" so using document_namespace
        return next(
            (
                app["app_name"]
                for app in apps
                if app["document_namespace"] == self.app_id
            ),
            self.app_id,
        )

    @staticmethod
    def get_metric_blocklist():
        """Load the metric blocklist from ``METRIC_BLOCKLIST``.

        An empty file yields an empty blocklist instead of ``None``.
        """
        with open(METRIC_BLOCKLIST, "r") as f:
            return yaml.safe_load(f) or {}
43class GleanPing(GenericPing): 44 probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics" 45 ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings" 46 repos_url = GenericPing.probe_info_base_url + "/glean/repositories" 47 dependencies_url_template = ( 48 GenericPing.probe_info_base_url + "/glean/{}/dependencies" 49 ) 50 app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings" 51 52 default_dependencies = ["glean-core"] 53 54 with open(BUG_1737656_TXT, "r") as f: 55 bug_1737656_affected_tables = [ 56 line.strip() for line in f.readlines() if line.strip() 57 ] 58 59 def __init__( 60 self, repo, version=1, use_metrics_blocklist=False, **kwargs 61 ): # TODO: Make env-url optional 62 self.repo = repo 63 self.repo_name = repo["name"] 64 self.app_id = repo["app_id"] 65 self.version = version 66 67 if use_metrics_blocklist: 68 self.metric_blocklist = self.get_metric_blocklist() 69 else: 70 self.metric_blocklist = {} 71 72 super().__init__( 73 DEFAULT_SCHEMA_URL, 74 DEFAULT_SCHEMA_URL, 75 self.probes_url_template.format(self.repo_name), 76 **kwargs, 77 ) 78 79 def get_schema(self, generic_schema=False) -> Schema: 80 """ 81 Fetch schema via URL. 82 83 Unless *generic_schema* is set to true, this function makes some modifications 84 to allow some workarounds for proper injection of metrics. 85 """ 86 schema = super().get_schema() 87 if generic_schema: 88 return schema 89 90 # We need to inject placeholders for the url2, text2, etc. 
types as part 91 # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 92 for metric_name in ["labeled_rate", "jwe", "url", "text"]: 93 metric1 = schema.get( 94 ("properties", "metrics", "properties", metric_name) 95 ).copy() 96 metric1 = schema.set_schema_elem( 97 ("properties", "metrics", "properties", metric_name + "2"), 98 metric1, 99 ) 100 101 return schema 102 103 @cache 104 def get_dependencies(self): 105 # Get all of the library dependencies for the application that 106 # are also known about in the repositories file. 107 108 # The dependencies are specified using library names, but we need to 109 # map those back to the name of the repository in the repository file. 110 try: 111 dependencies = self._get_json( 112 self.dependencies_url_template.format(self.repo_name) 113 ) 114 except HTTPError: 115 logging.info(f"For {self.repo_name}, using default Glean dependencies") 116 return self.default_dependencies 117 118 dependency_library_names = list(dependencies.keys()) 119 120 repos = GleanPing._get_json(GleanPing.repos_url) 121 repos_by_dependency_name = {} 122 for repo in repos: 123 for library_name in repo.get("library_names", []): 124 repos_by_dependency_name[library_name] = repo["name"] 125 126 dependencies = [] 127 for name in dependency_library_names: 128 if name in repos_by_dependency_name: 129 dependencies.append(repos_by_dependency_name[name]) 130 131 if len(dependencies) == 0: 132 logging.info(f"For {self.repo_name}, using default Glean dependencies") 133 return self.default_dependencies 134 135 logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}") 136 return dependencies 137 138 @staticmethod 139 def remove_pings_from_metric( 140 metric: Dict[str, Any], blocked_pings: List[str] 141 ) -> Dict[str, Any]: 142 """Remove the given pings from the metric's `send_in_pings` history. 143 144 Only removes if the given metric has been removed from the source since a fixed date 145 (2025-01-01). 
This allows metrics to be added back to the schema. 146 """ 147 if ( 148 metric["in-source"] 149 or len(blocked_pings) == 0 150 or datetime.fromisoformat(metric["history"][-1]["dates"]["last"]) 151 >= datetime(year=2025, month=1, day=1) 152 ): 153 return metric 154 155 for history_entry in metric["history"]: 156 history_entry["send_in_pings"] = [ 157 p for p in history_entry["send_in_pings"] if p not in blocked_pings 158 ] 159 160 return metric 161 162 def get_probes(self) -> List[GleanProbe]: 163 data = self._get_json(self.probes_url) 164 165 # blocklist needs to be applied here instead of generate_schema because it needs to be 166 # dependency-aware; metrics can move between app and library and still be in the schema 167 # turn blocklist into metric_name -> ping_types map 168 blocklist = defaultdict(list) 169 for ping_type, metric_names in self.metric_blocklist.get( 170 self.get_app_name(), {} 171 ).items(): 172 for metric_name in metric_names: 173 blocklist[metric_name].append(ping_type) 174 175 probes = [ 176 (name, self.remove_pings_from_metric(defn, blocklist.get(name, []))) 177 for name, defn in data.items() 178 ] 179 180 for dependency in self.get_dependencies(): 181 dependency_probes = self._get_json( 182 self.probes_url_template.format(dependency) 183 ) 184 185 dependency_blocklist = defaultdict(list) 186 for ping_type, metric_names in self.metric_blocklist.get( 187 dependency, {} 188 ).items(): 189 for metric_name in metric_names: 190 dependency_blocklist[metric_name].append(ping_type) 191 192 probes += [ 193 ( 194 name, 195 self.remove_pings_from_metric( 196 defn, dependency_blocklist.get(name, []) 197 ), 198 ) 199 for name, defn in dependency_probes.items() 200 ] 201 202 # A metric can be moved between an app and its dependencies or between dependencies while 203 # probe scraper keeps the history in each location, so both definitions are returned 204 # Merge the history per probe to take the latest definition while still being able to 205 # find metric 
type changes below 206 # Metrics are not merged if they are not sent in the same pings as they are disjoint 207 def _pings_in_history(defn): 208 return { 209 p 210 for h in defn[GleanProbe.history_key] 211 for p in h.get("send_in_pings", ["metrics"]) 212 } 213 214 def _latest_history_date(defn): 215 return max( 216 datetime.fromisoformat(h["dates"]["last"]) 217 for h in defn[GleanProbe.history_key] 218 ) 219 220 # Group same name probes whose pings intersect to combine moved metrics 221 grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list) 222 for name, defn in probes: 223 defn_pings = _pings_in_history(defn) 224 existing_groups = grouped_by_name[name] 225 matches = [ 226 group 227 for group in existing_groups 228 if any(_pings_in_history(defn) & defn_pings for defn in group) 229 ] 230 if not matches: 231 existing_groups.append([defn]) 232 else: 233 merged_group = [defn] 234 for g in matches: 235 merged_group.extend(g) 236 existing_groups.remove(g) 237 existing_groups.append(merged_group) 238 239 # Take latest definition per group 240 deduped_probes: List[Any] = [] 241 for name, groups in grouped_by_name.items(): 242 for group in groups: 243 latest_defn = max(group, key=_latest_history_date) 244 if len(group) > 1: 245 latest_defn = latest_defn.copy() 246 latest_defn[GleanProbe.history_key] = sorted( 247 (h for d in group for h in d[GleanProbe.history_key]), 248 key=lambda h: datetime.fromisoformat(h["dates"]["first"]), 249 ) 250 deduped_probes.append((name, latest_defn)) 251 probes = deduped_probes 252 253 pings = self.get_pings() 254 255 processed = [] 256 for _id, defn in probes: 257 probe = GleanProbe(_id, defn, pings=pings) 258 processed.append(probe) 259 260 # Handling probe type changes (Bug 1870317) 261 probe_types = {hist["type"] for hist in defn[probe.history_key]} 262 if len(probe_types) > 1: 263 # The probe type changed at some point in history. 264 # Create schema entry for each type. 
265 hist_defn = defn.copy() 266 267 # No new entry needs to be created for the current probe type 268 probe_types.remove(defn["type"]) 269 270 for hist in hist_defn[probe.history_key]: 271 # Create a new entry for a historic type 272 if hist["type"] in probe_types: 273 hist_defn["type"] = hist["type"] 274 probe = GleanProbe(_id, hist_defn, pings=pings) 275 processed.append(probe) 276 277 # Keep track of the types entries were already created for 278 probe_types.remove(hist["type"]) 279 280 return processed 281 282 def _get_ping_data(self) -> Dict[str, Dict]: 283 url = self.ping_url_template.format(self.repo_name) 284 ping_data = GleanPing._get_json(url) 285 for dependency in self.get_dependencies(): 286 dependency_pings = self._get_json(self.ping_url_template.format(dependency)) 287 ping_data.update(dependency_pings) 288 return ping_data 289 290 def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]: 291 url = self.ping_url_template.format(self.repo_name) 292 ping_data = GleanPing._get_json(url) 293 return ping_data 294 295 def _get_dependency_pings(self, dependency): 296 return self._get_json(self.ping_url_template.format(dependency)) 297 298 def get_pings(self) -> Set[str]: 299 return self._get_ping_data().keys() 300 301 @staticmethod 302 def apply_default_metadata(ping_metadata, default_metadata): 303 """apply_default_metadata recurses down into dicts nested 304 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 305 ``ping_metadata``. 
306 :param ping_metadata: dict onto which the merge is executed 307 :param default_metadata: dct merged into ping_metadata 308 :return: None 309 """ 310 for k, v in default_metadata.items(): 311 if ( 312 k in ping_metadata 313 and isinstance(ping_metadata[k], dict) 314 and isinstance(default_metadata[k], dict) 315 ): 316 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 317 else: 318 ping_metadata[k] = default_metadata[k] 319 320 def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]: 321 # Get the ping data with the pipeline metadata 322 ping_data = self._get_ping_data_without_dependencies() 323 324 # The ping endpoint for the dependency pings does not include any repo defined 325 # moz_pipeline_metadata_defaults so they need to be applied here. 326 327 # 1. Get repo and pipeline default metadata. 328 repos = self.get_repos() 329 current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {}) 330 default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {}) 331 332 # 2. Apply the default metadata to each dependency defined ping. 333 334 # Apply app-level metadata to pings defined in dependencies 335 app_metadata = current_repo.get("moz_pipeline_metadata", {}) 336 337 for dependency in self.get_dependencies(): 338 dependency_pings = self._get_dependency_pings(dependency) 339 for dependency_ping in dependency_pings.values(): 340 # Although it is counter intuitive to apply the default metadata on top of the 341 # existing dependency ping metadata it does set the repo specific value for 342 # bq_dataset_family instead of using the dependency id for the bq_dataset_family 343 # value. 
344 GleanPing.apply_default_metadata( 345 dependency_ping.get("moz_pipeline_metadata"), 346 copy.deepcopy(default_metadata), 347 ) 348 # app-level ping properties take priority over the app defaults 349 metadata_override = app_metadata.get(dependency_ping["name"]) 350 if metadata_override is not None: 351 GleanPing.apply_default_metadata( 352 dependency_ping.get("moz_pipeline_metadata"), metadata_override 353 ) 354 ping_data.update(dependency_pings) 355 356 return ping_data 357 358 @staticmethod 359 def reorder_metadata(metadata): 360 desired_order_list = [ 361 "bq_dataset_family", 362 "bq_table", 363 "bq_metadata_format", 364 "include_info_sections", 365 "submission_timestamp_granularity", 366 "expiration_policy", 367 "override_attributes", 368 "jwe_mappings", 369 ] 370 reordered_metadata = { 371 k: metadata[k] for k in desired_order_list if k in metadata 372 } 373 374 # re-order jwe-mappings 375 desired_order_list = ["source_field_path", "decrypted_field_path"] 376 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 377 if jwe_mapping_metadata: 378 reordered_jwe_mapping_metadata = [] 379 for mapping in jwe_mapping_metadata: 380 reordered_jwe_mapping_metadata.append( 381 {k: mapping[k] for k in desired_order_list if k in mapping} 382 ) 383 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 384 385 # future proofing, in case there are other fields added at the ping top level 386 # add them to the end. 
387 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 388 reordered_metadata = {**reordered_metadata, **leftovers} 389 return reordered_metadata 390 391 def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]: 392 pings = self._get_ping_data_and_dependencies_with_default_metadata() 393 for ping_name, ping_data in pings.items(): 394 metadata = ping_data.get("moz_pipeline_metadata") 395 if not metadata: 396 continue 397 metadata["include_info_sections"] = self._is_field_included( 398 ping_data, "include_info_sections", consider_all_history=False 399 ) 400 metadata["include_client_id"] = self._is_field_included( 401 ping_data, "include_client_id" 402 ) 403 404 # While technically unnecessary, the dictionary elements are re-ordered to match the 405 # currently deployed order and used to verify no difference in output. 406 pings[ping_name] = GleanPing.reorder_metadata(metadata) 407 return pings 408 409 def get_ping_descriptions(self) -> Dict[str, str]: 410 return { 411 k: v["history"][-1]["description"] for k, v in self._get_ping_data().items() 412 } 413 414 @staticmethod 415 def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool: 416 """Return false if the field exists and is false. 417 418 If `consider_all_history` is False, then only check the latest value in the ping history. 419 420 Otherwise, if the field is not found or true in one or more history entries, 421 true is returned. 422 """ 423 424 # Default to true if not specified. 425 if "history" not in ping_data or len(ping_data["history"]) == 0: 426 return True 427 428 # Check if at some point in the past the field has already been deployed. 429 # And if the caller of this method wants to consider this history of the field. 430 # Keep them in the schema, even if the field has changed as 431 # removing fields is currently not supported. 
432 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105 433 # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10 434 ping_history: list 435 if consider_all_history: 436 ping_history = ping_data["history"] 437 else: 438 ping_history = [ping_data["history"][-1]] 439 for history in ping_history: 440 if field_name not in history or history[field_name]: 441 return True 442 443 # The ping was created with include_info_sections = False. The fields can be excluded. 444 return False 445 446 def set_schema_url(self, metadata): 447 """ 448 Switch between the glean-min and glean schemas if the ping does not require 449 info sections as specified in the parsed ping info in probe scraper. 450 """ 451 if not metadata["include_info_sections"]: 452 self.schema_url = SCHEMA_URL_TEMPLATE.format( 453 branch=self.branch_name 454 ) + SCHEMA_VERSION_TEMPLATE.format( 455 schema_type="glean-min", version=self.version 456 ) 457 else: 458 self.schema_url = SCHEMA_URL_TEMPLATE.format( 459 branch=self.branch_name 460 ) + SCHEMA_VERSION_TEMPLATE.format( 461 schema_type="glean", version=self.version 462 ) 463 464 def generate_schema( 465 self, 466 config, 467 generic_schema=False, 468 blocked_distribution_pings=("events", "baseline"), 469 ) -> Dict[str, Schema]: 470 pings = self.get_pings_and_pipeline_metadata() 471 schemas = {} 472 473 for ping, pipeline_meta in pings.items(): 474 matchers = { 475 loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items() 476 } 477 478 # Four newly introduced metric types were incorrectly deployed 479 # as repeated key/value structs in all Glean ping tables existing prior 480 # to November 2021. We maintain the incorrect fields for existing tables 481 # by disabling the associated matchers. 482 # Note that each of these types now has a "2" matcher ("text2", "url2", etc.) 483 # defined that will allow metrics of these types to be injected into proper 484 # structs. 
The gcp-ingestion repository includes logic to rewrite these 485 # metrics under the "2" names. 486 # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656 487 bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta) 488 if bq_identifier in self.bug_1737656_affected_tables: 489 matchers = { 490 loc: m 491 for loc, m in matchers.items() 492 if not m.matcher.get("bug_1737656_affected") 493 } 494 495 for matcher in matchers.values(): 496 matcher.matcher["send_in_pings"]["contains"] = ping 497 498 # temporarily block distributions from being added to events and baseline pings 499 # https://mozilla-hub.atlassian.net/browse/DENG-10606 500 if ( 501 blocked_distribution_pings 502 and ping in blocked_distribution_pings 503 and matcher.type.endswith("_distribution") 504 ): 505 matcher.matcher["send_in_pings"]["not_contains"] = ping 506 507 new_config = Config(ping, matchers=matchers) 508 509 defaults = {"mozPipelineMetadata": pipeline_meta} 510 511 # Adjust the schema path if the ping does not require info sections 512 self.set_schema_url(pipeline_meta) 513 if generic_schema: # Use the generic glean ping schema 514 schema = self.get_schema(generic_schema=True) 515 schema.schema.update(defaults) 516 schemas[new_config.name] = schema 517 else: 518 generated = super().generate_schema(new_config) 519 for schema in generated.values(): 520 # We want to override each individual key with assembled defaults, 521 # but keep values _inside_ them if they have been set in the schemas. 
522 for key, value in defaults.items(): 523 if key not in schema.schema: 524 schema.schema[key] = {} 525 schema.schema[key].update(value) 526 schemas.update(generated) 527 528 return schemas 529 530 @staticmethod 531 def get_repos(): 532 """ 533 Retrieve metadata for all non-library Glean repositories 534 """ 535 repos = GleanPing._get_json(GleanPing.repos_url) 536 return [repo for repo in repos if "library_names" not in repo] 537 538 def get_app_name(self) -> str: 539 """Get app name associated with the app id. 540 541 e.g. org-mozilla-firefox -> fenix 542 """ 543 apps = GleanPing._get_json(GleanPing.app_listings_url) 544 # app id in app-listings has "." instead of "-" so using document_namespace 545 app_name = [ 546 app["app_name"] for app in apps if app["document_namespace"] == self.app_id 547 ] 548 return app_name[0] if len(app_name) > 0 else self.app_id 549 550 @staticmethod 551 def get_metric_blocklist(): 552 with open(METRIC_BLOCKLIST, "r") as f: 553 return yaml.safe_load(f)
def __init__(
    self, repo, version=1, use_metrics_blocklist=False, **kwargs
):  # TODO: Make env-url optional
    """Set up a Glean ping generator for one repository entry.

    :param repo: repository entry; its ``"name"`` and ``"app_id"`` keys are read
    :param version: schema version, stored on the instance as ``self.version``
    :param use_metrics_blocklist: when true the blocklist is loaded through
        ``get_metric_blocklist``; otherwise an empty dict is used
    :param kwargs: passed through unchanged to the parent initializer
    """
    self.repo = repo
    self.repo_name = repo["name"]
    self.app_id = repo["app_id"]
    self.version = version

    # An empty mapping means "nothing is blocked".
    self.metric_blocklist = (
        self.get_metric_blocklist() if use_metrics_blocklist else {}
    )

    # The default schema URL is handed to the parent twice, followed by the
    # probes URL for this repository.
    repo_probes_url = self.probes_url_template.format(self.repo_name)
    super().__init__(
        DEFAULT_SCHEMA_URL,
        DEFAULT_SCHEMA_URL,
        repo_probes_url,
        **kwargs,
    )
def get_schema(self, generic_schema=False) -> Schema:
    """
    Fetch schema via URL.

    When *generic_schema* is true the fetched schema is returned untouched.
    Otherwise placeholder entries are added for the "2" variants of a few
    metric types so that metrics can be injected properly.
    """
    schema = super().get_schema()
    if not generic_schema:
        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        metrics_path = ("properties", "metrics", "properties")
        for metric_type in ("labeled_rate", "jwe", "url", "text"):
            # Copy of the existing definition, registered under "<type>2".
            placeholder = schema.get(metrics_path + (metric_type,)).copy()
            schema.set_schema_elem(metrics_path + (metric_type + "2",), placeholder)

    return schema
Fetch schema via URL.
Unless generic_schema is set to true, this function makes some modifications to allow some workarounds for proper injection of metrics.
@cache
def get_dependencies(self):
    """Return the repository names of the Glean libraries this app depends on.

    The dependencies endpoint lists dependencies by *library* name; these are
    mapped back to repository names using the ``library_names`` of each entry
    in the repositories listing. Library names with no matching repository
    are ignored.

    ``default_dependencies`` is returned when the dependencies endpoint raises
    ``HTTPError`` or when none of the listed libraries maps to a known
    repository.

    The result is memoized by ``functools.cache`` keyed on ``self``.

    :return: list of repository names
    """
    try:
        dependencies = self._get_json(
            self.dependencies_url_template.format(self.repo_name)
        )
    except HTTPError:
        # Use the module logger rather than the root ``logging.info`` helper,
        # which would implicitly configure the root logger.
        logger.info("For %s, using default Glean dependencies", self.repo_name)
        return self.default_dependencies

    # library name -> name of the repository that provides it
    repos = GleanPing._get_json(GleanPing.repos_url)
    repos_by_dependency_name = {
        library_name: repo["name"]
        for repo in repos
        for library_name in repo.get("library_names", [])
    }

    # Keep only the dependencies that are known in the repositories listing.
    resolved = [
        repos_by_dependency_name[name]
        for name in dependencies
        if name in repos_by_dependency_name
    ]

    if not resolved:
        logger.info("For %s, using default Glean dependencies", self.repo_name)
        return self.default_dependencies

    logger.info("For %s, found Glean dependencies: %s", self.repo_name, resolved)
    return resolved
@staticmethod
def remove_pings_from_metric(
    metric: Dict[str, Any], blocked_pings: List[str]
) -> Dict[str, Any]:
    """Remove the given pings from the metric's `send_in_pings` history.

    Pings are only removed when the metric is no longer in the source and its
    most recent history entry was last seen before a fixed cutoff (2025-01-01).
    Metrics that are still in the source, or that were seen on or after the
    cutoff, are returned untouched. This allows metrics to be added back to
    the schema.

    The metric is modified in place and the same object is returned.

    :param metric: metric definition with ``"in-source"`` and ``"history"`` keys
    :param blocked_pings: ping names to strip from every history entry
    :return: the (possibly modified) ``metric``
    """
    if (
        metric["in-source"]
        or not blocked_pings
        or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
        >= datetime(year=2025, month=1, day=1)
    ):
        return metric

    blocked = set(blocked_pings)
    for history_entry in metric["history"]:
        # A history entry without `send_in_pings` is treated as being sent in
        # the "metrics" ping, the same default get_probes uses when grouping.
        history_entry["send_in_pings"] = [
            p
            for p in history_entry.get("send_in_pings", ["metrics"])
            if p not in blocked
        ]

    return metric
Remove the given pings from the metric's send_in_pings history.
Pings are only removed when the metric is no longer in the source and was last seen before a fixed date (2025-01-01). This allows metrics to be added back to the schema.
def get_probes(self) -> List[GleanProbe]:
    """Collect the probes of this app and its dependencies as ``GleanProbe`` objects.

    Steps, in order:

    1. Fetch the app's metrics and those of every dependency, applying the
       metric blocklist to each definition via ``remove_pings_from_metric``.
    2. Group definitions that share a name and whose ``send_in_pings`` sets
       intersect, then keep the latest definition per group with the merged,
       date-sorted history of the whole group.
    3. Build one ``GleanProbe`` per resulting definition, plus one extra probe
       for every additional type found in the definition's history.

    :return: list of ``GleanProbe`` objects
    """
    data = self._get_json(self.probes_url)

    # blocklist needs to be applied here instead of generate_schema because it needs to be
    # dependency-aware; metrics can move between app and library and still be in the schema
    # turn blocklist into metric_name -> ping_types map
    blocklist = defaultdict(list)
    # The app's blocklist section is looked up by app name, not repo name.
    for ping_type, metric_names in self.metric_blocklist.get(
        self.get_app_name(), {}
    ).items():
        for metric_name in metric_names:
            blocklist[metric_name].append(ping_type)

    # List of (metric name, definition) pairs; .get() is used so the defaultdict
    # is not populated for metrics that are not blocked.
    probes = [
        (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
        for name, defn in data.items()
    ]

    for dependency in self.get_dependencies():
        dependency_probes = self._get_json(
            self.probes_url_template.format(dependency)
        )

        # Same metric_name -> ping_types map, but for the dependency's own
        # blocklist section (keyed by the dependency name).
        dependency_blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            dependency, {}
        ).items():
            for metric_name in metric_names:
                dependency_blocklist[metric_name].append(ping_type)

        probes += [
            (
                name,
                self.remove_pings_from_metric(
                    defn, dependency_blocklist.get(name, [])
                ),
            )
            for name, defn in dependency_probes.items()
        ]

    # A metric can be moved between an app and its dependencies or between dependencies while
    # probe scraper keeps the history in each location, so both definitions are returned
    # Merge the history per probe to take the latest definition while still being able to
    # find metric type changes below
    # Metrics are not merged if they are not sent in the same pings as they are disjoint
    def _pings_in_history(defn):
        # Every ping name that appears anywhere in the definition's history;
        # entries without send_in_pings count as the "metrics" ping.
        return {
            p
            for h in defn[GleanProbe.history_key]
            for p in h.get("send_in_pings", ["metrics"])
        }

    def _latest_history_date(defn):
        # Most recent "last" date across all history entries.
        return max(
            datetime.fromisoformat(h["dates"]["last"])
            for h in defn[GleanProbe.history_key]
        )

    # Group same name probes whose pings intersect to combine moved metrics
    grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
    for name, defn in probes:
        defn_pings = _pings_in_history(defn)
        existing_groups = grouped_by_name[name]
        # The `defn` inside the generator expression is local to that
        # expression; it does not rebind the loop's `defn` used below.
        matches = [
            group
            for group in existing_groups
            if any(_pings_in_history(defn) & defn_pings for defn in group)
        ]
        if not matches:
            existing_groups.append([defn])
        else:
            # The new definition may bridge several existing groups; collapse
            # all of them into a single group.
            merged_group = [defn]
            for g in matches:
                merged_group.extend(g)
                existing_groups.remove(g)
            existing_groups.append(merged_group)

    # Take latest definition per group
    deduped_probes: List[Any] = []
    for name, groups in grouped_by_name.items():
        for group in groups:
            latest_defn = max(group, key=_latest_history_date)
            if len(group) > 1:
                # Copy before replacing the history so the fetched definition
                # object itself is not given the merged history.
                latest_defn = latest_defn.copy()
                latest_defn[GleanProbe.history_key] = sorted(
                    (h for d in group for h in d[GleanProbe.history_key]),
                    key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
                )
            deduped_probes.append((name, latest_defn))
    probes = deduped_probes

    pings = self.get_pings()

    processed = []
    for _id, defn in probes:
        probe = GleanProbe(_id, defn, pings=pings)
        processed.append(probe)

        # Handling probe type changes (Bug 1870317)
        probe_types = {hist["type"] for hist in defn[probe.history_key]}
        if len(probe_types) > 1:
            # The probe type changed at some point in history.
            # Create schema entry for each type.
            hist_defn = defn.copy()

            # No new entry needs to be created for the current probe type
            probe_types.remove(defn["type"])

            # NOTE(review): the same hist_defn object is re-typed and passed to
            # each GleanProbe below; confirm GleanProbe does not keep a
            # reference to it when a metric has had three or more types.
            for hist in hist_defn[probe.history_key]:
                # Create a new entry for a historic type
                if hist["type"] in probe_types:
                    hist_defn["type"] = hist["type"]
                    probe = GleanProbe(_id, hist_defn, pings=pings)
                    processed.append(probe)

                    # Keep track of the types entries were already created for
                    probe_types.remove(hist["type"])

    return processed
@staticmethod
def apply_default_metadata(ping_metadata, default_metadata):
    """Merge ``default_metadata`` into ``ping_metadata`` in place.

    Dicts nested to an arbitrary depth are merged key by key. Wherever the two
    sides are not both dicts, the value from ``default_metadata`` replaces
    (or adds) the entry in ``ping_metadata``.

    :param ping_metadata: dict onto which the merge is executed
    :param default_metadata: dict merged into ping_metadata
    :return: None
    """
    # Pairs of (target dict, source dict) that still need merging.
    pending = [(ping_metadata, default_metadata)]
    while pending:
        target, source = pending.pop()
        for key, value in source.items():
            both_are_dicts = (
                key in target
                and isinstance(target[key], dict)
                and isinstance(value, dict)
            )
            if both_are_dicts:
                # Descend instead of overwriting, so sibling keys survive.
                pending.append((target[key], value))
            else:
                target[key] = value
apply_default_metadata recurses down into dicts nested
to an arbitrary depth, updating keys. The default_metadata is merged into
ping_metadata.
Parameters
- ping_metadata: dict onto which the merge is executed
- default_metadata: dict merged into ping_metadata
Returns
None
@staticmethod
def reorder_metadata(metadata):
    """Return a new dict holding ``metadata`` in a deterministic key order.

    Known top-level keys come first, in a fixed order. Each entry of
    ``jwe_mappings`` is rebuilt with ``source_field_path`` before
    ``decrypted_field_path``; any other keys inside a mapping are not carried
    over. Remaining top-level keys follow, in the order they appear in
    ``metadata``.

    ``metadata`` itself is not modified.

    :param metadata: pipeline metadata dict for one ping
    :return: new dict with the same top-level entries, reordered
    """
    desired_order_list = [
        "bq_dataset_family",
        "bq_table",
        "bq_metadata_format",
        "include_info_sections",
        "submission_timestamp_granularity",
        "expiration_policy",
        "override_attributes",
        "jwe_mappings",
    ]
    reordered_metadata = {
        k: metadata[k] for k in desired_order_list if k in metadata
    }

    # re-order jwe-mappings
    jwe_key_order = ["source_field_path", "decrypted_field_path"]
    jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
    if jwe_mapping_metadata:
        reordered_metadata["jwe_mappings"] = [
            {k: mapping[k] for k in jwe_key_order if k in mapping}
            for mapping in jwe_mapping_metadata
        ]

    # future proofing, in case there are other fields added at the ping top level
    # add them to the end. Iterate the dict itself, not a set difference: set
    # iteration order for strings varies between interpreter runs, which would
    # make the output order unstable.
    leftovers = {
        k: v for k, v in metadata.items() if k not in reordered_metadata
    }
    return {**reordered_metadata, **leftovers}
def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
    """Return a mapping of ping name to its pipeline metadata.

    For every ping that carries non-empty ``moz_pipeline_metadata``, the
    ``include_info_sections`` and ``include_client_id`` flags are filled in
    and the entry is replaced by the reordered metadata. Pings without such
    metadata keep their original entry.
    """
    pings = self._get_ping_data_and_dependencies_with_default_metadata()
    for ping_name, ping_data in pings.items():
        metadata = ping_data.get("moz_pipeline_metadata")
        if metadata:
            # Only the latest history entry decides whether info sections are
            # included; include_client_id considers the whole history.
            with_info_sections = self._is_field_included(
                ping_data, "include_info_sections", consider_all_history=False
            )
            metadata["include_info_sections"] = with_info_sections
            with_client_id = self._is_field_included(ping_data, "include_client_id")
            metadata["include_client_id"] = with_client_id

            # While technically unnecessary, the dictionary elements are
            # re-ordered to match the currently deployed order and used to
            # verify no difference in output.
            pings[ping_name] = GleanPing.reorder_metadata(metadata)
    return pings
def set_schema_url(self, metadata):
    """
    Point ``self.schema_url`` at the schema matching the ping's metadata.

    The ``glean-min`` schema is used when ``include_info_sections`` is falsy
    (as specified in the parsed ping info in probe scraper); otherwise the
    ``glean`` schema is used.
    """
    schema_type = "glean" if metadata["include_info_sections"] else "glean-min"
    base_url = SCHEMA_URL_TEMPLATE.format(branch=self.branch_name)
    file_name = SCHEMA_VERSION_TEMPLATE.format(
        schema_type=schema_type, version=self.version
    )
    self.schema_url = base_url + file_name
Select the glean-min schema when the ping does not require info sections (as specified in the parsed ping info in probe scraper), and the glean schema otherwise.
def generate_schema(
    self,
    config,
    generic_schema=False,
    blocked_distribution_pings=("events", "baseline"),
) -> Dict[str, Schema]:
    """Generate one schema per ping, each carrying its pipeline metadata.

    :param config: configuration whose matchers are cloned for every ping
    :param generic_schema: when true, use the generic glean ping schema
        instead of generating a schema from the probes
    :param blocked_distribution_pings: pings for which matchers whose type
        ends in ``_distribution`` also get a ``not_contains`` rule
    :return: mapping of schema name to schema
    """
    ping_metadata = self.get_pings_and_pipeline_metadata()
    schemas = {}

    for ping, pipeline_meta in ping_metadata.items():
        matchers = {
            loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
        }

        # Four newly introduced metric types were incorrectly deployed
        # as repeated key/value structs in all Glean ping tables existing prior
        # to November 2021. We maintain the incorrect fields for existing tables
        # by disabling the associated matchers.
        # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
        # defined that will allow metrics of these types to be injected into proper
        # structs. The gcp-ingestion repository includes logic to rewrite these
        # metrics under the "2" names.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        table_id = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
        if table_id in self.bug_1737656_affected_tables:
            matchers = {
                loc: m
                for loc, m in matchers.items()
                if not m.matcher.get("bug_1737656_affected")
            }

        for matcher in matchers.values():
            matcher.matcher["send_in_pings"]["contains"] = ping

            # temporarily block distributions from being added to events and baseline pings
            # https://mozilla-hub.atlassian.net/browse/DENG-10606
            if (
                blocked_distribution_pings
                and ping in blocked_distribution_pings
                and matcher.type.endswith("_distribution")
            ):
                matcher.matcher["send_in_pings"]["not_contains"] = ping

        ping_config = Config(ping, matchers=matchers)
        defaults = {"mozPipelineMetadata": pipeline_meta}

        # Adjust the schema path if the ping does not require info sections
        self.set_schema_url(pipeline_meta)

        if generic_schema:  # Use the generic glean ping schema
            schema = self.get_schema(generic_schema=True)
            schema.schema.update(defaults)
            schemas[ping_config.name] = schema
            continue

        generated = super().generate_schema(ping_config)
        for schema in generated.values():
            # We want to override each individual key with assembled defaults,
            # but keep values _inside_ them if they have been set in the schemas.
            for key, value in defaults.items():
                schema.schema.setdefault(key, {}).update(value)
        schemas.update(generated)

    return schemas
@staticmethod
def get_repos():
    """
    Retrieve metadata for all non-library Glean repositories.

    An entry is treated as a library when it has a ``library_names`` key.
    """
    app_repos = []
    for entry in GleanPing._get_json(GleanPing.repos_url):
        if "library_names" in entry:
            continue
        app_repos.append(entry)
    return app_repos
Retrieve metadata for all non-library Glean repositories
def get_app_name(self) -> str:
    """Get app name associated with the app id.

    e.g. org-mozilla-firefox -> fenix

    Falls back to the app id itself when no app listing matches.
    """
    listings = GleanPing._get_json(GleanPing.app_listings_url)

    # app id in app-listings has "." instead of "-" so using document_namespace
    matching_names = []
    for listing in listings:
        if listing["document_namespace"] == self.app_id:
            matching_names.append(listing["app_name"])

    if matching_names:
        return matching_names[0]
    return self.app_id
Get app name associated with the app id.
e.g. org-mozilla-firefox -> fenix