mozilla_schema_generator.generic_ping

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7import json
  8import logging
  9import os
 10import pathlib
 11import re
 12from json.decoder import JSONDecodeError
 13from typing import Dict, List
 14
 15import requests
 16from requests.adapters import HTTPAdapter
 17from urllib3.util.retry import Retry
 18
 19from .config import Config
 20from .probes import Probe
 21from .schema import Schema, SchemaException
 22
 23logger = logging.getLogger(__name__)
 24
 25_http_session = requests.Session()
 26_http_session.mount(
 27    "https://",
 28    HTTPAdapter(
 29        max_retries=Retry(total=3, backoff_factor=1, status_forcelist=[502, 503, 504])
 30    ),
 31)
 32
 33
 34class GenericPing(object):
 35    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 36    default_encoding = "utf-8"
 37    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 38    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 39
 40    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 41        self.branch_name = mps_branch
 42        self.schema_url = schema_url.format(branch=self.branch_name)
 43        self.env_url = env_url.format(branch=self.branch_name)
 44        self.probes_url = probes_url
 45
 46    def get_schema(self) -> Schema:
 47        return Schema(self._get_json(self.schema_url))
 48
 49    def get_env(self) -> Schema:
 50        return Schema(self._get_json(self.env_url))
 51
 52    def get_probes(self) -> List[Probe]:
 53        return [
 54            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 55        ]
 56
 57    def generate_schema(
 58        self, config: Config, *, max_size: int = None
 59    ) -> Dict[str, Schema]:
 60        schema = self.get_schema()
 61        env = self.get_env()
 62
 63        probes = self.get_probes()
 64
 65        if max_size is None:
 66            max_size = self.default_max_size
 67
 68        if env.get_size() >= max_size:
 69            raise SchemaException(
 70                "Environment must be smaller than max_size {}".format(max_size)
 71            )
 72
 73        if schema.get_size() >= max_size:
 74            raise SchemaException(
 75                "Schema must be smaller than max_size {}".format(max_size)
 76            )
 77
 78        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 79
 80        if any(schema.get_size() > max_size for schema in schemas.values()):
 81            raise SchemaException(
 82                "Schema must be smaller or equal max_size {}".format(max_size)
 83            )
 84
 85        return schemas
 86
 87    @staticmethod
 88    def make_schema(
 89        env: Schema, probes: List[Probe], config: Config, max_size: int
 90    ) -> Schema:
 91        """
 92        Fill in probes based on the config, and keep only the env
 93        parts of the schema. Throw away everything else.
 94        """
 95        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 96
 97        schema = env.clone()
 98        for schema_key, probe in schema_elements:
 99            try:
100                addtlProps = env.get(schema_key + ("additionalProperties",))
101            except KeyError:
102                addtlProps = None
103
104            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
105
106            schema.set_schema_elem(
107                schema_key + ("properties", probe.name), probe_schema.schema
108            )
109
110        # Remove all additionalProperties (#22)
111        for key in config.get_match_keys():
112            try:
113                schema.delete_group_from_schema(
114                    key + ("propertyNames",), propagate=False
115                )
116            except KeyError:
117                pass
118
119            try:
120                schema.delete_group_from_schema(
121                    key + ("additionalProperties",), propagate=True
122                )
123            except KeyError:
124                pass
125
126        return schema
127
128    @staticmethod
129    def _slugify(text: str) -> str:
130        """Get a valid slug from an arbitrary string"""
131        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
132        return re.sub(r"[-\s]+", "-", value)
133
134    @staticmethod
135    def _present_in_cache(url: str) -> bool:
136        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
137
138    @staticmethod
139    def _add_to_cache(url: str, val: str):
140        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
141
142        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
143        # protect against multiple writers to the cache:
144        # https://github.com/mozilla/mozilla-schema-generator/pull/210
145        try:
146            with open(cache_file, "x") as f:
147                f.write(val)
148        except FileExistsError:
149            pass
150
151    @staticmethod
152    def _retrieve_from_cache(url: str) -> str:
153        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
154
155    @staticmethod
156    def _get_json_str(url: str) -> str:
157        if GenericPing._present_in_cache(url):
158            return GenericPing._retrieve_from_cache(url)
159
160        headers = {}
161        if url.startswith(GenericPing.probe_info_base_url):
162            # For probe-info-service requests, set the cache-control header to force
163            # google cloud cdn to bypass the cache
164            headers["Cache-Control"] = "no-cache"
165
166        r = _http_session.get(url, headers=headers)
167        r.raise_for_status()
168
169        final_json = r.content.decode(r.encoding or GenericPing.default_encoding)
170        GenericPing._add_to_cache(url, final_json)
171
172        return final_json
173
174    @staticmethod
175    def _get_json(url: str) -> dict:
176        try:
177            return json.loads(GenericPing._get_json_str(url))
178        except JSONDecodeError:
179            logging.error("Unable to process JSON for url: %s", url)
180            raise
logger = <Logger mozilla_schema_generator.generic_ping (WARNING)>
class GenericPing:
 35class GenericPing(object):
 36    probe_info_base_url = "https://probeinfo.telemetry.mozilla.org"
 37    default_encoding = "utf-8"
 38    default_max_size = 12900  # https://bugzilla.mozilla.org/show_bug.cgi?id=1688633
 39    cache_dir = pathlib.Path(os.environ.get("MSG_PROBE_CACHE_DIR", ".probe_cache"))
 40
 41    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
 42        self.branch_name = mps_branch
 43        self.schema_url = schema_url.format(branch=self.branch_name)
 44        self.env_url = env_url.format(branch=self.branch_name)
 45        self.probes_url = probes_url
 46
 47    def get_schema(self) -> Schema:
 48        return Schema(self._get_json(self.schema_url))
 49
 50    def get_env(self) -> Schema:
 51        return Schema(self._get_json(self.env_url))
 52
 53    def get_probes(self) -> List[Probe]:
 54        return [
 55            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
 56        ]
 57
 58    def generate_schema(
 59        self, config: Config, *, max_size: int = None
 60    ) -> Dict[str, Schema]:
 61        schema = self.get_schema()
 62        env = self.get_env()
 63
 64        probes = self.get_probes()
 65
 66        if max_size is None:
 67            max_size = self.default_max_size
 68
 69        if env.get_size() >= max_size:
 70            raise SchemaException(
 71                "Environment must be smaller than max_size {}".format(max_size)
 72            )
 73
 74        if schema.get_size() >= max_size:
 75            raise SchemaException(
 76                "Schema must be smaller than max_size {}".format(max_size)
 77            )
 78
 79        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
 80
 81        if any(schema.get_size() > max_size for schema in schemas.values()):
 82            raise SchemaException(
 83                "Schema must be smaller or equal max_size {}".format(max_size)
 84            )
 85
 86        return schemas
 87
 88    @staticmethod
 89    def make_schema(
 90        env: Schema, probes: List[Probe], config: Config, max_size: int
 91    ) -> Schema:
 92        """
 93        Fill in probes based on the config, and keep only the env
 94        parts of the schema. Throw away everything else.
 95        """
 96        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 97
 98        schema = env.clone()
 99        for schema_key, probe in schema_elements:
100            try:
101                addtlProps = env.get(schema_key + ("additionalProperties",))
102            except KeyError:
103                addtlProps = None
104
105            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
106
107            schema.set_schema_elem(
108                schema_key + ("properties", probe.name), probe_schema.schema
109            )
110
111        # Remove all additionalProperties (#22)
112        for key in config.get_match_keys():
113            try:
114                schema.delete_group_from_schema(
115                    key + ("propertyNames",), propagate=False
116                )
117            except KeyError:
118                pass
119
120            try:
121                schema.delete_group_from_schema(
122                    key + ("additionalProperties",), propagate=True
123                )
124            except KeyError:
125                pass
126
127        return schema
128
129    @staticmethod
130    def _slugify(text: str) -> str:
131        """Get a valid slug from an arbitrary string"""
132        value = re.sub(r"[^\w\s-]", "", text.lower()).strip()
133        return re.sub(r"[-\s]+", "-", value)
134
135    @staticmethod
136    def _present_in_cache(url: str) -> bool:
137        return (GenericPing.cache_dir / GenericPing._slugify(url)).exists()
138
139    @staticmethod
140    def _add_to_cache(url: str, val: str):
141        GenericPing.cache_dir.mkdir(parents=True, exist_ok=True)
142
143        cache_file = GenericPing.cache_dir / GenericPing._slugify(url)
144        # protect against multiple writers to the cache:
145        # https://github.com/mozilla/mozilla-schema-generator/pull/210
146        try:
147            with open(cache_file, "x") as f:
148                f.write(val)
149        except FileExistsError:
150            pass
151
152    @staticmethod
153    def _retrieve_from_cache(url: str) -> str:
154        return (GenericPing.cache_dir / GenericPing._slugify(url)).read_text()
155
156    @staticmethod
157    def _get_json_str(url: str) -> str:
158        if GenericPing._present_in_cache(url):
159            return GenericPing._retrieve_from_cache(url)
160
161        headers = {}
162        if url.startswith(GenericPing.probe_info_base_url):
163            # For probe-info-service requests, set the cache-control header to force
164            # google cloud cdn to bypass the cache
165            headers["Cache-Control"] = "no-cache"
166
167        r = _http_session.get(url, headers=headers)
168        r.raise_for_status()
169
170        final_json = r.content.decode(r.encoding or GenericPing.default_encoding)
171        GenericPing._add_to_cache(url, final_json)
172
173        return final_json
174
175    @staticmethod
176    def _get_json(url: str) -> dict:
177        try:
178            return json.loads(GenericPing._get_json_str(url))
179        except JSONDecodeError:
180            logging.error("Unable to process JSON for url: %s", url)
181            raise
GenericPing(schema_url, env_url, probes_url, mps_branch='main')
41    def __init__(self, schema_url, env_url, probes_url, mps_branch="main"):
42        self.branch_name = mps_branch
43        self.schema_url = schema_url.format(branch=self.branch_name)
44        self.env_url = env_url.format(branch=self.branch_name)
45        self.probes_url = probes_url
probe_info_base_url = 'https://probeinfo.telemetry.mozilla.org'
default_encoding = 'utf-8'
default_max_size = 12900
cache_dir = PosixPath('.probe_cache')
branch_name
schema_url
env_url
probes_url
def get_schema(self) -> mozilla_schema_generator.schema.Schema:
47    def get_schema(self) -> Schema:
48        return Schema(self._get_json(self.schema_url))
def get_env(self) -> mozilla_schema_generator.schema.Schema:
50    def get_env(self) -> Schema:
51        return Schema(self._get_json(self.env_url))
def get_probes(self) -> List[mozilla_schema_generator.probes.Probe]:
53    def get_probes(self) -> List[Probe]:
54        return [
55            Probe(_id, defn) for _id, defn in self._get_json(self.probes_url).items()
56        ]
def generate_schema(self, config: mozilla_schema_generator.config.Config, *, max_size: int = None) -> Dict[str, mozilla_schema_generator.schema.Schema]:
58    def generate_schema(
59        self, config: Config, *, max_size: int = None
60    ) -> Dict[str, Schema]:
61        schema = self.get_schema()
62        env = self.get_env()
63
64        probes = self.get_probes()
65
66        if max_size is None:
67            max_size = self.default_max_size
68
69        if env.get_size() >= max_size:
70            raise SchemaException(
71                "Environment must be smaller than max_size {}".format(max_size)
72            )
73
74        if schema.get_size() >= max_size:
75            raise SchemaException(
76                "Schema must be smaller than max_size {}".format(max_size)
77            )
78
79        schemas = {config.name: self.make_schema(schema, probes, config, max_size)}
80
81        if any(schema.get_size() > max_size for schema in schemas.values()):
82            raise SchemaException(
83                "Schema must be smaller or equal max_size {}".format(max_size)
84            )
85
86        return schemas
 88    @staticmethod
 89    def make_schema(
 90        env: Schema, probes: List[Probe], config: Config, max_size: int
 91    ) -> Schema:
 92        """
 93        Fill in probes based on the config, and keep only the env
 94        parts of the schema. Throw away everything else.
 95        """
 96        schema_elements = sorted(config.get_schema_elements(probes), key=lambda x: x[1])
 97
 98        schema = env.clone()
 99        for schema_key, probe in schema_elements:
100            try:
101                addtlProps = env.get(schema_key + ("additionalProperties",))
102            except KeyError:
103                addtlProps = None
104
105            probe_schema = Schema(probe.get_schema(addtlProps)).clone()
106
107            schema.set_schema_elem(
108                schema_key + ("properties", probe.name), probe_schema.schema
109            )
110
111        # Remove all additionalProperties (#22)
112        for key in config.get_match_keys():
113            try:
114                schema.delete_group_from_schema(
115                    key + ("propertyNames",), propagate=False
116                )
117            except KeyError:
118                pass
119
120            try:
121                schema.delete_group_from_schema(
122                    key + ("additionalProperties",), propagate=True
123                )
124            except KeyError:
125                pass
126
127        return schema

Fill in probes based on the config, and keep only the env parts of the schema. Throw away everything else.