mozilla_schema_generator.glean_ping
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import copy
import logging
from pathlib import Path
from typing import Dict, List, Set

from requests import HTTPError

from .config import Config
from .generic_ping import GenericPing
from .probes import GleanProbe
from .schema import Schema

ROOT_DIR = Path(__file__).parent
# Text file listing one "<bq_dataset_family>.<bq_table>" identifier per line.
BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"

logger = logging.getLogger(__name__)


class GleanPing(GenericPing):
    """Schema generator for the pings of one Glean repository.

    An instance is built from a repository description (a dict with at least
    the ``name`` and ``app_id`` keys) and combines the generic Glean schema
    with the metrics, pings and pipeline metadata published for that
    repository and for its library dependencies.
    """

    schema_url = (
        "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas"
        "/{branch}/schemas/glean/glean/glean.1.schema.json"
    )
    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )

    # Used when no dependency information can be resolved for a repository.
    default_dependencies = ["glean-core"]
    ignore_pings = {
        "all-pings",
        "all_pings",
        "default",
        "glean_ping_info",
        "glean_client_info",
    }

    # Table identifiers that keep the legacy column layout from bug 1737656;
    # blank lines in the file are skipped.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
        """Remember the repository description and initialise the parent.

        :param repo: dict describing the repository; ``name`` and ``app_id``
            are read from it.
        :param kwargs: forwarded unchanged to ``GenericPing.__init__``.
        """
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        # ``schema_url`` is deliberately passed for both of the first two
        # positional arguments of the parent constructor (see TODO above).
        super().__init__(
            self.schema_url,
            self.schema_url,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some modifications
        to allow some workarounds for proper injection of metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        metrics_path = ("properties", "metrics", "properties")
        for metric_name in ("labeled_rate", "jwe", "url", "text"):
            placeholder = schema.get(metrics_path + (metric_name,)).copy()
            schema.set_schema_elem(metrics_path + (metric_name + "2",), placeholder)

        return schema

    def get_dependencies(self):
        """Return the repository names of this application's Glean libraries.

        The dependencies endpoint is keyed by library name; those names are
        mapped back to repository names through the ``library_names`` entries
        of the repositories listing. Libraries that are not present in the
        repositories listing are ignored.

        :return: list of repository names. A copy of ``default_dependencies``
            is returned when the dependencies request raises ``HTTPError`` or
            when no library can be mapped to a repository.
        """
        try:
            library_info = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            logging.info(f"For {self.repo_name}, using default Glean dependencies")
            # Copy so callers cannot mutate the shared class-level default.
            return list(self.default_dependencies)

        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = [
            repos_by_dependency_name[name]
            for name in library_info
            if name in repos_by_dependency_name
        ]

        if not dependencies:
            logging.info(f"For {self.repo_name}, using default Glean dependencies")
            return list(self.default_dependencies)

        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
        return dependencies

    def get_probes(self) -> List[GleanProbe]:
        """Build ``GleanProbe`` objects for the repository and its dependencies.

        For the repositories affected by issue 118 an additional probe is
        emitted for ``installation.timestamp`` that carries the part of the
        definition history starting at the first entry whose type differs
        from the probe's current type.

        :return: list of probes, in the order the definitions were fetched.
        """
        data = self._get_json(self.probes_url)
        probes = list(data.items())

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )
            probes += list(dependency_probes.items())

        pings = self.get_pings()

        # Manual handling of incompatible schema changes
        issue_118_affected = {
            "fenix",
            "fenix-nightly",
            "firefox-android-nightly",
            "firefox-android-beta",
            "firefox-android-release",
        }
        needs_issue_118_fix = self.repo_name in issue_118_affected

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            if not needs_issue_118_fix:
                continue
            if probe.get_name() != "installation.timestamp":
                continue

            # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
            # Search through history for the first entry whose type differs
            # from the current one; that changepoint marks the old type.
            current_type = probe.get_type()
            history = probe.definition_history
            changepoint_index = next(
                (
                    index
                    for index, definition in enumerate(history)
                    if definition["type"] != current_type
                ),
                None,
            )
            if changepoint_index is None:
                # The type never changed in the available history, so there
                # is no incompatible definition to preserve.
                continue

            logging.info(f"Writing column {probe.get_name()} for compatibility.")
            # Modify the definition with the truncated history.
            hist_defn = defn.copy()
            hist_defn[probe.history_key] = history[changepoint_index:]
            hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
            processed.append(GleanProbe(_id, hist_defn, pings=pings))

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return the repository's ping data merged with its dependencies'.

        Dependency pings are applied with ``dict.update`` and therefore
        replace repository pings of the same name.
        """
        ping_data = self._get_ping_data_without_dependencies()
        for dependency in self.get_dependencies():
            ping_data.update(self._get_dependency_pings(dependency))
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return the ping data published for this repository only."""
        url = self.ping_url_template.format(self.repo_name)
        return GleanPing._get_json(url)

    def _get_dependency_pings(self, dependency):
        """Return the ping data published for the repository *dependency*."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings (a view of the ping data's keys)."""
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """Merge ``default_metadata`` into ``ping_metadata`` in place.

        Nested dicts are merged key by key to an arbitrary depth. For every
        other key the value from ``default_metadata`` is written into
        ``ping_metadata``, replacing any value already stored there. Written
        values are deep copies, so later changes to ``ping_metadata`` never
        leak into ``default_metadata`` or into other pings.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dict merged into ping_metadata
        :return: None
        """
        pending = [(ping_metadata, default_metadata)]
        while pending:
            target, defaults = pending.pop()
            for key, default_value in defaults.items():
                if (
                    key in target
                    and isinstance(target[key], dict)
                    and isinstance(default_value, dict)
                ):
                    pending.append((target[key], default_value))
                else:
                    target[key] = copy.deepcopy(default_value)

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return ping data where dependency pings carry the repo defaults.

        The repository's own pings are used as fetched. Each dependency ping
        has the repository's ``moz_pipeline_metadata_defaults`` merged into
        its ``moz_pipeline_metadata`` before being added to the result.
        """
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1. Get repo and pipeline default metadata.
        repos = GleanPing.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2. Apply the default metadata to each dependency defined ping.
        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"), default_metadata
                )
            ping_data.update(dependency_pings)
        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return a copy of ``metadata`` with its keys in the deployed order.

        Known top-level keys come first in a fixed order. Every entry of
        ``jwe_mappings`` is rebuilt with only ``source_field_path`` and
        ``decrypted_field_path``, in that order. Any other top-level keys
        follow in the order they appear in ``metadata``.

        :param metadata: pipeline metadata dict of one ping
        :return: new dict; ``metadata`` itself is not modified
        """
        top_level_order = (
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        )
        mapping_order = ("source_field_path", "decrypted_field_path")

        reordered = {key: metadata[key] for key in top_level_order if key in metadata}

        # re-order jwe-mappings
        mappings = reordered.get("jwe_mappings")
        if mappings:
            reordered["jwe_mappings"] = [
                {key: mapping[key] for key in mapping_order if key in mapping}
                for mapping in mappings
            ]

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end. Iterating the input (rather than a set) keeps
        # the resulting order deterministic.
        for key, value in metadata.items():
            if key not in reordered:
                reordered[key] = value
        return reordered

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Map each ping name to its re-ordered ``moz_pipeline_metadata``."""
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        # While technically unnecessary, the dictionary elements are re-ordered to match the
        # currently deployed order and used to verify no difference in output.
        # A new mapping is built so the fetched ping data is left untouched.
        return {
            ping_name: GleanPing.reorder_metadata(
                ping_data.get("moz_pipeline_metadata")
            )
            for ping_name, ping_data in pings.items()
        }

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Map each ping name to the description of its latest history entry."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    def generate_schema(
        self, config, split, generic_schema=False
    ) -> Dict[str, List[Schema]]:
        """Generate the schemas of every ping of this repository.

        :param config: configuration whose ``matchers`` are cloned per ping
        :param split: accepted for interface compatibility; not used here
        :param generic_schema: when true, the unmodified generic Glean schema
            is used for every ping instead of a generated one
        :return: dict mapping schema names to lists of schemas; every schema
            has ``mozPipelineMetadata`` set to the ping's pipeline metadata
        """
        schemas = {}
        pings_with_metadata = self.get_pings_and_pipeline_metadata()

        for ping, pipeline_meta in pings_with_metadata.items():
            matchers = {}
            for loc, template in config.matchers.items():
                matchers[loc] = template.clone(new_table_group=ping)

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                disabled = [
                    loc
                    for loc, m in matchers.items()
                    if m.matcher.get("bug_1737656_affected")
                ]
                for loc in disabled:
                    del matchers[loc]

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            if not generic_schema:
                generated = super().generate_schema(new_config)
                for schema_list in generated.values():
                    for schema in schema_list:
                        schema.schema.update(defaults)
                schemas.update(generated)
                continue

            # Use the generic glean ping schema
            schema = self.get_schema(generic_schema=True)
            schema.schema.update(defaults)
            schemas[new_config.name] = [schema]

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]
class GleanPing(GenericPing):
    """Schema generator for the pings of one Glean repository.

    An instance is built from a repository description (a dict with at least
    the ``name`` and ``app_id`` keys) and combines the generic Glean schema
    with the metrics, pings and pipeline metadata published for that
    repository and for its library dependencies.
    """

    schema_url = (
        "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas"
        "/{branch}/schemas/glean/glean/glean.1.schema.json"
    )
    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
    dependencies_url_template = (
        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
    )

    # Used when no dependency information can be resolved for a repository.
    default_dependencies = ["glean-core"]
    ignore_pings = {
        "all-pings",
        "all_pings",
        "default",
        "glean_ping_info",
        "glean_client_info",
    }

    # Table identifiers that keep the legacy column layout from bug 1737656;
    # blank lines in the file are skipped.
    with open(BUG_1737656_TXT, "r") as f:
        bug_1737656_affected_tables = [
            line.strip() for line in f.readlines() if line.strip()
        ]

    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
        """Remember the repository description and initialise the parent.

        :param repo: dict describing the repository; ``name`` and ``app_id``
            are read from it.
        :param kwargs: forwarded unchanged to ``GenericPing.__init__``.
        """
        self.repo = repo
        self.repo_name = repo["name"]
        self.app_id = repo["app_id"]
        # ``schema_url`` is deliberately passed for both of the first two
        # positional arguments of the parent constructor (see TODO above).
        super().__init__(
            self.schema_url,
            self.schema_url,
            self.probes_url_template.format(self.repo_name),
            **kwargs,
        )

    def get_schema(self, generic_schema=False) -> Schema:
        """
        Fetch schema via URL.

        Unless *generic_schema* is set to true, this function makes some modifications
        to allow some workarounds for proper injection of metrics.
        """
        schema = super().get_schema()
        if generic_schema:
            return schema

        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        metrics_path = ("properties", "metrics", "properties")
        for metric_name in ("labeled_rate", "jwe", "url", "text"):
            placeholder = schema.get(metrics_path + (metric_name,)).copy()
            schema.set_schema_elem(metrics_path + (metric_name + "2",), placeholder)

        return schema

    def get_dependencies(self):
        """Return the repository names of this application's Glean libraries.

        The dependencies endpoint is keyed by library name; those names are
        mapped back to repository names through the ``library_names`` entries
        of the repositories listing. Libraries that are not present in the
        repositories listing are ignored.

        :return: list of repository names. A copy of ``default_dependencies``
            is returned when the dependencies request raises ``HTTPError`` or
            when no library can be mapped to a repository.
        """
        try:
            library_info = self._get_json(
                self.dependencies_url_template.format(self.repo_name)
            )
        except HTTPError:
            logging.info(f"For {self.repo_name}, using default Glean dependencies")
            # Copy so callers cannot mutate the shared class-level default.
            return list(self.default_dependencies)

        repos = GleanPing._get_json(GleanPing.repos_url)
        repos_by_dependency_name = {}
        for repo in repos:
            for library_name in repo.get("library_names", []):
                repos_by_dependency_name[library_name] = repo["name"]

        dependencies = [
            repos_by_dependency_name[name]
            for name in library_info
            if name in repos_by_dependency_name
        ]

        if not dependencies:
            logging.info(f"For {self.repo_name}, using default Glean dependencies")
            return list(self.default_dependencies)

        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
        return dependencies

    def get_probes(self) -> List[GleanProbe]:
        """Build ``GleanProbe`` objects for the repository and its dependencies.

        For the repositories affected by issue 118 an additional probe is
        emitted for ``installation.timestamp`` that carries the part of the
        definition history starting at the first entry whose type differs
        from the probe's current type.

        :return: list of probes, in the order the definitions were fetched.
        """
        data = self._get_json(self.probes_url)
        probes = list(data.items())

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )
            probes += list(dependency_probes.items())

        pings = self.get_pings()

        # Manual handling of incompatible schema changes
        issue_118_affected = {
            "fenix",
            "fenix-nightly",
            "firefox-android-nightly",
            "firefox-android-beta",
            "firefox-android-release",
        }
        needs_issue_118_fix = self.repo_name in issue_118_affected

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            if not needs_issue_118_fix:
                continue
            if probe.get_name() != "installation.timestamp":
                continue

            # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
            # Search through history for the first entry whose type differs
            # from the current one; that changepoint marks the old type.
            current_type = probe.get_type()
            history = probe.definition_history
            changepoint_index = next(
                (
                    index
                    for index, definition in enumerate(history)
                    if definition["type"] != current_type
                ),
                None,
            )
            if changepoint_index is None:
                # The type never changed in the available history, so there
                # is no incompatible definition to preserve.
                continue

            logging.info(f"Writing column {probe.get_name()} for compatibility.")
            # Modify the definition with the truncated history.
            hist_defn = defn.copy()
            hist_defn[probe.history_key] = history[changepoint_index:]
            hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
            processed.append(GleanProbe(_id, hist_defn, pings=pings))

        return processed

    def _get_ping_data(self) -> Dict[str, Dict]:
        """Return the repository's ping data merged with its dependencies'.

        Dependency pings are applied with ``dict.update`` and therefore
        replace repository pings of the same name.
        """
        ping_data = self._get_ping_data_without_dependencies()
        for dependency in self.get_dependencies():
            ping_data.update(self._get_dependency_pings(dependency))
        return ping_data

    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
        """Return the ping data published for this repository only."""
        url = self.ping_url_template.format(self.repo_name)
        return GleanPing._get_json(url)

    def _get_dependency_pings(self, dependency):
        """Return the ping data published for the repository *dependency*."""
        return self._get_json(self.ping_url_template.format(dependency))

    def get_pings(self) -> Set[str]:
        """Return the names of all pings (a view of the ping data's keys)."""
        return self._get_ping_data().keys()

    @staticmethod
    def apply_default_metadata(ping_metadata, default_metadata):
        """Merge ``default_metadata`` into ``ping_metadata`` in place.

        Nested dicts are merged key by key to an arbitrary depth. For every
        other key the value from ``default_metadata`` is written into
        ``ping_metadata``, replacing any value already stored there. Written
        values are deep copies, so later changes to ``ping_metadata`` never
        leak into ``default_metadata`` or into other pings.

        :param ping_metadata: dict onto which the merge is executed
        :param default_metadata: dict merged into ping_metadata
        :return: None
        """
        import copy

        pending = [(ping_metadata, default_metadata)]
        while pending:
            target, defaults = pending.pop()
            for key, default_value in defaults.items():
                if (
                    key in target
                    and isinstance(target[key], dict)
                    and isinstance(default_value, dict)
                ):
                    pending.append((target[key], default_value))
                else:
                    target[key] = copy.deepcopy(default_value)

    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
        """Return ping data where dependency pings carry the repo defaults.

        The repository's own pings are used as fetched. Each dependency ping
        has the repository's ``moz_pipeline_metadata_defaults`` merged into
        its ``moz_pipeline_metadata`` before being added to the result.
        """
        # Get the ping data with the pipeline metadata
        ping_data = self._get_ping_data_without_dependencies()

        # The ping endpoint for the dependency pings does not include any repo defined
        # moz_pipeline_metadata_defaults so they need to be applied here.

        # 1. Get repo and pipeline default metadata.
        repos = GleanPing.get_repos()
        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})

        # 2. Apply the default metadata to each dependency defined ping.
        for dependency in self.get_dependencies():
            dependency_pings = self._get_dependency_pings(dependency)
            for dependency_ping in dependency_pings.values():
                # Although it is counter intuitive to apply the default metadata on top of the
                # existing dependency ping metadata it does set the repo specific value for
                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
                # value.
                GleanPing.apply_default_metadata(
                    dependency_ping.get("moz_pipeline_metadata"), default_metadata
                )
            ping_data.update(dependency_pings)
        return ping_data

    @staticmethod
    def reorder_metadata(metadata):
        """Return a copy of ``metadata`` with its keys in the deployed order.

        Known top-level keys come first in a fixed order. Every entry of
        ``jwe_mappings`` is rebuilt with only ``source_field_path`` and
        ``decrypted_field_path``, in that order. Any other top-level keys
        follow in the order they appear in ``metadata``.

        :param metadata: pipeline metadata dict of one ping
        :return: new dict; ``metadata`` itself is not modified
        """
        top_level_order = (
            "bq_dataset_family",
            "bq_table",
            "bq_metadata_format",
            "submission_timestamp_granularity",
            "expiration_policy",
            "override_attributes",
            "jwe_mappings",
        )
        mapping_order = ("source_field_path", "decrypted_field_path")

        reordered = {key: metadata[key] for key in top_level_order if key in metadata}

        # re-order jwe-mappings
        mappings = reordered.get("jwe_mappings")
        if mappings:
            reordered["jwe_mappings"] = [
                {key: mapping[key] for key in mapping_order if key in mapping}
                for mapping in mappings
            ]

        # future proofing, in case there are other fields added at the ping top level
        # add them to the end. Iterating the input (rather than a set) keeps
        # the resulting order deterministic.
        for key, value in metadata.items():
            if key not in reordered:
                reordered[key] = value
        return reordered

    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
        """Map each ping name to its re-ordered ``moz_pipeline_metadata``."""
        pings = self._get_ping_data_and_dependencies_with_default_metadata()
        # While technically unnecessary, the dictionary elements are re-ordered to match the
        # currently deployed order and used to verify no difference in output.
        # A new mapping is built so the fetched ping data is left untouched.
        return {
            ping_name: GleanPing.reorder_metadata(
                ping_data.get("moz_pipeline_metadata")
            )
            for ping_name, ping_data in pings.items()
        }

    def get_ping_descriptions(self) -> Dict[str, str]:
        """Map each ping name to the description of its latest history entry."""
        return {
            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
        }

    def generate_schema(
        self, config, split, generic_schema=False
    ) -> Dict[str, List[Schema]]:
        """Generate the schemas of every ping of this repository.

        :param config: configuration whose ``matchers`` are cloned per ping
        :param split: accepted for interface compatibility; not used here
        :param generic_schema: when true, the unmodified generic Glean schema
            is used for every ping instead of a generated one
        :return: dict mapping schema names to lists of schemas; every schema
            has ``mozPipelineMetadata`` set to the ping's pipeline metadata
        """
        schemas = {}
        pings_with_metadata = self.get_pings_and_pipeline_metadata()

        for ping, pipeline_meta in pings_with_metadata.items():
            matchers = {}
            for loc, template in config.matchers.items():
                matchers[loc] = template.clone(new_table_group=ping)

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                disabled = [
                    loc
                    for loc, m in matchers.items()
                    if m.matcher.get("bug_1737656_affected")
                ]
                for loc in disabled:
                    del matchers[loc]

            for matcher in matchers.values():
                matcher.matcher["send_in_pings"]["contains"] = ping
            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            if not generic_schema:
                generated = super().generate_schema(new_config)
                for schema_list in generated.values():
                    for schema in schema_list:
                        schema.schema.update(defaults)
                schemas.update(generated)
                continue

            # Use the generic glean ping schema
            schema = self.get_schema(generic_schema=True)
            schema.schema.update(defaults)
            schemas[new_config.name] = [schema]

        return schemas

    @staticmethod
    def get_repos():
        """
        Retrieve metadata for all non-library Glean repositories
        """
        repos = GleanPing._get_json(GleanPing.repos_url)
        return [repo for repo in repos if "library_names" not in repo]
