mozilla_schema_generator.schema
1# -*- coding: utf-8 -*- 2 3# This Source Code Form is subject to the terms of the Mozilla Public 4# License, v. 2.0. If a copy of the MPL was not distributed with this 5# file, You can obtain one at http://mozilla.org/MPL/2.0/. 6 7 8from __future__ import annotations 9 10import copy 11from json import JSONEncoder 12from typing import Any, Iterable 13 14from .utils import _get 15 16 17class SchemaException(Exception): 18 pass 19 20 21class SchemaEncoder(JSONEncoder): 22 def default(self, obj): 23 if isinstance(obj, Schema): 24 return obj.schema 25 if isinstance(obj, dict): 26 return {k: self.default(v) for k, v in obj.items()} 27 if isinstance(obj, list): 28 return [self.default(v) for v in obj] 29 return JSONEncoder.default(self, obj) 30 31 32# TODO: s/Schema/JSONSchema 33class Schema(object): 34 def __init__(self, schema: dict): 35 self.schema = schema 36 37 def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict: 38 """ 39 @param key: The key set 40 @param elem: The value to set the key to 41 @param propagate: If True, creates objects until it reaches the full key. 42 If False, and the parent of the key is not in the 43 schema, then the key will not be added. 44 """ 45 new_elem = self.schema 46 47 for k in key[:-1]: 48 if k not in new_elem: 49 if not propagate: 50 return 51 52 new_elem[k] = {} 53 if k == "properties": 54 new_elem["type"] = "object" 55 new_elem = new_elem[k] 56 57 new_elem[key[-1]] = elem 58 59 def get(self, key: Iterable[str]) -> Any: 60 return _get(self.schema, key) 61 62 def get_size(self) -> int: 63 return self._get_schema_size(self.schema) 64 65 def clone(self) -> Schema: 66 return Schema(copy.deepcopy(self.schema)) 67 68 def _delete_key(self, key: Iterable[str]): 69 try: 70 elem = _get(self.schema, key[:-1]) 71 del elem[key[-1]] 72 except KeyError: 73 return 74 75 def delete_group_from_schema(self, key: Iterable[str], *, propagate=True): 76 """ 77 @param key: The key to remove 78 @param propagate: If True, then removes any parents of the deleted key 79 if they are now empty, i.e. there are no other 80 `properties`. 81 """ 82 self._delete_key(key) 83 84 # Now check, moving backwards, if that was the only available property 85 # If it was, and there are no additionalProperties, delete the parent 86 if propagate: 87 for subkey in reversed([key[:i] for i in range(len(key))]): 88 if not subkey or subkey[-1] == "properties": 89 # we only want to check the actual entry 90 continue 91 92 try: 93 elem = _get(self.schema, subkey) 94 if not elem.get("properties") and not elem.get( 95 "additionalProperties", False 96 ): 97 self._delete_key(subkey) 98 except KeyError: 99 break 100 101 def property_exists(self, key: Iterable[str]) -> bool: 102 """ 103 @param key: The key to check for existence 104 """ 105 target = self.schema 106 for x in key: 107 target = target.get(x, {}) 108 return bool(target) 109 110 @staticmethod 111 def _get_schema_size(schema: dict, key=None) -> int: 112 if key is None: 113 key = tuple() 114 115 if isinstance(schema, list): 116 return sum(Schema._get_schema_size(s) for s in schema) 117 118 if "type" not in schema: 119 raise Exception("Missing type for schema element at key " + "/".join(key)) 120 121 if isinstance(schema["type"], list): 122 max_size = 0 123 for t in schema["type"]: 124 s = copy.deepcopy(schema) 125 s["type"] = t 126 max_size = max(max_size, Schema._get_schema_size(s, key)) 127 return max_size 128 129 # TODO: Tests and finalize the different types available and how they map to BQ 130 # e.g. (allOf, anyOf, etc.) 131 if schema["type"] == "object": 132 # Sometimes the "properties" field is empty... 133 if "properties" in schema and schema["properties"]: 134 # A ROW type with a known set of fields 135 return sum( 136 ( 137 Schema._get_schema_size(p, key=key + (n,)) 138 for n, p in schema["properties"].items() 139 ) 140 ) 141 142 # A MAP type with key and value groups 143 return 2 144 145 if schema["type"] == "array": 146 if "items" not in schema: 147 raise Exception( 148 "Missing items for array schema element at key " + "/".join(key) 149 ) 150 # Arrays are repeated fields, get its size 151 return Schema._get_schema_size(schema["items"], key=key + ("arr-items",)) 152 153 # Otherwise, assume a scalar value 154 return 1
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
22class SchemaEncoder(JSONEncoder): 23 def default(self, obj): 24 if isinstance(obj, Schema): 25 return obj.schema 26 if isinstance(obj, dict): 27 return {k: self.default(v) for k, v in obj.items()} 28 if isinstance(obj, list): 29 return [self.default(v) for v in obj] 30 return JSONEncoder.default(self, obj)
Extensible JSON http://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default()
method with another method that returns a serializable
object for o
if possible, otherwise it should call the superclass
implementation (to raise TypeError
).
23 def default(self, obj): 24 if isinstance(obj, Schema): 25 return obj.schema 26 if isinstance(obj, dict): 27 return {k: self.default(v) for k, v in obj.items()} 28 if isinstance(obj, list): 29 return [self.default(v) for v in obj] 30 return JSONEncoder.default(self, obj)
Implement this method in a subclass such that it returns
a serializable object for o
, or calls the base implementation
(to raise a TypeError
).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o):
try:
iterable = iter(o)
except TypeError:
pass
else:
return list(iterable)
# Let the base class default method raise the TypeError
return JSONEncoder.default(self, o)
Inherited Members
- json.encoder.JSONEncoder
- JSONEncoder
- encode
- iterencode
34class Schema(object): 35 def __init__(self, schema: dict): 36 self.schema = schema 37 38 def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict: 39 """ 40 @param key: The key set 41 @param elem: The value to set the key to 42 @param propagate: If True, creates objects until it reaches the full key. 43 If False, and the parent of the key is not in the 44 schema, then the key will not be added. 45 """ 46 new_elem = self.schema 47 48 for k in key[:-1]: 49 if k not in new_elem: 50 if not propagate: 51 return 52 53 new_elem[k] = {} 54 if k == "properties": 55 new_elem["type"] = "object" 56 new_elem = new_elem[k] 57 58 new_elem[key[-1]] = elem 59 60 def get(self, key: Iterable[str]) -> Any: 61 return _get(self.schema, key) 62 63 def get_size(self) -> int: 64 return self._get_schema_size(self.schema) 65 66 def clone(self) -> Schema: 67 return Schema(copy.deepcopy(self.schema)) 68 69 def _delete_key(self, key: Iterable[str]): 70 try: 71 elem = _get(self.schema, key[:-1]) 72 del elem[key[-1]] 73 except KeyError: 74 return 75 76 def delete_group_from_schema(self, key: Iterable[str], *, propagate=True): 77 """ 78 @param key: The key to remove 79 @param propagate: If True, then removes any parents of the deleted key 80 if they are now empty, i.e. there are no other 81 `properties`. 82 """ 83 self._delete_key(key) 84 85 # Now check, moving backwards, if that was the only available property 86 # If it was, and there are no additionalProperties, delete the parent 87 if propagate: 88 for subkey in reversed([key[:i] for i in range(len(key))]): 89 if not subkey or subkey[-1] == "properties": 90 # we only want to check the actual entry 91 continue 92 93 try: 94 elem = _get(self.schema, subkey) 95 if not elem.get("properties") and not elem.get( 96 "additionalProperties", False 97 ): 98 self._delete_key(subkey) 99 except KeyError: 100 break 101 102 def property_exists(self, key: Iterable[str]) -> bool: 103 """ 104 @param key: The key to check for existence 105 """ 106 target = self.schema 107 for x in key: 108 target = target.get(x, {}) 109 return bool(target) 110 111 @staticmethod 112 def _get_schema_size(schema: dict, key=None) -> int: 113 if key is None: 114 key = tuple() 115 116 if isinstance(schema, list): 117 return sum(Schema._get_schema_size(s) for s in schema) 118 119 if "type" not in schema: 120 raise Exception("Missing type for schema element at key " + "/".join(key)) 121 122 if isinstance(schema["type"], list): 123 max_size = 0 124 for t in schema["type"]: 125 s = copy.deepcopy(schema) 126 s["type"] = t 127 max_size = max(max_size, Schema._get_schema_size(s, key)) 128 return max_size 129 130 # TODO: Tests and finalize the different types available and how they map to BQ 131 # e.g. (allOf, anyOf, etc.) 132 if schema["type"] == "object": 133 # Sometimes the "properties" field is empty... 134 if "properties" in schema and schema["properties"]: 135 # A ROW type with a known set of fields 136 return sum( 137 ( 138 Schema._get_schema_size(p, key=key + (n,)) 139 for n, p in schema["properties"].items() 140 ) 141 ) 142 143 # A MAP type with key and value groups 144 return 2 145 146 if schema["type"] == "array": 147 if "items" not in schema: 148 raise Exception( 149 "Missing items for array schema element at key " + "/".join(key) 150 ) 151 # Arrays are repeated fields, get its size 152 return Schema._get_schema_size(schema["items"], key=key + ("arr-items",)) 153 154 # Otherwise, assume a scalar value 155 return 1
38 def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict: 39 """ 40 @param key: The key set 41 @param elem: The value to set the key to 42 @param propagate: If True, creates objects until it reaches the full key. 43 If False, and the parent of the key is not in the 44 schema, then the key will not be added. 45 """ 46 new_elem = self.schema 47 48 for k in key[:-1]: 49 if k not in new_elem: 50 if not propagate: 51 return 52 53 new_elem[k] = {} 54 if k == "properties": 55 new_elem["type"] = "object" 56 new_elem = new_elem[k] 57 58 new_elem[key[-1]] = elem
@param key: The key set @param elem: The value to set the key to @param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.
76 def delete_group_from_schema(self, key: Iterable[str], *, propagate=True): 77 """ 78 @param key: The key to remove 79 @param propagate: If True, then removes any parents of the deleted key 80 if they are now empty, i.e. there are no other 81 `properties`. 82 """ 83 self._delete_key(key) 84 85 # Now check, moving backwards, if that was the only available property 86 # If it was, and there are no additionalProperties, delete the parent 87 if propagate: 88 for subkey in reversed([key[:i] for i in range(len(key))]): 89 if not subkey or subkey[-1] == "properties": 90 # we only want to check the actual entry 91 continue 92 93 try: 94 elem = _get(self.schema, subkey) 95 if not elem.get("properties") and not elem.get( 96 "additionalProperties", False 97 ): 98 self._delete_key(subkey) 99 except KeyError: 100 break
@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key
if they are now empty, i.e. there are no other
properties
.
102 def property_exists(self, key: Iterable[str]) -> bool: 103 """ 104 @param key: The key to check for existence 105 """ 106 target = self.schema 107 for x in key: 108 target = target.get(x, {}) 109 return bool(target)
@param key: The key to check for existence