mozilla_schema_generator.schema

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7
  8from __future__ import annotations
  9
 10import copy
 11from json import JSONEncoder
 12from typing import Any, Iterable
 13
 14from .utils import _get
 15
 16
 17class SchemaException(Exception):
 18    pass
 19
 20
 21class SchemaEncoder(JSONEncoder):
 22    def default(self, obj):
 23        if isinstance(obj, Schema):
 24            return obj.schema
 25        if isinstance(obj, dict):
 26            return {k: self.default(v) for k, v in obj.items()}
 27        if isinstance(obj, list):
 28            return [self.default(v) for v in obj]
 29        return JSONEncoder.default(self, obj)
 30
 31
 32# TODO: s/Schema/JSONSchema
 33class Schema(object):
 34    def __init__(self, schema: dict):
 35        self.schema = schema
 36
 37    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 38        """
 39        @param key: The key set
 40        @param elem: The value to set the key to
 41        @param propagate: If True, creates objects until it reaches the full key.
 42                          If False, and the parent of the key is not in the
 43                          schema, then the key will not be added.
 44        """
 45        new_elem = self.schema
 46
 47        for k in key[:-1]:
 48            if k not in new_elem:
 49                if not propagate:
 50                    return
 51
 52                new_elem[k] = {}
 53                if k == "properties":
 54                    new_elem["type"] = "object"
 55            new_elem = new_elem[k]
 56
 57        new_elem[key[-1]] = elem
 58
 59    def get(self, key: Iterable[str]) -> Any:
 60        return _get(self.schema, key)
 61
 62    def get_size(self) -> int:
 63        return self._get_schema_size(self.schema)
 64
 65    def clone(self) -> Schema:
 66        return Schema(copy.deepcopy(self.schema))
 67
 68    def _delete_key(self, key: Iterable[str]):
 69        try:
 70            elem = _get(self.schema, key[:-1])
 71            del elem[key[-1]]
 72        except KeyError:
 73            return
 74
 75    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 76        """
 77        @param key: The key to remove
 78        @param propagate: If True, then removes any parents of the deleted key
 79                          if they are now empty, i.e. there are no other
 80                          `properties`.
 81        """
 82        self._delete_key(key)
 83
 84        # Now check, moving backwards, if that was the only available property
 85        # If it was, and there are no additionalProperties, delete the parent
 86        if propagate:
 87            for subkey in reversed([key[:i] for i in range(len(key))]):
 88                if not subkey or subkey[-1] == "properties":
 89                    # we only want to check the actual entry
 90                    continue
 91
 92                try:
 93                    elem = _get(self.schema, subkey)
 94                    if not elem.get("properties") and not elem.get(
 95                        "additionalProperties", False
 96                    ):
 97                        self._delete_key(subkey)
 98                except KeyError:
 99                    break
100
101    def property_exists(self, key: Iterable[str]) -> bool:
102        """
103        @param key: The key to check for existence
104        """
105        target = self.schema
106        for x in key:
107            target = target.get(x, {})
108        return bool(target)
109
110    @staticmethod
111    def _get_schema_size(schema: dict, key=None) -> int:
112        if key is None:
113            key = tuple()
114
115        if isinstance(schema, list):
116            return sum(Schema._get_schema_size(s) for s in schema)
117
118        if "type" not in schema:
119            raise Exception("Missing type for schema element at key " + "/".join(key))
120
121        if isinstance(schema["type"], list):
122            max_size = 0
123            for t in schema["type"]:
124                s = copy.deepcopy(schema)
125                s["type"] = t
126                max_size = max(max_size, Schema._get_schema_size(s, key))
127            return max_size
128
129        # TODO: Tests and finalize the different types available and how they map to BQ
130        # e.g. (allOf, anyOf, etc.)
131        if schema["type"] == "object":
132            # Sometimes the "properties" field is empty...
133            if "properties" in schema and schema["properties"]:
134                # A ROW type with a known set of fields
135                return sum(
136                    (
137                        Schema._get_schema_size(p, key=key + (n,))
138                        for n, p in schema["properties"].items()
139                    )
140                )
141
142            # A MAP type with key and value groups
143            return 2
144
145        if schema["type"] == "array":
146            if "items" not in schema:
147                raise Exception(
148                    "Missing items for array schema element at key " + "/".join(key)
149                )
150            # Arrays are repeated fields, get its size
151            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
152
153        # Otherwise, assume a scalar value
154        return 1
class SchemaException(builtins.Exception):
18class SchemaException(Exception):
19    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class SchemaEncoder(json.encoder.JSONEncoder):
22class SchemaEncoder(JSONEncoder):
23    def default(self, obj):
24        if isinstance(obj, Schema):
25            return obj.schema
26        if isinstance(obj, dict):
27            return {k: self.default(v) for k, v in obj.items()}
28        if isinstance(obj, list):
29            return [self.default(v) for v in obj]
30        return JSONEncoder.default(self, obj)

Extensible JSON http://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