def get_schema(self, generic_schema=False) -> Schema:
    """
    Fetch the schema through the parent class.

    When *generic_schema* is true the fetched schema is returned untouched.
    Otherwise a copy of each of the ``labeled_rate``, ``jwe``, ``url`` and
    ``text`` metric definitions is stored under the same name with a ``2``
    suffix, so that metrics of those types can be injected properly.
    """
    schema = super().get_schema()
    if not generic_schema:
        # We need to inject placeholders for the url2, text2, etc. types as part
        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        metrics_path = ("properties", "metrics", "properties")
        for base_name in ("labeled_rate", "jwe", "url", "text"):
            placeholder = schema.get(metrics_path + (base_name,)).copy()
            schema.set_schema_elem(metrics_path + (base_name + "2",), placeholder)

    return schema
Fetch schema via URL.
Unless generic_schema is set to true, this function makes some modifications to allow some workarounds for proper injection of metrics.
def
get_dependencies(self):
def get_dependencies(self):
    """Return the repository names of this application's Glean libraries.

    The dependencies endpoint is keyed by library name; those names are
    mapped back to repository names through the ``library_names`` entries
    of the repositories listing. Libraries that are not present in the
    repositories listing are ignored.

    :return: list of repository names. A copy of ``default_dependencies``
        is returned when the dependencies request raises ``HTTPError`` or
        when no library can be mapped to a repository.
    """
    try:
        library_info = self._get_json(
            self.dependencies_url_template.format(self.repo_name)
        )
    except HTTPError:
        logging.info(f"For {self.repo_name}, using default Glean dependencies")
        # Copy so callers cannot mutate the shared class-level default.
        return list(self.default_dependencies)

    repos = GleanPing._get_json(GleanPing.repos_url)
    repos_by_dependency_name = {}
    for repo in repos:
        for library_name in repo.get("library_names", []):
            repos_by_dependency_name[library_name] = repo["name"]

    dependencies = [
        repos_by_dependency_name[name]
        for name in library_info
        if name in repos_by_dependency_name
    ]

    if not dependencies:
        logging.info(f"For {self.repo_name}, using default Glean dependencies")
        return list(self.default_dependencies)

    logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
    return dependencies
