mozilla_schema_generator.glean_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import copy
  8import logging
  9from collections import defaultdict
 10from datetime import datetime
 11from functools import cache
 12from pathlib import Path
 13from typing import Any, Dict, List, Set
 14
 15import yaml
 16from requests import HTTPError
 17
 18from .config import Config
 19from .generic_ping import GenericPing
 20from .probes import GleanProbe
 21from .schema import Schema
 22
# Directory containing this module; the bundled config files live below it.
ROOT_DIR = Path(__file__).parent
# Text file listing one "<bq_dataset_family>.<bq_table>" identifier per line
# (see https://bugzilla.mozilla.org/show_bug.cgi?id=1737656).
BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
# YAML file parsed by GleanPing.get_metric_blocklist.
METRIC_BLOCKLIST = ROOT_DIR / "configs" / "metric_blocklist.yaml"

logger = logging.getLogger(__name__)

# Base location of the Glean schemas; "{branch}" is filled in later.
SCHEMA_URL_TEMPLATE = (
    "https://raw.githubusercontent.com"
    "/mozilla-services/mozilla-pipeline-schemas"
    "/{branch}/schemas/glean/glean/"
)

# File name of a schema, e.g. "glean.1.schema.json" or "glean-min.1.schema.json".
SCHEMA_VERSION_TEMPLATE = "{schema_type}.{version}.schema.json"

# Only the file name part is formatted here, so the "{branch}" placeholder of
# SCHEMA_URL_TEMPLATE is still present in this value.
DEFAULT_SCHEMA_URL = SCHEMA_URL_TEMPLATE + SCHEMA_VERSION_TEMPLATE.format(
    schema_type="glean", version=1
)
 40
 41
 42class GleanPing(GenericPing):
 43    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 44    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 45    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 46    dependencies_url_template = (
 47        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 48    )
 49    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"
 50
 51    default_dependencies = ["glean-core"]
 52
 53    with open(BUG_1737656_TXT, "r") as f:
 54        bug_1737656_affected_tables = [
 55            line.strip() for line in f.readlines() if line.strip()
 56        ]
 57
 58    def __init__(
 59        self, repo, version=1, use_metrics_blocklist=False, **kwargs
 60    ):  # TODO: Make env-url optional
 61        self.repo = repo
 62        self.repo_name = repo["name"]
 63        self.app_id = repo["app_id"]
 64        self.version = version
 65
 66        if use_metrics_blocklist:
 67            self.metric_blocklist = self.get_metric_blocklist()
 68        else:
 69            self.metric_blocklist = {}
 70
 71        super().__init__(
 72            DEFAULT_SCHEMA_URL,
 73            DEFAULT_SCHEMA_URL,
 74            self.probes_url_template.format(self.repo_name),
 75            **kwargs,
 76        )
 77
 78    def get_schema(self, generic_schema=False) -> Schema:
 79        """
 80        Fetch schema via URL.
 81
 82        Unless *generic_schema* is set to true, this function makes some modifications
 83        to allow some workarounds for proper injection of metrics.
 84        """
 85        schema = super().get_schema()
 86        if generic_schema:
 87            return schema
 88
 89        # We need to inject placeholders for the url2, text2, etc. types as part
 90        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 91        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 92            metric1 = schema.get(
 93                ("properties", "metrics", "properties", metric_name)
 94            ).copy()
 95            metric1 = schema.set_schema_elem(
 96                ("properties", "metrics", "properties", metric_name + "2"),
 97                metric1,
 98            )
 99
