# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

import logging
from enum import Enum
from typing import TYPE_CHECKING

import attr
from metric_config_parser import AnalysisUnit
from typing_extensions import assert_never

from mozanalysis.types import IncompatibleAnalysisUnit

if TYPE_CHECKING:
    from metric_config_parser.data_source import DataSource as ParserDataSource

    from mozanalysis.experiment import TimeLimits

logger = logging.getLogger(__name__)

class AnalysisBasis(Enum):
    """Determines the population on which the analysis is based."""

    ENROLLMENTS = "enrollments"
    EXPOSURES = "exposures"

# attr.s converters aren't compatible with mypy, so we define our own.
# See: https://mypy.readthedocs.io/en/stable/additional_features.html#id1
def client_id_column_converter(client_id_column: str | None) -> str:
if client_id_column is None:
return AnalysisUnit.CLIENT.value
else:
return client_id_column
def group_id_column_converter(group_id_column: str | None) -> str:
if group_id_column is None:
return AnalysisUnit.PROFILE_GROUP.value
else:
return group_id_column
def submission_date_column_converter(submission_date_column: str | None) -> str:
if submission_date_column is None:
return "submission_date"
else:
return submission_date_column
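

# Illustrative behavior of the converters above (the custom name is
# hypothetical):
#
#   client_id_column_converter(None)        -> AnalysisUnit.CLIENT.value
#   client_id_column_converter("custom_id") -> "custom_id"
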
@attr.s(frozen=True, slots=True)
class DataSource:
"""Represents a table or view, from which Metrics may be defined.
Args:
name (str): Name for the Data Source. Used in sanity metric
column names.
from_expr (str): FROM expression - often just a fully-qualified
table name. Sometimes a subquery. May contain the string
``{dataset}`` which will be replaced with an app-specific
dataset for Glean apps. If the expression is templated
on dataset, default_dataset is mandatory.
experiments_column_type (str or None): Info about the schema
of the table or view:
* 'simple': There is an ``experiments`` column, which is an
(experiment_slug:str -> branch_name:str) map.
* 'native': There is an ``experiments`` column, which is an
(experiment_slug:str -> struct) map, where the struct
contains a ``branch`` field, which is the branch as a
string.
* 'glean': There is an ``experiments`` column inside ping_info,
which is an (experiment_slug:str -> struct) map, where the
struct contains a ``branch`` field, which is the branch as a
string.
* 'events_stream': There is an ``experiment`` within a JSON
column ``event_extra``. ``branch`` is in the same column.
* None: There is no ``experiments`` column, so skip the
sanity checks that rely on it. We'll also be unable to
filter out pre-enrollment data from day 0 in the
experiment.
client_id_column (str, optional): Name of the column that
contains the ``client_id`` (join key). Defaults to
'client_id'.
submission_date_column (str, optional): Name of the column
that contains the submission date (as a date, not
timestamp). Defaults to 'submission_date'.
default_dataset (str, optional): The value to use for
`{dataset}` in from_expr if a value is not provided
at runtime. Mandatory if from_expr contains a
`{dataset}` parameter.
        app_name (str, optional): app_name used with metric-hub;
            used for validation.
group_id_column (str, optional): Name of the column that
contains the ``profile_group_id`` (join key). Defaults to
'profile_group_id'.
glean_client_id_column (str, optional): Name of the column that
contains the *glean* telemetry ``client_id`` (join key).
This is also used to specify that the data source supports glean.
legacy_client_id_column (str, optional): Name of the column that
contains the *legacy* telemetry ``client_id`` (join key).
This is also used to specify that the data source supports legacy.
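
    Example (illustrative table and dataset names)::

        ds = DataSource(
            name="main",
            from_expr="mozdata.{dataset}.main",
            experiments_column_type="native",
            default_dataset="telemetry",
        )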
"""
name = attr.ib(validator=attr.validators.instance_of(str))
_from_expr = attr.ib(validator=attr.validators.instance_of(str))
experiments_column_type = attr.ib(default="simple", type=str | None)
client_id_column = attr.ib(
default=AnalysisUnit.CLIENT.value,
type=str,
validator=[attr.validators.instance_of(str), attr.validators.min_len(1)],
converter=client_id_column_converter,
)
submission_date_column = attr.ib(
default="submission_date",
type=str,
validator=[attr.validators.instance_of(str), attr.validators.min_len(1)],
converter=submission_date_column_converter,
)
default_dataset = attr.ib(default=None, type=str | None)
app_name = attr.ib(default=None, type=str | None)
group_id_column = attr.ib(
default=AnalysisUnit.PROFILE_GROUP.value,
type=str,
validator=[attr.validators.instance_of(str), attr.validators.min_len(1)],
converter=group_id_column_converter,
)
    glean_client_id_column = attr.ib(default=None, type=str | None)
    legacy_client_id_column = attr.ib(default=None, type=str | None)
EXPERIMENT_COLUMN_TYPES = (None, "simple", "native", "glean", "events_stream")
@experiments_column_type.validator
def _check_experiments_column_type(self, attribute, value):
if value not in self.EXPERIMENT_COLUMN_TYPES:
raise ValueError(
f"experiments_column_type {value!r} must be one of: "
f"{self.EXPERIMENT_COLUMN_TYPES!r}"
)
    @default_dataset.validator
    def _check_default_dataset_provided_if_needed(self, attribute, value):
        # Raises ValueError if from_expr is templated on {dataset}
        # and no default_dataset was provided.
        self.from_expr_for(None)
def from_expr_for(self, dataset: str | None) -> str:
"""Expands the ``from_expr`` template for the given dataset.
If ``from_expr`` is not a template, returns ``from_expr``.
Args:
dataset (str or None): Dataset name to substitute
into the from expression.
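
        Example (illustrative), assuming ``from_expr="mozdata.{dataset}.main"``
        and ``default_dataset="telemetry"``::

            ds.from_expr_for(None)       # -> "mozdata.telemetry.main"
            ds.from_expr_for("firefox")  # -> "mozdata.firefox.main"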
"""
effective_dataset = dataset or self.default_dataset
if effective_dataset is None:
try:
return self._from_expr.format()
except Exception as e:
raise ValueError(
f"{self.name}: from_expr contains a dataset template but no value was provided." # noqa:E501
) from e
return self._from_expr.format(dataset=effective_dataset)
@property
def experiments_column_expr(self) -> str:
if self.experiments_column_type is None:
return ""
elif self.experiments_column_type == "simple":
return """AND (
ds.{submission_date} != e.enrollment_date
OR `mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
) IS NOT NULL
)"""
elif self.experiments_column_type == "native":
return """AND (
ds.{submission_date} != e.enrollment_date
OR `mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
).branch IS NOT NULL
)"""
elif self.experiments_column_type == "glean":
return """AND (
ds.{submission_date} != e.enrollment_date
OR `mozfun.map.get_key`(
ds.ping_info.experiments, '{experiment_slug}'
).branch IS NOT NULL
)"""
elif self.experiments_column_type == "events_stream":
return """AND (
ds.{submission_date} != e.enrollment_date
OR IF(
JSON_VALUE(ds.event_extra, '$.experiment') = '{experiment_slug}',
JSON_VALUE(ds.event_extra, '$.branch'),
NULL
) IS NOT NULL
)""" # noqa:E501
        else:
            raise ValueError(
                f"Unknown experiments_column_type: {self.experiments_column_type!r}"
            )
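
    # Illustrative (hypothetical slug): for a "simple" data source,
    #
    #     self.experiments_column_expr.format(
    #         submission_date="submission_date", experiment_slug="my-slug"
    #     )
    #
    # renders an AND-clause that keeps enrollment-day rows only when
    # ds.experiments already contains the key "my-slug".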
def build_query(
self,
metric_list: list[Metric],
time_limits: TimeLimits,
experiment_slug: str,
from_expr_dataset: str | None = None,
analysis_basis: AnalysisBasis = AnalysisBasis.ENROLLMENTS,
analysis_unit: AnalysisUnit = AnalysisUnit.CLIENT,
exposure_signal=None,
use_glean_ids: bool | None = None,
) -> str:
"""Return a nearly-self contained SQL query.
This query does not define ``enrollments`` but otherwise could
be executed to query all metrics from this data source.
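
        A sketch of typical use (hypothetical names; the caller must supply
        the ``enrollments`` relation, e.g. as a CTE)::

            sql = ds.build_query(
                metric_list=[uri_count],
                time_limits=tl,
                experiment_slug="my-experiment",
            )
            full_sql = f"WITH enrollments AS ({enrollments_sql}) {sql}"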
"""
if analysis_unit == AnalysisUnit.CLIENT:
if use_glean_ids:
ds_id = self.glean_client_id_column
elif use_glean_ids is not None:
ds_id = self.legacy_client_id_column
else:
ds_id = self.client_id_column
elif analysis_unit == AnalysisUnit.PROFILE_GROUP:
ds_id = self.group_id_column
else:
assert_never(analysis_unit)
if use_glean_ids is not None and not ds_id:
chosen_id = (
"glean_client_id_column" if use_glean_ids else "legacy_client_id_column"
)
            logger.warning(
                f"use_glean_ids set to {use_glean_ids} but {chosen_id} is not set. "
                f"Falling back to client_id_column {self.client_id_column}"
            )
ds_id = self.client_id_column
return """SELECT
e.analysis_id,
e.branch,
e.analysis_window_start,
e.analysis_window_end,
e.num_exposure_events,
e.exposure_date,
{metrics}
FROM enrollments e
LEFT JOIN {from_expr} ds
ON ds.{ds_id} = e.analysis_id
AND ds.{submission_date} BETWEEN '{fddr}' AND '{lddr}'
AND ds.{submission_date} BETWEEN
DATE_ADD(e.{date}, interval e.analysis_window_start day)
AND DATE_ADD(e.{date}, interval e.analysis_window_end day)
{ignore_pre_enroll_first_day}
GROUP BY
e.analysis_id,
e.branch,
e.num_exposure_events,
e.exposure_date,
e.analysis_window_start,
e.analysis_window_end""".format(
ds_id=ds_id,
submission_date=self.submission_date_column,
from_expr=self.from_expr_for(from_expr_dataset),
fddr=time_limits.first_date_data_required,
lddr=time_limits.last_date_data_required,
metrics=",\n ".join(
f"{m.select_expr.format(experiment_slug=experiment_slug)} AS {m.name}"
for m in metric_list
),
date=(
"exposure_date"
if analysis_basis == AnalysisBasis.EXPOSURES and exposure_signal is None
else "enrollment_date"
),
ignore_pre_enroll_first_day=self.experiments_column_expr.format(
submission_date=self.submission_date_column,
experiment_slug=experiment_slug,
),
)
def build_query_union_metric_rows(
self,
metric_list: list[Metric],
experiment_slug: str,
metrics_query_table: str = "metrics",
) -> str:
"""Return a query that produces a unioned and pivoted row-per-metric
version of the `build_query` results.
The `metrics_query_table` parameter specifies where the results to be
pivoted exist. This is expected to be of the format that is returned by
the query in `build_query`, and can either be a CTE or table/view.

        EXAMPLE::

            client_id | enrollment_date | metric_1 | metric_2
            asdf1234  | 2024-01-01      | True     | 12

            ---- becomes ----

            client_id | enrollment_date | metric_slug | metric_value
            asdf1234  | 2024-01-01      | metric_1    | True
            asdf1234  | 2024-01-01      | metric_2    | 12
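
        A sketch of typical use (hypothetical names), pivoting a ``metrics``
        CTE produced by `build_query`::

            pivot_sql = ds.build_query_union_metric_rows(
                metric_list=[uri_count],
                experiment_slug="my-experiment",
            )
            full_sql = f"WITH metrics AS ({metrics_sql}) {pivot_sql}"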
"""
query_parts = []
all_metrics = metric_list + self.get_sanity_metrics(experiment_slug)
metric_names = [m.name for m in all_metrics]
for metric in all_metrics:
query_parts.append(f"""
SELECT
* EXCEPT ({", ".join(metric_names)}),
'{metric.name}' AS metric_slug,
SAFE_CAST({metric.name} AS STRING) AS metric_value
FROM {metrics_query_table}
""")
return "\nUNION ALL\n".join(query_parts)
def build_query_targets(
self,
metric_list: list[Metric],
time_limits: TimeLimits,
experiment_name: str,
analysis_length: int,
from_expr_dataset: str | None = None,
continuous_enrollment: bool = False,
analysis_unit: AnalysisUnit = AnalysisUnit.CLIENT,
) -> str:
"""Return a nearly-self contained SQL query that constructs
the metrics query for targeting historical data without
an associated experiment slug.
This query does not define ``targets`` but otherwise could
be executed to query all metrics from this data source.
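
        A sketch of typical use (hypothetical names; the caller must supply
        the ``targets`` relation)::

            sql = ds.build_query_targets(
                metric_list=[uri_count],
                time_limits=tl,
                experiment_name="my-analysis",
                analysis_length=28,
            )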
"""
if analysis_unit != AnalysisUnit.CLIENT:
raise IncompatibleAnalysisUnit(
"`build_query_targets` currently only supports client_id analysis"
)
return """
SELECT
t.client_id,
t.enrollment_date,
t.analysis_window_start,
t.analysis_window_end,
{metrics}
FROM targets t
LEFT JOIN {from_expr} ds
ON ds.{client_id} = t.client_id
{date_clause}
GROUP BY
t.client_id,
t.enrollment_date,
t.analysis_window_start,
t.analysis_window_end""".format(
client_id=self.client_id_column,
from_expr=self.from_expr_for(from_expr_dataset),
metrics=",\n ".join(
f"{m.select_expr.format(experiment_name=experiment_name)} AS {m.name}"
for m in metric_list
),
date_clause=(
f"""
AND ds.{self.submission_date_column} BETWEEN '{time_limits.first_date_data_required}' AND '{time_limits.last_date_data_required}'
AND ds.{self.submission_date_column} BETWEEN
DATE_ADD(t.enrollment_date, interval t.analysis_window_start day) AND
DATE_ADD(t.enrollment_date, interval t.analysis_window_end day)""" # noqa: E501
if not continuous_enrollment
else f"""AND ds.{self.submission_date_column} BETWEEN
t.enrollment_date AND
DATE_ADD(t.enrollment_date, interval {analysis_length} day)
"""
),
)
    def get_sanity_metrics(self, experiment_slug: str) -> list[Metric]:
        """Return sanity-check metrics for this data source: one that flags
        rows whose experiments column contradicts the enrolled branch, and
        one that flags data from apparently non-enrolled clients. Returns an
        empty list if there is no experiments column to check."""
        if self.experiments_column_type is None:
            return []
elif self.experiments_column_type == "simple":
return [
Metric(
name=self.name + "_has_contradictory_branch",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
) != e.branch"""
),
),
Metric(
name=self.name + "_has_non_enrolled_data",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
) IS NULL"""
),
),
]
elif self.experiments_column_type == "native":
return [
Metric(
name=self.name + "_has_contradictory_branch",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
).branch != e.branch"""
),
),
Metric(
name=self.name + "_has_non_enrolled_data",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.experiments, '{experiment_slug}'
).branch IS NULL"""
),
),
]
elif self.experiments_column_type == "glean":
return [
Metric(
name=self.name + "_has_contradictory_branch",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.ping_info.experiments, '{experiment_slug}'
).branch != e.branch"""
),
),
Metric(
name=self.name + "_has_non_enrolled_data",
data_source=self,
select_expr=agg_any(
f"""`mozfun.map.get_key`(
ds.ping_info.experiments, '{experiment_slug}'
).branch IS NULL"""
),
),
]
elif self.experiments_column_type == "events_stream":
return [
Metric(
name=self.name + "_has_contradictory_branch",
data_source=self,
select_expr=agg_any(
f"""IF(
JSON_VALUE(ds.event_extra, '$.experiment') = '{experiment_slug}',
JSON_VALUE(ds.event_extra, '$.branch'),
NULL
                    ) != e.branch"""
),
),
Metric(
name=self.name + "_has_non_enrolled_data",
data_source=self,
select_expr=agg_any(
f"""IF(
JSON_VALUE(ds.event_extra, '$.experiment') = '{experiment_slug}',
JSON_VALUE(ds.event_extra, '$.branch'),
NULL
) IS NULL"""
),
),
]
        else:
            raise ValueError(
                f"Unknown experiments_column_type: {self.experiments_column_type!r}"
            )
@classmethod
def from_mcp_data_source(
cls,
parser_data_source: ParserDataSource,
app_name: str | None = None,
) -> DataSource:
"""metric-config-parser DataSource objects do not have an `app_name`"""
return cls(
name=parser_data_source.name,
from_expr=parser_data_source.from_expression,
client_id_column=parser_data_source.client_id_column,
submission_date_column=parser_data_source.submission_date_column,
experiments_column_type=(
None
if parser_data_source.experiments_column_type == "none"
else parser_data_source.experiments_column_type
),
default_dataset=parser_data_source.default_dataset,
app_name=app_name,
group_id_column=parser_data_source.group_id_column,
glean_client_id_column=parser_data_source.glean_client_id_column,
legacy_client_id_column=parser_data_source.legacy_client_id_column,
)
@attr.s(frozen=True, slots=True)
class Metric:
"""Represents an experiment metric.
Needs to be combined with an analysis window to be measurable!
Args:
name (str): A slug; uniquely identifies this metric in tables
data_source (DataSource): where to find the metric
select_expr (str): a SQL snippet representing a clause of a SELECT
expression describing how to compute the metric; must include an
aggregation function since it will be GROUPed BY the analysis unit
and branch
friendly_name (str): A human-readable dashboard title for this metric
description (str): A paragraph of Markdown-formatted text describing
what the metric measures, to be shown on dashboards
        app_name (str, optional): app_name used with metric-hub;
            used for validation.
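
    Example (illustrative names, reusing the ``agg_sum`` helper below)::

        uri_count = Metric(
            name="uri_count",
            data_source=clients_daily,
            select_expr=agg_sum("ds.total_uri_count_sum"),
        )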
"""
name = attr.ib(type=str, validator=attr.validators.instance_of(str))
data_source = attr.ib(
type=DataSource, validator=attr.validators.instance_of(DataSource)
)
select_expr = attr.ib(type=str, validator=attr.validators.instance_of(str))
friendly_name = attr.ib(type=str | None, default=None)
description = attr.ib(type=str | None, default=None)
bigger_is_better = attr.ib(type=bool, default=True)
app_name = attr.ib(type=str | None, default=None)
def agg_sum(select_expr: str) -> str:
"""Return a SQL fragment for the sum over the data, with 0-filled nulls."""
return f"COALESCE(SUM({select_expr}), 0)"
def agg_any(select_expr: str) -> str:
"""Return the logical OR, with FALSE-filled nulls."""
return f"COALESCE(LOGICAL_OR({select_expr}), FALSE)"
def agg_histogram_mean(select_expr: str) -> str:
"""Produces an expression for the mean of an unparsed histogram."""
return f"""SAFE_DIVIDE(
SUM(CAST(JSON_EXTRACT_SCALAR({select_expr}, "$.sum") AS int64)),
SUM((SELECT SUM(value) FROM UNNEST(mozfun.hist.extract({select_expr}).values)))
)""" # noqa