mozilla_schema_generator.generic_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import json
  8import logging
  9import os
 10import pathlib
 11import re
 12from json.decoder import JSONDecodeError
 13from typing import Dict, List
 14
 15import requests
 16
 17from .config import Config
 18from .probes import Probe
 19from .schema import Schema, SchemaException
 20
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
 22
 23
 24class GenericPing(object):
 25    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 26    default_encoding = "utf-8"
 27    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 28    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 29
 30    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 31        self.branch_name = mps_branch
 32        self.schema_url = schema_url.format(branch=self.branch_name)
 33        self.env_url = env_url.format(branch=self.branch_name)
 34        self.probes_url = probes_url
 35
 36    def get_schema(self) -> Schema:
 37        return Schema(self._get_json(self.schema_url))
 38
 39    def get_env(self) -> Schema:
 40        return Schema(self._get_json(self.env_url))
 41
 42    def get_probes(self) -> List[Probe]:
 43        return [
 44            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 45        ]
 46
 47    def generate_schema(
 48        self, config: Config, *, max_size: int = None
 49    ) -> Dict[str, Schema]:
 50        schema = self.get_schema()
 51        env = self.get_env()
 52
 53        probes = self.get_probes()
 54
 55        if max_size is None:
 56            max_size = self.default_max_size
 57
 58        if env.get_size() >= max_size:
 59            raise SchemaException(
 60                "Environment must be smaller than max_size {}".format(max_size)
 61            )
 62
 63        if schema.get_size() >= max_size:
 64            raise SchemaException(
 65                "Schema must be smaller than max_size {}".format(max_size)
 66            )
 67
 68        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 69
 70        if any(schema.get_size() > max_size for schema in schemas.values()):
 71            raise SchemaException(
 72                "Schema must be smaller or equal max_size {}".format(max_size)
 73            )
 74
 75        return schemas
 76
 77    @staticmethod
 78    def make_schema(
 79        env: Schema, probes: List[Probe], config: Config, max_size: int
 80    ) -> Schema:
 81        """
 82        Fill in probes based on the config, and keep only the env
 83        parts of the schema. Throw away everything else.
 84        """
 85        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 86
 87        schema = env.clone()
 88        for schema_key, probe in schema_elements:
 89            try:
 90                addtlProps = env.get(schema_key + ("additionalProperties",))
 91            except KeyError:
 92                addtlProps = None
 93
 94            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 95
 96            schema.set_schema_elem(
 97                schema_key + ("properties", probe.name), probe_schema.schema
 98            )
 99
