mozilla_schema_generator.validate_bigquery

  1#!/usr/bin/env python3
  2import difflib
  3import json
  4import sys
  5from pathlib import Path
  6from shutil import copyfile
  7from typing import Tuple
  8
  9import click
 10from git import Repo
 11
 12BASE_DIR = Path("/app").resolve()
 13
 14
 15def compute_compact_columns(document):
 16    def traverse(prefix, columns):
 17        res = []
 18        for node in columns:
 19            name = node["name"] + (".[]" if node["mode"] == "REPEATED" else "")
 20            dtype = node["type"]
 21            if dtype == "RECORD":
 22                res += traverse(f"{prefix}.{name}", node["fields"])
 23            else:
 24                res += [f"{prefix}.{name} {dtype}"]
 25        return res
 26
 27    res = traverse("root", document)
 28    return sorted(res)
 29
 30
 31def check_evolution(base, head, verbose=False):
 32    def nop(*args, **kwargs):
 33        pass
 34
 35    log = print if verbose else nop
 36
 37    a, b = set(base), set(head)
 38    is_error = 0
 39    # error condition
 40    base_only = a - b
 41    if len(base_only) > 0:
 42        log("items removed from the base")
 43        log("\n".join([f"-{x}" for x in base_only]))
 44        log("")
 45        # set the status
 46        is_error = 1
 47
 48    # informative only
 49    head_only = b - a
 50    if len(head_only) > 0:
 51        log("items added to the base")
 52        log("\n".join([f"+{x}" for x in head_only]))
 53        log("")
 54    return is_error
 55
 56
 57def copy_schemas(head: str, repository: Path, artifact: Path) -> Path:
 58    """Copy BigQuery schemas to a directory as an intermediary step for schema
 59    evolution checks."""
 60    src = Path(repository)
 61    repo = Repo(repository)
 62    dst = Path(artifact) / repo.rev_parse(head).name_rev.replace(" ", "_")
 63    dst.mkdir(parents=True, exist_ok=True)
 64    schemas = sorted(src.glob("**/*.bq"))
 65    if not schemas:
 66        raise ValueError("no schemas found")
 67    for path in schemas:
 68        namespace = path.parts[-3]
 69        doc = path.parts[-1]
 70        qualified = f"{namespace}.{doc}"
 71        click.echo(qualified)
 72        copyfile(path, dst / qualified)
 73
 74        # also generate something easy to diff
 75        cols = compute_compact_columns(json.loads(path.read_text()))
 76        compact_filename = ".".join(qualified.split(".")[:-1]) + ".txt"
 77        (dst / compact_filename).write_text("\n".join(cols))
 78    return dst
 79
 80
def checkout_copy_schemas_revisions(
    head: str, base: str, repository: Path, artifact: Path
) -> Tuple[Path, Path]:
    """Checkout two revisions of the schema repository into the artifact
    directory. This returns paths to the head and the base directories.

    Raises:
        ValueError: if the repository has uncommitted changes, since
            checking out other revisions would clobber them.
    """
    repo = Repo(repository)
    if repo.is_dirty():
        raise ValueError("the repo is dirty, stash any changes and try again")
    # populated inside the try block below
    head_path = None
    base_path = None
    # get the head to the closest symbolic reference
    current_ref = repo.git.rev_parse("HEAD", abbrev_ref=True)
    # note: if we try using --abbrev-ref on something like
    # `generated-schemas~1`, we may end up with an empty string. We should
    # fallback to the commit-hash if used.
    head_rev = repo.rev_parse(head).hexsha
    base_rev = repo.rev_parse(base).hexsha
    try:
        # check out each revision in turn and snapshot its schemas;
        # order matters: the working tree holds one revision at a time
        repo.git.checkout(head_rev)
        head_path = copy_schemas(head_rev, repository, artifact)
        repo.git.checkout(base_rev)
        base_path = copy_schemas(base_rev, repository, artifact)
    finally:
        # always restore the user's original checkout, even on failure
        repo.git.checkout(current_ref)
    return head_path, base_path
