mozilla_schema_generator.schema

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7
  8from __future__ import annotations
  9
 10import copy
 11from json import JSONEncoder
 12from typing import Any, Iterable
 13
 14from .utils import _get
 15
 16
 17class SchemaException(Exception):
 18    pass
 19
 20
 21class SchemaEncoder(JSONEncoder):
 22    def default(self, obj):
 23        if isinstance(obj, Schema):
 24            return obj.schema
 25        if isinstance(obj, dict):
 26            return {k: self.default(v) for k, v in obj.items()}
 27        if isinstance(obj, list):
 28            return [self.default(v) for v in obj]
 29        return JSONEncoder.default(self, obj)
 30
 31
 32# TODO: s/Schema/JSONSchema
 33class Schema(object):
 34    def __init__(self, schema: dict):
 35        self.schema = schema
 36
 37    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 38        """
 39        @param key: The key set
 40        @param elem: The value to set the key to
 41        @param propagate: If True, creates objects until it reaches the full key.
 42                          If False, and the parent of the key is not in the
 43                          schema, then the key will not be added.
 44        """
 45        new_elem = self.schema
 46
 47        for k in key[:-1]:
 48            if k not in new_elem:
 49                if not propagate:
 50                    return
 51
 52                new_elem[k] = {}
 53                if k == "properties":
 54                    new_elem["type"] = "object"
 55            new_elem = new_elem[k]
 56
 57        new_elem[key[-1]] = elem
 58
 59    def get(self, key: Iterable[str]) -> Any:
 60        return _get(self.schema, key)
 61
 62    def get_size(self) -> int:
 63        return self._get_schema_size(self.schema)
 64
 65    def clone(self) -> Schema:
 66        return Schema(copy.deepcopy(self.schema))
 67
 68    def _delete_key(self, key: Iterable[str]):
 69        try:
 70            elem = _get(self.schema, key[:-1])
 71            del elem[key[-1]]
 72        except KeyError:
 73            return
 74
 75    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 76        """
 77        @param key: The key to remove
 78        @param propagate: If True, then removes any parents of the deleted key
 79                          if they are now empty, i.e. there are no other
 80                          `properties`.
 81        """
 82        self._delete_key(key)
 83
 84        # Now check, moving backwards, if that was the only available property
 85        # If it was, and there are no additionalProperties, delete the parent
 86        if propagate:
 87            for subkey in reversed([key[:i] for i in range(len(key))]):
 88                if not subkey or subkey[-1] == "properties":
 89                    # we only want to check the actual entry
 90                    continue
 91
 92                try:
 93                    elem = _get(self.schema, subkey)
 94                    if not elem.get("properties") and not elem.get(
 95                        "additionalProperties", False
 96                    ):
 97                        self._delete_key(subkey)
 98                except KeyError:
 99                    break
