mozilla_schema_generator.schema
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from __future__ import annotations

import copy
from json import JSONEncoder
from typing import Any, Iterable

from .utils import _get


class SchemaException(Exception):
    """Exception type declared by this module; nothing here raises it."""

    pass


class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes a `Schema` as its underlying dict."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for `obj`.

        @param obj: The object to convert
        @return: `obj.schema` for a `Schema`; for a dict or list, a copy in
                 which every directly contained `Schema`, dict or list has
                 been converted the same way; anything else is deferred to
                 `JSONEncoder.default`.
        """

        def convert(value):
            # Plain values are left for the encoder itself: handing them to
            # `default` would end in the base class, which raises TypeError.
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: convert(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [convert(v) for v in obj]
        return JSONEncoder.default(self, obj)


# TODO: s/Schema/JSONSchema
class Schema(object):
    """Wrapper around a schema dict with helpers to edit and measure it."""

    def __init__(self, schema: dict):
        """
        @param schema: The schema dict to wrap; it is stored, not copied.
        """
        self.schema = schema

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the value at the nested path `key` inside the schema, in place.

        @param key: The key set, as an indexable sequence of path components
                    (it is sliced, so pass a list or tuple)
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        """
        node = self.schema

        for part in key[:-1]:
            if part not in node:
                if not propagate:
                    return None

                node[part] = {}
                if part == "properties":
                    # A node that gains "properties" is marked as an object.
                    node["type"] = "object"
            node = node[part]

        node[key[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """
        @param key: The key to look up
        @return: Whatever `_get` resolves for `key` in the schema
        """
        return _get(self.schema, key)

    def get_size(self) -> int:
        """
        @return: The size of the whole schema, see `_get_schema_size`
        """
        return self._get_schema_size(self.schema)

    def clone(self) -> Schema:
        """
        @return: A new `Schema` wrapping a deep copy of this schema
        """
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """
        Delete the entry at `key`; a KeyError along the way is ignored.

        @param key: The key to delete
        """
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        # Now check, moving backwards, if that was the only available property
        # If it was, and there are no additionalProperties, delete the parent
        if propagate:
            for end in range(len(key) - 1, 0, -1):
                subkey = key[:end]
                if subkey[-1] == "properties":
                    # we only want to check the actual entry
                    continue

                try:
                    elem = _get(self.schema, subkey)
                    if not elem.get("properties") and not elem.get(
                        "additionalProperties", False
                    ):
                        self._delete_key(subkey)
                except KeyError:
                    break

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        @param key: The key to check for existence
        @return: True if the value found at `key` is truthy. A missing key,
                 an empty value, or a path that runs through something that
                 is not a mapping all give False.
        """
        target = self.schema
        for part in key:
            try:
                target = target.get(part, {})
            except AttributeError:
                # The path continues below a scalar or a list.
                return False
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Compute the size of a schema element.

        @param schema: The schema element, or a list of elements
        @param key: Path of the element as a tuple of strings; only used to
                    build error messages
        @return: The sum over the entries for a list; 1 for an untyped
                 element with format "json"; the largest size over the
                 alternatives when "type" is a list; the sum over the
                 properties for an object that has any, otherwise 2; the
                 size of "items" for an array; 1 for anything else.
        @raise Exception: If an element has neither a type nor format "json",
                          or an array has no "items".
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so errors raised below still name the location.
            return sum(Schema._get_schema_size(s, key=key) for s in schema)

        if "type" not in schema:
            # A JSON column is just that: one column
            if schema.get("format") == "json":
                return 1

            raise Exception("Missing type for schema element at key " + "/".join(key))

        if isinstance(schema["type"], list):
            # The recursion only reads the schema, so a shallow copy with the
            # type swapped is enough.
            return max(
                (
                    Schema._get_schema_size({**schema, "type": t}, key)
                    for t in schema["type"]
                ),
                default=0,
            )

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    (
                        Schema._get_schema_size(p, key=key + (n,))
                        for n, p in schema["properties"].items()
                    )
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise Exception(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes a `Schema` as its underlying dict."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for `obj`.

        @param obj: The object to convert
        @return: `obj.schema` for a `Schema`; for a dict or list, a copy in
                 which every directly contained `Schema`, dict or list has
                 been converted the same way; anything else is deferred to
                 `JSONEncoder.default`.
        """

        def convert(value):
            # Plain values are left for the encoder itself: handing them to
            # `default` would end in the base class, which raises TypeError.
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: convert(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [convert(v) for v in obj]
        return JSONEncoder.default(self, obj)
Extensible JSON http://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+
To extend this to recognize other objects, subclass and implement a `.default()` method with another method that returns a serializable object for `o` if possible, otherwise it should call the superclass implementation (to raise `TypeError`).
def default(self, obj):
    """
    Return a JSON-serializable stand-in for `obj`.

    @param obj: The object to convert
    @return: `obj.schema` for a `Schema`; for a dict or list, a copy in
             which every directly contained `Schema`, dict or list has
             been converted the same way; anything else is deferred to
             `JSONEncoder.default`.
    """

    def convert(value):
        # Plain values are left for the encoder itself: handing them to
        # `default` would end in the base class, which raises TypeError.
        if isinstance(value, (Schema, dict, list)):
            return self.default(value)
        return value

    if isinstance(obj, Schema):
        return obj.schema
    if isinstance(obj, dict):
        return {k: convert(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert(v) for v in obj]
    return JSONEncoder.default(self, obj)
Implement this method in a subclass such that it returns a serializable object for `o`, or calls the base implementation (to raise a `TypeError`).
For example, to support arbitrary iterators, you could implement default like this::
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
Inherited Members
- json.encoder.JSONEncoder
- JSONEncoder
- item_separator
- key_separator
- skipkeys
- ensure_ascii
- check_circular
- allow_nan
- sort_keys
- indent
- encode
- iterencode
class Schema(object):
    """Wrapper around a schema dict with helpers to edit and measure it."""

    def __init__(self, schema: dict):
        """
        @param schema: The schema dict to wrap; it is stored, not copied.
        """
        self.schema = schema

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the value at the nested path `key` inside the schema, in place.

        @param key: The key set, as an indexable sequence of path components
                    (it is sliced, so pass a list or tuple)
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        """
        node = self.schema

        for part in key[:-1]:
            if part not in node:
                if not propagate:
                    return None

                node[part] = {}
                if part == "properties":
                    # A node that gains "properties" is marked as an object.
                    node["type"] = "object"
            node = node[part]

        node[key[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """
        @param key: The key to look up
        @return: Whatever `_get` resolves for `key` in the schema
        """
        return _get(self.schema, key)

    def get_size(self) -> int:
        """
        @return: The size of the whole schema, see `_get_schema_size`
        """
        return self._get_schema_size(self.schema)

    def clone(self) -> "Schema":
        """
        @return: A new `Schema` wrapping a deep copy of this schema
        """
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """
        Delete the entry at `key`; a KeyError along the way is ignored.

        @param key: The key to delete
        """
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        # Now check, moving backwards, if that was the only available property
        # If it was, and there are no additionalProperties, delete the parent
        if propagate:
            for end in range(len(key) - 1, 0, -1):
                subkey = key[:end]
                if subkey[-1] == "properties":
                    # we only want to check the actual entry
                    continue

                try:
                    elem = _get(self.schema, subkey)
                    if not elem.get("properties") and not elem.get(
                        "additionalProperties", False
                    ):
                        self._delete_key(subkey)
                except KeyError:
                    break

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        @param key: The key to check for existence
        @return: True if the value found at `key` is truthy. A missing key,
                 an empty value, or a path that runs through something that
                 is not a mapping all give False.
        """
        target = self.schema
        for part in key:
            try:
                target = target.get(part, {})
            except AttributeError:
                # The path continues below a scalar or a list.
                return False
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Compute the size of a schema element.

        @param schema: The schema element, or a list of elements
        @param key: Path of the element as a tuple of strings; only used to
                    build error messages
        @return: The sum over the entries for a list; 1 for an untyped
                 element with format "json"; the largest size over the
                 alternatives when "type" is a list; the sum over the
                 properties for an object that has any, otherwise 2; the
                 size of "items" for an array; 1 for anything else.
        @raise Exception: If an element has neither a type nor format "json",
                          or an array has no "items".
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so errors raised below still name the location.
            return sum(Schema._get_schema_size(s, key=key) for s in schema)

        if "type" not in schema:
            # A JSON column is just that: one column
            if schema.get("format") == "json":
                return 1

            raise Exception("Missing type for schema element at key " + "/".join(key))

        if isinstance(schema["type"], list):
            # The recursion only reads the schema, so a shallow copy with the
            # type swapped is enough.
            return max(
                (
                    Schema._get_schema_size({**schema, "type": t}, key)
                    for t in schema["type"]
                ),
                default=0,
            )

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    (
                        Schema._get_schema_size(p, key=key + (n,))
                        for n, p in schema["properties"].items()
                    )
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise Exception(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
    """
    Set the value at the nested path `key` inside the schema, in place.

    @param key: The key set, as an indexable sequence of path components
                (it is sliced, so pass a list or tuple)
    @param elem: The value to set the key to
    @param propagate: If True, creates objects until it reaches the full key.
                      If False, and the parent of the key is not in the
                      schema, then the key will not be added.
    """
    node = self.schema

    for part in key[:-1]:
        if part not in node:
            if not propagate:
                return None

            node[part] = {}
            if part == "properties":
                # A node that gains "properties" is marked as an object.
                node["type"] = "object"
        node = node[part]

    node[key[-1]] = elem
@param key: The key set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.
def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
    """
    Delete the entry at `key`, optionally pruning parents left empty.

    @param key: The key to remove
    @param propagate: If True, then removes any parents of the deleted key
                      if they are now empty, i.e. there are no other
                      `properties`.
    """
    self._delete_key(key)

    if not propagate:
        return

    # Walk the proper prefixes of the key from longest to shortest; a parent
    # that has neither properties nor additionalProperties left is deleted.
    for end in range(len(key) - 1, 0, -1):
        parent = key[:end]
        if parent[-1] == "properties":
            # Only the actual entries are checked, not their containers.
            continue

        try:
            node = _get(self.schema, parent)
            has_properties = bool(node.get("properties"))
            has_additional = bool(node.get("additionalProperties", False))
            if not (has_properties or has_additional):
                self._delete_key(parent)
        except KeyError:
            break
@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key if they are now empty, i.e. there are no other `properties`.
def property_exists(self, key: Iterable[str]) -> bool:
    """
    @param key: The key to check for existence
    @return: True if the value found at `key` is truthy. A missing key,
             an empty value, or a path that runs through something that
             is not a mapping all give False.
    """
    target = self.schema
    for part in key:
        try:
            target = target.get(part, {})
        except AttributeError:
            # The path continues below a scalar or a list.
            return False
    return bool(target)
@param key: The key to check for existence