def default(self, obj)
23    def default(self, obj):
24        if isinstance(obj, Schema):
25            return obj.schema
26        if isinstance(obj, dict):
27            return {k: self.default(v) for k, v in obj.items()}
28        if isinstance(obj, list):
29            return [self.default(v) for v in obj]
30        return JSONEncoder.default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
item_separator
key_separator
encode
iterencode
class Schema:
 34class Schema(object):
 35    def __init__(self, schema: dict):
 36        self.schema = schema
 37
 38    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 39        """
 40        @param key: The key set
 41        @param elem: The value to set the key to
 42        @param propagate: If True, creates objects until it reaches the full key.
 43                          If False, and the parent of the key is not in the
 44                          schema, then the key will not be added.
 45        """
 46        new_elem = self.schema
 47
 48        for k in key[:-1]:
 49            if k not in new_elem:
 50                if not propagate:
 51                    return
 52
 53                new_elem[k] = {}
 54                if k == "properties":
 55                    new_elem["type"] = "object"
 56            new_elem = new_elem[k]
 57
 58        new_elem[key[-1]] = elem
 59
 60    def get(self, key: Iterable[str]) -> Any:
 61        return _get(self.schema, key)
 62
 63    def get_size(self) -> int:
 64        return self._get_schema_size(self.schema)
 65
 66    def clone(self) -> Schema:
 67        return Schema(copy.deepcopy(self.schema))
 68
 69    def _delete_key(self, key: Iterable[str]):
 70        try:
 71            elem = _get(self.schema, key[:-1])
 72            del elem[key[-1]]
 73        except KeyError:
 74            return
 75
 76    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 77        """
 78        @param key: The key to remove
 79        @param propagate: If True, then removes any parents of the deleted key
 80                          if they are now empty, i.e. there are no other
 81                          `properties`.
 82        """
 83        self._delete_key(key)
 84
 85        # Now check, moving backwards, if that was the only available property
 86        # If it was, and there are no additionalProperties, delete the parent
 87        if propagate:
 88            for subkey in reversed([key[:i] for i in range(len(key))]):
 89                if not subkey or subkey[-1] == "properties":
 90                    # we only want to check the actual entry
 91                    continue
 92
 93                try:
 94                    elem = _get(self.schema, subkey)
 95                    if not elem.get("properties") and not elem.get(
 96                        "additionalProperties", False
 97                    ):
 98                        self._delete_key(subkey)
 99                except KeyError:
100                    break
101
102    def property_exists(self, key: Iterable[str]) -> bool:
103        """
104        @param key: The key to check for existence
105        """
106        target = self.schema
107        for x in key:
108            target = target.get(x, {})
109        return bool(target)
110
111    @staticmethod
112    def _get_schema_size(schema: dict, key=None) -> int:
113        if key is None:
114            key = tuple()
115
116        if isinstance(schema, list):
117            return sum(Schema._get_schema_size(s) for s in schema)
118
119        if "type" not in schema:
120            raise Exception("Missing type for schema element at key " + "/".join(key))
121
122        if isinstance(schema["type"], list):
123            max_size = 0
124            for t in schema["type"]:
125                s = copy.deepcopy(schema)
126                s["type"] = t
127                max_size = max(max_size, Schema._get_schema_size(s, key))
128            return max_size
129
130        # TODO: Tests and finalize the different types available and how they map to BQ
131        # e.g. (allOf, anyOf, etc.)
132        if schema["type"] == "object":
133            # Sometimes the "properties" field is empty...
134            if "properties" in schema and schema["properties"]:
135                # A ROW type with a known set of fields
136                return sum(
137                    (
138                        Schema._get_schema_size(p, key=key + (n,))
139                        for n, p in schema["properties"].items()
140                    )
141                )
142
143            # A MAP type with key and value groups
144            return 2
145
146        if schema["type"] == "array":
147            if "items" not in schema:
148                raise Exception(
149                    "Missing items for array schema element at key " + "/".join(key)
150                )
151            # Arrays are repeated fields, get its size
152            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
153
154        # Otherwise, assume a scalar value
155        return 1
Schema(schema: dict)
35    def __init__(self, schema: dict):
36        self.schema = schema
def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
38    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
39        """
40        @param key: The key set
41        @param elem: The value to set the key to
42        @param propagate: If True, creates objects until it reaches the full key.
43                          If False, and the parent of the key is not in the
44                          schema, then the key will not be added.
45        """
46        new_elem = self.schema
47
48        for k in key[:-1]:
49            if k not in new_elem:
50                if not propagate:
51                    return
52
53                new_elem[k] = {}
54                if k == "properties":
55                    new_elem["type"] = "object"
56            new_elem = new_elem[k]
57
58        new_elem[key[-1]] = elem

@param key: The key to set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.

def get(self, key: Iterable[str]) -> Any:
60    def get(self, key: Iterable[str]) -> Any:
61        return _get(self.schema, key)
def get_size(self) -> int:
63    def get_size(self) -> int:
64        return self._get_schema_size(self.schema)
def clone(self) -> mozilla_schema_generator.schema.Schema:
66    def clone(self) -> Schema:
67        return Schema(copy.deepcopy(self.schema))
def delete_group_from_schema(self, key: Iterable[str], *, propagate=True)
 76    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 77        """
 78        @param key: The key to remove
 79        @param propagate: If True, then removes any parents of the deleted key
 80                          if they are now empty, i.e. there are no other
 81                          `properties`.
 82        """
 83        self._delete_key(key)
 84
 85        # Now check, moving backwards, if that was the only available property
 86        # If it was, and there are no additionalProperties, delete the parent
 87        if propagate:
 88            for subkey in reversed([key[:i] for i in range(len(key))]):
 89                if not subkey or subkey[-1] == "properties":
 90                    # we only want to check the actual entry
 91                    continue
 92
 93                try:
 94                    elem = _get(self.schema, subkey)
 95                    if not elem.get("properties") and not elem.get(
 96                        "additionalProperties", False
 97                    ):
 98                        self._delete_key(subkey)
 99                except KeyError:
100                    break

@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key if they are now empty, i.e. there are no other `properties`.

def property_exists(self, key: Iterable[str]) -> bool:
102    def property_exists(self, key: Iterable[str]) -> bool:
103        """
104        @param key: The key to check for existence
105        """
106        target = self.schema
107        for x in key:
108            target = target.get(x, {})
109        return bool(target)

@param key: The key to check for existence