Fix ingestion-sink errors and performance (#750)

This commit is contained in:
Daniel Thorn 2019-08-20 11:18:47 -07:00 коммит произвёл GitHub
Родитель 3c9e2b315b
Коммит 8a0fcb1b51
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 150 добавлений и 67 удалений

Просмотреть файл

@ -45,6 +45,8 @@ jobs:
echo "Run tests because [run-tests] in commit message"
elif ! git diff --quiet origin/master -- "$(git rev-parse --show-toplevel)"/.circleci; then
echo "Run tests because .circleci/ was modified since branching off master"
elif test -f pom.xml && ! git diff --quiet origin/master -- "$(git rev-parse --show-toplevel)"/pom.xml; then
echo "Run tests because pom.xml was modified since branching off master"
elif ! git diff --quiet origin/master... -- .; then
echo "Run tests because $(git rev-parse --show-prefix) was modified since branching off master"
else

Просмотреть файл

@ -35,6 +35,11 @@
<artifactId>commons-text</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.28</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

Просмотреть файл

@ -4,6 +4,7 @@
package com.mozilla.telemetry.ingestion;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.mozilla.telemetry.ingestion.io.BigQuery;
import com.mozilla.telemetry.ingestion.io.Pubsub;
@ -23,13 +24,25 @@ public class Sink {
PubsubMessageToTableRow.TableRowFormat.valueOf(Env.getString("OUTPUT_FORMAT", "raw")));
// output messages to BigQuery
BigQuery.Write output = new BigQuery.Write(BigQueryOptions.getDefaultInstance().getService());
BigQuery.Write output = new BigQuery.Write(BigQueryOptions.getDefaultInstance().getService(),
// PubsubMessageToTableRow reports protobuf size, which can be ~1/3rd more efficient than
// the JSON that actually gets sent over HTTP, so we use 60% of the API limit by default.
Env.getInt("BATCH_MAX_BYTES", 6_000_000), // HTTP request size limit: 10 MB
Env.getInt("BATCH_MAX_MESSAGES", 10_000), // Maximum rows per request: 10,000
Env.getLong("BATCH_MAX_DELAY_MILLIS", 1000L)); // Default 1 second
// read pubsub messages from INPUT_SUBSCRIPTION
new Pubsub.Read(Env.getString("INPUT_SUBSCRIPTION"),
message -> CompletableFuture.supplyAsync(() -> message) // start new future with message
.thenApplyAsync(routeMessage::apply) // determine output path of message
.thenComposeAsync(output)) // output message
.run(); // run pubsub consumer
.thenComposeAsync(output), // output message
builder -> builder.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(
// Upstream default is 10K, but it can be higher as long as we don't OOM
Env.getLong("FLOW_CONTROL_MAX_OUTSTANDING_ELEMENT_COUNT", 50_000L)) // 50K
.setMaxOutstandingRequestBytes(
// Upstream default is 1GB, but it needs to be lower so we don't OOM
Env.getLong("FLOW_CONTROL_MAX_OUTSTANDING_REQUEST_BYTES", 100_000_000L)) // 100MB
.build())).run(); // run pubsub consumer
}
}

Просмотреть файл