def get_probes(self) -> List[GleanProbe]:
    """Build ``GleanProbe`` objects for the repository and its dependencies.

    For the repositories affected by issue 118 an additional probe is
    emitted for ``installation.timestamp`` that carries the part of the
    definition history starting at the first entry whose type differs
    from the probe's current type. When the history contains no such
    entry, no additional probe is emitted.

    :return: list of probes, in the order the definitions were fetched.
    """
    data = self._get_json(self.probes_url)
    probes = list(data.items())

    for dependency in self.get_dependencies():
        dependency_probes = self._get_json(
            self.probes_url_template.format(dependency)
        )
        probes += list(dependency_probes.items())

    pings = self.get_pings()

    # Manual handling of incompatible schema changes
    issue_118_affected = {
        "fenix",
        "fenix-nightly",
        "firefox-android-nightly",
        "firefox-android-beta",
        "firefox-android-release",
    }
    needs_issue_118_fix = self.repo_name in issue_118_affected

    processed = []
    for _id, defn in probes:
        probe = GleanProbe(_id, defn, pings=pings)
        processed.append(probe)

        if not needs_issue_118_fix:
            continue
        if probe.get_name() != "installation.timestamp":
            continue

        # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
        # Search through history for the first entry whose type differs
        # from the current one; that changepoint marks the old type.
        current_type = probe.get_type()
        history = probe.definition_history
        changepoint_index = next(
            (
                index
                for index, definition in enumerate(history)
                if definition["type"] != current_type
            ),
            None,
        )
        if changepoint_index is None:
            # The type never changed in the available history, so there
            # is no incompatible definition to preserve.
            continue

        logging.info(f"Writing column {probe.get_name()} for compatibility.")
        # Modify the definition with the truncated history.
        hist_defn = defn.copy()
        hist_defn[probe.history_key] = history[changepoint_index:]
        hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
        processed.append(GleanProbe(_id, hist_defn, pings=pings))

    return processed
