mozilla_schema_generator.schema

  1# -*- coding: utf-8 -*-
  2
  3# This Source Code Form is subject to the terms of the Mozilla Public
  4# License, v. 2.0. If a copy of the MPL was not distributed with this
  5# file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6
  7
  8from __future__ import annotations
  9
 10import copy
 11from json import JSONEncoder
 12from typing import Any, Iterable
 13
 14from .utils import _get
 15
 16
 17class SchemaException(Exception):
 18    pass
 19
 20
 21class SchemaEncoder(JSONEncoder):
 22    def default(self, obj):
 23        if isinstance(obj, Schema):
 24            return obj.schema
 25        if isinstance(obj, dict):
 26            return {k: self.default(v) for k, v in obj.items()}
 27        if isinstance(obj, list):
 28            return [self.default(v) for v in obj]
 29        return JSONEncoder.default(self, obj)
 30
 31
 32# TODO: s/Schema/JSONSchema
 33class Schema(object):
 34    def __init__(self, schema: dict):
 35        self.schema = schema
 36
 37    def __eq__(self, other):
 38        return isinstance(other, Schema) and self.schema == other.schema
 39
 40    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 41        """
 42        @param key: The key set
 43        @param elem: The value to set the key to
 44        @param propagate: If True, creates objects until it reaches the full key.
 45                          If False, and the parent of the key is not in the
 46                          schema, then the key will not be added.
 47        """
 48        new_elem = self.schema
 49
 50        for k in key[:-1]:
 51            if k not in new_elem:
 52                if not propagate:
 53                    return
 54
 55                new_elem[k] = {}
 56                if k == "properties":
 57                    new_elem["type"] = "object"
 58            new_elem = new_elem[k]
 59
 60        new_elem[key[-1]] = elem
 61
 62    def get(self, key: Iterable[str]) -> Any:
 63        return _get(self.schema, key)
 64
 65    def get_size(self) -> int:
 66        return self._get_schema_size(self.schema)
 67
 68    def clone(self) -> Schema:
 69        return Schema(copy.deepcopy(self.schema))
 70
 71    def _delete_key(self, key: Iterable[str]):
 72        try:
 73            elem = _get(self.schema, key[:-1])
 74            del elem[key[-1]]
 75        except KeyError:
 76            return
 77
 78    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 79        """
 80        @param key: The key to remove
 81        @param propagate: If True, then removes any parents of the deleted key
 82                          if they are now empty, i.e. there are no other
 83                          `properties`.
 84        """
 85        self._delete_key(key)
 86
 87        # Now check, moving backwards, if that was the only available property
 88        # If it was, and there are no additionalProperties, delete the parent
 89        if propagate:
 90            for subkey in reversed([key[:i] for i in range(len(key))]):
 91                if not subkey or subkey[-1] == "properties":
 92                    # we only want to check the actual entry
 93                    continue
 94
 95                try:
 96                    elem = _get(self.schema, subkey)
 97                    if not elem.get("properties") and not elem.get(
 98                        "additionalProperties", False
 99                    ):
100                        self._delete_key(subkey)
101                except KeyError:
102                    break
103
104    def property_exists(self, key: Iterable[str]) -> bool:
105        """
106        @param key: The key to check for existence
107        """
108        target = self.schema
109        for x in key:
110            target = target.get(x, {})
111        return bool(target)
112
113    @staticmethod
114    def _get_schema_size(schema: dict, key=None) -> int:
115        if key is None:
116            key = tuple()
117
118        if isinstance(schema, list):
119            return sum(Schema._get_schema_size(s) for s in schema)
120
121        if "type" not in schema:
122            # A JSON column is just that: one column
123            if schema.get("format") == "json":
124                return 1
125
126            raise Exception("Missing type for schema element at key " + "/".join(key))
127
128        if isinstance(schema["type"], list):
129            max_size = 0
130            for t in schema["type"]:
131                s = copy.deepcopy(schema)
132                s["type"] = t
133                max_size = max(max_size, Schema._get_schema_size(s, key))
134            return max_size
135
136        # TODO: Tests and finalize the different types available and how they map to BQ
137        # e.g. (allOf, anyOf, etc.)
138        if schema["type"] == "object":
139            # Sometimes the "properties" field is empty...
140            if "properties" in schema and schema["properties"]:
141                # A ROW type with a known set of fields
142                return sum(
143                    (
144                        Schema._get_schema_size(p, key=key + (n,))
145                        for n, p in schema["properties"].items()
146                    )
147                )
148
149            # A MAP type with key and value groups
150            return 2
151
152        if schema["type"] == "array":
153            if "items" not in schema:
154                raise Exception(
155                    "Missing items for array schema element at key " + "/".join(key)
156                )
157            # Arrays are repeated fields, get its size
158            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
159
160        # Otherwise, assume a scalar value
161        return 1
class SchemaException(builtins.Exception):
class SchemaException(Exception):
    """
    Schema-specific exception type.

    It adds nothing to `Exception`. It is not raised by the code in this
    listing, so it is presumably meant for callers elsewhere in the package.
    """