106
107
def parse_incompatibility_allowlist(allowlist: Path) -> list:
    """Read an allowlist file into a list of glob patterns.

    Blank lines and ``#`` comment lines are skipped; surrounding whitespace
    is stripped. A falsy or nonexistent path yields an empty list.
    """
    if not allowlist or not allowlist.exists():
        return []
    return [
        stripped
        for stripped in (raw.strip() for raw in allowlist.read_text().split("\n"))
        if stripped and not stripped.startswith("#")
    ]
118
119
@click.group()
def validate():
    """Click command group holding the schema-validation subcommands."""
123
124
@validate.command()
@click.option("--head", type=str, default="local-working-branch")
@click.option("--base", type=str, default="generated-schemas")
@click.option(
    "--repository",
    type=click.Path(exists=True, file_okay=False),
    default=BASE_DIR / "mozilla-pipeline-schemas",
)
@click.option(
    "--artifact",
    type=click.Path(file_okay=False),
    default=BASE_DIR / "validate_schema_evolution",
)
@click.option(
    "--incompatibility-allowlist",
    type=click.Path(dir_okay=False),
    help="newline delimited globs of schemas with allowed schema incompatibilities",
    default=BASE_DIR / "mozilla-schema-generator/incompatibility-allowlist",
)
def local_validation(head, base, repository, artifact, incompatibility_allowlist):
    """Validate schemas using a heuristic from the compact schemas.

    Copies the compact schemas of both revisions, then (1) errors if any
    schema document was removed outside the allowlist, and (2) diffs each
    common document column-by-column, erroring on removed columns unless
    the document is allowlisted. Exits with status 1 on any error.
    """
    head_path, base_path = checkout_copy_schemas_revisions(
        head, base, repository, artifact
    )
    is_error = 0

    # look at the compact schemas
    head_files = head_path.glob("*.txt")
    base_files = base_path.glob("*.txt")

    # also look at the exceptions
    allowed_incompatibility_base_files = []
    if incompatibility_allowlist:
        for glob in parse_incompatibility_allowlist(Path(incompatibility_allowlist)):
            allowed_incompatibility_base_files += list(base_path.glob(f"{glob}.txt"))

    a = {p.name for p in base_files}
    b = {p.name for p in head_files}
    allowed_incompatibility = {p.name for p in allowed_incompatibility_base_files}

    # Check that we're not removing any schemas. If there are exceptions, we
    # remove this from the base set before checking for evolution.
    if allowed_incompatibility:
        print("allowing incompatible changes in the following documents:")
        print("\n".join([f"\t{x}" for x in allowed_incompatibility]))
    is_error |= check_evolution((a - allowed_incompatibility), b, verbose=True)

    # diff each document present in both revisions, column by column
    for schema_name in a & b:
        # distinct names here: the originals shadowed the `base`/`head`
        # CLI parameters
        base_file = base_path / schema_name
        head_file = head_path / schema_name
        base_data = base_file.read_text().split("\n")
        head_data = head_file.read_text().split("\n")
        diff = "\n".join(
            # control lines contain a newline at the end
            [
                line.strip()
                for line in difflib.unified_diff(
                    base_data,
                    head_data,
                    fromfile=base_file.as_posix(),
                    tofile=head_file.as_posix(),
                    n=1,
                )
            ]
        )
        if not diff:
            # no difference detected
            continue
        # check if this is an error condition
        print(diff + "\n")
        err_code = check_evolution(base_data, head_data)
        if err_code and schema_name in allowed_incompatibility:
            print("found incompatible changes, but continuing")
            continue
        is_error |= err_code

    if not is_error:
        click.echo("no incompatible changes detected")
    else:
        click.echo("found incompatible changes")

    sys.exit(is_error)
207
208
# Entry point: dispatch to the click command group when run as a script.
if __name__ == "__main__":
    validate()
