Decoder Service Specification
This document specifies the behavior of the service that decodes messages in the Structured Ingestion pipeline.
Data Flow
- Consume messages from the Google Cloud PubSub raw topic
- Deduplicate messages by uri (which generally contains docId)
  - Disabled for stub-installer, which does not include a UUID in the URI
- Perform GeoIP lookup and drop x_forwarded_for and remote_addr, and optionally drop geo_city based on city population
- Parse the uri attribute to determine the document namespace, type, and other URI-derived fields
- Decode the body from base64, optionally decompress, and parse as JSON
- Scrub the message, checking the content against a list of known signatures that should cause the message to be dropped as toxic, sent to error output, or to have specific fields redacted
- Validate the schema of the body
- Extract user agent information and drop user_agent
- Sanitize metadata based on per-document type configuration
- Add metadata fields to message
- Write message to the PubSub decoded topic based on namespace and docType
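The decoding step in the list above (base64, optional decompression, JSON parsing) can be sketched as follows. This is an illustration, not the pipeline's implementation: the spec does not say how compression is detected, so checking for the gzip magic bytes is an assumption, and decode_body is a hypothetical name.

```python
import base64
import gzip
import json


def decode_body(raw_b64: str) -> dict:
    """Hypothetical helper: base64-decode, gunzip if compressed, parse JSON."""
    # validate=True rejects non-base64 characters instead of silently skipping them.
    data = base64.b64decode(raw_b64, validate=True)
    # Assumption: compressed bodies are gzip streams, which begin with 0x1f 0x8b.
    if data[:2] == b"\x1f\x8b":
        data = gzip.decompress(data)
    # Any ValueError/OSError raised here would route the message to the error topic.
    return json.loads(data.decode("utf-8"))


plain = base64.b64encode(b'{"a": 1}').decode("ascii")
packed = base64.b64encode(gzip.compress(b'{"a": 1}')).decode("ascii")
print(decode_body(plain), decode_body(packed))  # prints {'a': 1} {'a': 1}
```

Letting each failure surface as an exception keeps the decoding logic simple and leaves the routing decision to a single error handler.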
Implementation
The above steps will be executed as a single Apache Beam job that can accept either a streaming input from PubSub or a batch input from Cloud Storage.
Decoding Errors
All messages that are rejected at any step of the Data Flow above will be
forwarded to a PubSub error topic for backfill and monitoring purposes.
If we determine that a message has already been successfully processed
based on docId, we drop the duplicated body and publish just the metadata
to the error topic.
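A minimal sketch of the duplicate branch, assuming the set of already-delivered document IDs is available in memory. Message, route_duplicate, the topic names, and the "duplicate_message" error_type value are all hypothetical; the point is only that the body is dropped and the attributes alone travel to the error topic.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set, Tuple

DECODED_TOPIC = "decoded"  # hypothetical topic names
ERROR_TOPIC = "error"


@dataclass
class Message:
    attributes: Dict[str, str]
    body: Optional[bytes]


def route_duplicate(doc_id: str, msg: Message, delivered_ids: Set[str]) -> Tuple[str, Message]:
    """Send already-delivered documents to the error topic as metadata only."""
    if doc_id in delivered_ids:
        # Drop the duplicated body; keep the attributes for monitoring.
        attrs = dict(msg.attributes, error_type="duplicate_message")
        return ERROR_TOPIC, Message(attributes=attrs, body=None)
    # Not seen yet: continue through the remaining steps toward a decoded topic.
    return DECODED_TOPIC, msg
```

Recording an ID in delivered_ids is left to the caller and should happen only after delivery has completed, consistent with the "at least once" rule under Deduplication.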
Error message schema
The message that failed decoding, with several additional attributes:
...
required group attributes {
...
required string error_type // example: "schema"
required string error_message // example: "message did not match json schema for <namespace>/<docVersion>/<docType>"
required string exception_class // example: "java.lang.RuntimeException"
required string stack_trace
optional string stack_trace_cause_1
optional string stack_trace_cause_2
optional string stack_trace_cause_3
optional string stack_trace_cause_4
optional string stack_trace_cause_5
}
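The error attributes could be populated from an exception roughly as below. This Python sketch is an assumption about how the fields map: exception_class is the fully qualified class name (Python reports builtins.RuntimeError where the Java pipeline would report something like java.lang.RuntimeException), stack_trace holds the top-level trace without its causes, and up to five chained causes fill stack_trace_cause_1 through stack_trace_cause_5. error_attributes is a hypothetical helper.

```python
import traceback
from typing import Dict


def _format(exc: BaseException) -> str:
    # chain=False prints only this exception's trace, not its causes.
    return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__, chain=False))


def error_attributes(error_type: str, exc: BaseException) -> Dict[str, str]:
    """Hypothetical mapping of an exception onto the error attributes."""
    cls = type(exc)
    attrs = {
        "error_type": error_type,
        "error_message": str(exc),
        "exception_class": f"{cls.__module__}.{cls.__qualname__}",
        "stack_trace": _format(exc),
    }
    # Walk the explicit cause chain ("raise ... from ..."), keeping at most five.
    cause = exc.__cause__
    for i in range(1, 6):
        if cause is None:
            break
        attrs[f"stack_trace_cause_{i}"] = _format(cause)
        cause = cause.__cause__
    return attrs


try:
    try:
        int("not a number")
    except ValueError as inner:
        raise RuntimeError("message did not match json schema") from inner
except RuntimeError as err:
    attrs = error_attributes("schema", err)
```

Optional cause fields are simply absent when the chain is shorter than five, matching their optional declaration in the schema.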
Raw message schema
See Edge Service PubSub Message Schema.
Decoded message metadata schema
Decoded messages published to PubSub will contain the following attributes:
required group attributes {
...
required string document_version // from uri for non-Telemetry, from message for Telemetry
required string document_id // from uri
required string document_namespace // from uri
required string document_type // from uri
optional string app_name // from uri for Telemetry
optional string app_version // from uri for Telemetry
optional string app_update_channel // from uri for Telemetry
optional string app_build_id // from uri for Telemetry
optional string geo_country // from geoip lookup
optional string geo_subdivision1 // from geoip lookup
optional string geo_subdivision2 // from geoip lookup
optional string geo_city // from geoip lookup
required string submission_timestamp // from edge metadata
optional string date // header from client
optional string dnt // header from client
optional string x_pingsender_version // header from client
optional string x_debug_id // header from client
optional string x_foxsec_ip_reputation // header from iprepd
optional string x_lb_tags // header from load balancer
optional string x_source_tags // header from client
optional string x_telemetry_agent // header from client
optional string user_agent_browser // from user_agent
optional string user_agent_browser_version // from user_agent
optional string user_agent_os // from user_agent
optional string user_agent_os_version // from user_agent
optional string normalized_app_name // based on parsed json payload
optional string normalized_channel // based on parsed json payload or URI
required string normalized_country_code // from geoip lookup
optional string normalized_os // based on parsed json payload
optional string normalized_os_version // based on parsed json payload
optional string sample_id // based on parsed json payload
}
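The GeoIP step in the Data Flow drops the client address fields and keeps geo_city only depending on city population. A sketch under stated assumptions: the lookup result is passed in already resolved (no real GeoIP library is called), and apply_geo and min_population are hypothetical; the actual threshold is not given in this spec.

```python
from typing import Dict, Optional


def apply_geo(
    attributes: Dict[str, str],
    country: Optional[str],
    city: Optional[str],
    city_population: Optional[int],
    min_population: int,
) -> Dict[str, str]:
    """Attach GeoIP results and strip client IP fields (hypothetical helper)."""
    out = dict(attributes)
    # The client address fields are not forwarded past this step.
    out.pop("x_forwarded_for", None)
    out.pop("remote_addr", None)
    if country:
        out["geo_country"] = country
    # Keep the city only when its population meets the (hypothetical) threshold.
    if city and city_population is not None and city_population >= min_population:
        out["geo_city"] = city
    return out
```

The other geo_* fields would be attached the same way; they are omitted to keep the sketch short.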
Many of these fields are also injected into the JSON payload either at the top
level or nested inside a metadata object. The schema for injected metadata
is maintained under the metadata namespace in mozilla-pipeline-schemas.
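As a rough illustration of injection, the sketch below copies attributes into a payload, placing a chosen set of keys at the top level and the rest under a metadata object. The split is a parameter here and entirely hypothetical; the real layout is whatever the metadata namespace in mozilla-pipeline-schemas defines, and inject_metadata is not a name from the pipeline.

```python
import copy
from typing import Dict, Iterable


def inject_metadata(
    payload: dict, attributes: Dict[str, str], top_level_keys: Iterable[str]
) -> dict:
    """Return a copy of payload with attributes injected (hypothetical layout)."""
    out = copy.deepcopy(payload)  # leave the caller's payload untouched
    top = set(top_level_keys)
    for key, value in attributes.items():
        if key in top:
            out[key] = value
        else:
            # Everything else is nested under a "metadata" object.
            out.setdefault("metadata", {})[key] = value
    return out
```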
Other Considerations
Message Acks
Messages should only be acknowledged in the PubSub raw topic subscription after delivery to either a decoded topic or the error topic.
If this is not possible, then any time a message is not successfully delivered to PubSub it should be treated as lost data: the appropriate time window will be backfilled from Cloud Storage in batch mode, and appropriate steps will be taken downstream to handle the backfill.
Deployments should always terminate functional pipelines using the drain
method, to ensure ack'd messages are fully delivered.
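The acknowledgement rule amounts to "publish first, ack only on success". In a Beam job, acknowledgement is typically handled by the runner rather than by user code, so the sketch below, with injected process, publish, ack, and nack callables, is a conceptual model only; deliver_then_ack and the topic names are hypothetical.

```python
from typing import Any, Callable, Tuple


def deliver_then_ack(
    message: Any,
    process: Callable[[Any], Tuple[str, Any]],
    publish: Callable[[str, Any], None],
    ack: Callable[[Any], None],
    nack: Callable[[Any], None],
) -> bool:
    """Ack the raw message only after it reaches a decoded or error topic."""
    try:
        topic, out = process(message)
    except Exception as exc:
        # Rejected messages are still "delivered": they go to the error topic.
        topic, out = "error", {"message": message, "error_message": str(exc)}
    try:
        publish(topic, out)
    except Exception:
        nack(message)  # delivered nowhere: leave it for redelivery
        return False
    ack(message)
    return True


acked, nacked, published = [], [], []


def fake_publish(topic, out):
    if topic == "decoded" and out == "unlucky":
        raise ConnectionError("publish failed")
    published.append(topic)


def fake_process(msg):
    if msg == "malformed":
        raise ValueError("schema")
    return "decoded", msg


for m in ["ok", "malformed", "unlucky"]:
    deliver_then_ack(m, fake_process, fake_publish, acked.append, nacked.append)
```

Here "malformed" is acked because its error record was published, while "unlucky" stays unacked so PubSub can redeliver it.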
Deduplication
Each uri will be allowed through "at least once", and only be
rejected as a duplicate if we have completed delivery of a message with the
same uri. We assume that each uri contains a UUID that uniquely identifies
the document.
"Exactly once" semantics can be applied to derived data sets using SQL in
BigQuery or GroupByKey in Beam and Spark.
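Downstream exactly-once deduplication boils down to grouping by document and keeping one row per group. The plain-Python sketch below mimics a group-by-key; keeping the earliest submission_timestamp per document_id is an assumed tie-breaking policy, and dedupe_keep_earliest is a hypothetical name.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, List


def dedupe_keep_earliest(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep one row per document_id: the one with the earliest timestamp."""
    # Sorting by (key, timestamp) puts each group's earliest row first.
    # ISO 8601 timestamps in one consistent format sort correctly as strings.
    ordered = sorted(rows, key=lambda r: (r["document_id"], r["submission_timestamp"]))
    return [next(group) for _, group in groupby(ordered, key=itemgetter("document_id"))]


rows = [
    {"document_id": "a", "submission_timestamp": "2019-01-01T00:00:02Z", "v": "2"},
    {"document_id": "a", "submission_timestamp": "2019-01-01T00:00:01Z", "v": "1"},
    {"document_id": "b", "submission_timestamp": "2019-01-01T00:00:05Z", "v": "3"},
]
print([r["v"] for r in dedupe_keep_earliest(rows)])  # prints ['1', '3']
```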
Note that deduplication is only provided with a "best effort" quality of service using a 10-minute window.
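The best-effort, windowed behavior can be modeled as a cache of delivered URIs whose entries expire after 10 minutes. This in-memory sketch is an assumption for illustration, not the pipeline's storage mechanism; RecentUriCache and its methods are hypothetical, and the linear expiry scan is chosen for clarity rather than speed.

```python
import time
from typing import Callable, Dict


class RecentUriCache:
    """Best-effort duplicate filter remembering delivered URIs for a fixed window."""

    def __init__(self, window_seconds: float = 600.0, clock: Callable[[], float] = time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self._delivered: Dict[str, float] = {}

    def _expire(self, now: float) -> None:
        # Forget URIs delivered at least `window` seconds ago.
        stale = [uri for uri, t in self._delivered.items() if now - t >= self.window]
        for uri in stale:
            del self._delivered[uri]

    def is_duplicate(self, uri: str) -> bool:
        self._expire(self.clock())
        return uri in self._delivered

    def mark_delivered(self, uri: str) -> None:
        # Call only after delivery completes, so failed attempts can be retried.
        self._delivered[uri] = self.clock()


now = [0.0]
cache = RecentUriCache(clock=lambda: now[0])
print(cache.is_duplicate("/submit/a/1"))  # prints False: never delivered
cache.mark_delivered("/submit/a/1")
print(cache.is_duplicate("/submit/a/1"))  # prints True: inside the window
now[0] = 601.0
print(cache.is_duplicate("/submit/a/1"))  # prints False: entry has expired
```

A duplicate arriving after the window, or after a restart of this in-memory cache, passes through; that is exactly the gap the downstream exactly-once step covers.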