100        # Remove all additionalProperties (#22)
101        for key in config.get_match_keys():
102            try:
103                schema.delete_group_from_schema(
104                    key + ("propertyNames",), propagate=False
105                )
106            except KeyError:
107                pass
108
109            try:
110                schema.delete_group_from_schema(
111                    key + ("additionalProperties",), propagate=True
112                )
113            except KeyError:
114                pass
115
116        return schema
117
118    @staticmethod
119    def _slugify(text: str) -> str:
120        """Get a valid slug from an arbitrary string"""
121        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
122        return re.sub(r"[-\s]+", "-", value)
123
124    @staticmethod
125    def _present_in_cache(url: str) -> bool:
126        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
127
128    @staticmethod
129    def _add_to_cache(url: str, val: str):
130        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
131
132        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
133        # protect against multiple writers to the cache:
134        # https://github.com/mozilla/mozilla-schema-generator/pull/210
135        try:
136            with open(cache_file, "x") as f:
137                f.write(val)
138        except FileExistsError:
139            pass
140
141    @staticmethod
142    def _retrieve_from_cache(url: str) -> str:
143        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
144
145    @staticmethod
146    def _get_json_str(url: str) -> str:
147        if GenericPing._present_in_cache(url):
148            return GenericPing._retrieve_from_cache(url)
149
150        headers = {}
151        if url.startswith(GenericPing.probe_info_base_url):
152            # For probe-info-service requests, set the cache-control header to force
153            # google cloud cdn to bypass the cache
154            headers["Cache-Control"] = "no-cache"
155
156        r = requests.get(url, headers=headers, stream=True)
157        r.raise_for_status()
158
159        json_bytes = b""
160
161        try:
162            for chunk in r.iter_content(chunk_size=1024):
163                if chunk:
164                    json_bytes += chunk
165        except ValueError as e:
166            raise ValueError("Could not parse " + url) from e
167
168        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
169        GenericPing._add_to_cache(url, final_json)
170
171        return final_json
172
173    @staticmethod
174    def _get_json(url: str) -> dict:
175        try:
176            return json.loads(GenericPing._get_json_str(url))
177        except JSONDecodeError:
178            logging.error("Unable to process JSON for url: %s", url)
179            raise
logger = <Logger mozilla_schema_generator.generic_ping (WARNING)>
class GenericPing:
 25class GenericPing(object):
 26    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 27    default_encoding = "utf-8"
 28    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 29    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 30
 31    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 32        self.branch_name = mps_branch
 33        self.schema_url = schema_url.format(branch=self.branch_name)
 34        self.env_url = env_url.format(branch=self.branch_name)
 35        self.probes_url = probes_url
 36
 37    def get_schema(self) -> Schema:
 38        return Schema(self._get_json(self.schema_url))
 39
 40    def get_env(self) -> Schema:
 41        return Schema(self._get_json(self.env_url))
 42
 43    def get_probes(self) -> List[Probe]:
 44        return [
 45            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 46        ]
 47
 48    def generate_schema(
 49        self, config: Config, *, max_size: int = None
 50    ) -> Dict[str, Schema]:
 51        schema = self.get_schema()
 52        env = self.get_env()
 53
 54        probes = self.get_probes()
 55
 56        if max_size is None:
 57            max_size = self.default_max_size
 58
 59        if env.get_size() >= max_size:
 60            raise SchemaException(
 61                "Environment must be smaller than max_size {}".format(max_size)
 62            )
 63
 64        if schema.get_size() >= max_size:
 65            raise SchemaException(
 66                "Schema must be smaller than max_size {}".format(max_size)
 67            )
 68
 69        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 70
 71        if any(schema.get_size() > max_size for schema in schemas.values()):
 72            raise SchemaException(
 73                "Schema must be smaller or equal max_size {}".format(max_size)
 74            )
 75
 76        return schemas
 77
 78    @staticmethod
 79    def make_schema(
 80        env: Schema, probes: List[Probe], config: Config, max_size: int
 81    ) -> Schema:
 82        """
 83        Fill in probes based on the config, and keep only the env
 84        parts of the schema. Throw away everything else.
 85        """
 86        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 87
 88        schema = env.clone()
 89        for schema_key, probe in schema_elements:
 90            try:
 91                addtlProps = env.get(schema_key + ("additionalProperties",))
 92            except KeyError:
 93                addtlProps = None
 94
 95            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 96
 97            schema.set_schema_elem(
 98                schema_key + ("properties", probe.name), probe_schema.schema
 99            )
