mozilla_schema_generator.schema

View Source
# -*- coding: utf-8 -*-

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


from __future__ import annotations

import copy
from json import JSONEncoder
from typing import Any, Iterable

from .utils import _get


class SchemaException(Exception):
    """Module-specific exception: a plain `Exception` subclass with no extra behaviour."""


class SchemaEncoder(JSONEncoder):
    """
    JSON encoder that serializes `Schema` objects as the dict they wrap.
    """

    def default(self, obj):
        """
        Convert `obj` into something the base encoder can serialize.

        @param obj: The object to convert
        @return: The wrapped dict for a `Schema`. For a dict or list, a new
                 container in which nested `Schema`/dict/list values have
                 been converted the same way and all other values are kept
                 unchanged.

        Any other object is handed to `JSONEncoder.default`, which raises
        `TypeError`.
        """
        if isinstance(obj, Schema):
            return obj.schema

        # Only recurse into values this method knows how to convert. Passing
        # a JSON-native leaf (str, int, None, ...) to `default` would fall
        # through to `JSONEncoder.default` and raise TypeError. The encoder
        # serializes whatever is returned here, so leaves can be left alone.
        convertible = (Schema, dict, list)
        if isinstance(obj, dict):
            return {
                k: self.default(v) if isinstance(v, convertible) else v
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [self.default(v) if isinstance(v, convertible) else v for v in obj]
        return JSONEncoder.default(self, obj)


# TODO: s/Schema/JSONSchema
class Schema(object):
    """
    Thin wrapper around a JSON-schema dict.

    Elements are addressed by a key path: a sequence of dict keys walked
    from the root of `self.schema`.
    """

    def __init__(self, schema: dict):
        """
        @param schema: The schema dict to wrap. It is stored by reference,
                       not copied, so mutating methods modify it in place.
        """
        self.schema = schema

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the element at `key` to `elem`, modifying the schema in place.

        @param key: The key set
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        """
        new_elem = self.schema

        for k in key[:-1]:
            if k not in new_elem:
                if not propagate:
                    return

                new_elem[k] = {}
                if k == "properties":
                    # A node that gains `properties` is marked as an object.
                    new_elem["type"] = "object"
            new_elem = new_elem[k]

        new_elem[key[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """
        Return the element at `key`, as resolved by `_get` from `.utils`.

        @param key: The key to look up
        """
        # NOTE(review): presumably `_get` raises KeyError for a missing key
        # (`_delete_key` catches it) - confirm against `.utils`.
        return _get(self.schema, key)

    def get_size(self) -> int:
        """
        Return the size of the whole schema, see `_get_schema_size`.
        """
        return self._get_schema_size(self.schema)

    def clone(self) -> Schema:
        """
        Return a new `Schema` wrapping a deep copy of this one's dict.
        """
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """
        Delete the entry at `key`. A KeyError raised while looking up the
        parent or deleting the entry is ignored.
        """
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        # Now check, moving backwards, if that was the only available property
        # If it was, and there are no additionalProperties, delete the parent
        if propagate:
            for subkey in reversed([key[:i] for i in range(len(key))]):
                if not subkey or subkey[-1] == "properties":
                    # we only want to check the actual entry
                    continue

                try:
                    elem = _get(self.schema, subkey)
                    if not elem.get("properties") and not elem.get(
                        "additionalProperties", False
                    ):
                        self._delete_key(subkey)
                except KeyError:
                    break

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        Return True if walking `key` from the root ends at a truthy value.

        A missing key, an empty/falsy value, or a path that runs through a
        non-dict value all yield False.

        @param key: The key to check for existence
        """
        target = self.schema
        for x in key:
            if not isinstance(target, dict):
                # Cannot descend into a scalar or list: the path does not exist.
                return False
            target = target.get(x, {})
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Recursively compute the size of `schema`.

        A scalar counts 1, an object with non-empty `properties` counts the
        sum of its properties, any other object counts 2, an array counts
        the size of its `items`, a list of schemas counts the sum of its
        members, and a list-valued `type` counts the largest size among the
        listed types.

        @param schema: The schema element (or list of elements) to measure
        @param key: Tuple of path components leading to `schema`; only used
                    in error messages.
        @raise Exception: If an element has no `type`, or an array has no
                          `items`.
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so errors raised below still say where they are.
            return sum(Schema._get_schema_size(s, key) for s in schema)

        if "type" not in schema:
            raise Exception("Missing type for schema element at key " + "/".join(key))

        if isinstance(schema["type"], list):
            # The recursion only reads the schema, so a shallow copy with the
            # single type substituted is enough.
            return max(
                (
                    Schema._get_schema_size({**schema, "type": t}, key)
                    for t in schema["type"]
                ),
                default=0,
            )

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    (
                        Schema._get_schema_size(p, key=key + (n,))
                        for n, p in schema["properties"].items()
                    )
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise Exception(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
#   class SchemaException(builtins.Exception):
View Source
class SchemaException(Exception):
    """Module-specific exception: a plain `Exception` subclass with no extra behaviour."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
#   class SchemaEncoder(json.encoder.JSONEncoder):
View Source
class SchemaEncoder(JSONEncoder):
    """
    JSON encoder that serializes `Schema` objects as the dict they wrap.
    """

    def default(self, obj):
        """
        Convert `obj` into something the base encoder can serialize.

        @param obj: The object to convert
        @return: The wrapped dict for a `Schema`. For a dict or list, a new
                 container in which nested `Schema`/dict/list values have
                 been converted the same way and all other values are kept
                 unchanged.

        Any other object is handed to `JSONEncoder.default`, which raises
        `TypeError`.
        """
        if isinstance(obj, Schema):
            return obj.schema

        # Only recurse into values this method knows how to convert. Passing
        # a JSON-native leaf (str, int, None, ...) to `default` would fall
        # through to `JSONEncoder.default` and raise TypeError. The encoder
        # serializes whatever is returned here, so leaves can be left alone.
        convertible = (Schema, dict, list)
        if isinstance(obj, dict):
            return {
                k: self.default(v) if isinstance(v, convertible) else v
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [self.default(v) if isinstance(v, convertible) else v for v in obj]
        return JSONEncoder.default(self, obj)

Extensible JSON http://json.org encoder for Python data structures.

Supports the following objects and types by default:

+-------------------+---------------+
| Python            | JSON          |
+===================+===============+
| dict              | object        |
+-------------------+---------------+
| list, tuple       | array         |
+-------------------+---------------+
| str               | string        |
+-------------------+---------------+
| int, float        | number        |
+-------------------+---------------+
| True              | true          |
+-------------------+---------------+
| False             | false         |
+-------------------+---------------+
| None              | null          |
+-------------------+---------------+

To extend this to recognize other objects, subclass and implement a .default() method that returns a serializable object for o if possible; otherwise it should call the superclass implementation (to raise TypeError).

#   def default(self, obj):
View Source
    def default(self, obj):
        """
        Convert `obj` into something the base encoder can serialize.

        @param obj: The object to convert
        @return: The wrapped dict for a `Schema`. For a dict or list, a new
                 container in which nested `Schema`/dict/list values have
                 been converted the same way and all other values are kept
                 unchanged.

        Any other object is handed to `JSONEncoder.default`, which raises
        `TypeError`.
        """
        if isinstance(obj, Schema):
            return obj.schema

        # Only recurse into values this method knows how to convert. Passing
        # a JSON-native leaf (str, int, None, ...) to `default` would fall
        # through to `JSONEncoder.default` and raise TypeError. The encoder
        # serializes whatever is returned here, so leaves can be left alone.
        convertible = (Schema, dict, list)
        if isinstance(obj, dict):
            return {
                k: self.default(v) if isinstance(v, convertible) else v
                for k, v in obj.items()
            }
        if isinstance(obj, list):
            return [self.default(v) if isinstance(v, convertible) else v for v in obj]
        return JSONEncoder.default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    # Example override: turn any iterable into a list so it can be encoded.
    try:
        iterable = iter(o)
    except TypeError:
        # Not iterable: fall through to the base class call below.
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Inherited Members
json.encoder.JSONEncoder
JSONEncoder
item_separator
key_separator
encode
iterencode
#   class Schema:
View Source
class Schema(object):
    """
    Thin wrapper around a JSON-schema dict.

    Elements are addressed by a key path: a sequence of dict keys walked
    from the root of `self.schema`.
    """

    def __init__(self, schema: dict):
        """
        @param schema: The schema dict to wrap. It is stored by reference,
                       not copied, so mutating methods modify it in place.
        """
        self.schema = schema

    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> None:
        """
        Set the element at `key` to `elem`, modifying the schema in place.

        @param key: The key set
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        """
        new_elem = self.schema

        for k in key[:-1]:
            if k not in new_elem:
                if not propagate:
                    return

                new_elem[k] = {}
                if k == "properties":
                    # A node that gains `properties` is marked as an object.
                    new_elem["type"] = "object"
            new_elem = new_elem[k]

        new_elem[key[-1]] = elem

    def get(self, key: Iterable[str]) -> Any:
        """
        Return the element at `key`, as resolved by `_get` from `.utils`.

        @param key: The key to look up
        """
        # NOTE(review): presumably `_get` raises KeyError for a missing key
        # (`_delete_key` catches it) - confirm against `.utils`.
        return _get(self.schema, key)

    def get_size(self) -> int:
        """
        Return the size of the whole schema, see `_get_schema_size`.
        """
        return self._get_schema_size(self.schema)

    def clone(self) -> Schema:
        """
        Return a new `Schema` wrapping a deep copy of this one's dict.
        """
        return Schema(copy.deepcopy(self.schema))

    def _delete_key(self, key: Iterable[str]):
        """
        Delete the entry at `key`. A KeyError raised while looking up the
        parent or deleting the entry is ignored.
        """
        try:
            elem = _get(self.schema, key[:-1])
            del elem[key[-1]]
        except KeyError:
            return

    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        # Now check, moving backwards, if that was the only available property
        # If it was, and there are no additionalProperties, delete the parent
        if propagate:
            for subkey in reversed([key[:i] for i in range(len(key))]):
                if not subkey or subkey[-1] == "properties":
                    # we only want to check the actual entry
                    continue

                try:
                    elem = _get(self.schema, subkey)
                    if not elem.get("properties") and not elem.get(
                        "additionalProperties", False
                    ):
                        self._delete_key(subkey)
                except KeyError:
                    break

    def property_exists(self, key: Iterable[str]) -> bool:
        """
        Return True if walking `key` from the root ends at a truthy value.

        A missing key, an empty/falsy value, or a path that runs through a
        non-dict value all yield False.

        @param key: The key to check for existence
        """
        target = self.schema
        for x in key:
            if not isinstance(target, dict):
                # Cannot descend into a scalar or list: the path does not exist.
                return False
            target = target.get(x, {})
        return bool(target)

    @staticmethod
    def _get_schema_size(schema: dict, key=None) -> int:
        """
        Recursively compute the size of `schema`.

        A scalar counts 1, an object with non-empty `properties` counts the
        sum of its properties, any other object counts 2, an array counts
        the size of its `items`, a list of schemas counts the sum of its
        members, and a list-valued `type` counts the largest size among the
        listed types.

        @param schema: The schema element (or list of elements) to measure
        @param key: Tuple of path components leading to `schema`; only used
                    in error messages.
        @raise Exception: If an element has no `type`, or an array has no
                          `items`.
        """
        if key is None:
            key = tuple()

        if isinstance(schema, list):
            # Keep the path so errors raised below still say where they are.
            return sum(Schema._get_schema_size(s, key) for s in schema)

        if "type" not in schema:
            raise Exception("Missing type for schema element at key " + "/".join(key))

        if isinstance(schema["type"], list):
            # The recursion only reads the schema, so a shallow copy with the
            # single type substituted is enough.
            return max(
                (
                    Schema._get_schema_size({**schema, "type": t}, key)
                    for t in schema["type"]
                ),
                default=0,
            )

        # TODO: Tests and finalize the different types available and how they map to BQ
        # e.g. (allOf, anyOf, etc.)
        if schema["type"] == "object":
            # Sometimes the "properties" field is empty...
            if "properties" in schema and schema["properties"]:
                # A ROW type with a known set of fields
                return sum(
                    (
                        Schema._get_schema_size(p, key=key + (n,))
                        for n, p in schema["properties"].items()
                    )
                )

            # A MAP type with key and value groups
            return 2

        if schema["type"] == "array":
            if "items" not in schema:
                raise Exception(
                    "Missing items for array schema element at key " + "/".join(key)
                )
            # Arrays are repeated fields, get its size
            return Schema._get_schema_size(schema["items"], key=key + ("arr-items",))

        # Otherwise, assume a scalar value
        return 1
#   Schema(schema: dict)
View Source
    def __init__(self, schema: dict):
        self.schema = schema
#   def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
View Source
    def set_schema_elem(self, key: Iterable[str], elem: Any, *, propagate=True) -> dict:
        """
        @param key: The key set
        @param elem: The value to set the key to
        @param propagate: If True, creates objects until it reaches the full key.
                          If False, and the parent of the key is not in the
                          schema, then the key will not be added.
        """
        new_elem = self.schema

        for k in key[:-1]:
            if k not in new_elem:
                if not propagate:
                    return

                new_elem[k] = {}
                if k == "properties":
                    new_elem["type"] = "object"
            new_elem = new_elem[k]

        new_elem[key[-1]] = elem

@param key: The key set
@param elem: The value to set the key to
@param propagate: If True, creates objects until it reaches the full key. If False, and the parent of the key is not in the schema, then the key will not be added.

#   def get(self, key: Iterable[str]) -> Any:
View Source
    def get(self, key: Iterable[str]) -> Any:
        """
        Return the element at `key`, as resolved by `_get` from `.utils`.

        @param key: The key to look up
        """
        value = _get(self.schema, key)
        return value
#   def get_size(self) -> int:
View Source
    def get_size(self) -> int:
        return self._get_schema_size(self.schema)
#   def clone(self) -> Schema:
View Source
    def clone(self) -> Schema:
        return Schema(copy.deepcopy(self.schema))
#   def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
View Source
    def delete_group_from_schema(self, key: Iterable[str], *, propagate=True):
        """
        Delete the entry at `key` and, optionally, ancestors left empty by it.

        An ancestor counts as empty when it has neither a truthy `properties`
        nor a truthy `additionalProperties`; nothing else it contains is
        looked at.

        @param key: The key to remove
        @param propagate: If True, then removes any parents of the deleted key
                          if they are now empty, i.e. there are no other
                          `properties`.
        """
        self._delete_key(key)

        if not propagate:
            return

        # Walk the proper prefixes of `key` from longest to shortest. The
        # empty prefix (the schema root) is never a candidate, so stop at 1.
        for end in range(len(key) - 1, 0, -1):
            ancestor = key[:end]
            if ancestor[-1] == "properties":
                # A `properties` container is not itself an entry; only the
                # node that owns it is checked.
                continue

            try:
                node = _get(self.schema, ancestor)
                if node.get("properties") or node.get("additionalProperties", False):
                    continue
                self._delete_key(ancestor)
            except KeyError:
                # Ancestor lookup failed with KeyError: stop walking upwards.
                break

@param key: The key to remove
@param propagate: If True, then removes any parents of the deleted key if they are now empty, i.e. there are no other `properties`.

#   def property_exists(self, key: Iterable[str]) -> bool:
View Source
    def property_exists(self, key: Iterable[str]) -> bool:
        """
        @param key: The key to check for existence
        """
        target = self.schema
        for x in key:
            target = target.get(x, {})
        return bool(target)

@param key: The key to check for existence