Source code for mozetl.addon_aggregates.addon_aggregates

"""ETL code for the addon_aggregates dataset"""

from pyspark.sql import SparkSession
import pyspark.sql.functions as fun
import click

MS_FIELDS = [
    "client_id",
    "normalized_channel",
    "app_version",
    "locale",
    "sample_id",
    "profile_creation_date",
]

ADDON_FIELDS = [
    "addons.addon_id",
    "addons.foreign_install",
    "addons.is_system",
    "addons.is_web_extension",
    "addons.install_day",
]


def get_dest(output_bucket, output_prefix, output_version, date=None, sample_id=None):
    """
    Stitches together an s3 destination.

    :param output_bucket: s3 output_bucket
    :param output_prefix: s3 output_prefix (within output_bucket)
    :param output_version: dataset output_version
    :return str -> s3://output_bucket/output_prefix/output_version/submission_date_s3=[date]/sample_id=[sid]
    """
    suffix = ""
    if date is not None:
        suffix += "/submission_date_s3={}".format(date)
    if sample_id is not None:
        suffix += "/sample_id={}".format(sample_id)
    return (
        "s3://"
        + "/".join([output_bucket, output_prefix, output_version])
        + suffix
        + "/"
    )
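
# Illustrative example (not part of the original module): using the default
# output settings from main() with an assumed date and sample_id,
#   get_dest("telemetry-parquet", "addons/agg", "v2", date="20180101", sample_id=1)
# returns
#   "s3://telemetry-parquet/addons/agg/v2/submission_date_s3=20180101/sample_id=1/"
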
def load_main_summary(spark, input_bucket, input_prefix, input_version):
    """
    Loads main_summary from the bucket constructed from
    input_bucket, input_prefix, input_version

    :param spark: SparkSession object
    :param input_bucket: s3 bucket (telemetry-parquet)
    :param input_prefix: s3 prefix (main_summary)
    :param input_version: dataset version (v4)
    :return SparkDF
    """
    dest = get_dest(input_bucket, input_prefix, input_version)
    print("loading...", dest)
    return spark.read.option("mergeSchema", True).parquet(dest)
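
# Illustrative note (not part of the original module): with the defaults
# declared in main(), this reads from
#   get_dest("telemetry-parquet", "main_summary", "v4")
# i.e. "s3://telemetry-parquet/main_summary/v4/", with schema merging enabled
# so partitions written with differing schemas can still be loaded together.
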
def ms_explode_addons(ms):
    """
    Explodes the active_addons object in the ms DataFrame
    and selects relevant fields

    :param ms: a subset of main_summary
    :return SparkDF
    """
    addons_df = (
        ms.select(MS_FIELDS + [fun.explode("active_addons").alias("addons")])
        .select(MS_FIELDS + ADDON_FIELDS)
        .withColumn("app_version", fun.substring("app_version", 1, 2))
    )
    return addons_df
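
# Illustrative note (not part of the original module): fun.explode() emits one
# row per element of the active_addons array, so a client record with three
# active add-ons becomes three rows, each carrying the MS_FIELDS columns plus
# one "addons" struct. app_version is truncated to its first two characters,
# e.g. "57.0.1" -> "57".
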
def add_addon_columns(df):
    """
    Constructs additional indicator columns describing the add-on/theme
    present in a given record. The columns are

    is_self_install
    is_shield_addon
    is_foreign_install
    is_system
    is_web_extension

    which map True -> 1 and False -> 0

    :param df: SparkDF, exploded on active_addons,
               each record maps to a single add-on
    :return df with the above columns added
    """
    addons_expanded = (
        df.withColumn(
            "is_self_install",
            fun.when(
                (df.addon_id.isNotNull())
                & (~df.is_system)
                & (~df.foreign_install)
                & (~df.addon_id.like("%mozilla%"))
                & (~df.addon_id.like("%cliqz%"))
                & (~df.addon_id.like("%@unified-urlbar%")),
                1,
            ).otherwise(0),
        )
        .withColumn(
            "is_shield_addon",
            fun.when(df.addon_id.like("%@shield.mozilla%"), 1).otherwise(0),
        )
        .withColumn("is_foreign_install", fun.when(df.foreign_install, 1).otherwise(0))
        .withColumn("is_system", fun.when(df.is_system, 1).otherwise(0))
        .withColumn("is_web_extension", fun.when(df.is_web_extension, 1).otherwise(0))
    )
    return addons_expanded
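
# Illustrative note (not part of the original module); the add-on ids below are
# assumed examples: a row with addon_id "uBlock0@raymondhill.net",
# is_system=False and foreign_install=False gets is_self_install=1, whereas a
# row with addon_id "screenshots@mozilla.org" matches the "%mozilla%" pattern
# and gets is_self_install=0.
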
def aggregate_addons(df):
    """
    Aggregates add-on indicators by client, channel, version and locale.
    The result is a DataFrame with the additional aggregate columns:

    n_self_installed_addons (int)
    n_shield_addons (int)
    n_foreign_installed_addons (int)
    n_system_addons (int)
    n_web_extensions (int)
    first_addon_install_date (str %Y%m%d)
    profile_creation_date (str %Y%m%d)

    for each of the above facets.

    :param df: an exploded instance of main_summary by active_addons
               with various additional indicator columns
    :return SparkDF: an aggregated dataset with each of the above columns
    """
    addon_aggregates = (
        df.distinct()
        .groupBy("client_id", "normalized_channel", "app_version", "locale")
        .agg(
            fun.sum("is_self_install").alias("n_self_installed_addons"),
            fun.sum("is_shield_addon").alias("n_shield_addons"),
            fun.sum("is_foreign_install").alias("n_foreign_installed_addons"),
            fun.sum("is_system").alias("n_system_addons"),
            fun.sum("is_web_extension").alias("n_web_extensions"),
            fun.min(
                fun.when(
                    df.is_self_install == 1,
                    fun.date_format(
                        fun.from_unixtime(fun.col("install_day") * 60 * 60 * 24),
                        "yyyyMMdd",
                    ),
                ).otherwise(None)
            ).alias("first_addon_install_date"),
            fun.date_format(
                fun.from_unixtime(fun.min("profile_creation_date") * 60 * 60 * 24),
                "yyyyMMdd",
            ).alias("profile_creation_date"),
        )
    )
    return addon_aggregates
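
# Illustrative note (not part of the original module): install_day and
# profile_creation_date appear to be stored as days since the Unix epoch,
# hence the * 60 * 60 * 24 conversion to seconds before formatting. For
# example, an install_day of 17532 gives from_unixtime(17532 * 86400) ->
# "20180101" (assuming a UTC session time zone).
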
@click.command()
@click.option("--date", required=True)
@click.option("--input-bucket", default="telemetry-parquet")
@click.option("--input-prefix", default="main_summary")
@click.option("--input-version", default="v4")
@click.option("--output-bucket", default="telemetry-parquet")
@click.option("--output-prefix", default="addons/agg")
@click.option("--output-version", default="v2")
def main(
    date,
    input_bucket,
    input_prefix,
    input_version,
    output_bucket,
    output_prefix,
    output_version,
):
    """
    Loads main_summary where submission_date_s3 == date,
    partitions by sample_id, and writes aggregated data to s3
    """
    spark = SparkSession.builder.appName("addon_aggregates").getOrCreate()

    # don't write _SUCCESS files, which interfere w/ReDash discovery
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

    # load main_summary
    ms = load_main_summary(spark, input_bucket, input_prefix, input_version)

    # filter main summary to the date supplied in the environment
    ms_day = ms.filter(ms.submission_date_s3 == date)

    # partition each date into 100 partitions by sample_id
    for sample_id in range(100):
        exploded = ms_explode_addons(ms_day.filter(ms_day.sample_id == sample_id))
        exploded_addons = add_addon_columns(exploded)
        aggregates = aggregate_addons(exploded_addons)
        dest = get_dest(output_bucket, output_prefix, output_version, date, sample_id)

        # 1 partition is ~ 50MB
        aggregates.repartition(1).write.format("parquet").save(dest, mode="overwrite")


if __name__ == "__main__":
    main()
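
# Illustrative invocation (assumed, not part of the original module), e.g. from
# a scheduled job with the mozetl package installed:
#   python -m mozetl.addon_aggregates.addon_aggregates --date 20180101
# Non-default buckets, prefixes, and versions can be overridden with the
# corresponding --input-*/--output-* options.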