100
101        # Remove all additionalProperties (#22)
102        for key in config.get_match_keys():
103            try:
104                schema.delete_group_from_schema(
105                    key + ("propertyNames",), propagate=False
106                )
107            except KeyError:
108                pass
109
110            try:
111                schema.delete_group_from_schema(
112                    key + ("additionalProperties",), propagate=True
113                )
114            except KeyError:
115                pass
116
117        return schema
118
119    @staticmethod
120    def _slugify(text: str) -> str:
121        """Get a valid slug from an arbitrary string"""
122        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
123        return re.sub(r"[-\s]+", "-", value)
124
125    @staticmethod
126    def _present_in_cache(url: str) -> bool:
127        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
128
129    @staticmethod
130    def _add_to_cache(url: str, val: str):
131        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
132
133        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
134        # protect against multiple writers to the cache:
135        # https://github.com/mozilla/mozilla-schema-generator/pull/210
136        try:
137            with open(cache_file, "x") as f:
138                f.write(val)
139        except FileExistsError:
140            pass
141
142    @staticmethod
143    def _retrieve_from_cache(url: str) -> str:
144        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
145
146    @staticmethod
147    def _get_json_str(url: str) -> str:
148        if GenericPing._present_in_cache(url):
149            return GenericPing._retrieve_from_cache(url)
150
151        headers = {}
152        if url.startswith(GenericPing.probe_info_base_url):
153            # For probe-info-service requests, set the cache-control header to force
154            # google cloud cdn to bypass the cache
155            headers["Cache-Control"] = "no-cache"
156
157        r = requests.get(url, headers=headers, stream=True)
158        r.raise_for_status()
159
160        json_bytes = b""
161
162        try:
163            for chunk in r.iter_content(chunk_size=1024):
164                if chunk:
165                    json_bytes += chunk
166        except ValueError as e:
167            raise ValueError("Could not parse " + url) from e
168
169        final_json = json_bytes.decode(r.encoding or GenericPing.default_encoding)
170        GenericPing._add_to_cache(url, final_json)
171
172        return final_json
173
174    @staticmethod
175    def _get_json(url: str) -> dict:
176        try:
177            return json.loads(GenericPing._get_json_str(url))
178        except JSONDecodeError:
179            logging.error("Unable to process JSON for url: %s", url)
180            raise
GenericPing(schema_url, env_url, probes_url, mps_branch='main')
31    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
32        self.branch_name = mps_branch
33        self.schema_url = schema_url.format(branch=self.branch_name)
34        self.env_url = env_url.format(branch=self.branch_name)
35        self.probes_url = probes_url
probe_info_base_url = 'https://probeinfo.telemetry.mozilla.org'
default_encoding = 'utf-8'
default_max_size = 12900
cache_dir = PosixPath('.probe_cache')
branch_name
schema_url
env_url
probes_url
def get_schema(self) -> mozilla_schema_generator.schema.Schema:
37    def get_schema(self) -> Schema:
38        return Schema(self._get_json(self.schema_url))
def get_env(self) -> mozilla_schema_generator.schema.Schema:
40    def get_env(self) -> Schema:
41        return Schema(self._get_json(self.env_url))
def get_probes(self) -> List[mozilla_schema_generator.probes.Probe]:
43    def get_probes(self) -> List[Probe]:
44        return [
45            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
46        ]
def generate_schema( self, config: mozilla_schema_generator.config.Config, *, max_size: int = None) -> Dict[str, mozilla_schema_generator.schema.Schema]:
48    def generate_schema(
49        self, config: Config, *, max_size: int = None
50    ) -> Dict[str, Schema]:
51        schema = self.get_schema()
52        env = self.get_env()
53
54        probes = self.get_probes()
55
56        if max_size is None:
57            max_size = self.default_max_size
58
59        if env.get_size() >= max_size:
60            raise SchemaException(
61                "Environment must be smaller than max_size {}".format(max_size)
62            )
63
64        if schema.get_size() >= max_size:
65            raise SchemaException(
66                "Schema must be smaller than max_size {}".format(max_size)
67            )
68
69        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
70
71        if any(schema.get_size() > max_size for schema in schemas.values()):
72            raise SchemaException(
73                "Schema must be smaller or equal max_size {}".format(max_size)
74            )
75
76        return schemas
 78    @staticmethod
 79    def make_schema(
 80        env: Schema, probes: List[Probe], config: Config, max_size: int
 81    ) -> Schema:
 82        """
 83        Fill in probes based on the config, and keep only the env
 84        parts of the schema. Throw away everything else.
 85        """
 86        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 87
 88        schema = env.clone()
 89        for schema_key, probe in schema_elements:
 90            try:
 91                addtlProps = env.get(schema_key + ("additionalProperties",))
 92            except KeyError:
 93                addtlProps = None
 94
 95            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
 96
 97            schema.set_schema_elem(
 98                schema_key + ("properties", probe.name), probe_schema.schema
 99            )
100
101        # Remove all additionalProperties (#22)
102        for key in config.get_match_keys():
103            try:
104                schema.delete_group_from_schema(
105                    key + ("propertyNames",), propagate=False
106                )
107            except KeyError:
108                pass
109
110            try:
111                schema.delete_group_from_schema(
112                    key + ("additionalProperties",), propagate=True
113                )
114            except KeyError:
115                pass
116
117        return schema

Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.