mozilla_schema_generator.generic_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import json
  8import logging
  9import os
 10import pathlib
 11import re
 12from json.decoder import JSONDecodeError
 13from typing import Dict, List
 14
 15import requests
 16
 17from .config import Config
 18from .probes import Probe
 19from .schema import Schema, SchemaException
 20
 21logger = logging.getLogger(__name__)
 22
 23
 24class GenericPing(object):
 25    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 26    default_encoding = "utf-8"
 27    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 28    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 29
 30    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 31        self.schema_url = schema_url.format(branch=mps_branch)
 32        self.env_url = env_url.format(branch=mps_branch)
 33        self.probes_url = probes_url
 34
 35    def get_schema(self) -> Schema:
 36        return Schema(self._get_json(self.schema_url))
 37
 38    def get_env(self) -> Schema:
 39        return Schema(self._get_json(self.env_url))
 40
 41    def get_probes(self) -> List[Probe]:
 42        return [
 43            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 44        ]
 45
 46    def generate_schema(
 47        self, config: Config, *, max_size: int = None
 48    ) -> Dict[str, Schema]:
 49        schema = self.get_schema()
 50        env = self.get_env()
 51
 52        probes = self.get_probes()
 53
 54        if max_size is None:
 55            max_size = self.default_max_size
 56
 57        if env.get_size() >= max_size:
 58            raise SchemaException(
 59                "Environment must be smaller than max_size {}".format(max_size)
 60            )
 61
 62        if schema.get_size() >= max_size:
 63            raise SchemaException(
 64                "Schema must be smaller than max_size {}".format(max_size)
 65            )
 66
 67        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 68
 69        if any(schema.get_size() > max_size for schema in schemas.values()):
 70            raise SchemaException(
 71                "Schema must be smaller or equal max_size {}".format(max_size)
 72            )
 73
 74        return schemas
 75
 76    @staticmethod
 77    def make_schema(
 78        env: Schema, probes: List[Probe], config: Config, max_size: int
 79    ) -> Schema:
 80        """
 81        Fill in probes based on the config, and keep only the env
 82        parts of the schema. Throw away everything else.
 83        """
 84        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 85
 86        schema = env.clone()
 87        for schema_key, probe in schema_elements:
 88            try:
 89                addtlProps = env.get(schema_key + ("additionalProperties",))
 90            except KeyError:
 91                addtlProps = None
 92
 93            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 94
 95            schema.set_schema_elem(
 96                schema_key + ("properties", probe.name), probe_schema.schema
 97            )
 98
 99        # Remove all additionalProperties (#22)