@staticmethod
def
apply_default_metadata(ping_metadata, default_metadata):
188 @staticmethod 189 def apply_default_metadata(ping_metadata, default_metadata): 190 """apply_default_metadata recurses down into dicts nested 191 to an arbitrary depth, updating keys. The ``default_metadata`` is merged into 192 ``ping_metadata``. 193 :param ping_metadata: dict onto which the merge is executed 194 :param default_metadata: dct merged into ping_metadata 195 :return: None 196 """ 197 for k, v in default_metadata.items(): 198 if ( 199 k in ping_metadata 200 and isinstance(ping_metadata[k], dict) 201 and isinstance(default_metadata[k], dict) 202 ): 203 GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k]) 204 else: 205 ping_metadata[k] = default_metadata[k]
apply_default_metadata recurses down into dicts nested
to an arbitrary depth, updating keys. The default_metadata
is merged into
ping_metadata
.
Parameters
- ping_metadata: dict onto which the merge is executed
- default_metadata: dict merged into ping_metadata
Returns
None
@staticmethod
def
reorder_metadata(metadata):
233 @staticmethod 234 def reorder_metadata(metadata): 235 desired_order_list = [ 236 "bq_dataset_family", 237 "bq_table", 238 "bq_metadata_format", 239 "submission_timestamp_granularity", 240 "expiration_policy", 241 "override_attributes", 242 "jwe_mappings", 243 ] 244 reordered_metadata = { 245 k: metadata[k] for k in desired_order_list if k in metadata 246 } 247 248 # re-order jwe-mappings 249 desired_order_list = ["source_field_path", "decrypted_field_path"] 250 jwe_mapping_metadata = reordered_metadata.get("jwe_mappings") 251 if jwe_mapping_metadata: 252 reordered_jwe_mapping_metadata = [] 253 for mapping in jwe_mapping_metadata: 254 reordered_jwe_mapping_metadata.append( 255 {k: mapping[k] for k in desired_order_list if k in mapping} 256 ) 257 reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata 258 259 # future proofing, in case there are other fields added at the ping top level 260 # add them to the end. 261 leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)} 262 reordered_metadata = {**reordered_metadata, **leftovers} 263 return reordered_metadata
def
get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
    """Map each ping name to its re-ordered ``moz_pipeline_metadata``.

    :return: new dict keyed by ping name; the ping data returned by
        ``_get_ping_data_and_dependencies_with_default_metadata`` is not
        modified.
    """
    pings = self._get_ping_data_and_dependencies_with_default_metadata()
    # While technically unnecessary, the dictionary elements are re-ordered to match the
    # currently deployed order and used to verify no difference in output.
    # A new mapping is built so the fetched ping data is left untouched.
    return {
        ping_name: GleanPing.reorder_metadata(ping_data.get("moz_pipeline_metadata"))
        for ping_name, ping_data in pings.items()
    }