Common base class for all non-exit exceptions.

class SchemaEncoder(json.encoder.JSONEncoder):
class SchemaEncoder(JSONEncoder):
    """JSON encoder that serializes `Schema` objects as their wrapped dict."""

    def default(self, obj):
        """
        Convert an object into something the JSON encoder can serialize.

        @param obj: The object to convert
        @return: `obj.schema` for a `Schema`; for a dict or list, a new
                 container whose `Schema`/dict/list members were converted
                 the same way and whose other members are kept as they are
        @raise TypeError: From `JSONEncoder.default`, for any other type
        """

        def unwrap(value):
            # Only Schema objects and containers need converting. Anything
            # else is handed back untouched: routing a plain str/int through
            # default() would fall through to JSONEncoder.default, which
            # raises TypeError.
            if isinstance(value, (Schema, dict, list)):
                return self.default(value)
            return value

        if isinstance(obj, Schema):
            return obj.schema
        if isinstance(obj, dict):
            return {k: unwrap(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [unwrap(v) for v in obj]
        return JSONEncoder.default(self, obj)

Extensible JSON https://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method with another method that returns a serializable object for o if possible, otherwise it should call the superclass implementation (to raise TypeError).

def default(self, obj):
23    def default(self, obj):
24        if isinstance(obj, Schema):
25            return obj.schema
26        if isinstance(obj, dict):
27            return {k: self.default(v) for k, v in obj.items()}
28        if isinstance(obj, list):
29            return [self.default(v) for v in obj]
30        return JSONEncoder.default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return super().default(o)
class Schema:
 34class Schema(object):
 35    def __init__(self, schema: dict):
 36        self.schema = schema
 37
 38    def __eq__(self, other):
 39        return isinstance(other, Schema) and self.schema == other.schema
 40
 41    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
 42        """
 43        @param key: The key set
 44        @param elem: The value to set the key to
 45        @param propagate: If True, creates objects until it reaches the full key.
 46                          If False, and the parent of the key is not in the
 47                          schema, then the key will not be added.
 48        """
 49        new_elem = self.schema
 50
 51        for k in key[:-1]:
 52            if k not in new_elem:
 53                if not propagate:
 54                    return
 55
 56                new_elem[k] = {}
 57                if k == "properties":
 58                    new_elem["type"] = "object"
 59            new_elem = new_elem[k]
 60
 61        new_elem[key[-1]] = elem
 62
 63    def get(self, key: Iterable[str]) -> Any:
 64        return _get(self.schema, key)
 65
 66    def get_size(self) -> int:
 67        return self._get_schema_size(self.schema)
 68
 69    def clone(self) -> Schema:
 70        return Schema(copy.deepcopy(self.schema))
 71
 72    def _delete_key(self, key: Iterable[str]):
 73        try:
 74            elem = _get(self.schema, key[:-1])
 75            del elem[key[-1]]
 76        except KeyError:
 77            return
 78
 79    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 80        """
 81        @param key: The key to remove
 82        @param propagate: If True, then removes any parents of the deleted key
 83                          if they are now empty, i.e. there are no other
 84                          `properties`.
 85        """
 86        self._delete_key(key)
 87
 88        # Now check, moving backwards, if that was the only available property
 89        # If it was, and there are no additionalProperties, delete the parent
 90        if propagate:
 91            for subkey in reversed([key[:i] for i in range(len(key))]):
 92                if not subkey or subkey[-1] == "properties":
 93                    # we only want to check the actual entry
 94                    continue
 95
 96                try:
 97                    elem = _get(self.schema, subkey)
 98                    if not elem.get("properties") and not elem.get(
 99                        "additionalProperties", False
100                    ):
101                        self._delete_key(subkey)
102                except KeyError:
103                    break
104
105    def property_exists(self, key: Iterable[str]) -> bool:
106        """
107        @param key: The key to check for existence
108        """
109        target = self.schema
110        for x in key:
111            target = target.get(x, {})
112        return bool(target)
113
114    @staticmethod
115    def _get_schema_size(schema: dict, key=None) -> int:
116        if key is None:
117            key = tuple()
118
119        if isinstance(schema, list):
120            return sum(Schema._get_schema_size(s) for s in schema)
121
122        if "type" not in schema:
123            # A JSON column is just that: one column
124            if schema.get("format") == "json":
125                return 1
126
127            raise Exception("Missing type for schema element at key " + "/".join(key))
128
129        if isinstance(schema["type"], list):
130            max_size = 0
131            for t in schema["type"]:
132                s = copy.deepcopy(schema)
133                s["type"] = t
134                max_size = max(max_size, Schema._get_schema_size(s, key))
135            return max_size
136
137        # TODO: Tests and finalize the different types available and how they map to BQ
138        # e.g. (allOf, anyOf, etc.)
139        if schema["type"] == "object":
140            # Sometimes the "properties" field is empty...
141            if "properties" in schema and schema["properties"]:
142                # A ROW type with a known set of fields
143                return sum(
144                    (
145                        Schema._get_schema_size(p, key=key + (n,))
146                        for n, p in schema["properties"].items()
147                    )
148                )
149
150            # A MAP type with key and value groups
151            return 2
152
153        if schema["type"] == "array":
154            if "items" not in schema:
155                raise Exception(
156                    "Missing items for array schema element at key " + "/".join(key)
157                )
158            # Arrays are repeated fields, get its size
159            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))
160
161        # Otherwise, assume a scalar value
162        return 1
Schema(schema: dict)
35    def __init__(self, schema: dict):
36        self.schema = schema
schema
def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
41    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
42        """
43        @param key: The key set
44        @param elem: The value to set the key to
45        @param propagate: If True, creates objects until it reaches the full key.
46                          If False, and the parent of the key is not in the
47                          schema, then the key will not be added.
48        """
49        new_elem = self.schema
50
51        for k in key[:-1]:
52            if k not in new_elem:
53                if not propagate:
54                    return
55
56                new_elem[k] = {}
57                if k == "properties":
58                    new_elem["type"] = "object"
59            new_elem = new_elem[k]
60
61        new_elem[key[-1]] = elem