100        for key in config.get_match_keys():
101            try:
102                schema.delete_group_from_schema(
103                    key + ("propertyNames",), propagate=False
104                )
105            except KeyError:
106                pass
107
108            try:
109                schema.delete_group_from_schema(
110                    key + ("additionalProperties",), propagate=True
111                )
112            except KeyError:
113                pass
114
115        return schema
116
117    @staticmethod
118    def _slugify(text: str) -> str:
119        """Get a valid slug from an arbitrary string"""
120        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
121        return re.sub(r"[-\s]+", "-", value)
122
123    @staticmethod
124    def _present_in_cache(url: str) -> bool:
125        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
126
127    @staticmethod
128    def _add_to_cache(url: str, val: str):
129        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
130
131        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
132        # protect against multiple writers to the cache:
133        # https://github.com/mozilla/mozilla-schema-generator/pull/210
134        try:
135            with open(cache_file, "x") as f:
136                f.write(val)
137        except FileExistsError:
138            pass
139
140    @staticmethod
141    def _retrieve_from_cache(url: str) -> str:
142        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
143
144    @staticmethod
145    def _get_json_str(url: str) -> str:
146        if GenericPing._present_in_cache(url):
147            return GenericPing._retrieve_from_cache(url)
148
149        headers = {}
150        if url.startswith(GenericPing.probe_info_base_url):
151            # For probe-info-service requests, set the cache-control header to force
152            # google cloud cdn to bypass the cache
153            headers["Cache-Control"] = "no-cache"
154
155        r = requests.get(url, headers=headers, stream=True)
156        r.raise_for_status()
157
158        json_bytes = b""
159
160        try:
161            for chunk in r.iter_content(chunk_size=1024):
162                if chunk:
163                    json_bytes += chunk
164        except ValueError as e:
165            raise ValueError("Could not parse " + url) from e
166
167        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
168        GenericPing._add_to_cache(url, final_json)
169
170        return final_json
171
172    @staticmethod
173    def _get_json(url: str) -> dict:
174        try:
175            return json.loads(GenericPing._get_json_str(url))
176        except JSONDecodeError:
177            logging.error("Unable to process JSON for url: %s", url)
178            raise
logger = <Logger mozilla_schema_generator.generic_ping (WARNING)>
class GenericPing:
 25class GenericPing(object):
 26    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 27    default_encoding = "utf-8"
 28    default_max_size = 12000  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 29    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 30
 31    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 32        self.schema_url = schema_url.format(branch=mps_branch)
 33        self.env_url = env_url.format(branch=mps_branch)
 34        self.probes_url = probes_url
 35
 36    def get_schema(self) -> Schema:
 37        return Schema(self._get_json(self.schema_url))
 38
 39    def get_env(self) -> Schema:
 40        return Schema(self._get_json(self.env_url))
 41
 42    def get_probes(self) -> List[Probe]:
 43        return [
 44            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 45        ]
 46
 47    def generate_schema(
 48        self, config: Config, *, max_size: int = None
 49    ) -> Dict[str, Schema]:
 50        schema = self.get_schema()
 51        env = self.get_env()
 52
 53        probes = self.get_probes()
 54
 55        if max_size is None:
 56            max_size = self.default_max_size
 57
 58        if env.get_size() >= max_size:
 59            raise SchemaException(
 60                "Environment must be smaller than max_size {}".format(max_size)
 61            )
 62
 63        if schema.get_size() >= max_size:
 64            raise SchemaException(
 65                "Schema must be smaller than max_size {}".format(max_size)
 66            )
 67
 68        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 69
 70        if any(schema.get_size() > max_size for schema in schemas.values()):
 71            raise SchemaException(
 72                "Schema must be smaller or equal max_size {}".format(max_size)
 73            )
 74
 75        return schemas
 76
 77    @staticmethod
 78    def make_schema(
 79        env: Schema, probes: List[Probe], config: Config, max_size: int
 80    ) -> Schema:
 81        """
 82        Fill in probes based on the config, and keep only the env
 83        parts of the schema. Throw away everything else.
 84        """
 85        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 86
 87        schema = env.clone()
 88        for schema_key, probe in schema_elements:
 89            try:
 90                addtlProps = env.get(schema_key + ("additionalProperties",))
 91            except KeyError:
 92                addtlProps = None
 93
 94            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 95
 96            schema.set_schema_elem(
 97                schema_key + ("properties", probe.name), probe_schema.schema
 98            )
 99
