Source code for mozetl.taar.taar_locale

"""
Bug 1396549 - TAAR Top addons per locale dictionary
This module is adapted from a gist that computes the top N add-ons per
locale after filtering for good candidates (e.g. no unsigned, no disabled,
...) [1].

[1] https://gist.github.com/mlopatka/46dddac9d063589275f06b0443fcc69d

"""

import click
import json
import logging

from datetime import date, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pandas import DataFrame, IndexSlice
from numpy.random import laplace as rlaplace

from .taar_utils import store_json_to_s3
from .taar_utils import load_amo_curated_whitelist

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

LOCALE_FILE_NAME = "top10_dict"
# Set the max multiplicative DP difference in probabilities to
# exp(epsilon) ~= 1.5.
EPSILON = 0.4
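
# Illustrative check (not part of the original job), showing what EPSILON
# means in practice: under the Laplace mechanism, the probability of any
# output can change by at most a factor of exp(EPSILON) when one client's
# data is added or removed.
#
#   >>> import math
#   >>> round(math.exp(0.4), 2)
#   1.49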


def get_client_addons(spark, start_date, end_date=None):
    """Returns a Spark DF listing add-ons by client_id and locale.

    Only Firefox release clients are considered. The query finds each
    client's most recent record in the `clients_daily` dataset over the
    given time period and returns its installed add-ons. System add-ons,
    disabled add-ons, and unsigned add-ons are filtered out.

    :param start_date: the earliest submission date to include (yyyymmdd)
    :param end_date: the latest submission date to include (yyyymmdd),
        optional
    :return: a DF with columns `locale`, `client_id`, `addon`
    """
    addons_query_template = """
        WITH sample AS (
            SELECT
                client_id,
                submission_date_s3,
                locale,
                active_addons
            FROM clients_daily
            WHERE
                app_name='Firefox'
                AND channel='release'
                AND submission_date_s3 >= '{start_date}'
                {end_date_filter}
                AND client_id IS NOT NULL
        ),
        sample_dedup AS (
            SELECT
                client_id,
                locale,
                explode(active_addons) AS addon_info
            FROM (
                SELECT
                    *,
                    -- retain the most recent active day for each client
                    row_number() OVER (
                        PARTITION BY client_id
                        ORDER BY submission_date_s3 DESC
                    ) AS idx
                FROM sample
            )
            WHERE idx = 1
        )
        SELECT
            locale,
            client_id,
            addon_info.addon_id AS addon
        FROM sample_dedup
        WHERE
            addon_info.blocklisted = FALSE         -- not blocklisted
            AND addon_info.type = 'extension'      -- webextensions only
            AND addon_info.signed_state = 2        -- fully reviewed add-ons only
            AND addon_info.user_disabled = FALSE   -- only count active add-ons
            AND addon_info.app_disabled = FALSE    -- exclude compatibility-disabled add-ons
            AND addon_info.is_system = FALSE       -- exclude system add-ons
            AND locale <> 'null'
            AND addon_info.addon_id IS NOT NULL
    """

    end_date_filter = (
        "AND submission_date_s3 <= '{}'".format(end_date) if end_date else ""
    )
    addons_query = addons_query_template.format(
        start_date=start_date, end_date_filter=end_date_filter
    )

    return spark.sql(addons_query)

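# A minimal usage sketch for the query above (illustrative only): it assumes
# a Hive-enabled SparkSession with access to the `clients_daily` table, and
# the dates shown are hypothetical.
#
#   spark = SparkSession.builder.enableHiveSupport().getOrCreate()
#   addons_df = get_client_addons(spark, "20190101", end_date="20190131")
#   addons_df.show(5)  # columns: locale, client_id, addon
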
def limit_client_addons(spark, client_addons_df, addon_limits, whitelist):
    """Limit the number of add-ons associated with a single client ID.

    This is a part of the privacy protection mechanism applied to the raw
    data. For each client in the dataset, we retain a randomly selected
    subset of their add-ons which belong to the whitelist. The max number
    of add-ons may differ by locale.

    :param client_addons_df: a DF listing add-on IDs by client ID and
        locale, as generated by `get_client_addons()`
    :param addon_limits: a dict mapping locale strings to ints representing
        the max number of add-ons retained per client in that locale. Any
        locale not present in the dict is excluded from the final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :return: a DF containing a subset of the rows of `client_addons_df`
    """
    # Convert the dict of limits to a Spark DF that can be joined in.
    addon_limits_schema = StructType(
        [
            StructField("locale", StringType(), False),
            StructField("client_max_addons", IntegerType(), False),
        ]
    )
    addon_limits_df = spark.createDataFrame(
        addon_limits.items(), schema=addon_limits_schema
    )
    # Inner join means that data for any locale not listed in the dict
    # gets dropped.
    client_df = client_addons_df.join(addon_limits_df, on="locale", how="inner")
    # client_df now has columns (locale, client_id, addon, client_max_addons).

    # Restrict to whitelist add-ons.
    client_df = client_df.where(col("addon").isin(whitelist))
    client_df.createOrReplaceTempView("client_addons")

    # Limit each client's add-ons to a random subset of max allowed size.
    # Add-ons are shuffled by generating an auxiliary column of independent
    # Uniform(0,1) random variables and sorting along this column within
    # clients. Shuffling only needs to be done if the user has more than
    # the max allowed add-ons (otherwise they will all be retained).
    return spark.sql(
        """
        WITH addons AS (
            -- add a convenience column listing the number of add-ons each client has
            SELECT
                *,
                COUNT(client_id) OVER (PARTITION BY client_id) AS num_client_addons
            FROM client_addons
        ),
        shuffle_ord AS (
            -- add the auxiliary sorting column
            SELECT
                *,
                -- only need to shuffle if the client has too many add-ons
                CASE
                    WHEN num_client_addons > client_max_addons THEN RAND()
                    ELSE NULL
                END AS ord
            FROM addons
        )
        SELECT
            client_id,
            locale,
            addon
        FROM (
            SELECT
                *,
                row_number() OVER (PARTITION BY client_id ORDER BY ord) AS idx
            FROM shuffle_ord
        )
        WHERE idx <= client_max_addons
        """
    )

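# The SQL above amounts to a per-client random truncation. The same logic,
# sketched in plain Python for a single client (a hypothetical helper, not
# used by this job):
def _sample_client_addons_sketch(addons, max_addons):
    """Keep a uniformly random subset of at most `max_addons` add-ons."""
    import random

    # No shuffling is needed if the client is already within the limit.
    if len(addons) <= max_addons:
        return list(addons)
    # Otherwise retain a uniformly random subset of size `max_addons`.
    return random.sample(list(addons), max_addons)
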
def compute_noisy_counts(locale_addon_counts, addon_limits, whitelist, eps=EPSILON):
    """Apply DP protections to the raw per-locale add-on frequency counts.

    Laplace noise is added to each of the counts. Additionally, each
    per-locale set of frequency counts is expanded to include every add-on
    in the whitelist, even if some were not observed in the raw data.

    This computation is done in local memory, rather than in Spark, to
    simplify working with random number generation. It relies on the
    assumption that the numbers of unique locales and whitelist add-ons
    both remain small (on the order of 100-1000).

    :param locale_addon_counts: a Pandas DF of per-locale add-on frequency
        counts, with columns `locale`, `addon`, `count`
    :param addon_limits: a dict mapping locale strings to ints representing
        the max number of add-ons retained per client in that locale. Any
        locale not present in the dict is excluded from the final dataset.
    :param whitelist: a list of add-on IDs belonging to the AMO whitelist
    :param eps: the DP epsilon parameter, representing the privacy budget
    :return: a DF with the same structure as `locale_addon_counts`. Counts
        may now be non-integer and negative.
    """
    # First expand the frequency count table to include all whitelisted
    # add-ons in each locale.
    locale_wl_addons = DataFrame.from_records(
        [(loc, a) for loc in addon_limits.keys() for a in whitelist],
        columns=["locale", "addon"],
    )
    raw_counts = locale_addon_counts.set_index(["locale", "addon"])
    locale_wl = locale_wl_addons.set_index(["locale", "addon"])
    # Left join means the result will have every add-on in the whitelist
    # and only the locales in the limits dict.
    expanded_counts = locale_wl.join(raw_counts, how="left").fillna(0)

    # Add the Laplace noise.
    #
    # For each add-on in the whitelist, in each locale, we take the
    # observed installation frequency count and add independent random
    # noise. Observed frequencies may be 0 if no profile had those add-ons
    # installed.
    #
    # The random noise is Laplace-distributed with scale parameter
    # $m/\epsilon$, where epsilon is the DP privacy budget and m is the
    # max number of add-ons reported per client in the current locale.
    #
    # Since the Laplace parametrization depends only on locale, we iterate
    # over locales and add a numpy array of independent simulated Laplace
    # random values to the series of add-on frequency counts.
    #
    # Since the Laplace noise is continuous and real-valued, counts will
    # no longer be integer, and may become negative.
    for locale in expanded_counts.index.unique("locale"):
        # The scale parameter depends on the max number of add-ons per
        # client, which varies by locale.
        locale_laplace_param = float(addon_limits[locale]) / eps
        # Select counts for all add-ons in the current locale.
        locale_idx = IndexSlice[locale, :]
        locale_counts = expanded_counts.loc[locale_idx, "count"]
        locale_counts += rlaplace(scale=locale_laplace_param, size=len(locale_counts))
        expanded_counts.loc[locale_idx, "count"] = locale_counts

    return expanded_counts.reset_index()

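# The noise step in isolation (an illustrative sketch with hypothetical
# values): when each client contributes at most `m` add-ons in a locale,
# adding or removing one client changes that locale's count vector by at
# most `m` in the L1 sense, so Laplace noise with scale m / eps yields an
# eps-differentially-private release of the counts.
#
#   m, eps = 1, EPSILON
#   noisy_count = 42 + rlaplace(scale=m / eps)  # real-valued, possibly negative
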
def get_addon_limits_by_locale(client_addons_df):
    """Determine the max number of add-ons per user in each locale.

    We allow for the possibility of basing this on a summary statistic
    computed from the original data, in which case the limits will remain
    private.

    :param client_addons_df: a DF listing add-on IDs by client ID and
        locale, as generated by `get_client_addons()`
    :return: a dict mapping locale strings to their add-on limits
    """
    # For now, set add-on limits to 1 per client for each locale observed
    # in the dataset.
    all_locales = client_addons_df.select("locale").distinct().collect()
    return {r["locale"]: 1 for r in all_locales}

def get_protected_locale_addon_counts(spark, client_addons_df):
    """Compute DP-protected per-locale add-on frequency counts.

    Privacy-preserving counts are generated using the Laplace mechanism,
    restricting to add-ons in the AMO whitelist.

    :param client_addons_df: a DF listing add-on IDs by client ID and
        locale, as generated by `get_client_addons()`
    :return: a Pandas DF with columns `locale`, `addon`, `count` containing
        DP-protected counts for each whitelist add-on within each locale.
        Unlike the true counts, weights may be non-integer or negative.
    """
    # Load the external whitelist based on AMO data.
    amo_whitelist = load_amo_curated_whitelist()

    # Determine the max number of add-ons per user in each locale.
    locale_addon_limits = get_addon_limits_by_locale(client_addons_df)

    # Limit the number of add-ons per client.
    limited_addons_df = limit_client_addons(
        spark, client_addons_df, locale_addon_limits, amo_whitelist
    )

    # Aggregate to a Pandas DF of locale/add-on frequency counts.
    locale_addon_counts = (
        limited_addons_df.groupBy("locale", "addon").count().toPandas()
    )

    # Add noise to the frequency counts.
    noisy_addon_counts = compute_noisy_counts(
        locale_addon_counts, locale_addon_limits, amo_whitelist, EPSILON
    )

    return noisy_addon_counts

def get_top_addons_by_locale(addon_counts, num_addons):
    """Generate a dictionary of top-weighted add-ons by locale.

    Raw counts are normalized by converting to relative proportions.

    :param addon_counts: a Pandas DF of per-locale add-on counts, with
        columns `locale`, `addon`, `count`
    :param num_addons: requested number of recommendations
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
    """
    top_addons = {}
    addons_by_locale = addon_counts.set_index(["locale", "addon"])
    for locale in addons_by_locale.index.unique("locale"):
        # For each locale, work with a Series of counts indexed by add-on.
        counts = addons_by_locale.loc[locale, "count"]
        # Shift counts so as to align the smallest count with 0 in each
        # locale. Since DP-protected counts can be negative, this is done
        # to avoid problems computing the sum.
        shifted_counts = counts - counts.min()
        rel_counts = shifted_counts / shifted_counts.sum()
        top_rel_counts = rel_counts.sort_values(ascending=False).head(num_addons)
        top_addons[locale] = list(top_rel_counts.iteritems())
    return top_addons

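# Worked example of the shift-and-normalize step (hypothetical numbers):
# noisy counts [5.2, -1.3, 3.1] shift to [6.5, 0.0, 4.4] after subtracting
# the minimum, then normalize to weights that sum to 1 and preserve the
# ordering of the add-ons.
#
#   >>> from pandas import Series
#   >>> s = Series([5.2, -1.3, 3.1])
#   >>> shifted = s - s.min()
#   >>> (shifted / shifted.sum()).round(2).tolist()
#   [0.6, 0.0, 0.4]
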
def generate_dictionary(spark, num_addons, dataset_num_days):
    """Compile lists of top add-ons by locale from per-client add-on data.

    Runs a fresh data pull against `clients_daily`, computes DP-protected
    frequency counts, and generates a weighted list of top add-ons by
    locale.

    :param num_addons: number of add-on recommendations to report for each
        locale
    :param dataset_num_days: number of days the raw data should cover
    :return: a dictionary `{<locale>: [('GUID1', 0.4), ('GUID2', 0.25), ...]}`
        as returned by `get_top_addons_by_locale()`
    """
    # Execute a Spark SQL query to get fresh add-ons from clients_daily.
    # Add an extra day to the date range since the latest date in the
    # dataset tends to lag 1 day behind.
    earliest_date = date.today() - timedelta(days=dataset_num_days + 1)
    earliest_date_fmt = earliest_date.strftime("%Y%m%d")
    addon_df = get_client_addons(spark, earliest_date_fmt)

    # Compute DP-protected frequency-based counts for each add-on by locale.
    addon_df_protected = get_protected_locale_addon_counts(spark, addon_df)

    # Find the top add-ons in each locale based on these counts.
    top_addons = get_top_addons_by_locale(addon_df_protected, num_addons)

    return top_addons

@click.command()
@click.option("--date", required=True)
@click.option("--bucket", default="telemetry-private-analysis-2")
@click.option("--prefix", default="taar/locale/")
@click.option("--num_addons", default=10)
@click.option("--client_data_num_days", default=28)
def main(date, bucket, prefix, num_addons, client_data_num_days):
    spark = (
        SparkSession.builder.appName("taar_locale").enableHiveSupport().getOrCreate()
    )

    logger.info("Processing top N addons per locale")
    locale_dict = generate_dictionary(spark, num_addons, client_data_num_days)
    store_json_to_s3(
        json.dumps(locale_dict, indent=2), LOCALE_FILE_NAME, date, prefix, bucket
    )

    spark.stop()
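
# Example invocation (hypothetical; this module defines no __main__ guard,
# so the click command is shown here invoked directly, and the exact entry
# point depends on how the mozetl job is deployed):
#
#   python -c "from mozetl.taar.taar_locale import main; main()" \
#       --date 20190101 --num_addons 10 --client_data_num_days 28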