# coding: utf-8
# This module began as a combination of:
# https://gist.githubusercontent.com/ilanasegall/b3ce1aa0d3cc8c117a35b4a4fb9d4681/raw/c8a96e823cd56072e896e4c2d94c496306b59c8c/blok_df.py
# and:
# https://github.com/mozilla/python_mozetl/blob/689afa3d23229ca717422314c5a56abd83a85a0d/mozetl/testpilot/containers.py
from datetime import date, timedelta
from pyspark.sql.types import StringType
from moztelemetry.dataset import Dataset
from ..basic import convert_pings, DataFrameConfig
SHIELD_ADDON_ID = "@shield-study-privacy"


def _string_columns(pairs):
    """Expand ``(column_name, ping_path)`` pairs into string-typed column configs.

    Each resulting entry is ``(column_name, ping_path, None, StringType())``.
    The meaning of the third slot is defined by ``DataFrameConfig`` elsewhere
    (presumably an optional cleaning function; it is always ``None`` here).
    """
    return [(column_name, ping_path, None, StringType()) for column_name, ping_path in pairs]


# The JSON paths used below are unusual for Shield pings.
# See https://bugzil.la/1387236 for more details and discussion.
COMMON_COLUMN_CONFIGS = _string_columns(
    [
        ("client_id", "clientId"),
        ("branch", "payload/branch"),
        ("study_state", "payload/study_state"),
        ("event", "payload/event"),
        ("originDomain", "payload/originDomain"),
        ("breakage", "payload/breakage"),
        ("notes", "payload/notes"),
    ]
)

# State pings report the study identifier under "payload/study_name" ...
STUDY_STATE_DATAFRAME_COLUMN_CONFIGS = COMMON_COLUMN_CONFIGS + _string_columns(
    [("study", "payload/study_name")]
)

# ... while event pings report it under "payload/study". Both lists end with
# the "study" column, so they share the same column order.
STUDY_EVENT_DATAFRAME_COLUMN_CONFIGS = COMMON_COLUMN_CONFIGS + _string_columns(
    [("study", "payload/study")]
)
def include_event_pings(ping):
    """Return whether an event ping was sent by the privacy Shield study.

    ``ping`` is indexed with the slash-separated path ``"payload/study"``.
    It is presumably a flattened mapping of ping properties; confirm against
    the caller. The ping is kept only when that value equals ``SHIELD_ADDON_ID``.
    """
    reported_study = ping["payload/study"]
    return reported_study == SHIELD_ADDON_ID
def include_state_pings(ping):
    """Return whether a study-state ping belongs to the privacy Shield study.

    State pings are matched on ``"payload/study_name"``, whereas event pings
    use ``"payload/study"``. ``ping`` is presumably a flattened mapping of ping
    properties keyed by such paths; confirm against the caller. The ping is
    kept only when the value equals ``SHIELD_ADDON_ID``.
    """
    reported_name = ping["payload/study_name"]
    return reported_name == SHIELD_ADDON_ID
def etl_job(sc, sqlContext, submission_date=None, save=True):
    """Build the privacy Shield study DataFrame for one submission date.

    Reads Firefox ``shield-study`` pings from the ``telemetry`` source. It turns
    them into an event-ping frame and a state-ping frame, combines the two
    with ``union``, and optionally writes the result to S3 as Parquet.

    Args:
        sc: Spark context, passed to ``Dataset.records``.
        sqlContext: SQL context, passed to the ping transform functions.
        submission_date: Submission date as a ``"%Y%m%d"`` string. Defaults
            to yesterday, computed from the local ``date.today()``.
        save: When true, overwrite the ``submission_date=<date>`` partition
            under the S3 output path with a single Parquet output partition.

    Returns:
        The combined DataFrame of event and state pings.
    """
    s3_path = "s3n://telemetry-parquet/harter/privacy_prefs_shield/v2"

    if submission_date is None:
        yesterday = date.today() - timedelta(days=1)
        submission_date = yesterday.strftime("%Y%m%d")

    shield_dataset = Dataset.from_source("telemetry").where(
        docType="shield-study", submissionDate=submission_date, appName="Firefox"
    )
    pings = shield_dataset.records(sc)

    # NOTE(review): transform_event_pings / transform_state_pings are not
    # defined in the visible source; presumably they live elsewhere in this
    # module (likely built on convert_pings/DataFrameConfig). Confirm.
    event_frame = transform_event_pings(sqlContext, pings)
    state_frame = transform_state_pings(sqlContext, pings)
    # DataFrame.union matches columns by position. The event and state
    # column configs list the same columns in the same order.
    combined = event_frame.union(state_frame)

    if save:
        output_path = s3_path + "/submission_date={}".format(submission_date)
        combined.repartition(1).write.mode("overwrite").parquet(output_path)

    return combined