Source code for mozanalysis.experiment

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import annotations

import logging
from enum import Enum
from typing import TYPE_CHECKING, cast

import attr
from metric_config_parser import AnalysisUnit
from typing_extensions import assert_never

from mozanalysis import APPS
from mozanalysis.bq import BigQueryContext, sanitize_table_name_for_bq
from mozanalysis.config import ConfigLoader
from mozanalysis.metrics import AnalysisBasis, DataSource, Metric
from mozanalysis.segments import Segment, SegmentDataSource
from mozanalysis.types import IncompatibleAnalysisUnit
from mozanalysis.utils import add_days, date_sub, hash_ish

if TYPE_CHECKING:
    from pandas import DataFrame

    from mozanalysis.exposure import ExposureSignal

logger = logging.getLogger(__name__)


[docs] class EnrollmentsQueryType(str, Enum): CIRRUS = "cirrus" FENIX_FALLBACK = "fenix-fallback" NORMANDY = "normandy" GLEAN_EVENT = "glean-event"
[docs] @attr.s(frozen=True, slots=True) class Experiment: """Query experiment data; store experiment metadata. The methods here query data in a way compatible with the following principles, which are important for experiment analysis: * The population of clients in each branch must have the same properties, aside from the intervention itself and its consequences; i.e. there must be no underlying bias in the branch populations. * We must measure the same thing for each client, to minimize the variance associated with our measurement. So that our analyses follow these abstract principles, we follow these rules: * Start with a list of all clients who enrolled. * We can filter this list of clients only based on information known to us at or before the time that they enrolled, because later information might be causally connected to the intervention. * For any given metric, every client gets a non-null value; we don't implicitly ignore anyone, even if they churned and stopped sending data. * Typically if an enrolled client no longer qualifies for enrollment, we'll still want to include their data in the analysis, unless we're explicitly using stats methods that handle censored data. * We define a "analysis window" with respect to clients' enrollment dates. Each metric only uses data collected inside this analysis window. We can only analyze data for a client if we have data covering their entire analysis window. Example usage (in a colab notebook):: from google.colab import auth auth.authenticate_user() print('Authenticated') from mozanalysis.experiment import Experiment from mozanalysis.bq import BigQueryContext from mozanalysis.config import ConfigLoader active_hours = ConfigLoader.get_metric("active_hours", "firefox_desktop") uri_count = ConfigLoader.get_metric("uri_count", "firefox_desktop") bq_context = BigQueryContext( dataset_id='your-dataset-id', # e.g. mine's flawrence project_id='moz-fx-data-bq-data-science' # this is the default anyway ) experiment = Experiment( experiment_slug='pref-fingerprinting-protections-retention-study-release-70', start_date='2019-10-29', num_dates_enrollment=8 ) # Run the query and get the results as a DataFrame res = experiment.get_single_window_data( bq_context, [ active_hours, uri_count ], last_date_full_data='2019-12-01', analysis_start_days=0, analysis_length_days=7 ) Args: experiment_slug (str): Name of the study, used to identify the enrollment events specific to this study. start_date (str): e.g. '2019-01-01'. First date on which enrollment events were received. num_dates_enrollment (int, optional): Only include this many dates of enrollments. If ``None`` then use the maximum number of dates as determined by the metric's analysis window and ``last_date_full_data``. Typically ``7n+1``, e.g. ``8``. The factor '7' removes weekly seasonality, and the ``+1`` accounts for the fact that enrollment typically starts a few hours before UTC midnight. app_id (str, optional): For a Glean app, the name of the BigQuery dataset derived from its app ID, like `org_mozilla_firefox`. app_name (str, optional): The Glean app name, like `fenix`. analysis_unit (AnalysisUnit, optional): the "unit" of analysis, which defines an experimental unit. For example: `CLIENT` for mobile experiments or `GROUP` for desktop experiments. Is used as the join key when building queries and sub-unit level data is aggregated up to that level. Defaults to `AnalysisUnit.CLIENT` unless specified Attributes: experiment_slug (str): Name of the study, used to identify the enrollment events specific to this study. start_date (str): e.g. '2019-01-01'. First date on which enrollment events were received. num_dates_enrollment (int, optional): Only include this many days of enrollments. If ``None`` then use the maximum number of days as determined by the metric's analysis window and ``last_date_full_data``. Typically ``7n+1``, e.g. ``8``. The factor '7' removes weekly seasonality, and the ``+1`` accounts for the fact that enrollment typically starts a few hours before UTC midnight. """ experiment_slug = attr.ib(type=str, validator=attr.validators.instance_of(str)) start_date = attr.ib() num_dates_enrollment = attr.ib(default=None) app_id = attr.ib(default=None) app_name = attr.ib(default=None) analysis_unit = attr.ib( type=AnalysisUnit, default=AnalysisUnit.CLIENT, validator=attr.validators.instance_of(AnalysisUnit), )
[docs] def get_app_name(self): """ Determine the correct app name. If no explicit app name has been passed into Experiment, lookup app name from a pre-defined list. (this is deprecated) """ if self.app_name is None: logger.warning( "Experiment without `app_name` is deprecated. " + "Please specify an app_name explicitly" ) app_name = next(key for key, value in APPS.items() if self.app_id in value) if app_name is None: raise Exception(f"No app name for app_id {self.app_id}") return self.app_name
[docs] def get_single_window_data( self, bq_context: BigQueryContext, metric_list: list, last_date_full_data: str, analysis_start_days: int, analysis_length_days: int, enrollments_query_type: EnrollmentsQueryType = EnrollmentsQueryType.NORMANDY, custom_enrollments_query: str | None = None, custom_exposure_query: str | None = None, exposure_signal: ExposureSignal | None = None, segment_list=None, ) -> DataFrame: """Return a DataFrame containing per-client metric values. Also store them in a permanent table in BigQuery. The name of this table will be printed. Subsequent calls to this function will simply read the results from this table. Args: bq_context (BigQueryContext): BigQuery configuration and client. metric_list (list of mozanalysis.metric.Metric or str): The metrics to analyze. last_date_full_data (str): The most recent date for which we have complete data, e.g. '2019-03-22'. If you want to ignore all data collected after a certain date (e.g. when the experiment recipe was deactivated), then do that here. analysis_start_days (int): the start of the analysis window, measured in 'days since the client enrolled'. We ignore data collected outside this analysis window. analysis_length_days (int): the length of the analysis window, measured in days. enrollments_query_type (EnrollmentsQueryType): ('normandy', 'glean-event', 'cirrus', or 'fenix-fallback') Specifies the query type to use to get the experiment's enrollments, unless overridden by ``custom_enrollments_query``. custom_enrollments_query (str): A full SQL query that will generate the `enrollments` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_enrolled_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). WARNING: this query's results must be uniquely keyed by (analysis_id, branch), or else your results will be subtly wrong. custom_exposure_query (str): A full SQL query that will generate the `exposures` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_exposure_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). If not provided, the exposure will be determined based on `exposure_signal`, if provided, or Normandy and Nimbus exposure events. `custom_exposure_query` takes precedence over `exposure_signal`. exposure_signal (ExposureSignal): Optional signal definition of when a client has been exposed to the experiment. If not provided, the exposure will be determined based on Normandy exposure events for desktop and Nimbus exposure events for Fenix and iOS. segment_list (list of mozanalysis.segment.Segment or str): The user segments to study. Returns: A pandas DataFrame of experiment data. One row per ``client_id``. Some metadata columns, then one column per metric in ``metric_list``, and one column per sanity-check metric. Columns (not necessarily in order): * client_id (str): Not necessary for "happy path" analyses. * branch (str): The client's branch * other columns of ``enrollments``. * [metric 1]: The client's value for the first metric in ``metric_list``. * ... * [metric n]: The client's value for the nth (final) metric in ``metric_list``. * [sanity check 1]: The client's value for the first sanity check metric for the first data source that supports sanity checks. * ... * [sanity check n]: The client's value for the last sanity check metric for the last data source that supports sanity checks. This format - the schema plus there being one row per enrolled client, regardless of whether the client has data in ``data_source`` - was agreed upon by the DS team, and is the standard format for queried experimental data. """ time_limits = TimeLimits.for_single_analysis_window( self.start_date, last_date_full_data, analysis_start_days, analysis_length_days, self.num_dates_enrollment, ) enrollments_sql = self.build_enrollments_query( time_limits=time_limits, enrollments_query_type=enrollments_query_type, custom_enrollments_query=custom_enrollments_query, custom_exposure_query=custom_exposure_query, exposure_signal=exposure_signal, segment_list=segment_list, ) enrollments_table_name = sanitize_table_name_for_bq( "_".join( [ last_date_full_data, "enrollments", self.experiment_slug, hash_ish(enrollments_sql), ] ) ) bq_context.run_query(enrollments_sql, enrollments_table_name) metrics_sql = self.build_metrics_query( metric_list=metric_list, time_limits=time_limits, enrollments_table=bq_context.fully_qualify_table_name( enrollments_table_name ), ) full_res_table_name = sanitize_table_name_for_bq( "_".join([last_date_full_data, self.experiment_slug, hash_ish(metrics_sql)]) ) return bq_context.run_query(metrics_sql, full_res_table_name).to_dataframe()
[docs] def get_time_series_data( self, bq_context: BigQueryContext, metric_list: list, last_date_full_data: str, time_series_period: str = "weekly", enrollments_query_type: EnrollmentsQueryType = EnrollmentsQueryType.NORMANDY, custom_enrollments_query: str | None = None, custom_exposure_query: str | None = None, exposure_signal: ExposureSignal | None = None, segment_list=None, ) -> TimeSeriesResult: """Return a TimeSeriesResult with per-client metric values. Roughly equivalent to looping over :meth:`.get_single_window_data` with different analysis windows, and reorganising the results. Args: bq_context (BigQueryContext): BigQuery configuration and client. metric_list (list of mozanalysis.metric.Metric): The metrics to analyze. last_date_full_data (str): The most recent date for which we have complete data, e.g. '2019-03-22'. If you want to ignore all data collected after a certain date (e.g. when the experiment recipe was deactivated), then do that here. time_series_period ('daily' or 'weekly'): How long each analysis window should be. enrollments_query_type (EnrollmentsQueryType): ('normandy', 'glean-event', 'cirrus', or 'fenix-fallback') Specifies the query type to use to get the experiment's enrollments, unless overridden by ``custom_enrollments_query``. custom_enrollments_query (str): A full SQL query that will generate the `enrollments` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_enrolled_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). WARNING: this query's results must be uniquely keyed by (analysis_id, branch), or else your results will be subtly wrong. custom_exposure_query (str): A full SQL query that will generate the `exposures` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_exposure_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). If not provided, the exposure will be determined based on `exposure_signal`, if provided, or Normandy and Nimbus exposure events. `custom_exposure_query` takes precedence over `exposure_signal`. exposure_signal (ExposureSignal): Optional signal definition of when a client has been exposed to the experiment. If not provided, the exposure will be determined based on Normandy exposure events for desktop and Nimbus exposure events for Fenix and iOS. segment_list (list of mozanalysis.segment.Segment): The user segments to study. Returns: A :class:`mozanalysis.experiment.TimeSeriesResult` object, which may be used to obtain a pandas DataFrame of per-client metric data, for each analysis window. Each DataFrame is a pandas DataFrame in "the standard format": one row per client, some metadata columns, plus one column per metric and sanity-check metric. Its columns (not necessarily in order): * branch (str): The client's branch * other columns of ``enrollments``. * [metric 1]: The client's value for the first metric in ``metric_list``. * ... * [metric n]: The client's value for the nth (final) metric in ``metric_list``. * [sanity check 1]: The client's value for the first sanity check metric for the first data source that supports sanity checks. * ... * [sanity check n]: The client's value for the last sanity check metric for the last data source that supports sanity checks. """ time_limits = TimeLimits.for_ts( self.start_date, last_date_full_data, time_series_period, self.num_dates_enrollment, ) enrollments_sql = self.build_enrollments_query( time_limits=time_limits, enrollments_query_type=enrollments_query_type, custom_enrollments_query=custom_enrollments_query, custom_exposure_query=custom_exposure_query, exposure_signal=exposure_signal, segment_list=segment_list, ) enrollments_table_name = sanitize_table_name_for_bq( "_".join( [ last_date_full_data, "enrollments", self.experiment_slug, hash_ish(enrollments_sql), ] ) ) bq_context.run_query(enrollments_sql, enrollments_table_name) metrics_sql = self.build_metrics_query( metric_list=metric_list, time_limits=time_limits, enrollments_table=bq_context.fully_qualify_table_name( enrollments_table_name ), ) full_res_table_name = sanitize_table_name_for_bq( "_".join([last_date_full_data, self.experiment_slug, hash_ish(metrics_sql)]) ) bq_context.run_query(metrics_sql, full_res_table_name) return TimeSeriesResult( fully_qualified_table_name=bq_context.fully_qualify_table_name( full_res_table_name ), analysis_windows=time_limits.analysis_windows, )
[docs] def build_enrollments_query( self, time_limits: TimeLimits, enrollments_query_type: EnrollmentsQueryType = EnrollmentsQueryType.NORMANDY, custom_enrollments_query: str | None = None, custom_exposure_query: str | None = None, exposure_signal: ExposureSignal | None = None, segment_list=None, sample_size: int = 100, suppress_custom_query_validation: bool = False, ) -> str: """Return a SQL query for querying enrollment and exposure data. Args: time_limits (TimeLimits): An object describing the interval(s) to query enrollments_query_type (EnrollmentsQueryType): ('normandy', 'glean-event', 'cirrus', or 'fenix-fallback') Specifies the query type to use to get the experiment's enrollments, unless overridden by ``custom_enrollments_query``. custom_enrollments_query (str): A full SQL query that will generate the `enrollments` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_enrolled_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). WARNING: this query's results must be uniquely keyed by (analysis_id, branch), or else your results will be subtly wrong. custom_exposure_query (str): A full SQL query that will generate the `exposures` common table expression used in the main query. The query must produce the columns `analysis_id`, `branch`, `enrollment_date`, and `num_exposure_events`. `analysis_id` should be an alias for the `client_id` or `profile_group_id` (e.g., `SELECT client_id AS analysis_id`). exposure_signal (ExposureSignal): Optional signal definition of when a client has been exposed to the experiment segment_list (list of mozanalysis.segment.Segment or str): The user segments to study. sample_size (int): Optional integer percentage of clients, used for downsampling enrollments. Default 100. Returns: A string containing a BigQuery SQL expression. """ sample_size = sample_size or 100 # Validate custom_enrollments_query and custom_exposures_query if custom_enrollments_query and ( ( self.analysis_unit == AnalysisUnit.CLIENT and AnalysisUnit.PROFILE_GROUP.value in custom_enrollments_query ) or ( self.analysis_unit == AnalysisUnit.PROFILE_GROUP and AnalysisUnit.CLIENT.value in custom_enrollments_query ) ): wrong_value = ( AnalysisUnit.CLIENT.value if self.analysis_unit == AnalysisUnit.PROFILE_GROUP else AnalysisUnit.PROFILE_GROUP.value ) if not suppress_custom_query_validation: raise ValueError( f"custom_enrollments_query contains {wrong_value}, but experiment " + f"uses {self.analysis_unit.value}. This could indicate a problem " + "with the custom enrollments query." ) else: logger.warning( f"custom_enrollments_query contains {wrong_value}, but experiment " + f"uses {self.analysis_unit.value}. This could indicate a problem " + "with the custom enrollments query." ) if custom_exposure_query and ( ( self.analysis_unit == AnalysisUnit.CLIENT and AnalysisUnit.PROFILE_GROUP.value in custom_exposure_query ) or ( self.analysis_unit == AnalysisUnit.PROFILE_GROUP and AnalysisUnit.CLIENT.value in custom_exposure_query ) ): wrong_value = ( AnalysisUnit.CLIENT.value if self.analysis_unit == AnalysisUnit.PROFILE_GROUP else AnalysisUnit.PROFILE_GROUP.value ) if not suppress_custom_query_validation: raise ValueError( f"custom_exposure_query contains {wrong_value}, but experiment " + f"uses {self.analysis_unit.value}. This could indicate a problem " + "with the custom exposure query." ) else: logger.warning( f"custom_exposure_query contains {wrong_value}, but experiment " + f"uses {self.analysis_unit.value}. This could indicate a problem " + "with the custom exposure query." ) enrollments_query = custom_enrollments_query or self._build_enrollments_query( time_limits, enrollments_query_type, sample_size, ) if exposure_signal: exposure_query = custom_exposure_query or exposure_signal.build_query( time_limits, self.analysis_unit ) else: exposure_query = custom_exposure_query or self._build_exposure_query( time_limits, enrollments_query_type, ) segments_query = self._build_segments_query( segment_list, time_limits, ) return f""" WITH raw_enrollments AS ({enrollments_query}), segmented_enrollments AS ({segments_query}), exposures AS ({exposure_query}) SELECT se.*, e.* EXCEPT (analysis_id, branch) FROM segmented_enrollments se LEFT JOIN exposures e USING (analysis_id, branch) """
[docs] def build_metrics_query( self, metric_list: list, time_limits: TimeLimits, enrollments_table: str, analysis_basis=AnalysisBasis.ENROLLMENTS, exposure_signal: ExposureSignal | None = None, ) -> str: """Return a SQL query for querying metric data. For interactive use, prefer :meth:`.get_time_series_data` or :meth:`.get_single_window_data`, according to your use case, which will run the query for you and return a materialized dataframe. The optional ``exposure_signal`` parameter allows to check if clients have received the exposure signal during enrollment or after. When using the exposures analysis basis, metrics will be computed for these clients. Args: metric_list (list of mozanalysis.metric.Metric or str): The metrics to analyze. time_limits (TimeLimits): An object describing the interval(s) to query enrollments_table (str): The name of the enrollments table basis (AnalysisBasis): Use exposures as basis for calculating metrics if True, otherwise use enrollments. exposure_signal (Optional[ExposureSignal]): Optional exposure signal parameter that will be used for computing metrics for certain analysis bases (such as exposures). Returns: A string containing a BigQuery SQL expression. Building this query is the main goal of this module. """ analysis_windows_query = self._build_analysis_windows_query( time_limits.analysis_windows ) metrics_columns, metrics_joins = self._build_metrics_query_bits( metric_list, time_limits, analysis_basis, exposure_signal ) if exposure_signal and analysis_basis != AnalysisBasis.ENROLLMENTS: exposure_query = f""" SELECT * FROM ( {exposure_signal.build_query(time_limits, self.analysis_unit)} ) WHERE num_exposure_events > 0 """ else: exposure_query = """ SELECT * FROM raw_enrollments e """ return """ WITH analysis_windows AS ( {analysis_windows_query} ), raw_enrollments AS ( -- needed by "exposures" sub query SELECT e.*, aw.* FROM `{enrollments_table}` e CROSS JOIN analysis_windows aw ), exposures AS ({exposure_query}), enrollments AS ( SELECT e.* EXCEPT (exposure_date, num_exposure_events), x.exposure_date, x.num_exposure_events FROM exposures x RIGHT JOIN raw_enrollments e USING (analysis_id, branch) ) SELECT enrollments.*, {metrics_columns} FROM enrollments {metrics_joins} """.format( analysis_windows_query=analysis_windows_query, exposure_query=exposure_query, metrics_columns=",\n ".join(metrics_columns), metrics_joins="\n".join(metrics_joins), enrollments_table=enrollments_table, )
@staticmethod def _build_analysis_windows_query(analysis_windows) -> str: """Return SQL to construct a table of analysis windows. To query a time series, we construct a table of analysis windows and cross join it with the enrollments table to get one row per pair of client and analysis window. This method writes the SQL to define the analysis window table. """ return "\n UNION ALL\n ".join( f"(SELECT {aw.start} AS analysis_window_start, {aw.end} AS analysis_window_end)" # noqa:E501 for aw in analysis_windows ) def _build_enrollments_query( self, time_limits: TimeLimits, enrollments_query_type: EnrollmentsQueryType, sample_size: int = 100, ) -> str: """Return SQL to query a list of enrollments and their branches""" if enrollments_query_type == EnrollmentsQueryType.NORMANDY: return self._build_enrollments_query_normandy( time_limits, sample_size, ) elif enrollments_query_type == EnrollmentsQueryType.GLEAN_EVENT: if not self.app_id: raise ValueError( "App ID must be defined for building Glean enrollments query" ) if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Glean enrollments currently only support client_id analysis units" ) return self._build_enrollments_query_glean_event( time_limits, self.app_id, sample_size ) elif enrollments_query_type == EnrollmentsQueryType.FENIX_FALLBACK: if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Fenix fallback enrollments currently only support client_id analysis units" # noqa: E501 ) return self._build_enrollments_query_fenix_baseline( time_limits, sample_size ) elif enrollments_query_type == EnrollmentsQueryType.CIRRUS: if not self.app_id: raise ValueError( "App ID must be defined for building Cirrus enrollments query" ) if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Cirrus enrollments currently only support client_id analysis units" ) return self._build_enrollments_query_cirrus(time_limits, self.app_id) else: assert_never(enrollments_query_type) def _build_exposure_query( self, time_limits: TimeLimits, exposure_query_type: EnrollmentsQueryType, ) -> str: """Return SQL to query a list of exposures and their branches""" if exposure_query_type == EnrollmentsQueryType.NORMANDY: return self._build_exposure_query_normandy(time_limits) elif exposure_query_type == EnrollmentsQueryType.GLEAN_EVENT: if not self.app_id: raise ValueError( "App ID must be defined for building Glean exposures query" ) if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Glean exposures currently only support client_id analysis units" ) return self._build_exposure_query_glean_event(time_limits, self.app_id) elif exposure_query_type == EnrollmentsQueryType.FENIX_FALLBACK: if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Fenix fallback exposures currently only support client_id analysis units" # noqa: E501 ) return self._build_exposure_query_glean_event( time_limits, "org_mozilla_firefox" ) elif exposure_query_type == EnrollmentsQueryType.CIRRUS: if not self.app_id: raise ValueError( "App ID must be defined for building Cirrus exposures query" ) if not self.analysis_unit == AnalysisUnit.CLIENT: raise IncompatibleAnalysisUnit( "Cirrus exposures currently only support client_id analysis units" ) return self._build_exposure_query_glean_event( time_limits, self.app_id, client_id_field='mozfun.map.get_key(event.extra, "user_id")', event_category="cirrus_events", ) else: assert_never(exposure_query_type) def _build_enrollments_query_normandy( self, time_limits: TimeLimits, sample_size: int = 100, ) -> str: """Return SQL to query enrollments for a normandy experiment""" return f""" SELECT e.{self.analysis_unit.value} AS analysis_id, `mozfun.map.get_key`(e.event_map_values, 'branch') AS branch, MIN(e.submission_date) AS enrollment_date, COUNT(e.submission_date) AS num_enrollment_events FROM `moz-fx-data-shared-prod.telemetry.events` e WHERE e.event_category = 'normandy' AND e.event_method = 'enroll' AND e.submission_date BETWEEN '{time_limits.first_enrollment_date}' AND '{time_limits.last_enrollment_date}' AND e.event_string_value = '{self.experiment_slug}' AND e.sample_id < {sample_size} GROUP BY e.{self.analysis_unit.value}, branch """ # noqa:E501 def _build_enrollments_query_fenix_baseline( self, time_limits: TimeLimits, sample_size: int = 100 ) -> str: """Return SQL to query enrollments for a Fenix no-event experiment If enrollment events are available for this experiment, then you can take a better approach than this method. But in the absence of enrollment events (e.g. in a Mako-based experiment, which does not send enrollment events), you need to fall back to using ``ping_info.experiments`` to get a list of who is in what branch and when they enrolled. """ # Try to ignore users who enrolled early - but only consider a # 7 day window return """ SELECT b.client_info.client_id AS analysis_id, mozfun.map.get_key( b.ping_info.experiments, '{experiment_slug}' ).branch, DATE(MIN(b.submission_timestamp)) AS enrollment_date, COUNT(b.submission_date) AS num_enrollment_events FROM `moz-fx-data-shared-prod.{dataset}.baseline` b WHERE b.client_info.client_id IS NOT NULL AND DATE(b.submission_timestamp) BETWEEN DATE_SUB('{first_enrollment_date}', INTERVAL 7 DAY) AND '{last_enrollment_date}' AND mozfun.map.get_key( b.ping_info.experiments, '{experiment_slug}' ).branch IS NOT NULL AND b.sample_id < {sample_size} GROUP BY b.client_info.client_id, branch HAVING enrollment_date >= '{first_enrollment_date}' """.format( experiment_slug=self.experiment_slug, first_enrollment_date=time_limits.first_enrollment_date, last_enrollment_date=time_limits.last_enrollment_date, dataset=self.app_id or "org_mozilla_firefox", sample_size=sample_size, ) def _build_enrollments_query_glean_event( self, time_limits: TimeLimits, dataset: str, sample_size: int = 100 ) -> str: """Return SQL to query enrollments for a Glean no-event experiment If enrollment events are available for this experiment, then you can take a better approach than this method. But in the absence of enrollment events (e.g. in a Mako-based experiment, which does not send enrollment events), you need to fall back to using ``ping_info.experiments`` to get a list of who is in what branch and when they enrolled. """ return f""" SELECT events.client_info.client_id AS analysis_id, mozfun.map.get_key( e.extra, 'branch' ) AS branch, DATE(MIN(events.submission_timestamp)) AS enrollment_date, COUNT(events.submission_timestamp) AS num_enrollment_events FROM `moz-fx-data-shared-prod.{self.app_id or dataset}.events` events, UNNEST(events.events) AS e WHERE events.client_info.client_id IS NOT NULL AND DATE(events.submission_timestamp) BETWEEN '{time_limits.first_enrollment_date}' AND '{time_limits.last_enrollment_date}' AND e.category = "nimbus_events" AND mozfun.map.get_key(e.extra, "experiment") = '{self.experiment_slug}' AND e.name = 'enrollment' AND sample_id < {sample_size} GROUP BY events.client_info.client_id, branch """ # noqa:E501 def _build_enrollments_query_cirrus( self, time_limits: TimeLimits, dataset: str ) -> str: """Return SQL to query enrollments for a Cirrus experiment (uses Glean) If enrollment events are available for this experiment, then you can take a better approach than this method. But in the absence of enrollment events (e.g. in a Mako-based experiment, which does not send enrollment events), you need to fall back to using ``ping_info.experiments`` to get a list of who is in what branch and when they enrolled. """ return f""" SELECT mozfun.map.get_key(e.extra, "user_id") AS analysis_id, mozfun.map.get_key( e.extra, 'branch' ) AS branch, DATE(MIN(events.submission_timestamp)) AS enrollment_date, COUNT(events.submission_timestamp) AS num_enrollment_events FROM `moz-fx-data-shared-prod.{self.app_id or dataset}.enrollment` events, UNNEST(events.events) AS e WHERE mozfun.map.get_key(e.extra, "user_id") IS NOT NULL AND DATE(events.submission_timestamp) BETWEEN '{time_limits.first_enrollment_date}' AND '{time_limits.last_enrollment_date}' AND e.category = "cirrus_events" AND mozfun.map.get_key(e.extra, "experiment") = '{self.experiment_slug}' AND e.name = 'enrollment' AND client_info.app_channel = 'production' GROUP BY mozfun.map.get_key(e.extra, "user_id"), branch """ # noqa:E501 def _build_exposure_query_normandy(self, time_limits: TimeLimits) -> str: """Return SQL to query exposures for a normandy experiment""" return f""" SELECT e.analysis_id, e.branch, min(e.submission_date) AS exposure_date, COUNT(e.submission_date) AS num_exposure_events FROM raw_enrollments re LEFT JOIN ( SELECT {self.analysis_unit.value} AS analysis_id, `mozfun.map.get_key`(event_map_values, 'branchSlug') AS branch, submission_date FROM `moz-fx-data-shared-prod.telemetry.events` WHERE event_category = 'normandy' AND (event_method = 'exposure' OR event_method = 'expose') AND submission_date BETWEEN '{time_limits.first_enrollment_date}' AND '{time_limits.last_enrollment_date}' AND event_string_value = '{self.experiment_slug}' ) e ON re.analysis_id = e.analysis_id AND re.branch = e.branch AND e.submission_date >= re.enrollment_date GROUP BY e.analysis_id, e.branch """ # noqa: E501 def _build_exposure_query_glean_event( self, time_limits: TimeLimits, dataset: str, client_id_field: str = "client_info.client_id", event_category: str = "nimbus_events", ) -> str: """Return SQL to query exposures for a Glean no-event experiment""" return f""" SELECT exposures.analysis_id AS analysis_id, exposures.branch, DATE(MIN(exposures.submission_date)) AS exposure_date, COUNT(exposures.submission_date) AS num_exposure_events FROM raw_enrollments re LEFT JOIN ( SELECT {client_id_field} AS analysis_id, mozfun.map.get_key(event.extra, 'branch') AS branch, DATE(events.submission_timestamp) AS submission_date FROM `moz-fx-data-shared-prod.{self.app_id or dataset}.events` events, UNNEST(events.events) AS event WHERE DATE(events.submission_timestamp) BETWEEN '{time_limits.first_enrollment_date}' AND '{time_limits.last_enrollment_date}' AND event.category = '{event_category}' AND mozfun.map.get_key( event.extra, "experiment") = '{self.experiment_slug}' AND (event.name = 'expose' OR event.name = 'exposure') ) exposures ON re.analysis_id = exposures.analysis_id AND re.branch = exposures.branch AND exposures.submission_date >= re.enrollment_date GROUP BY analysis_id, branch """ # noqa: E501 def _build_metrics_query_bits( self, metric_list: list[Metric | str], time_limits: TimeLimits, analysis_basis=AnalysisBasis.ENROLLMENTS, exposure_signal: ExposureSignal | None = None, ) -> tuple[list[str], list[str]]: """Return lists of SQL fragments corresponding to metrics.""" metrics: list[Metric] = [] for metric in metric_list: if isinstance(metric, str): metrics.append(ConfigLoader.get_metric(metric, self.get_app_name())) else: metrics.append(metric) ds_metrics = self._partition_metrics_by_data_source(metrics) ds_metrics = cast(dict[DataSource, list[Metric]], ds_metrics) ds_metrics = { ds: metrics + ds.get_sanity_metrics(self.experiment_slug) for ds, metrics in ds_metrics.items() } metrics_columns = [] metrics_joins = [] for i, ds in enumerate(ds_metrics.keys()): query_for_metrics = ds.build_query( ds_metrics[ds], time_limits, self.experiment_slug, self.app_id, analysis_basis, self.analysis_unit, exposure_signal, ) metrics_joins.append( f""" LEFT JOIN ( {query_for_metrics} ) ds_{i} USING (analysis_id, branch, analysis_window_start, analysis_window_end) """ # noqa: E501 ) for m in ds_metrics[ds]: metrics_columns.append(f"ds_{i}.{m.name}") return metrics_columns, metrics_joins def _partition_segments_by_data_source( self, segment_list: list[Segment] ) -> dict[SegmentDataSource, list[Segment]]: """Return a dict mapping segment data sources to segment lists.""" data_sources = {s.data_source for s in segment_list} return { ds: [s for s in segment_list if s.data_source == ds] for ds in data_sources } def _partition_metrics_by_data_source( self, metric_list: list[Metric] ) -> dict[DataSource, list[Metric]]: """Return a dict mapping data sources to metric/segment lists.""" data_sources = {m.data_source for m in metric_list} return { ds: [m for m in metric_list if m.data_source == ds] for ds in data_sources } def _build_segments_query( self, segment_list: list[Segment], time_limits: TimeLimits, ) -> str: """Build a query adding segment columns to the enrollments view. The query takes a ``raw_enrollments`` view, and defines a new view by adding one non-NULL boolean column per segment. It does not otherwise tamper with the ``raw_enrollments`` view. """ # Do similar things to what we do for metrics, but in a less # ostentatious place, since people are likely to come to the # source code asking how metrics work, but less likely to # arrive with "how segments work" as their first question. segments_columns, segments_joins = self._build_segments_query_bits( cast(list[Segment | str], segment_list) or [], time_limits ) return """ SELECT raw_enrollments.*, {segments_columns} FROM raw_enrollments {segments_joins} """.format( segments_columns=",\n ".join(segments_columns), segments_joins="\n".join(segments_joins), ) def _build_segments_query_bits( self, segment_list: list[Segment | str], time_limits: TimeLimits, ) -> tuple[list[str], list[str]]: """Return lists of SQL fragments corresponding to segments.""" # resolve segment slugs segments: list[Segment] = [] for segment in segment_list: if isinstance(segment, str): segments.append(ConfigLoader.get_segment(segment, self.get_app_name())) else: segments.append(segment) ds_segments = self._partition_segments_by_data_source(segments) segments_columns = [] segments_joins = [] for i, ds in enumerate(ds_segments.keys()): query_for_segments = ds.build_query( ds_segments[ds], time_limits, self.experiment_slug, self.app_id, self.analysis_unit, ) segments_joins.append( f""" LEFT JOIN ( {query_for_segments} ) ds_{i} USING (analysis_id, branch) """ ) for m in ds_segments[ds]: segments_columns.append(f"ds_{i}.{m.name}") return segments_columns, segments_joins
[docs] @attr.s(frozen=True, slots=True) class TimeLimits: """Expresses time limits for different kinds of analysis windows. Instantiated and used by the :class:`Experiment` class; end users should not need to interact with it. Do not directly instantiate: use the constructors provided. There are several time constraints needed to specify a valid query for experiment data: * When did enrollments start? * When did enrollments stop? * How long after enrollment does the analysis window start? * How long is the analysis window? Even if these four quantities are specified directly, it is important to check that they are consistent with the available data - i.e. that we have data for the entire analysis window for every enrollment. Furthermore, there are some extra quantities that are useful for writing efficient queries: * What is the first date for which we need data from our data source? * What is the last date for which we need data from our data source? Instances of this class store all these quantities and do validation to make sure that they're consistent. The "store lots of overlapping state and validate" strategy was chosen over "store minimal state and compute on the fly" because different state is supplied in different contexts. """ first_enrollment_date = attr.ib(type=str) last_enrollment_date = attr.ib(type=str) first_date_data_required = attr.ib(type=str) last_date_data_required = attr.ib(type=str) analysis_windows = attr.ib() # type: tuple[AnalysisWindow,...]
[docs] @classmethod def for_single_analysis_window( cls, first_enrollment_date: str, last_date_full_data: str, analysis_start_days: int, analysis_length_dates: int, num_dates_enrollment: int | None = None, ) -> TimeLimits: """Return a ``TimeLimits`` instance with the following parameters Args: first_enrollment_date (str): First date on which enrollment events were received; the start date of the experiment. last_date_full_data (str): The most recent date for which we have complete data, e.g. '2019-03-22'. If you want to ignore all data collected after a certain date (e.g. when the experiment recipe was deactivated), then do that here. analysis_start_days (int): the start of the analysis window, measured in 'days since the client enrolled'. We ignore data collected outside this analysis window. analysis_length_days (int): the length of the analysis window, measured in days. num_dates_enrollment (int, optional): Only include this many days of enrollments. If ``None`` then use the maximum number of days as determined by the metric's analysis window and ``last_date_full_data``. Typically ``7n+1``, e.g. ``8``. The factor ``7`` removes weekly seasonality, and the ``+1`` accounts for the fact that enrollment typically starts a few hours before UTC midnight. """ analysis_window = AnalysisWindow( analysis_start_days, analysis_start_days + analysis_length_dates - 1 ) if num_dates_enrollment is None: last_enrollment_date = add_days(last_date_full_data, -analysis_window.end) else: last_enrollment_date = add_days( first_enrollment_date, num_dates_enrollment - 1 ) first_date_data_required = add_days( first_enrollment_date, analysis_window.start ) last_date_data_required = add_days(last_enrollment_date, analysis_window.end) if last_date_data_required > last_date_full_data: raise ValueError( f"You said you wanted {num_dates_enrollment} dates of enrollment, " + f"and need data from the {analysis_window.end}th day after enrollment. " # noqa: E501 + f"For that, you need to wait until we have data for {last_date_data_required}." # noqa:E501 ) tl = cls( first_enrollment_date=first_enrollment_date, last_enrollment_date=last_enrollment_date, first_date_data_required=first_date_data_required, last_date_data_required=last_date_data_required, analysis_windows=(analysis_window,), ) return tl
[docs] @classmethod def for_ts( cls, first_enrollment_date: str, last_date_full_data: str, time_series_period: str, num_dates_enrollment: int, ) -> TimeLimits: """Return a ``TimeLimits`` instance for a time series. Args: first_enrollment_date (str): First date on which enrollment events were received; the start date of the experiment. last_date_full_data (str): The most recent date for which we have complete data, e.g. '2019-03-22'. If you want to ignore all data collected after a certain date (e.g. when the experiment recipe was deactivated), then do that here. time_series_period: 'daily' or 'weekly'. num_dates_enrollment (int): Take this many days of client enrollments. This is a mandatory argument because it determines the number of points in the time series. """ period_duration = {"daily": 1, "weekly": 7, "28_day": 28} if time_series_period not in period_duration: raise ValueError(f"Unsupported time series period {time_series_period}") if num_dates_enrollment <= 0: raise ValueError("Number of enrollment dates must be a positive number") analysis_window_length_dates = period_duration[time_series_period] last_enrollment_date = add_days(first_enrollment_date, num_dates_enrollment - 1) max_dates_of_data = date_sub(last_date_full_data, last_enrollment_date) + 1 num_periods = max_dates_of_data // analysis_window_length_dates if num_periods <= 0: raise ValueError("Insufficient data") analysis_windows = tuple( [ AnalysisWindow( i * analysis_window_length_dates, (i + 1) * analysis_window_length_dates - 1, ) for i in range(num_periods) ] ) last_date_data_required = add_days( last_enrollment_date, analysis_windows[-1].end ) return cls( first_enrollment_date=first_enrollment_date, last_enrollment_date=last_enrollment_date, first_date_data_required=first_enrollment_date, last_date_data_required=last_date_data_required, analysis_windows=analysis_windows, )
@first_enrollment_date.validator def _validate_first_enrollment_date(self, attribute, value): assert self.first_enrollment_date <= self.last_enrollment_date, ( f"first enrollment date of {self.first_enrollment_date} ", f"was not on or before last enrollment date of {self.last_enrollment_date}", ) @first_date_data_required.validator def _validate_first_date_data_required(self, attribute, value): assert self.first_date_data_required <= self.last_date_data_required, ( f"first date data required of {self.first_date_data_required} was not on ", f"or before last date data required of {self.last_date_data_required}", ) min_analysis_window_start = min(aw.start for aw in self.analysis_windows) observation_period_start = add_days( self.first_enrollment_date, min_analysis_window_start ) assert self.first_date_data_required == observation_period_start, ( f"first date data required of {self.first_date_data_required} ", f"did not match computed start of observation {observation_period_start}", ) @last_date_data_required.validator def _validate_last_date_data_required(self, attribute, value): max_analysis_window_end = max(aw.end for aw in self.analysis_windows) observation_period_end = add_days( self.last_enrollment_date, max_analysis_window_end ) assert self.last_date_data_required == observation_period_end, ( f"last date data required of {self.last_date_data_required} ", f"did not match computed end of observation {observation_period_end}", )
[docs] @attr.s(frozen=True, slots=True) class AnalysisWindow: """Represents the range of days in which to measure a metric. The range is measured in "days relative enrollment", and is inclusive. For example, ``AnalysisWindow(0, 6)`` is the first week after enrollment and `AnalysisWindow(-8,-1)` is the week before enrollment Args: start (int): First day of the analysis window, in days relative to enrollment start. 0 indicates the date of enrollment. Positive numbers are after enrollment, negative are before. Must be the same sign as `end` (zero counts as positive) end (int): Final day of the analysis window, in days relative to enrollment start. 0 indicates the date of enrollment. Positive numbers are after enrollment, negative are before. Must be the same sign as `start` (zero counts as positive). """ start = attr.ib(type=int) end = attr.ib(type=int) @start.validator def _validate_start(self, attribute, value): assert (value >= 0 and self.end >= 0) or (value < 0 and self.end < 0) @end.validator def _validate_end(self, attribute, value): assert value >= self.start assert (value >= 0 and self.start >= 0) or (value < 0 and self.start < 0)
[docs] @attr.s(frozen=True, slots=True) class TimeSeriesResult: """Result from a time series query. For each analysis window, this object lets us get a dataframe in "the standard format" (one row per client). Example usage:: result_dict = dict(time_series_result.items(bq_context)) window_0 = result_dict[0] ``window_0`` would then be a pandas DataFrame of results for the analysis window starting at day 0. ``result_dict`` would be a dictionary of all such DataFrames, keyed by the start days of their analysis windows. Or, to load only one analysis window into RAM:: window_0 = time_series_result.get(bq_context, 0) """ fully_qualified_table_name = attr.ib(type=str) analysis_windows = attr.ib(type=tuple[AnalysisWindow, ...]) analysis_unit = attr.ib(type=AnalysisUnit, default=AnalysisUnit.CLIENT)
[docs] def get(self, bq_context: BigQueryContext, analysis_window) -> DataFrame: """Get the DataFrame for a specific analysis window. N.B. this makes a BigQuery query each time it is run; caching results is your responsibility. Args: bq_context (BigQueryContext) analysis_window (AnalysisWindow or int): The analysis window, or its start day as an int. """ if isinstance(analysis_window, int): try: analysis_window = next( aw for aw in self.analysis_windows if aw.start == analysis_window ) except StopIteration as err: raise KeyError( f"AnalysisWindow not found with start of {analysis_window}" ) from err return bq_context.run_query( self._build_analysis_window_subset_query(analysis_window) ).to_dataframe()
[docs] def get_full_data(self, bq_context: BigQueryContext) -> DataFrame: """Get the full DataFrame from TimeSeriesResult. This DataFrame has a row for each client for each period of the time series and may be very large. A warning will print the size of data to be downloaded. Args: bq_context (BigQueryContext) """ size = self._get_table_size(bq_context) print(f"Downloading {self.fully_qualified_table_name} ({size} GB)") table = bq_context.client.get_table(self.fully_qualified_table_name) return bq_context.client.list_rows(table).to_dataframe()
[docs] def get_aggregated_data( self, bq_context: BigQueryContext, metric_list: list, aggregate_function: str = "AVG", ) -> tuple[DataFrame, int]: """Results from a time series query, aggregated over analysis windows by a SQL aggregate function. This DataFrame has a row for each analysis window, with a column for each metric in the supplied metric_list. Args: bq_context (BigQueryContext) metric_list (list of mozanalysis.metrics.Metric) aggregate_fuction (str) """ return ( bq_context.run_query( self._build_aggregated_data_query(metric_list, aggregate_function) ).to_dataframe(), bq_context.run_query(self._table_sample_size_query()) .to_dataframe()["population_size"] .values[0], )
def keys(self): return [aw.start for aw in self.analysis_windows] def items(self, bq_context): for aw in self.analysis_windows: yield (aw.start, self.get(bq_context, aw)) def _get_table_size(self, bq_context: BigQueryContext) -> float: """ Get table size in memory for table being requested by `get_full_data`. """ table_info = self.fully_qualified_table_name.split(".") query = f""" SELECT SUM(size_bytes)/pow(10,9) AS size FROM `{table_info[0]}.{table_info[1]}`.__TABLES__ WHERE table_id = '{table_info[2]}' """ size = bq_context.run_query(query).to_dataframe() return size["size"].iloc[0].round(2) def _build_analysis_window_subset_query( self, analysis_window: AnalysisWindow ) -> str: """Return SQL for partitioning time series results. When we query data for a time series, we query it for all points of the time series, and we store this in a table. This method returns SQL to query this table to obtain results in "the standard format" for a single analysis window. """ except_clause = ( f"{self.analysis_unit.value}, analysis_window_start, analysis_window_end" ) return f""" SELECT * EXCEPT ({except_clause}) FROM {self.fully_qualified_table_name} WHERE analysis_window_start = {analysis_window.start} AND analysis_window_end = {analysis_window.end} """ def _build_aggregated_data_query( self, metric_list: list[Metric], aggregate_function: str ) -> str: return """ SELECT analysis_window_start, analysis_window_end, {agg_metrics} FROM {full_table_name} GROUP BY analysis_window_start, analysis_window_end ORDER BY analysis_window_start """.format( agg_metrics=",\n ".join( f"{aggregate_function}({m.name}) AS {m.name}" for m in metric_list ), full_table_name=self.fully_qualified_table_name, ) def _table_sample_size_query( self, client_id_column: str = AnalysisUnit.CLIENT.value ) -> str: return f""" SELECT COUNT(*) as population_size FROM (SELECT DISTINCT {client_id_column} FROM {self.fully_qualified_table_name}) """ @analysis_windows.validator def _check_analysis_windows(self, attribute, value): if len(value) != len({aw.start for aw in value}): raise ValueError("Each analysis window must start on a different day")