Ingestion Sink
A Java application that runs in Kubernetes, reading input from Google Cloud
Pub/Sub and emitting records to batch-oriented outputs like GCS or BigQuery.
Defined in the ingestion-sink package.
Supported Inputs and Outputs
Supported inputs:
- Google Cloud Pub/Sub
Supported outputs:
- Google Cloud Pub/Sub
- Google Cloud Storage
- Google Cloud BigQuery
Test Input and Output
Test inputs will stop when an exception is raised or the end of the pipe or file is reached. Supported test inputs:
- System.in (stdin), by setting INPUT_PIPE to any of -, 0, in, stdin, /dev/stdin
- A single file, by setting INPUT_PIPE to /path/to/input_file
Test outputs don't exercise batching and will write messages as newline delimited JSON in the order they are received. Supported test outputs:
- System.out (stdout), by setting OUTPUT_PIPE to any of -, 1, out, stdout, /dev/stdout
- System.err (stderr), by setting OUTPUT_PIPE to any of 2, err, stderr, /dev/stderr
- A single file, by setting OUTPUT_PIPE to /path/to/output_file
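For example, a quick smoke test might read from a local file and write each message to stderr (a sketch; the input file and the bin/mvn invocation are described under Executing below):
# read newline delimited JSON from a test file and write each message to stderr
PASS_ENV="INPUT_PIPE=tmp/input.ndjson OUTPUT_PIPE=err" ./bin/mvn compile exec:java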
Configuration
All configuration is controlled by environment variables.
Output Specification
Depending on the environment variables provided, the application will automatically determine where to deliver messages.
If OUTPUT_BUCKET is specified without BIG_QUERY_OUTPUT_MODE, then messages
will be delivered to Google Cloud Storage.
If OUTPUT_TOPIC is specified without OUTPUT_BUCKET or
BIG_QUERY_OUTPUT_MODE, then messages will be delivered to Google Cloud
Pub/Sub.
If OUTPUT_TABLE is specified without BIG_QUERY_OUTPUT_MODE or with
BIG_QUERY_OUTPUT_MODE=streaming, then messages will be delivered to BigQuery
via the streaming API.
If OUTPUT_TABLE is specified with BIG_QUERY_OUTPUT_MODE=file_loads, then
messages will be delivered to Google Cloud Storage based on OUTPUT_BUCKET and
for each blob a notification will be delivered to Google Cloud Pub/Sub based on
OUTPUT_TOPIC. Separate instances of ingestion-sink must consume notifications
from Google Cloud Pub/Sub and deliver messages to BigQuery via load jobs.
If OUTPUT_TABLE is specified with BIG_QUERY_OUTPUT_MODE=mixed, then
messages will be delivered to BigQuery via both the streaming API and load
jobs, and OUTPUT_BUCKET is required. If OUTPUT_TOPIC is specified then it
is used in the same way as with BIG_QUERY_OUTPUT_MODE=file_loads; otherwise
load jobs will be submitted by each running instance of ingestion-sink.
If none of the above configuration options are provided, then messages must be
notifications from BIG_QUERY_OUTPUT_MODE=file_loads or
BIG_QUERY_OUTPUT_MODE=mixed, and the blobs they indicate will be submitted to
BigQuery via load jobs.
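For illustration, the delivery modes above map to environment combinations along these lines (bucket, table, topic, and project names here are hypothetical; the variables would be passed to one of the invocations shown under Executing):
# Google Cloud Storage output
OUTPUT_BUCKET=gs://my-bucket/output/

# BigQuery streaming output
OUTPUT_TABLE=my_dataset.my_table
BIG_QUERY_OUTPUT_MODE=streaming

# BigQuery file_loads output: stage blobs in GCS and publish a notification per blob;
# separate instances with no output configured consume the notifications and run load jobs
OUTPUT_TABLE=my_dataset.my_table
BIG_QUERY_OUTPUT_MODE=file_loads
OUTPUT_BUCKET=gs://my-bucket/staging/
OUTPUT_TOPIC=projects/my-project/topics/my-loads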
BigQuery
OUTPUT_TABLE must be a tableSpec of the form dataset.tablename
or the more verbose projectId.dataset.tablename. The values can contain
attribute placeholders of the form ${attribute_name}. To set dataset to the
document namespace and table name to the document type, specify:
OUTPUT_TABLE='${document_namespace}.${document_type}'
All - characters in the attributes will be converted to _ per BigQuery
naming restrictions. Additionally, document namespace and type values will
be processed to ensure they are in snake case format (untrustedModules
becomes untrusted_modules).
Defaults for the placeholders using ${attribute_name:-default_value}
are supported, but likely don't make much sense since it's unlikely that
there is a default table whose schema is compatible with all potential
payloads.
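As a sketch of how substitution and normalization combine, a message with document_namespace set to test-ns and document_type set to untrustedModules (hypothetical values) would be routed as follows:
OUTPUT_TABLE='${document_namespace}.${document_type}'
# document_namespace=test-ns     -> dataset test_ns (dash converted to underscore)
# document_type=untrustedModules -> table untrusted_modules (converted to snake case)
# resolved tableSpec: test_ns.untrusted_modules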
Attribute placeholders
We support routing individual messages to different output locations based on
the PubsubMessage attribute map. Routing is accomplished by adding
placeholders of form ${attribute_name:-default_value} to the path.
For example, to route based on a document_type attribute, your path might
look like:
OUTPUT_BUCKET=gs://mybucket/mydocs/${document_type:-UNSPECIFIED}/myfileprefix
Messages with document_type of "main" would be grouped together and end up in
the following directory:
gs://mybucket/mydocs/main/
Messages with document_type set to null or missing that attribute
entirely would be grouped together and end up in the following directory:
gs://mybucket/mydocs/UNSPECIFIED/
Note that placeholders must specify a default value so that a poorly
formatted message doesn't cause a pipeline exception. A placeholder without a
default will result in an IllegalArgumentException on pipeline startup.
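For example (hypothetical bucket name):
# valid: a default is provided for the placeholder
OUTPUT_BUCKET=gs://mybucket/mydocs/${document_type:-UNSPECIFIED}/myfileprefix
# invalid: no default, so startup fails with IllegalArgumentException
OUTPUT_BUCKET=gs://mybucket/mydocs/${document_type}/myfileprefix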
File-based outputs support the additional derived attributes
"submission_date" and "submission_hour" which will be parsed from the value
of the submission_timestamp attribute if it exists. These can be useful for
making sure your output specification buckets messages into hourly directories.
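For example, an hourly layout might look like this (hypothetical bucket name, assuming submission_timestamp is present on the messages):
# group output files into daily and hourly directories derived from submission_timestamp
OUTPUT_BUCKET=gs://mybucket/mydocs/${submission_date:-NULL}/${submission_hour:-NULL}/myfileprefix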
The templating and default syntax used here is based on the
Apache commons-text StringSubstitutor,
which in turn bases its syntax on common practice in bash and other Unix/Linux
shells. Beware the need for proper escaping on the command line (use \$ in
place of $), as your shell may try to substitute in values for your
placeholders before they're passed to Sink.
Google's PubsubMessage format allows arbitrary strings for attribute names and values. We place the following restrictions on attribute names and default values used in placeholders:
- attribute names may not contain the string :-
- attribute names may not contain curly braces ({ or })
- default values may not contain curly braces ({ or })
Encoding
When writing messages to Google Cloud Storage or BigQuery, the message received from Google Cloud Pub/Sub will be encoded as a JSON object.
When OUTPUT_FORMAT is unspecified or raw, messages will have bytes encoded as
a "payload" field with base64 encoding, and each attribute encoded as a field.
This is the format used for payload_bytes_raw.* tables.
When OUTPUT_FORMAT is decoded, messages will have bytes encoded as with
OUTPUT_FORMAT=raw, but attributes will be encoded using the nested metadata
format of decoded pings. This format is currently unused.
When OUTPUT_FORMAT is payload, messages will have bytes decoded as JSON, and
will be transformed to coerce types and use snake case for compatibility with BigQuery.
This is the format used for *_live.* tables. This requires specifying a local
path to a gzipped tar archive that contains BigQuery table schemas as
SCHEMAS_LOCATION. If message bytes are compressed then
INPUT_COMPRESSION=gzip must also be specified to ensure they are decompressed
before they are decoded as JSON.
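A payload-format configuration might therefore look like the following sketch (the archive path is hypothetical):
# decode gzipped payloads against local BigQuery schemas before writing
OUTPUT_FORMAT=payload
SCHEMAS_LOCATION=/path/to/schemas.tar.gz
INPUT_COMPRESSION=gzip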
When OUTPUT_FORMAT is beam, messages will have bytes encoded as with
OUTPUT_FORMAT=raw, but attributes will be encoded as an "attributeMap" field
that contains a JSON object. This is the same format as produced by ingestion-beam
when using --outputType=file and --outputFileFormat=json.
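To make the encodings concrete, a message with payload bytes "test" and a single host attribute might be written roughly as follows (a sketch; the exact field set depends on the attributes present):
# OUTPUT_FORMAT=raw: each attribute becomes a field alongside the base64 payload
# {"payload":"dGVzdA==","host":"test"}
# OUTPUT_FORMAT=beam: attributes are nested under "attributeMap"
# {"payload":"dGVzdA==","attributeMap":{"host":"test"}}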
Google Cloud Storage file prefix
Google Cloud Storage files are named like:
$OUTPUT_BUCKET/{UUID.randomUUID().toString()}.ndjson
or if OUTPUT_TABLE and BIG_QUERY_OUTPUT_MODE are specified:
$OUTPUT_BUCKET/OUTPUT_TABLE=$OUTPUT_TABLE/{UUID.randomUUID().toString()}.ndjson
For example, with OUTPUT_BUCKET=gs://test-bucket/test-output:
gs://test-bucket/test-output/ad715b24-7500-45e2-9691-cb91e3b9c2cc.ndjson
or with OUTPUT_BUCKET=gs://test-bucket/test-output, OUTPUT_TABLE=my_dataset.raw_table, and
BIG_QUERY_OUTPUT_MODE=file_loads:
gs://test-bucket/test-output/OUTPUT_TABLE=my_dataset.raw_table/3b17c648-f8b9-4250-bdc1-5c2e472fdc26.ndjson
Executing
Locally with Docker
The provided bin/mvn script downloads and runs Maven via Docker so that less
setup is needed on the local machine. For prolonged development, performance is
likely to be significantly better, especially on macOS, if mvn is installed and
run natively without Docker.
# create a test input file
mkdir -p tmp/
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' > tmp/input.ndjson
# consume messages from the test file, decode and re-encode them, and write to a file
PASS_ENV="INPUT_PIPE=tmp/input.ndjson OUTPUT_PIPE=tmp/output.ndjson" ./bin/mvn compile exec:java
# check that the message was delivered
cat tmp/output.ndjson
# read message from stdin and write to stdout
cat tmp/input.ndjson | PASS_ENV="INPUT_PIPE=- OUTPUT_PIPE=-" ./bin/mvn compile exec:java
# read message from stdin and write to gcs
# note that $ needs to be escaped with \ to prevent shell substitution
cat tmp/input.ndjson | PASS_ENV="\
INPUT_PIPE=- \
OUTPUT_BUCKET=gs://my_bucket/\${document_type:-UNSPECIFIED}/ \
" ./bin/mvn compile exec:java
Locally without Docker
If you install Java and Maven, you can invoke VAR=... mvn in the above commands
instead of using PASS_ENV="VAR=..." ./bin/mvn. Be aware that Java 11 is the target
JVM and some reflection warnings may be thrown on newer versions. Though these are
generally harmless, -Werror turns warnings into build failures, so you may need to
comment out the <compilerArgument>-Werror</compilerArgument> line in the pom.xml
in the git root.
# consume messages from the test file, decode and re-encode them, and write to a file
INPUT_PIPE=tmp/input.ndjson OUTPUT_PIPE=tmp/output.ndjson mvn compile exec:java
# read message from stdin and write to stdout
cat tmp/input.ndjson | INPUT_PIPE=- OUTPUT_PIPE=- mvn compile exec:java