mozilla_schema_generator.validate_bigquery
#!/usr/bin/env python3
"""Check BigQuery schema evolution between two revisions of a schema repository.

The ``local-validation`` command checks out two revisions, copies every
``*.bq`` schema into an artifact directory together with a compact, sorted
column listing, and reports columns or documents that disappear between the
base and the head revision.
"""
import difflib
import json
import sys
from pathlib import Path
from shutil import copyfile
from typing import Tuple

import click
from git import Repo

BASE_DIR = Path("/app").resolve()


def compute_compact_columns(document):
    """Flatten a BigQuery JSON schema into sorted "path TYPE" strings.

    Each leaf column becomes one string such as ``root.a.b STRING``. Repeated
    fields get a ``.[]`` suffix on their name, and RECORD fields are expanded
    into their children instead of being listed themselves.

    :param document: list of field dicts with ``name`` and ``type`` (and
        ``fields`` for RECORDs); ``mode`` is optional and defaults to NULLABLE.
    :returns: sorted list of compact column strings.
    """

    def traverse(prefix, columns):
        res = []
        for node in columns:
            # BigQuery treats a missing "mode" as NULLABLE; only an explicit
            # REPEATED mode changes the path.
            is_repeated = node.get("mode", "NULLABLE") == "REPEATED"
            name = node["name"] + (".[]" if is_repeated else "")
            dtype = node["type"]
            if dtype == "RECORD":
                res += traverse(f"{prefix}.{name}", node["fields"])
            else:
                res.append(f"{prefix}.{name} {dtype}")
        return res

    return sorted(traverse("root", document))


def check_evolution(base, head, verbose=False):
    """Compare two collections and flag items that were removed.

    :param base: iterable of items from the base revision.
    :param head: iterable of items from the head revision.
    :param verbose: when true, print the removed and added items.
    :returns: 1 if some item of ``base`` is missing from ``head``, else 0.
    """

    def nop(*args, **kwargs):
        pass

    log = print if verbose else nop

    a, b = set(base), set(head)
    is_error = 0
    # error condition: something present in the base is gone from the head
    base_only = a - b
    if base_only:
        log("items removed from the base")
        # sorted so the report is stable across runs (set order is not)
        log("\n".join(f"-{x}" for x in sorted(base_only, key=str)))
        log("")
        is_error = 1

    # informative only
    head_only = b - a
    if head_only:
        log("items added to the base")
        log("\n".join(f"+{x}" for x in sorted(head_only, key=str)))
        log("")
    return is_error


def copy_schemas(head: str, repository: Path, artifact: Path) -> Path:
    """Copy BigQuery schemas to a directory as an intermediary step for schema
    evolution checks.

    Each ``*.bq`` file is copied as ``<namespace>.<file name>``, where the
    namespace is the directory two levels above the file. A ``.txt`` file next
    to each copy holds the output of ``compute_compact_columns``.

    :raises ValueError: if no ``*.bq`` files are found under ``repository``.
    """
    src = Path(repository)
    repo = Repo(repository)
    dst = Path(artifact) / repo.rev_parse(head).name_rev.replace(" ", "_")
    dst.mkdir(parents=True, exist_ok=True)
    schemas = sorted(src.glob("**/*.bq"))
    if not schemas:
        raise ValueError("no schemas found")
    for path in schemas:
        namespace = path.parts[-3]
        qualified = f"{namespace}.{path.name}"
        click.echo(qualified)
        copyfile(path, dst / qualified)

        # also generate something easy to diff
        cols = compute_compact_columns(json.loads(path.read_text()))
        compact_filename = qualified.rsplit(".", 1)[0] + ".txt"
        (dst / compact_filename).write_text("\n".join(cols))
    return dst


