mozilla_schema_generator.generic_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
import datetime
import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List, Optional

import requests

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException
 21
 22logger = logging.getLogger(__name__)
 23
 24
 25class GenericPing(object):
 26
 27    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 28    default_encoding = "utf-8"
 29    default_max_size = 11000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 30    extra_schema_key = "extra"
 31    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 32
 33    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 34        self.schema_url = schema_url.format(branch=mps_branch)
 35        self.env_url = env_url.format(branch=mps_branch)
 36        self.probes_url = probes_url
 37
 38    def get_schema(self) -> Schema:
 39        return Schema(self._get_json(self.schema_url))
 40
 41    def get_env(self) -> Schema:
 42        return Schema(self._get_json(self.env_url))
 43
 44    def get_probes(self) -> List[Probe]:
 45        return [
 46            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 47        ]
 48
 49    def generate_schema(
 50        self, config: Config, *, split: bool = None, max_size: int = None
 51    ) -> Dict[str, List[Schema]]:
 52        schema = self.get_schema()
 53        env = self.get_env()
 54
 55        probes = self.get_probes()
 56
 57        if split is None:
 58            split = False
 59        if max_size is None:
 60            max_size = self.default_max_size
 61
 62        if env.get_size() >= max_size:
 63            raise SchemaException(
 64                "Environment must be smaller than max_size {}".format(max_size)
 65            )
 66
 67        # TODO: Allow splits of extra schema, if necessary
 68        if schema.get_size() >= max_size:
 69            raise SchemaException(
 70                "Schema must be smaller than max_size {}".format(max_size)
 71            )
 72
 73        if split:
 74            configs = config.split()
 75        else:
 76            configs = [config]
 77            env = schema
 78
 79        schemas = {
 80            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
 81        }
 82
 83        if split:
 84            schemas[self.extra_schema_key] = self.make_extra_schema(
 85                schema, probes, configs
 86            )
 87
 88        if any(
 89            schema.get_size() > max_size for _, s in schemas.items() for schema in s
 90        ):
 91            raise SchemaException(
 92                "Schema must be smaller or equal max_size {}".format(max_size)
 93            )
 94
 95        return schemas
 96
 97    @staticmethod
 98    def make_schemas(
 99        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
100    ) -> List[Schema]:
101        """
102        Fill in probes based on the config, and keep only the env
103        parts of the schema. Throw away everything else.
104        """
105        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
106        schemas = []
107
108        # TODO: Should env be checked to be a subset of schema?
109        final_schema = env.clone()
110        for schema_key, probe in schema_elements:
111            try:
112                addtlProps = env.get(schema_key + ("additionalProperties",))
113            except KeyError:
114                addtlProps = None
115
116            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
117
118            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
119                schemas.append(final_schema)
120                final_schema = env.clone()
121
122            final_schema.set_schema_elem(
123                schema_key + ("properties", probe.name), probe_schema.schema
124            )
125
126        # Remove all additionalProperties (#22)
127        schemas.append(final_schema)
128        for s in schemas:
129            for key in config.get_match_keys():
130                try:
131                    s.delete_group_from_schema(
132                        key + ("propertyNames",), propagate=False
133                    )
134                except KeyError:
135                    pass
136
137                try:
138                    s.delete_group_from_schema(
139                        key + ("additionalProperties",), propagate=True
140                    )
141                except KeyError:
142                    pass
143
144        return schemas
145
146    @staticmethod
147    def make_extra_schema(
148        schema: Schema, probes: List[Probe], configs: List[Config]
149    ) -> List[Schema]:
150        """
151        Given the list of probes and the configuration,
152        return the schema that has everything but those sections that we
153        filled in already.
154
155        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
156        """
157        schema = schema.clone()
158
159        # Get the schema elements we already filled in for the other tables
160        schema_elements = [
161            schema_key
162            for _config in configs
163            for schema_key, _ in _config.get_schema_elements(probes)
164        ]
165
166        # Delete those from the schema
167        for schema_key in schema_elements:
168            schema.delete_group_from_schema(schema_key)
169
170        return [schema]
171
172    @staticmethod
173    def _slugify(text: str) -> str:
174        """Get a valid slug from an arbitrary string"""
175        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
176        return re.sub(r"[-\s]+", "-", value)
177
178    @staticmethod
179    def _present_in_cache(url: str) -> bool:
180        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
181
182    @staticmethod
183    def _add_to_cache(url: str, val: str):
184        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
185
186        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
187        # protect against multiple writers to the cache:
188        # https://github.com/mozilla/mozilla-schema-generator/pull/210
189        try:
190            with open(cache_file, "x") as f:
191                f.write(val)
192        except FileExistsError:
193            pass
194
195    @staticmethod
196    def _retrieve_from_cache(url: str) -> str:
197        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
198
199    @staticmethod
200    def _get_json_str(url: str) -> str:
201        no_param_url = re.sub(r"\?.*", "", url)
202
203        if GenericPing._present_in_cache(no_param_url):
204            return GenericPing._retrieve_from_cache(no_param_url)
205
206        r = requests.get(url, stream=True)
207        r.raise_for_status()
208
209        json_bytes = b""
210
211        try:
212            for chunk in r.iter_content(chunk_size=1024):
213                if chunk:
214                    json_bytes += chunk
215        except ValueError as e:
216            raise ValueError("Could not parse " + url) from e
217
218        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
219        GenericPing._add_to_cache(no_param_url, final_json)
220
221        return final_json
222
223    @staticmethod
224    def _get_json(url: str) -> dict:
225        if url.startswith(GenericPing.probe_info_base_url):
226            # For probe-info-service requests, add
227            # random query param to force cloudfront
228            # to bypass the cache
229            url += f"?t={datetime.datetime.utcnow().isoformat()}"
230        try:
231            return json.loads(GenericPing._get_json_str(url))
232        except JSONDecodeError:
233            logging.error("Unable to process JSON for url: %s", url)
234            raise
class GenericPing:
 26class GenericPing(object):
 27
 28    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 29    default_encoding = "utf-8"
 30    default_max_size = 11000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 31    extra_schema_key = "extra"
 32    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 33
 34    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 35        self.schema_url = schema_url.format(branch=mps_branch)
 36        self.env_url = env_url.format(branch=mps_branch)
 37        self.probes_url = probes_url
 38
 39    def get_schema(self) -> Schema:
 40        return Schema(self._get_json(self.schema_url))
 41
 42    def get_env(self) -> Schema:
 43        return Schema(self._get_json(self.env_url))
 44
 45    def get_probes(self) -> List[Probe]:
 46        return [
 47            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 48        ]
 49
 50    def generate_schema(
 51        self, config: Config, *, split: bool = None, max_size: int = None
 52    ) -> Dict[str, List[Schema]]:
 53        schema = self.get_schema()
 54        env = self.get_env()
 55
 56        probes = self.get_probes()
 57
 58        if split is None:
 59            split = False
 60        if max_size is None:
 61            max_size = self.default_max_size
 62
 63        if env.get_size() >= max_size:
 64            raise SchemaException(
 65                "Environment must be smaller than max_size {}".format(max_size)
 66            )
 67
 68        # TODO: Allow splits of extra schema, if necessary
 69        if schema.get_size() >= max_size:
 70            raise SchemaException(
 71                "Schema must be smaller than max_size {}".format(max_size)
 72            )
 73
 74        if split:
 75            configs = config.split()
 76        else:
 77            configs = [config]
 78            env = schema
 79
 80        schemas = {
 81            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
 82        }
 83
 84        if split:
 85            schemas[self.extra_schema_key] = self.make_extra_schema(
 86                schema, probes, configs
 87            )
 88
 89        if any(
 90            schema.get_size() > max_size for _, s in schemas.items() for schema in s
 91        ):
 92            raise SchemaException(
 93                "Schema must be smaller or equal max_size {}".format(max_size)
 94            )
 95
 96        return schemas
 97
 98    @staticmethod
 99    def make_schemas(
100        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
101    ) -> List[Schema]:
102        """
103        Fill in probes based on the config, and keep only the env
104        parts of the schema. Throw away everything else.
105        """
106        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
107        schemas = []
108
109        # TODO: Should env be checked to be a subset of schema?
110        final_schema = env.clone()
111        for schema_key, probe in schema_elements:
112            try:
113                addtlProps = env.get(schema_key + ("additionalProperties",))
114            except KeyError:
115                addtlProps = None
116
117            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
118
119            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
120                schemas.append(final_schema)
121                final_schema = env.clone()
122
123            final_schema.set_schema_elem(
124                schema_key + ("properties", probe.name), probe_schema.schema
125            )
126
127        # Remove all additionalProperties (#22)
128        schemas.append(final_schema)
129        for s in schemas:
130            for key in config.get_match_keys():
131                try:
132                    s.delete_group_from_schema(
133                        key + ("propertyNames",), propagate=False
134                    )
135                except KeyError:
136                    pass
137
138                try:
139                    s.delete_group_from_schema(
140                        key + ("additionalProperties",), propagate=True
141                    )
142                except KeyError:
143                    pass
144
145        return schemas
146
147    @staticmethod
148    def make_extra_schema(
149        schema: Schema, probes: List[Probe], configs: List[Config]
150    ) -> List[Schema]:
151        """
152        Given the list of probes and the configuration,
153        return the schema that has everything but those sections that we
154        filled in already.
155
156        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
157        """
158        schema = schema.clone()
159
160        # Get the schema elements we already filled in for the other tables
161        schema_elements = [
162            schema_key
163            for _config in configs
164            for schema_key, _ in _config.get_schema_elements(probes)
165        ]
166
167        # Delete those from the schema
168        for schema_key in schema_elements:
169            schema.delete_group_from_schema(schema_key)
170
171        return [schema]
172
173    @staticmethod
174    def _slugify(text: str) -> str:
175        """Get a valid slug from an arbitrary string"""
176        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
177        return re.sub(r"[-\s]+", "-", value)
178
179    @staticmethod
180    def _present_in_cache(url: str) -> bool:
181        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
182
183    @staticmethod
184    def _add_to_cache(url: str, val: str):
185        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
186
187        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
188        # protect against multiple writers to the cache:
189        # https://github.com/mozilla/mozilla-schema-generator/pull/210
190        try:
191            with open(cache_file, "x") as f:
192                f.write(val)
193        except FileExistsError:
194            pass
195
196    @staticmethod
197    def _retrieve_from_cache(url: str) -> str:
198        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
199
200    @staticmethod
201    def _get_json_str(url: str) -> str:
202        no_param_url = re.sub(r"\?.*", "", url)
203
204        if GenericPing._present_in_cache(no_param_url):
205            return GenericPing._retrieve_from_cache(no_param_url)
206
207        r = requests.get(url, stream=True)
208        r.raise_for_status()
209
210        json_bytes = b""
211
212        try:
213            for chunk in r.iter_content(chunk_size=1024):
214                if chunk:
215                    json_bytes += chunk
216        except ValueError as e:
217            raise ValueError("Could not parse " + url) from e
218
219        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
220        GenericPing._add_to_cache(no_param_url, final_json)
221
222        return final_json
223
224    @staticmethod
225    def _get_json(url: str) -> dict:
226        if url.startswith(GenericPing.probe_info_base_url):
227            # For probe-info-service requests, add
228            # random query param to force cloudfront
229            # to bypass the cache
230            url += f"?t={datetime.datetime.utcnow().isoformat()}"
231        try:
232            return json.loads(GenericPing._get_json_str(url))
233        except JSONDecodeError:
234            logging.error("Unable to process JSON for url: %s", url)
235            raise
GenericPing(schema_url, env_url, probes_url, mps_branch='main')
34    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
35        self.schema_url = schema_url.format(branch=mps_branch)
36        self.env_url = env_url.format(branch=mps_branch)
37        self.probes_url = probes_url
probe_info_base_url = 'https://probeinfo.telemetry.mozilla.org'
default_encoding = 'utf-8'
default_max_size = 11000
extra_schema_key = 'extra'
cache_dir = PosixPath('.probe_cache')
def get_schema(self) -> mozilla_schema_generator.schema.Schema:
39    def get_schema(self) -> Schema:
40        return Schema(self._get_json(self.schema_url))
def get_env(self) -> mozilla_schema_generator.schema.Schema:
42    def get_env(self) -> Schema:
43        return Schema(self._get_json(self.env_url))
def get_probes(self) -> List[mozilla_schema_generator.probes.Probe]:
45    def get_probes(self) -> List[Probe]:
46        return [
47            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
48        ]
def generate_schema( self, config: mozilla_schema_generator.config.Config, *, split: bool = None, max_size: int = None) -> Dict[str, List[mozilla_schema_generator.schema.Schema]]:
50    def generate_schema(
51        self, config: Config, *, split: bool = None, max_size: int = None
52    ) -> Dict[str, List[Schema]]:
53        schema = self.get_schema()
54        env = self.get_env()
55
56        probes = self.get_probes()
57
58        if split is None:
59            split = False
60        if max_size is None:
61            max_size = self.default_max_size
62
63        if env.get_size() >= max_size:
64            raise SchemaException(
65                "Environment must be smaller than max_size {}".format(max_size)
66            )
67
68        # TODO: Allow splits of extra schema, if necessary
69        if schema.get_size() >= max_size:
70            raise SchemaException(
71                "Schema must be smaller than max_size {}".format(max_size)
72            )
73
74        if split:
75            configs = config.split()
76        else:
77            configs = [config]
78            env = schema
79
80        schemas = {
81            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
82        }
83
84        if split:
85            schemas[self.extra_schema_key] = self.make_extra_schema(
86                schema, probes, configs
87            )
88
89        if any(
90            schema.get_size() > max_size for _, s in schemas.items() for schema in s
91        ):
92            raise SchemaException(
93                "Schema must be smaller or equal max_size {}".format(max_size)
94            )
95
96        return schemas
@staticmethod
def make_schemas( env: mozilla_schema_generator.schema.Schema, probes: List[mozilla_schema_generator.probes.Probe], config: mozilla_schema_generator.config.Config, split: bool, max_size: int) -> List[mozilla_schema_generator.schema.Schema]:
 98    @staticmethod
 99    def make_schemas(
100        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
101    ) -> List[Schema]:
102        """
103        Fill in probes based on the config, and keep only the env
104        parts of the schema. Throw away everything else.
105        """
106        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
107        schemas = []
108
109        # TODO: Should env be checked to be a subset of schema?
110        final_schema = env.clone()
111        for schema_key, probe in schema_elements:
112            try:
113                addtlProps = env.get(schema_key + ("additionalProperties",))
114            except KeyError:
115                addtlProps = None
116
117            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
118
119            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
120                schemas.append(final_schema)
121                final_schema = env.clone()
122
123            final_schema.set_schema_elem(
124                schema_key + ("properties", probe.name), probe_schema.schema
125            )
126
127        # Remove all additionalProperties (#22)
128        schemas.append(final_schema)
129        for s in schemas:
130            for key in config.get_match_keys():
131                try:
132                    s.delete_group_from_schema(
133                        key + ("propertyNames",), propagate=False
134                    )
135                except KeyError:
136                    pass
137
138                try:
139                    s.delete_group_from_schema(
140                        key + ("additionalProperties",), propagate=True
141                    )
142                except KeyError:
143                    pass
144
145        return schemas

Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.

@staticmethod
def make_extra_schema( schema: mozilla_schema_generator.schema.Schema, probes: List[mozilla_schema_generator.probes.Probe], configs: List[mozilla_schema_generator.config.Config]) -> List[mozilla_schema_generator.schema.Schema]:
147    @staticmethod
148    def make_extra_schema(
149        schema: Schema, probes: List[Probe], configs: List[Config]
150    ) -> List[Schema]:
151        """
152        Given the list of probes and the configuration,
153        return the schema that has everything but those sections that we
154        filled in already.
155
156        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
157        """
158        schema = schema.clone()
159
160        # Get the schema elements we already filled in for the other tables
161        schema_elements = [
162            schema_key
163            for _config in configs
164            for schema_key, _ in _config.get_schema_elements(probes)
165        ]
166
167        # Delete those from the schema
168        for schema_key in schema_elements:
169            schema.delete_group_from_schema(schema_key)
170
171        return [schema]

Given the list of probes and the configuration, return the schema that has everything but those sections that we filled in already.

TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)