Source code for mozetl.experimentsdaily.rollup
import click
from pyspark.sql import SparkSession
from mozetl.clientsdaily.rollup import extract_search_counts
from mozetl.utils import extract_submission_window_for_activity_day
from mozetl.utils import format_spark_path
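
# Rows for this experiment are dropped in load_experiments_summary() below.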
EXCLUDED_ID = "pref-flip-screenshots-release-1369150"


def load_experiments_summary(spark, parquet_path):
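    """Load the experiments summary dataset from parquet.

    Schemas are merged across partitions, and rows belonging to the
    experiment named by EXCLUDED_ID are filtered out.
    """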
    return (
        spark.read.option("mergeSchema", "true")
        .parquet(parquet_path)
        .where("experiment_id != '{}'".format(EXCLUDED_ID))
    )


def to_experiment_profile_day_aggregates(frame_with_extracts):
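    """Aggregate to one row per (experiment_id, client_id, activity_date).

    An activity_date column is derived via ACTIVITY_DATE_COLUMN when the
    input frame lacks one; EXPERIMENT_FIELD_AGGREGATORS are then applied
    to each group.
    """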
    from mozetl.clientsdaily.fields import EXPERIMENT_FIELD_AGGREGATORS
    from mozetl.clientsdaily.fields import ACTIVITY_DATE_COLUMN

    if "activity_date" not in frame_with_extracts.columns:
        with_activity_date = frame_with_extracts.select("*", ACTIVITY_DATE_COLUMN)
    else:
        with_activity_date = frame_with_extracts
    grouped = with_activity_date.groupby("experiment_id", "client_id", "activity_date")
    return grouped.agg(*EXPERIMENT_FIELD_AGGREGATORS)


@click.command()
@click.option("--date", default=None, help="Start date to run on")
@click.option(
    "--input-bucket", default="telemetry-parquet", help="Bucket of the input dataset"
)
@click.option(
    "--input-prefix", default="experiments/v1/", help="Prefix of the input dataset"
)
@click.option(
    "--output-bucket",
    default="net-mozaws-prod-us-west-2-pipeline-analysis",
    help="Bucket of the output dataset",
)
@click.option(
    "--output-prefix", default="/experiments_daily", help="Prefix of the output dataset"
)
@click.option("--output-version", default=3, help="Version of the output dataset")
@click.option(
    "--lag-days", default=10, help="Number of days to allow for submission latency"
)
def main(
    date,
    input_bucket,
    input_prefix,
    output_bucket,
    output_prefix,
    output_version,
    lag_days,
):
"""
Aggregate by (client_id, experiment_id, day).
Note that the target day will actually be `lag-days` days before
the supplied date. In other words, if you pass in 2017-01-20 and
set `lag-days` to 5, the aggregation will be processed for
day 2017-01-15 (the resulting data will cover submission dates
including the activity day itself plus 5 days of lag for a total
of 6 days).
"""
    spark = SparkSession.builder.appName("experiments_daily").getOrCreate()
    parquet_path = format_spark_path(input_bucket, input_prefix)
    frame = load_experiments_summary(spark, parquet_path)
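    # Keep only submissions within the allowed latency window for the target
    # activity day; start_date is the activity day actually being processed.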
    day_frame, start_date = extract_submission_window_for_activity_day(
        frame, date, lag_days
    )
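    # Flatten search count information (see extract_search_counts in
    # mozetl.clientsdaily.rollup) before building per-day aggregates.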
    searches_frame = extract_search_counts(day_frame)
    results = to_experiment_profile_day_aggregates(searches_frame)
    spark.conf.set(
        "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"
    )  # Don't write _SUCCESS files, which interfere with ReDash discovery
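    # Write one versioned, overwrite-mode partition per activity day
    # (activity_date_s3=YYYY-MM-DD) under the output prefix.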
    output_base_path = "{}/v{}/activity_date_s3={}".format(
        format_spark_path(output_bucket, output_prefix),
        output_version,
        start_date.strftime("%Y-%m-%d"),
    )
    results.write.mode("overwrite").parquet(output_base_path)


if __name__ == "__main__":
    main()
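

# A hypothetical invocation sketch (the scheduler or entry point actually used
# to launch this job may differ); the date format follows the docstring's own
# example:
#
#   python -m mozetl.experimentsdaily.rollup --date 2017-01-20 --lag-days 10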