Contextual Services Reporter Job
The contextual services reporter forwards contextual-services click and impression events to an external partner, with a particular eye towards minimizing the scope of contextual metadata that is shared. For more context, see the Data and Firefox Suggest blog post.
The code is defined in the com.mozilla.telemetry.ContextualServicesReporter class.
The input of this job is all contextual-services namespace messages for desktop Firefox, which includes topsites-impression, topsites-click, quicksuggest-impression, and quicksuggest-click. It also includes topsites-impression pings from various mobile applications.
Data flows for Contextual Services
Note that in addition to this near real-time Dataflow job, we have several batch workflows that prepare Contextual Services data both for internal analytic use and for sending to external partners.
Search terms handling starts with log routing configuration in cloudops-infra (Mozilla internal) and then proceeds with a series of nightly scheduled queries defined under search_terms_derived in bigquery-etl.
Additional aggregations for clicks and impressions are defined under contextual_services_derived in bigquery-etl.
Some daily aggregate data is shared with an external partner via the adm_export DAG defined in telemetry-airflow.
Beam Pipeline Transforms
The following diagram shows the steps in the Beam pipeline. This is up to date as of 2021-08-23.
Figure: An overview of the execution graph for the ContextualServicesReporter.
Pub/Sub republished topic
The input to this job is the subset of decoded messages in the contextual-services namespace, as well as topsites-impression pings from the various namespaces associated with mobile applications sending telemetry via Glean.
FilterByDoctype
This step filters messages based on the document_type attribute, keeping only the document types listed in the --allowedDocTypes pipeline option. For example, this can be used to restrict the job to only sponsored tiles or only Suggest events, or to only clicks or only impressions.
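In Beam terms, such an attribute-based filter can be expressed roughly as follows. This is a minimal sketch with illustrative names, not the actual implementation:

import java.util.Set;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.values.PCollection;

// Sketch: keep only messages whose document_type attribute appears in the
// set parsed from --allowedDocTypes.
static PCollection<PubsubMessage> filterByDocType(
    PCollection<PubsubMessage> messages, Set<String> allowedDocTypes) {
  return messages.apply("FilterByDocType",
      Filter.by(m -> allowedDocTypes.contains(m.getAttribute("document_type"))));
}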
VerifyMetadata
Message contents are validated in this step using a few simple heuristics, such as checks on the user agent and user agent version. Messages that fail these checks are rejected and sent to the error table.
DecompressPayload
This step attempts to decompress a gzip-compressed payload. This transform is shared with other Beam pipelines such as the decoder.
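The core of the gzip handling is small; the following is a sketch of the idea only, not the shared transform itself:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

// Sketch: return the decompressed payload, or the original bytes unchanged
// if the payload is not gzip-compressed.
static byte[] maybeDecompress(byte[] payload) {
  try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload))) {
    return gzip.readAllBytes();
  } catch (IOException e) {
    return payload;
  }
}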
ParseReportingUrl
This is where the URLs used for reporting events are built. The reporting_url value from the message payload is used as the base URL, then additional query parameters are added based on the message metadata such as the client's country, region, and OS. The updated reporting_url value is put in the message payload and attributes.
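The general shape of this step is appending query parameters to a base URL, along these lines. This is a minimal sketch; the parameter names here are illustrative guesses, not the job's actual parameters:

// Sketch: append metadata-derived query parameters to the base reporting URL.
static String buildReportingUrl(String baseUrl, String country, String region, String os) {
  StringBuilder url = new StringBuilder(baseUrl);
  url.append(baseUrl.contains("?") ? "&" : "?");
  url.append("country-code=").append(country)     // illustrative parameter names
      .append("&region-code=").append(region)
      .append("&os-family=").append(os);
  return url.toString();
}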
LabelClickSpikes
This step counts the number of click events per client (using the context_id attribute) in a time interval. This transform uses Beam's state and timers; a state and a timer are maintained for every client. The state is a list of recent click timestamps from the current client, and the timer clears the state if there are no recent clicks from the client. If the number of elements in the list exceeds the configured threshold, any additional clicks are marked with a click-status. Because state must be maintained per client, the memory required for this step increases with the number of unique clients.
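A simplified sketch of this pattern, keyed by client, might look like the following. This is illustrative only: the class and the click-status marker here are hypothetical, and the real transform's bookkeeping differs.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Hypothetical sketch of per-client click-spike labeling; input is
// (context_id, reporting URL) pairs.
class LabelClickSpikesSketch extends DoFn<KV<String, String>, String> {

  private final int threshold;   // e.g. --clickSpikeThreshold
  private final Duration window; // e.g. --clickSpikeWindowDuration

  LabelClickSpikesSketch(int threshold, Duration window) {
    this.threshold = threshold;
    this.window = window;
  }

  // Recent click timestamps for the current client.
  @StateId("clicks")
  private final StateSpec<BagState<Long>> clicksSpec = StateSpecs.bag();

  // Clears the state after a period with no clicks from this client.
  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext ctx,
      @StateId("clicks") BagState<Long> clicks,
      @TimerId("expiry") Timer expiry) {
    long now = Instant.now().getMillis();
    int recent = 0;
    for (long t : clicks.read()) {
      if (t > now - window.getMillis()) {
        recent++;
      }
    }
    clicks.add(now);
    expiry.offset(window).setRelative(); // push back the expiration timer
    String url = ctx.element().getValue();
    if (recent >= threshold) {
      // Hypothetical marker; the real job records a click-status instead.
      url = url + (url.contains("?") ? "&" : "?") + "click-status=spike";
    }
    ctx.output(url);
  }

  @OnTimer("expiry")
  public void onExpiry(@StateId("clicks") BagState<Long> clicks) {
    clicks.clear(); // no recent clicks from this client; drop its state
  }
}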
AggregateImpressions
This step groups impressions together in a timed window based on the reporting URLs. The purpose of this step is to reduce the number of HTTP requests made to external endpoints. One URL is output for each unique reporting URL in the timed window by counting the number of occurrences of each URL. The output URL is the original reporting URL with the additional query parameters impressions, begin-timestamp, and end-timestamp.
A Beam IntervalWindow is used to group impressions together based on processing time, that is, the time at which the message enters this transform. The output is generated at the end of the window, which means there is an increased delay between when an impression enters the pipeline and when the corresponding reporting URL is requested. This transform needs to keep track of a window for every unique reporting URL it receives, which means the memory required increases with the number of unique URLs.
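As a rough sketch of the aggregation mechanics, assuming an event-time fixed window rather than the per-URL processing-time IntervalWindow the real transform manages, and omitting the begin-timestamp and end-timestamp parameters:

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

// Sketch: count each unique reporting URL per window and emit one
// aggregated URL carrying the impression count.
static PCollection<String> aggregateImpressions(
    PCollection<String> reportingUrls, Duration windowDuration) {
  return reportingUrls
      .apply(Window.<String>into(FixedWindows.of(windowDuration)))
      .apply(Count.perElement())
      .apply(MapElements.into(TypeDescriptors.strings())
          .via(kv -> kv.getKey()
              + (kv.getKey().contains("?") ? "&" : "?")
              + "impressions=" + kv.getValue()));
}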
SendRequest
This step sends an HTTP GET request to the URL specified by the reporting_url attribute in the message. To keep track of requests sent by the job for debugging purposes, successful requests will throw a RequestContentException, which adds them to the BigQuery error table (see below).
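At its core the step issues a plain GET; a minimal sketch follows, though the real step uses the job's own HTTP client and the RequestContentException convention described above:

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: issue an HTTP GET to the reporting URL and return the status code.
static int sendReportingRequest(String reportingUrl)
    throws IOException, InterruptedException {
  HttpClient client = HttpClient.newHttpClient();
  HttpRequest request = HttpRequest.newBuilder(URI.create(reportingUrl)).GET().build();
  HttpResponse<Void> response =
      client.send(request, HttpResponse.BodyHandlers.discarding());
  return response.statusCode();
}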
BigQuery Error Table
The job is configured at certain steps to catch runtime exceptions and write the message contents to BigQuery. The configured table is moz-fx-data-shared-prod.payload_bytes_error.contextual_services. This can be used for debugging or to backfill messages that initially failed to process.
Working with the Beam Job
Options specific to this job are found in https://github.com/mozilla/gcp-ingestion/blob/main/ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/ContextualServicesReporterOptions.java
Test Deployment
This job can be deployed in a sandbox project for testing. The contextual-services-dev project is currently used for this purpose.
There are a few required components to get a job running:
- A PubSub subscription on the republished contextual-services topic
- A BigQuery table with the payload_bytes_error schema used for error output
- A URL allowed list stored in GCS
- The Beam pipeline running on Dataflow, reading from the PubSub subscription, and writing to the BigQuery table
Example script to start the Dataflow job from the ingestion-beam directory:
#!/bin/bash
set -ux
PROJECT="contextual-services-dev"
JOB_NAME="contextual-services"
mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.ContextualServicesReporter -Dexec.args="\
--runner=Dataflow \
--jobName=$JOB_NAME \
--project=$PROJECT \
--inputType=pubsub \
--input='projects/contextual-services-dev/subscriptions/ctxsvc-input' \
--outputTableRowFormat=payload \
--errorBqWriteMethod=streaming \
--errorOutputType=bigquery \
--errorOutput=$PROJECT:contextual_services.reporting_errors \
--region=us-central1 \
--usePublicIps=true \
--gcsUploadBufferSizeBytes=16777216 \
--urlAllowList=gs://contextual-services-data-dev/urlAllowlist.csv \
--allowedDocTypes=topsites-impression,topsites-click \
--reportingEnabled=false \
--aggregationWindowDuration=10m \
--clickSpikeWindowDuration=3m \
--clickSpikeThreshold=10 \
--logReportingUrls=true \
--maxNumWorkers=2 \
--numWorkers=1 \
--autoscalingAlgorithm=THROUGHPUT_BASED \
"