100
101    def property_exists(self, key: Iterable[str]) -> bool:
102        """
103        @param key: The key to check for existence
104        """
105        target = self.schema
106        for x in key:
107            target = target.get(x, {})
108        return bool(target)
109
110    @staticmethod
111    def _get_schema_size(schema: dict, key=None) -> int:
112        if key is None:
113            key = tuple()
114
115        if isinstance(schema, list):
116            return sum(Schema._get_schema_size(s) for s in schema)
117
118        if "type" not in schema:
119            # A JSON column is just that: one column
120            if schema.get("format") == "json":
121                return 1
122
123            raise Exception("Missing type for schema element at key " + "/".join(key))
124
125        if isinstance(schema["type"], list):
126            max_size = 0
127            for t in schema["type"]:
128                s = copy.deepcopy(schema)
129                s["type"] = t
130                max_size = max(max_size, Schema._get_schema_size(s, key))
131            return max_size
132
133        # TODO: Tests and finalize the different types available and how they map to BQ
134        # e.g. (allOf, anyOf, etc.)
135        if schema["type"] == "object":
136            # Sometimes the "properties" field is empty...
137            if "properties" in schema and schema["properties"]:
138                # A ROW type with a known set of fields
139                return sum(
140                    (
141                        Schema._get_schema_size(p, key=key + (n,))
142                        for n, p in schema["properties"].items()
143                    )
144                )
145
146            # A MAP type with key and value groups
147            return 2
148
149        if schema["type"] == "array":
150            if "items" not in schema:
151                raise Exception(
152                    "Missing items for array schema element at key " + "/".join(key)
153                )
154            # Arrays are repeated fields, get its size
155            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
156
157        # Otherwise, assume a scalar value
158        return 1
class SchemaException(builtins.Exception):
class SchemaException(Exception):
    """Exception type declared by the schema module; adds nothing to ``Exception``."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SchemaEncoder(json.encoder.JSONEncoder):
class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes ``Schema`` objects as their wrapped dict."""

    def default(self, obj):
        """
        Return a JSON-serializable stand-in for ``obj``.

        @param obj: The object to convert. A ``Schema`` yields its ``schema``
                    attribute; a dict or list yields a copy whose nested
                    ``Schema``/dict/list values were converted the same way.
                    Anything else is passed to ``JSONEncoder.default``,
                    which raises ``TypeError``.
        """

        def unwrap(value):
            # Only Schema objects and containers need converting. Strings,
            # numbers, booleans and None are already serializable, and
            # sending them through default() would raise TypeError.
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: unwrap(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [unwrap(v) for v in obj]
        return JSONEncoder.default(self, obj)

Extensible JSON http://json.org encoder for Python data structures.

Supports the following objects and types by default:

| Python            | JSON   |
|-------------------|--------|
| dict              | object |
| list, tuple       | array  |
| str               | string |
| int, float        | number |
| True              | true   |
| False             | false  |
| None              | null   |

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

def default(self, obj):
23    def default(self, obj):
24        if isinstance(obj, Schema):
25            return obj.schema
26        if isinstance(obj, dict):
27            return {k: self.default(v) for k, v in obj.items()}
28        if isinstance(obj, list):
29            return [self.default(v) for v in obj]
30        return JSONEncoder.default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
item_separator
key_separator
skipkeys
ensure_ascii
check_circular
allow_nan
sort_keys
indent
encode
iterencode
class Schema:
 34class Schema(object):
 35    def __init__(self, schema: dict):
 36        self.schema = schema
 37
 38    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 39        """
 40        @param key: The key set
 41        @param elem: The value to set the key to
 42        @param propagate: If True, creates objects until it reaches the full key.
 43                          If False, and the parent of the key is not in the
 44                          schema, then the key will not be added.
 45        """
 46        new_elem = self.schema
 47
 48        for k in key[:-1]:
 49            if k not in new_elem:
 50                if not propagate:
 51                    return
 52
 53                new_elem[k] = {}
 54                if k == "properties":
 55                    new_elem["type"] = "object"
 56            new_elem = new_elem[k]
 57
 58        new_elem[key[-1]] = elem
 59
 60    def get(self, key: Iterable[str]) -> Any:
 61        return _get(self.schema, key)
 62
 63    def get_size(self) -> int:
 64        return self._get_schema_size(self.schema)
 65
 66    def clone(self) -> Schema:
 67        return Schema(copy.deepcopy(self.schema))
 68
 69    def _delete_key(self, key: Iterable[str]):
 70        try:
 71            elem = _get(self.schema, key[:-1])
 72            del elem[key[-1]]
 73        except KeyError:
 74            return
 75
 76    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 77        """
 78        @param key: The key to remove
 79        @param propagate: If True, then removes any parents of the deleted key
 80                          if they are now empty, i.e. there are no other
 81                          `properties`.
 82        """
 83        self._delete_key(key)
 84
 85        # Now check, moving backwards, if that was the only available property
 86        # If it was, and there are no additionalProperties, delete the parent
 87        if propagate:
 88            for subkey in reversed([key[:i] for i in range(len(key))]):
 89                if not subkey or subkey[-1] == "properties":
 90                    # we only want to check the actual entry
 91                    continue
 92
 93                try:
 94                    elem = _get(self.schema, subkey)
 95                    if not elem.get("properties") and not elem.get(
 96                        "additionalProperties", False
 97                    ):
 98                        self._delete_key(subkey)
 99                except KeyError:
100                    break
101
102    def property_exists(self, key: Iterable[str]) -> bool:
103        """
104        @param key: The key to check for existence
105        """
106        target = self.schema
107        for x in key:
108            target = target.get(x, {})
109        return bool(target)
110
111    @staticmethod
112    def _get_schema_size(schema: dict, key=None) -> int:
113        if key is None:
114            key = tuple()
115
116        if isinstance(schema, list):
117            return sum(Schema._get_schema_size(s) for s in schema)
118
119        if "type" not in schema:
120            # A JSON column is just that: one column
121            if schema.get("format") == "json":
122                return 1
123
124            raise Exception("Missing type for schema element at key " + "/".join(key))
125
126        if isinstance(schema["type"], list):
127            max_size = 0
128            for t in schema["type"]:
129                s = copy.deepcopy(schema)
130                s["type"] = t
131                max_size = max(max_size, Schema._get_schema_size(s, key))
132            return max_size
133
134        # TODO: Tests and finalize the different types available and how they map to BQ
135        # e.g. (allOf, anyOf, etc.)
136        if schema["type"] == "object":
137            # Sometimes the "properties" field is empty...
138            if "properties" in schema and schema["properties"]:
139                # A ROW type with a known set of fields
140                return sum(
141                    (
142                        Schema._get_schema_size(p, key=key + (n,))
143                        for n, p in schema["properties"].items()
144                    )
145                )
146
147            # A MAP type with key and value groups
148            return 2
149
150        if schema["type"] == "array":
151            if "items" not in schema:
152                raise Exception(
153                    "Missing items for array schema element at key " + "/".join(key)
154                )
155            # Arrays are repeated fields, get its size
156            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
157
158        # Otherwise, assume a scalar value
159        return 1
Schema(schema: dict)
35    def __init__(self, schema: dict):
36        self.schema = schema
schema
def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
38    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
39        """
40        @param key: The key set
41        @param elem: The value to set the key to
42        @param propagate: If True, creates objects until it reaches the full key.
43                          If False, and the parent of the key is not in the
44                          schema, then the key will not be added.
45        """
46        new_elem = self.schema
47
48        for k in key[:-1]:
49            if k not in new_elem:
50                if not propagate:
51                    return
52
53                new_elem[k] = {}
54                if k == "properties":
55                    new_elem["type"] = "object"
56            new_elem = new_elem[k]
57
58        new_elem[key[-1]] = elem

@param key: The key to set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.

def get(self, key: Iterable[str]) -> Any:
60    def get(self, key: Iterable[str]) -> Any:
61        return _get(self.schema, key)
def get_size(self) -> int:
63    def get_size(self) -> int:
64        return self._get_schema_size(self.schema)
def clone(self) -> Schema:
66    def clone(self) -> Schema:
67        return Schema(copy.deepcopy(self.schema))
def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 76    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 77        """
 78        @param key: The key to remove
 79        @param propagate: If True, then removes any parents of the deleted key
 80                          if they are now empty, i.e. there are no other
 81                          `properties`.
 82        """
 83        self._delete_key(key)
 84
 85        # Now check, moving backwards, if that was the only available property
 86        # If it was, and there are no additionalProperties, delete the parent
 87        if propagate:
 88            for subkey in reversed([key[:i] for i in range(len(key))]):
 89                if not subkey or subkey[-1] == "properties":
 90                    # we only want to check the actual entry
 91                    continue
 92
 93                try:
 94                    elem = _get(self.schema, subkey)
 95                    if not elem.get("properties") and not elem.get(
 96                        "additionalProperties", False
 97                    ):
 98                        self._delete_key(subkey)
 99                except KeyError:
100                    break

@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key if they are now empty, i.e. there are no other `properties`.

def property_exists(self, key: Iterable[str]) -> bool:
102    def property_exists(self, key: Iterable[str]) -> bool:
103        """
104        @param key: The key to check for existence
105        """
106        target = self.schema
107        for x in key:
108            target = target.get(x, {})
109        return bool(target)

@param key: The key to check for existence