BASE_DIR = PosixPath('/app')
def compute_compact_columns(document):
16def compute_compact_columns(document):
17    def traverse(prefix, columns):
18        res = []
19        for node in columns:
20            name = node["name"] + (".[]" if node["mode"] == "REPEATED" else "")
21            dtype = node["type"]
22            if dtype == "RECORD":
23                res += traverse(f"{prefix}.{name}", node["fields"])
24            else:
25                res += [f"{prefix}.{name} {dtype}"]
26        return res
27
28    res = traverse("root", document)
29    return sorted(res)
def check_evolution(base, head, verbose=False):
32def check_evolution(base, head, verbose=False):
33    def nop(*args, **kwargs):
34        pass
35
36    log = print if verbose else nop
37
38    a, b = set(base), set(head)
39    is_error = 0
40    # error condition
41    base_only = a - b
42    if len(base_only) > 0:
43        log("items removed from the base")
44        log("\n".join([f"-{x}" for x in base_only]))
45        log("")
46        # set the status
47        is_error = 1
48
49    # informative only
50    head_only = b - a
51    if len(head_only) > 0:
52        log("items added to the base")
53        log("\n".join([f"+{x}" for x in head_only]))
54        log("")
55    return is_error
def copy_schemas( head: str, repository: pathlib.Path, artifact: pathlib.Path) -> pathlib.Path:
58def copy_schemas(head: str, repository: Path, artifact: Path) -> Path:
59    """Copy BigQuery schemas to a directory as an intermediary step for schema
60    evolution checks."""
61    src = Path(repository)
62    repo = Repo(repository)
63    dst = Path(artifact) / repo.rev_parse(head).name_rev.replace(" ", "_")
64    dst.mkdir(parents=True, exist_ok=True)
65    schemas = sorted(src.glob("**/*.bq"))
66    if not schemas:
67        raise ValueError("no schemas found")
68    for path in schemas:
69        namespace = path.parts[-3]
70        doc = path.parts[-1]
71        qualified = f"{namespace}.{doc}"
72        click.echo(qualified)
73        copyfile(path, dst / qualified)
74
75        # also generate something easy to diff
76        cols = compute_compact_columns(json.loads(path.read_text()))
77        compact_filename = ".".join(qualified.split(".")[:-1]) + ".txt"
78        (dst / compact_filename).write_text("\n".join(cols))
79    return dst

Copy BigQuery schemas to a directory as an intermediary step for schema evolution checks.

def checkout_copy_schemas_revisions( head: str, base: str, repository: pathlib.Path, artifact: pathlib.Path) -> Tuple[pathlib.Path, pathlib.Path]:
 82def checkout_copy_schemas_revisions(
 83    head: str, base: str, repository: Path, artifact: Path
 84) -> Tuple[Path, Path]:
 85    """Checkout two revisions of the schema repository into the artifact
 86    directory. This returns paths to the head and the base directories."""
 87    repo = Repo(repository)
 88    if repo.is_dirty():
 89        raise ValueError("the repo is dirty, stash any changes and try again")
 90    head_path = None
 91    base_path = None
 92    # get the head to the closest symbolic reference
 93    current_ref = repo.git.rev_parse("HEAD", abbrev_ref=True)
 94    # note: if we try using --abbrev-ref on something like
 95    # `generated-schemas~1`, we may end up with an empty string. We should
 96    # fallback to the commit-hash if used.
 97    head_rev = repo.rev_parse(head).hexsha
 98    base_rev = repo.rev_parse(base).hexsha
 99    try:
100        repo.git.checkout(head_rev)
101        head_path = copy_schemas(head_rev, repository, artifact)
102        repo.git.checkout(base_rev)
103        base_path = copy_schemas(base_rev, repository, artifact)
104    finally:
105        repo.git.checkout(current_ref)
106    return head_path, base_path

Checkout two revisions of the schema repository into the artifact directory. This returns paths to the head and the base directories.

def parse_incompatibility_allowlist(allowlist: pathlib.Path) -> list:
109def parse_incompatibility_allowlist(allowlist: Path) -> list:
110    res = []
111    if not allowlist or not allowlist.exists():
112        return res
113    lines = [line.strip() for line in allowlist.read_text().split("\n")]
114    for line in lines:
115        if not line or line.startswith("#"):
116            continue
117        res.append(line)
118    return res
validate = <Group validate>

Click command group.

local_validation = <Command local-validation>

Validate schemas using a heuristic from the compact schemas.