Contextual Services Reporter Job
The contextual services reporter forwards contextual-services click and impression events to an external partner, with a particular eye towards minimizing the scope of contextual metadata that is shared. For more context, see the Data and Firefox Suggest blog post.
The code is defined in the
The input of this job is all
contextual-services namespace messages for desktop Firefox, which includes
quicksuggest-click. It also includes
topsites-impression pings from various mobile applications.
Data flows for Contextual Services
Note that in addition to this near real-time Dataflow job, we have several batch workflows for preparing Contextual Services data for both internal analytic use and for sending to external partners.
Search terms handling starts with log routing configuration in
cloudops-infra (Mozilla internal)
and then proceeds with a series of nightly scheduled queries defined under
Additional aggregations for clicks and impressions are defined under
Some daily aggregate data is shared with an external partner via the
adm_export DAG defined in
Beam Pipeline Transforms
The following diagram shows the steps in the Beam pipeline. This is up to date as of 2021-08-23.
Figure: An overview of the execution graph for the
Pub/Sub republished topic
The input to this job is the subset of decoded messages in the
contextual-services namespace as well as
topsites-impression pings from various namespaces associated with mobile applications sending telemetry via Glean.
This step filters out document types based on the
document_type attribute using the
--allowedDocTypes pipeline option. For example, this can be used to allow the job to process only sponsored tiles or only Suggest events, or only clicks or only impressions.
Message contents are validated in this step using a few simple heuristics such as user agent and user agent version. Messages that fail this check are rejected and sent to the error table.
This step attempts to decompress a gzip-compressed payload. This transform is shared with other Beam pipelines such as the decoder.
This is where the URLs used for reporting events are built. The
reporting_url value from the message payload is used as the base URL, then additional query parameters are added based on the message metadata such as the client’s country, region, and OS. The updated
reporting_url value is put in the message payload and attributes.
This step counts the number of click events per client (using the
context_id attribute) in a time interval. This transform uses Beam’s state and timers; a state and a timer is maintained for every client. The state is a list of recent timestamps of clicks from the current client and the timer will clear the state if there are no recent clicks from the client. If the number of elements in the list exceeds the set threshold, any additional clicks will be marked with a
click-status. Because a state needs to be maintained per client, the memory required for this step increases with the number of unique clients.
This step groups impressions together in a timed window based on the reporting URLs. The purpose of this step is to reduce the number of HTTP requests made to external endpoints. One URL is output for each unique reporting URL in the timed window by counting the number of occurrences of each URL. The output URL is the original reporting URL with the additional query parameters
IntervalWindow is used to group by impressions together based on the processing time - the time at which the message enters this transform. The output is generated at the end of the window which means that there is an increased delay between when the impression enters the pipeline and when the corresponding reporting URL is requested. This transform needs to keep track of a window for every unique reporting URL it receives which means memory required increases with the number of unique URLs.
This step sends a HTTP GET request to the URL specified by the
reporting_url attribute in the message. To keep track of requests sent by the job for debugging purposes, successful requests will throw a
RequestContentException which will add them to the BigQuery error table (see below).
BigQuery Error Table
The job is configured at certain steps to catch runtime exceptions and write the message contents to BigQuery. The configured table is
moz-fx-data-shared-prod.payload_bytes_error.contextual_services. This can be used for debugging or to backfill messages that initially failed to process.
Working with the Beam Job
Options specific to this job are found in https://github.com/mozilla/gcp-ingestion/blob/main/ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/ContextualServicesReporterOptions.java
This job can be deployed in a sandbox project for testing. The
contextual-services-dev project is currently used for this purpose.
There are a few required components to get a job running:
- A PubSub subscription on the republished
- A BigQuery table with the
payload_bytes_errorschema used for error output
- A URL allowed list stored in GCS
- The Beam pipeline running on Dataflow, reading from the PubSub subscription, and writing to the BigQuery table
Example script to start the Dataflow job from the ingestion-beam directory:
#!/bin/bash set -ux PROJECT="contextual-services-dev" JOB_NAME="contextual-services" mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.ContextualServicesReporter -Dexec.args="\ --runner=Dataflow \ --jobName=$JOB_NAME \ --project=$PROJECT \ --inputType=pubsub \ --input='projects/contextual-services-dev/subscriptions/ctxsvc-input' \ --outputTableRowFormat=payload \ --errorBqWriteMethod=streaming \ --errorOutputType=bigquery \ --errorOutput=$PROJECT:contextual_services.reporting_errors \ --region=us-central1 \ --usePublicIps=true \ --gcsUploadBufferSizeBytes=16777216 \ --urlAllowList=gs://contextual-services-data-dev/urlAllowlist.csv \ --allowedDocTypes=topsites-impression,topsites-click, \ --reportingEnabled=false \ --aggregationWindowDuration=10m \ --clickSpikeWindowDuration=3m \ --clickSpikeThreshold=10 \ --logReportingUrls=true \ --maxNumWorkers=2 \ --numWorkers=1 \ --autoscalingAlgorithm=THROUGHPUT_BASED \ "