100        # Remove all additionalProperties (#22)
101        for key in config.get_match_keys():
102            try:
103                schema.delete_group_from_schema(
104                    key + ("propertyNames",), propagate=False
105                )
106            except KeyError:
107                pass
108
109            try:
110                schema.delete_group_from_schema(
111                    key + ("additionalProperties",), propagate=True
112                )
113            except KeyError:
114                pass
115
116        return schema
117
118    @staticmethod
119    def _slugify(text: str) -> str:
120        """Get a valid slug from an arbitrary string"""
121        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
122        return re.sub(r"[-\s]+", "-", value)
123
124    @staticmethod
125    def _present_in_cache(url: str) -> bool:
126        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
127
128    @staticmethod
129    def _add_to_cache(url: str, val: str):
130        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
131
132        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
133        # protect against multiple writers to the cache:
134        # https://github.com/mozilla/mozilla-schema-generator/pull/210
135        try:
136            with open(cache_file, "x") as f:
137                f.write(val)
138        except FileExistsError:
139            pass
140
141    @staticmethod
142    def _retrieve_from_cache(url: str) -> str:
143        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
144
145    @staticmethod
146    def _get_json_str(url: str) -> str:
147        if GenericPing._present_in_cache(url):
148            return GenericPing._retrieve_from_cache(url)
149
150        headers = {}
151        if url.startswith(GenericPing.probe_info_base_url):
152            # For probe-info-service requests, set the cache-control header to force
153            # google cloud cdn to bypass the cache
154            headers["Cache-Control"] = "no-cache"
155
156        r = requests.get(url, headers=headers, stream=True)
157        r.raise_for_status()
158
159        json_bytes = b""
160
161        try:
162            for chunk in r.iter_content(chunk_size=1024):
163                if chunk:
164                    json_bytes += chunk
165        except ValueError as e:
166            raise ValueError("Could not parse " + url) from e
167
168        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
169        GenericPing._add_to_cache(url, final_json)
170
171        return final_json
172
173    @staticmethod
174    def _get_json(url: str) -> dict:
175        try:
176            return json.loads(GenericPing._get_json_str(url))
177        except JSONDecodeError:
178            logging.error("Unable to process JSON for url: %s", url)
179            raise
GenericPing(schema_url, env_url, probes_url, mps_branch='main')
31    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
32        self.schema_url = schema_url.format(branch=mps_branch)
33        self.env_url = env_url.format(branch=mps_branch)
34        self.probes_url = probes_url
probe_info_base_url = 'https://probeinfo.telemetry.mozilla.org'
default_encoding = 'utf-8'
default_max_size = 12000
cache_dir = PosixPath('.probe_cache')
schema_url
env_url
probes_url
def get_schema(self) -> mozilla_schema_generator.schema.Schema:
36    def get_schema(self) -> Schema:
37        return Schema(self._get_json(self.schema_url))
def get_env(self) -> mozilla_schema_generator.schema.Schema:
39    def get_env(self) -> Schema:
40        return Schema(self._get_json(self.env_url))
def get_probes(self) -> List[mozilla_schema_generator.probes.Probe]:
42    def get_probes(self) -> List[Probe]:
43        return [
44            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
45        ]
def generate_schema( self, config: mozilla_schema_generator.config.Config, *, max_size: int = None) -> Dict[str, mozilla_schema_generator.schema.Schema]:
47    def generate_schema(
48        self, config: Config, *, max_size: int = None
49    ) -> Dict[str, Schema]:
50        schema = self.get_schema()
51        env = self.get_env()
52
53        probes = self.get_probes()
54
55        if max_size is None:
56            max_size = self.default_max_size
57
58        if env.get_size() >= max_size:
59            raise SchemaException(
60                "Environment must be smaller than max_size {}".format(max_size)
61            )
62
63        if schema.get_size() >= max_size:
64            raise SchemaException(
65                "Schema must be smaller than max_size {}".format(max_size)
66            )
67
68        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
69
70        if any(schema.get_size() > max_size for schema in schemas.values()):
71            raise SchemaException(
72                "Schema must be smaller or equal max_size {}".format(max_size)
73            )
74
75        return schemas
 77    @staticmethod
 78    def make_schema(
 79        env: Schema, probes: List[Probe], config: Config, max_size: int
 80    ) -> Schema:
 81        """
 82        Fill in probes based on the config, and keep only the env
 83        parts of the schema. Throw away everything else.
 84        """
 85        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 86
 87        schema = env.clone()
 88        for schema_key, probe in schema_elements:
 89            try:
 90                addtlProps = env.get(schema_key + ("additionalProperties",))
 91            except KeyError:
 92                addtlProps = None
 93
 94            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 95
 96            schema.set_schema_elem(
 97                schema_key + ("properties", probe.name), probe_schema.schema
 98            )
 99
100        # Remove all additionalProperties (#22)
101        for key in config.get_match_keys():
102            try:
103                schema.delete_group_from_schema(
104                    key + ("propertyNames",), propagate=False
105                )
106            except KeyError:
107                pass
108
109            try:
110                schema.delete_group_from_schema(
111                    key + ("additionalProperties",), propagate=True
112                )
113            except KeyError:
114                pass
115
116        return schema

Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.