def checkout_copy_schemas_revisions(
    head: str, base: str, repository: Path, artifact: Path
) -> Tuple[Path, Path]:
    """Check out two revisions of the schema repository and copy their schemas
    into the artifact directory. This returns paths to the head and the base
    directories.

    The previously checked-out branch (or commit, when HEAD is detached) is
    checked out again afterwards, even if copying fails.

    :raises ValueError: if ``repo.is_dirty()`` reports local changes.
    """
    repo = Repo(repository)
    if repo.is_dirty():
        raise ValueError("the repo is dirty, stash any changes and try again")
    head_path = None
    base_path = None
    # get the head to the closest symbolic reference
    current_ref = repo.git.rev_parse("HEAD", abbrev_ref=True)
    if current_ref == "HEAD":
        # Detached HEAD: --abbrev-ref prints the literal "HEAD", and checking
        # that out at the end would leave us on base_rev. Remember the commit.
        current_ref = repo.git.rev_parse("HEAD")
    # Resolve the requested revisions to commit hashes: --abbrev-ref on
    # something like `generated-schemas~1` may yield an empty string.
    head_rev = repo.rev_parse(head).hexsha
    base_rev = repo.rev_parse(base).hexsha
    try:
        repo.git.checkout(head_rev)
        head_path = copy_schemas(head_rev, repository, artifact)
        repo.git.checkout(base_rev)
        base_path = copy_schemas(base_rev, repository, artifact)
    finally:
        repo.git.checkout(current_ref)
    return head_path, base_path


def parse_incompatibility_allowlist(allowlist: Path) -> list:
    """Return the non-blank, non-comment (``#``) stripped lines of a file.

    Returns an empty list when ``allowlist`` is falsy or does not exist.
    """
    res = []
    if not allowlist or not allowlist.exists():
        return res
    lines = [line.strip() for line in allowlist.read_text().split("\n")]
    for line in lines:
        if not line or line.startswith("#"):
            continue
        res.append(line)
    return res


@click.group()
def validate():
    """Click command group."""


@validate.command()
@click.option("--head", type=str, default="local-working-branch")
@click.option("--base", type=str, default="generated-schemas")
@click.option(
    "--repository",
    type=click.Path(exists=True, file_okay=False),
    default=BASE_DIR / "mozilla-pipeline-schemas",
)
@click.option(
    "--artifact",
    type=click.Path(file_okay=False),
    default=BASE_DIR / "validate_schema_evolution",
)
@click.option(
    "--incompatibility-allowlist",
    type=click.Path(dir_okay=False),
    help="newline delimited globs of schemas with allowed schema incompatibilities",
    default=BASE_DIR / "mozilla-schema-generator/incompatibility-allowlist",
)
def local_validation(head, base, repository, artifact, incompatibility_allowlist):
    """Validate schemas using a heuristic from the compact schemas."""
    head_path, base_path = checkout_copy_schemas_revisions(
        head, base, repository, artifact
    )
    is_error = 0

    # look at the compact schemas
    head_names = {p.name for p in head_path.glob("*.txt")}
    base_names = {p.name for p in base_path.glob("*.txt")}

    # also look at the exceptions: base documents matching an allowlisted glob
    allowed_incompatibility = set()
    if incompatibility_allowlist:
        allowlist = parse_incompatibility_allowlist(Path(incompatibility_allowlist))
        for pattern in allowlist:
            allowed_incompatibility |= {
                p.name for p in base_path.glob(f"{pattern}.txt")
            }

    # Check that we're not removing any schemas. If there are exceptions, we
    # remove this from the base set before checking for evolution.
    if allowed_incompatibility:
        print("allowing incompatible changes in the following documents:")
        # sorted so the report is stable across runs (set order is not)
        print("\n".join(f"\t{x}" for x in sorted(allowed_incompatibility)))
    is_error |= check_evolution(
        base_names - allowed_incompatibility, head_names, verbose=True
    )

    for schema_name in sorted(base_names & head_names):
        # distinct names so the `base`/`head` revision parameters aren't shadowed
        base_file = base_path / schema_name
        head_file = head_path / schema_name
        base_data = base_file.read_text().split("\n")
        head_data = head_file.read_text().split("\n")
        diff = "\n".join(
            # control lines contain a newline at the end
            line.strip()
            for line in difflib.unified_diff(
                base_data,
                head_data,
                fromfile=base_file.as_posix(),
                tofile=head_file.as_posix(),
                n=1,
            )
        )
        if not diff:
            # no difference detected
            continue
        # check if this is an error condition
        print(diff + "\n")
        err_code = check_evolution(base_data, head_data)
        if err_code and schema_name in allowed_incompatibility:
            print("found incompatible changes, but continuing")
            continue
        is_error |= err_code

    if not is_error:
        click.echo("no incompatible changes detected")
    else:
        click.echo("found incompatible changes")

    sys.exit(is_error)


