This commit is contained in:
Daniel Thorn 2020-07-28 15:56:24 -07:00 committed by GitHub
Parent 741574545b
Commit d375025ff9
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 703 additions and 60 deletions

View file

@@ -84,6 +84,7 @@ sharding
SQLite
stderr
stdout
stdin
sunsetting
TCP
templating

View file

@@ -5,17 +5,22 @@
A monorepo for documentation and implementation of the Mozilla telemetry
ingestion system deployed to Google Cloud Platform (GCP).
There are currently two components:
There are currently four components:
- [ingestion-edge](ingestion-edge): a simple Python service for accepting HTTP
messages and delivering to Google Cloud Pub/Sub
- [ingestion-beam](ingestion-beam): a Java module defining
[Apache Beam](https://beam.apache.org/) jobs for streaming and batch
transformations of ingested messages
- [ingestion-sink](ingestion-sink): a Java application that runs
in Kubernetes, reading input from Google Cloud Pub/Sub and emitting
records to outputs like GCS or BigQuery
- [ingestion-core](ingestion-core): a Java module for code shared between
ingestion-beam and ingestion-sink
For more information, see [the documentation](https://mozilla.github.io/gcp-ingestion).
Java 11 support is a work in progress for the Beam Java SDK, so this project requires
Java 8 and will likely fail to compile using newer versions of the JDK.
To manage multiple local JDKs, consider [jenv](https://www.jenv.be/) and the
`jenv enable-plugin maven` command.

View file

@@ -15,7 +15,11 @@ mkdir -p ~/.m2
INTERACTIVE_FLAGS=""
if [ -z "$NONINTERACTIVE" ]; then
INTERACTIVE_FLAGS="-it"
if [ -t 0 ]; then
INTERACTIVE_FLAGS="-it"
else
INTERACTIVE_FLAGS="-i"
fi
fi
MOUNT_CREDENTIALS_FLAGS=""
@@ -39,6 +43,7 @@ docker run $INTERACTIVE_FLAGS --rm \
-e MAVEN_OPTS \
-e MAVEN_CONFIG=/var/maven/.m2 \
-e GOOGLE_APPLICATION_CREDENTIALS \
$(for e in $PASS_ENV; do echo "-e $e"; done) \
$MOUNT_CREDENTIALS_FLAGS \
maven:3-jdk-8 \
mvn -Duser.home=/var/maven "$@"

View file

@@ -6,12 +6,12 @@ deployed to Google Cloud Platform (GCP).
The components are:
- [ingestion-edge](./ingestion-edge/index.md): a simple Python service for accepting HTTP
- [ingestion-edge](ingestion-edge): a simple Python service for accepting HTTP
messages and delivering to Google Cloud Pub/Sub
- [ingestion-beam](ingestion-beam): a Java module defining
[Apache Beam](https://beam.apache.org/) jobs for streaming and batch
transformations of ingested messages
- ingestion-sink (documentation pending): a Java application that runs
- [ingestion-sink](ingestion-sink): a Java application that runs
in Kubernetes, reading input from Google Cloud Pub/Sub and emitting
records to batch-oriented outputs like GCS or BigQuery

View file

@@ -11,6 +11,7 @@ There are currently three jobs defined; please see the respective sections in this
documentation:
* [Sink job](./sink-job/): A job for delivering messages between Google Cloud services
* deprecated in favor of [ingestion-sink](../ingestion-sink)
* [Decoder job](./decoder-job/): A job for normalizing ingestion messages
* [Republisher job](./republisher-job/): A job for republishing subsets of decoded messages to new destinations

View file

@@ -2,6 +2,10 @@
A job for delivering messages between Google Cloud services. Defined in the `com.mozilla.telemetry.Sink` class ([source](https://github.com/mozilla/gcp-ingestion/blob/master/ingestion-beam/src/main/java/com/mozilla/telemetry/Sink.java)).
## Deprecated
This job has been replaced by [ingestion-sink](../ingestion-sink) for loading messages from Google Cloud PubSub into BigQuery.
## Supported Input and Outputs
Supported inputs:

View file

@@ -0,0 +1,231 @@
# Ingestion Sink
A Java application that runs in Kubernetes, reading input from Google Cloud
Pub/Sub and emitting records to batch-oriented outputs like GCS or BigQuery.
Defined in the `ingestion-sink` package
([source](https://github.com/mozilla/gcp-ingestion/tree/master/ingestion-sink/)).
## Supported Input and Outputs
Supported inputs:
* Google Cloud PubSub
Supported outputs:
* Google Cloud PubSub
* Google Cloud Storage
* Google Cloud BigQuery
### Test Input and Output
Test inputs will stop when an exception is raised or the end of the pipe or
file is reached. Supported test inputs:
* `System.in` (stdin), by setting `INPUT_PIPE` to any of `-`, `0`, `in`,
`stdin`, `/dev/stdin`
* A single file, by setting `INPUT_PIPE` to `/path/to/input_file`
Test outputs don't exercise batching and will write messages as newline-delimited
JSON in the order they are received. Supported test outputs:
* `System.out` (stdout), by setting `OUTPUT_PIPE` to any of `-`, `1`, `out`,
`stdout`, `/dev/stdout`
* `System.err` (stderr), by setting `OUTPUT_PIPE` to any of `2`, `err`,
`stderr`, `/dev/stderr`
* A single file, by setting `OUTPUT_PIPE` to `/path/to/output_file`
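For example, a quick smoke test can pipe a single message through stdin and
stdout (a minimal sketch; fuller invocations are described under Executing
below):

```bash
# Pipe one message through the sink: read from stdin, write to stdout.
echo '{"payload":"dGVzdA=="}' | INPUT_PIPE=- OUTPUT_PIPE=- mvn compile exec:java
```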
## Configuration
All configuration is controlled by environment variables.
## Output Specification
Depending on the environment variables provided, the application will
automatically determine where to deliver messages.
If `OUTPUT_BUCKET` is specified without `BIG_QUERY_OUTPUT_MODE`, then messages
will be delivered to Google Cloud Storage.
If `OUTPUT_TOPIC` is specified without `OUTPUT_BUCKET` or
`BIG_QUERY_OUTPUT_MODE`, then messages will be delivered to Google Cloud
Pub/Sub.
If `OUTPUT_TABLE` is specified without `BIG_QUERY_OUTPUT_MODE` or with
`BIG_QUERY_OUTPUT_MODE=streaming`, then messages will be delivered to BigQuery
via the streaming API.
If `OUTPUT_TABLE` is specified with `BIG_QUERY_OUTPUT_MODE=file_loads`, then
messages will be delivered to Google Cloud Storage based on `OUTPUT_BUCKET` and
for each blob a notification will be delivered to Google Cloud Pub/Sub based on
`OUTPUT_TOPIC`. Separate instances of ingestion-sink must consume notifications
from Google Cloud Pub/Sub and deliver messages to BigQuery via load jobs.
If `OUTPUT_TABLE` is specified with `BIG_QUERY_OUTPUT_MODE=mixed`, then
messages will be delivered to BigQuery via both the streaming API and load
jobs, and `OUTPUT_BUCKET` is required. If `OUTPUT_TOPIC` is specified then it
will be used the same way as with `BIG_QUERY_OUTPUT_MODE=file_loads`; otherwise
load jobs will be submitted by each running instance of ingestion-sink.
If none of the above configuration options are provided, then messages must be
notifications from `BIG_QUERY_OUTPUT_MODE=file_loads` or
`BIG_QUERY_OUTPUT_MODE=mixed`, and the blobs they indicate will be submitted to
BigQuery via load jobs.
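As a sketch of the combinations described above (the bucket, topic, and table
names here are hypothetical placeholders):

```bash
# Google Cloud Storage output:
OUTPUT_BUCKET=gs://example-bucket/output/

# Google Cloud Pub/Sub output:
OUTPUT_TOPIC=projects/example-project/topics/example-topic

# BigQuery via the streaming API:
OUTPUT_TABLE=example_dataset.example_table
BIG_QUERY_OUTPUT_MODE=streaming

# BigQuery via load jobs: stage files in GCS and notify a topic for
# separate load-job instances:
OUTPUT_TABLE=example_dataset.example_table
BIG_QUERY_OUTPUT_MODE=file_loads
OUTPUT_BUCKET=gs://example-bucket/staging/
OUTPUT_TOPIC=projects/example-project/topics/load-notifications

# BigQuery mixed mode: streaming plus load jobs (OUTPUT_BUCKET required):
OUTPUT_TABLE=example_dataset.example_table
BIG_QUERY_OUTPUT_MODE=mixed
OUTPUT_BUCKET=gs://example-bucket/staging/
```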
### BigQuery
`OUTPUT_TABLE` must be a `tableSpec` of form `dataset.tablename`
or the more verbose `projectId.dataset.tablename`. The values can contain
attribute placeholders of form `${attribute_name}`. To set dataset to the
document namespace and table name to the document type, specify:
OUTPUT_TABLE='${document_namespace}.${document_type}'
All `-` characters in the attributes will be converted to `_` per BigQuery
naming restrictions. Additionally, document namespace and type values will
be processed to ensure they are in snake case format (`untrustedModules`
becomes `untrusted_modules`).
Defaults for the placeholders using `${attribute_name:-default_value}`
are supported, but rarely make sense: it's unlikely that there is a default
table whose schema is compatible with all potential payloads.
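For example, assuming a message with the hypothetical attribute values shown
below:

```bash
OUTPUT_TABLE='${document_namespace}.${document_type}'
# A message with document_namespace=telemetry and document_type=untrustedModules
# resolves to the BigQuery table: telemetry.untrusted_modules
# (dashes become underscores and camelCase becomes snake_case).
```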
### Attribute placeholders
We support routing individual messages to different output locations based on
the `PubsubMessage` attribute map. Routing is accomplished by adding
placeholders of form `${attribute_name:-default_value}` to the path.
For example, to route based on a `document_type` attribute, your path might
look like:
OUTPUT_BUCKET=gs://mybucket/mydocs/${document_type:-UNSPECIFIED}/myfileprefix
Messages with `document_type` of "main" would be grouped together and end up in
the following directory:
gs://mybucket/mydocs/main/
Messages with `document_type` set to `null` or missing that attribute
completely would be grouped together and end up in the directory:
gs://mybucket/mydocs/UNSPECIFIED/
Note that placeholders _must_ specify a default value so that a poorly
formatted message doesn't cause a pipeline exception. A placeholder without a
default will result in an `IllegalArgumentException` on pipeline startup.
File-based outputs support the additional _derived_ attributes
`"submission_date"` and `"submission_hour"` which will be parsed from the value
of the `submission_timestamp` attribute if it exists. These can be useful for
making sure your output specification buckets messages into hourly directories.
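For instance, a sketch of an hourly layout using the derived attributes (the
bucket name is hypothetical; defaults are included because placeholders require
them):

```bash
OUTPUT_BUCKET='gs://example-bucket/ndjson/${submission_date:-00000000}/${submission_hour:-00}/'
```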
The templating and default syntax used here is based on the
[Apache commons-text `StringSubstitutor`](https://commons.apache.org/proper/commons-text/javadocs/api-release/org/apache/commons/text/StringSubstitutor.html),
which in turn bases its syntax on common practice in bash and other Unix/Linux
shells. Beware the need for proper escaping on the command line (use `\$` in
place of `$`), as your shell may try to substitute in values for your
placeholders before they're passed to `Sink`.
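A minimal sketch of that escaping caveat (bucket name hypothetical):

```bash
# Without the backslash, the shell would expand ${document_type:-UNSPECIFIED}
# itself before the value ever reaches Sink.
OUTPUT_BUCKET=gs://example-bucket/mydocs/\${document_type:-UNSPECIFIED}/myfileprefix
```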
[Google's PubsubMessage format](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage)
allows arbitrary strings for attribute names and values. We place the following restrictions
on attribute names and default values used in placeholders:
- attribute names may not contain the string `:-`
- attribute names may not contain curly braces (`{` or `}`)
- default values may not contain curly braces (`{` or `}`)
### Encoding
When writing messages to Google Cloud Storage or BigQuery, the message received
from Google Cloud Pub/Sub will be encoded as a JSON object.
When `OUTPUT_FORMAT` is unspecified or `raw`, messages will have bytes encoded as
a `"payload"` field with base64 encoding, and each attribute encoded as a field.
This is the format used for `payload_bytes_raw.*` tables.
When `OUTPUT_FORMAT` is `decoded`, messages will have bytes encoded as with
`OUTPUT_FORMAT=raw`, but attributes will be encoded using the nested metadata
format of decoded pings. This is the format used for `payload_bytes_decoded.*`
tables.
When `OUTPUT_FORMAT` is `payload`, messages will have bytes decoded as JSON, and
will be transformed to coerce types and use snake case for compatibility with BigQuery.
This is the format used for `*_live.*` tables. This requires specifying a local
path to a gzipped tar archive that contains BigQuery table schemas as
`SCHEMAS_LOCATION`. If message bytes are compressed, then
`INPUT_COMPRESSION=gzip` must also be specified to ensure they are decompressed
before they are decoded as JSON.
When `OUTPUT_FORMAT` is `beam`, messages will have bytes encoded as with
`OUTPUT_FORMAT=raw`, but attributes will be encoded as an `"attributeMap"` field
that contains a JSON object. This is the same format as produced by ingestion-beam
when using `--outputType=file` and `--outputFileFormat=json`.
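To make the formats concrete, here is a sketch of how one message (payload
bytes `test`, with a hypothetical attribute `host=test`) would be encoded under
two of the formats:

```bash
# OUTPUT_FORMAT=raw: payload base64 encoded, each attribute a top-level field:
#   {"host":"test","payload":"dGVzdA=="}

# OUTPUT_FORMAT=beam: same payload encoding, attributes nested under "attributeMap":
#   {"attributeMap":{"host":"test"},"payload":"dGVzdA=="}
```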
### Google Cloud Storage file prefix
Google Cloud Storage files are named like:
$OUTPUT_BUCKET/{UUID.randomUUID().toString()}.ndjson
or if `OUTPUT_TABLE` and `BIG_QUERY_OUTPUT_MODE` are specified:
$OUTPUT_BUCKET/OUTPUT_TABLE=$OUTPUT_TABLE/{UUID.randomUUID().toString()}.ndjson
For example, with `OUTPUT_BUCKET=gs://test-bucket/test-output`:
gs://test-bucket/test-output/ad715b24-7500-45e2-9691-cb91e3b9c2cc.ndjson
or with `OUTPUT_BUCKET=gs://test-bucket/test-output`, `OUTPUT_TABLE=my_dataset.raw_table`, and
`BIG_QUERY_OUTPUT_MODE=file_loads`:
gs://test-bucket/test-output/OUTPUT_TABLE=my_dataset.raw_table/3b17c648-f8b9-4250-bdc1-5c2e472fdc26.ndjson
## Executing
### Locally with Docker
The provided `bin/mvn` script downloads and runs Maven via Docker so that less
setup is needed on the local machine. For prolonged development, performance is
likely to be significantly better, especially on macOS, if `mvn` is installed and
run natively without Docker.
```bash
# create a test input file
mkdir -p tmp/
echo '{"payload":"dGVzdA==","attributeMap":{"host":"test"}}' > tmp/input.ndjson
# consume messages from the test file, decode and re-encode them, and write to a file
PASS_ENV="INPUT_PIPE=tmp/input.ndjson OUTPUT_PIPE=tmp/output.ndjson" ./bin/mvn compile exec:java
# check that the message was delivered
cat tmp/output.ndjson
# read message from stdin and write to stdout
cat tmp/input.ndjson | PASS_ENV="INPUT_PIPE=- OUTPUT_PIPE=-" ./bin/mvn compile exec:java
# read message from stdin and write to GCS
# note that $ needs to be escaped with \ to prevent shell substitution
cat tmp/input.ndjson | PASS_ENV="\
INPUT_PIPE=- \
OUTPUT_BUCKET=gs://my_bucket/\${document_type:-UNSPECIFIED}/ \
" ./bin/mvn compile exec:java
```
### Locally without Docker
If you install Java and Maven, you can invoke `VAR=... mvn` in the above commands
instead of using `PASS_ENV="VAR=..." ./bin/mvn`. Be aware that Java 8 is the target JVM and some
reflection warnings may be thrown on newer versions. Though these are generally
harmless, you may need to comment out the
`<compilerArgument>-Werror</compilerArgument>` line in the `pom.xml` in the git
root.
```bash
# consume messages from the test file, decode and re-encode them, and write to a file
INPUT_PIPE=tmp/input.ndjson OUTPUT_PIPE=tmp/output.ndjson mvn compile exec:java
# read message from stdin and write to stdout
cat tmp/input.ndjson | INPUT_PIPE=- OUTPUT_PIPE=- mvn compile exec:java
```

View file

@@ -59,6 +59,7 @@ public class Constant {
}
public static final String ADDITIONAL_PROPERTIES = "additional_properties";
public static final String ATTRIBUTE_MAP = "attributeMap";
public static final String KEY = "key";
public static final String LIST = "list";
public static final String PAYLOAD = "payload";

View file

@@ -58,6 +58,39 @@ public abstract class PubsubMessageToObjectNode {
return apply(null, attributes, data);
}
public static class Beam extends PubsubMessageToObjectNode {
private static final Beam INSTANCE = new Beam();
public static Beam of() {
return INSTANCE;
}
/**
* Turn message into an {@link ObjectNode} in the json format used by ingestion-beam when
* writing files.
*
* <p>ingestion-beam uses the default jackson output format for apache beam's PubsubMessage,
* which looks like:
* <pre>
* {
* "payload": "${base64 encoded byte array}",
* "attributeMap": {"${key}": "${value}"...}
* }
* </pre>
*/
@Override
public ObjectNode apply(TableId tableId, Map<String, String> attributes, byte[] data) {
ObjectNode contents = Json.createObjectNode();
Optional.of(attributes).filter(map -> !map.isEmpty()).map(Json::asObjectNode)
.ifPresent(obj -> contents.set(FieldName.ATTRIBUTE_MAP, obj));
Optional.ofNullable(data).filter(bytes -> bytes.length > 0)
.map(BASE64_ENCODER::encodeToString)
.ifPresent(payload -> contents.put(FieldName.PAYLOAD, payload));
return contents;
}
}
public static class Raw extends PubsubMessageToObjectNode {
private static final Raw INSTANCE = new Raw();

View file

@@ -13,6 +13,8 @@ import com.mozilla.telemetry.ingestion.core.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.sink.io.BigQuery;
import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
import com.mozilla.telemetry.ingestion.sink.io.Gcs;
import com.mozilla.telemetry.ingestion.sink.io.Input;
import com.mozilla.telemetry.ingestion.sink.io.Pipe;
import com.mozilla.telemetry.ingestion.sink.io.Pubsub;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.CompressPayload;
@@ -22,7 +24,12 @@ import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedSt
import com.mozilla.telemetry.ingestion.sink.util.Env;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -36,6 +43,7 @@ public class SinkConfig {
private static final String INPUT_COMPRESSION = "INPUT_COMPRESSION";
private static final String INPUT_PARALLELISM = "INPUT_PARALLELISM";
private static final String INPUT_PIPE = "INPUT_PIPE";
private static final String INPUT_SUBSCRIPTION = "INPUT_SUBSCRIPTION";
private static final String BATCH_MAX_BYTES = "BATCH_MAX_BYTES";
private static final String BATCH_MAX_DELAY = "BATCH_MAX_DELAY";
@@ -49,6 +57,7 @@ public class SinkConfig {
private static final String OUTPUT_COMPRESSION = "OUTPUT_COMPRESSION";
private static final String OUTPUT_FORMAT = "OUTPUT_FORMAT";
private static final String OUTPUT_PARALLELISM = "OUTPUT_PARALLELISM";
private static final String OUTPUT_PIPE = "OUTPUT_PIPE";
private static final String OUTPUT_TOPIC = "OUTPUT_TOPIC";
private static final String MAX_OUTSTANDING_ELEMENT_COUNT = "MAX_OUTSTANDING_ELEMENT_COUNT";
private static final String MAX_OUTSTANDING_REQUEST_BYTES = "MAX_OUTSTANDING_REQUEST_BYTES";
@@ -60,12 +69,13 @@ public class SinkConfig {
private static final String STRICT_SCHEMA_DOCTYPES = "STRICT_SCHEMA_DOCTYPES";
private static final Set<String> INCLUDE_ENV_VARS = ImmutableSet.of(INPUT_COMPRESSION,
INPUT_PARALLELISM, INPUT_SUBSCRIPTION, BATCH_MAX_BYTES, BATCH_MAX_DELAY, BATCH_MAX_MESSAGES,
BIG_QUERY_OUTPUT_MODE, BIG_QUERY_DEFAULT_PROJECT, LOAD_MAX_BYTES, LOAD_MAX_DELAY,
LOAD_MAX_FILES, OUTPUT_BUCKET, OUTPUT_COMPRESSION, OUTPUT_FORMAT, OUTPUT_PARALLELISM,
OUTPUT_TABLE, OUTPUT_TOPIC, MAX_OUTSTANDING_ELEMENT_COUNT, MAX_OUTSTANDING_REQUEST_BYTES,
SCHEMAS_LOCATION, STREAMING_BATCH_MAX_BYTES, STREAMING_BATCH_MAX_DELAY,
STREAMING_BATCH_MAX_MESSAGES, STREAMING_DOCTYPES, STRICT_SCHEMA_DOCTYPES);
INPUT_PARALLELISM, INPUT_PIPE, INPUT_SUBSCRIPTION, BATCH_MAX_BYTES, BATCH_MAX_DELAY,
BATCH_MAX_MESSAGES, BIG_QUERY_OUTPUT_MODE, BIG_QUERY_DEFAULT_PROJECT, LOAD_MAX_BYTES,
LOAD_MAX_DELAY, LOAD_MAX_FILES, OUTPUT_BUCKET, OUTPUT_COMPRESSION, OUTPUT_FORMAT,
OUTPUT_PARALLELISM, OUTPUT_PIPE, OUTPUT_TABLE, OUTPUT_TOPIC, MAX_OUTSTANDING_ELEMENT_COUNT,
MAX_OUTSTANDING_REQUEST_BYTES, SCHEMAS_LOCATION, STREAMING_BATCH_MAX_BYTES,
STREAMING_BATCH_MAX_DELAY, STREAMING_BATCH_MAX_MESSAGES, STREAMING_DOCTYPES,
STRICT_SCHEMA_DOCTYPES);
// BigQuery.Write.Batch.getByteSize reports protobuf size, which can be ~1/3rd more
// efficient than the JSON that actually gets sent over HTTP, so we use 60% of the
@@ -131,6 +141,38 @@ public class SinkConfig {
}
enum OutputType {
pipe {
@Override
Output getOutput(Env env, Executor executor) {
final String outputPipe = env.getString(OUTPUT_PIPE);
final PrintStream pipe;
switch (outputPipe) {
case "-":
case "1":
case "out":
case "stdout":
case "/dev/stdout":
pipe = System.out;
break;
case "2":
case "err":
case "stderr":
case "/dev/stderr":
pipe = System.err;
break;
default:
try {
pipe = new PrintStream(outputPipe);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
}
return new Output(env, this, Pipe.Write.of(pipe, env.optString(OUTPUT_TABLE)
.map(PubsubMessageToTemplatedString::forBigQuery).orElse(null), getFormat(env)));
}
},
pubsub {
private CompressPayload getOutputCompression(Env env) {
@@ -322,7 +364,9 @@ public class SinkConfig {
static OutputType get(Env env) {
boolean hasBigQueryOutputMode = env.containsKey(BIG_QUERY_OUTPUT_MODE);
if (env.containsKey(OUTPUT_BUCKET) && !hasBigQueryOutputMode) {
if (env.containsKey(OUTPUT_PIPE)) {
return OutputType.pipe;
} else if (env.containsKey(OUTPUT_BUCKET) && !hasBigQueryOutputMode) {
return OutputType.gcs;
} else if (env.containsKey(OUTPUT_TOPIC) && !hasBigQueryOutputMode) {
return OutputType.pubsub;
@@ -363,6 +407,8 @@ public class SinkConfig {
case "payload":
return PubsubMessageToObjectNode.Payload.of(env.getStrings(STRICT_SCHEMA_DOCTYPES, null),
env.getString(SCHEMAS_LOCATION, null), FileInputStream::new).withOpenCensusMetrics();
case "beam":
return PubsubMessageToObjectNode.Beam.of();
default:
throw new IllegalArgumentException("Format not yet implemented: " + format);
}
@@ -411,30 +457,50 @@ public class SinkConfig {
}
/** Return a configured input transform. */
public static Pubsub.Read getInput(Output output) throws IOException {
// read pubsub messages from INPUT_SUBSCRIPTION
Pubsub.Read input = new Pubsub.Read(output.env.getString(INPUT_SUBSCRIPTION), output,
builder -> builder
.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(
output.type.getMaxOutstandingElementCount(output.env))
.setMaxOutstandingRequestBytes(
output.type.getMaxOutstandingRequestBytes(output.env))
.build())
// The number of streaming subscriber connections for reading from Pub/Sub.
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L141
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L318-L320
// The default number of executor threads is max(6, 2*parallelPullCount).
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L566-L568
// Subscriber connections are expected to be CPU bound until flow control thresholds are
// reached, so parallelism should be no less than the number of available processors.
.setParallelPullCount(
output.env.getInt(INPUT_PARALLELISM, Runtime.getRuntime().availableProcessors())),
getInputCompression(output.env));
public static Input getInput(Output output) throws IOException {
final DecompressPayload decompress = getInputCompression(output.env);
final Input input;
if (output.env.containsKey(INPUT_PIPE)) {
final String inputPipe = output.env.getString(INPUT_PIPE);
final InputStream pipe;
switch (inputPipe) {
case "-":
case "0":
case "in":
case "stdin":
case "/dev/stdin":
pipe = System.in;
break;
default:
pipe = Files.newInputStream(Paths.get(output.env.getString(INPUT_PIPE)));
}
input = Pipe.Read.of(pipe, output.write, decompress);
} else {
// read pubsub messages from INPUT_SUBSCRIPTION
input = new Pubsub.Read(output.env.getString(INPUT_SUBSCRIPTION), output,
builder -> builder
.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(
output.type.getMaxOutstandingElementCount(output.env))
.setMaxOutstandingRequestBytes(
output.type.getMaxOutstandingRequestBytes(output.env))
.build())
// The number of streaming subscriber connections for reading from Pub/Sub.
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L141
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L318-L320
// The default number of executor threads is max(6, 2*parallelPullCount).
// https://github.com/googleapis/java-pubsub/blob/v1.105.0/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java#L566-L568
// Subscriber connections are expected to be CPU bound until flow control thresholds are
// reached, so parallelism should be no less than the number of available processors.
.setParallelPullCount(
output.env.getInt(INPUT_PARALLELISM, Runtime.getRuntime().availableProcessors())),
decompress);
// Setup OpenCensus stackdriver exporter after all measurement views have been registered,
// as seen in https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/
StackdriverStatsExporter.createAndRegister();
}
output.env.requireAllVarsUsed();
// Setup OpenCensus stackdriver exporter after all measurement views have been registered,
// as seen in https://opencensus.io/exporters/supported-exporters/java/stackdriver-stats/
StackdriverStatsExporter.createAndRegister();
return input;
}
}

View file

@@ -57,7 +57,7 @@ public class BigQuery {
return parseTableId(outputTableMatcher.group(1));
}
private static TableId parseTableId(String input) {
static TableId parseTableId(String input) {
final String[] tableSpecParts = input.replaceAll(":", ".").split("\\.", 3);
if (tableSpecParts.length == 3) {
return TableId.of(tableSpecParts[0], tableSpecParts[1], tableSpecParts[2]);

View file

@@ -0,0 +1,9 @@
package com.mozilla.telemetry.ingestion.sink.io;
public interface Input {
void run();
default void stopAsync() {
}
}

View file

@@ -0,0 +1,80 @@
package com.mozilla.telemetry.ingestion.sink.io;
import com.google.cloud.bigquery.TableId;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.core.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.core.util.Json;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
import com.mozilla.telemetry.ingestion.sink.transform.StringToPubsubMessage;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class Pipe {
/** Never instantiate this class. */
private Pipe() {
}
public static class Read<ResultT> implements Input {
private final InputStream pipe;
private final Function<PubsubMessage, CompletableFuture<ResultT>> output;
private final Function<PubsubMessage, PubsubMessage> decompress;
private Read(InputStream pipe, Function<PubsubMessage, CompletableFuture<ResultT>> output,
Function<PubsubMessage, PubsubMessage> decompress) {
this.pipe = pipe;
this.output = output;
this.decompress = decompress;
}
public static <T> Read<T> of(InputStream pipe,
Function<PubsubMessage, CompletableFuture<T>> output,
Function<PubsubMessage, PubsubMessage> decompress) {
return new Read<>(pipe, output, decompress);
}
/** Read the pipe until end of stream. */
@Override
public void run() {
BufferedReader in = new BufferedReader(new InputStreamReader(pipe, StandardCharsets.UTF_8));
in.lines().map(StringToPubsubMessage::apply).map(decompress).map(output)
.forEach(CompletableFuture::join);
}
}
public static class Write implements Function<PubsubMessage, CompletableFuture<Void>> {
private final PrintStream pipe;
private final PubsubMessageToObjectNode encoder;
private final Function<PubsubMessage, TableId> tableTemplate;
private Write(PrintStream pipe, PubsubMessageToTemplatedString tableTemplate,
PubsubMessageToObjectNode encoder) {
this.pipe = pipe;
if (tableTemplate == null) {
this.tableTemplate = m -> null;
} else {
this.tableTemplate = m -> BigQuery.parseTableId(tableTemplate.apply(m));
}
this.encoder = encoder;
}
public static Write of(PrintStream pipe, PubsubMessageToTemplatedString tableTemplate,
PubsubMessageToObjectNode encoder) {
return new Write(pipe, tableTemplate, encoder);
}
@Override
public CompletableFuture<Void> apply(PubsubMessage message) {
pipe.println(Json.asString(encoder.apply(tableTemplate.apply(message),
message.getAttributesMap(), message.getData().toByteArray())));
return CompletableFuture.completedFuture(null);
}
}
}

View file

@@ -26,7 +26,7 @@ public class Pubsub {
private Pubsub() {
}
public static class Read {
public static class Read implements Input {
private static final Logger LOG = LoggerFactory.getLogger(Read.class);
@@ -63,7 +63,8 @@
.build();
}
/** Run the subscriber until terminated. */
/** Run until stopAsync() is called. */
@Override
public void run() {
try {
subscriber.startAsync();
@@ -72,6 +73,11 @@
subscriber.stopAsync();
}
}
@Override
public void stopAsync() {
subscriber.stopAsync();
}
}
public static class Write implements Function<PubsubMessage, CompletableFuture<String>> {

View file

@@ -0,0 +1,46 @@
package com.mozilla.telemetry.ingestion.sink.transform;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.core.Constant.FieldName;
import com.mozilla.telemetry.ingestion.core.util.Json;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Base64;
public class StringToPubsubMessage {
/** Never instantiate this class. */
private StringToPubsubMessage() {
}
private static final Base64.Decoder BASE64_DECODER = Base64.getDecoder();
/** Construct a PubsubMessage from JSON. */
public static PubsubMessage apply(String line) {
try {
ObjectNode node = Json.readObjectNode(line);
PubsubMessage.Builder messageBuilder = PubsubMessage.newBuilder();
JsonNode dataNode = node.path(FieldName.PAYLOAD);
if (!dataNode.isMissingNode() && !dataNode.isNull()) {
final byte[] data;
if (dataNode.isTextual()) {
data = BASE64_DECODER.decode(dataNode.asText());
} else {
data = Json.asBytes(dataNode);
}
messageBuilder.setData(ByteString.copyFrom(data));
}
JsonNode attributeNode = node.path("attributeMap");
if (attributeNode.isObject()) {
attributeNode.fields().forEachRemaining(
entry -> messageBuilder.putAttributes(entry.getKey(), entry.getValue().asText()));
}
return messageBuilder.build();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

View file

@@ -0,0 +1,126 @@
package com.mozilla.telemetry.ingestion.sink;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.Base64;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.contrib.java.lang.system.SystemErrRule;
import org.junit.contrib.java.lang.system.SystemOutRule;
import org.junit.contrib.java.lang.system.TextFromStandardInputStream;
import org.junit.rules.TemporaryFolder;
public class SinkPipeTest {
@Rule
public final TextFromStandardInputStream systemIn = TextFromStandardInputStream
.emptyStandardInputStream();
@Rule
public final SystemOutRule systemOut = new SystemOutRule();
@Rule
public final SystemErrRule systemErr = new SystemErrRule();
@Rule
public TemporaryFolder tempdir = new TemporaryFolder();
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
private String asBase64(String data) {
return Base64.getEncoder().encodeToString(data.getBytes(StandardCharsets.UTF_8));
}
@Test
public void canFormatAsBeam() throws IOException {
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_PIPE", "-");
environmentVariables.set("OUTPUT_FORMAT", "beam");
systemIn.provideLines("{\"attributeMap\":{\"meta\":\"data\"},\"payload\":\"dGVzdA==\"}",
"{\"payload\":{\"meta\":\"data\"}}", "{\"payload\":null}", "{}");
systemOut.enableLog();
systemOut.mute();
Sink.main(null);
assertEquals("{\"attributeMap\":{\"meta\":\"data\"},\"payload\":\"dGVzdA==\"}\n" //
+ "{\"payload\":\"" + asBase64("{\"meta\":\"data\"}") + "\"}\n" //
+ "{}\n" + "{}\n", //
systemOut.getLog());
}
@Test
public void canWriteStderr() throws IOException {
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_PIPE", "2");
systemIn.provideLines("{}");
systemErr.enableLog();
systemErr.mute();
Sink.main(null);
assertEquals("{}\n", systemErr.getLog());
}
@Test
public void canReadAndWriteFiles() throws IOException {
final File input = tempdir.newFile("input.ndjson");
environmentVariables.set("INPUT_PIPE", input.getPath());
final File output = tempdir.newFile("output.ndjson");
environmentVariables.set("OUTPUT_PIPE", output.getPath());
try (PrintStream inputStream = new PrintStream(input)) {
inputStream.println("{}");
}
Sink.main(null);
assertEquals(ImmutableList.of("{}"), Files.readAllLines(output.toPath()));
}
@Test
public void throwsOnMissingOutputDir() {
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_PIPE",
tempdir.getRoot().toPath().resolve("missing dir").resolve("output.ndjson").toString());
IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class,
() -> Sink.main(null));
assertTrue(thrown.getCause() instanceof FileNotFoundException);
}
@Test
public void throwsOnMissingInputDir() {
environmentVariables.set("INPUT_PIPE",
tempdir.getRoot().toPath().resolve("input.ndjson").toString());
environmentVariables.set("OUTPUT_PIPE", "-");
assertThrows(NoSuchFileException.class, () -> Sink.main(null));
}
@Test
public void throwsOnInvalidTableId() {
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_PIPE", "-");
environmentVariables.set("OUTPUT_TABLE", "table_without_dataset");
systemIn.provideLines("{}");
IllegalArgumentException thrown = assertThrows(IllegalArgumentException.class,
() -> Sink.main(null));
assertEquals("TableId requires dataset but none found in: table_without_dataset",
thrown.getMessage());
}
@Test
public void throwsOnInvalidInput() {
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_PIPE", "-");
systemIn.provideLines("}");
UncheckedIOException thrown = assertThrows(UncheckedIOException.class, () -> Sink.main(null));
assertTrue(thrown.getCause() instanceof JsonParseException);
}
}

View file

@@ -3,12 +3,14 @@ package com.mozilla.telemetry.ingestion.sink;
import static org.junit.Assert.assertEquals;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.util.BoundedSink;
import com.mozilla.telemetry.ingestion.core.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.core.util.Json;
import com.mozilla.telemetry.ingestion.sink.util.PubsubTopics;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -17,15 +19,22 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
import org.junit.contrib.java.lang.system.TextFromStandardInputStream;
public class SinkPubsubIntegrationTest {
@Rule
public final PubsubTopics pubsub = new PubsubTopics(3);
public final PubsubTopics pubsub = new PubsubTopics(2);
@Rule
public final TextFromStandardInputStream systemIn = TextFromStandardInputStream
.emptyStandardInputStream();
@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
private final PubsubMessageToObjectNode encoder = PubsubMessageToObjectNode.Beam.of();
@Before
public void unregisterStackdriver() {
// unregister stackdriver stats exporter in case a previous test already registered one.
@@ -34,26 +43,29 @@ public class SinkPubsubIntegrationTest {
@Test
public void canRepublishMessages() throws IOException {
final List<Map<String, Object>> expected = IntStream.range(1, pubsub.numTopics)
.mapToObj(index -> {
String[] topicParts = pubsub.getTopic(index).split("/", 4);
String project = topicParts[1];
String topic = topicParts[3];
PubsubMessage message = PubsubMessage.newBuilder().putAttributes("project", project)
.putAttributes("topic", topic).build();
pubsub.publish(0, message);
Map<String, Object> attributes = new HashMap<>(message.getAttributesMap());
attributes.put("index", index);
return attributes;
}).collect(Collectors.toList());
final List<String> input = new LinkedList<>();
final List<Map<String, Object>> expected = new LinkedList<>();
for (int index = 0; index < pubsub.numTopics; index++) {
String[] topicParts = pubsub.getTopic(index).split("/", 4);
String project = topicParts[1];
String topic = topicParts[3];
PubsubMessage message = PubsubMessage.newBuilder().putAttributes("project", project)
.putAttributes("topic", topic).build();
input.add(Json.asString(
encoder.apply(null, message.getAttributesMap(), message.getData().toByteArray())));
Map<String, Object> attributes = new HashMap<>(message.getAttributesMap());
attributes.put("index", index);
expected.add(attributes);
}
environmentVariables.set("INPUT_SUBSCRIPTION", pubsub.getSubscription(0));
systemIn.provideLines(input.toArray(new String[] {}));
environmentVariables.set("INPUT_PIPE", "-");
environmentVariables.set("OUTPUT_TOPIC", "projects/${project}/topics/${topic}");
BoundedSink.run(expected.size(), 30);
Sink.main(null);
List<Map<String, Object>> actual = IntStream.range(1, pubsub.numTopics).boxed()
.flatMap(index -> pubsub.pull(index, 2, true).stream().map(message -> {
List<Map<String, Object>> actual = IntStream.range(0, pubsub.numTopics).boxed()
.flatMap(index -> pubsub.pull(index, 1, false).stream().map(message -> {
Map<String, Object> attributes = new HashMap<>(message.getAttributesMap());
attributes.put("index", index);
return attributes;

View file

@@ -0,0 +1,17 @@
package com.mozilla.telemetry.ingestion.sink.io;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.junit.Test;
public class PipeTest {
@Test
public void canStopAsync() {
Input input = Pipe.Read.of(new ByteArrayInputStream("".getBytes(StandardCharsets.UTF_8)), m -> {
throw new AssertionError("unreachable");
}, m -> m);
input.run();
input.stopAsync();
}
}

View file

@@ -1,7 +1,7 @@
package com.mozilla.telemetry.ingestion.sink.util;
import com.mozilla.telemetry.ingestion.sink.config.SinkConfig;
import com.mozilla.telemetry.ingestion.sink.io.Pubsub;
import com.mozilla.telemetry.ingestion.sink.io.Input;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
import java.io.IOException;
import java.time.Duration;
@@ -21,11 +21,11 @@ public class BoundedSink extends SinkConfig {
private static CompletableFuture<Void> runAsync(int messageCount) throws IOException {
final Output output = getOutput();
final AtomicInteger counter = new AtomicInteger(0);
final AtomicReference<Pubsub.Read> input = new AtomicReference<>();
final AtomicReference<Input> input = new AtomicReference<>();
input.set(getInput(output.via(message -> output.apply(message).thenApplyAsync(v -> {
final int currentMessages = counter.incrementAndGet();
if (currentMessages >= messageCount) {
input.get().subscriber.stopAsync();
input.get().stopAsync();
StackdriverStatsExporter.unregister();
}
return v;