@param key: The key set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.

def get(self, key: Iterable[str]) -> Any:
63    def get(self, key: Iterable[str]) -> Any:
64        return _get(self.schema, key)
def get_size(self) -> int:
66    def get_size(self) -> int:
67        return self._get_schema_size(self.schema)
def clone(self) -> Schema:
69    def clone(self) -> Schema:
70        return Schema(copy.deepcopy(self.schema))
def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 79    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
 80        """
 81        @param key: The key to remove
 82        @param propagate: If True, then removes any parents of the deleted key
 83                          if they are now empty, i.e. there are no other
 84                          `properties`.
 85        """
 86        self._delete_key(key)
 87
 88        # Now check, moving backwards, if that was the only available property
 89        # If it was, and there are no additionalProperties, delete the parent
 90        if propagate:
 91            for subkey in reversed([key[:i] for i in range(len(key))]):
 92                if not subkey or subkey[-1] == "properties":
 93                    # we only want to check the actual entry
 94                    continue
 95
 96                try:
 97                    elem = _get(self.schema, subkey)
 98                    if not elem.get("properties") and not elem.get(
 99                        "additionalProperties", False
100                    ):
101                        self._delete_key(subkey)
102                except KeyError:
103                    break

@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key if they are now empty, i.e. there are no other `properties`.

def property_exists(self, key: Iterable[str]) -> bool:
105    def property_exists(self, key: Iterable[str]) -> bool:
106        """
107        @param key: The key to check for existence
108        """
109        target = self.schema
110        for x in key:
111            target = target.get(x, {})
112        return bool(target)

@param key: The key to check for existence