"""
This ETL job computes the co-installation occurrence of white-listed
Firefox webextensions for a sample of the longitudinal telemetry dataset.
"""
import click
import datetime as dt
import json
import logging
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType
import mozetl.taar.taar_utils as taar_utils
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
OUTPUT_BUCKET = "telemetry-parquet"
OUTPUT_PREFIX = "taar/lite/"
OUTPUT_BASE_FILENAME = "guid_coinstallation"
AMO_DUMP_BUCKET = "telemetry-parquet"
AMO_DUMP_KEY = "telemetry-ml/addon_recommender/addons_database.json"
MAIN_SUMMARY_PATH = "s3://telemetry-parquet/main_summary/v4/"
ONE_WEEK_AGO = (dt.datetime.now() - dt.timedelta(days=7)).strftime("%Y%m%d")
def is_valid_addon(broadcast_amo_whitelist, guid, addon):
    """Filter out add-ons that should be excluded: system add-ons,
    legacy add-ons, disabled add-ons, and side-loaded add-ons.
    """
    return not (
        addon.is_system
        or addon.app_disabled
        or addon.type != "extension"
        or addon.user_disabled
        or addon.foreign_install
        # Make sure the AMO whitelist has been broadcast to worker nodes.
        or guid not in broadcast_amo_whitelist.value
        # Make sure that the Pioneer add-on is explicitly excluded.
        or guid == "pioneer-opt-in@mozilla.org"
    )
def get_addons_per_client(broadcast_amo_whitelist, users_df):
"""Extracts a DataFrame that contains one row
for each client along with the list of active add-on GUIDs.
"""
# Create an add-ons dataset un-nesting the add-on map from each
# user to a list of add-on GUIDs. Also filter undesired add-ons.
rdd_list = users_df.rdd.map(
lambda p: (
p["client_id"],
[
addon_data.addon_id
for addon_data in p["active_addons"]
if is_valid_addon(
broadcast_amo_whitelist, addon_data.addon_id, addon_data
)
],
)
)
    # Only keep clients with more than one valid add-on; a single add-on
    # cannot form a co-installation pair.
    filtered_rdd = rdd_list.filter(lambda p: len(p[1]) > 1)
df = filtered_rdd.toDF(["client_id", "addon_ids"])
return df
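# Illustration only (hypothetical GUIDs): a row surviving the filter above
# looks like
#   Row(client_id="some-client-id",
#       addon_ids=["uBlock0@raymondhill.net", "some-other-addon@example.org"])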
def get_initial_sample(spark):
    """Takes an initial sample from the clients_daily dataset
    (randomly sampled from main summary). Coarse filtering on:
    - number of installed add-ons (greater than 1)
    - corrupt and generally weird telemetry entries
    - isolating release channel
    - column selection
    """
# Could scale this up to grab more than what is in
# longitudinal and see how long it takes to run.
return (
spark.table("clients_daily")
.where("active_addons IS NOT null")
.where("size(active_addons) > 1")
.where("channel = 'release'")
.where("normalized_channel = 'release'")
.where("app_name = 'Firefox'")
.selectExpr("client_id", "active_addons")
)
def key_all(a):
    """
    Return (for each Row) a two-column set of Rows that contains each
    individual installed add-on (the key_addon) as the first column and an
    array of GUIDs of all *other* add-ons that were seen co-installed with
    the key_addon. The key_addon is excluded from the second column to avoid
    inflated counts in later aggregations.
    """
    # Compare by equality rather than identity: `is not` would test object
    # identity, which is unreliable for strings.
    return [(i, [b for b in a if b != i]) for i in a]
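# A minimal illustration of key_all on hypothetical data:
#   key_all(["a", "b", "c"])
#   => [("a", ["b", "c"]), ("b", ["a", "c"]), ("c", ["a", "b"])]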
def load_s3(result_df, date, prefix, bucket):
    """Collect the aggregated co-installation counts to the driver and
    store them in S3 as a single JSON blob keyed by add-on GUID.
    """
result_list = result_df.collect()
result_json = {}
for row in result_list:
key_addon = row.key_addon
coinstalls = row.coinstallation_counts
value_json = {}
for _id, n in coinstalls:
value_json[_id] = n
result_json[key_addon] = value_json
taar_utils.store_json_to_s3(
json.dumps(result_json, indent=2), OUTPUT_BASE_FILENAME, date, prefix, bucket
)
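
# NOTE: `main` below calls `extract_telemetry` and `transform`, which do not
# appear in this section. The two functions that follow are a minimal sketch,
# reconstructed from the helpers above and the module imports (F, StructType,
# StructField, StringType, LongType). They assume taar_utils exposes
# `load_amo_external_whitelist`; treat them as an outline rather than the
# job's verbatim implementation.


def extract_telemetry(spark):
    """Load the AMO whitelist, broadcast it to the workers, and build the
    per-client add-on DataFrame from the initial clients_daily sample.
    """
    sc = spark.sparkContext

    # Assumption: taar_utils provides a loader for the AMO external whitelist.
    amo_white_list = taar_utils.load_amo_external_whitelist()
    log.info("AMO whitelist loaded.")

    broadcast_amo_whitelist = sc.broadcast(amo_white_list)

    users_df = get_initial_sample(spark)
    return get_addons_per_client(broadcast_amo_whitelist, users_df)


def transform(longitudinal_addons):
    """Explode each client's add-on list into (key_addon, coinstalled_addon)
    pairs, count pair occurrences, and collapse the counts per key_addon into
    the (id, n) structs that load_s3 expects.
    """
    restructured = longitudinal_addons.rdd.flatMap(
        lambda x: key_all(x.addon_ids)
    ).toDF(["key_addon", "coinstalled_addons"])

    # Explode the co-installation arrays and count each (key, coinstall) pair.
    addon_co_installations = (
        restructured.select(
            "key_addon", F.explode("coinstalled_addons").alias("coinstalled_addon")
        )
        .groupBy("key_addon", "coinstalled_addon")
        .count()
    )

    # Pack each (coinstalled_addon, count) pair into a struct so the pairs
    # can be collected into a single list per key_addon.
    combine_and_map_cols = F.udf(
        lambda x, y: (x, y),
        StructType([StructField("id", StringType()), StructField("n", LongType())]),
    )

    return (
        addon_co_installations.select(
            "key_addon",
            combine_and_map_cols("coinstalled_addon", "count").alias("id_n"),
        )
        .groupBy("key_addon")
        .agg(F.collect_list("id_n").alias("coinstallation_counts"))
    )
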
@click.command()
@click.option("--date", required=True)
@click.option("--bucket", default=OUTPUT_BUCKET)
@click.option("--prefix", default=OUTPUT_PREFIX)
def main(date, bucket, prefix):
    spark = SparkSession.builder.appName("taar_lite").enableHiveSupport().getOrCreate()
    log.info("Loading telemetry sample.")
longitudinal_addons = extract_telemetry(spark)
result_df = transform(longitudinal_addons)
load_s3(result_df, date, prefix, bucket)
spark.stop()
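

if __name__ == "__main__":
    # Assumes the job may also be launched directly as a script rather than
    # only through a packaged entry point.
    main()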