Skip to content

Decoder Service Specification

This document specifies the behavior of the service that decodes messages in the Structured Ingestion pipeline.

Data Flow

  1. Consume messages from Google Cloud PubSub raw topic
  2. Deduplicate message by uri (which generally contains docId)
  3. Disabled for stub-installer, which does not include a UUID in the URI
  4. Perform GeoIP lookup and drop x_forwarded_for and remote_addr and optionally geo_city based on population
  5. Parse the uri attribute to determine document type, etc.
  6. Decode the body from base64, optionally decompress, and parse as JSON
  7. Scrub the message, checking the content against a list of known signatures that should cause the message to be dropped as toxic, sent to error output, or to have specific fields redacted
  8. Validate the schema of the body
  9. Extract user agent information and drop user_agent
  10. Sanitize metadata based on per-document type configuration
  11. Add metadata fields to message
  12. Write message to PubSub decoded topic based on namespace and docType

Implementation

The above steps will be executed as a single Apache Beam job that can accept either a streaming input from PubSub or a batch input from Cloud Storage.

Decoding Errors

All messages that are rejected at any step of the Data Flow above will be forwarded to a PubSub error topic for backfill and monitoring purposes. If we determine that a message has already been successfully processed based on docId, we drop the duplicated body and publish just the metadata to the error topic.

Error message schema

The message that failed decoding, with several additional attributes:

...
required group attributes {
  ...
  required string error_type    // example: "schema"
  required string error_message // example: "message did not match json schema for <namespace>/<docVersion>/<docType>"
  required string exception_class // example: "java.lang.RuntimeException"
  required string stack_trace
  optional string stack_trace_cause_1
  optional string stack_trace_cause_2
  optional string stack_trace_cause_3
  optional string stack_trace_cause_4
  optional string stack_trace_cause_5
}

Raw message schema

See Edge Service PubSub Message Schema.

Decoded message metadata schema

Decoded messages published to Pub/Sub will contain the following attributes:

required group attributes {
  ...
  required string document_version           // from uri for non-Telemetry, from message for Telemetry
  required string document_id                // from uri
  required string document_namespace         // from uri
  required string document_type              // from uri
  optional string app_name                   // from uri for Telemetry
  optional string app_version                // from uri for Telemetry
  optional string app_update_channel         // from uri for Telemetry
  optional string app_build_id               // from uri for Telemetry
  optional string geo_country                // from geoip lookup
  optional string geo_subdivision1           // from geoip lookup
  optional string geo_subdivision2           // from geoip lookup
  optional string geo_city                   // from geoip lookup
  required string submission_timestamp       // from edge metadata
  optional string date                       // header from client
  optional string dnt                        // header from client
  optional string x_pingsender_version       // header from client
  optional string x_debug_id                 // header from client
  optional string x_foxsec_ip_reputation     // header from iprepd
  optional string x_lb_tags                  // header from load balancer
  optional string x_source_tags              // header from client
  optional string x_telemetry_agent          // header from client
  optional string user_agent_browser         // from user_agent
  optional string user_agent_browser_version // from user_agent
  optional string user_agent_os              // from user_agent
  optional string user_agent_os_version      // from user_agent
  optional string normalized_app_name        // based on parsed json payload
  optional string normalized_channel         // based on parsed json payload or URI
  required string normalized_country_code    // from geoip lookup
  optional string normalized_os              // based on parsed json payload
  optional string normalized_os_version      // based on parsed json payload
  optional string sample_id                  // based on parsed json payload
}

Many of these fields are also injected into the JSON payload either at the top level or nested inside a metadata object. The schema for injected metadata is maintained under the metadata namespace in mozilla-pipeline-schemas.

Other Considerations

Message Acks

Messages should only be acknowledged in the PubSub raw topic subscription after delivery to either a decoded topic or the error topic.

If this is not possible then any time a message is not successfully delivered to PubSub it should by treated as lost data and the appropriate time window will be backfilled from Cloud Storage in batch mode, and appropriate steps will be taken downstream to handle the backfill.

Deployments should always terminate functional pipelines using the drain method, to ensure ack'd messages are fully delivered.

Deduplication

Each uri will be allowed through "at least once", and only be rejected as a duplicate if we have completed delivery of a message with the same uri. We assume that each uri contains a UUID that uniquely identifies the document. "Exactly once" semantics can be applied to derived data sets using SQL in BigQuery, and GroupByKey in Beam and Spark.

Note that deduplication is only provided with a "best effort" quality of service using a 10 minute window.