Source code for glean_parser.lint

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import enum
from pathlib import Path
import re
import sys
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Iterable,
    Optional,
    Tuple,
    Union,
)  # noqa


from . import metrics
from . import parser
from . import pings
from . import tags
from . import util


# Yield only an error message
LintGenerator = Generator[str, None, None]

# Yield fully constructed GlinterNits
NitGenerator = Generator["GlinterNit", None, None]


class CheckType(enum.Enum):
    warning = 0
    error = 1


def _split_words(name: str) -> List[str]:
    """
    Helper function to split words on `.`, `_`, or `-`.
    """
    return re.split("[._-]", name)


def _english_list(items: List[str]) -> str:
    """
    Helper function to format a list [A, B, C] as "'A', 'B', or 'C'".
    """
    if len(items) == 0:
        return ""
    elif len(items) == 1:
        return f"'{items[0]}'"
    else:
        return "{}, or '{}'".format(
            ", ".join([f"'{x}'" for x in items[:-1]]), items[-1]
        )


def _hamming_distance(str1: str, str2: str) -> int:
    """
    Count the # of differences between strings str1 and str2,
    padding the shorter one with whitespace.
    """
    diffs = 0
    if len(str1) < len(str2):
        str1, str2 = str2, str1
    len_dist = len(str1) - len(str2)
    str2 += " " * len_dist

    for ch1, ch2 in zip(str1, str2):
        if ch1 != ch2:
            diffs += 1
    return diffs


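# Illustrative only (not part of the module): expected behavior of the helpers
# above when called directly.
#
#     _split_words("gfx.adapter_name")          # -> ["gfx", "adapter", "name"]
#     _english_list(["metrics", "events"])      # -> "'metrics', or 'events'"
#     _hamming_distance("baselin", "baseline")  # -> 1; a distance of 1 is what
#                                               #    check_misspelled_pings below
#                                               #    treats as a likely misspelling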
def check_common_prefix(
    category_name: str, metrics: Iterable[metrics.Metric]
) -> LintGenerator:
    """
    Check if all metrics begin with a common prefix.
    """
    metric_words = sorted([_split_words(metric.name) for metric in metrics])

    if len(metric_words) < 2:
        return

    first = metric_words[0]
    last = metric_words[-1]

    for i in range(min(len(first), len(last))):
        if first[i] != last[i]:
            break

    if i > 0:
        common_prefix = "_".join(first[:i])
        yield (
            f"Within category '{category_name}', all metrics begin with "
            f"prefix '{common_prefix}'. "
            "Remove the prefixes on the metric names and (possibly) "
            "rename the category."
        )


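# Illustrative only: a hypothetical category "downloads" containing metrics
# named "download_time" and "download_count" shares the prefix "download",
# so check_common_prefix would yield a COMMON_PREFIX nit for that category.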
def check_unit_in_name(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check if the metric name redundantly ends in a unit.
    """
    TIME_UNIT_ABBREV = {
        "nanosecond": "ns",
        "microsecond": "us",
        "millisecond": "ms",
        "second": "s",
        "minute": "m",
        "hour": "h",
        "day": "d",
    }

    MEMORY_UNIT_ABBREV = {
        "byte": "b",
        "kilobyte": "kb",
        "megabyte": "mb",
        "gigabyte": "gb",
    }

    name_words = _split_words(metric.name)
    unit_in_name = name_words[-1]

    time_unit = getattr(metric, "time_unit", None)
    memory_unit = getattr(metric, "memory_unit", None)
    unit = getattr(metric, "unit", None)

    if time_unit is not None:
        if (
            unit_in_name == TIME_UNIT_ABBREV.get(time_unit.name)
            or unit_in_name == time_unit.name
        ):
            yield (
                f"Suffix '{unit_in_name}' is redundant with time_unit "
                f"'{time_unit.name}'. Only include time_unit."
            )
        elif (
            unit_in_name in TIME_UNIT_ABBREV.keys()
            or unit_in_name in TIME_UNIT_ABBREV.values()
        ):
            yield (
                f"Suffix '{unit_in_name}' doesn't match time_unit "
                f"'{time_unit.name}'. "
                "Confirm the unit is correct and only include time_unit."
            )

    elif memory_unit is not None:
        if (
            unit_in_name == MEMORY_UNIT_ABBREV.get(memory_unit.name)
            or unit_in_name == memory_unit.name
        ):
            yield (
                f"Suffix '{unit_in_name}' is redundant with memory_unit "
                f"'{memory_unit.name}'. "
                "Only include memory_unit."
            )
        elif (
            unit_in_name in MEMORY_UNIT_ABBREV.keys()
            or unit_in_name in MEMORY_UNIT_ABBREV.values()
        ):
            yield (
                f"Suffix '{unit_in_name}' doesn't match memory_unit "
                f"'{memory_unit.name}'. "
                "Confirm the unit is correct and only include memory_unit."
            )

    elif unit is not None:
        if unit_in_name == unit:
            yield (
                f"Suffix '{unit_in_name}' is redundant with unit param "
                f"'{unit}'. "
                "Only include unit."
            )


def check_category_generic(
    category_name: str, metrics: Iterable[metrics.Metric]
) -> LintGenerator:
    """
    Check if the category name is too generic.
    """
    GENERIC_CATEGORIES = ["metrics", "events"]

    if category_name in GENERIC_CATEGORIES:
        yield (
            f"Category '{category_name}' is too generic. "
            f"Don't use {_english_list(GENERIC_CATEGORIES)} for category names"
        )


def check_bug_number(
    metric: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that bug references are full URLs rather than bare bug numbers.
    """
    number_bugs = [str(bug) for bug in metric.bugs if isinstance(bug, int)]

    if len(number_bugs):
        yield (
            f"For bugs {', '.join(number_bugs)}: "
            "Bug numbers are deprecated and should be changed to full URLs. "
            f"For example, use 'http://bugzilla.mozilla.org/{number_bugs[0]}' "
            f"instead of '{number_bugs[0]}'."
        )


def check_valid_in_baseline(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that the Glean-internal baseline ping is not used in `send_in_pings`,
    unless reserved names are explicitly allowed.
    """
    allow_reserved = parser_config.get("allow_reserved", False)

    if not allow_reserved and "baseline" in metric.send_in_pings:
        yield (
            "The baseline ping is Glean-internal. "
            "Remove 'baseline' from the send_in_pings array."
        )


def check_misspelled_pings(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check for ping names that are one character away from a reserved ping name.
    """
    for ping in metric.send_in_pings:
        for builtin in pings.RESERVED_PING_NAMES:
            distance = _hamming_distance(ping, builtin)
            if distance == 1:
                yield f"Ping '{ping}' seems misspelled. Did you mean '{builtin}'?"


def check_tags_required(
    metric_or_ping: Union[metrics.Metric, pings.Ping], parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that tags are provided when the `require_tags` parser option is set.
    """
    if parser_config.get("require_tags", False) and not len(
        metric_or_ping.metadata.get("tags", [])
    ):
        yield "Tags are required but no tags specified"


def check_user_lifetime_expiration(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that metrics with 'user' lifetime do not have an expiration date.
    """
    if metric.lifetime == metrics.Lifetime.user and metric.expires != "never":
        yield (
            "Metrics with 'user' lifetime cannot have an expiration date. "
            "They live as long as the user profile does. "
            "Set expires to 'never'."
        )


def check_expired_date(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that the metric's `expires` value passes validation.
    """
    try:
        metric.validate_expires()
    except ValueError as e:
        yield str(e)


def check_expired_metric(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    if metric.is_expired():
        yield "Metric has expired. Please consider removing it."


def check_old_event_api(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    # Glean v52.0.0 removed the old events API.
    # The metrics-2-0-0 schema still supports it.
    # We want to warn about it.
    # This can go when we introduce 3-0-0.
    if not isinstance(metric, metrics.Event):
        return

    if not all("type" in x for x in metric.extra_keys.values()):
        yield "The old event API is gone. Extra keys require a type."


def check_metric_on_events_lifetime(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """A non-event metric on the Events ping only makes sense if its value
    is immutable over the life of the ping."""
    if (
        "events" in metric.send_in_pings
        and "all_pings" not in metric.send_in_pings
        and metric.type != "event"
        and metric.lifetime == metrics.Lifetime.ping
    ):
        yield (
            "Non-event metrics sent on the Events ping should not have the ping"
            " lifetime."
        )


def check_unexpected_unit(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    `unit` was allowed on all metrics and recently disallowed.
    We now warn about its use on all but quantity and custom distribution metrics.
    """
    allowed_types = [metrics.Quantity, metrics.CustomDistribution]
    if not any([isinstance(metric, ty) for ty in allowed_types]) and metric.unit:
        yield (
            "The `unit` property is only allowed for quantity "
            "and custom distribution metrics."
        )


def check_empty_datareview(
    metric: metrics.Metric, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check that `data_reviews` does not contain empty strings or TODO markers.
    """
    disallowed_datareview = ["", "todo"]
    data_reviews = [dr.lower() in disallowed_datareview for dr in metric.data_reviews]
    if any(data_reviews):
        yield "List of data reviews should not contain empty strings or TODO markers."


def check_redundant_ping(
    ping: pings.Ping, parser_config: Dict[str, Any]
) -> LintGenerator:
    """
    Check if the ping name contains 'ping' as a prefix, suffix, or word,
    or contains the word 'custom'.
    """
    ping_words = _split_words(ping.name)

    if len(ping_words) != 0:
        ping_first_word = ping_words[0]
        ping_last_word = ping_words[-1]

        if ping_first_word == "ping":
            yield "The prefix 'ping' is redundant."
        elif ping_last_word == "ping":
            yield "The suffix 'ping' is redundant."
        elif "ping" in ping_words:
            yield "The word 'ping' is redundant."
        elif "custom" in ping_words:
            yield "The word 'custom' is redundant."


def check_unknown_ping(
    check_name: str,
    check_type: CheckType,
    all_pings: Dict[str, pings.Ping],
    metrics: Dict[str, metrics.Metric],
    parser_config: Dict[str, Any],
) -> NitGenerator:
    """
    Check that all pings in `send_in_pings` for all metrics are either
    a builtin ping or in the list of defined custom pings.
    """
    available_pings = [p for p in all_pings]

    for _, metric in metrics.items():
        if check_name in metric.no_lint:
            continue

        send_in_pings = metric.send_in_pings
        for target_ping in send_in_pings:
            if target_ping in pings.RESERVED_PING_NAMES:
                continue

            if target_ping not in available_pings:
                msg = f"Ping `{target_ping}` in `send_in_pings` is unknown."
                name = ".".join([metric.category, metric.name])
                nit = GlinterNit(
                    check_name,
                    name,
                    msg,
                    check_type,
                )
                yield nit


# The checks that operate on an entire category of metrics:
# {NAME: (function, check_type)}
CATEGORY_CHECKS: Dict[
    str, Tuple[Callable[[str, Iterable[metrics.Metric]], LintGenerator], CheckType]
] = {
    "COMMON_PREFIX": (check_common_prefix, CheckType.error),
    "CATEGORY_GENERIC": (check_category_generic, CheckType.error),
}


# The checks that operate on individual metrics:
# {NAME: (function, check_type)}
METRIC_CHECKS: Dict[
    str, Tuple[Callable[[metrics.Metric, dict], LintGenerator], CheckType]
] = {
    "UNIT_IN_NAME": (check_unit_in_name, CheckType.error),
    "BUG_NUMBER": (check_bug_number, CheckType.error),
    "BASELINE_PING": (check_valid_in_baseline, CheckType.error),
    "MISSPELLED_PING": (check_misspelled_pings, CheckType.error),
    "TAGS_REQUIRED": (check_tags_required, CheckType.error),
    "EXPIRATION_DATE_TOO_FAR": (check_expired_date, CheckType.warning),
    "USER_LIFETIME_EXPIRATION": (check_user_lifetime_expiration, CheckType.warning),
    "EXPIRED": (check_expired_metric, CheckType.warning),
    "OLD_EVENT_API": (check_old_event_api, CheckType.warning),
    "METRIC_ON_EVENTS_LIFETIME": (check_metric_on_events_lifetime, CheckType.error),
    "UNEXPECTED_UNIT": (check_unexpected_unit, CheckType.warning),
    "EMPTY_DATAREVIEW": (check_empty_datareview, CheckType.warning),
}


# The checks that operate on individual pings:
# {NAME: (function, check_type)}
PING_CHECKS: Dict[
    str, Tuple[Callable[[pings.Ping, dict], LintGenerator], CheckType]
] = {
    "BUG_NUMBER": (check_bug_number, CheckType.error),
    "TAGS_REQUIRED": (check_tags_required, CheckType.error),
    "REDUNDANT_PING": (check_redundant_ping, CheckType.error),
}


# The checks that operate on the whole object tree:
# {NAME: (function, check_type)}
ALL_OBJECT_CHECKS: Dict[
    str,
    Tuple[
        Callable[
            # check name, check type, pings, metrics, config
            [str, CheckType, dict, dict, dict],
            NitGenerator,
        ],
        CheckType,
    ],
] = {
    "UNKNOWN_PING_REFERENCED": (check_unknown_ping, CheckType.error),
}


class GlinterNit:
    """
    A single lint finding produced by one of the checks.
    """

    def __init__(self, check_name: str, name: str, msg: str, check_type: CheckType):
        self.check_name = check_name
        self.name = name
        self.msg = msg
        self.check_type = check_type

    def format(self):
        return (
            f"{self.check_type.name.upper()}: {self.check_name}: "
            f"{self.name}: {self.msg}"
        )


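# Illustrative only: a nit built by hand and formatted with the method above.
# "browser.session_count" is a hypothetical metric name.
#
#     GlinterNit(
#         "EXPIRED",
#         "browser.session_count",
#         "Metric has expired. Please consider removing it.",
#         CheckType.warning,
#     ).format()
#     # -> "WARNING: EXPIRED: browser.session_count: Metric has expired. Please consider removing it."

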
def _lint_item_tags(
    item_name: str,
    item_type: str,
    item_tag_names: List[str],
    valid_tag_names: List[str],
) -> List[GlinterNit]:
    invalid_tags = [tag for tag in item_tag_names if tag not in valid_tag_names]
    return (
        [
            GlinterNit(
                "INVALID_TAGS",
                item_name,
                f"Invalid tags specified in {item_type}: {', '.join(invalid_tags)}",
                CheckType.error,
            )
        ]
        if len(invalid_tags)
        else []
    )


def _lint_pings(
    category: Dict[str, Union[metrics.Metric, pings.Ping, tags.Tag]],
    parser_config: Dict[str, Any],
    valid_tag_names: List[str],
) -> List[GlinterNit]:
    nits: List[GlinterNit] = []

    for ping_name, ping in sorted(list(category.items())):
        assert isinstance(ping, pings.Ping)
        for check_name, (check_func, check_type) in PING_CHECKS.items():
            new_nits = list(check_func(ping, parser_config))
            if len(new_nits):
                if check_name not in ping.no_lint:
                    nits.extend(
                        GlinterNit(
                            check_name,
                            ping_name,
                            msg,
                            check_type,
                        )
                        for msg in new_nits
                    )
        nits.extend(
            _lint_item_tags(
                ping_name,
                "ping",
                ping.metadata.get("tags", []),
                valid_tag_names,
            )
        )

    return nits


def _lint_all_objects(
    objects: Dict[str, Dict[str, Union[metrics.Metric, pings.Ping, tags.Tag]]],
    parser_config: Dict[str, Any],
) -> List[GlinterNit]:
    nits: List[GlinterNit] = []

    pings = objects.get("pings")
    if not pings:
        return []

    metrics = objects.get("all_metrics")
    if not metrics:
        return []

    for check_name, (check_func, check_type) in ALL_OBJECT_CHECKS.items():
        new_nits = list(
            check_func(check_name, check_type, pings, metrics, parser_config)
        )
        nits.extend(new_nits)

    return nits


def lint_metrics(
    objs: metrics.ObjectTree,
    parser_config: Optional[Dict[str, Any]] = None,
    file=sys.stderr,
) -> List[GlinterNit]:
    """
    Performs glinter checks on a set of metrics objects.

    :param objs: Tree of metric objects, as returned by `parser.parse_objects`.
    :param file: The stream to write errors to.
    :returns: List of nits.
    """
    if parser_config is None:
        parser_config = {}

    nits: List[GlinterNit] = []
    valid_tag_names = [tag for tag in objs.get("tags", [])]

    nits.extend(_lint_all_objects(objs, parser_config))

    for category_name, category in sorted(list(objs.items())):
        if category_name == "pings":
            nits.extend(_lint_pings(category, parser_config, valid_tag_names))
            continue

        if category_name == "tags":
            # currently we have no linting for tags
            continue

        # Make sure the category has only Metrics, not Pings or Tags
        category_metrics = dict(
            (name, metric)
            for (name, metric) in category.items()
            if isinstance(metric, metrics.Metric)
        )

        for cat_check_name, (cat_check_func, check_type) in CATEGORY_CHECKS.items():
            if any(
                cat_check_name in metric.no_lint
                for metric in category_metrics.values()
            ):
                continue
            nits.extend(
                GlinterNit(cat_check_name, category_name, msg, check_type)
                for msg in cat_check_func(category_name, category_metrics.values())
            )

        for _metric_name, metric in sorted(list(category_metrics.items())):
            for check_name, (check_func, check_type) in METRIC_CHECKS.items():
                new_nits = list(check_func(metric, parser_config))
                if len(new_nits):
                    if check_name not in metric.no_lint:
                        nits.extend(
                            GlinterNit(
                                check_name,
                                ".".join([metric.category, metric.name]),
                                msg,
                                check_type,
                            )
                            for msg in new_nits
                        )
            # also check that tags for the metric are valid
            nits.extend(
                _lint_item_tags(
                    ".".join([metric.category, metric.name]),
                    "metric",
                    metric.metadata.get("tags", []),
                    valid_tag_names,
                )
            )

    if len(nits):
        print("Sorry, Glean found some glinter nits:", file=file)
        for nit in nits:
            print(nit.format(), file=file)
        print("", file=file)
        print("Please fix the above nits to continue.", file=file)
        print(
            "To disable a check, add a `no_lint` parameter "
            "with a list of check names to disable.\n"
            "This parameter can appear with each individual metric, or at the "
            "top-level to affect the entire file.",
            file=file,
        )

    return nits


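# Illustrative only: running the checks programmatically rather than through
# the command-line helper below. "metrics.yaml" is a hypothetical input file;
# `parser.parse_objects` returns a result whose `.value` holds the object tree
# that `lint_metrics` expects (see `glinter` below for the same flow).
#
#     result = parser.parse_objects([Path("metrics.yaml")], {"allow_reserved": False})
#     util.report_validation_errors(result)
#     nits = lint_metrics(result.value, parser_config={"allow_reserved": False})
#     errors = [nit for nit in nits if nit.check_type == CheckType.error]

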
def glinter(
    input_filepaths: Iterable[Path],
    parser_config: Optional[Dict[str, Any]] = None,
    file=sys.stderr,
) -> int:
    """
    Commandline helper for glinter.

    :param input_filepaths: List of Path objects to load metrics from.
    :param parser_config: Parser configuration object, passed to
        `parser.parse_objects`.
    :param file: The stream to write the errors to.
    :return: Non-zero if there were any glinter errors.
    """
    if parser_config is None:
        parser_config = {}

    errors = 0

    objs = parser.parse_objects(input_filepaths, parser_config)
    errors += util.report_validation_errors(objs)

    nits = lint_metrics(objs.value, parser_config=parser_config, file=file)
    errors += len([nit for nit in nits if nit.check_type == CheckType.error])

    if errors == 0:
        print("✨ Your metrics are Glean! ✨", file=file)
        return 0

    print(f"❌ Found {errors} errors.", file=file)
    return 1
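

# Illustrative only: invoking the command-line helper from Python.
# "metrics.yaml" and "pings.yaml" are hypothetical input files; the
# parser_config keys shown are the ones referenced by the checks above.
#
#     exit_code = glinter(
#         [Path("metrics.yaml"), Path("pings.yaml")],
#         parser_config={"allow_reserved": False, "require_tags": True},
#     )
#     sys.exit(exit_code)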