if __name__ == "__main__":
    validate()
BASE_DIR =
PosixPath('/app')
def
compute_compact_columns(document):
def compute_compact_columns(document):
    """Flatten a BigQuery JSON schema into sorted "path TYPE" strings.

    Each leaf column becomes one string such as ``root.a.b STRING``. Repeated
    fields get a ``.[]`` suffix on their name, and RECORD fields are expanded
    into their children instead of being listed themselves. Sorting the result
    makes two schemas diffable line by line.

    :param document: list of field dicts with ``name`` and ``type`` (and
        ``fields`` for RECORDs); ``mode`` is optional and defaults to NULLABLE.
    :returns: sorted list of compact column strings.
    """

    def traverse(prefix, columns):
        res = []
        for node in columns:
            # BigQuery treats a missing "mode" as NULLABLE; only an explicit
            # REPEATED mode changes the path.
            is_repeated = node.get("mode", "NULLABLE") == "REPEATED"
            name = node["name"] + (".[]" if is_repeated else "")
            dtype = node["type"]
            if dtype == "RECORD":
                res += traverse(f"{prefix}.{name}", node["fields"])
            else:
                res.append(f"{prefix}.{name} {dtype}")
        return res

    return sorted(traverse("root", document))
def
check_evolution(base, head, verbose=False):
def check_evolution(base, head, verbose=False):
    """Compare two collections and flag items that were removed.

    Items present in ``base`` but not in ``head`` are an error; items only in
    ``head`` are reported but allowed.

    :param base: iterable of items from the base revision.
    :param head: iterable of items from the head revision.
    :param verbose: when true, print the removed and added items.
    :returns: 1 if some item of ``base`` is missing from ``head``, else 0.
    """

    def nop(*args, **kwargs):
        pass

    log = print if verbose else nop

    a, b = set(base), set(head)
    is_error = 0
    # error condition: something present in the base is gone from the head
    base_only = a - b
    if base_only:
        log("items removed from the base")
        # sorted so the report is stable across runs (set order is not)
        log("\n".join(f"-{x}" for x in sorted(base_only, key=str)))
        log("")
        is_error = 1

    # informative only
    head_only = b - a
    if head_only:
        log("items added to the base")
        log("\n".join(f"+{x}" for x in sorted(head_only, key=str)))
        log("")
    return is_error
