mozilla_schema_generator.schema
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from __future__ import annotations

import copy
from json import JSONEncoder
from typing import Any, Iterable

from .utils import _get


class SchemaException(Exception):
    """Raised by ``Schema`` when a schema element cannot be sized."""


class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes ``Schema`` objects as their wrapped dict."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for ``obj``.

        ``Schema`` instances become their underlying ``schema`` dict. Dicts
        and lists are rebuilt with any ``Schema``, dict or list values
        converted recursively; every other value is passed through untouched,
        so plain scalars nested in a container no longer trigger the
        base-class ``TypeError``.

        @param obj: The object to convert
        @raise TypeError: Raised by ``JSONEncoder.default`` for any other type
        """

        def unwrap(value):
            # Only recurse into the types this method knows how to convert
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: unwrap(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [unwrap(v) for v in obj]
        return JSONEncoder.default(self, obj)


# TODO: s/Schema/JSONSchema
class Schema(object):
    """
    Wrapper around a schema dict offering path-based helpers.

    A "key" is a sequence of dict keys walked from the root of
    ``self.schema``, e.g. ``("properties", "a", "type")``.
    """

    def __init__(self, schema: dict):
        self.schema = schema

    def __eq__(self, other):
        return isinstance(other, Schema) and self.schema == other.schema

    # Defining __eq__ already disables hashing; state it explicitly since
    # instances wrap a mutable dict.
    __hash__ = None

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the value found at ``key`` to ``elem``, modifying the schema in place.

        @param key: The key to set; any iterable of strings
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        @raise IndexError: If ``key`` is empty
        """
        # Materialize so one-shot iterables (generators) can be sliced
        path = tuple(key)
        node = self.schema

        for part in path[:-1]:
            if part not in node:
                if not propagate:
                    return

                node[part] = {}
                # A node that gains "properties" is an object
                if part == "properties":
                    node["type"] = "object"
            node = node[part]

        node[path[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """Return whatever ``_get`` finds at ``key`` in the schema."""
        return _get(self.schema, key)

    def get_size(self) -> int:
        """Return the size of the whole schema, see ``_get_schema_size``."""
        return self._get_schema_size(self.schema)

    def clone(self) -> Schema:
        """Return a new ``Schema`` wrapping a deep copy of the schema dict."""
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """Delete the entry at ``key``; a ``KeyError`` on the way is ignored."""
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        if not propagate:
            return

        # Walk the ancestors from the nearest one up to (excluding) the root
        for depth in range(len(key) - 1, 0, -1):
            ancestor = key[:depth]
            if ancestor[-1] == "properties":
                # we only want to check the actual entry
                continue

            try:
                elem = _get(self.schema, ancestor)
            except KeyError:
                break

            if elem.get("properties") or elem.get("additionalProperties", False):
                # This ancestor still has content, so everything above it
                # does too; stop instead of pruning e.g. an array whose
                # items survived.
                break

            self._delete_key(ancestor)

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        Report whether ``key`` leads to a truthy value in the schema.

        A key that is present but holds an empty/falsy value counts as
        missing, as does a key that passes through a value without ``get``.

        @param key: The key to check for existence
        """
        target = self.schema
        for part in key:
            try:
                target = target.get(part, {})
            except AttributeError:
                # Walked into a non-mapping (e.g. "type": "object")
                return False
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Count the columns a schema element contributes.

        @param schema: A schema dict, or a list of schema dicts
        @param key: Tuple of path parts leading here, used in error messages
        @raise SchemaException: If an element has no type (and is not a JSON
                                column), or an array has no items
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so nested errors still say where they occurred
            return sum(Schema._get_schema_size(s, key) for s in schema)

        if "type" not in schema:
            # A JSON column is just that: one column
            if schema.get("format") == "json":
                return 1

            raise SchemaException(
                "Missing type for schema element at key " + "/".join(key)
            )

        if isinstance(schema["type"], list):
            # Union of types: size is that of the largest alternative
            max_size = 0
            for t in schema["type"]:
                # Shallow copy is enough: only the top-level "type" changes
                # and the recursion never mutates its input
                variant = {**schema, "type": t}
                max_size = max(max_size, Schema._get_schema_size(variant, key))
            return max_size

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    Schema._get_schema_size(p, key=key + (n,))
                    for n, p in schema["properties"].items()
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise SchemaException(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
Common base class for all non-exit exceptions.
class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes ``Schema`` objects as their wrapped dict."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for ``obj``.

        ``Schema`` instances become their underlying ``schema`` dict. Dicts
        and lists are rebuilt with any ``Schema``, dict or list values
        converted recursively; every other value is passed through untouched,
        so plain scalars nested in a container no longer trigger the
        base-class ``TypeError``.

        @param obj: The object to convert
        @raise TypeError: Raised by ``JSONEncoder.default`` for any other type
        """

        def unwrap(value):
            # Only recurse into the types this method knows how to convert
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: unwrap(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [unwrap(v) for v in obj]
        return JSONEncoder.default(self, obj)
Extensible JSON https://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a
.default() method with another method that returns a serializable
object for o if possible, otherwise it should call the superclass
implementation (to raise TypeError).
def default(self, obj):
    """
    Return a JSON-serializable stand-in for ``obj``.

    ``Schema`` instances become their underlying ``schema`` dict. Dicts and
    lists are rebuilt with any ``Schema``, dict or list values converted
    recursively; every other value is passed through untouched, so plain
    scalars nested in a container no longer trigger the base-class
    ``TypeError``.

    @param obj: The object to convert
    @raise TypeError: Raised by ``JSONEncoder.default`` for any other type
    """

    def unwrap(value):
        # Only recurse into the types this method knows how to convert
        if isinstance(value, (Schema, dict, list)):
            return self.default(value)
        return value

    if isinstance(obj, Schema):
        return obj.schema
    if isinstance(obj, dict):
        return {k: unwrap(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [unwrap(v) for v in obj]
    return JSONEncoder.default(self, obj)
Implement this method in a subclass such that it returns
a serializable object for o, or calls the base implementation
(to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return super().default(o)
class Schema(object):
    """
    Wrapper around a schema dict offering path-based helpers.

    A "key" is a sequence of dict keys walked from the root of
    ``self.schema``, e.g. ``("properties", "a", "type")``.
    """

    def __init__(self, schema: dict):
        self.schema = schema

    def __eq__(self, other):
        return isinstance(other, Schema) and self.schema == other.schema

    # Defining __eq__ already disables hashing; state it explicitly since
    # instances wrap a mutable dict.
    __hash__ = None

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the value found at ``key`` to ``elem``, modifying the schema in place.

        @param key: The key to set; any iterable of strings
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        @raise IndexError: If ``key`` is empty
        """
        # Materialize so one-shot iterables (generators) can be sliced
        path = tuple(key)
        node = self.schema

        for part in path[:-1]:
            if part not in node:
                if not propagate:
                    return

                node[part] = {}
                # A node that gains "properties" is an object
                if part == "properties":
                    node["type"] = "object"
            node = node[part]

        node[path[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """Return whatever ``_get`` finds at ``key`` in the schema."""
        return _get(self.schema, key)

    def get_size(self) -> int:
        """Return the size of the whole schema, see ``_get_schema_size``."""
        return self._get_schema_size(self.schema)

    def clone(self) -> Schema:
        """Return a new ``Schema`` wrapping a deep copy of the schema dict."""
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """Delete the entry at ``key``; a ``KeyError`` on the way is ignored."""
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        if not propagate:
            return

        # Walk the ancestors from the nearest one up to (excluding) the root
        for depth in range(len(key) - 1, 0, -1):
            ancestor = key[:depth]
            if ancestor[-1] == "properties":
                # we only want to check the actual entry
                continue

            try:
                elem = _get(self.schema, ancestor)
            except KeyError:
                break

            if elem.get("properties") or elem.get("additionalProperties", False):
                # This ancestor still has content, so everything above it
                # does too; stop instead of pruning e.g. an array whose
                # items survived.
                break

            self._delete_key(ancestor)

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        Report whether ``key`` leads to a truthy value in the schema.

        A key that is present but holds an empty/falsy value counts as
        missing, as does a key that passes through a value without ``get``.

        @param key: The key to check for existence
        """
        target = self.schema
        for part in key:
            try:
                target = target.get(part, {})
            except AttributeError:
                # Walked into a non-mapping (e.g. "type": "object")
                return False
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Count the columns a schema element contributes.

        @param schema: A schema dict, or a list of schema dicts
        @param key: Tuple of path parts leading here, used in error messages
        @raise SchemaException: If an element has no type (and is not a JSON
                                column), or an array has no items
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so nested errors still say where they occurred
            return sum(Schema._get_schema_size(s, key) for s in schema)

        if "type" not in schema:
            # A JSON column is just that: one column
            if schema.get("format") == "json":
                return 1

            raise SchemaException(
                "Missing type for schema element at key " + "/".join(key)
            )

        if isinstance(schema["type"], list):
            # Union of types: size is that of the largest alternative
            max_size = 0
            for t in schema["type"]:
                # Shallow copy is enough: only the top-level "type" changes
                # and the recursion never mutates its input
                variant = {**schema, "type": t}
                max_size = max(max_size, Schema._get_schema_size(variant, key))
            return max_size

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    Schema._get_schema_size(p, key=key + (n,))
                    for n, p in schema["properties"].items()
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise SchemaException(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
    """
    Set the value found at ``key`` to ``elem``, modifying the schema in place.

    @param key: The key to set; any iterable of strings
    @param elem: The value to set the key to
    @param propagate: If True, creates objects until it reaches the full key.
                      If False, and the parent of the key is not in the
                      schema, then the key will not be added.
    @raise IndexError: If ``key`` is empty
    """
    # Materialize so one-shot iterables (generators) can be sliced
    path = tuple(key)
    node = self.schema

    for part in path[:-1]:
        if part not in node:
            if not propagate:
                return

            node[part] = {}
            # A node that gains "properties" is an object
            if part == "properties":
                node["type"] = "object"
        node = node[part]

    node[path[-1]] = elem
@param key: The key to set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.
def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
    """
    @param key: The key to remove
    @param propagate: If True, then removes any parents of the deleted key
                      if they are now empty, i.e. there are no other
                      `properties`.
    """
    self._delete_key(key)

    if not propagate:
        return

    # Walk the ancestors from the nearest one up to (excluding) the root
    for depth in range(len(key) - 1, 0, -1):
        ancestor = key[:depth]
        if ancestor[-1] == "properties":
            # we only want to check the actual entry
            continue

        try:
            elem = _get(self.schema, ancestor)
        except KeyError:
            break

        if elem.get("properties") or elem.get("additionalProperties", False):
            # This ancestor still has content, so everything above it does
            # too; stop instead of pruning e.g. an array whose items survived.
            break

        self._delete_key(ancestor)
@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key
if they are now empty, i.e. there are no other
properties.
def property_exists(self, key: Iterable[str]) -> bool:
    """
    Report whether ``key`` leads to a truthy value in the schema.

    A key that is present but holds an empty/falsy value counts as missing,
    as does a key that passes through a value without ``get``.

    @param key: The key to check for existence
    """
    target = self.schema
    for part in key:
        try:
            target = target.get(part, {})
        except AttributeError:
            # Walked into a non-mapping (e.g. "type": "object")
            return False
    return bool(target)
@param key: The key to check for existence