mozilla_schema_generator.subset_pings
import json
import re
from collections import defaultdict
from copy import deepcopy
from pathlib import Path
from typing import Dict, Tuple

# most metadata fields are added to the bq schema directly and left out of the json schema, but
# fields here appear in the json schema and must be explicitly included in all resulting pings
ADDITIONAL_METADATA_FIELDS = [
    "client_id",
    "clientId",
    "client_info",
]


def _get_path(out_dir, namespace, doctype, version):
    return out_dir / namespace / doctype / f"{doctype}.{version}.schema.json"


def _path_string(*path):
    return ".".join(path)


def _schema_copy(src, pattern, dst=None, delete=True, prefix=()):
    if src.get("type") != "object" or "properties" not in src:
        # only recurse into objects with explicitly defined properties
        return None
    src_props = src["properties"]
    dst_props = {}
    for name, src_subschema in list(src_props.items()):
        path = ".".join((*prefix, name))
        if pattern.fullmatch(path):
            prop = src_props.pop(name) if delete else deepcopy(src_props[name])
        else:
            prop = _schema_copy(
                src_subschema,
                pattern,
                dst=None if dst is None else dst["properties"].get(name, None),
                delete=delete,
                prefix=(*prefix, name),
            )
        if prop is not None:
            dst_props[name] = prop
    if dst_props:
        if dst is None:
            return {"properties": dst_props, "type": "object"}
        else:
            dst["properties"].update(dst_props)
            return dst
    return None


def _copy_metadata(source, destination):
    for key in ("$id", "$schema", "mozPipelineMetadata"):
        if key not in source:
            continue
        elif isinstance(source[key], dict):
            destination[key] = deepcopy(source[key])
        else:
            destination[key] = source[key]
    for key in ADDITIONAL_METADATA_FIELDS:
        if key in source["properties"]:
            destination["properties"][key] = deepcopy(source["properties"][key])


def _update_pipeline_metadata(schema, namespace, doctype, version):
    pipeline_metadata = schema["mozPipelineMetadata"]
    pipeline_metadata["bq_dataset_family"] = namespace
    pipeline_metadata["bq_table"] = f'{doctype.replace("-", "_")}_v{version}'


def _target_as_tuple(target: Dict[str, str]) -> Tuple[str, str, str]:
    return (
        target["document_namespace"],
        target["document_type"],
        target["document_version"],
    )


def generate(config_data, out_dir: Path) -> Dict[str, Dict[str, Dict[str, Dict]]]:
    """Read in pings from disk and split fields into new subset pings.

    If configured, also produce a remainder ping with all the fields that weren't moved.
    """
    schemas = defaultdict(lambda: defaultdict(dict))
    # read in pings and split them according to config
    for source in config_data:
        src_namespace, src_doctype, src_version = _target_as_tuple(source)
        src_path = _get_path(out_dir, src_namespace, src_doctype, src_version)
        schema = json.loads(src_path.read_text())

        config = schema["mozPipelineMetadata"].pop("split_config")
        for subset_config in config["subsets"]:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(subset_config)
            pattern = re.compile(subset_config["pattern"])
            subset = _schema_copy(schema, pattern, delete=True)
            assert subset is not None, "Subset pattern matched no paths"
            if "extra_pattern" in subset_config:
                # match paths where the schema must be present in the remainder because
                # schemas cannot delete fields, but data must only go to the subset.
                pattern = re.compile(subset_config["extra_pattern"])
                subset = _schema_copy(schema, pattern, dst=subset, delete=False)
                assert subset is not None, "Subset extra_pattern matched no paths"
            _copy_metadata(schema, subset)
            _update_pipeline_metadata(subset, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = subset
        remainder_config = config.get("remainder")
        if remainder_config:
            dst_namespace, dst_doctype, dst_version = _target_as_tuple(remainder_config)
            # no need to copy metadata
            _update_pipeline_metadata(schema, dst_namespace, dst_doctype, dst_version)
            schemas[dst_namespace][dst_doctype][dst_version] = schema
    return schemas
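The splitting itself is driven by the module's internal _schema_copy helper, which recurses through nested object schemas, joins property names into dotted paths (e.g. metrics.string.search_engine), and tests each path against the configured regex with fullmatch. With delete=True the matched subtrees are moved out of the source schema; with delete=False (used for extra_pattern) they are deep-copied so the remainder keeps them too. A minimal sketch on a toy schema; the schema and pattern below are invented for illustration, not a real ping:

import re

toy = {
    "type": "object",
    "properties": {
        "metrics": {
            "type": "object",
            "properties": {
                "string": {
                    "type": "object",
                    "properties": {
                        "search_engine": {"type": "string"},
                        "locale": {"type": "string"},
                    },
                },
            },
        },
    },
}

# move the single matched path into a new subset schema
subset = _schema_copy(toy, re.compile(r"metrics\.string\.search_engine"), delete=True)

# subset == {"type": "object", "properties": {"metrics": {"type": "object",
#     "properties": {"string": {"type": "object", "properties": {
#         "search_engine": {"type": "string"}}}}}}}
# toy now only contains metrics.string.locale (the matched subtree was popped)

Note that intermediate objects along a matched path are rebuilt with only "type" and "properties"; any other keywords on those intermediate objects are not carried into the subset.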
ADDITIONAL_METADATA_FIELDS = ['client_id', 'clientId', 'client_info']
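These fields are re-attached to every subset schema by _copy_metadata (along with $id, $schema, and mozPipelineMetadata), so each resulting ping can still be tied back to a client. A small sketch with made-up dicts:

source = {
    "$id": "example-id",  # made-up value
    "type": "object",
    "properties": {
        "client_id": {"type": "string"},
        "payload": {"type": "object"},
    },
}
destination = {"type": "object", "properties": {}}
_copy_metadata(source, destination)
# destination["$id"] == "example-id"
# destination["properties"]["client_id"] is a deep copy of the source definition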
def generate(config_data, out_dir: pathlib.Path) -> Dict[str, Dict[str, Dict[str, Dict]]]:
Read in pings from disk and split fields into new subset pings.
If configured, also produce a remainder ping with all the fields that weren't moved.
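A usage sketch, assuming the source schemas already exist on disk under out_dir in the <namespace>/<doctype>/<doctype>.<version>.schema.json layout that _get_path expects, and that each source schema carries a split_config block inside its mozPipelineMetadata; all namespaces, doctypes, and patterns below are hypothetical:

from pathlib import Path

from mozilla_schema_generator import subset_pings

# each entry names one source ping to read and split
config_data = [
    {
        "document_namespace": "my-app",  # hypothetical
        "document_type": "metrics",      # hypothetical
        "document_version": "1",
    }
]

# the source schema's mozPipelineMetadata is expected to contain something like:
# "split_config": {
#     "subsets": [
#         {
#             "document_namespace": "my-app-subset",  # hypothetical
#             "document_type": "search",
#             "document_version": "1",
#             "pattern": "metrics\\.string\\.search_.*",
#             # "extra_pattern": ...  (optional; copied into the subset, kept in the remainder)
#         }
#     ],
#     "remainder": {  # optional; reuses the stripped source schema
#         "document_namespace": "my-app",
#         "document_type": "metrics-remainder",
#         "document_version": "1",
#     },
# }

schemas = subset_pings.generate(config_data, Path("out"))
# the result is keyed as schemas[namespace][doctype][version] -> schema dict
for namespace, doctypes in schemas.items():
    for doctype, versions in doctypes.items():
        for version, schema in versions.items():
            print(namespace, doctype, version, schema["mozPipelineMetadata"]["bq_table"])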