# Source code for mozetl.sync.bookmark_validation
"""
# Sync Bookmark Validation Dataset
This notebook is adapted from a gist that transforms the `sync_summary` into a
flat table to avoid straining the resources on the Presto cluster.[1] The
bookmark totals table generates statistics relative to the server clock.
See bugs 1349065, 1374831, 1410963
[1] https://gist.github.com/kitcambridge/364f56182f3e96fb3131bf38ff648609
"""
import logging

import arrow
import click
from pyspark.sql import SparkSession, functions as F

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def extract(spark, path, start_date):
    """Register a temporary `sync_summary` view for the given start date."""
    sync_summary = spark.read.option("mergeSchema", "true").parquet(path)
    subset = sync_summary.where(F.col("submission_date_s3") == start_date)
    subset.createOrReplaceTempView("sync_summary")
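
# A hypothetical call, using the default input location from main() below
# (the date is illustrative only):
#   extract(spark, "s3://telemetry-parquet/sync_summary/v2", "20170801")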


def transform(spark):
"""Create the bookmark problem and summary tables."""
query = """
SELECT s.app_build_id,
s.app_version,
s.app_display_version,
s.app_name,
s.app_channel,
s.uid,
s.device_id AS device_id,
s.submission_date_s3 AS submission_day,
date_format(from_unixtime(s.when / 1000), 'YYYYMMdd') AS sync_day,
s.when,
s.status,
e.name AS engine_name,
e.status AS engine_status,
e.failure_reason AS engine_failure_reason,
e.validation.problems IS NOT NULL AS engine_has_problems,
e.validation.version AS engine_validation_version,
e.validation.checked AS engine_validation_checked,
e.validation.took AS engine_validation_took,
p.name AS engine_validation_problem_name,
p.count AS engine_validation_problem_count
FROM sync_summary s
LATERAL VIEW explode(s.engines) AS e
LATERAL VIEW OUTER explode(e.validation.problems) AS p
WHERE s.failure_reason IS NULL
"""
engine_validations = spark.sql(query)
bookmark_validations = engine_validations.where(
F.col("engine_name").isin("bookmarks", "bookmarks-buffered")
)
bookmark_validation_problems = bookmark_validations.where(
F.col("engine_has_problems")
)
# Generate aggregates over all bookmarks
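    # A single validation run is identified by (uid, device_id, when), and
    # engine_validation_checked is the number of records examined in that run.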
bookmark_aggregates = (
bookmark_validations.where(F.col("engine_validation_checked").isNotNull())
# see bug 1410963 for submission date vs sync date
.groupBy("submission_day").agg(
F.countDistinct("uid", "device_id", "when").alias(
"total_bookmark_validations"
),
F.countDistinct("uid").alias("total_validated_users"),
F.sum("engine_validation_checked").alias("total_bookmarks_checked"),
)
)
bookmark_validation_problems.createOrReplaceTempView("bmk_validation_problems")
bookmark_aggregates.createOrReplaceTempView("bmk_total_per_day")
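    # Both views registered above are read back by load(). For a quick sanity
    # check one could, for example (illustrative only):
    #   spark.table("bmk_total_per_day").show()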


def load(spark, bucket, prefix, version, start_date):
    """Write the validation tables to S3, one prefix per start date."""
for table_name in ["bmk_validation_problems", "bmk_total_per_day"]:
path = "s3://{}/{}/{}/v{}/start_date={}".format(
bucket, prefix, table_name, version, start_date
)
logger.info(
"Saving table {} on start_date {} to {}".format(
table_name, start_date, path
)
)
df = spark.sql("SELECT * FROM {}".format(table_name))
df.repartition(1).write.parquet(path, mode="overwrite")
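

# With the default bucket/prefix and version 1, tables land under, e.g.
# (illustrative date):
#   s3://telemetry-parquet/sync/bmk_total_per_day/v1/start_date=20170801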
@click.command()
@click.option("--start_date", required=True, help="Date to process")
@click.option("--end_date", help="Optional end date to run until")
@click.option("--bucket", default="telemetry-parquet")
@click.option("--prefix", default="sync")
@click.option("--input_bucket", default="telemetry-parquet")
@click.option("--input_prefix", default="sync_summary/v2")
def main(start_date, end_date, bucket, prefix, input_bucket, input_prefix):
spark = SparkSession.builder.appName("sync_bookmark").getOrCreate()
version = 1
input_path = "s3://{}/{}".format(input_bucket, input_prefix)
# use the airflow date convention
ds_format = "YYYYMMDD"
start = arrow.get(start_date, ds_format)
end = arrow.get(end_date if end_date else start_date, ds_format)
for date in arrow.Arrow.range("day", start, end):
current_date = date.format(ds_format)
logger.info("Processing sync bookmark validation for {}".format(current_date))
extract(spark, input_path, current_date)
transform(spark)
load(spark, bucket, prefix, version, current_date)
spark.stop()
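

if __name__ == "__main__":
    # Allow running this module directly as a script; the scheduled job may
    # instead invoke main() through its own entry point (assumption).
    main()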