from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import click
from mozetl.utils import extract_submission_window_for_activity_day
from mozetl.utils import format_spark_path

SEARCH_ACCESS_POINTS = [
    "abouthome",
    "contextmenu",
    "newtab",
    "searchbar",
    "system",
    "urlbar",
]
SEARCH_ACCESS_COLUMN_TEMPLATE = "search_count_{}"
SEARCH_ACCESS_COLUMNS = [
    SEARCH_ACCESS_COLUMN_TEMPLATE.format(sap) for sap in SEARCH_ACCESS_POINTS
]
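

# NOTE: ``extract_search_counts`` is called in ``main`` below, but its
# definition is not included in this section. The function below is only a
# minimal sketch of one possible implementation, not the project's actual
# code. It assumes each main_summary row has a unique ``document_id`` and a
# ``search_counts`` column holding an array of structs with ``source`` and
# ``count`` fields whose source values match SEARCH_ACCESS_POINTS.
def extract_search_counts(frame):
    # Flatten the search_counts array so each (row, search entry) pair
    # becomes one row; explode_outer keeps rows that recorded no searches.
    exploded = frame.withColumn("_search", F.explode_outer("search_counts"))
    per_doc = exploded.groupBy("document_id").agg(
        # Total searches across all access points (0 when there were none).
        F.coalesce(F.sum(F.col("_search.count")), F.lit(0)).alias(
            "search_count_total"
        ),
        # One column per search access point, e.g. search_count_urlbar.
        *[
            F.coalesce(
                F.sum(
                    F.when(F.col("_search.source") == sap, F.col("_search.count"))
                ),
                F.lit(0),
            ).alias(SEARCH_ACCESS_COLUMN_TEMPLATE.format(sap))
            for sap in SEARCH_ACCESS_POINTS
        ]
    )
    return frame.drop("search_counts").join(per_doc, "document_id", "left")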


def load_main_summary(spark, input_bucket, input_prefix):
    main_summary_path = format_spark_path(input_bucket, input_prefix)
    # mergeSchema reconciles schema differences across main_summary partitions
    return spark.read.option("mergeSchema", "true").parquet(main_summary_path)
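

# For illustration (not part of the original module): with the CLI defaults
# below, load_main_summary(spark, "telemetry-parquet", "main_summary/v4")
# reads the main_summary v4 Parquet dataset from the path produced by
# format_spark_path for that bucket/prefix pair.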


def to_profile_day_aggregates(frame_with_extracts):
    from .fields import MAIN_SUMMARY_FIELD_AGGREGATORS

    if "activity_date" not in frame_with_extracts.columns:
        from .fields import ACTIVITY_DATE_COLUMN

        with_activity_date = frame_with_extracts.select("*", ACTIVITY_DATE_COLUMN)
    else:
        with_activity_date = frame_with_extracts

    if "geo_subdivision1" not in with_activity_date.columns:
        from .fields import NULL_STRING_COLUMN

        with_activity_date = with_activity_date.withColumn(
            "geo_subdivision1", NULL_STRING_COLUMN
        )

    if "geo_subdivision2" not in with_activity_date.columns:
        from .fields import NULL_STRING_COLUMN

        with_activity_date = with_activity_date.withColumn(
            "geo_subdivision2", NULL_STRING_COLUMN
        )

    grouped = with_activity_date.groupby("client_id", "activity_date")
    return grouped.agg(*MAIN_SUMMARY_FIELD_AGGREGATORS)


def write_one_activity_day(results, date, output_prefix, partition_count):
    output_path = "{}/activity_date_s3={}".format(
        output_prefix, date.strftime("%Y-%m-%d")
    )
    to_write = results.coalesce(partition_count)
    to_write.write.parquet(output_path, mode="overwrite")
    to_write.unpersist()


def get_partition_count_for_writing(is_sampled):
    """
    Return a reasonable partition count.

    is_sampled: boolean
        One day is O(140MB) if filtering down to a single sample_id, but
        O(14GB) if not. Google reports 256MB < partition size < 1GB as ideal.
    """
    if is_sampled:
        return 1
    # 14GB / 25 partitions is roughly 560MB per file, inside that range.
    return 25


@click.command()
@click.option("--date", default=None, help="Start date to run on")
@click.option(
    "--input-bucket", default="telemetry-parquet", help="Bucket of the input dataset"
)
@click.option(
    "--input-prefix", default="main_summary/v4", help="Prefix of the input dataset"
)
@click.option(
    "--output-bucket",
    default="net-mozaws-prod-us-west-2-pipeline-analysis",
    help="Bucket of the output dataset",
)
@click.option(
    "--output-prefix", default="/clients_daily", help="Prefix of the output dataset"
)
@click.option("--output-version", default=5, help="Version of the output dataset")
@click.option("--sample-id", default=None, help="Sample_id to restrict results to")
@click.option(
    "--lag-days", default=10, help="Number of days to allow for submission latency"
)
def main(
    date,
    input_bucket,
    input_prefix,
    output_bucket,
    output_prefix,
    output_version,
    sample_id,
    lag_days,
):
    """
    Aggregate by (client_id, day) for a given activity day.

    Note that the target day will actually be `lag-days` days before
    the supplied date. In other words, if you pass in 2017-01-20 and
    set `lag-days` to 5, the aggregation will be processed for
    day 2017-01-15 (the resulting data will cover submission dates
    including the activity day itself plus 5 days of lag, for a total
    of 6 days).
    """
    spark = SparkSession.builder.appName("clients_daily").getOrCreate()
    # Per https://issues.apache.org/jira/browse/PARQUET-142, don't write
    # _SUCCESS files, which interfere with ReDash discovery
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    main_summary = load_main_summary(spark, input_bucket, input_prefix)
    day_frame, start_date = extract_submission_window_for_activity_day(
        main_summary, date, lag_days
    )
    if sample_id:
        day_frame = day_frame.where("sample_id = '{}'".format(sample_id))
    with_searches = extract_search_counts(day_frame)
    results = to_profile_day_aggregates(with_searches)
    partition_count = get_partition_count_for_writing(bool(sample_id))
    output_base_path = "{}/v{}/".format(
        format_spark_path(output_bucket, output_prefix), output_version
    )
    write_one_activity_day(results, start_date, output_base_path, partition_count)


if __name__ == "__main__":
    main()