def
copy_schemas(head: str, repository: pathlib.Path, artifact: pathlib.Path) -> pathlib.Path:
def copy_schemas(head: str, repository: Path, artifact: Path) -> Path:
    """Copy BigQuery schemas to a directory as an intermediary step for schema
    evolution checks.

    The output directory is ``artifact/<name_rev of head>`` with spaces
    replaced by underscores. Every ``*.bq`` file under ``repository`` is copied
    there as ``<namespace>.<file name>``, where the namespace is the directory
    two levels above the file. Next to each copy, a ``.txt`` file receives the
    newline-joined output of ``compute_compact_columns`` for that schema.

    :param head: revision used to name the output directory.
    :param repository: path to the schema git repository (its working tree is
        read as-is).
    :param artifact: parent directory for the output directory.
    :returns: the output directory.
    :raises ValueError: if no ``*.bq`` files are found.
    """
    repo = Repo(repository)
    revision_label = repo.rev_parse(head).name_rev.replace(" ", "_")
    out_dir = Path(artifact) / revision_label
    out_dir.mkdir(parents=True, exist_ok=True)

    bq_files = sorted(Path(repository).glob("**/*.bq"))
    if not bq_files:
        raise ValueError("no schemas found")

    for bq_file in bq_files:
        qualified = f"{bq_file.parts[-3]}.{bq_file.name}"
        click.echo(qualified)
        copyfile(bq_file, out_dir / qualified)

        # a compact, line-per-column listing that is easy to diff
        columns = compute_compact_columns(json.loads(bq_file.read_text()))
        stem = qualified.rsplit(".", 1)[0]
        (out_dir / f"{stem}.txt").write_text("\n".join(columns))
    return out_dir
Copy BigQuery schemas to a directory as an intermediary step for schema evolution checks.
def
checkout_copy_schemas_revisions(head: str, base: str, repository: pathlib.Path, artifact: pathlib.Path) -> Tuple[pathlib.Path, pathlib.Path]:
def checkout_copy_schemas_revisions(
    head: str, base: str, repository: Path, artifact: Path
) -> Tuple[Path, Path]:
    """Check out two revisions of the schema repository and copy their schemas
    into the artifact directory. This returns paths to the head and the base
    directories.

    The previously checked-out branch (or commit, when HEAD is detached) is
    checked out again afterwards, even if copying fails.

    :param head: revision whose schemas are copied first.
    :param base: revision to compare against.
    :param repository: path to the schema git repository.
    :param artifact: directory under which per-revision directories are made.
    :returns: ``(head_path, base_path)`` as returned by ``copy_schemas``.
    :raises ValueError: if ``repo.is_dirty()`` reports local changes.
    """
    repo = Repo(repository)
    if repo.is_dirty():
        raise ValueError("the repo is dirty, stash any changes and try again")
    head_path = None
    base_path = None
    # get the head to the closest symbolic reference
    current_ref = repo.git.rev_parse("HEAD", abbrev_ref=True)
    if current_ref == "HEAD":
        # Detached HEAD: --abbrev-ref prints the literal "HEAD", and checking
        # that out at the end would leave us on base_rev. Remember the commit.
        current_ref = repo.git.rev_parse("HEAD")
    # Resolve the requested revisions to commit hashes: --abbrev-ref on
    # something like `generated-schemas~1` may yield an empty string.
    head_rev = repo.rev_parse(head).hexsha
    base_rev = repo.rev_parse(base).hexsha
    try:
        repo.git.checkout(head_rev)
        head_path = copy_schemas(head_rev, repository, artifact)
        repo.git.checkout(base_rev)
        base_path = copy_schemas(base_rev, repository, artifact)
    finally:
        repo.git.checkout(current_ref)
    return head_path, base_path
Check out two revisions of the schema repository and copy their BigQuery schemas into the artifact directory. This returns paths to the head and the base directories.
def
parse_incompatibility_allowlist(allowlist: pathlib.Path) -> list:
def parse_incompatibility_allowlist(allowlist: Path) -> list:
    """Read the glob patterns listed in an incompatibility allowlist file.

    The file is split on newlines and each line is stripped of surrounding
    whitespace; empty lines and lines starting with ``#`` are skipped.

    :param allowlist: path to the allowlist file; may be falsy or point to a
        file that does not exist.
    :returns: the remaining lines in file order, or an empty list when
        ``allowlist`` is falsy or missing.
    """
    if allowlist and allowlist.exists():
        stripped = (raw.strip() for raw in allowlist.read_text().split("\n"))
        return [
            entry for entry in stripped if entry and not entry.startswith("#")
        ]
    return []
validate =
<Group validate>
Click command group.
local_validation =
<Command local-validation>
Validate schemas using a heuristic from the compact schemas.