mozilla_schema_generator.generic_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
import json
import logging
import os
import pathlib
import re
from json.decoder import JSONDecodeError
from typing import Dict, List, Optional

import requests

from .config import Config
from .probes import Probe
from .schema import Schema, SchemaException
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24class GenericPing(object):
 25
 26    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 27    default_encoding = "utf-8"
 28    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 29    extra_schema_key = "extra"
 30    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 31
 32    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 33        self.schema_url = schema_url.format(branch=mps_branch)
 34        self.env_url = env_url.format(branch=mps_branch)
 35        self.probes_url = probes_url
 36
 37    def get_schema(self) -> Schema:
 38        return Schema(self._get_json(self.schema_url))
 39
 40    def get_env(self) -> Schema:
 41        return Schema(self._get_json(self.env_url))
 42
 43    def get_probes(self) -> List[Probe]:
 44        return [
 45            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 46        ]
 47
 48    def generate_schema(
 49        self, config: Config, *, split: bool = None, max_size: int = None
 50    ) -> Dict[str, List[Schema]]:
 51        schema = self.get_schema()
 52        env = self.get_env()
 53
 54        probes = self.get_probes()
 55
 56        if split is None:
 57            split = False
 58        if max_size is None:
 59            max_size = self.default_max_size
 60
 61        if env.get_size() >= max_size:
 62            raise SchemaException(
 63                "Environment must be smaller than max_size {}".format(max_size)
 64            )
 65
 66        # TODO: Allow splits of extra schema, if necessary
 67        if schema.get_size() >= max_size:
 68            raise SchemaException(
 69                "Schema must be smaller than max_size {}".format(max_size)
 70            )
 71
 72        if split:
 73            configs = config.split()
 74        else:
 75            configs = [config]
 76            env = schema
 77
 78        schemas = {
 79            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
 80        }
 81
 82        if split:
 83            schemas[self.extra_schema_key] = self.make_extra_schema(
 84                schema, probes, configs
 85            )
 86
 87        if any(
 88            schema.get_size() > max_size for _, s in schemas.items() for schema in s
 89        ):
 90            raise SchemaException(
 91                "Schema must be smaller or equal max_size {}".format(max_size)
 92            )
 93
 94        return schemas
 95
 96    @staticmethod
 97    def make_schemas(
 98        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
 99    ) -> List[Schema]:
100        """
101        Fill in probes based on the config, and keep only the env
102        parts of the schema. Throw away everything else.
103        """
104        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
105        schemas = []
106
107        # TODO: Should env be checked to be a subset of schema?
108        final_schema = env.clone()
109        for schema_key, probe in schema_elements:
110            try:
111                addtlProps = env.get(schema_key + ("additionalProperties",))
112            except KeyError:
113                addtlProps = None
114
115            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
116
117            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
118                schemas.append(final_schema)
119                final_schema = env.clone()
120
121            final_schema.set_schema_elem(
122                schema_key + ("properties", probe.name), probe_schema.schema
123            )
124
125        # Remove all additionalProperties (#22)
126        schemas.append(final_schema)
127        for s in schemas:
128            for key in config.get_match_keys():
129                try:
130                    s.delete_group_from_schema(
131                        key + ("propertyNames",), propagate=False
132                    )
133                except KeyError:
134                    pass
135
136                try:
137                    s.delete_group_from_schema(
138                        key + ("additionalProperties",), propagate=True
139                    )
140                except KeyError:
141                    pass
142
143        return schemas
144
145    @staticmethod
146    def make_extra_schema(
147        schema: Schema, probes: List[Probe], configs: List[Config]
148    ) -> List[Schema]:
149        """
150        Given the list of probes and the configuration,
151        return the schema that has everything but those sections that we
152        filled in already.
153
154        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
155        """
156        schema = schema.clone()
157
158        # Get the schema elements we already filled in for the other tables
159        schema_elements = [
160            schema_key
161            for _config in configs
162            for schema_key, _ in _config.get_schema_elements(probes)
163        ]
164
165        # Delete those from the schema
166        for schema_key in schema_elements:
167            schema.delete_group_from_schema(schema_key)
168
169        return [schema]
170
171    @staticmethod
172    def _slugify(text: str) -> str:
173        """Get a valid slug from an arbitrary string"""
174        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
175        return re.sub(r"[-\s]+", "-", value)
176
177    @staticmethod
178    def _present_in_cache(url: str) -> bool:
179        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
180
181    @staticmethod
182    def _add_to_cache(url: str, val: str):
183        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
184
185        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
186        # protect against multiple writers to the cache:
187        # https://github.com/mozilla/mozilla-schema-generator/pull/210
188        try:
189            with open(cache_file, "x") as f:
190                f.write(val)
191        except FileExistsError:
192            pass
193
194    @staticmethod
195    def _retrieve_from_cache(url: str) -> str:
196        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
197
198    @staticmethod
199    def _get_json_str(url: str) -> str:
200        if GenericPing._present_in_cache(url):
201            return GenericPing._retrieve_from_cache(url)
202
203        headers = {}
204        if url.startswith(GenericPing.probe_info_base_url):
205            # For probe-info-service requests, set the cache-control header to force
206            # google cloud cdn to bypass the cache
207            headers["Cache-Control"] = "no-cache"
208
209        r = requests.get(url, headers=headers, stream=True)
210        r.raise_for_status()
211
212        json_bytes = b""
213
214        try:
215            for chunk in r.iter_content(chunk_size=1024):
216                if chunk:
217                    json_bytes += chunk
218        except ValueError as e:
219            raise ValueError("Could not parse " + url) from e
220
221        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
222        GenericPing._add_to_cache(url, final_json)
223
224        return final_json
225
226    @staticmethod
227    def _get_json(url: str) -> dict:
228        try:
229            return json.loads(GenericPing._get_json_str(url))
230        except JSONDecodeError:
231            logging.error("Unable to process JSON for url: %s", url)
232            raise
class GenericPing:
 25class GenericPing(object):
 26
 27    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 28    default_encoding = "utf-8"
 29    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 30    extra_schema_key = "extra"
 31    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 32
 33    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 34        self.schema_url = schema_url.format(branch=mps_branch)
 35        self.env_url = env_url.format(branch=mps_branch)
 36        self.probes_url = probes_url
 37
 38    def get_schema(self) -> Schema:
 39        return Schema(self._get_json(self.schema_url))
 40
 41    def get_env(self) -> Schema:
 42        return Schema(self._get_json(self.env_url))
 43
 44    def get_probes(self) -> List[Probe]:
 45        return [
 46            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 47        ]
 48
 49    def generate_schema(
 50        self, config: Config, *, split: bool = None, max_size: int = None
 51    ) -> Dict[str, List[Schema]]:
 52        schema = self.get_schema()
 53        env = self.get_env()
 54
 55        probes = self.get_probes()
 56
 57        if split is None:
 58            split = False
 59        if max_size is None:
 60            max_size = self.default_max_size
 61
 62        if env.get_size() >= max_size:
 63            raise SchemaException(
 64                "Environment must be smaller than max_size {}".format(max_size)
 65            )
 66
 67        # TODO: Allow splits of extra schema, if necessary
 68        if schema.get_size() >= max_size:
 69            raise SchemaException(
 70                "Schema must be smaller than max_size {}".format(max_size)
 71            )
 72
 73        if split:
 74            configs = config.split()
 75        else:
 76            configs = [config]
 77            env = schema
 78
 79        schemas = {
 80            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
 81        }
 82
 83        if split:
 84            schemas[self.extra_schema_key] = self.make_extra_schema(
 85                schema, probes, configs
 86            )
 87
 88        if any(
 89            schema.get_size() > max_size for _, s in schemas.items() for schema in s
 90        ):
 91            raise SchemaException(
 92                "Schema must be smaller or equal max_size {}".format(max_size)
 93            )
 94
 95        return schemas
 96
 97    @staticmethod
 98    def make_schemas(
 99        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
100    ) -> List[Schema]:
101        """
102        Fill in probes based on the config, and keep only the env
103        parts of the schema. Throw away everything else.
104        """
105        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
106        schemas = []
107
108        # TODO: Should env be checked to be a subset of schema?
109        final_schema = env.clone()
110        for schema_key, probe in schema_elements:
111            try:
112                addtlProps = env.get(schema_key + ("additionalProperties",))
113            except KeyError:
114                addtlProps = None
115
116            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
117
118            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
119                schemas.append(final_schema)
120                final_schema = env.clone()
121
122            final_schema.set_schema_elem(
123                schema_key + ("properties", probe.name), probe_schema.schema
124            )
125
126        # Remove all additionalProperties (#22)
127        schemas.append(final_schema)
128        for s in schemas:
129            for key in config.get_match_keys():
130                try:
131                    s.delete_group_from_schema(
132                        key + ("propertyNames",), propagate=False
133                    )
134                except KeyError:
135                    pass
136
137                try:
138                    s.delete_group_from_schema(
139                        key + ("additionalProperties",), propagate=True
140                    )
141                except KeyError:
142                    pass
143
144        return schemas
145
146    @staticmethod
147    def make_extra_schema(
148        schema: Schema, probes: List[Probe], configs: List[Config]
149    ) -> List[Schema]:
150        """
151        Given the list of probes and the configuration,
152        return the schema that has everything but those sections that we
153        filled in already.
154
155        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
156        """
157        schema = schema.clone()
158
159        # Get the schema elements we already filled in for the other tables
160        schema_elements = [
161            schema_key
162            for _config in configs
163            for schema_key, _ in _config.get_schema_elements(probes)
164        ]
165
166        # Delete those from the schema
167        for schema_key in schema_elements:
168            schema.delete_group_from_schema(schema_key)
169
170        return [schema]
171
172    @staticmethod
173    def _slugify(text: str) -> str:
174        """Get a valid slug from an arbitrary string"""
175        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
176        return re.sub(r"[-\s]+", "-", value)
177
178    @staticmethod
179    def _present_in_cache(url: str) -> bool:
180        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
181
182    @staticmethod
183    def _add_to_cache(url: str, val: str):
184        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
185
186        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
187        # protect against multiple writers to the cache:
188        # https://github.com/mozilla/mozilla-schema-generator/pull/210
189        try:
190            with open(cache_file, "x") as f:
191                f.write(val)
192        except FileExistsError:
193            pass
194
195    @staticmethod
196    def _retrieve_from_cache(url: str) -> str:
197        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
198
199    @staticmethod
200    def _get_json_str(url: str) -> str:
201        if GenericPing._present_in_cache(url):
202            return GenericPing._retrieve_from_cache(url)
203
204        headers = {}
205        if url.startswith(GenericPing.probe_info_base_url):
206            # For probe-info-service requests, set the cache-control header to force
207            # google cloud cdn to bypass the cache
208            headers["Cache-Control"] = "no-cache"
209
210        r = requests.get(url, headers=headers, stream=True)
211        r.raise_for_status()
212
213        json_bytes = b""
214
215        try:
216            for chunk in r.iter_content(chunk_size=1024):
217                if chunk:
218                    json_bytes += chunk
219        except ValueError as e:
220            raise ValueError("Could not parse " + url) from e
221
222        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
223        GenericPing._add_to_cache(url, final_json)
224
225        return final_json
226
227    @staticmethod
228    def _get_json(url: str) -> dict:
229        try:
230            return json.loads(GenericPing._get_json_str(url))
231        except JSONDecodeError:
232            logging.error("Unable to process JSON for url: %s", url)
233            raise
GenericPing(schema_url, env_url, probes_url, mps_branch='main')
33    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
34        self.schema_url = schema_url.format(branch=mps_branch)
35        self.env_url = env_url.format(branch=mps_branch)
36        self.probes_url = probes_url
def get_schema(self) -> mozilla_schema_generator.schema.Schema:
38    def get_schema(self) -> Schema:
39        return Schema(self._get_json(self.schema_url))
def get_env(self) -> mozilla_schema_generator.schema.Schema:
41    def get_env(self) -> Schema:
42        return Schema(self._get_json(self.env_url))
def get_probes(self) -> List[mozilla_schema_generator.probes.Probe]:
44    def get_probes(self) -> List[Probe]:
45        return [
46            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
47        ]
def generate_schema( self, config: mozilla_schema_generator.config.Config, *, split: bool = None, max_size: int = None) -> Dict[str, List[mozilla_schema_generator.schema.Schema]]:
49    def generate_schema(
50        self, config: Config, *, split: bool = None, max_size: int = None
51    ) -> Dict[str, List[Schema]]:
52        schema = self.get_schema()
53        env = self.get_env()
54
55        probes = self.get_probes()
56
57        if split is None:
58            split = False
59        if max_size is None:
60            max_size = self.default_max_size
61
62        if env.get_size() >= max_size:
63            raise SchemaException(
64                "Environment must be smaller than max_size {}".format(max_size)
65            )
66
67        # TODO: Allow splits of extra schema, if necessary
68        if schema.get_size() >= max_size:
69            raise SchemaException(
70                "Schema must be smaller than max_size {}".format(max_size)
71            )
72
73        if split:
74            configs = config.split()
75        else:
76            configs = [config]
77            env = schema
78
79        schemas = {
80            c.name: self.make_schemas(env, probes, c, split, max_size) for c in configs
81        }
82
83        if split:
84            schemas[self.extra_schema_key] = self.make_extra_schema(
85                schema, probes, configs
86            )
87
88        if any(
89            schema.get_size() > max_size for _, s in schemas.items() for schema in s
90        ):
91            raise SchemaException(
92                "Schema must be smaller or equal max_size {}".format(max_size)
93            )
94
95        return schemas
@staticmethod
def make_schemas( env: mozilla_schema_generator.schema.Schema, probes: List[mozilla_schema_generator.probes.Probe], config: mozilla_schema_generator.config.Config, split: bool, max_size: int) -> List[mozilla_schema_generator.schema.Schema]:
 97    @staticmethod
 98    def make_schemas(
 99        env: Schema, probes: List[Probe], config: Config, split: bool, max_size: int
100    ) -> List[Schema]:
101        """
102        Fill in probes based on the config, and keep only the env
103        parts of the schema. Throw away everything else.
104        """
105        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
106        schemas = []
107
108        # TODO: Should env be checked to be a subset of schema?
109        final_schema = env.clone()
110        for schema_key, probe in schema_elements:
111            try:
112                addtlProps = env.get(schema_key + ("additionalProperties",))
113            except KeyError:
114                addtlProps = None
115
116            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
117
118            if split and final_schema.get_size() + probe_schema.get_size() > max_size:
119                schemas.append(final_schema)
120                final_schema = env.clone()
121
122            final_schema.set_schema_elem(
123                schema_key + ("properties", probe.name), probe_schema.schema
124            )
125
126        # Remove all additionalProperties (#22)
127        schemas.append(final_schema)
128        for s in schemas:
129            for key in config.get_match_keys():
130                try:
131                    s.delete_group_from_schema(
132                        key + ("propertyNames",), propagate=False
133                    )
134                except KeyError:
135                    pass
136
137                try:
138                    s.delete_group_from_schema(
139                        key + ("additionalProperties",), propagate=True
140                    )
141                except KeyError:
142                    pass
143
144        return schemas

Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.

@staticmethod
def make_extra_schema( schema: mozilla_schema_generator.schema.Schema, probes: List[mozilla_schema_generator.probes.Probe], configs: List[mozilla_schema_generator.config.Config]) -> List[mozilla_schema_generator.schema.Schema]:
146    @staticmethod
147    def make_extra_schema(
148        schema: Schema, probes: List[Probe], configs: List[Config]
149    ) -> List[Schema]:
150        """
151        Given the list of probes and the configuration,
152        return the schema that has everything but those sections that we
153        filled in already.
154
155        TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)
156        """
157        schema = schema.clone()
158
159        # Get the schema elements we already filled in for the other tables
160        schema_elements = [
161            schema_key
162            for _config in configs
163            for schema_key, _ in _config.get_schema_elements(probes)
164        ]
165
166        # Delete those from the schema
167        for schema_key in schema_elements:
168            schema.delete_group_from_schema(schema_key)
169
170        return [schema]

Given the list of probes and the configuration, return the schema that has everything but those sections that we filled in already.

TODO: Split the extra schema, when needed (e.g. extra.0.schema.json, extra.1.schema.json)