100        return schema
101
102    @cache
103    def get_dependencies(self):
104        # Get all of the library dependencies for the application that
105        # are also known about in the repositories file.
106
107        # The dependencies are specified using library names, but we need to
108        # map those back to the name of the repository in the repository file.
109        try:
110            dependencies = self._get_json(
111                self.dependencies_url_template.format(self.repo_name)
112            )
113        except HTTPError:
114            logging.info(f"For {self.repo_name}, using default Glean dependencies")
115            return self.default_dependencies
116
117        dependency_library_names = list(dependencies.keys())
118
119        repos = GleanPing._get_json(GleanPing.repos_url)
120        repos_by_dependency_name = {}
121        for repo in repos:
122            for library_name in repo.get("library_names", []):
123                repos_by_dependency_name[library_name] = repo["name"]
124
125        dependencies = []
126        for name in dependency_library_names:
127            if name in repos_by_dependency_name:
128                dependencies.append(repos_by_dependency_name[name])
129
130        if len(dependencies) == 0:
131            logging.info(f"For {self.repo_name}, using default Glean dependencies")
132            return self.default_dependencies
133
134        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
135        return dependencies
136
137    @staticmethod
138    def remove_pings_from_metric(
139        metric: Dict[str, Any], blocked_pings: List[str]
140    ) -> Dict[str, Any]:
141        """Remove the given pings from the metric's `send_in_pings` history.
142
143        Only removes if the given metric has been removed from the source since a fixed date
144        (2025-01-01). This allows metrics to be added back to the schema.
145        """
146        if (
147            metric["in-source"]
148            or len(blocked_pings) == 0
149            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
150            >= datetime(year=2025, month=1, day=1)
151        ):
152            return metric
153
154        for history_entry in metric["history"]:
155            history_entry["send_in_pings"] = [
156                p for p in history_entry["send_in_pings"] if p not in blocked_pings
157            ]
158
159        return metric
160
    def get_probes(self) -> List[GleanProbe]:
        """Return the probes of the application and of all its dependencies.

        Steps, in order:

        1. Fetch the metric definitions of the app and of every dependency and
           apply the metric blocklist to each definition.
        2. Group definitions that share a name and whose ping histories
           intersect, then keep the latest definition of each group with the
           combined, date-sorted history of the whole group.
        3. Create one ``GleanProbe`` per remaining definition, plus one extra
           probe for every other type the metric had in its history.
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        # (metric name, definition) pairs; a list rather than a dict because the
        # same name can be reported by the app and by its dependencies.
        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            # Blocklist entries for a dependency are looked up by the dependency
            # name, while the app's own entries are looked up by app name above.
            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        # A metric can be moved between an app and its dependencies or between dependencies while
        # probe scraper keeps the history in each location, so both definitions are returned
        # Merge the history per probe to take the latest definition while still being able to
        # find metric type changes below
        # Metrics are not merged if they are not sent in the same pings as they are disjoint
        def _pings_in_history(defn):
            # All pings named in any history entry; an entry without
            # "send_in_pings" counts as the "metrics" ping.
            return {
                p
                for h in defn[GleanProbe.history_key]
                for p in h.get("send_in_pings", ["metrics"])
            }

        def _latest_history_date(defn):
            # Most recent "last" date across all history entries.
            return max(
                datetime.fromisoformat(h["dates"]["last"])
                for h in defn[GleanProbe.history_key]
            )

        # Group same name probes whose pings intersect to combine moved metrics
        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
        for name, defn in probes:
            defn_pings = _pings_in_history(defn)
            existing_groups = grouped_by_name[name]
            # The generator below reuses the name ``defn`` for its own loop
            # variable; the outer ``defn`` is not affected by it.
            matches = [
                group
                for group in existing_groups
                if any(_pings_in_history(defn) & defn_pings for defn in group)
            ]
            if not matches:
                existing_groups.append([defn])
            else:
                # The new definition can connect several previously separate
                # groups; fold all of them into a single group.
                merged_group = [defn]
                for g in matches:
                    merged_group.extend(g)
                    existing_groups.remove(g)
                existing_groups.append(merged_group)

        # Take latest definition per group
        deduped_probes: List[Any] = []
        for name, groups in grouped_by_name.items():
            for group in groups:
                latest_defn = max(group, key=_latest_history_date)
                if len(group) > 1:
                    # Shallow copy so the fetched definition keeps its own
                    # history while the copy gets the combined one.
                    latest_defn = latest_defn.copy()
                    latest_defn[GleanProbe.history_key] = sorted(
                        (h for d in group for h in d[GleanProbe.history_key]),
                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
                    )
                deduped_probes.append((name, latest_defn))
        probes = deduped_probes

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                # NOTE(review): the same hist_defn dict is re-used and mutated for
                # every historic type; presumably GleanProbe reads "type" while it
                # is being constructed - confirm.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed
280
281    def _get_ping_data(self) -> Dict[str, Dict]:
282        url = self.ping_url_template.format(self.repo_name)
283        ping_data = GleanPing._get_json(url)
284        for dependency in self.get_dependencies():
285            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
286            ping_data.update(dependency_pings)
287        return ping_data
288
289    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
290        url = self.ping_url_template.format(self.repo_name)
291        ping_data = GleanPing._get_json(url)
292        return ping_data
293
294    def _get_dependency_pings(self, dependency):
295        return self._get_json(self.ping_url_template.format(dependency))
296
297    def get_pings(self) -> Set[str]:
298        return self._get_ping_data().keys()
299
300    @staticmethod
301    def apply_default_metadata(ping_metadata, default_metadata):
302        """apply_default_metadata recurses down into dicts nested
303        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
304        ``ping_metadata``.
305        :param ping_metadata: dict onto which the merge is executed
306        :param default_metadata: dct merged into ping_metadata
307        :return: None
308        """
309        for k, v in default_metadata.items():
310            if (
311                k in ping_metadata
312                and isinstance(ping_metadata[k], dict)
313                and isinstance(default_metadata[k], dict)
314            ):
315                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
316            else:
317                ping_metadata[k] = default_metadata[k]
318
319    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
320        # Get the ping data with the pipeline metadata
321        ping_data = self._get_ping_data_without_dependencies()
322
323        # The ping endpoint for the dependency pings does not include any repo defined
324        # moz_pipeline_metadata_defaults so they need to be applied here.
325
326        # 1.  Get repo and pipeline default metadata.
327        repos = self.get_repos()
328        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
329        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
330
331        # 2.  Apply the default metadata to each dependency defined ping.
332
333        # Apply app-level metadata to pings defined in dependencies
334        app_metadata = current_repo.get("moz_pipeline_metadata", {})
335
336        for dependency in self.get_dependencies():
337            dependency_pings = self._get_dependency_pings(dependency)
338            for dependency_ping in dependency_pings.values():
339                # Although it is counter intuitive to apply the default metadata on top of the
340                # existing dependency ping metadata it does set the repo specific value for
341                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
342                # value.
343                GleanPing.apply_default_metadata(
344                    dependency_ping.get("moz_pipeline_metadata"),
345                    copy.deepcopy(default_metadata),
346                )
347                # app-level ping properties take priority over the app defaults
348                metadata_override = app_metadata.get(dependency_ping["name"])
349                if metadata_override is not None:
350                    GleanPing.apply_default_metadata(
351                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
352                    )
353            ping_data.update(dependency_pings)
354
355        return ping_data
356
357    @staticmethod
358    def reorder_metadata(metadata):
359        desired_order_list = [
360            "bq_dataset_family",
361            "bq_table",
362            "bq_metadata_format",
363            "include_info_sections",
364            "submission_timestamp_granularity",
365            "expiration_policy",
366            "override_attributes",
367            "jwe_mappings",
368        ]
369        reordered_metadata = {
370            k: metadata[k] for k in desired_order_list if k in metadata
371        }
372
373        # re-order jwe-mappings
374        desired_order_list = ["source_field_path", "decrypted_field_path"]
375        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
376        if jwe_mapping_metadata:
377            reordered_jwe_mapping_metadata = []
378            for mapping in jwe_mapping_metadata:
379                reordered_jwe_mapping_metadata.append(
380                    {k: mapping[k] for k in desired_order_list if k in mapping}
381                )
382            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
383
384        # future proofing, in case there are other fields added at the ping top level
385        # add them to the end.
386        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
387        reordered_metadata = {**reordered_metadata, **leftovers}
388        return reordered_metadata
389
390    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
391        pings = self._get_ping_data_and_dependencies_with_default_metadata()
392        for ping_name, ping_data in pings.items():
393            metadata = ping_data.get("moz_pipeline_metadata")
394            if not metadata:
395                continue
396            metadata["include_info_sections"] = self._is_field_included(
397                ping_data, "include_info_sections", consider_all_history=False
398            )
399            metadata["include_client_id"] = self._is_field_included(
400                ping_data, "include_client_id"
401            )
402
403            # While technically unnecessary, the dictionary elements are re-ordered to match the
404            # currently deployed order and used to verify no difference in output.
405            pings[ping_name] = GleanPing.reorder_metadata(metadata)
406        return pings
407
408    def get_ping_descriptions(self) -> Dict[str, str]:
409        return {
410            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
411        }
412
413    @staticmethod
414    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
415        """Return false if the field exists and is false.
416
417        If `consider_all_history` is False, then only check the latest value in the ping history.
418
419        Otherwise, if the field is not found or true in one or more history entries,
420        true is returned.
421        """
422
423        # Default to true if not specified.
424        if "history" not in ping_data or len(ping_data["history"]) == 0:
425            return True
426
427        # Check if at some point in the past the field has already been deployed.
428        # And if the caller of this method wants to consider this history of the field.
429        # Keep them in the schema, even if the field has changed as
430        # removing fields is currently not supported.
431        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
432        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
433        ping_history: list
434        if consider_all_history:
435            ping_history = ping_data["history"]
436        else:
437            ping_history = [ping_data["history"][-1]]
438        for history in ping_history:
439            if field_name not in history or history[field_name]:
440                return True
441
442        # The ping was created with include_info_sections = False. The fields can be excluded.
443        return False
444
445    def set_schema_url(self, metadata):
446        """
447        Switch between the glean-min and glean schemas if the ping does not require
448        info sections as specified in the parsed ping info in probe scraper.
449        """
450        if not metadata["include_info_sections"]:
451            self.schema_url = SCHEMA_URL_TEMPLATE.format(
452                branch=self.branch_name
453            ) + SCHEMA_VERSION_TEMPLATE.format(
454                schema_type="glean-min", version=self.version
455            )
456        else:
457            self.schema_url = SCHEMA_URL_TEMPLATE.format(
458                branch=self.branch_name
459            ) + SCHEMA_VERSION_TEMPLATE.format(
460                schema_type="glean", version=self.version
461            )
462
    def generate_schema(
        self,
        config,
        generic_schema=False,
        blocked_distribution_pings=("events", "baseline"),
    ) -> Dict[str, Schema]:
        """Generate the schemas for every ping of the application.

        :param config: configuration whose matchers are cloned for each ping
        :param generic_schema: when true the fetched schema is used as is and
            only the pipeline metadata is added to it
        :param blocked_distribution_pings: pings for which matchers whose type
            ends in ``_distribution`` are excluded; a falsy value disables this
        :return: mapping of schema name to schema
        """
        pings = self.get_pings_and_pipeline_metadata()
        schemas = {}

        for ping, pipeline_meta in pings.items():
            # Every ping gets its own clones, so the changes made to the
            # matchers below stay local to this ping.
            matchers = {
                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
            }

            # Four newly introduced metric types were incorrectly deployed
            # as repeated key/value structs in all Glean ping tables existing prior
            # to November 2021. We maintain the incorrect fields for existing tables
            # by disabling the associated matchers.
            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
            # defined that will allow metrics of these types to be injected into proper
            # structs. The gcp-ingestion repository includes logic to rewrite these
            # metrics under the "2" names.
            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
            if bq_identifier in self.bug_1737656_affected_tables:
                matchers = {
                    loc: m
                    for loc, m in matchers.items()
                    if not m.matcher.get("bug_1737656_affected")
                }

            for matcher in matchers.values():
                # Restrict the matcher to metrics that are sent in this ping.
                matcher.matcher["send_in_pings"]["contains"] = ping

                # temporarily block distributions from being added to events and baseline pings
                # https://mozilla-hub.atlassian.net/browse/DENG-10606
                if (
                    blocked_distribution_pings
                    and ping in blocked_distribution_pings
                    and matcher.type.endswith("_distribution")
                ):
                    matcher.matcher["send_in_pings"]["not_contains"] = ping

            new_config = Config(ping, matchers=matchers)

            defaults = {"mozPipelineMetadata": pipeline_meta}

            # Adjust the schema path if the ping does not require info sections
            # This updates self.schema_url and therefore has to run before the
            # schema is fetched or generated below.
            self.set_schema_url(pipeline_meta)
            if generic_schema:  # Use the generic glean ping schema
                schema = self.get_schema(generic_schema=True)
                schema.schema.update(defaults)
                schemas[new_config.name] = schema
            else:
                generated = super().generate_schema(new_config)
                for schema in generated.values():
                    # We want to override each individual key with assembled defaults,
                    # but keep values _inside_ them if they have been set in the schemas.
                    for key, value in defaults.items():
                        if key not in schema.schema:
                            schema.schema[key] = {}
                        schema.schema[key].update(value)
                schemas.update(generated)

        return schemas
528
529    @staticmethod
530    def get_repos():
531        """
532        Retrieve metadata for all non-library Glean repositories
533        """
534        repos = GleanPing._get_json(GleanPing.repos_url)
535        return [repo for repo in repos if "library_names" not in repo]
536
537    def get_app_name(self) -> str:
538        """Get app name associated with the app id.
539
540        e.g. org-mozilla-firefox -> fenix
541        """
542        apps = GleanPing._get_json(GleanPing.app_listings_url)
543        # app id in app-listings has "." instead of "-" so using document_namespace
544        app_name = [
545            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
546        ]
547        return app_name[0] if len(app_name) > 0 else self.app_id
548
549    @staticmethod
550    def get_metric_blocklist():
551        with open(METRIC_BLOCKLIST, "r") as f:
552            return yaml.safe_load(f)
ROOT_DIR = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator')
BUG_1737656_TXT = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/bug_1737656_affected.txt')
METRIC_BLOCKLIST = PosixPath('/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/metric_blocklist.yaml')
logger = <Logger mozilla_schema_generator.glean_ping (WARNING)>
SCHEMA_URL_TEMPLATE = 'https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/{branch}/schemas/glean/glean/'
SCHEMA_VERSION_TEMPLATE = '{schema_type}.{version}.schema.json'
DEFAULT_SCHEMA_URL = 'https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas/{branch}/schemas/glean/glean/glean.1.schema.json'
 43class GleanPing(GenericPing):
 44    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 45    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 46    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 47    dependencies_url_template = (
 48        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 49    )
 50    app_listings_url = GenericPing.probe_info_base_url + "/v2/glean/app-listings"
 51
 52    default_dependencies = ["glean-core"]
 53
 54    with open(BUG_1737656_TXT, "r") as f:
 55        bug_1737656_affected_tables = [
 56            line.strip() for line in f.readlines() if line.strip()
 57        ]
 58
 59    def __init__(
 60        self, repo, version=1, use_metrics_blocklist=False, **kwargs
 61    ):  # TODO: Make env-url optional
 62        self.repo = repo
 63        self.repo_name = repo["name"]
 64        self.app_id = repo["app_id"]
 65        self.version = version
 66
 67        if use_metrics_blocklist:
 68            self.metric_blocklist = self.get_metric_blocklist()
 69        else:
 70            self.metric_blocklist = {}
 71
 72        super().__init__(
 73            DEFAULT_SCHEMA_URL,
 74            DEFAULT_SCHEMA_URL,
 75            self.probes_url_template.format(self.repo_name),
 76            **kwargs,
 77        )
 78
 79    def get_schema(self, generic_schema=False) -> Schema:
 80        """
 81        Fetch schema via URL.
 82
 83        Unless *generic_schema* is set to true, this function makes some modifications
 84        to allow some workarounds for proper injection of metrics.
 85        """
 86        schema = super().get_schema()
 87        if generic_schema:
 88            return schema
 89
 90        # We need to inject placeholders for the url2, text2, etc. types as part
 91        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 92        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 93            metric1 = schema.get(
 94                ("properties", "metrics", "properties", metric_name)
 95            ).copy()
 96            metric1 = schema.set_schema_elem(
 97                ("properties", "metrics", "properties", metric_name + "2"),
 98                metric1,
 99            )
100
101        return schema
102
103    @cache
104    def get_dependencies(self):
105        # Get all of the library dependencies for the application that
106        # are also known about in the repositories file.
107
108        # The dependencies are specified using library names, but we need to
109        # map those back to the name of the repository in the repository file.
110        try:
111            dependencies = self._get_json(
112                self.dependencies_url_template.format(self.repo_name)
113            )
114        except HTTPError:
115            logging.info(f"For {self.repo_name}, using default Glean dependencies")
116            return self.default_dependencies
117
118        dependency_library_names = list(dependencies.keys())
119
120        repos = GleanPing._get_json(GleanPing.repos_url)
121        repos_by_dependency_name = {}
122        for repo in repos:
123            for library_name in repo.get("library_names", []):
124                repos_by_dependency_name[library_name] = repo["name"]
125
126        dependencies = []
127        for name in dependency_library_names:
128            if name in repos_by_dependency_name:
129                dependencies.append(repos_by_dependency_name[name])
130
131        if len(dependencies) == 0:
132            logging.info(f"For {self.repo_name}, using default Glean dependencies")
133            return self.default_dependencies
134
135        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
136        return dependencies
137
138    @staticmethod
139    def remove_pings_from_metric(
140        metric: Dict[str, Any], blocked_pings: List[str]
141    ) -> Dict[str, Any]:
142        """Remove the given pings from the metric's `send_in_pings` history.
143
144        Only removes if the given metric has been removed from the source since a fixed date
145        (2025-01-01). This allows metrics to be added back to the schema.
146        """
147        if (
148            metric["in-source"]
149            or len(blocked_pings) == 0
150            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
151            >= datetime(year=2025, month=1, day=1)
152        ):
153            return metric
154
155        for history_entry in metric["history"]:
156            history_entry["send_in_pings"] = [
157                p for p in history_entry["send_in_pings"] if p not in blocked_pings
158            ]
159
160        return metric
161
162    def get_probes(self) -> List[GleanProbe]:
163        data = self._get_json(self.probes_url)
164
165        # blocklist needs to be applied here instead of generate_schema because it needs to be
166        # dependency-aware; metrics can move between app and library and still be in the schema
167        # turn blocklist into metric_name -> ping_types map
168        blocklist = defaultdict(list)
169        for ping_type, metric_names in self.metric_blocklist.get(
170            self.get_app_name(), {}
171        ).items():
172            for metric_name in metric_names:
173                blocklist[metric_name].append(ping_type)
174
175        probes = [
176            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
177            for name, defn in data.items()
178        ]
179
180        for dependency in self.get_dependencies():
181            dependency_probes = self._get_json(
182                self.probes_url_template.format(dependency)
183            )
184
185            dependency_blocklist = defaultdict(list)
186            for ping_type, metric_names in self.metric_blocklist.get(
187                dependency, {}
188            ).items():
189                for metric_name in metric_names:
190                    dependency_blocklist[metric_name].append(ping_type)
191
192            probes += [
193                (
194                    name,
195                    self.remove_pings_from_metric(
196                        defn, dependency_blocklist.get(name, [])
197                    ),
198                )
199                for name, defn in dependency_probes.items()
200            ]
201
202        # A metric can be moved between an app and its dependencies or between dependencies while
203        # probe scraper keeps the history in each location, so both definitions are returned
204        # Merge the history per probe to take the latest definition while still being able to
205        # find metric type changes below
206        # Metrics are not merged if they are not sent in the same pings as they are disjoint
207        def _pings_in_history(defn):
208            return {
209                p
210                for h in defn[GleanProbe.history_key]
211                for p in h.get("send_in_pings", ["metrics"])
212            }
213
214        def _latest_history_date(defn):
215            return max(
216                datetime.fromisoformat(h["dates"]["last"])
217                for h in defn[GleanProbe.history_key]
218            )
219
220        # Group same name probes whose pings intersect to combine moved metrics
221        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
222        for name, defn in probes:
223            defn_pings = _pings_in_history(defn)
224            existing_groups = grouped_by_name[name]
225            matches = [
226                group
227                for group in existing_groups
228                if any(_pings_in_history(defn) & defn_pings for defn in group)
229            ]
230            if not matches:
231                existing_groups.append([defn])
232            else:
233                merged_group = [defn]
234                for g in matches:
235                    merged_group.extend(g)
236                    existing_groups.remove(g)
237                existing_groups.append(merged_group)
238
239        # Take latest definition per group
240        deduped_probes: List[Any] = []
241        for name, groups in grouped_by_name.items():
242            for group in groups:
243                latest_defn = max(group, key=_latest_history_date)
244                if len(group) > 1:
245                    latest_defn = latest_defn.copy()
246                    latest_defn[GleanProbe.history_key] = sorted(
247                        (h for d in group for h in d[GleanProbe.history_key]),
248                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
249                    )
250                deduped_probes.append((name, latest_defn))
251        probes = deduped_probes
252
253        pings = self.get_pings()
254
255        processed = []
256        for _id, defn in probes:
257            probe = GleanProbe(_id, defn, pings=pings)
258            processed.append(probe)
259
260            # Handling probe type changes (Bug 1870317)
261            probe_types = {hist["type"] for hist in defn[probe.history_key]}
262            if len(probe_types) > 1:
263                # The probe type changed at some point in history.
264                # Create schema entry for each type.
265                hist_defn = defn.copy()
266
267                # No new entry needs to be created for the current probe type
268                probe_types.remove(defn["type"])
269
270                for hist in hist_defn[probe.history_key]:
271                    # Create a new entry for a historic type
272                    if hist["type"] in probe_types:
273                        hist_defn["type"] = hist["type"]
274                        probe = GleanProbe(_id, hist_defn, pings=pings)
275                        processed.append(probe)
276
277                        # Keep track of the types entries were already created for
278                        probe_types.remove(hist["type"])
279
280        return processed
281
282    def _get_ping_data(self) -> Dict[str, Dict]:
283        url = self.ping_url_template.format(self.repo_name)
284        ping_data = GleanPing._get_json(url)
285        for dependency in self.get_dependencies():
286            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
287            ping_data.update(dependency_pings)
288        return ping_data
289
290    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
291        url = self.ping_url_template.format(self.repo_name)
292        ping_data = GleanPing._get_json(url)
293        return ping_data
294
295    def _get_dependency_pings(self, dependency):
296        return self._get_json(self.ping_url_template.format(dependency))
297
298    def get_pings(self) -> Set[str]:
299        return self._get_ping_data().keys()
300
301    @staticmethod
302    def apply_default_metadata(ping_metadata, default_metadata):
303        """apply_default_metadata recurses down into dicts nested
304        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
305        ``ping_metadata``.
306        :param ping_metadata: dict onto which the merge is executed
307        :param default_metadata: dct merged into ping_metadata
308        :return: None
309        """
310        for k, v in default_metadata.items():
311            if (
312                k in ping_metadata
313                and isinstance(ping_metadata[k], dict)
314                and isinstance(default_metadata[k], dict)
315            ):
316                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
317            else:
318                ping_metadata[k] = default_metadata[k]
319
320    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
321        # Get the ping data with the pipeline metadata
322        ping_data = self._get_ping_data_without_dependencies()
323
324        # The ping endpoint for the dependency pings does not include any repo defined
325        # moz_pipeline_metadata_defaults so they need to be applied here.
326
327        # 1.  Get repo and pipeline default metadata.
328        repos = self.get_repos()
329        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
330        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
331
332        # 2.  Apply the default metadata to each dependency defined ping.
333
334        # Apply app-level metadata to pings defined in dependencies
335        app_metadata = current_repo.get("moz_pipeline_metadata", {})
336
337        for dependency in self.get_dependencies():
338            dependency_pings = self._get_dependency_pings(dependency)
339            for dependency_ping in dependency_pings.values():
340                # Although it is counter intuitive to apply the default metadata on top of the
341                # existing dependency ping metadata it does set the repo specific value for
342                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
343                # value.
344                GleanPing.apply_default_metadata(
345                    dependency_ping.get("moz_pipeline_metadata"),
346                    copy.deepcopy(default_metadata),
347                )
348                # app-level ping properties take priority over the app defaults
349                metadata_override = app_metadata.get(dependency_ping["name"])
350                if metadata_override is not None:
351                    GleanPing.apply_default_metadata(
352                        dependency_ping.get("moz_pipeline_metadata"), metadata_override
353                    )
354            ping_data.update(dependency_pings)
355
356        return ping_data
357
358    @staticmethod
359    def reorder_metadata(metadata):
360        desired_order_list = [
361            "bq_dataset_family",
362            "bq_table",
363            "bq_metadata_format",
364            "include_info_sections",
365            "submission_timestamp_granularity",
366            "expiration_policy",
367            "override_attributes",
368            "jwe_mappings",
369        ]
370        reordered_metadata = {
371            k: metadata[k] for k in desired_order_list if k in metadata
372        }
373
374        # re-order jwe-mappings
375        desired_order_list = ["source_field_path", "decrypted_field_path"]
376        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
377        if jwe_mapping_metadata:
378            reordered_jwe_mapping_metadata = []
379            for mapping in jwe_mapping_metadata:
380                reordered_jwe_mapping_metadata.append(
381                    {k: mapping[k] for k in desired_order_list if k in mapping}
382                )
383            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
384
385        # future proofing, in case there are other fields added at the ping top level
386        # add them to the end.
387        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
388        reordered_metadata = {**reordered_metadata, **leftovers}
389        return reordered_metadata
390
391    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
392        pings = self._get_ping_data_and_dependencies_with_default_metadata()
393        for ping_name, ping_data in pings.items():
394            metadata = ping_data.get("moz_pipeline_metadata")
395            if not metadata:
396                continue
397            metadata["include_info_sections"] = self._is_field_included(
398                ping_data, "include_info_sections", consider_all_history=False
399            )
400            metadata["include_client_id"] = self._is_field_included(
401                ping_data, "include_client_id"
402            )
403
404            # While technically unnecessary, the dictionary elements are re-ordered to match the
405            # currently deployed order and used to verify no difference in output.
406            pings[ping_name] = GleanPing.reorder_metadata(metadata)
407        return pings
408
409    def get_ping_descriptions(self) -> Dict[str, str]:
410        return {
411            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
412        }
413
414    @staticmethod
415    def _is_field_included(ping_data, field_name, consider_all_history=True) -> bool:
416        """Return false if the field exists and is false.
417
418        If `consider_all_history` is False, then only check the latest value in the ping history.
419
420        Otherwise, if the field is not found or true in one or more history entries,
421        true is returned.
422        """
423
424        # Default to true if not specified.
425        if "history" not in ping_data or len(ping_data["history"]) == 0:
426            return True
427
428        # Check if at some point in the past the field has already been deployed.
429        # And if the caller of this method wants to consider this history of the field.
430        # Keep them in the schema, even if the field has changed as
431        # removing fields is currently not supported.
432        # See https://bugzilla.mozilla.org/show_bug.cgi?id=1898105
433        # and https://bugzilla.mozilla.org/show_bug.cgi?id=1898105#c10
434        ping_history: list
435        if consider_all_history:
436            ping_history = ping_data["history"]
437        else:
438            ping_history = [ping_data["history"][-1]]
439        for history in ping_history:
440            if field_name not in history or history[field_name]:
441                return True
442
443        # The ping was created with include_info_sections = False. The fields can be excluded.
444        return False
445
446    def set_schema_url(self, metadata):
447        """
448        Switch between the glean-min and glean schemas if the ping does not require
449        info sections as specified in the parsed ping info in probe scraper.
450        """
451        if not metadata["include_info_sections"]:
452            self.schema_url = SCHEMA_URL_TEMPLATE.format(
453                branch=self.branch_name
454            ) + SCHEMA_VERSION_TEMPLATE.format(
455                schema_type="glean-min", version=self.version
456            )
457        else:
458            self.schema_url = SCHEMA_URL_TEMPLATE.format(
459                branch=self.branch_name
460            ) + SCHEMA_VERSION_TEMPLATE.format(
461                schema_type="glean", version=self.version
462            )
463
464    def generate_schema(
465        self,
466        config,
467        generic_schema=False,
468        blocked_distribution_pings=("events", "baseline"),
469    ) -> Dict[str, Schema]:
470        pings = self.get_pings_and_pipeline_metadata()
471        schemas = {}
472
473        for ping, pipeline_meta in pings.items():
474            matchers = {
475                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
476            }
477
478            # Four newly introduced metric types were incorrectly deployed
479            # as repeated key/value structs in all Glean ping tables existing prior
480            # to November 2021. We maintain the incorrect fields for existing tables
481            # by disabling the associated matchers.
482            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
483            # defined that will allow metrics of these types to be injected into proper
484            # structs. The gcp-ingestion repository includes logic to rewrite these
485            # metrics under the "2" names.
486            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
487            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
488            if bq_identifier in self.bug_1737656_affected_tables:
489                matchers = {
490                    loc: m
491                    for loc, m in matchers.items()
492                    if not m.matcher.get("bug_1737656_affected")
493                }
494
495            for matcher in matchers.values():
496                matcher.matcher["send_in_pings"]["contains"] = ping
497
498                # temporarily block distributions from being added to events and baseline pings
499                # https://mozilla-hub.atlassian.net/browse/DENG-10606
500                if (
501                    blocked_distribution_pings
502                    and ping in blocked_distribution_pings
503                    and matcher.type.endswith("_distribution")
504                ):
505                    matcher.matcher["send_in_pings"]["not_contains"] = ping
506
507            new_config = Config(ping, matchers=matchers)
508
509            defaults = {"mozPipelineMetadata": pipeline_meta}
510
511            # Adjust the schema path if the ping does not require info sections
512            self.set_schema_url(pipeline_meta)
513            if generic_schema:  # Use the generic glean ping schema
514                schema = self.get_schema(generic_schema=True)
515                schema.schema.update(defaults)
516                schemas[new_config.name] = schema
517            else:
518                generated = super().generate_schema(new_config)
519                for schema in generated.values():
520                    # We want to override each individual key with assembled defaults,
521                    # but keep values _inside_ them if they have been set in the schemas.
522                    for key, value in defaults.items():
523                        if key not in schema.schema:
524                            schema.schema[key] = {}
525                        schema.schema[key].update(value)
526                schemas.update(generated)
527
528        return schemas
529
530    @staticmethod
531    def get_repos():
532        """
533        Retrieve metadata for all non-library Glean repositories
534        """
535        repos = GleanPing._get_json(GleanPing.repos_url)
536        return [repo for repo in repos if "library_names" not in repo]
537
538    def get_app_name(self) -> str:
539        """Get app name associated with the app id.
540
541        e.g. org-mozilla-firefox -> fenix
542        """
543        apps = GleanPing._get_json(GleanPing.app_listings_url)
544        # app id in app-listings has "." instead of "-" so using document_namespace
545        app_name = [
546            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
547        ]
548        return app_name[0] if len(app_name) > 0 else self.app_id
549
550    @staticmethod
551    def get_metric_blocklist():
552        with open(METRIC_BLOCKLIST, "r") as f:
553            return yaml.safe_load(f)
GleanPing(repo, version=1, use_metrics_blocklist=False, **kwargs)
59    def __init__(
60        self, repo, version=1, use_metrics_blocklist=False, **kwargs
61    ):  # TODO: Make env-url optional
62        self.repo = repo
63        self.repo_name = repo["name"]
64        self.app_id = repo["app_id"]
65        self.version = version
66
67        if use_metrics_blocklist:
68            self.metric_blocklist = self.get_metric_blocklist()
69        else:
70            self.metric_blocklist = {}
71
72        super().__init__(
73            DEFAULT_SCHEMA_URL,
74            DEFAULT_SCHEMA_URL,
75            self.probes_url_template.format(self.repo_name),
76            **kwargs,
77        )
probes_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/metrics'
ping_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/pings'
repos_url = 'https://probeinfo.telemetry.mozilla.org/glean/repositories'
dependencies_url_template = 'https://probeinfo.telemetry.mozilla.org/glean/{}/dependencies'
app_listings_url = 'https://probeinfo.telemetry.mozilla.org/v2/glean/app-listings'
default_dependencies = ['glean-core']
repo
repo_name
app_id
version
def get_schema(self, generic_schema=False) -> mozilla_schema_generator.schema.Schema:
 79    def get_schema(self, generic_schema=False) -> Schema:
 80        """
 81        Fetch schema via URL.
 82
 83        Unless *generic_schema* is set to true, this function makes some modifications
 84        to allow some workarounds for proper injection of metrics.
 85        """
 86        schema = super().get_schema()
 87        if generic_schema:
 88            return schema
 89
 90        # We need to inject placeholders for the url2, text2, etc. types as part
 91        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 92        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 93            metric1 = schema.get(
 94                ("properties", "metrics", "properties", metric_name)
 95            ).copy()
 96            metric1 = schema.set_schema_elem(
 97                ("properties", "metrics", "properties", metric_name + "2"),
 98                metric1,
 99            )
100
101        return schema

Fetch schema via URL.

Unless generic_schema is set to true, this function makes some modifications to allow some workarounds for proper injection of metrics.

@cache
def get_dependencies(self):
103    @cache
104    def get_dependencies(self):
105        # Get all of the library dependencies for the application that
106        # are also known about in the repositories file.
107
108        # The dependencies are specified using library names, but we need to
109        # map those back to the name of the repository in the repository file.
110        try:
111            dependencies = self._get_json(
112                self.dependencies_url_template.format(self.repo_name)
113            )
114        except HTTPError:
115            logging.info(f"For {self.repo_name}, using default Glean dependencies")
116            return self.default_dependencies
117
118        dependency_library_names = list(dependencies.keys())
119
120        repos = GleanPing._get_json(GleanPing.repos_url)
121        repos_by_dependency_name = {}
122        for repo in repos:
123            for library_name in repo.get("library_names", []):
124                repos_by_dependency_name[library_name] = repo["name"]
125
126        dependencies = []
127        for name in dependency_library_names:
128            if name in repos_by_dependency_name:
129                dependencies.append(repos_by_dependency_name[name])
130
131        if len(dependencies) == 0:
132            logging.info(f"For {self.repo_name}, using default Glean dependencies")
133            return self.default_dependencies
134
135        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
136        return dependencies
@staticmethod
def remove_pings_from_metric(metric: Dict[str, Any], blocked_pings: List[str]) -> Dict[str, Any]:
138    @staticmethod
139    def remove_pings_from_metric(
140        metric: Dict[str, Any], blocked_pings: List[str]
141    ) -> Dict[str, Any]:
142        """Remove the given pings from the metric's `send_in_pings` history.
143
144        Only removes if the given metric has been removed from the source since a fixed date
145        (2025-01-01). This allows metrics to be added back to the schema.
146        """
147        if (
148            metric["in-source"]
149            or len(blocked_pings) == 0
150            or datetime.fromisoformat(metric["history"][-1]["dates"]["last"])
151            >= datetime(year=2025, month=1, day=1)
152        ):
153            return metric
154
155        for history_entry in metric["history"]:
156            history_entry["send_in_pings"] = [
157                p for p in history_entry["send_in_pings"] if p not in blocked_pings
158            ]
159
160        return metric

Remove the given pings from the metric's send_in_pings history.

Pings are only removed when the metric is no longer in the source and its latest history entry was last seen before a fixed date (2025-01-01). This allows metrics to be added back to the schema.

def get_probes(self) -> List[mozilla_schema_generator.probes.GleanProbe]:
    def get_probes(self) -> List[GleanProbe]:
        """Return the probes of this repo and its dependencies.

        Steps performed:

        1. Fetch the metric definitions of the repo (``self.probes_url``) and of
           every dependency, passing each through ``remove_pings_from_metric``
           with the blocklist entries found for the app name or dependency name.
        2. Group definitions that share a name and whose ping sets overlap, and
           keep one definition per group: the one with the most recent ``last``
           date, carrying the combined history of the group sorted by ``first``
           date.
        3. Build a ``GleanProbe`` per kept definition, plus one more for every
           historic type that differs from the definition's current type.

        :return: list of ``GleanProbe`` objects
        """
        data = self._get_json(self.probes_url)

        # blocklist needs to be applied here instead of generate_schema because it needs to be
        # dependency-aware; metrics can move between app and library and still be in the schema
        # turn blocklist into metric_name -> ping_types map
        blocklist = defaultdict(list)
        for ping_type, metric_names in self.metric_blocklist.get(
            self.get_app_name(), {}
        ).items():
            for metric_name in metric_names:
                blocklist[metric_name].append(ping_type)

        # .get() is used so that looking up an unlisted metric does not add a key
        # to the defaultdict.
        probes = [
            (name, self.remove_pings_from_metric(defn, blocklist.get(name, [])))
            for name, defn in data.items()
        ]

        for dependency in self.get_dependencies():
            dependency_probes = self._get_json(
                self.probes_url_template.format(dependency)
            )

            # The blocklist of a dependency is keyed by the dependency's repository name.
            dependency_blocklist = defaultdict(list)
            for ping_type, metric_names in self.metric_blocklist.get(
                dependency, {}
            ).items():
                for metric_name in metric_names:
                    dependency_blocklist[metric_name].append(ping_type)

            probes += [
                (
                    name,
                    self.remove_pings_from_metric(
                        defn, dependency_blocklist.get(name, [])
                    ),
                )
                for name, defn in dependency_probes.items()
            ]

        # A metric can be moved between an app and its dependencies or between dependencies while
        # probe scraper keeps the history in each location, so both definitions are returned
        # Merge the history per probe to take the latest definition while still being able to
        # find metric type changes below
        # Metrics are not merged if they are not sent in the same pings as they are disjoint
        def _pings_in_history(defn):
            # Every ping named in any history entry; an entry without
            # "send_in_pings" counts as being sent in "metrics".
            return {
                p
                for h in defn[GleanProbe.history_key]
                for p in h.get("send_in_pings", ["metrics"])
            }

        def _latest_history_date(defn):
            # Most recent "last" date over all history entries of the definition.
            return max(
                datetime.fromisoformat(h["dates"]["last"])
                for h in defn[GleanProbe.history_key]
            )

        # Group same name probes whose pings intersect to combine moved metrics
        grouped_by_name: Dict[str, List[List[dict]]] = defaultdict(list)
        for name, defn in probes:
            defn_pings = _pings_in_history(defn)
            existing_groups = grouped_by_name[name]
            # NOTE: `defn` inside the generator expression below is scoped to that
            # expression and iterates the members of `group`; the definition being
            # placed is represented by `defn_pings`, computed above.
            matches = [
                group
                for group in existing_groups
                if any(_pings_in_history(defn) & defn_pings for defn in group)
            ]
            if not matches:
                existing_groups.append([defn])
            else:
                # The new definition may bridge several groups; fold them into one.
                merged_group = [defn]
                for g in matches:
                    merged_group.extend(g)
                    existing_groups.remove(g)
                existing_groups.append(merged_group)

        # Take latest definition per group
        deduped_probes: List[Any] = []
        for name, groups in grouped_by_name.items():
            for group in groups:
                latest_defn = max(group, key=_latest_history_date)
                if len(group) > 1:
                    # Shallow copy so the fetched definition keeps its own history list.
                    latest_defn = latest_defn.copy()
                    latest_defn[GleanProbe.history_key] = sorted(
                        (h for d in group for h in d[GleanProbe.history_key]),
                        key=lambda h: datetime.fromisoformat(h["dates"]["first"]),
                    )
                deduped_probes.append((name, latest_defn))
        probes = deduped_probes

        pings = self.get_pings()

        processed = []
        for _id, defn in probes:
            probe = GleanProbe(_id, defn, pings=pings)
            processed.append(probe)

            # Handling probe type changes (Bug 1870317)
            probe_types = {hist["type"] for hist in defn[probe.history_key]}
            if len(probe_types) > 1:
                # The probe type changed at some point in history.
                # Create schema entry for each type.
                # NOTE(review): this single shallow copy is reused, with "type"
                # overwritten, for every extra probe created below; confirm that
                # GleanProbe does not keep a reference to the dict it is given.
                hist_defn = defn.copy()

                # No new entry needs to be created for the current probe type
                probe_types.remove(defn["type"])

                for hist in hist_defn[probe.history_key]:
                    # Create a new entry for a historic type
                    if hist["type"] in probe_types:
                        hist_defn["type"] = hist["type"]
                        probe = GleanProbe(_id, hist_defn, pings=pings)
                        processed.append(probe)

                        # Keep track of the types entries were already created for
                        probe_types.remove(hist["type"])

        return processed
def get_pings(self) -> Set[str]:
298    def get_pings(self) -> Set[str]:
299        return self._get_ping_data().keys()
@staticmethod
def apply_default_metadata(ping_metadata, default_metadata):
301    @staticmethod
302    def apply_default_metadata(ping_metadata, default_metadata):
303        """apply_default_metadata recurses down into dicts nested
304        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
305        ``ping_metadata``.
306        :param ping_metadata: dict onto which the merge is executed
307        :param default_metadata: dct merged into ping_metadata
308        :return: None
309        """
310        for k, v in default_metadata.items():
311            if (
312                k in ping_metadata
313                and isinstance(ping_metadata[k], dict)
314                and isinstance(default_metadata[k], dict)
315            ):
316                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
317            else:
318                ping_metadata[k] = default_metadata[k]

apply_default_metadata recurses down into dicts nested to an arbitrary depth, updating keys. The default_metadata is merged into ping_metadata.

Parameters
  • ping_metadata: dict onto which the merge is executed
  • default_metadata: dict merged into ping_metadata
Returns

None

@staticmethod
def reorder_metadata(metadata):
358    @staticmethod
359    def reorder_metadata(metadata):
360        desired_order_list = [
361            "bq_dataset_family",
362            "bq_table",
363            "bq_metadata_format",
364            "include_info_sections",
365            "submission_timestamp_granularity",
366            "expiration_policy",
367            "override_attributes",
368            "jwe_mappings",
369        ]
370        reordered_metadata = {
371            k: metadata[k] for k in desired_order_list if k in metadata
372        }
373
374        # re-order jwe-mappings
375        desired_order_list = ["source_field_path", "decrypted_field_path"]
376        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
377        if jwe_mapping_metadata:
378            reordered_jwe_mapping_metadata = []
379            for mapping in jwe_mapping_metadata:
380                reordered_jwe_mapping_metadata.append(
381                    {k: mapping[k] for k in desired_order_list if k in mapping}
382                )
383            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
384
385        # future proofing, in case there are other fields added at the ping top level
386        # add them to the end.
387        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
388        reordered_metadata = {**reordered_metadata, **leftovers}
389        return reordered_metadata
def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
391    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
392        pings = self._get_ping_data_and_dependencies_with_default_metadata()
393        for ping_name, ping_data in pings.items():
394            metadata = ping_data.get("moz_pipeline_metadata")
395            if not metadata:
396                continue
397            metadata["include_info_sections"] = self._is_field_included(
398                ping_data, "include_info_sections", consider_all_history=False
399            )
400            metadata["include_client_id"] = self._is_field_included(
401                ping_data, "include_client_id"
402            )
403
404            # While technically unnecessary, the dictionary elements are re-ordered to match the
405            # currently deployed order and used to verify no difference in output.
406            pings[ping_name] = GleanPing.reorder_metadata(metadata)
407        return pings
def get_ping_descriptions(self) -> Dict[str, str]:
409    def get_ping_descriptions(self) -> Dict[str, str]:
410        return {
411            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
412        }
def set_schema_url(self, metadata):
446    def set_schema_url(self, metadata):
447        """
448        Switch between the glean-min and glean schemas if the ping does not require
449        info sections as specified in the parsed ping info in probe scraper.
450        """
451        if not metadata["include_info_sections"]:
452            self.schema_url = SCHEMA_URL_TEMPLATE.format(
453                branch=self.branch_name
454            ) + SCHEMA_VERSION_TEMPLATE.format(
455                schema_type="glean-min", version=self.version
456            )
457        else:
458            self.schema_url = SCHEMA_URL_TEMPLATE.format(
459                branch=self.branch_name
460            ) + SCHEMA_VERSION_TEMPLATE.format(
461                schema_type="glean", version=self.version
462            )

Switch between the glean-min and glean schemas if the ping does not require info sections as specified in the parsed ping info in probe scraper.

def generate_schema( self, config, generic_schema=False, blocked_distribution_pings=('events', 'baseline')) -> Dict[str, mozilla_schema_generator.schema.Schema]:
464    def generate_schema(
465        self,
466        config,
467        generic_schema=False,
468        blocked_distribution_pings=("events", "baseline"),
469    ) -> Dict[str, Schema]:
470        pings = self.get_pings_and_pipeline_metadata()
471        schemas = {}
472
473        for ping, pipeline_meta in pings.items():
474            matchers = {
475                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
476            }
477
478            # Four newly introduced metric types were incorrectly deployed
479            # as repeated key/value structs in all Glean ping tables existing prior
480            # to November 2021. We maintain the incorrect fields for existing tables
481            # by disabling the associated matchers.
482            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
483            # defined that will allow metrics of these types to be injected into proper
484            # structs. The gcp-ingestion repository includes logic to rewrite these
485            # metrics under the "2" names.
486            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
487            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
488            if bq_identifier in self.bug_1737656_affected_tables:
489                matchers = {
490                    loc: m
491                    for loc, m in matchers.items()
492                    if not m.matcher.get("bug_1737656_affected")
493                }
494
495            for matcher in matchers.values():
496                matcher.matcher["send_in_pings"]["contains"] = ping
497
498                # temporarily block distributions from being added to events and baseline pings
499                # https://mozilla-hub.atlassian.net/browse/DENG-10606
500                if (
501                    blocked_distribution_pings
502                    and ping in blocked_distribution_pings
503                    and matcher.type.endswith("_distribution")
504                ):
505                    matcher.matcher["send_in_pings"]["not_contains"] = ping
506
507            new_config = Config(ping, matchers=matchers)
508
509            defaults = {"mozPipelineMetadata": pipeline_meta}
510
511            # Adjust the schema path if the ping does not require info sections
512            self.set_schema_url(pipeline_meta)
513            if generic_schema:  # Use the generic glean ping schema
514                schema = self.get_schema(generic_schema=True)
515                schema.schema.update(defaults)
516                schemas[new_config.name] = schema
517            else:
518                generated = super().generate_schema(new_config)
519                for schema in generated.values():
520                    # We want to override each individual key with assembled defaults,
521                    # but keep values _inside_ them if they have been set in the schemas.
522                    for key, value in defaults.items():
523                        if key not in schema.schema:
524                            schema.schema[key] = {}
525                        schema.schema[key].update(value)
526                schemas.update(generated)
527
528        return schemas
@staticmethod
def get_repos():
530    @staticmethod
531    def get_repos():
532        """
533        Retrieve metadata for all non-library Glean repositories
534        """
535        repos = GleanPing._get_json(GleanPing.repos_url)
536        return [repo for repo in repos if "library_names" not in repo]

Retrieve metadata for all non-library Glean repositories

def get_app_name(self) -> str:
538    def get_app_name(self) -> str:
539        """Get app name associated with the app id.
540
541        e.g. org-mozilla-firefox -> fenix
542        """
543        apps = GleanPing._get_json(GleanPing.app_listings_url)
544        # app id in app-listings has "." instead of "-" so using document_namespace
545        app_name = [
546            app["app_name"] for app in apps if app["document_namespace"] == self.app_id
547        ]
548        return app_name[0] if len(app_name) > 0 else self.app_id

Get app name associated with the app id.

e.g. org-mozilla-firefox -> fenix

@staticmethod
def get_metric_blocklist():
550    @staticmethod
551    def get_metric_blocklist():
552        with open(METRIC_BLOCKLIST, "r") as f:
553            return yaml.safe_load(f)
f = <_io.TextIOWrapper name='/home/runner/work/mozilla-schema-generator/mozilla-schema-generator/mozilla_schema_generator/configs/bug_1737656_affected.txt' mode='r' encoding='UTF-8'>
bug_1737656_affected_tables = ['burnham.baseline_v1', 'burnham.deletion_request_v1', 'burnham.discovery_v1', 'burnham.events_v1', 'burnham.metrics_v1', 'burnham.space_ship_ready_v1', 'burnham.starbase46_v1', 'firefox_desktop_background_update.background_update_v1', 'firefox_desktop_background_update.baseline_v1', 'firefox_desktop_background_update.deletion_request_v1', 'firefox_desktop_background_update.events_v1', 'firefox_desktop_background_update.metrics_v1', 'firefox_desktop.baseline_v1', 'firefox_desktop.deletion_request_v1', 'firefox_desktop.events_v1', 'firefox_desktop.fog_validation_v1', 'firefox_desktop.metrics_v1', 'firefox_installer.install_v1', 'firefox_launcher_process.launcher_process_failure_v1', 'messaging_system.cfr_v1', 'messaging_system.infobar_v1', 'messaging_system.moments_v1', 'messaging_system.onboarding_v1', 'messaging_system.personalization_experiment_v1', 'messaging_system.snippets_v1', 'messaging_system.spotlight_v1', 'messaging_system.undesired_events_v1', 'messaging_system.whats_new_panel_v1', 'mlhackweek_search.action_v1', 'mlhackweek_search.baseline_v1', 'mlhackweek_search.custom_v1', 'mlhackweek_search.deletion_request_v1', 'mlhackweek_search.events_v1', 'mlhackweek_search.metrics_v1', 'mozilla_lockbox.addresses_sync_v1', 'mozilla_lockbox.baseline_v1', 'mozilla_lockbox.bookmarks_sync_v1', 'mozilla_lockbox.creditcards_sync_v1', 'mozilla_lockbox.deletion_request_v1', 'mozilla_lockbox.events_v1', 'mozilla_lockbox.history_sync_v1', 'mozilla_lockbox.logins_sync_v1', 'mozilla_lockbox.metrics_v1', 'mozilla_lockbox.sync_v1', 'mozilla_lockbox.tabs_sync_v1', 'mozilla_mach.baseline_v1', 'mozilla_mach.deletion_request_v1', 'mozilla_mach.events_v1', 'mozilla_mach.metrics_v1', 'mozilla_mach.usage_v1', 'mozillavpn.deletion_request_v1', 'mozillavpn.main_v1', 'mozphab.baseline_v1', 'mozphab.deletion_request_v1', 'mozphab.events_v1', 'mozphab.metrics_v1', 'mozphab.usage_v1', 'org_mozilla_bergamot.custom_v1', 'org_mozilla_bergamot.deletion_request_v1', 
'org_mozilla_connect_firefox.baseline_v1', 'org_mozilla_connect_firefox.deletion_request_v1', 'org_mozilla_connect_firefox.events_v1', 'org_mozilla_connect_firefox.metrics_v1', 'org_mozilla_fenix.activation_v1', 'org_mozilla_fenix.addresses_sync_v1', 'org_mozilla_fenix.baseline_v1', 'org_mozilla_fenix.bookmarks_sync_v1', 'org_mozilla_fenix.creditcards_sync_v1', 'org_mozilla_fenix.deletion_request_v1', 'org_mozilla_fenix.events_v1', 'org_mozilla_fenix.first_session_v1', 'org_mozilla_fenix.fog_validation_v1', 'org_mozilla_fenix.history_sync_v1', 'org_mozilla_fenix.installation_v1', 'org_mozilla_fenix.logins_sync_v1', 'org_mozilla_fenix.metrics_v1', 'org_mozilla_fenix.migration_v1', 'org_mozilla_fenix.startup_timeline_v1', 'org_mozilla_fenix.sync_v1', 'org_mozilla_fenix.tabs_sync_v1', 'org_mozilla_fenix_nightly.activation_v1', 'org_mozilla_fenix_nightly.addresses_sync_v1', 'org_mozilla_fenix_nightly.baseline_v1', 'org_mozilla_fenix_nightly.bookmarks_sync_v1', 'org_mozilla_fenix_nightly.creditcards_sync_v1', 'org_mozilla_fenix_nightly.deletion_request_v1', 'org_mozilla_fenix_nightly.events_v1', 'org_mozilla_fenix_nightly.first_session_v1', 'org_mozilla_fenix_nightly.fog_validation_v1', 'org_mozilla_fenix_nightly.history_sync_v1', 'org_mozilla_fenix_nightly.installation_v1', 'org_mozilla_fenix_nightly.logins_sync_v1', 'org_mozilla_fenix_nightly.metrics_v1', 'org_mozilla_fenix_nightly.migration_v1', 'org_mozilla_fenix_nightly.startup_timeline_v1', 'org_mozilla_fenix_nightly.sync_v1', 'org_mozilla_fenix_nightly.tabs_sync_v1', 'org_mozilla_fennec_aurora.activation_v1', 'org_mozilla_fennec_aurora.addresses_sync_v1', 'org_mozilla_fennec_aurora.baseline_v1', 'org_mozilla_fennec_aurora.bookmarks_sync_v1', 'org_mozilla_fennec_aurora.creditcards_sync_v1', 'org_mozilla_fennec_aurora.deletion_request_v1', 'org_mozilla_fennec_aurora.events_v1', 'org_mozilla_fennec_aurora.first_session_v1', 'org_mozilla_fennec_aurora.fog_validation_v1', 'org_mozilla_fennec_aurora.history_sync_v1', 
'org_mozilla_fennec_aurora.installation_v1', 'org_mozilla_fennec_aurora.logins_sync_v1', 'org_mozilla_fennec_aurora.metrics_v1', 'org_mozilla_fennec_aurora.migration_v1', 'org_mozilla_fennec_aurora.startup_timeline_v1', 'org_mozilla_fennec_aurora.sync_v1', 'org_mozilla_fennec_aurora.tabs_sync_v1', 'org_mozilla_firefox_beta.activation_v1', 'org_mozilla_firefox_beta.addresses_sync_v1', 'org_mozilla_firefox_beta.baseline_v1', 'org_mozilla_firefox_beta.bookmarks_sync_v1', 'org_mozilla_firefox_beta.creditcards_sync_v1', 'org_mozilla_firefox_beta.deletion_request_v1', 'org_mozilla_firefox_beta.events_v1', 'org_mozilla_firefox_beta.first_session_v1', 'org_mozilla_firefox_beta.fog_validation_v1', 'org_mozilla_firefox_beta.history_sync_v1', 'org_mozilla_firefox_beta.installation_v1', 'org_mozilla_firefox_beta.logins_sync_v1', 'org_mozilla_firefox_beta.metrics_v1', 'org_mozilla_firefox_beta.migration_v1', 'org_mozilla_firefox_beta.startup_timeline_v1', 'org_mozilla_firefox_beta.sync_v1', 'org_mozilla_firefox_beta.tabs_sync_v1', 'org_mozilla_firefox.activation_v1', 'org_mozilla_firefox.addresses_sync_v1', 'org_mozilla_firefox.baseline_v1', 'org_mozilla_firefox.bookmarks_sync_v1', 'org_mozilla_firefox.creditcards_sync_v1', 'org_mozilla_firefox.deletion_request_v1', 'org_mozilla_firefox.events_v1', 'org_mozilla_firefox.first_session_v1', 'org_mozilla_firefox.fog_validation_v1', 'org_mozilla_firefox.history_sync_v1', 'org_mozilla_firefox.installation_v1', 'org_mozilla_firefox.logins_sync_v1', 'org_mozilla_firefox.metrics_v1', 'org_mozilla_firefox.migration_v1', 'org_mozilla_firefox.startup_timeline_v1', 'org_mozilla_firefox.sync_v1', 'org_mozilla_firefox.tabs_sync_v1', 'org_mozilla_firefoxreality.baseline_v1', 'org_mozilla_firefoxreality.deletion_request_v1', 'org_mozilla_firefoxreality.events_v1', 'org_mozilla_firefoxreality.launch_v1', 'org_mozilla_firefoxreality.metrics_v1', 'org_mozilla_focus_beta.activation_v1', 'org_mozilla_focus_beta.baseline_v1', 
'org_mozilla_focus_beta.deletion_request_v1', 'org_mozilla_focus_beta.events_v1', 'org_mozilla_focus_beta.metrics_v1', 'org_mozilla_focus.activation_v1', 'org_mozilla_focus.baseline_v1', 'org_mozilla_focus.deletion_request_v1', 'org_mozilla_focus.events_v1', 'org_mozilla_focus.metrics_v1', 'org_mozilla_focus_nightly.activation_v1', 'org_mozilla_focus_nightly.baseline_v1', 'org_mozilla_focus_nightly.deletion_request_v1', 'org_mozilla_focus_nightly.events_v1', 'org_mozilla_focus_nightly.metrics_v1', 'org_mozilla_ios_fennec.baseline_v1', 'org_mozilla_ios_fennec.deletion_request_v1', 'org_mozilla_ios_fennec.events_v1', 'org_mozilla_ios_fennec.metrics_v1', 'org_mozilla_ios_firefox.baseline_v1', 'org_mozilla_ios_firefox.deletion_request_v1', 'org_mozilla_ios_firefox.events_v1', 'org_mozilla_ios_firefox.metrics_v1', 'org_mozilla_ios_firefoxbeta.baseline_v1', 'org_mozilla_ios_firefoxbeta.deletion_request_v1', 'org_mozilla_ios_firefoxbeta.events_v1', 'org_mozilla_ios_firefoxbeta.metrics_v1', 'org_mozilla_ios_focus.baseline_v1', 'org_mozilla_ios_focus.deletion_request_v1', 'org_mozilla_ios_focus.events_v1', 'org_mozilla_ios_focus.metrics_v1', 'org_mozilla_ios_klar.baseline_v1', 'org_mozilla_ios_klar.deletion_request_v1', 'org_mozilla_ios_klar.events_v1', 'org_mozilla_ios_klar.metrics_v1', 'org_mozilla_ios_lockbox.baseline_v1', 'org_mozilla_ios_lockbox.deletion_request_v1', 'org_mozilla_ios_lockbox.events_v1', 'org_mozilla_ios_lockbox.metrics_v1', 'org_mozilla_klar.activation_v1', 'org_mozilla_klar.baseline_v1', 'org_mozilla_klar.deletion_request_v1', 'org_mozilla_klar.events_v1', 'org_mozilla_klar.metrics_v1', 'org_mozilla_mozregression.baseline_v1', 'org_mozilla_mozregression.deletion_request_v1', 'org_mozilla_mozregression.events_v1', 'org_mozilla_mozregression.metrics_v1', 'org_mozilla_mozregression.usage_v1', 'org_mozilla_reference_browser.baseline_v1', 'org_mozilla_reference_browser.deletion_request_v1', 'org_mozilla_reference_browser.events_v1', 
'org_mozilla_reference_browser.metrics_v1', 'org_mozilla_tv_firefox.baseline_v1', 'org_mozilla_tv_firefox.deletion_request_v1', 'org_mozilla_tv_firefox.events_v1', 'org_mozilla_tv_firefox.metrics_v1', 'org_mozilla_vrbrowser.addresses_sync_v1', 'org_mozilla_vrbrowser.baseline_v1', 'org_mozilla_vrbrowser.bookmarks_sync_v1', 'org_mozilla_vrbrowser.creditcards_sync_v1', 'org_mozilla_vrbrowser.deletion_request_v1', 'org_mozilla_vrbrowser.events_v1', 'org_mozilla_vrbrowser.history_sync_v1', 'org_mozilla_vrbrowser.logins_sync_v1', 'org_mozilla_vrbrowser.metrics_v1', 'org_mozilla_vrbrowser.session_end_v1', 'org_mozilla_vrbrowser.sync_v1', 'org_mozilla_vrbrowser.tabs_sync_v1', 'rally_core.deletion_request_v1', 'rally_core.demographics_v1', 'rally_core.enrollment_v1', 'rally_core.study_enrollment_v1', 'rally_core.study_unenrollment_v1', 'rally_core.uninstall_deletion_v1', 'rally_debug.deletion_request_v1', 'rally_debug.demographics_v1', 'rally_debug.enrollment_v1', 'rally_debug.study_enrollment_v1', 'rally_debug.study_unenrollment_v1', 'rally_debug.uninstall_deletion_v1', 'rally_study_zero_one.deletion_request_v1', 'rally_study_zero_one.rs01_event_v1', 'rally_study_zero_one.study_enrollment_v1', 'rally_zero_one.deletion_request_v1', 'rally_zero_one.measurements_v1', 'rally_zero_one.pioneer_enrollment_v1']