@ -17,6 +17,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQuery {
@ -35,16 +37,22 @@ public class BigQuery {
public static class Write implements Function<Write.TableRow, CompletableFuture<Write.TableRow>> {
private static final Logger LOG = LoggerFactory.getLogger(Write.class);
private final com.google.cloud.bigquery.BigQuery bigquery;
private final int maxBytes = 10_000_000; // HTTP request size limit: 10 MB
private final int maxMessages = 10_000; // Maximum rows per request: 10,000
private final long maxDelayMillis = 1000; // Default to 1 second
private final int maxBytes;
private final int maxMessages;
private final long maxDelayMillis;
@VisibleForTesting
final ConcurrentMap<TableId, Batch> batches = new ConcurrentHashMap<>();
public Write(com.google.cloud.bigquery.BigQuery bigquery) {
public Write(com.google.cloud.bigquery.BigQuery bigquery, int maxBytes, int maxMessages,
long maxDelayMillis) {
this.bigquery = bigquery;
this.maxBytes = maxBytes;
this.maxMessages = maxMessages;
this.maxDelayMillis = maxDelayMillis;
}
@Override
@ -55,8 +63,8 @@ public class BigQuery {
batch.add(row).ifPresent(futureRef::set);
}
if (futureRef.get() == null) {
batch = new Batch(bigquery, row.tableId);
batch.add(row).ifPresent(futureRef::set);
batch = new Batch(bigquery, row);
batch.first.ifPresent(futureRef::set);
}
return batch;
});
@ -83,6 +91,8 @@ public class BigQuery {
@VisibleForTesting
class Batch {
final Optional<CompletableFuture<TableRow>> first;
private final com.google.cloud.bigquery.BigQuery bigquery;
private final CompletableFuture<Void> full;
private final CompletableFuture<InsertAllResponse> result;
@ -93,15 +103,22 @@ public class BigQuery {
private int size = 0;
private int byteSize = 0;
private Batch(com.google.cloud.bigquery.BigQuery bigquery, TableId tableId) {
private Batch(com.google.cloud.bigquery.BigQuery bigquery, TableRow row) {
this.bigquery = bigquery;
builder = InsertAllRequest.newBuilder(tableId)
builder = InsertAllRequest.newBuilder(row.tableId)
// ignore row values for columns not present in the table
.setIgnoreUnknownValues(true)
// insert all valid rows when invalid rows are present in the request
.setSkipInvalidRows(true);
full = CompletableFuture.runAsync(this::timeout);
// block full from completing before the first add()
CompletableFuture<Void> init = new CompletableFuture<>();
full = init.thenRunAsync(this::timeout);
result = full.handleAsync(this::insertAll);
// expose the return value for the first add()
first = add(row);
// allow full to complete
init.complete(null);
}
private void timeout() {
@ -137,7 +154,8 @@ public class BigQuery {
int index = newSize - 1;
return Optional.of(result.thenApplyAsync(r -> {
List<BigQueryError> errors = r.getErrorsFor(index);
if (!errors.isEmpty()) {
if (errors != null && !errors.isEmpty()) {
LOG.warn(errors.toString());
throw new WriteErrors(errors);
}
return row;

Просмотреть файл

@ -10,6 +10,8 @@ import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Pubsub {
@ -18,6 +20,8 @@ public class Pubsub {
public static class Read {
private static final Logger LOG = LoggerFactory.getLogger(Read.class);
@VisibleForTesting
Subscriber subscriber;
@ -30,16 +34,13 @@ public class Pubsub {
if (exception == null) {
consumer.ack();
} else {
LOG.warn("Exception while attempting to deliver message:", exception);
consumer.nack();
}
})))
.build();
}
public Read(String subscriptionName, Function<PubsubMessage, CompletableFuture<?>> output) {
this(subscriptionName, output, b -> b);
}
public void run() {
try {
subscriber.startAsync();

Просмотреть файл

@ -9,6 +9,7 @@ import static com.mozilla.telemetry.ingestion.util.Attribute.CLIENT_ID;
import com.google.cloud.bigquery.TableId;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.io.BigQuery.Write.TableRow;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -22,6 +23,8 @@ public class PubsubMessageToTableRow {
public static final String PAYLOAD = "payload";
private static final Base64.Encoder base64Encoder = Base64.getEncoder();
private final String tableSpecTemplate;
private final TableRowFormat tableRowFormat;
@ -78,7 +81,8 @@ public class PubsubMessageToTableRow {
*/
private Map<String, Object> rawContents(PubsubMessage message) {
Map<String, Object> contents = new HashMap<>(message.getAttributesMap());
contents.put(PAYLOAD, message.getData().toByteArray());
// bytes must be inserted as base64 encoded strings
contents.put(PAYLOAD, base64Encoder.encodeToString(message.getData().toByteArray()));
return contents;
}
@ -88,7 +92,8 @@ public class PubsubMessageToTableRow {
static Map<String, Object> decodedContents(PubsubMessage message) {
Map<String, Object> contents = AddMetadata
.attributesToMetadataPayload(message.getAttributesMap());
contents.put(PAYLOAD, message.getData().toByteArray());
// bytes must be inserted as base64 encoded strings
contents.put(PAYLOAD, base64Encoder.encodeToString(message.getData().toByteArray()));
// Also include client_id if present.
Optional.ofNullable(message.getAttributesOrDefault(CLIENT_ID, null))
.ifPresent(clientId -> contents.put(CLIENT_ID, clientId));

Просмотреть файл

@ -22,4 +22,12 @@ public class Env {
/**
 * Reads an environment variable, substituting a default when it is not set.
 *
 * @param key name of the environment variable to look up
 * @param defaultValue value returned when the variable is not set; may be {@code null}
 * @return the variable's value, or {@code defaultValue} if the variable is not set
 */
public static String getString(String key, String defaultValue) {
  final String configured = System.getenv(key);
  if (configured == null) {
    return defaultValue;
  }
  return configured;
}
/**
 * Reads an environment variable as an {@link Integer}, substituting a default when it is
 * not set.
 *
 * @param key name of the environment variable to look up
 * @param defaultValue value returned when the variable is not set; may be {@code null}
 * @return the parsed value, or {@code defaultValue} if the variable is not set
 * @throws NumberFormatException if the variable is set but is not a parsable integer
 */
public static Integer getInt(String key, Integer defaultValue) {
  // Integer.valueOf replaces the deprecated Integer(String) constructor; it parses the same
  // decimal format and throws the same NumberFormatException on bad input.
  return Optional.ofNullable(System.getenv(key)).map(Integer::valueOf).orElse(defaultValue);
}
/**
 * Reads an environment variable as a {@link Long}, substituting a default when it is not
 * set.
 *
 * @param key name of the environment variable to look up
 * @param defaultValue value returned when the variable is not set; may be {@code null}
 * @return the parsed value, or {@code defaultValue} if the variable is not set
 * @throws NumberFormatException if the variable is set but is not a parsable long
 */
public static Long getLong(String key, Long defaultValue) {
  // Long.valueOf replaces the deprecated Long(String) constructor; it parses the same
  // decimal format and throws the same NumberFormatException on bad input.
  return Optional.ofNullable(System.getenv(key)).map(Long::valueOf).orElse(defaultValue);
}
}

Просмотреть файл

@ -7,6 +7,7 @@ package com.mozilla.telemetry.ingestion.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -17,13 +18,41 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import org.junit.Before;
import org.junit.Test;
public class BigQueryTest {
private com.google.cloud.bigquery.BigQuery bigquery;
private BigQuery.Write output;
private InsertAllResponse response;
/**
 * Rebuilds the shared fixtures before each test: a mocked BigQuery client whose
 * {@code insertAll} returns a mocked response that reports no errors for any row, and a
 * {@code Write} under test limited to 10 bytes, 10 messages and a 100 ms delay.
 */
@Before
public void mockBigQueryResponse() {
  // response that never reports a per-row error
  response = mock(InsertAllResponse.class);
  when(response.getErrorsFor(anyLong())).thenReturn(ImmutableList.of());
  // client that answers every insertAll call with that response
  bigquery = mock(com.google.cloud.bigquery.BigQuery.class);
  when(bigquery.insertAll(any())).thenReturn(response);
  output = new BigQuery.Write(bigquery, 10, 10, 100);
}
/** A row inserted without errors is returned unchanged when its future completes. */
@Test
public void canReturnSuccess() {
  final BigQuery.Write.TableRow expected = new BigQuery.Write.TableRow(TableId.of("", ""), 0,
      Optional.empty(), ImmutableMap.of("", ""));
  final BigQuery.Write.TableRow actual = output.apply(expected).join();
  assertEquals(expected, actual);
}
/**
 * With limits of 1 byte, 1 message and a 0 ms delay, the batch registered for the table
 * still holds the single row that was applied.
 */
@Test
public void canSendWithNoDelay() {
  final TableId table = TableId.of("", "");
  output = new BigQuery.Write(bigquery, 1, 1, 0);
  output.apply(new BigQuery.Write.TableRow(table, 0, Optional.empty(), ImmutableMap.of()));
  final int batchedRows = output.batches.get(table).builder.build().getRows().size();
  assertEquals(1, batchedRows);
}
@Test
public void canBatchMessages() {
BigQuery.Write output = new BigQuery.Write(null);
for (int i = 0; i < 2; i++) {
output.apply(
new BigQuery.Write.TableRow(TableId.of("", ""), 0, Optional.empty(), ImmutableMap.of()));
@ -33,48 +62,25 @@ public class BigQueryTest {
@Test
public void canLimitBatchMessageCount() {
BigQuery.Write output = new BigQuery.Write(null);
for (int i = 0; i < 10_0001; i++) {
for (int i = 0; i < 11; i++) {
output.apply(
new BigQuery.Write.TableRow(TableId.of("", ""), 0, Optional.empty(), ImmutableMap.of()));
}
assertTrue(output.batches.get(TableId.of("", "")).builder.build().getRows().size() < 10_000);
}
@Test
public void canLimitBatchByteSize() {
BigQuery.Write output = new BigQuery.Write(null);
for (int i = 0; i < 10; i++) {
output.apply(new BigQuery.Write.TableRow(TableId.of("", ""), 1_000_000, Optional.empty(),
ImmutableMap.of()));
}
assertTrue(output.batches.get(TableId.of("", "")).builder.build().getRows().size() < 10);
}
@Test
public void canReturnRowOnSuccess() throws InterruptedException {
com.google.cloud.bigquery.BigQuery bigquery = mock(com.google.cloud.bigquery.BigQuery.class);
InsertAllResponse response = mock(InsertAllResponse.class);
when(bigquery.insertAll(any())).thenReturn(response);
when(response.getErrorsFor(0)).thenReturn(ImmutableList.of());
BigQuery.Write.TableRow row = new BigQuery.Write.TableRow(TableId.of("", ""), 0,
Optional.empty(), ImmutableMap.of("", ""));
assertEquals(row, new BigQuery.Write(bigquery).apply(row).join());
public void canLimitBatchByteSize() {
for (int i = 0; i < 2; i++) {
output.apply(
new BigQuery.Write.TableRow(TableId.of("", ""), 6, Optional.empty(), ImmutableMap.of()));
}
assertTrue(output.batches.get(TableId.of("", "")).builder.build().getRows().size() < 2);
}
@Test(expected = BigQuery.WriteErrors.class)
public void failsOnInsertErrors() throws Throwable {
com.google.cloud.bigquery.BigQuery bigquery = mock(com.google.cloud.bigquery.BigQuery.class);
InsertAllResponse response = mock(InsertAllResponse.class);
// BigQueryError is a final class, so mocking it requires 'mock-maker-inline' in
// src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
BigQueryError error = mock(BigQueryError.class);
when(bigquery.insertAll(any())).thenReturn(response);
when(response.getErrorsFor(0)).thenReturn(ImmutableList.of(error));
BigQuery.Write output = new BigQuery.Write(bigquery);
when(response.getErrorsFor(0)).thenReturn(ImmutableList.of(new BigQueryError("", "", "")));
try {
output.apply(new BigQuery.Write.TableRow(TableId.of("", ""), 0, Optional.empty(),
@ -86,7 +92,6 @@ public class BigQueryTest {
@Test(expected = IllegalArgumentException.class)
public void failsOnOversizedMessage() {
new BigQuery.Write(null)
.apply(new BigQuery.Write.TableRow(TableId.of("", ""), 10_000_000, Optional.empty(), null));
output.apply(new BigQuery.Write.TableRow(TableId.of("", ""), 11, Optional.empty(), null));
}
}

Просмотреть файл

@ -9,6 +9,8 @@ import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.util.TestWithPubsubResources;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -39,4 +41,35 @@ public class PubsubReadIntegrationTest extends TestWithPubsubResources {
assertEquals("test", new String(received.get().getData().toByteArray()));
}
/**
 * Publishes a single message, makes the handler fail on the first delivery, and checks that
 * the same message is delivered a second time.
 *
 * @throws Exception if publishing does not complete within 10 seconds
 */
@Test
public void canCanRetryOnException() throws Exception {
// publish one message and wait for the message id assigned to it
String messageId = publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFrom("test".getBytes())).build())
.get(10, TimeUnit.SECONDS);
// every delivery is recorded here, including the redelivery
List<PubsubMessage> received = new LinkedList<>();
// holder so the handler lambda can reach the Read instance it is passed into
AtomicReference<Pubsub.Read> input = new AtomicReference<>();
input.set(new Pubsub.Read(subscriptionName.toString(),
// handler
message -> CompletableFuture.completedFuture(message) // create a future with message
.thenAccept(received::add) // add message to received
.thenRun(() -> {
// throw an error to nack the message the first time
if (received.size() == 1) {
throw new RuntimeException("test");
}
// thenRun only runs after normal completion, so the first (failed) delivery skips the stop
}).thenRun(() -> input.get().subscriber.stopAsync()), // stop the subscriber
// config
// apply the channel provider and no-credentials provider only when a channel provider is present
builder -> channelProvider.map(channelProvider -> builder
.setChannelProvider(channelProvider).setCredentialsProvider(noCredentialsProvider))
.orElse(builder)));
input.get().run();
// both deliveries must carry the id of the one published message
assertEquals(messageId, received.get(0).getMessageId());
assertEquals(messageId, received.get(1).getMessageId());
assertEquals(2, received.size());
}
}

Просмотреть файл

@ -28,9 +28,8 @@ public class PubsubMessageToTableRowTest {
assertEquals("telemetry_raw", actual.tableId.getDataset());
assertEquals("main_v4", actual.tableId.getTable());
assertEquals(104, actual.byteSize);
assertEquals("test", new String((byte[]) actual.content.remove("payload")));
assertEquals(ImmutableMap.of("document_id", "id", "document_namespace", "telemetry",
"document_type", "main", "document_version", "4"), actual.content);
"document_type", "main", "document_version", "4", "payload", "dGVzdA=="), actual.content);
}
@Test
@ -42,10 +41,8 @@ public class PubsubMessageToTableRowTest {
assertEquals("_raw", actual.tableId.getDataset());
assertEquals("_v", actual.tableId.getTable());
assertEquals(65, actual.byteSize);
assertEquals("", new String((byte[]) actual.content.remove("payload")));
assertEquals(
ImmutableMap.of("document_namespace", "", "document_type", "", "document_version", ""),
actual.content);
assertEquals(ImmutableMap.of("document_namespace", "", "document_type", "", "document_version",
"", "payload", ""), actual.content);
}
@Test(expected = IllegalArgumentException.class)
@ -103,7 +100,6 @@ public class PubsubMessageToTableRowTest {
assertEquals("telemetry_decoded", actual.tableId.getDataset());
assertEquals("document_type_vdocument_version", actual.tableId.getTable());
assertEquals(916, actual.byteSize);
assertEquals("test", new String((byte[]) actual.content.remove("payload")));
assertEquals(
ImmutableMap.builder()
@ -131,7 +127,7 @@ public class PubsubMessageToTableRowTest {
.put("normalized_country_code", "normalized_country_code")
.put("normalized_os", "normalized_os")
.put("normalized_os_version", "normalized_os_version").put("sample_id", 42)
.put("client_id", "client_id").build(),
.put("payload", "dGVzdA==").put("client_id", "client_id").build(),
actual.content);
}
@ -165,7 +161,6 @@ public class PubsubMessageToTableRowTest {
assertEquals("document_namespace_decoded", actual.tableId.getDataset());
assertEquals("document_type_vdocument_version", actual.tableId.getTable());
assertEquals(925, actual.byteSize);
assertEquals("test", new String((byte[]) actual.content.remove("payload")));
assertEquals(
ImmutableMap.builder()
@ -190,7 +185,7 @@ public class PubsubMessageToTableRowTest {
.put("normalized_country_code", "normalized_country_code")
.put("normalized_os", "normalized_os")
.put("normalized_os_version", "normalized_os_version").put("sample_id", 42)
.put("client_id", "client_id").build(),
.put("payload", "dGVzdA==").put("client_id", "client_id").build(),
actual.content);
}
@ -203,9 +198,8 @@ public class PubsubMessageToTableRowTest {
assertEquals("_decoded", actual.tableId.getDataset());
assertEquals("_v", actual.tableId.getTable());
assertEquals(80, actual.byteSize);
assertEquals("", new String((byte[]) actual.content.remove("payload")));
assertEquals(
ImmutableMap.of("metadata",
ImmutableMap.of("payload", "", "metadata",
ImmutableMap.builder().put("document_namespace", "").put("document_type", "")
.put("document_version", "").put("geo", ImmutableMap.of())
.put("header", ImmutableMap.of()).put("user_agent", ImmutableMap.of()).build()),

Просмотреть файл

@ -1 +0,0 @@
mock-maker-inline

Просмотреть файл

@ -27,7 +27,7 @@
<!-- Keep these dependency versions in sync with those pulled in by beam;
check https://mvnrepository.com/artifact/org.apache.beam -->
<google-cloud.version>1.49.0</google-cloud.version>
<google-cloud.version>1.61.0</google-cloud.version>
<jackson.version>2.9.9</jackson.version>
</properties>