mozilla_schema_generator.subset_pings

import json
import re
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from typing import Dict, Tuple

# most metadata fields are added to the bq schema directly and left out of the json schema, but
# fields here appear in the json schema and must be explicitly included in all resulting pings
ADDITIONAL_METADATA_FIELDS = [
    "client_id",
    "clientId",
    "client_info",
]


def _get_path(out_dir, namespace, doctype, version):
    return out_dir / namespace / doctype / f"{doctype}.{version}.schema.json"


def _path_string(*path):
    return ".".join(path)


def _schema_copy(src, pattern, dst=None, delete=True, prefix=()):
    """Recursively copy properties whose full dotted path matches pattern.

    Returns dst updated with the matching properties, a new object schema if
    dst is None, or None if nothing matched. If delete is True, matching
    properties are removed from src.
    """
    if src.get("type") != "object" or "properties" not in src:
        # only recurse into objects with explicitly defined properties
        return None
    src_props = src["properties"]
    dst_props = {}
    for name, src_subschema in list(src_props.items()):
        path = ".".join((*prefix, name))
        if pattern.fullmatch(path):
            prop = src_props.pop(name) if delete else deepcopy(src_props[name])
        else:
            prop = _schema_copy(
                src_subschema,
                pattern,
                dst=None if dst is None else dst["properties"].get(name, None),
                delete=delete,
                prefix=(*prefix, name),
            )
        if prop is not None:
            dst_props[name] = prop
    if dst_props:
        if dst is None:
            return {"properties": dst_props, "type": "object"}
        else:
            dst["properties"].update(dst_props)
            return dst
    return None


def _copy_metadata(source, destination):
    """Copy schema identifiers, pipeline metadata, and client id fields."""
    for key in ("$id", "$schema", "mozPipelineMetadata"):
        if key not in source:
            continue
        elif isinstance(source[key], dict):
            destination[key] = deepcopy(source[key])
        else:
            destination[key] = source[key]
    for key in ADDITIONAL_METADATA_FIELDS:
        if key in source["properties"]:
            destination["properties"][key] = deepcopy(source["properties"][key])


def _update_pipeline_metadata(schema, namespace, doctype, version):
    """Point the schema's pipeline metadata at its new destination table."""
    pipeline_metadata = schema["mozPipelineMetadata"]
    pipeline_metadata["bq_dataset_family"] = namespace
    pipeline_metadata["bq_table"] = f'{doctype.replace("-", "_")}_v{version}'


def _target_as_tuple(target: Dict[str, str]) -> Tuple[str, str, str]:
    return (
        target["document_namespace"],
        target["document_type"],
        target["document_version"],
    )


def generate(config_data, out_dir: Path) -> Dict[str, Dict[str, Dict[str, Dict]]]:
    """Read in pings from disk and split fields into new subset pings.

    If configured, also produce a remainder ping with all the fields that weren't moved.
    """
    schemas = defaultdict(lambda: defaultdict(dict))
    # read in pings and split them according to config
    for source in config_data:
        src_namespace, src_doctype, src_version = _target_as_tuple(source)
        src_path = _get_path(out_dir, src_namespace, src_doctype, src_version)
        schema = json.loads(src_path.read_text())

        config = schema["mozPipelineMetadata"].pop("split_config")
        for subset_config in config["subsets"]:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(subset_config)
            pattern = re.compile(subset_config["pattern"])
            subset = _schema_copy(schema, pattern, delete=True)
            assert subset is not None, "Subset pattern matched no paths"
            if "extra_pattern" in subset_config:
                # match paths where the schema must be present in the remainder
                # because schemas cannot delete fields, but data must only go
                # to the subset
                pattern = re.compile(subset_config["extra_pattern"])
                subset = _schema_copy(schema, pattern, dst=subset, delete=False)
                assert subset is not None, "Subset extra_pattern matched no paths"
            _copy_metadata(schema, subset)
            _update_pipeline_metadata(subset, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = subset
        remainder_config = config.get("remainder")
        if remainder_config:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(remainder_config)
            # no need to copy metadata
            _update_pipeline_metadata(schema, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = schema
    return schemas
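
For reference, this is the shape of the split_config block that generate pops from each source schema's mozPipelineMetadata. The key names are the ones read by the code above; the namespace, doctype, and pattern values are hypothetical, shown only to illustrate the expected structure.

# A hypothetical split_config block; only the key names are taken from the code.
split_config = {
    "subsets": [
        {
            # destination for the fields moved out of the source ping
            "document_namespace": "my-namespace",
            "document_type": "main-subset",
            "document_version": "1",
            # properties whose full dotted path matches this regex are
            # moved into the subset schema
            "pattern": r"payload\.subset_.*",
            # optional: properties copied into the subset while the schema
            # stays in the remainder
            "extra_pattern": r"payload\.shared_field",
        }
    ],
    # optional: all fields that weren't moved land in this ping
    "remainder": {
        "document_namespace": "my-namespace",
        "document_type": "main-remainder",
        "document_version": "1",
    },
}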

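A minimal usage sketch, assuming the source schema has already been written to out/my-namespace/main/main.1.schema.json (the layout _get_path expects) and carries a split_config block like the one above; the config entries and paths here are hypothetical:

from pathlib import Path

from mozilla_schema_generator import subset_pings

# each entry names a source ping whose schema contains a split_config block
config_data = [
    {
        "document_namespace": "my-namespace",
        "document_type": "main",
        "document_version": "1",
    }
]

schemas = subset_pings.generate(config_data, Path("out"))
# the result is keyed as schemas[namespace][doctype][version] -> schema dict
for namespace, doctypes in schemas.items():
    for doctype, versions in doctypes.items():
        for version, schema in versions.items():
            print(namespace, doctype, version, sorted(schema["properties"]))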