mozilla_schema_generator.glean_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import copy
  8import logging
  9from collections import defaultdict
 10from datetime import datetime
 11from functools import cache
 12from pathlib import Path
 13from typing import Any, Dict, List, Set
 14
 15import yaml
 16from requests import HTTPError
 17
 18from .config import Config
 19from .generic_ping import GenericPing
 20from .probes import GleanProbe
 21from .schema import Schema
 22
 23ROOT_DIR = Path(__file__).parent
 24BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
 25METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml"
 26
 27logger = logging.getLogger(__name__)
 28
 29SCHEMA_URL_TEMPLATE = (
 30    "https://raw.githubusercontent.com"
 31    "/mozilla-services/mozilla-pipeline-schemas"
 32    "/{branch}/schemas/glean/glean/"
 33)
 34
 35SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json"
 36
 37DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format(
 38    schema_type="glean", version=1
 39)
 40
 41
 42class GleanPing(GenericPing):
 43    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 44    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 45    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 46    dependencies_url_template = (
 47        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 48    )
 49    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"
 50
 51    default_dependencies = ["glean-core"]
 52
 53    with open(BUG_1737656_TXT, "r") as f:
 54        bug_1737656_affected_tables = [
 55            line.strip() for line in f.readlines() if line.strip()
 56        ]
 57
 58    def __init__(
 59        self, repo, version=1, use_metrics_blocklist=False, **kwargs
 60    ):  # TODO: Make env-url optional
 61        self.repo = repo
 62        self.repo_name = repo["name"]
 63        self.app_id = repo["app_id"]
 64        self.version = version
 65
 66        if use_metrics_blocklist:
 67            self.metric_blocklist = self.get_metric_blocklist()
 68        else:
 69            self.metric_blocklist = {}
 70
 71        super().__init__(
 72            DEFAULT_SCHEMA_URL,
 73            DEFAULT_SCHEMA_URL,
 74            self.probes_url_template.format(self.repo_name),
 75            **kwargs,
 76        )
 77
 78    def get_schema(self, generic_schema=False) -> Schema:
 79        """
 80        Fetch schema via URL.
 81
 82        Unless *generic_schema* is set to true, this function makes some modifications
 83        to allow some workarounds for proper injection of metrics.
 84        """
 85        schema = super().get_schema()
 86        if generic_schema:
 87            return schema
 88
 89        # We need to inject placeholders for the url2, text2, etc. types as part
 90        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 91        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 92            metric1 = schema.get(
 93                ("properties", "metrics", "properties", metric_name)
 94            ).copy()
 95            metric1 = schema.set_schema_elem(
 96                ("properties", "metrics", "properties", metric_name + "2"),
 97                metric1,
 98            )
 99
100        return schema
101
102    @cache
103    def get_dependencies(self):
104        # Get all of the library dependencies for the application that
105        # are also known about in the repositories file.
106
107        # The dependencies are specified using library names, but we need to
108        # map those back to the name of the repository in the repository file.
109        try:
110            dependencies = self._get_json(
111                self.dependencies_url_template.format(self.repo_name)
112            )
113        except HTTPError:
114            logging.info(f"For {self.repo_name}, using default Glean dependencies")
115            return self.default_dependencies
116
117        dependency_library_names = list(dependencies.keys())
118
119        repos = GleanPing._get_json(GleanPing.repos_url)
120        repos_by_dependency_name = {}
121        for repo in repos:
122            for library_name in repo.get("library_names", []):
123                repos_by_dependency_name[library_name] = repo["name"]
124
125        dependencies = []
126        for name in dependency_library_names:
127            if name in repos_by_dependency_name:
128                dependencies.append(repos_by_dependency_name[name])
129
130        if len(dependencies) == 0:
131            logging.info(f"For {self.repo_name}, using default Glean dependencies")
132            return self.default_dependencies
133
134        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
135        return dependencies
136
137    @staticmethod
138    def remove_pings_from_metric(
139        metric: Dict[str, Any], blocked_pings: List[str]
140    ) -> Dict[str, Any]:
141        """Remove the given pings from the metric's `send_in_pings` history.
142
143        Only removes if the given metric has been removed from the source since a fixed date
144        (2025-01-01). This allows metrics to be added back to the schema.
145        """
146        if (
147            metric["in-source"]
148            or len(blocked_pings) == 0
149            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
150            >= datetime(year=2025, month=1, day=1)
151        ):
152            return metric
153
154        for history_entry in metric["history"]:
155            history_entry["send_in_pings"] = [
156                p for p in history_entry["send_in_pings"] if p not in blocked_pings
157            ]
158
159        return metric
160
161    def get_probes(self) -> List[GleanProbe]:
162        data = self._get_json(self.probes_url)
163
164        # blocklist needs to be applied here instead of generate_schema because it needs to be
165        # dependency-aware; metrics can move between app and library and still be in the schema
166        # turn blocklist into metric_name -> ping_types map
167        blocklist = defaultdict(list)
168        for ping_type, metric_names in self.metric_blocklist.get(
169            self.get_app_name(), {}
170        ).items():
171            for metric_name in metric_names:
172                blocklist[metric_name].append(ping_type)
173
174        probes = [
175            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
176            for name, defn in data.items()
177        ]
178
179        for dependency in self.get_dependencies():
180            dependency_probes = self._get_json(
181                self.probes_url_template.format(dependency)
182            )
183
184            dependency_blocklist = defaultdict(list)
185            for ping_type, metric_names in self.metric_blocklist.get(
186                dependency, {}
187            ).items():
188                for metric_name in metric_names:
189                    dependency_blocklist[metric_name].append(ping_type)
190
191            probes += [
192                (
193                    name,
194                    self.remove_pings_from_metric(
195                        defn, dependency_blocklist.get(name, [])
196                    ),
197                )
198                for name, defn in dependency_probes.items()
199            ]
200
201        # A metric can be moved between an app and its dependencies or between dependencies while
202        # probe scraper keeps the history in each location, so both definitions are returned
203        # Merge the history per probe to take the latest definition while still being able to
204        # find metric type changes below
205        # Metrics are not merged if they are not sent in the same pings as they are disjoint
206
207        # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler)
208        # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure"
209        # normalize to "media_audio_init_failure". The transpiler picks one of the colliding
210        # descriptions non-deterministically.
211        def _normalize_name(name):
212            return name.replace(".", "_").replace("-", "_")
213
214        def _pings_in_history(defn):
215            return {
216                p
217                for h in defn[GleanProbe.history_key]
218                for p in h.get("send_in_pings", ["metrics"])
219            }
220
221        def _latest_history_date(defn):
222            return max(
223                datetime.fromisoformat(h["dates"]["last"])
224                for h in defn[GleanProbe.history_key]
225            )
226
227        def _dedupe_sort_key(defn):
228            """Prefer the most recent definition, breaking ties by choosing the in-source metric."""
229            return (
230                _latest_history_date(defn),
231                defn.get(GleanProbe.in_source_key, False),
232                defn["name"],
233            )
234
235        # Group probes that share a normalized name and whose pings intersect to combine
236        # moved metrics and metrics that only differ by "." vs "_"
237        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
238        for name, defn in probes:
239            defn_pings = _pings_in_history(defn)
240            existing_groups = grouped_by_name[_normalize_name(name)]
241            matches = [
242                group
243                for group in existing_groups
244                if any(
245                    _pings_in_history(other_defn) & defn_pings for other_defn in group
246                )
247            ]
248            if not matches:
249                existing_groups.append([defn])
250            else:
251                merged_group = [defn]
252                for g in matches:
253                    merged_group.extend(g)
254                    existing_groups.remove(g)
255                existing_groups.append(merged_group)
256
257        # Take latest definition per group
258        deduped_probes: List[Any] = []
259        for groups in grouped_by_name.values():
260            for group in groups:
261                latest_defn = max(group, key=_dedupe_sort_key)
262                if len(group) > 1:
263                    latest_defn = latest_defn.copy()
264                    latest_defn[GleanProbe.history_key] = sorted(
265                        (h for d in group for h in d[GleanProbe.history_key]),
266                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
267                    )
268                deduped_probes.append((latest_defn["name"], latest_defn))
269        probes = deduped_probes
270
271        pings = self.get_pings()
272
273        processed = []
274        for _id, defn in probes:
275            probe = GleanProbe(_id, defn, pings=pings)
276            processed.append(probe)
277
278            # Handling probe type changes (Bug 1870317)
279            probe_types = {hist["type"] for hist in defn[probe.history_key]}
280            if len(probe_types) > 1:
281                # The probe type changed at some point in history.
282                # Create schema entry for each type.
283                hist_defn = defn.copy()
284
285                # No new entry needs to be created for the current probe type
286                probe_types.remove(defn["type"])
287
288                for hist in hist_defn[probe.history_key]:
289                    # Create a new entry for a historic type
290                    if hist["type"] in probe_types:
291                        hist_defn["type"] = hist["type"]
292                        probe = GleanProbe(_id, hist_defn, pings=pings)
293                        processed.append(probe)
294
295                        # Keep track of the types entries were already created for
296                        probe_types.remove(hist["type"])
297
298        return processed
299
300    def _get_ping_data(self) -> Dict[str, Dict]:
301        url = self.ping_url_template.format(self.repo_name)
302        ping_data = GleanPing._get_json(url)
303        for dependency in self.get_dependencies():
304            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
305            ping_data.update(dependency_pings)
306        return ping_data
307
308    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
309        url = self.ping_url_template.format(self.repo_name)
310        ping_data = GleanPing._get_json(url)
311        return ping_data
312
313    def _get_dependency_pings(self, dependency):
314        return self._get_json(self.ping_url_template.format(dependency))
315
316    def get_pings(self) -> Set[str]:
317        return self._get_ping_data().keys()
318
319    @staticmethod
320    def apply_default_metadata(ping_metadata, default_metadata):
321        """apply_default_metadata recurses down into dicts nested
322        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
323        ``ping_metadata``.
324        :param ping_metadata: dict onto which the merge is executed
325        :param default_metadata: dct merged into ping_metadata
326        :return: None
327        """
328        for k, v in default_metadata.items():
329            if (
330                k in ping_metadata
331                and isinstance(ping_metadata[k], dict)
332                and isinstance(default_metadata[k], dict)
333            ):
334                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
335            else:
336                ping_metadata[k] = default_metadata[k]
337
338    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
339        # Get the ping data with the pipeline metadata
340        ping_data = self._get_ping_data_without_dependencies()
341
342        # The ping endpoint for the dependency pings does not include any repo defined
343        # moz_pipeline_metadata_defaults so they need to be applied here.
344
345        # 1.  Get repo and pipeline default metadata.
346        repos = self.get_repos()
347        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
348        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
349
350        # 2.  Apply the default metadata to each dependency defined ping.
351
352        # Apply app-level metadata to pings defined in dependencies
353        app_metadata = current_repo.get("moz_pipeline_metadata", {})
354
355        for dependency in self.get_dependencies():
356            dependency_pings = self._get_dependency_pings(dependency)
357            for dependency_ping in dependency_pings.values():
358                # Although it is counter intuitive to apply the default metadata on top of the
359                # existing dependency ping metadata it does set the repo specific value for
360                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
361                # value.
362                GleanPing.apply_default_metadata(
363                    dependency_ping.get("moz_pipeline_metadata"),
364                    copy.deepcopy(default_metadata),
365                )
366                # app-level ping properties take priority over the app defaults
367                metadata_override = app_metadata.get(dependency_ping["name"])
368                if metadata_override is not None:
369                    GleanPing.apply_default_metadata(
370                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
371                    )
372            ping_data.update(dependency_pings)
373
374        return ping_data
375
376    @staticmethod
377    def reorder_metadata(metadata):
378        desired_order_list = [
379            "bq_dataset_family",
380            "bq_table",
381            "bq_metadata_format",
382            "include_info_sections",
383            "submission_timestamp_granularity",
384            "expiration_policy",
385            "override_attributes",
386            "jwe_mappings",
387        ]
388        reordered_metadata = {
389            k: metadata[k] for k in desired_order_list if k in metadata
390        }
391
392        # re-order jwe-mappings
393        desired_order_list = ["source_field_path", "decrypted_field_path"]
394        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
395        if jwe_mapping_metadata:
396            reordered_jwe_mapping_metadata = []
397            for mapping in jwe_mapping_metadata:
398                reordered_jwe_mapping_metadata.append(
399                    {k: mapping[k] for k in desired_order_list if k in mapping}
400                )
401            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
402
403        # future proofing, in case there are other fields added at the ping top level
404        # add them to the end.
405        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
406        reordered_metadata = {**reordered_metadata, **leftovers}
407        return reordered_metadata
408
409    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
410        pings = self._get_ping_data_and_dependencies_with_default_metadata()
411        for ping_name, ping_data in pings.items():
412            metadata = ping_data.get("moz_pipeline_metadata")
413            if not metadata:
414                continue
415            metadata["include_info_sections"] = self._is_field_included(
416                ping_data, "include_info_sections", consider_all_history=False
417            )
418            metadata["include_client_id"] = self._is_field_included(
419                ping_data, "include_client_id"
420            )
421
422            # While technically unnecessary, the dictionary elements are re-ordered to match the
423            # currently deployed order and used to verify no difference in output.
424            pings[ping_name] = GleanPing.reorder_metadata(metadata)
425        return pings
426
427    def get_ping_descriptions(self) -> Dict[str, str]:
428        return {
429            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
430        }
431
432    @staticmethod
433    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
434        """Return false if the field exists and is false.
435
436        If `consider_all_history` is False, then only check the latest value in the ping history.
437
438        Otherwise, if the field is not found or true in one or more history entries,
439        true is returned.
440        """
441
442        # Default to true if not specified.
443        if "history" not in ping_data or len(ping_data["history"]) == 0:
444            return True
445
446        # Check if at some point in the past the field has already been deployed.
447        # And if the caller of this method wants to consider this history of the field.
448        # Keep them in the schema, even if the field has changed as
449        # removing fields is currently not supported.
450        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
451        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
452        ping_history: list
453        if consider_all_history:
454            ping_history = ping_data["history"]
455        else:
456            ping_history = [ping_data["history"][-1]]
457        for history in ping_history:
458            if field_name not in history or history[field_name]:
459                return True
460
461        # The ping was created with include_info_sections = False. The fields can be excluded.
462        return False
463
464    def set_schema_url(self, metadata):
465        """
466        Switch between the glean-min and glean schemas if the ping does not require
467        info sections as specified in the parsed ping info in probe scraper.
468        """
469        if not metadata["include_info_sections"]:
470            self.schema_url = SCHEMA_URL_TEMPLATE.format(
471                branch=self.branch_name
472            ) + SCHEMA_VERSION_TEMPLATE.format(
473                schema_type="glean-min", version=self.version
474            )
475        else:
476            self.schema_url = SCHEMA_URL_TEMPLATE.format(
477                branch=self.branch_name
478            ) + SCHEMA_VERSION_TEMPLATE.format(
479                schema_type="glean", version=self.version
480            )
481
482    def generate_schema(
483        self,
484        config,
485        generic_schema=False,
486        blocked_distribution_pings=("events", "baseline"),
487    ) -> Dict[str, Schema]:
488        pings = self.get_pings_and_pipeline_metadata()
489        schemas = {}
490
491        for ping, pipeline_meta in pings.items():
492            matchers = {
493                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
494            }
495
496            # Four newly introduced metric types were incorrectly deployed
497            # as repeated key/value structs in all Glean ping tables existing prior
498            # to November 2021. We maintain the incorrect fields for existing tables
499            # by disabling the associated matchers.
500            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
501            # defined that will allow metrics of these types to be injected into proper
502            # structs. The gcp-ingestion repository includes logic to rewrite these
503            # metrics under the "2" names.
504            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
505            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
506            if bq_identifier in self.bug_1737656_affected_tables:
507                matchers = {
508                    loc: m
509                    for loc, m in matchers.items()
510                    if not m.matcher.get("bug_1737656_affected")
511                }
512
513            for matcher in matchers.values():
514                matcher.matcher["send_in_pings"]["contains"] = ping
515
516                # temporarily block distributions from being added to events and baseline pings
517                # https://mozilla-hub.atlassian.net/browse/DENG-10606
518                if (
519                    blocked_distribution_pings
520                    and ping in blocked_distribution_pings
521                    and matcher.type.endswith("_distribution")
522                ):
523                    matcher.matcher["send_in_pings"]["not_contains"] = ping
524
525            new_config = Config(ping, matchers=matchers)
526
527            defaults = {"mozPipelineMetadata": pipeline_meta}
528
529            # Adjust the schema path if the ping does not require info sections
530            self.set_schema_url(pipeline_meta)
531            if generic_schema:  # Use the generic glean ping schema
532                schema = self.get_schema(generic_schema=True)
533                schema.schema.update(defaults)
534                schemas[new_config.name] = schema
535            else:
536                generated = super().generate_schema(new_config)
537                for schema in generated.values():
538                    # We want to override each individual key with assembled defaults,
539                    # but keep values _inside_ them if they have been set in the schemas.
540                    for key, value in defaults.items():
541                        if key not in schema.schema:
542                            schema.schema[key] = {}
543                        schema.schema[key].update(value)
544                schemas.update(generated)
545
546        return schemas
547
548    @staticmethod
549    def get_repos():
550        """
551        Retrieve metadata for all non-library Glean repositories
552        """
553        repos = GleanPing._get_json(GleanPing.repos_url)
554        return [repo for repo in repos if "library_names" not in repo]
555
556    def get_app_name(self) -> str:
557        """Get app name associated with the app id.
558
559        e.g. org-mozilla-firefox -> fenix
560        """
561        apps = GleanPing._get_json(GleanPing.app_listings_url)
562        # app id in app-listings has "." instead of "-" so using document_namespace
563        app_name = [
564            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
565        ]
566        return app_name[0] if len(app_name) > 0 else self.app_id
567
568    @staticmethod
569    def get_metric_blocklist():
570        with open(METRIC_BLOCKLIST, "r") as f:
571            return yaml.safe_load(f)
ROOT_DIR = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator')
BUG_1737656_TXT = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/bug_1737656_affected.txt')
METRIC_BLOCKLIST = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/metric_blocklist.yaml')
logger = <Logger mozilla_schema_generator.glean_ping (WARNING)>
SCHEMA_URL_TEMPLATE = 'https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/{branch}/schemas/glean/glean/'
SCHEMA_VERSION_TEMPLATE = '{schema_type}.{version}.schema.json'
DEFAULT_SCHEMA_URL = 'https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/{branch}/schemas/glean/glean/glean.1.schema.json'
 43class GleanPing(GenericPing):
 44    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 45    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 46    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 47    dependencies_url_template = (
 48        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 49    )
 50    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"
 51
 52    default_dependencies = ["glean-core"]
 53
 54    with open(BUG_1737656_TXT, "r") as f:
 55        bug_1737656_affected_tables = [
 56            line.strip() for line in f.readlines() if line.strip()
 57        ]
 58
 59    def __init__(
 60        self, repo, version=1, use_metrics_blocklist=False, **kwargs
 61    ):  # TODO: Make env-url optional
 62        self.repo = repo
 63        self.repo_name = repo["name"]
 64        self.app_id = repo["app_id"]
 65        self.version = version
 66
 67        if use_metrics_blocklist:
 68            self.metric_blocklist = self.get_metric_blocklist()
 69        else:
 70            self.metric_blocklist = {}
 71
 72        super().__init__(
 73            DEFAULT_SCHEMA_URL,
 74            DEFAULT_SCHEMA_URL,
 75            self.probes_url_template.format(self.repo_name),
 76            **kwargs,
 77        )
 78
 79    def get_schema(self, generic_schema=False) -> Schema:
 80        """
 81        Fetch schema via URL.
 82
 83        Unless *generic_schema* is set to true, this function makes some modifications
 84        to allow some workarounds for proper injection of metrics.
 85        """
 86        schema = super().get_schema()
 87        if generic_schema:
 88            return schema
 89
 90        # We need to inject placeholders for the url2, text2, etc. types as part
 91        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 92        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 93            metric1 = schema.get(
 94                ("properties", "metrics", "properties", metric_name)
 95            ).copy()
 96            metric1 = schema.set_schema_elem(
 97                ("properties", "metrics", "properties", metric_name + "2"),
 98                metric1,
 99            )
100
101        return schema
102
103    @cache
104    def get_dependencies(self):
105        # Get all of the library dependencies for the application that
106        # are also known about in the repositories file.
107
108        # The dependencies are specified using library names, but we need to
109        # map those back to the name of the repository in the repository file.
110        try:
111            dependencies = self._get_json(
112                self.dependencies_url_template.format(self.repo_name)
113            )
114        except HTTPError:
115            logging.info(f"For {self.repo_name}, using default Glean dependencies")
116            return self.default_dependencies
117
118        dependency_library_names = list(dependencies.keys())
119
120        repos = GleanPing._get_json(GleanPing.repos_url)
121        repos_by_dependency_name = {}
122        for repo in repos:
123            for library_name in repo.get("library_names", []):
124                repos_by_dependency_name[library_name] = repo["name"]
125
126        dependencies = []
127        for name in dependency_library_names:
128            if name in repos_by_dependency_name:
129                dependencies.append(repos_by_dependency_name[name])
130
131        if len(dependencies) == 0:
132            logging.info(f"For {self.repo_name}, using default Glean dependencies")
133            return self.default_dependencies
134
135        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
136        return dependencies
137
138    @staticmethod
139    def remove_pings_from_metric(
140        metric: Dict[str, Any], blocked_pings: List[str]
141    ) -> Dict[str, Any]:
142        """Remove the given pings from the metric's `send_in_pings` history.
143
144        Only removes if the given metric has been removed from the source since a fixed date
145        (2025-01-01). This allows metrics to be added back to the schema.
146        """
147        if (
148            metric["in-source"]
149            or len(blocked_pings) == 0
150            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
151            >= datetime(year=2025, month=1, day=1)
152        ):
153            return metric
154
155        for history_entry in metric["history"]:
156            history_entry["send_in_pings"] = [
157                p for p in history_entry["send_in_pings"] if p not in blocked_pings
158            ]
159
160        return metric
161
162    def get_probes(self) -> List[GleanProbe]:
163        data = self._get_json(self.probes_url)
164
165        # blocklist needs to be applied here instead of generate_schema because it needs to be
166        # dependency-aware; metrics can move between app and library and still be in the schema
167        # turn blocklist into metric_name -> ping_types map
168        blocklist = defaultdict(list)
169        for ping_type, metric_names in self.metric_blocklist.get(
170            self.get_app_name(), {}
171        ).items():
172            for metric_name in metric_names:
173                blocklist[metric_name].append(ping_type)
174
175        probes = [
176            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
177            for name, defn in data.items()
178        ]
179
180        for dependency in self.get_dependencies():
181            dependency_probes = self._get_json(
182                self.probes_url_template.format(dependency)
183            )
184
185            dependency_blocklist = defaultdict(list)
186            for ping_type, metric_names in self.metric_blocklist.get(
187                dependency, {}
188            ).items():
189                for metric_name in metric_names:
190                    dependency_blocklist[metric_name].append(ping_type)
191
192            probes += [
193                (
194                    name,
195                    self.remove_pings_from_metric(
196                        defn, dependency_blocklist.get(name, [])
197                    ),
198                )
199                for name, defn in dependency_probes.items()
200            ]
201
202        # A metric can be moved between an app and its dependencies or between dependencies while
203        # probe scraper keeps the history in each location, so both definitions are returned
204        # Merge the history per probe to take the latest definition while still being able to
205        # find metric type changes below
206        # Metrics are not merged if they are not sent in the same pings as they are disjoint
207
208        # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler)
209        # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure"
210        # normalize to "media_audio_init_failure". The transpiler picks one of the colliding
211        # descriptions non-deterministically.
212        def _normalize_name(name):
213            return name.replace(".", "_").replace("-", "_")
214
215        def _pings_in_history(defn):
216            return {
217                p
218                for h in defn[GleanProbe.history_key]
219                for p in h.get("send_in_pings", ["metrics"])
220            }
221
222        def _latest_history_date(defn):
223            return max(
224                datetime.fromisoformat(h["dates"]["last"])
225                for h in defn[GleanProbe.history_key]
226            )
227
228        def _dedupe_sort_key(defn):
229            """Prefer the most recent definition, breaking ties by choosing the in-source metric."""
230            return (
231                _latest_history_date(defn),
232                defn.get(GleanProbe.in_source_key, False),
233                defn["name"],
234            )
235
236        # Group probes that share a normalized name and whose pings intersect to combine
237        # moved metrics and metrics that only differ by "." vs "_"
238        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
239        for name, defn in probes:
240            defn_pings = _pings_in_history(defn)
241            existing_groups = grouped_by_name[_normalize_name(name)]
242            matches = [
243                group
244                for group in existing_groups
245                if any(
246                    _pings_in_history(other_defn) & defn_pings for other_defn in group
247                )
248            ]
249            if not matches:
250                existing_groups.append([defn])
251            else:
252                merged_group = [defn]
253                for g in matches:
254                    merged_group.extend(g)
255                    existing_groups.remove(g)
256                existing_groups.append(merged_group)
257
258        # Take latest definition per group
259        deduped_probes: List[Any] = []
260        for groups in grouped_by_name.values():
261            for group in groups:
262                latest_defn = max(group, key=_dedupe_sort_key)
263                if len(group) > 1:
264                    latest_defn = latest_defn.copy()
265                    latest_defn[GleanProbe.history_key] = sorted(
266                        (h for d in group for h in d[GleanProbe.history_key]),
267                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
268                    )
269                deduped_probes.append((latest_defn["name"], latest_defn))
270        probes = deduped_probes
271
272        pings = self.get_pings()
273
274        processed = []
275        for _id, defn in probes:
276            probe = GleanProbe(_id, defn, pings=pings)
277            processed.append(probe)
278
279            # Handling probe type changes (Bug 1870317)
280            probe_types = {hist["type"] for hist in defn[probe.history_key]}
281            if len(probe_types) > 1:
282                # The probe type changed at some point in history.
283                # Create schema entry for each type.
284                hist_defn = defn.copy()
285
286                # No new entry needs to be created for the current probe type
287                probe_types.remove(defn["type"])
288
289                for hist in hist_defn[probe.history_key]:
290                    # Create a new entry for a historic type
291                    if hist["type"] in probe_types:
292                        hist_defn["type"] = hist["type"]
293                        probe = GleanProbe(_id, hist_defn, pings=pings)
294                        processed.append(probe)
295
296                        # Keep track of the types entries were already created for
297                        probe_types.remove(hist["type"])
298
299        return processed
300
301    def _get_ping_data(self) -> Dict[str, Dict]:
302        url = self.ping_url_template.format(self.repo_name)
303        ping_data = GleanPing._get_json(url)
304        for dependency in self.get_dependencies():
305            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
306            ping_data.update(dependency_pings)
307        return ping_data
308
309    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
310        url = self.ping_url_template.format(self.repo_name)
311        ping_data = GleanPing._get_json(url)
312        return ping_data
313
314    def _get_dependency_pings(self, dependency):
315        return self._get_json(self.ping_url_template.format(dependency))
316
317    def get_pings(self) -> Set[str]:
318        return self._get_ping_data().keys()
319
320    @staticmethod
321    def apply_default_metadata(ping_metadata, default_metadata):
322        """apply_default_metadata recurses down into dicts nested
323        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
324        ``ping_metadata``.
325        :param ping_metadata: dict onto which the merge is executed
326        :param default_metadata: dct merged into ping_metadata
327        :return: None
328        """
329        for k, v in default_metadata.items():
330            if (
331                k in ping_metadata
332                and isinstance(ping_metadata[k], dict)
333                and isinstance(default_metadata[k], dict)
334            ):
335                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
336            else:
337                ping_metadata[k] = default_metadata[k]
338
339    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
340        # Get the ping data with the pipeline metadata
341        ping_data = self._get_ping_data_without_dependencies()
342
343        # The ping endpoint for the dependency pings does not include any repo defined
344        # moz_pipeline_metadata_defaults so they need to be applied here.
345
346        # 1.  Get repo and pipeline default metadata.
347        repos = self.get_repos()
348        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
349        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
350
351        # 2.  Apply the default metadata to each dependency defined ping.
352
353        # Apply app-level metadata to pings defined in dependencies
354        app_metadata = current_repo.get("moz_pipeline_metadata", {})
355
356        for dependency in self.get_dependencies():
357            dependency_pings = self._get_dependency_pings(dependency)
358            for dependency_ping in dependency_pings.values():
359                # Although it is counter intuitive to apply the default metadata on top of the
360                # existing dependency ping metadata it does set the repo specific value for
361                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
362                # value.
363                GleanPing.apply_default_metadata(
364                    dependency_ping.get("moz_pipeline_metadata"),
365                    copy.deepcopy(default_metadata),
366                )
367                # app-level ping properties take priority over the app defaults
368                metadata_override = app_metadata.get(dependency_ping["name"])
369                if metadata_override is not None:
370                    GleanPing.apply_default_metadata(
371                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
372                    )
373            ping_data.update(dependency_pings)
374
375        return ping_data
376
377    @staticmethod
378    def reorder_metadata(metadata):
379        desired_order_list = [
380            "bq_dataset_family",
381            "bq_table",
382            "bq_metadata_format",
383            "include_info_sections",
384            "submission_timestamp_granularity",
385            "expiration_policy",
386            "override_attributes",
387            "jwe_mappings",
388        ]
389        reordered_metadata = {
390            k: metadata[k] for k in desired_order_list if k in metadata
391        }
392
393        # re-order jwe-mappings
394        desired_order_list = ["source_field_path", "decrypted_field_path"]
395        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
396        if jwe_mapping_metadata:
397            reordered_jwe_mapping_metadata = []
398            for mapping in jwe_mapping_metadata:
399                reordered_jwe_mapping_metadata.append(
400                    {k: mapping[k] for k in desired_order_list if k in mapping}
401                )
402            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
403
404        # future proofing, in case there are other fields added at the ping top level
405        # add them to the end.
406        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
407        reordered_metadata = {**reordered_metadata, **leftovers}
408        return reordered_metadata
409
410    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
411        pings = self._get_ping_data_and_dependencies_with_default_metadata()
412        for ping_name, ping_data in pings.items():
413            metadata = ping_data.get("moz_pipeline_metadata")
414            if not metadata:
415                continue
416            metadata["include_info_sections"] = self._is_field_included(
417                ping_data, "include_info_sections", consider_all_history=False
418            )
419            metadata["include_client_id"] = self._is_field_included(
420                ping_data, "include_client_id"
421            )
422
423            # While technically unnecessary, the dictionary elements are re-ordered to match the
424            # currently deployed order and used to verify no difference in output.
425            pings[ping_name] = GleanPing.reorder_metadata(metadata)
426        return pings
427
428    def get_ping_descriptions(self) -> Dict[str, str]:
429        return {
430            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
431        }
432
433    @staticmethod
434    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
435        """Return false if the field exists and is false.
436
437        If `consider_all_history` is False, then only check the latest value in the ping history.
438
439        Otherwise, if the field is not found or true in one or more history entries,
440        true is returned.
441        """
442
443        # Default to true if not specified.
444        if "history" not in ping_data or len(ping_data["history"]) == 0:
445            return True
446
447        # Check if at some point in the past the field has already been deployed.
448        # And if the caller of this method wants to consider this history of the field.
449        # Keep them in the schema, even if the field has changed as
450        # removing fields is currently not supported.
451        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
452        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
453        ping_history: list
454        if consider_all_history:
455            ping_history = ping_data["history"]
456        else:
457            ping_history = [ping_data["history"][-1]]
458        for history in ping_history:
459            if field_name not in history or history[field_name]:
460                return True
461
462        # The ping was created with include_info_sections = False. The fields can be excluded.
463        return False
464
465    def set_schema_url(self, metadata):
466        """
467        Switch between the glean-min and glean schemas if the ping does not require
468        info sections as specified in the parsed ping info in probe scraper.
469        """
470        if not metadata["include_info_sections"]:
471            self.schema_url = SCHEMA_URL_TEMPLATE.format(
472                branch=self.branch_name
473            ) + SCHEMA_VERSION_TEMPLATE.format(
474                schema_type="glean-min", version=self.version
475            )
476        else:
477            self.schema_url = SCHEMA_URL_TEMPLATE.format(
478                branch=self.branch_name
479            ) + SCHEMA_VERSION_TEMPLATE.format(
480                schema_type="glean", version=self.version
481            )
482
483    def generate_schema(
484        self,
485        config,
486        generic_schema=False,
487        blocked_distribution_pings=("events", "baseline"),
488    ) -> Dict[str, Schema]:
489        pings = self.get_pings_and_pipeline_metadata()
490        schemas = {}
491
492        for ping, pipeline_meta in pings.items():
493            matchers = {
494                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
495            }
496
497            # Four newly introduced metric types were incorrectly deployed
498            # as repeated key/value structs in all Glean ping tables existing prior
499            # to November 2021. We maintain the incorrect fields for existing tables
500            # by disabling the associated matchers.
501            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
502            # defined that will allow metrics of these types to be injected into proper
503            # structs. The gcp-ingestion repository includes logic to rewrite these
504            # metrics under the "2" names.
505            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
506            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
507            if bq_identifier in self.bug_1737656_affected_tables:
508                matchers = {
509                    loc: m
510                    for loc, m in matchers.items()
511                    if not m.matcher.get("bug_1737656_affected")
512                }
513
514            for matcher in matchers.values():
515                matcher.matcher["send_in_pings"]["contains"] = ping
516
517                # temporarily block distributions from being added to events and baseline pings
518                # https://mozilla-hub.atlassian.net/browse/DENG-10606
519                if (
520                    blocked_distribution_pings
521                    and ping in blocked_distribution_pings
522                    and matcher.type.endswith("_distribution")
523                ):
524                    matcher.matcher["send_in_pings"]["not_contains"] = ping
525
526            new_config = Config(ping, matchers=matchers)
527
528            defaults = {"mozPipelineMetadata": pipeline_meta}
529
530            # Adjust the schema path if the ping does not require info sections
531            self.set_schema_url(pipeline_meta)
532            if generic_schema:  # Use the generic glean ping schema
533                schema = self.get_schema(generic_schema=True)
534                schema.schema.update(defaults)
535                schemas[new_config.name] = schema
536            else:
537                generated = super().generate_schema(new_config)
538                for schema in generated.values():
539                    # We want to override each individual key with assembled defaults,
540                    # but keep values _inside_ them if they have been set in the schemas.
541                    for key, value in defaults.items():
542                        if key not in schema.schema:
543                            schema.schema[key] = {}
544                        schema.schema[key].update(value)
545                schemas.update(generated)
546
547        return schemas
548
549    @staticmethod
550    def get_repos():
551        """
552        Retrieve metadata for all non-library Glean repositories
553        """
554        repos = GleanPing._get_json(GleanPing.repos_url)
555        return [repo for repo in repos if "library_names" not in repo]
556
557    def get_app_name(self) -> str:
558        """Get app name associated with the app id.
559
560        e.g. org-mozilla-firefox -> fenix
561        """
562        apps = GleanPing._get_json(GleanPing.app_listings_url)
563        # app id in app-listings has "." instead of "-" so using document_namespace
564        app_name = [
565            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
566        ]
567        return app_name[0] if len(app_name) > 0 else self.app_id
568
569    @staticmethod
570    def get_metric_blocklist():
571        with open(METRIC_BLOCKLIST, "r") as f:
572            return yaml.safe_load(f)
GleanPing(repo, version=1, use_metrics_blocklist=False, **kwargs)
59    def __init__(
60        self, repo, version=1, use_metrics_blocklist=False, **kwargs
61    ):  # TODO: Make env-url optional
62        self.repo = repo
63        self.repo_name = repo["name"]
64        self.app_id = repo["app_id"]
65        self.version = version
66
67        if use_metrics_blocklist:
68            self.metric_blocklist = self.get_metric_blocklist()
69        else:
70            self.metric_blocklist = {}
71
72        super().__init__(
73            DEFAULT_SCHEMA_URL,
74            DEFAULT_SCHEMA_URL,
75            self.probes_url_template.format(self.repo_name),
76            **kwargs,
77        )
probes_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/metrics'
ping_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/pings'
repos_url = 'https://probeinfo.telemetry.mozilla.org/glean/repositories'
dependencies_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/dependencies'
app_listings_url = 'https://probeinfo.telemetry.mozilla.org/v2/glean/app-listings'
default_dependencies = ['glean-core']
repo
repo_name
app_id
version
def get_schema(self, generic_schema=False) -> mozilla_schema_generator.schema.Schema:
 79    def get_schema(self, generic_schema=False) -> Schema:
 80        """
 81        Fetch schema via URL.
 82
 83        Unless *generic_schema* is set to true, this function makes some modifications
 84        to allow some workarounds for proper injection of metrics.
 85        """
 86        schema = super().get_schema()
 87        if generic_schema:
 88            return schema
 89
 90        # We need to inject placeholders for the url2, text2, etc. types as part
 91        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 92        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 93            metric1 = schema.get(
 94                ("properties", "metrics", "properties", metric_name)
 95            ).copy()
 96            metric1 = schema.set_schema_elem(
 97                ("properties", "metrics", "properties", metric_name + "2"),
 98                metric1,
 99            )
100
101        return schema

Fetch schema via URL.

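Unless generic_schema is set to true, this function modifies the fetched schema as a workaround for bug 1737656: it copies the labeled_rate, jwe, url, and text metric definitions into new labeled_rate2, jwe2, url2, and text2 entries, so that metrics of those types can be injected under the "2" names.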

@cache
def get_dependencies(self):
103    @cache
104    def get_dependencies(self):
105        # Get all of the library dependencies for the application that
106        # are also known about in the repositories file.
107
108        # The dependencies are specified using library names, but we need to
109        # map those back to the name of the repository in the repository file.
110        try:
111            dependencies = self._get_json(
112                self.dependencies_url_template.format(self.repo_name)
113            )
114        except HTTPError:
115            logging.info(f"For {self.repo_name}, using default Glean dependencies")
116            return self.default_dependencies
117
118        dependency_library_names = list(dependencies.keys())
119
120        repos = GleanPing._get_json(GleanPing.repos_url)
121        repos_by_dependency_name = {}
122        for repo in repos:
123            for library_name in repo.get("library_names", []):
124                repos_by_dependency_name[library_name] = repo["name"]
125
126        dependencies = []
127        for name in dependency_library_names:
128            if name in repos_by_dependency_name:
129                dependencies.append(repos_by_dependency_name[name])
130
131        if len(dependencies) == 0:
132            logging.info(f"For {self.repo_name}, using default Glean dependencies")
133            return self.default_dependencies
134
135        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
136        return dependencies
@staticmethod
def remove_pings_from_metric(metric: Dict[str, Any], blocked_pings: List[str]) -> Dict[str, Any]:
138    @staticmethod
139    def remove_pings_from_metric(
140        metric: Dict[str, Any], blocked_pings: List[str]
141    ) -> Dict[str, Any]:
142        """Remove the given pings from the metric's `send_in_pings` history.
143
144        Only removes if the given metric has been removed from the source since a fixed date
145        (2025-01-01). This allows metrics to be added back to the schema.
146        """
147        if (
148            metric["in-source"]
149            or len(blocked_pings) == 0
150            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
151            >= datetime(year=2025, month=1, day=1)
152        ):
153            return metric
154
155        for history_entry in metric["history"]:
156            history_entry["send_in_pings"] = [
157                p for p in history_entry["send_in_pings"] if p not in blocked_pings
158            ]
159
160        return metric

Remove the given pings from the metric's send_in_pings history.

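Pings are only removed when the metric is no longer in the source and its most recent history entry was last seen before a fixed date (2025-01-01). Otherwise the metric is returned unchanged. When pings are removed, the metric's history entries are modified in place. This allows metrics to be added back to the schema.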

def get_probes(self) -> List[mozilla_schema_generator.probes.GleanProbe]:
162    def get_probes(self) -> List[GleanProbe]:
163        data = self._get_json(self.probes_url)
164
165        # blocklist needs to be applied here instead of generate_schema because it needs to be
166        # dependency-aware; metrics can move between app and library and still be in the schema
167        # turn blocklist into metric_name -> ping_types map
168        blocklist = defaultdict(list)
169        for ping_type, metric_names in self.metric_blocklist.get(
170            self.get_app_name(), {}
171        ).items():
172            for metric_name in metric_names:
173                blocklist[metric_name].append(ping_type)
174
175        probes = [
176            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
177            for name, defn in data.items()
178        ]
179
180        for dependency in self.get_dependencies():
181            dependency_probes = self._get_json(
182                self.probes_url_template.format(dependency)
183            )
184
185            dependency_blocklist = defaultdict(list)
186            for ping_type, metric_names in self.metric_blocklist.get(
187                dependency, {}
188            ).items():
189                for metric_name in metric_names:
190                    dependency_blocklist[metric_name].append(ping_type)
191
192            probes += [
193                (
194                    name,
195                    self.remove_pings_from_metric(
196                        defn, dependency_blocklist.get(name, [])
197                    ),
198                )
199                for name, defn in dependency_probes.items()
200            ]
201
202        # A metric can be moved between an app and its dependencies or between dependencies while
203        # probe scraper keeps the history in each location, so both definitions are returned
204        # Merge the history per probe to take the latest definition while still being able to
205        # find metric type changes below
206        # Metrics are not merged if they are not sent in the same pings as they are disjoint
207
208        # Metrics are grouped by their normalized BigQuery column name (from jsonschema-transpiler)
209        # rather than their raw name. e.g. "media.audio.init_failure" and "media.audio_init_failure"
210        # normalize to "media_audio_init_failure". The transpiler picks one of the colliding
211        # descriptions non-deterministically.
212        def _normalize_name(name):
213            return name.replace(".", "_").replace("-", "_")
214
215        def _pings_in_history(defn):
216            return {
217                p
218                for h in defn[GleanProbe.history_key]
219                for p in h.get("send_in_pings", ["metrics"])
220            }
221
222        def _latest_history_date(defn):
223            return max(
224                datetime.fromisoformat(h["dates"]["last"])
225                for h in defn[GleanProbe.history_key]
226            )
227
228        def _dedupe_sort_key(defn):
229            """Prefer the most recent definition, breaking ties by choosing the in-source metric."""
230            return (
231                _latest_history_date(defn),
232                defn.get(GleanProbe.in_source_key, False),
233                defn["name"],
234            )
235
236        # Group probes that share a normalized name and whose pings intersect to combine
237        # moved metrics and metrics that only differ by "." vs "_"
238        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
239        for name, defn in probes:
240            defn_pings = _pings_in_history(defn)
241            existing_groups = grouped_by_name[_normalize_name(name)]
242            matches = [
243                group
244                for group in existing_groups
245                if any(
246                    _pings_in_history(other_defn) & defn_pings for other_defn in group
247                )
248            ]
249            if not matches:
250                existing_groups.append([defn])
251            else:
252                merged_group = [defn]
253                for g in matches:
254                    merged_group.extend(g)
255                    existing_groups.remove(g)
256                existing_groups.append(merged_group)
257
258        # Take latest definition per group
259        deduped_probes: List[Any] = []
260        for groups in grouped_by_name.values():
261            for group in groups:
262                latest_defn = max(group, key=_dedupe_sort_key)
263                if len(group) > 1:
264                    latest_defn = latest_defn.copy()
265                    latest_defn[GleanProbe.history_key] = sorted(
266                        (h for d in group for h in d[GleanProbe.history_key]),
267                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
268                    )
269                deduped_probes.append((latest_defn["name"], latest_defn))
270        probes = deduped_probes
271
272        pings = self.get_pings()
273
274        processed = []
275        for _id, defn in probes:
276            probe = GleanProbe(_id, defn, pings=pings)
277            processed.append(probe)
278
279            # Handling probe type changes (Bug 1870317)
280            probe_types = {hist["type"] for hist in defn[probe.history_key]}
281            if len(probe_types) > 1:
282                # The probe type changed at some point in history.
283                # Create schema entry for each type.
284                hist_defn = defn.copy()
285
286                # No new entry needs to be created for the current probe type
287                probe_types.remove(defn["type"])
288
289                for hist in hist_defn[probe.history_key]:
290                    # Create a new entry for a historic type
291                    if hist["type"] in probe_types:
292                        hist_defn["type"] = hist["type"]
293                        probe = GleanProbe(_id, hist_defn, pings=pings)
294                        processed.append(probe)
295
296                        # Keep track of the types entries were already created for
297                        probe_types.remove(hist["type"])
298
299        return processed
def get_pings(self) -> Set[str]:
317    def get_pings(self) -> Set[str]:
318        return self._get_ping_data().keys()
@staticmethod
def apply_default_metadata(ping_metadata, default_metadata):
320    @staticmethod
321    def apply_default_metadata(ping_metadata, default_metadata):
322        """apply_default_metadata recurses down into dicts nested
323        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
324        ``ping_metadata``.
325        :param ping_metadata: dict onto which the merge is executed
326        :param default_metadata: dct merged into ping_metadata
327        :return: None
328        """
329        for k, v in default_metadata.items():
330            if (
331                k in ping_metadata
332                and isinstance(ping_metadata[k], dict)
333                and isinstance(default_metadata[k], dict)
334            ):
335                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
336            else:
337                ping_metadata[k] = default_metadata[k]

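apply_default_metadata recurses down into dicts nested to an arbitrary depth, updating keys. The default_metadata is merged into ping_metadata in place. Nested dicts present in both are merged recursively, and any other value from default_metadata overwrites the corresponding key in ping_metadata.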

Parameters
  • ping_metadata: dict onto which the merge is executed
  • default_metadata: dict merged into ping_metadata
Returns

None

@staticmethod
def reorder_metadata(metadata):
377    @staticmethod
378    def reorder_metadata(metadata):
379        desired_order_list = [
380            "bq_dataset_family",
381            "bq_table",
382            "bq_metadata_format",
383            "include_info_sections",
384            "submission_timestamp_granularity",
385            "expiration_policy",
386            "override_attributes",
387            "jwe_mappings",
388        ]
389        reordered_metadata = {
390            k: metadata[k] for k in desired_order_list if k in metadata
391        }
392
393        # re-order jwe-mappings
394        desired_order_list = ["source_field_path", "decrypted_field_path"]
395        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
396        if jwe_mapping_metadata:
397            reordered_jwe_mapping_metadata = []
398            for mapping in jwe_mapping_metadata:
399                reordered_jwe_mapping_metadata.append(
400                    {k: mapping[k] for k in desired_order_list if k in mapping}
401                )
402            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
403
404        # future proofing, in case there are other fields added at the ping top level
405        # add them to the end.
406        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
407        reordered_metadata = {**reordered_metadata, **leftovers}
408        return reordered_metadata
def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
    """
    Map each ping name to its pipeline metadata, ready for schema generation.

    For every ping that has a non-empty ``moz_pipeline_metadata``, the
    ``include_info_sections`` and ``include_client_id`` flags are written into
    that metadata dict, which is modified in place. The ping's entry is then
    replaced by the metadata with its keys reordered. Pings without
    (or with empty) pipeline metadata keep their original ping data.
    """
    pings = self._get_ping_data_and_dependencies_with_default_metadata()
    for name, data in pings.items():
        pipeline_metadata = data.get("moz_pipeline_metadata")
        if pipeline_metadata:
            pipeline_metadata["include_info_sections"] = self._is_field_included(
                data, "include_info_sections", consider_all_history=False
            )
            pipeline_metadata["include_client_id"] = self._is_field_included(
                data, "include_client_id"
            )
            # Key order carries no meaning; it is normalized only so the output
            # matches what is currently deployed and diffs stay empty.
            pings[name] = GleanPing.reorder_metadata(pipeline_metadata)
    return pings
def get_ping_descriptions(self) -> Dict[str, str]:
    """Return each ping's description from the last entry of its history, keyed by ping name."""
    descriptions = {}
    for ping_name, ping_info in self._get_ping_data().items():
        latest_entry = ping_info["history"][-1]
        descriptions[ping_name] = latest_entry["description"]
    return descriptions
def set_schema_url(self, metadata):
    """
    Point ``self.schema_url`` at the glean-min schema when the ping does not
    require info sections (as specified in the ping info parsed by probe
    scraper), and at the full glean schema otherwise.
    """
    schema_type = "glean" if metadata["include_info_sections"] else "glean-min"
    base_url = SCHEMA_URL_TEMPLATE.format(branch=self.branch_name)
    schema_file = SCHEMA_VERSION_TEMPLATE.format(
        schema_type=schema_type, version=self.version
    )
    self.schema_url = base_url + schema_file

Switch between the glean-min and glean schemas: use glean-min when the ping does not require info sections, as specified in the ping info parsed by probe scraper, and glean otherwise.

def generate_schema(
    self,
    config,
    generic_schema=False,
    blocked_distribution_pings=("events", "baseline"),
) -> Dict[str, Schema]:
    """
    Build the schemas for every ping of this repository.

    Parameters
      • config: Config whose matchers are cloned and specialised for each ping.
      • generic_schema: when true, use the generic glean ping schema for every
        ping instead of generating one from the matchers.
      • blocked_distribution_pings: ping names for which matchers whose type
        ends with "_distribution" get a "not_contains" clause for that ping
        (DENG-10606). A falsy value disables this blocking.
    Returns
      Dict of schemas. In generic mode each entry is keyed by the per-ping
      Config's name; otherwise the dicts returned by the parent
      generate_schema are merged together.
    Side effects
      Updates ``self.schema_url`` (via set_schema_url) for every ping.
    """
    schemas = {}
    for ping, pipeline_meta in self.get_pings_and_pipeline_metadata().items():
        matchers = {
            location: matcher.clone(new_table_group=ping)
            for location, matcher in config.matchers.items()
        }

        # Four newly introduced metric types were incorrectly deployed
        # as repeated key/value structs in all Glean ping tables existing prior
        # to November 2021. We maintain the incorrect fields for existing tables
        # by disabling the associated matchers.
        # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
        # defined that will allow metrics of these types to be injected into proper
        # structs. The gcp-ingestion repository includes logic to rewrite these
        # metrics under the "2" names.
        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
        bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
        if bq_identifier in self.bug_1737656_affected_tables:
            matchers = {
                location: matcher
                for location, matcher in matchers.items()
                if not matcher.matcher.get("bug_1737656_affected")
            }

        for matcher in matchers.values():
            send_in_pings = matcher.matcher["send_in_pings"]
            send_in_pings["contains"] = ping

            # temporarily block distributions from being added to events and baseline pings
            # https://mozilla-hub.atlassian.net/browse/DENG-10606
            is_blocked_distribution = (
                blocked_distribution_pings
                and ping in blocked_distribution_pings
                and matcher.type.endswith("_distribution")
            )
            if is_blocked_distribution:
                send_in_pings["not_contains"] = ping

        new_config = Config(ping, matchers=matchers)
        defaults = {"mozPipelineMetadata": pipeline_meta}

        # Sets self.schema_url (glean vs glean-min) before the schema is
        # fetched or generated below.
        self.set_schema_url(pipeline_meta)

        if generic_schema:
            # Use the generic glean ping schema
            schema = self.get_schema(generic_schema=True)
            schema.schema.update(defaults)
            schemas[new_config.name] = schema
            continue

        generated = super().generate_schema(new_config)
        for schema in generated.values():
            # Each top-level default key is created if missing and then updated,
            # so inner keys that only the generated schema sets are kept.
            for key, value in defaults.items():
                schema.schema.setdefault(key, {}).update(value)
        schemas.update(generated)

    return schemas
@staticmethod
def get_repos():
    """
    Fetch the Glean repository listing and keep only application
    repositories, i.e. entries without a "library_names" key.
    """
    all_repos = GleanPing._get_json(GleanPing.repos_url)
    return list(filter(lambda repo: "library_names" not in repo, all_repos))

Retrieve metadata for all non-library Glean repositories

def get_app_name(self) -> str:
    """Get app name associated with the app id.

    e.g. org-mozilla-firefox -> fenix

    Falls back to ``self.app_id`` when no app listing matches.
    """
    apps = GleanPing._get_json(GleanPing.app_listings_url)
    matching_names = []
    for listing in apps:
        # app id in app-listings has "." instead of "-" so using document_namespace
        if listing["document_namespace"] == self.app_id:
            matching_names.append(listing["app_name"])
    if not matching_names:
        return self.app_id
    return matching_names[0]

Get app name associated with the app id.

e.g. org-mozilla-firefox -> fenix

@staticmethod
def get_metric_blocklist():
    """
    Load and return the parsed contents of the metric blocklist YAML file
    (``METRIC_BLOCKLIST``).

    The file is decoded explicitly as UTF-8. Without an explicit encoding,
    ``open`` would use the platform's locale encoding and could mis-decode
    non-ASCII content on some systems.
    """
    with open(METRIC_BLOCKLIST, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)
f = <_io.TextIOWrapper name='/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/bug_1737656_affected.txt' mode='r' encoding='UTF-8'>
bug_1737656_affected_tables = ['burnham.baseline_v1', 'burnham.deletion_request_v1', 'burnham.discovery_v1', 'burnham.events_v1', 'burnham.metrics_v1', 'burnham.space_ship_ready_v1', 'burnham.starbase46_v1', 'firefox_desktop_background_update.background_update_v1', 'firefox_desktop_background_update.baseline_v1', 'firefox_desktop_background_update.deletion_request_v1', 'firefox_desktop_background_update.events_v1', 'firefox_desktop_background_update.metrics_v1', 'firefox_desktop.baseline_v1', 'firefox_desktop.deletion_request_v1', 'firefox_desktop.events_v1', 'firefox_desktop.fog_validation_v1', 'firefox_desktop.metrics_v1', 'firefox_installer.install_v1', 'firefox_launcher_process.launcher_process_failure_v1', 'messaging_system.cfr_v1', 'messaging_system.infobar_v1', 'messaging_system.moments_v1', 'messaging_system.onboarding_v1', 'messaging_system.personalization_experiment_v1', 'messaging_system.snippets_v1', 'messaging_system.spotlight_v1', 'messaging_system.undesired_events_v1', 'messaging_system.whats_new_panel_v1', 'mlhackweek_search.action_v1', 'mlhackweek_search.baseline_v1', 'mlhackweek_search.custom_v1', 'mlhackweek_search.deletion_request_v1', 'mlhackweek_search.events_v1', 'mlhackweek_search.metrics_v1', 'mozilla_lockbox.addresses_sync_v1', 'mozilla_lockbox.baseline_v1', 'mozilla_lockbox.bookmarks_sync_v1', 'mozilla_lockbox.creditcards_sync_v1', 'mozilla_lockbox.deletion_request_v1', 'mozilla_lockbox.events_v1', 'mozilla_lockbox.history_sync_v1', 'mozilla_lockbox.logins_sync_v1', 'mozilla_lockbox.metrics_v1', 'mozilla_lockbox.sync_v1', 'mozilla_lockbox.tabs_sync_v1', 'mozilla_mach.baseline_v1', 'mozilla_mach.deletion_request_v1', 'mozilla_mach.events_v1', 'mozilla_mach.metrics_v1', 'mozilla_mach.usage_v1', 'mozillavpn.deletion_request_v1', 'mozillavpn.main_v1', 'mozphab.baseline_v1', 'mozphab.deletion_request_v1', 'mozphab.events_v1', 'mozphab.metrics_v1', 'mozphab.usage_v1', 'org_mozilla_bergamot.custom_v1', 'org_mozilla_bergamot.deletion_request_v1', 
'org_mozilla_connect_firefox.baseline_v1', 'org_mozilla_connect_firefox.deletion_request_v1', 'org_mozilla_connect_firefox.events_v1', 'org_mozilla_connect_firefox.metrics_v1', 'org_mozilla_fenix.activation_v1', 'org_mozilla_fenix.addresses_sync_v1', 'org_mozilla_fenix.baseline_v1', 'org_mozilla_fenix.bookmarks_sync_v1', 'org_mozilla_fenix.creditcards_sync_v1', 'org_mozilla_fenix.deletion_request_v1', 'org_mozilla_fenix.events_v1', 'org_mozilla_fenix.first_session_v1', 'org_mozilla_fenix.fog_validation_v1', 'org_mozilla_fenix.history_sync_v1', 'org_mozilla_fenix.installation_v1', 'org_mozilla_fenix.logins_sync_v1', 'org_mozilla_fenix.metrics_v1', 'org_mozilla_fenix.migration_v1', 'org_mozilla_fenix.startup_timeline_v1', 'org_mozilla_fenix.sync_v1', 'org_mozilla_fenix.tabs_sync_v1', 'org_mozilla_fenix_nightly.activation_v1', 'org_mozilla_fenix_nightly.addresses_sync_v1', 'org_mozilla_fenix_nightly.baseline_v1', 'org_mozilla_fenix_nightly.bookmarks_sync_v1', 'org_mozilla_fenix_nightly.creditcards_sync_v1', 'org_mozilla_fenix_nightly.deletion_request_v1', 'org_mozilla_fenix_nightly.events_v1', 'org_mozilla_fenix_nightly.first_session_v1', 'org_mozilla_fenix_nightly.fog_validation_v1', 'org_mozilla_fenix_nightly.history_sync_v1', 'org_mozilla_fenix_nightly.installation_v1', 'org_mozilla_fenix_nightly.logins_sync_v1', 'org_mozilla_fenix_nightly.metrics_v1', 'org_mozilla_fenix_nightly.migration_v1', 'org_mozilla_fenix_nightly.startup_timeline_v1', 'org_mozilla_fenix_nightly.sync_v1', 'org_mozilla_fenix_nightly.tabs_sync_v1', 'org_mozilla_fennec_aurora.activation_v1', 'org_mozilla_fennec_aurora.addresses_sync_v1', 'org_mozilla_fennec_aurora.baseline_v1', 'org_mozilla_fennec_aurora.bookmarks_sync_v1', 'org_mozilla_fennec_aurora.creditcards_sync_v1', 'org_mozilla_fennec_aurora.deletion_request_v1', 'org_mozilla_fennec_aurora.events_v1', 'org_mozilla_fennec_aurora.first_session_v1', 'org_mozilla_fennec_aurora.fog_validation_v1', 'org_mozilla_fennec_aurora.history_sync_v1', 
'org_mozilla_fennec_aurora.installation_v1', 'org_mozilla_fennec_aurora.logins_sync_v1', 'org_mozilla_fennec_aurora.metrics_v1', 'org_mozilla_fennec_aurora.migration_v1', 'org_mozilla_fennec_aurora.startup_timeline_v1', 'org_mozilla_fennec_aurora.sync_v1', 'org_mozilla_fennec_aurora.tabs_sync_v1', 'org_mozilla_firefox_beta.activation_v1', 'org_mozilla_firefox_beta.addresses_sync_v1', 'org_mozilla_firefox_beta.baseline_v1', 'org_mozilla_firefox_beta.bookmarks_sync_v1', 'org_mozilla_firefox_beta.creditcards_sync_v1', 'org_mozilla_firefox_beta.deletion_request_v1', 'org_mozilla_firefox_beta.events_v1', 'org_mozilla_firefox_beta.first_session_v1', 'org_mozilla_firefox_beta.fog_validation_v1', 'org_mozilla_firefox_beta.history_sync_v1', 'org_mozilla_firefox_beta.installation_v1', 'org_mozilla_firefox_beta.logins_sync_v1', 'org_mozilla_firefox_beta.metrics_v1', 'org_mozilla_firefox_beta.migration_v1', 'org_mozilla_firefox_beta.startup_timeline_v1', 'org_mozilla_firefox_beta.sync_v1', 'org_mozilla_firefox_beta.tabs_sync_v1', 'org_mozilla_firefox.activation_v1', 'org_mozilla_firefox.addresses_sync_v1', 'org_mozilla_firefox.baseline_v1', 'org_mozilla_firefox.bookmarks_sync_v1', 'org_mozilla_firefox.creditcards_sync_v1', 'org_mozilla_firefox.deletion_request_v1', 'org_mozilla_firefox.events_v1', 'org_mozilla_firefox.first_session_v1', 'org_mozilla_firefox.fog_validation_v1', 'org_mozilla_firefox.history_sync_v1', 'org_mozilla_firefox.installation_v1', 'org_mozilla_firefox.logins_sync_v1', 'org_mozilla_firefox.metrics_v1', 'org_mozilla_firefox.migration_v1', 'org_mozilla_firefox.startup_timeline_v1', 'org_mozilla_firefox.sync_v1', 'org_mozilla_firefox.tabs_sync_v1', 'org_mozilla_firefoxreality.baseline_v1', 'org_mozilla_firefoxreality.deletion_request_v1', 'org_mozilla_firefoxreality.events_v1', 'org_mozilla_firefoxreality.launch_v1', 'org_mozilla_firefoxreality.metrics_v1', 'org_mozilla_focus_beta.activation_v1', 'org_mozilla_focus_beta.baseline_v1', 
'org_mozilla_focus_beta.deletion_request_v1', 'org_mozilla_focus_beta.events_v1', 'org_mozilla_focus_beta.metrics_v1', 'org_mozilla_focus.activation_v1', 'org_mozilla_focus.baseline_v1', 'org_mozilla_focus.deletion_request_v1', 'org_mozilla_focus.events_v1', 'org_mozilla_focus.metrics_v1', 'org_mozilla_focus_nightly.activation_v1', 'org_mozilla_focus_nightly.baseline_v1', 'org_mozilla_focus_nightly.deletion_request_v1', 'org_mozilla_focus_nightly.events_v1', 'org_mozilla_focus_nightly.metrics_v1', 'org_mozilla_ios_fennec.baseline_v1', 'org_mozilla_ios_fennec.deletion_request_v1', 'org_mozilla_ios_fennec.events_v1', 'org_mozilla_ios_fennec.metrics_v1', 'org_mozilla_ios_firefox.baseline_v1', 'org_mozilla_ios_firefox.deletion_request_v1', 'org_mozilla_ios_firefox.events_v1', 'org_mozilla_ios_firefox.metrics_v1', 'org_mozilla_ios_firefoxbeta.baseline_v1', 'org_mozilla_ios_firefoxbeta.deletion_request_v1', 'org_mozilla_ios_firefoxbeta.events_v1', 'org_mozilla_ios_firefoxbeta.metrics_v1', 'org_mozilla_ios_focus.baseline_v1', 'org_mozilla_ios_focus.deletion_request_v1', 'org_mozilla_ios_focus.events_v1', 'org_mozilla_ios_focus.metrics_v1', 'org_mozilla_ios_klar.baseline_v1', 'org_mozilla_ios_klar.deletion_request_v1', 'org_mozilla_ios_klar.events_v1', 'org_mozilla_ios_klar.metrics_v1', 'org_mozilla_ios_lockbox.baseline_v1', 'org_mozilla_ios_lockbox.deletion_request_v1', 'org_mozilla_ios_lockbox.events_v1', 'org_mozilla_ios_lockbox.metrics_v1', 'org_mozilla_klar.activation_v1', 'org_mozilla_klar.baseline_v1', 'org_mozilla_klar.deletion_request_v1', 'org_mozilla_klar.events_v1', 'org_mozilla_klar.metrics_v1', 'org_mozilla_mozregression.baseline_v1', 'org_mozilla_mozregression.deletion_request_v1', 'org_mozilla_mozregression.events_v1', 'org_mozilla_mozregression.metrics_v1', 'org_mozilla_mozregression.usage_v1', 'org_mozilla_reference_browser.baseline_v1', 'org_mozilla_reference_browser.deletion_request_v1', 'org_mozilla_reference_browser.events_v1', 
'org_mozilla_reference_browser.metrics_v1', 'org_mozilla_tv_firefox.baseline_v1', 'org_mozilla_tv_firefox.deletion_request_v1', 'org_mozilla_tv_firefox.events_v1', 'org_mozilla_tv_firefox.metrics_v1', 'org_mozilla_vrbrowser.addresses_sync_v1', 'org_mozilla_vrbrowser.baseline_v1', 'org_mozilla_vrbrowser.bookmarks_sync_v1', 'org_mozilla_vrbrowser.creditcards_sync_v1', 'org_mozilla_vrbrowser.deletion_request_v1', 'org_mozilla_vrbrowser.events_v1', 'org_mozilla_vrbrowser.history_sync_v1', 'org_mozilla_vrbrowser.logins_sync_v1', 'org_mozilla_vrbrowser.metrics_v1', 'org_mozilla_vrbrowser.session_end_v1', 'org_mozilla_vrbrowser.sync_v1', 'org_mozilla_vrbrowser.tabs_sync_v1', 'rally_core.deletion_request_v1', 'rally_core.demographics_v1', 'rally_core.enrollment_v1', 'rally_core.study_enrollment_v1', 'rally_core.study_unenrollment_v1', 'rally_core.uninstall_deletion_v1', 'rally_debug.deletion_request_v1', 'rally_debug.demographics_v1', 'rally_debug.enrollment_v1', 'rally_debug.study_enrollment_v1', 'rally_debug.study_unenrollment_v1', 'rally_debug.uninstall_deletion_v1', 'rally_study_zero_one.deletion_request_v1', 'rally_study_zero_one.rs01_event_v1', 'rally_study_zero_one.study_enrollment_v1', 'rally_zero_one.deletion_request_v1', 'rally_zero_one.measurements_v1', 'rally_zero_one.pioneer_enrollment_v1']