def
generate_schema( self, config, split, generic_schema=False) -> Dict[str, List[mozilla_schema_generator.schema.Schema]]:
def generate_schema(
    self, config, split, generic_schema=False
) -> Dict[str, List[Schema]]:
    """Generate the schemas of every ping of this repository.

    :param config: configuration whose ``matchers`` are cloned per ping
    :param split: accepted for interface compatibility; not used here
    :param generic_schema: when true, the unmodified generic Glean schema
        is used for every ping instead of a generated one
    :return: dict mapping schema names to lists of schemas; every schema
        has ``mozPipelineMetadata`` set to the ping's pipeline metadata
    """
    pings_with_metadata = self.get_pings_and_pipeline_metadata()
    schemas = {}

    for ping, pipeline_meta in pings_with_metadata.items():
        matchers = {}
        for loc, template in config.matchers.items():
            matchers[loc] = template.clone(new_table_group=ping)

        # Four newly introduced metric types were incorrectly deployed
        # as repeated key/value structs in all Glean ping tables existing prior
        # to November 2021. We maintain the incorrect fields for existing tables
        # by disabling the associated matchers.
        # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
        # defined that will allow metrics of these types to be injected into proper
        # structs. The gcp-ingestion repository includes logic to rewrite these
        # metrics under the "2" names.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
        if bq_identifier in self.bug_1737656_affected_tables:
            disabled = [
                loc
                for loc, m in matchers.items()
                if m.matcher.get("bug_1737656_affected")
            ]
            for loc in disabled:
                del matchers[loc]

        for matcher in matchers.values():
            matcher.matcher["send_in_pings"]["contains"] = ping
        new_config = Config(ping, matchers=matchers)

        defaults = {"mozPipelineMetadata": pipeline_meta}

        if not generic_schema:
            generated = super().generate_schema(new_config)
            for schema_list in generated.values():
                for schema in schema_list:
                    schema.schema.update(defaults)
            schemas.update(generated)
            continue

        # Use the generic glean ping schema
        schema = self.get_schema(generic_schema=True)
        schema.schema.update(defaults)
        schemas[new_config.name] = [schema]

    return schemas
@staticmethod
def
get_repos():
@staticmethod
def get_repos():
    """
    Retrieve metadata for all non-library Glean repositories.

    A repository counts as a library when its entry in the repositories
    listing has a ``library_names`` key; those entries are left out.
    """
    application_repos = []
    for repo in GleanPing._get_json(GleanPing.repos_url):
        if "library_names" in repo:
            continue
        application_repos.append(repo)
    return application_repos
Retrieve metadata for all non-library Glean repositories