mozilla_schema_generator.glean_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import logging
  8from pathlib import Path
  9from typing import Dict, List, Set
 10
 11from requests import HTTPError
 12
 13from .config import Config
 14from .generic_ping import GenericPing
 15from .probes import GleanProbe
 16from .schema import Schema
 17
 18ROOT_DIR = Path(__file__).parent
 19BUG_1737656_TXT = ROOT_DIR / "configs" / "bug_1737656_affected.txt"
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24class GleanPing(GenericPing):
 25
 26    schema_url = (
 27        "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas"
 28        "/{branch}/schemas/glean/glean/glean.1.schema.json"
 29    )
 30    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 31    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 32    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 33    dependencies_url_template = (
 34        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 35    )
 36
 37    default_dependencies = ["glean-core"]
 38    ignore_pings = {
 39        "all-pings",
 40        "all_pings",
 41        "default",
 42        "glean_ping_info",
 43        "glean_client_info",
 44    }
 45
 46    with open(BUG_1737656_TXT, "r") as f:
 47        bug_1737656_affected_tables = [l.strip() for l in f.readlines() if l.strip()]
 48
 49    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
 50        self.repo = repo
 51        self.repo_name = repo["name"]
 52        self.app_id = repo["app_id"]
 53        super().__init__(
 54            self.schema_url,
 55            self.schema_url,
 56            self.probes_url_template.format(self.repo_name),
 57            **kwargs,
 58        )
 59
 60    def get_schema(self, generic_schema=False) -> Schema:
 61        """
 62        Fetch schema via URL.
 63
 64        Unless *generic_schema* is set to true, this function makes some modifications
 65        to allow some workarounds for proper injection of metrics.
 66        """
 67        schema = super().get_schema()
 68        if generic_schema:
 69            return schema
 70
 71        # We need to inject placeholders for the url2, text2, etc. types as part
 72        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 73        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 74            metric1 = schema.get(
 75                ("properties", "metrics", "properties", metric_name)
 76            ).copy()
 77            metric1 = schema.set_schema_elem(
 78                ("properties", "metrics", "properties", metric_name + "2"),
 79                metric1,
 80            )
 81
 82        return schema
 83
 84    def get_dependencies(self):
 85        # Get all of the library dependencies for the application that
 86        # are also known about in the repositories file.
 87
 88        # The dependencies are specified using library names, but we need to
 89        # map those back to the name of the repository in the repository file.
 90        try:
 91            dependencies = self._get_json(
 92                self.dependencies_url_template.format(self.repo_name)
 93            )
 94        except HTTPError:
 95            logging.info(f"For {self.repo_name}, using default Glean dependencies")
 96            return self.default_dependencies
 97
 98        dependency_library_names = list(dependencies.keys())
 99
100        repos = GleanPing._get_json(GleanPing.repos_url)
101        repos_by_dependency_name = {}
102        for repo in repos:
103            for library_name in repo.get("library_names", []):
104                repos_by_dependency_name[library_name] = repo["name"]
105
106        dependencies = []
107        for name in dependency_library_names:
108            if name in repos_by_dependency_name:
109                dependencies.append(repos_by_dependency_name[name])
110
111        if len(dependencies) == 0:
112            logging.info(f"For {self.repo_name}, using default Glean dependencies")
113            return self.default_dependencies
114
115        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
116        return dependencies
117
118    def get_probes(self) -> List[GleanProbe]:
119        data = self._get_json(self.probes_url)
120        probes = list(data.items())
121
122        for dependency in self.get_dependencies():
123            dependency_probes = self._get_json(
124                self.probes_url_template.format(dependency)
125            )
126            probes += list(dependency_probes.items())
127
128        pings = self.get_pings()
129
130        processed = []
131        for _id, defn in probes:
132            probe = GleanProbe(_id, defn, pings=pings)
133            processed.append(probe)
134
135            # Manual handling of incompatible schema changes
136            issue_118_affected = {
137                "fenix",
138                "fenix-nightly",
139                "firefox-android-nightly",
140                "firefox-android-beta",
141                "firefox-android-release",
142            }
143            if (
144                self.repo_name in issue_118_affected
145                and probe.get_name() == "installation.timestamp"
146            ):
147                logging.info(f"Writing column {probe.get_name()} for compatibility.")
148                # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
149                # Search through history for the "string" type and add a copy of
150                # the probe at that time in history. The changepoint signifies
151                # this event.
152                changepoint_index = 0
153                for definition in probe.definition_history:
154                    if definition["type"] != probe.get_type():
155                        break
156                    changepoint_index += 1
157                # Modify the definition with the truncated history.
158                hist_defn = defn.copy()
159                hist_defn[probe.history_key] = probe.definition_history[
160                    changepoint_index:
161                ]
162                hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
163                incompatible_probe_type = GleanProbe(_id, hist_defn, pings=pings)
164                processed.append(incompatible_probe_type)
165
166        return processed
167
168    def _get_ping_data(self) -> Dict[str, Dict]:
169        url = self.ping_url_template.format(self.repo_name)
170        ping_data = GleanPing._get_json(url)
171        for dependency in self.get_dependencies():
172            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
173            ping_data.update(dependency_pings)
174        return ping_data
175
176    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
177        url = self.ping_url_template.format(self.repo_name)
178        ping_data = GleanPing._get_json(url)
179        return ping_data
180
181    def _get_dependency_pings(self, dependency):
182        return self._get_json(self.ping_url_template.format(dependency))
183
184    def get_pings(self) -> Set[str]:
185        return self._get_ping_data().keys()
186
187    @staticmethod
188    def apply_default_metadata(ping_metadata, default_metadata):
189        """apply_default_metadata recurses down into dicts nested
190        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
191        ``ping_metadata``.
192        :param ping_metadata: dict onto which the merge is executed
193        :param default_metadata: dct merged into ping_metadata
194        :return: None
195        """
196        for k, v in default_metadata.items():
197            if (
198                k in ping_metadata
199                and isinstance(ping_metadata[k], dict)
200                and isinstance(default_metadata[k], dict)
201            ):
202                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
203            else:
204                ping_metadata[k] = default_metadata[k]
205
206    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
207        # Get the ping data with the pipeline metadata
208        ping_data = self._get_ping_data_without_dependencies()
209
210        # The ping endpoint for the dependency pings does not include any repo defined
211        # moz_pipeline_metadata_defaults so they need to be applied here.
212
213        # 1.  Get repo and pipeline default metadata.
214        repos = GleanPing.get_repos()
215        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
216        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
217
218        # 2.  Apply the default metadata to each dependency defined ping.
219        for dependency in self.get_dependencies():
220            dependency_pings = self._get_dependency_pings(dependency)
221            for dependency_ping in dependency_pings.values():
222                # Although it is counter intuitive to apply the default metadata on top of the
223                # existing dependency ping metadata it does set the repo specific value for
224                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
225                # value.
226                GleanPing.apply_default_metadata(
227                    dependency_ping.get("moz_pipeline_metadata"), default_metadata
228                )
229            ping_data.update(dependency_pings)
230        return ping_data
231
232    @staticmethod
233    def reorder_metadata(metadata):
234        desired_order_list = [
235            "bq_dataset_family",
236            "bq_table",
237            "bq_metadata_format",
238            "submission_timestamp_granularity",
239            "expiration_policy",
240            "override_attributes",
241            "jwe_mappings",
242        ]
243        reordered_metadata = {
244            k: metadata[k] for k in desired_order_list if k in metadata
245        }
246
247        # re-order jwe-mappings
248        desired_order_list = ["source_field_path", "decrypted_field_path"]
249        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
250        if jwe_mapping_metadata:
251            reordered_jwe_mapping_metadata = []
252            for mapping in jwe_mapping_metadata:
253                reordered_jwe_mapping_metadata.append(
254                    {k: mapping[k] for k in desired_order_list if k in mapping}
255                )
256            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
257
258        # future proofing, in case there are other fields added at the ping top level
259        # add them to the end.
260        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
261        reordered_metadata = {**reordered_metadata, **leftovers}
262        return reordered_metadata
263
264    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
265        pings = self._get_ping_data_and_dependencies_with_default_metadata()
266        for ping_name, ping_data in pings.items():
267            metadata = ping_data.get("moz_pipeline_metadata")
268
269            # While technically unnecessary, the dictionary elements are re-ordered to match the
270            # currently deployed order and used to verify no difference in output.
271            pings[ping_name] = GleanPing.reorder_metadata(metadata)
272        return pings
273
274    def get_ping_descriptions(self) -> Dict[str, str]:
275        return {
276            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
277        }
278
279    def generate_schema(
280        self, config, split, generic_schema=False
281    ) -> Dict[str, List[Schema]]:
282        pings = self.get_pings_and_pipeline_metadata()
283        schemas = {}
284
285        for ping, pipeline_meta in pings.items():
286            matchers = {
287                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
288            }
289
290            # Four newly introduced metric types were incorrectly deployed
291            # as repeated key/value structs in all Glean ping tables existing prior
292            # to November 2021. We maintain the incorrect fields for existing tables
293            # by disabling the associated matchers.
294            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
295            # defined that will allow metrics of these types to be injected into proper
296            # structs. The gcp-ingestion repository includes logic to rewrite these
297            # metrics under the "2" names.
298            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
299            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
300            if bq_identifier in self.bug_1737656_affected_tables:
301                matchers = {
302                    loc: m
303                    for loc, m in matchers.items()
304                    if not m.matcher.get("bug_1737656_affected")
305                }
306
307            for matcher in matchers.values():
308                matcher.matcher["send_in_pings"]["contains"] = ping
309            new_config = Config(ping, matchers=matchers)
310
311            defaults = {"mozPipelineMetadata": pipeline_meta}
312
313            if generic_schema:  # Use the generic glean ping schema
314                schema = self.get_schema(generic_schema=True)
315                schema.schema.update(defaults)
316                schemas[new_config.name] = [schema]
317            else:
318                generated = super().generate_schema(new_config)
319                for value in generated.values():
320                    for schema in value:
321                        schema.schema.update(defaults)
322                schemas.update(generated)
323
324        return schemas
325
326    @staticmethod
327    def get_repos():
328        """
329        Retrieve metadata for all non-library Glean repositories
330        """
331        repos = GleanPing._get_json(GleanPing.repos_url)
332        return [repo for repo in repos if "library_names" not in repo]
 25class GleanPing(GenericPing):
 26
 27    schema_url = (
 28        "https://raw.githubusercontent.com/mozilla-services/mozilla-pipeline-schemas"
 29        "/{branch}/schemas/glean/glean/glean.1.schema.json"
 30    )
 31    probes_url_template = GenericPing.probe_info_base_url + "/glean/{}/metrics"
 32    ping_url_template = GenericPing.probe_info_base_url + "/glean/{}/pings"
 33    repos_url = GenericPing.probe_info_base_url + "/glean/repositories"
 34    dependencies_url_template = (
 35        GenericPing.probe_info_base_url + "/glean/{}/dependencies"
 36    )
 37
 38    default_dependencies = ["glean-core"]
 39    ignore_pings = {
 40        "all-pings",
 41        "all_pings",
 42        "default",
 43        "glean_ping_info",
 44        "glean_client_info",
 45    }
 46
 47    with open(BUG_1737656_TXT, "r") as f:
 48        bug_1737656_affected_tables = [l.strip() for l in f.readlines() if l.strip()]
 49
 50    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
 51        self.repo = repo
 52        self.repo_name = repo["name"]
 53        self.app_id = repo["app_id"]
 54        super().__init__(
 55            self.schema_url,
 56            self.schema_url,
 57            self.probes_url_template.format(self.repo_name),
 58            **kwargs,
 59        )
 60
 61    def get_schema(self, generic_schema=False) -> Schema:
 62        """
 63        Fetch schema via URL.
 64
 65        Unless *generic_schema* is set to true, this function makes some modifications
 66        to allow some workarounds for proper injection of metrics.
 67        """
 68        schema = super().get_schema()
 69        if generic_schema:
 70            return schema
 71
 72        # We need to inject placeholders for the url2, text2, etc. types as part
 73        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
 74        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
 75            metric1 = schema.get(
 76                ("properties", "metrics", "properties", metric_name)
 77            ).copy()
 78            metric1 = schema.set_schema_elem(
 79                ("properties", "metrics", "properties", metric_name + "2"),
 80                metric1,
 81            )
 82
 83        return schema
 84
 85    def get_dependencies(self):
 86        # Get all of the library dependencies for the application that
 87        # are also known about in the repositories file.
 88
 89        # The dependencies are specified using library names, but we need to
 90        # map those back to the name of the repository in the repository file.
 91        try:
 92            dependencies = self._get_json(
 93                self.dependencies_url_template.format(self.repo_name)
 94            )
 95        except HTTPError:
 96            logging.info(f"For {self.repo_name}, using default Glean dependencies")
 97            return self.default_dependencies
 98
 99        dependency_library_names = list(dependencies.keys())
100
101        repos = GleanPing._get_json(GleanPing.repos_url)
102        repos_by_dependency_name = {}
103        for repo in repos:
104            for library_name in repo.get("library_names", []):
105                repos_by_dependency_name[library_name] = repo["name"]
106
107        dependencies = []
108        for name in dependency_library_names:
109            if name in repos_by_dependency_name:
110                dependencies.append(repos_by_dependency_name[name])
111
112        if len(dependencies) == 0:
113            logging.info(f"For {self.repo_name}, using default Glean dependencies")
114            return self.default_dependencies
115
116        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
117        return dependencies
118
119    def get_probes(self) -> List[GleanProbe]:
120        data = self._get_json(self.probes_url)
121        probes = list(data.items())
122
123        for dependency in self.get_dependencies():
124            dependency_probes = self._get_json(
125                self.probes_url_template.format(dependency)
126            )
127            probes += list(dependency_probes.items())
128
129        pings = self.get_pings()
130
131        processed = []
132        for _id, defn in probes:
133            probe = GleanProbe(_id, defn, pings=pings)
134            processed.append(probe)
135
136            # Manual handling of incompatible schema changes
137            issue_118_affected = {
138                "fenix",
139                "fenix-nightly",
140                "firefox-android-nightly",
141                "firefox-android-beta",
142                "firefox-android-release",
143            }
144            if (
145                self.repo_name in issue_118_affected
146                and probe.get_name() == "installation.timestamp"
147            ):
148                logging.info(f"Writing column {probe.get_name()} for compatibility.")
149                # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
150                # Search through history for the "string" type and add a copy of
151                # the probe at that time in history. The changepoint signifies
152                # this event.
153                changepoint_index = 0
154                for definition in probe.definition_history:
155                    if definition["type"] != probe.get_type():
156                        break
157                    changepoint_index += 1
158                # Modify the definition with the truncated history.
159                hist_defn = defn.copy()
160                hist_defn[probe.history_key] = probe.definition_history[
161                    changepoint_index:
162                ]
163                hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
164                incompatible_probe_type = GleanProbe(_id, hist_defn, pings=pings)
165                processed.append(incompatible_probe_type)
166
167        return processed
168
169    def _get_ping_data(self) -> Dict[str, Dict]:
170        url = self.ping_url_template.format(self.repo_name)
171        ping_data = GleanPing._get_json(url)
172        for dependency in self.get_dependencies():
173            dependency_pings = self._get_json(self.ping_url_template.format(dependency))
174            ping_data.update(dependency_pings)
175        return ping_data
176
177    def _get_ping_data_without_dependencies(self) -> Dict[str, Dict]:
178        url = self.ping_url_template.format(self.repo_name)
179        ping_data = GleanPing._get_json(url)
180        return ping_data
181
182    def _get_dependency_pings(self, dependency):
183        return self._get_json(self.ping_url_template.format(dependency))
184
185    def get_pings(self) -> Set[str]:
186        return self._get_ping_data().keys()
187
188    @staticmethod
189    def apply_default_metadata(ping_metadata, default_metadata):
190        """apply_default_metadata recurses down into dicts nested
191        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
192        ``ping_metadata``.
193        :param ping_metadata: dict onto which the merge is executed
194        :param default_metadata: dct merged into ping_metadata
195        :return: None
196        """
197        for k, v in default_metadata.items():
198            if (
199                k in ping_metadata
200                and isinstance(ping_metadata[k], dict)
201                and isinstance(default_metadata[k], dict)
202            ):
203                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
204            else:
205                ping_metadata[k] = default_metadata[k]
206
207    def _get_ping_data_and_dependencies_with_default_metadata(self) -> Dict[str, Dict]:
208        # Get the ping data with the pipeline metadata
209        ping_data = self._get_ping_data_without_dependencies()
210
211        # The ping endpoint for the dependency pings does not include any repo defined
212        # moz_pipeline_metadata_defaults so they need to be applied here.
213
214        # 1.  Get repo and pipeline default metadata.
215        repos = GleanPing.get_repos()
216        current_repo = next((x for x in repos if x.get("app_id") == self.app_id), {})
217        default_metadata = current_repo.get("moz_pipeline_metadata_defaults", {})
218
219        # 2.  Apply the default metadata to each dependency defined ping.
220        for dependency in self.get_dependencies():
221            dependency_pings = self._get_dependency_pings(dependency)
222            for dependency_ping in dependency_pings.values():
223                # Although it is counter intuitive to apply the default metadata on top of the
224                # existing dependency ping metadata it does set the repo specific value for
225                # bq_dataset_family instead of using the dependency id for the bq_dataset_family
226                # value.
227                GleanPing.apply_default_metadata(
228                    dependency_ping.get("moz_pipeline_metadata"), default_metadata
229                )
230            ping_data.update(dependency_pings)
231        return ping_data
232
233    @staticmethod
234    def reorder_metadata(metadata):
235        desired_order_list = [
236            "bq_dataset_family",
237            "bq_table",
238            "bq_metadata_format",
239            "submission_timestamp_granularity",
240            "expiration_policy",
241            "override_attributes",
242            "jwe_mappings",
243        ]
244        reordered_metadata = {
245            k: metadata[k] for k in desired_order_list if k in metadata
246        }
247
248        # re-order jwe-mappings
249        desired_order_list = ["source_field_path", "decrypted_field_path"]
250        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
251        if jwe_mapping_metadata:
252            reordered_jwe_mapping_metadata = []
253            for mapping in jwe_mapping_metadata:
254                reordered_jwe_mapping_metadata.append(
255                    {k: mapping[k] for k in desired_order_list if k in mapping}
256                )
257            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
258
259        # future proofing, in case there are other fields added at the ping top level
260        # add them to the end.
261        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
262        reordered_metadata = {**reordered_metadata, **leftovers}
263        return reordered_metadata
264
265    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
266        pings = self._get_ping_data_and_dependencies_with_default_metadata()
267        for ping_name, ping_data in pings.items():
268            metadata = ping_data.get("moz_pipeline_metadata")
269
270            # While technically unnecessary, the dictionary elements are re-ordered to match the
271            # currently deployed order and used to verify no difference in output.
272            pings[ping_name] = GleanPing.reorder_metadata(metadata)
273        return pings
274
275    def get_ping_descriptions(self) -> Dict[str, str]:
276        return {
277            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
278        }
279
280    def generate_schema(
281        self, config, split, generic_schema=False
282    ) -> Dict[str, List[Schema]]:
283        pings = self.get_pings_and_pipeline_metadata()
284        schemas = {}
285
286        for ping, pipeline_meta in pings.items():
287            matchers = {
288                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
289            }
290
291            # Four newly introduced metric types were incorrectly deployed
292            # as repeated key/value structs in all Glean ping tables existing prior
293            # to November 2021. We maintain the incorrect fields for existing tables
294            # by disabling the associated matchers.
295            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
296            # defined that will allow metrics of these types to be injected into proper
297            # structs. The gcp-ingestion repository includes logic to rewrite these
298            # metrics under the "2" names.
299            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
300            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
301            if bq_identifier in self.bug_1737656_affected_tables:
302                matchers = {
303                    loc: m
304                    for loc, m in matchers.items()
305                    if not m.matcher.get("bug_1737656_affected")
306                }
307
308            for matcher in matchers.values():
309                matcher.matcher["send_in_pings"]["contains"] = ping
310            new_config = Config(ping, matchers=matchers)
311
312            defaults = {"mozPipelineMetadata": pipeline_meta}
313
314            if generic_schema:  # Use the generic glean ping schema
315                schema = self.get_schema(generic_schema=True)
316                schema.schema.update(defaults)
317                schemas[new_config.name] = [schema]
318            else:
319                generated = super().generate_schema(new_config)
320                for value in generated.values():
321                    for schema in value:
322                        schema.schema.update(defaults)
323                schemas.update(generated)
324
325        return schemas
326
327    @staticmethod
328    def get_repos():
329        """
330        Retrieve metadata for all non-library Glean repositories
331        """
332        repos = GleanPing._get_json(GleanPing.repos_url)
333        return [repo for repo in repos if "library_names" not in repo]
GleanPing(repo, **kwargs)
50    def __init__(self, repo, **kwargs):  # TODO: Make env-url optional
51        self.repo = repo
52        self.repo_name = repo["name"]
53        self.app_id = repo["app_id"]
54        super().__init__(
55            self.schema_url,
56            self.schema_url,
57            self.probes_url_template.format(self.repo_name),
58            **kwargs,
59        )
def get_schema(self, generic_schema=False) -> mozilla_schema_generator.schema.Schema:
61    def get_schema(self, generic_schema=False) -> Schema:
62        """
63        Fetch schema via URL.
64
65        Unless *generic_schema* is set to true, this function makes some modifications
66        to allow some workarounds for proper injection of metrics.
67        """
68        schema = super().get_schema()
69        if generic_schema:
70            return schema
71
72        # We need to inject placeholders for the url2, text2, etc. types as part
73        # of mitigation for https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
74        for metric_name in ["labeled_rate", "jwe", "url", "text"]:
75            metric1 = schema.get(
76                ("properties", "metrics", "properties", metric_name)
77            ).copy()
78            metric1 = schema.set_schema_elem(
79                ("properties", "metrics", "properties", metric_name + "2"),
80                metric1,
81            )
82
83        return schema

Fetch schema via URL.

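Unless generic_schema is true, the fetched schema is modified before it is returned: a copy of each of the labeled_rate, jwe, url and text metric definitions is added under the same name with a "2" suffix (labeled_rate2, jwe2, url2, text2), so that metrics of these types can be injected properly as part of the mitigation for bug 1737656.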

def get_dependencies(self):
 85    def get_dependencies(self):
 86        # Get all of the library dependencies for the application that
 87        # are also known about in the repositories file.
 88
 89        # The dependencies are specified using library names, but we need to
 90        # map those back to the name of the repository in the repository file.
 91        try:
 92            dependencies = self._get_json(
 93                self.dependencies_url_template.format(self.repo_name)
 94            )
 95        except HTTPError:
 96            logging.info(f"For {self.repo_name}, using default Glean dependencies")
 97            return self.default_dependencies
 98
 99        dependency_library_names = list(dependencies.keys())
100
101        repos = GleanPing._get_json(GleanPing.repos_url)
102        repos_by_dependency_name = {}
103        for repo in repos:
104            for library_name in repo.get("library_names", []):
105                repos_by_dependency_name[library_name] = repo["name"]
106
107        dependencies = []
108        for name in dependency_library_names:
109            if name in repos_by_dependency_name:
110                dependencies.append(repos_by_dependency_name[name])
111
112        if len(dependencies) == 0:
113            logging.info(f"For {self.repo_name}, using default Glean dependencies")
114            return self.default_dependencies
115
116        logging.info(f"For {self.repo_name}, found Glean dependencies: {dependencies}")
117        return dependencies
def get_probes(self) -> List[mozilla_schema_generator.probes.GleanProbe]:
119    def get_probes(self) -> List[GleanProbe]:
120        data = self._get_json(self.probes_url)
121        probes = list(data.items())
122
123        for dependency in self.get_dependencies():
124            dependency_probes = self._get_json(
125                self.probes_url_template.format(dependency)
126            )
127            probes += list(dependency_probes.items())
128
129        pings = self.get_pings()
130
131        processed = []
132        for _id, defn in probes:
133            probe = GleanProbe(_id, defn, pings=pings)
134            processed.append(probe)
135
136            # Manual handling of incompatible schema changes
137            issue_118_affected = {
138                "fenix",
139                "fenix-nightly",
140                "firefox-android-nightly",
141                "firefox-android-beta",
142                "firefox-android-release",
143            }
144            if (
145                self.repo_name in issue_118_affected
146                and probe.get_name() == "installation.timestamp"
147            ):
148                logging.info(f"Writing column {probe.get_name()} for compatibility.")
149                # See: https://github.com/mozilla/mozilla-schema-generator/issues/118
150                # Search through history for the "string" type and add a copy of
151                # the probe at that time in history. The changepoint signifies
152                # this event.
153                changepoint_index = 0
154                for definition in probe.definition_history:
155                    if definition["type"] != probe.get_type():
156                        break
157                    changepoint_index += 1
158                # Modify the definition with the truncated history.
159                hist_defn = defn.copy()
160                hist_defn[probe.history_key] = probe.definition_history[
161                    changepoint_index:
162                ]
163                hist_defn["type"] = hist_defn[probe.history_key][0]["type"]
164                incompatible_probe_type = GleanProbe(_id, hist_defn, pings=pings)
165                processed.append(incompatible_probe_type)
166
167        return processed
def get_pings(self) -> Set[str]:
185    def get_pings(self) -> Set[str]:
186        return self._get_ping_data().keys()
@staticmethod
def apply_default_metadata(ping_metadata, default_metadata):
188    @staticmethod
189    def apply_default_metadata(ping_metadata, default_metadata):
190        """apply_default_metadata recurses down into dicts nested
191        to an arbitrary depth, updating keys. The ``default_metadata`` is merged into
192        ``ping_metadata``.
193        :param ping_metadata: dict onto which the merge is executed
194        :param default_metadata: dct merged into ping_metadata
195        :return: None
196        """
197        for k, v in default_metadata.items():
198            if (
199                k in ping_metadata
200                and isinstance(ping_metadata[k], dict)
201                and isinstance(default_metadata[k], dict)
202            ):
203                GleanPing.apply_default_metadata(ping_metadata[k], default_metadata[k])
204            else:
205                ping_metadata[k] = default_metadata[k]

apply_default_metadata recurses down into dicts nested to an arbitrary depth, updating keys. The default_metadata is merged into ping_metadata.

Parameters
  • ping_metadata: dict onto which the merge is executed
  • default_metadata: dict merged into ping_metadata
Returns

None

@staticmethod
def reorder_metadata(metadata):
233    @staticmethod
234    def reorder_metadata(metadata):
235        desired_order_list = [
236            "bq_dataset_family",
237            "bq_table",
238            "bq_metadata_format",
239            "submission_timestamp_granularity",
240            "expiration_policy",
241            "override_attributes",
242            "jwe_mappings",
243        ]
244        reordered_metadata = {
245            k: metadata[k] for k in desired_order_list if k in metadata
246        }
247
248        # re-order jwe-mappings
249        desired_order_list = ["source_field_path", "decrypted_field_path"]
250        jwe_mapping_metadata = reordered_metadata.get("jwe_mappings")
251        if jwe_mapping_metadata:
252            reordered_jwe_mapping_metadata = []
253            for mapping in jwe_mapping_metadata:
254                reordered_jwe_mapping_metadata.append(
255                    {k: mapping[k] for k in desired_order_list if k in mapping}
256                )
257            reordered_metadata["jwe_mappings"] = reordered_jwe_mapping_metadata
258
259        # future proofing, in case there are other fields added at the ping top level
260        # add them to the end.
261        leftovers = {k: metadata[k] for k in set(metadata) - set(reordered_metadata)}
262        reordered_metadata = {**reordered_metadata, **leftovers}
263        return reordered_metadata
def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
265    def get_pings_and_pipeline_metadata(self) -> Dict[str, Dict]:
266        pings = self._get_ping_data_and_dependencies_with_default_metadata()
267        for ping_name, ping_data in pings.items():
268            metadata = ping_data.get("moz_pipeline_metadata")
269
270            # While technically unnecessary, the dictionary elements are re-ordered to match the
271            # currently deployed order and used to verify no difference in output.
272            pings[ping_name] = GleanPing.reorder_metadata(metadata)
273        return pings
def get_ping_descriptions(self) -> Dict[str, str]:
275    def get_ping_descriptions(self) -> Dict[str, str]:
276        return {
277            k: v["history"][-1]["description"] for k, v in self._get_ping_data().items()
278        }
def generate_schema( self, config, split, generic_schema=False) -> Dict[str, List[mozilla_schema_generator.schema.Schema]]:
280    def generate_schema(
281        self, config, split, generic_schema=False
282    ) -> Dict[str, List[Schema]]:
283        pings = self.get_pings_and_pipeline_metadata()
284        schemas = {}
285
286        for ping, pipeline_meta in pings.items():
287            matchers = {
288                loc: m.clone(new_table_group=ping) for loc, m in config.matchers.items()
289            }
290
291            # Four newly introduced metric types were incorrectly deployed
292            # as repeated key/value structs in all Glean ping tables existing prior
293            # to November 2021. We maintain the incorrect fields for existing tables
294            # by disabling the associated matchers.
295            # Note that each of these types now has a "2" matcher ("text2", "url2", etc.)
296            # defined that will allow metrics of these types to be injected into proper
297            # structs. The gcp-ingestion repository includes logic to rewrite these
298            # metrics under the "2" names.
299            # See https://bugzilla.mozilla.org/show_bug.cgi?id=1737656
300            bq_identifier = "{bq_dataset_family}.{bq_table}".format(**pipeline_meta)
301            if bq_identifier in self.bug_1737656_affected_tables:
302                matchers = {
303                    loc: m
304                    for loc, m in matchers.items()
305                    if not m.matcher.get("bug_1737656_affected")
306                }
307
308            for matcher in matchers.values():
309                matcher.matcher["send_in_pings"]["contains"] = ping
310            new_config = Config(ping, matchers=matchers)
311
312            defaults = {"mozPipelineMetadata": pipeline_meta}
313
314            if generic_schema:  # Use the generic glean ping schema
315                schema = self.get_schema(generic_schema=True)
316                schema.schema.update(defaults)
317                schemas[new_config.name] = [schema]
318            else:
319                generated = super().generate_schema(new_config)
320                for value in generated.values():
321                    for schema in value:
322                        schema.schema.update(defaults)
323                schemas.update(generated)
324
325        return schemas
@staticmethod
def get_repos():
327    @staticmethod
328    def get_repos():
329        """
330        Retrieve metadata for all non-library Glean repositories
331        """
332        repos = GleanPing._get_json(GleanPing.repos_url)
333        return [repo for repo in repos if "library_names" not in repo]

Retrieve metadata for all non-library Glean repositories