Apache Beam Jobs for Ingestion
This `ingestion-beam` Java module contains our Apache Beam jobs for use in Ingestion. Google Cloud Dataflow is a Google Cloud Platform service that natively runs Apache Beam jobs.
The source code lives in the ingestion-beam subdirectory of the gcp-ingestion repository.
Sink Job
A job for delivering messages between Google Cloud services.
Supported Inputs and Outputs
Supported inputs:
- Google Cloud PubSub
- Google Cloud Storage
Supported outputs:
- Google Cloud PubSub
- Google Cloud Storage
- Google Cloud BigQuery
- stdout
- stderr
Supported error outputs (must include attributes and must not validate messages):
- Google Cloud PubSub
- Google Cloud Storage with JSON encoding
- stdout with JSON encoding
- stderr with JSON encoding
Encoding
Internally, messages are stored and transported as `PubsubMessage`.
Supported file formats for Cloud Storage are `json` or `text`. The `json` file format stores newline-delimited JSON, encoding the field `payload` as a `base64` string, and `attributeMap` as an optional object with string keys and values. The `text` file format stores newline-delimited strings, encoding the field `payload` as UTF-8.
We'll construct example inputs based on the following two values and their base64 encodings:
$ echo -en "test" | base64
dGVzdA==
$ echo -en "test\n" | base64
dGVzdAo=
Example `json` file:
{"payload":"dGVzdA==","attributeMap":{"meta":"data"}}
{"payload":"dGVzdAo=","attributeMap":null}
{"payload":"dGVzdA=="}
The above file when stored in the `text` format:
test
test
test
Note that the newline embedded at the end of the second JSON message results in two text messages, one of which is blank.
Output Path Specification
Depending on the specified output type, the `--output` path that you provide controls several aspects of the behavior.
BigQuery
When `--outputType=bigquery`, `--output` is a `tableSpec` of form `dataset.tablename` or the more verbose `projectId:dataset.tablename`. The values can contain attribute placeholders of form `${attribute_name}`. To set the dataset to the document namespace and the table name to the document type, specify:
--output='${document_namespace}.${document_type}'
All `-` characters in the attributes will be converted to `_` per BigQuery naming restrictions. Additionally, document namespace and type values will be processed to ensure they are in snake case format (`untrustedModules` becomes `untrusted_modules`).
Defaults for the placeholders using `${attribute_name:-default_value}` are supported, but likely don't make much sense, since it's unlikely that there is a default table whose schema is compatible with all potential payloads. Instead, records missing an attribute required by a placeholder will be redirected to the error output if no default is provided.
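As an illustrative sketch (the attribute values below are hypothetical), routing with the `--output` shown above works like this:
# illustrative only: the attribute values are made up
# with --output='${document_namespace}.${document_type}', a message whose attributes include
#   document_namespace=telemetry
#   document_type=untrustedModules
# is written to the BigQuery table telemetry.untrusted_modules
# (dashes become underscores and camelCase becomes snake_case per the rules above)
# the resulting table could then be inspected, e.g.
bq show 'telemetry.untrusted_modules'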
Protocol
When `--outputType=file`, `--output` may be prefixed by a protocol specifier to determine the target data store. Without a protocol prefix, the output path is assumed to be a relative or absolute path on the filesystem. To write to Google Cloud Storage, use a `gs://` path like:
--output=gs://mybucket/somedir/myfileprefix
Attribute placeholders
We support `FileIO`'s "Dynamic destinations" feature (`FileIO.writeDynamic`), where it's possible to route individual messages to different output locations based on properties of the message. In our case, we allow routing messages based on the `PubsubMessage` attribute map. Routing is accomplished by adding placeholders of form `${attribute_name:-default_value}` to the path.
For example, to route based on a `document_type` attribute, your path might look like:
--output=gs://mybucket/mydocs/${document_type:-UNSPECIFIED}/myfileprefix
Messages with `document_type` of "main" would be grouped together and end up in the following directory:
gs://mybucket/mydocs/main/
Messages with `document_type` set to `null` or missing that attribute completely would be grouped together and end up in the directory:
gs://mybucket/mydocs/UNSPECIFIED/
Note that placeholders must specify a default value so that a poorly formatted
message doesn't cause a pipeline exception. A placeholder without a default will result in an `IllegalArgumentException` on pipeline startup.
File-based outputs support the additional derived attributes `submission_date` and `submission_hour`, which will be parsed from the value of the `submission_timestamp` attribute if it exists.
These can be useful for making sure your output specification buckets messages
into hourly directories.
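For example, an output path like the following (a sketch; the bucket, prefix, and default values are made up) buckets messages into daily and hourly directories:
--output=gs://mybucket/mydocs/${submission_date:-UNKNOWN}/${submission_hour:-UNKNOWN}/myfileprefix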
The templating and default syntax used here is based on the Apache commons-text `StringSubstitutor`, which in turn bases its syntax on common practice in bash and other Unix/Linux shells. Beware the need for proper escaping on the command line (use `\$` in place of `$`), as your shell may try to substitute in values for your placeholders before they're passed to `Sink`.
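A minimal sketch of that escaping (using the same illustrative bucket as above); inside double quotes, the `$` must be escaped so the literal placeholder survives the shell and reaches the pipeline:
# unescaped, the shell would expand ${document_type:-UNSPECIFIED} itself before maven ever sees it
# escaped, the literal placeholder text is passed through to the pipeline
--output="gs://mybucket/mydocs/\${document_type:-UNSPECIFIED}/myfileprefix"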
Google's `PubsubMessage` format allows arbitrary strings for attribute names and values. We place the following restrictions on attribute names and default values used in placeholders:
- attribute names may not contain the string `:-`
- attribute names may not contain curly braces (`{` or `}`)
- default values may not contain curly braces (`{` or `}`)
File prefix
Individual files are named by replacing `:` with `-` in the default format discussed in the "File naming" section of Beam's `FileIO` Javadoc:
$prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix
In our case, `$prefix` is determined from the last `/`-delimited piece of the `--output` path. If you specify a path ending in `/`, you'll end up with an empty prefix and your file names will begin with `-`. This is probably not what you want, so it's recommended to end your output path with a non-empty file prefix. We replace `:` with `-` because Hadoop can't handle `:` in file names.
For example, given:
--output=/tmp/output/out
An output file might be:
/tmp/output/out--290308-12-21T20-00-00.000Z--290308-12-21T20-10-00.000Z-00000-of-00001.ndjson
Executing Jobs
Note: `-Dexec.args` does not handle newlines gracefully, but bash will remove `\`-escaped newlines inside double quotes.
Locally
If you install Java and Maven, you can invoke `mvn` directly in the following commands; be aware, though, that Java 8 is the target JVM, so newer versions may emit reflection warnings, which are generally harmless.
The provided `bin/mvn` script downloads and runs Maven via Docker so that less setup is needed on the local machine.
# create a test input file
mkdir -p tmp/
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' > tmp/input.json
# consume messages from the test file, decode and re-encode them, and write to a directory
./bin/mvn compile exec:java -Dexec.args="\
--inputFileFormat=json \
--inputType=file \
--input=tmp/input.json \
--outputFileFormat=json \
--outputType=file \
--output=tmp/output/out \
--errorOutputType=file \
--errorOutput=tmp/error \
"
# check that the message was delivered
cat tmp/output/*
# write message payload straight to stdout
./bin/mvn compile exec:java -Dexec.args="\
--inputFileFormat=json \
--inputType=file \
--input=tmp/input.json \
--outputFileFormat=text \
--outputType=stdout \
--errorOutputType=stderr \
"
# check the help page to see types of options
./bin/mvn compile exec:java -Dexec.args=--help
# check the SinkOptions help page for options specific to Sink
./bin/mvn compile exec:java -Dexec.args=--help=SinkOptions
On Dataflow
# Pick a bucket to store files in
BUCKET="gs://$(gcloud config get-value project)"
# create a test input file
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' | gsutil cp - $BUCKET/input.json
# Set credentials; beam is not able to use gcloud credentials
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/creds.json"
# consume messages from the test file, decode and re-encode them, and write to a bucket
./bin/mvn compile exec:java -Dexec.args="\
--runner=Dataflow \
--inputFileFormat=json \
--inputType=file \
--input=$BUCKET/input.json \
--outputFileFormat=json \
--outputType=file \
--output=$BUCKET/output \
--errorOutputType=file \
--errorOutput=$BUCKET/error \
"
# wait for the job to finish
gcloud dataflow jobs list
# check that the message was delivered
gsutil cat $BUCKET/output/*
On Dataflow with templates
Dataflow templates make a distinction between runtime parameters that implement the `ValueProvider` interface and compile-time parameters that do not. All options can be specified at template compile time by passing command-line flags, but runtime parameters can also be overridden when executing the template via the `--parameters` flag. In the output of `--help=SinkOptions`, runtime parameters are those with type `ValueProvider`.
# Pick a bucket to store files in
BUCKET="gs://$(gcloud config get-value project)"
# Set credentials; beam is not able to use gcloud credentials
export GOOGLE_APPLICATION_CREDENTIALS="path/to/your/creds.json"
# create a template
./bin/mvn compile exec:java -Dexec.args="\
--runner=Dataflow \
--project=$(gcloud config get-value project) \
--inputFileFormat=json \
--inputType=file \
--outputFileFormat=json \
--outputType=file \
--errorOutputType=file \
--templateLocation=$BUCKET/sink/templates/JsonFileToJsonFile \
--stagingLocation=$BUCKET/sink/staging \
"
# create a test input file
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' | gsutil cp - $BUCKET/input.json
# run the dataflow template with gcloud
JOBNAME=FileToFile1
gcloud dataflow jobs run $JOBNAME --gcs-location=$BUCKET/sink/templates/JsonFileToJsonFile --parameters "input=$BUCKET/input.json,output=$BUCKET/output/,errorOutput=$BUCKET/error"
# get the job id
JOB_ID="$(gcloud dataflow jobs list --filter name=$JOBNAME | tail -1 | cut -d' ' -f1)"
# wait for the job to finish
gcloud dataflow jobs show "$JOB_ID"
# check that the message was delivered
gsutil cat $BUCKET/output/*
In streaming mode
If `--inputType=pubsub`, Beam will execute in streaming mode, requiring some extra configuration for file-based outputs. You will need to specify sharding like:
--outputNumShards=10
--errorOutputNumShards=10
As discussed in the Beam documentation for `FileIO.Write#withNumShards`, batch mode is most efficient when the runner is left to determine sharding, so `numShards` options should normally be left at their default of `0`; streaming mode, however, can't perform the same optimizations, so an exception will be thrown during pipeline construction if sharding is not specified.
As codified in apache/beam/pull/1952, the Dataflow runner suggests that a reasonable starting point for `numShards` is `2 * maxWorkers`, or 10 if `--maxWorkers` is unspecified.
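As a minimal sketch (the Pub/Sub subscription path and sharding values are illustrative, and $BUCKET is assumed to be set as in the Dataflow example above), a streaming invocation with explicit sharding might look like:
# consume messages from a Pub/Sub subscription and write sharded files to a bucket
./bin/mvn compile exec:java -Dexec.args="\
--runner=Dataflow \
--inputType=pubsub \
--input=projects/$(gcloud config get-value project)/subscriptions/mysubscription \
--outputFileFormat=json \
--outputType=file \
--output=$BUCKET/output/out \
--outputNumShards=10 \
--errorOutputType=file \
--errorOutput=$BUCKET/error/error \
--errorOutputNumShards=10 \
"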
Decoder Job
A job for normalizing ingestion messages.
Transforms
These transforms are currently executed against each message in order.
Parse URI
Attempt to extract attributes from `uri`; on failure, send messages to the configured error output.
Decompress
Attempt to decompress the payload with gzip; on failure, pass the message through unmodified.
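For example, a gzip-compressed test payload can be constructed on the command line (a sketch assuming GNU coreutils, where `base64 -w0` disables line wrapping; the flag differs on macOS):
# build a test message whose payload is gzip-compressed, then base64-encoded
PAYLOAD="$(echo -n 'test' | gzip -c | base64 -w0)"
echo "{\"payload\":\"$PAYLOAD\",\"attributeMap\":{\"host\":\"test\"}}" > tmp/gzip-input.json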
GeoIP Lookup
- Extract `ip` from the `x_forwarded_for` attribute (see the example after this list)
  - when the `x_pipeline_proxy` attribute is not present, use the second-to-last value (since the last value is a forwarding rule IP added by the Google load balancer)
  - when the `x_pipeline_proxy` attribute is present, use the third-to-last value (since the tee introduces an additional proxy IP)
  - fall back to the `remote_addr` attribute, then to an empty string
- Execute the following steps until one fails and ignore the exception
  - Parse `ip` using `InetAddress.getByName`
  - Look up `ip` in the configured `GeoIP2City.mmdb`
  - Extract `country.iso_code` as `geo_country`
  - Extract `city.name` as `geo_city` if `cities15000.txt` is not configured or `city.geo_name_id` is in the configured `cities15000.txt`
  - Extract `subdivisions[0].iso_code` as `geo_subdivision1`
  - Extract `subdivisions[1].iso_code` as `geo_subdivision2`
- Remove the `x_forwarded_for` and `remote_addr` attributes
- Remove any `null` values added to attributes
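As an illustrative sketch (the addresses below are made up), the `ip` selection from `x_forwarded_for` works like this:
# example x_forwarded_for attribute value
x_forwarded_for="10.1.2.3, 203.0.113.7, 35.201.0.1"
# without x_pipeline_proxy: second-to-last value -> 203.0.113.7
# with x_pipeline_proxy:    third-to-last value  -> 10.1.2.3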
Parse User Agent
Attempt to extract browser, browser version, and OS from the `user_agent` attribute, drop any nulls, and remove `user_agent` from attributes.
Executing Decoder Jobs
Decoder jobs are executed the same way as sink jobs, but with a few extra flags:
-Dexec.mainClass=com.mozilla.telemetry.Decoder
--geoCityDatabase=/path/to/GeoIP2-City.mmdb
--geoCityFilter=/path/to/cities15000.txt (optional)
Example:
# create a test input file
mkdir -p tmp/
echo '{"payload":"dGVzdA==","attributeMap":{"remote_addr":"63.245.208.195"}}' > tmp/input.json
# Download `cities15000.txt`, `GeoLite2-City.mmdb`, and `schemas.tar.gz`
./bin/download-cities15000
./bin/download-geolite2
./bin/download-schemas
# do geo lookup on messages to stdout
./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Decoder -Dexec.args="\
--geoCityDatabase=GeoLite2-City.mmdb \
--geoCityFilter=cities15000.txt \
--schemasLocation=schemas.tar.gz \
--inputType=file \
--input=tmp/input.json \
--outputType=stdout \
--errorOutputType=stderr \
"
# check the DecoderOptions help page for options specific to Decoder
./bin/mvn compile exec:java -Dexec.args=--help=DecoderOptions
"
Republisher Job
A job for republishing subsets of decoded messages to new destinations.
The primary intention is to produce smaller derived Pub/Sub topics so that consumers that only need a specific subset of messages don't incur the cost of reading the entire stream of decoded messages.
The Republisher has the additional responsibility of marking messages as seen
in Cloud MemoryStore
for deduplication purposes. That functionality exists
here to avoid the expense of an additional separate consumer of the full
decoded topic.
Capabilities
Marking Messages As Seen
The job needs to connect to Redis in order to mark the `document_id`s of consumed messages as seen. The Decoder is able to use that information to drop duplicate messages flowing through the pipeline.
Debug Republishing
If `--enableDebugDestination` is set, messages containing an `x_debug_id` attribute will be republished to a destination that's configurable at runtime. This is currently expected to be a feature specific to structured ingestion, so it should not be set for `telemetry-decoded` input.
Per-docType Republishing
If `--perDocTypeEnabledList` is provided, a separate producer will be created for each `docType` specified in the given comma-separated list. See the `--help` output for details on format.
Per-Channel Sampled Republishing
If `--perChannelSampleRatios` is provided, a separate producer will be created for each specified release channel. The messages will be randomly sampled according to the ratios provided per channel. This is currently intended as a feature only for telemetry data, so it should not be set for `structured-decoded` input. See the `--help` output for details on format.
Executing Republisher Jobs
Republisher jobs are executed the same way as sink jobs, but with a few differences in flags. You'll need to set the `mainClass`:
-Dexec.mainClass=com.mozilla.telemetry.Republisher
The `--outputType` flag is still required as in the sink, but the `--output` configuration is ignored for the Republisher. Instead, there is a separate destination configuration flag for each of the three republishing types. For each type, there is a compile-time option that affects which publishers are generated in the graph for the Dataflow job, along with a runtime option that determines the specific location (usually a topic name) for each publisher.
To enable debug republishing:
- --enableDebugDestination (compile-time)
- --debugDestination=/some/pubsub/topic/path
To enable per-docType republishing:
- --perDocTypeEnabledList=event,heartbeat (compile-time)
- --perDocTypeDestination=/some/pubsub/topic/path/per-doctype-${document_namespace}-${document_type} (compile-time)
To enable per-channel sampled republishing:
- --perChannelSampleRatios='{"nightly":1.0,"beta":0.1,"release":0.01}' (compile-time)
- --perChannelDestination=/some/pubsub/topic/path/per-channel-${channel} (compile-time)
Example:
# create a test input file
mkdir -p tmp/
echo '{"payload":"dGVzdA==","attributeMap":{"x_debug_id":"mysession"}}' > tmp/input.json
# Republish only messages with x_debug_id attribute to stdout.
./bin/mvn compile exec:java -Dexec.mainClass=com.mozilla.telemetry.Republisher -Dexec.args="\
--inputType=file \
--input=tmp/input.json \
--outputType=stdout \
--errorOutputType=stderr \
--enableDebugDestination
"
# check the RepublisherOptions help page for options specific to Republisher
./bin/mvn compile exec:java -Dexec.args=--help=RepublisherOptions
"
Testing
Before anything else, be sure to download the test data:
./bin/download-cities15000
./bin/download-geolite2
./bin/download-schemas
Run tests locally with CircleCI Local CLI
(cd .. && circleci build --job ingestion-beam)
To make more targeted test invocations, you can install Java and Maven locally or use the `bin/mvn` executable to run Maven in Docker:
./bin/mvn clean test
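For example, to run a single test class (the class name below is hypothetical; substitute a real test class from the module), Maven's standard `-Dtest` filter can be used:
# run only the named test class
./bin/mvn test -Dtest=SomeDecoderTest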
To run the project in a sandbox against production data, see this document on configuring an integration testing workflow.
Code Formatting
Use spotless to automatically reformat code:
mvn spotless:apply
or just check what changes it requires:
mvn spotless:check