зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1389233 - Record outgoing batches in the android sync ping r=Grisha
MozReview-Commit-ID: JUHSMluUE8q --HG-- extra : rebase_source : 70616ba96961c53ccbd0cf27830ebb3ad061dd52
This commit is contained in:
Родитель
a98bc19750
Коммит
b43432e557
|
@ -7,13 +7,14 @@ import android.os.Bundle;
|
|||
import android.os.Parcelable;
|
||||
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import org.json.simple.JSONArray;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mozilla.gecko.background.testhelpers.TestRunner;
|
||||
import org.mozilla.gecko.sync.ExtendedJSONObject;
|
||||
import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
|
||||
import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
|
||||
import org.mozilla.gecko.sync.validation.BookmarkValidationResults;
|
||||
import org.mozilla.gecko.sync.validation.ValidationResults;
|
||||
|
@ -61,6 +62,7 @@ public class TelemetrySyncPingBuilderTest {
|
|||
assertEquals("device-id", payload.getString("deviceID"));
|
||||
assertTrue(payload.getLong("when") != null);
|
||||
assertEquals(true, payload.getBoolean("restarted"));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -82,15 +84,26 @@ public class TelemetrySyncPingBuilderTest {
|
|||
stage.inboundFailed = 1;
|
||||
stage.reconciled = 1;
|
||||
|
||||
stage.outbound = new ArrayList<>();
|
||||
stage.outbound.add(new StoreBatchTracker.Batch(1, 1));
|
||||
stage.outbound.add(new StoreBatchTracker.Batch(9, 0));
|
||||
stage.outbound.add(new StoreBatchTracker.Batch(0, 0));
|
||||
|
||||
stages.put("testing", stage);
|
||||
|
||||
TelemetryStageCollector stage2 = new TelemetryStageCollector(null);
|
||||
// If it's actually completely empty, it will get omitted.
|
||||
stage2.inbound = 1;
|
||||
stage2.inboundStored = 1;
|
||||
stages.put("testing2", stage2);
|
||||
|
||||
TelemetryLocalPing localPing = builder
|
||||
.setStages(stages)
|
||||
.build();
|
||||
ExtendedJSONObject payload = localPing.getPayload();
|
||||
|
||||
assertEquals(1, payload.getArray("engines").size());
|
||||
ExtendedJSONObject engine = (ExtendedJSONObject)payload.getArray("engines").get(0);
|
||||
assertEquals(2, payload.getArray("engines").size());
|
||||
ExtendedJSONObject engine = (ExtendedJSONObject) payload.getArray("engines").get(0);
|
||||
|
||||
assertEquals("testing", engine.getString("name"));
|
||||
assertEquals(Long.valueOf(5L), engine.getLong("took"));
|
||||
|
@ -101,8 +114,6 @@ public class TelemetrySyncPingBuilderTest {
|
|||
assertEquals(Integer.valueOf(stage.inboundFailed), inbound.getIntegerSafely("failed"));
|
||||
assertEquals(Integer.valueOf(stage.reconciled), inbound.getIntegerSafely("reconciled"));
|
||||
|
||||
// TODO: Test outbound once bug 1389233 is addressed
|
||||
|
||||
ExtendedJSONObject error = engine.getObject("failureReason");
|
||||
assertEquals("unexpectederror", error.getString("name"));
|
||||
assertEquals("test", error.getString("error"));
|
||||
|
@ -111,6 +122,25 @@ public class TelemetrySyncPingBuilderTest {
|
|||
assertEquals(stage.validation.getLong("took"), validation.getLong("took"));
|
||||
assertEquals(stage.validation.getLong("checked"), validation.getLong("checked"));
|
||||
assertEquals(0, stage.validation.getArray("problems").size());
|
||||
|
||||
JSONArray outgoing = engine.getArray("outgoing");
|
||||
assertEquals(outgoing.size(), 3);
|
||||
|
||||
ExtendedJSONObject firstBatch = (ExtendedJSONObject) outgoing.get(0);
|
||||
assertEquals(firstBatch.getLong("sent", -1), 1);
|
||||
assertEquals(firstBatch.getLong("failed", -1), 1);
|
||||
|
||||
ExtendedJSONObject secondBatch = (ExtendedJSONObject) outgoing.get(1);
|
||||
assertEquals(secondBatch.getLong("sent", -1), 9);
|
||||
assertFalse(secondBatch.containsKey("failed"));
|
||||
|
||||
// Ensure we include "all zero" batches, since we can actually send those on android.
|
||||
ExtendedJSONObject lastBatch = (ExtendedJSONObject) outgoing.get(2);
|
||||
assertFalse(lastBatch.containsKey("sent"));
|
||||
assertFalse(lastBatch.containsKey("failed"));
|
||||
|
||||
ExtendedJSONObject emptyEngine = (ExtendedJSONObject) payload.getArray("engines").get(1);
|
||||
assertFalse(emptyEngine.containsKey("outgoing"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -1075,6 +1075,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
|
|||
'sync/synchronizer/ServerLocalSynchronizer.java',
|
||||
'sync/synchronizer/ServerLocalSynchronizerSession.java',
|
||||
'sync/synchronizer/SessionNotBegunException.java',
|
||||
'sync/synchronizer/StoreBatchTracker.java',
|
||||
'sync/synchronizer/Synchronizer.java',
|
||||
'sync/synchronizer/SynchronizerDelegate.java',
|
||||
'sync/synchronizer/SynchronizerSession.java',
|
||||
|
|
|
@ -9,10 +9,12 @@ package org.mozilla.gecko.telemetry.pingbuilders;
|
|||
import android.os.Bundle;
|
||||
import android.os.Parcelable;
|
||||
import android.support.annotation.NonNull;
|
||||
import android.support.annotation.Nullable;
|
||||
|
||||
import org.json.simple.JSONArray;
|
||||
import org.json.simple.JSONObject;
|
||||
import org.mozilla.gecko.sync.ExtendedJSONObject;
|
||||
import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
|
||||
import org.mozilla.gecko.sync.telemetry.TelemetryContract;
|
||||
import org.mozilla.gecko.sync.telemetry.TelemetryStageCollector;
|
||||
import org.mozilla.gecko.telemetry.TelemetryLocalPing;
|
||||
|
@ -20,6 +22,7 @@ import org.mozilla.gecko.telemetry.TelemetryLocalPing;
|
|||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Local ping builder which understands how to process sync data.
|
||||
|
@ -35,7 +38,8 @@ public class TelemetrySyncPingBuilder extends TelemetryLocalPingBuilder {
|
|||
final TelemetryStageCollector stage = stages.get(stageName);
|
||||
|
||||
// Skip stages that did nothing.
|
||||
if (stage.inbound == 0 && stage.outbound == 0 && stage.error == null && stage.validation == null) {
|
||||
if (stage.inbound == 0 && (stage.outbound == null || stage.outbound.size() == 0) &&
|
||||
stage.error == null && stage.validation == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -63,19 +67,10 @@ public class TelemetrySyncPingBuilder extends TelemetryLocalPingBuilder {
|
|||
stageJSON.put("incoming", incomingJSON);
|
||||
}
|
||||
|
||||
if (stage.outbound > 0) {
|
||||
final ExtendedJSONObject outgoingJSON = new ExtendedJSONObject();
|
||||
// We specifically do not check if `outboundStored` is greater than zero.
|
||||
// `outbound` schema is simpler than `inbound`, namely there isn't an "attempted
|
||||
// to send" count.
|
||||
// Stage telemetry itself has that data (outbound = outboundStored + outboundFailed),
|
||||
// and so this is our way to relay slightly more information.
|
||||
// e.g. we'll know there's something wrong if `sent = 0` and `failed` is missing.
|
||||
outgoingJSON.put("sent", stage.outboundStored);
|
||||
if (stage.outboundFailed > 0) {
|
||||
outgoingJSON.put("failed", stage.outboundFailed);
|
||||
}
|
||||
stageJSON.put("outgoing", outgoingJSON);
|
||||
JSONArray outbound = buildOutgoing(stage.outbound);
|
||||
|
||||
if (outbound != null) {
|
||||
stageJSON.put("outgoing", outbound);
|
||||
}
|
||||
|
||||
// We depend on the error builder from TelemetryCollector to produce the right schema.
|
||||
|
@ -104,6 +99,26 @@ public class TelemetrySyncPingBuilder extends TelemetryLocalPingBuilder {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static JSONArray buildOutgoing(List<StoreBatchTracker.Batch> batches) {
|
||||
if (batches == null || batches.size() == 0) {
|
||||
return null;
|
||||
}
|
||||
JSONArray arr = new JSONArray();
|
||||
for (int i = 0; i < batches.size(); ++i) {
|
||||
StoreBatchTracker.Batch batch = batches.get(i);
|
||||
ExtendedJSONObject o = new ExtendedJSONObject();
|
||||
if (batch.sent != 0) {
|
||||
o.put("sent", (long) batch.sent);
|
||||
}
|
||||
if (batch.failed != 0) {
|
||||
o.put("failed", (long) batch.failed);
|
||||
}
|
||||
addUnchecked(arr, o);
|
||||
}
|
||||
return arr.size() == 0 ? null : arr;
|
||||
}
|
||||
|
||||
public TelemetrySyncPingBuilder setRestarted(boolean didRestart) {
|
||||
if (!didRestart) {
|
||||
return this;
|
||||
|
|
|
@ -29,6 +29,11 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS
|
|||
performNotify("Store failed", ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
performNotify("Stores committed ", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {}
|
||||
|
||||
|
@ -77,6 +82,16 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
self.onBatchCommitted();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStoreFailed(Exception e) {
|
||||
|
||||
|
|
|
@ -13,6 +13,11 @@ public abstract class SimpleSuccessStoreDelegate extends DefaultDelegate impleme
|
|||
performNotify("Store failed", ex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
performNotify("Store committed", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
|
||||
return this;
|
||||
|
|
|
@ -206,6 +206,11 @@ public class VersioningDelegateHelper {
|
|||
inner.onStoreFailed(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
inner.onBatchCommitted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
|
||||
return inner.deferredStoreDelegate(executor);
|
||||
|
|
|
@ -74,4 +74,14 @@ public class DeferredRepositorySessionStoreDelegate implements
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
inner.onBatchCommitted();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,5 +25,7 @@ public interface RepositorySessionStoreDelegate {
|
|||
void onRecordStoreSucceeded(String guid);
|
||||
void onStoreCompleted();
|
||||
void onStoreFailed(Exception e);
|
||||
// Only relevant for store batches, and exists to help us record correct telemetry.
|
||||
void onBatchCommitted();
|
||||
RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
|
||||
}
|
||||
|
|
|
@ -96,6 +96,10 @@ class PayloadDispatcher {
|
|||
batchWhiteboard.clearSuccessRecordGuids();
|
||||
}
|
||||
|
||||
if (isCommit || !batchWhiteboard.getInBatchingMode()) {
|
||||
uploader.sessionStoreDelegate.onBatchCommitted();
|
||||
}
|
||||
|
||||
// If this was our very last commit, we're done storing records.
|
||||
// Get Last-Modified timestamp from the response, and pass it upstream.
|
||||
if (isLastPayload) {
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.mozilla.gecko.sync.telemetry.TelemetryCollector;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
|
@ -632,14 +633,12 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
|
|||
int outboundCountStored = synchronizerSession.getOutboundCountStored();
|
||||
int outboundCountFailed = synchronizerSession.getOutboundCountFailed();
|
||||
|
||||
telemetryStageCollector.outbound = synchronizerSession.getOutboundBatches();
|
||||
telemetryStageCollector.finished = stageCompleteTimestamp;
|
||||
telemetryStageCollector.inbound = inboundCount;
|
||||
telemetryStageCollector.inboundStored = inboundCountStored;
|
||||
telemetryStageCollector.inboundFailed = inboundCountFailed;
|
||||
telemetryStageCollector.reconciled = inboundCountReconciled;
|
||||
telemetryStageCollector.outbound = outboundCount;
|
||||
telemetryStageCollector.outboundStored = outboundCountStored;
|
||||
telemetryStageCollector.outboundFailed = outboundCountFailed;
|
||||
|
||||
Logger.info(LOG_TAG, "Stage " + getEngineName()
|
||||
+ " received " + inboundCount
|
||||
|
|
|
@ -7,6 +7,7 @@ package org.mozilla.gecko.sync.synchronizer;
|
|||
import android.support.annotation.NonNull;
|
||||
import android.support.annotation.Nullable;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -87,6 +88,8 @@ public class RecordsChannel implements
|
|||
private final AtomicInteger storeFailedCount = new AtomicInteger();
|
||||
private final AtomicInteger storeReconciledCount = new AtomicInteger();
|
||||
|
||||
private final StoreBatchTracker storeTracker = new StoreBatchTracker();
|
||||
|
||||
public RecordsChannel(RepositorySession source, RepositorySession sink, RecordsChannelDelegate delegate) {
|
||||
this.source = source;
|
||||
this.sink = sink;
|
||||
|
@ -185,6 +188,7 @@ public class RecordsChannel implements
|
|||
storeAcceptedCount.set(0);
|
||||
storeFailedCount.set(0);
|
||||
storeReconciledCount.set(0);
|
||||
storeTracker.reset();
|
||||
// Start a consumer thread.
|
||||
this.consumer = new ConcurrentRecordConsumer(this);
|
||||
ThreadPool.run(this.consumer);
|
||||
|
@ -213,6 +217,7 @@ public class RecordsChannel implements
|
|||
@Override
|
||||
public void store(Record record) {
|
||||
storeAttemptedCount.incrementAndGet();
|
||||
storeTracker.onRecordStoreAttempted();
|
||||
try {
|
||||
sink.store(record);
|
||||
} catch (NoStoreDelegateException e) {
|
||||
|
@ -221,6 +226,10 @@ public class RecordsChannel implements
|
|||
}
|
||||
}
|
||||
|
||||
/* package-local */ ArrayList<StoreBatchTracker.Batch> getStoreBatches() {
|
||||
return this.storeTracker.getStoreBatches();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFetchFailed(Exception ex) {
|
||||
Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
|
||||
|
@ -251,10 +260,17 @@ public class RecordsChannel implements
|
|||
this.sink.storeFlush();
|
||||
}
|
||||
|
||||
// Sent for "store" batches.
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
storeTracker.onBatchFinished();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRecordStoreFailed(Exception ex, String recordGuid) {
|
||||
Logger.trace(LOG_TAG, "Failed to store record with guid " + recordGuid);
|
||||
storeFailedCount.incrementAndGet();
|
||||
storeTracker.onRecordStoreFailed();
|
||||
this.consumer.stored();
|
||||
delegate.onFlowStoreFailed(this, ex, recordGuid);
|
||||
// TODO: abort?
|
||||
|
@ -264,6 +280,7 @@ public class RecordsChannel implements
|
|||
public void onRecordStoreSucceeded(String guid) {
|
||||
Logger.trace(LOG_TAG, "Stored record with guid " + guid);
|
||||
storeAcceptedCount.incrementAndGet();
|
||||
storeTracker.onRecordStoreSucceeded();
|
||||
this.consumer.stored();
|
||||
}
|
||||
|
||||
|
@ -316,6 +333,8 @@ public class RecordsChannel implements
|
|||
setReflowException((ReflowIsNecessaryException) ex);
|
||||
}
|
||||
|
||||
storeTracker.onBatchFailed();
|
||||
|
||||
// NB: consumer might or might not be running at this point. There are two cases here:
|
||||
// 1) If we're storing records remotely, we might fail due to a 412.
|
||||
// -- we might hit 412 at any point, so consumer might be in either state.
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
package org.mozilla.gecko.sync.synchronizer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* This class encapsulates most of the logic for recording telemetry information about outgoing
|
||||
* batches.
|
||||
*/
|
||||
public class StoreBatchTracker {
|
||||
private final AtomicInteger currentStoreBatchAttempted = new AtomicInteger();
|
||||
private final AtomicInteger currentStoreBatchAccepted = new AtomicInteger();
|
||||
private final AtomicInteger currentStoreBatchFailed = new AtomicInteger();
|
||||
private final ConcurrentLinkedQueue<Batch> completedBatches = new ConcurrentLinkedQueue<>();
|
||||
|
||||
public static class Batch {
|
||||
public final int sent;
|
||||
public final int failed;
|
||||
public Batch(final int sent, final int failed) {
|
||||
this.sent = sent;
|
||||
this.failed = failed;
|
||||
}
|
||||
}
|
||||
|
||||
/* package-local */ void reset() {
|
||||
currentStoreBatchFailed.set(0);
|
||||
currentStoreBatchAccepted.set(0);
|
||||
currentStoreBatchAttempted.set(0);
|
||||
completedBatches.clear();
|
||||
}
|
||||
|
||||
private boolean haveUncommittedBatchData() {
|
||||
return currentStoreBatchAttempted.get() != 0 ||
|
||||
currentStoreBatchAccepted.get() != 0 ||
|
||||
currentStoreBatchFailed.get() != 0;
|
||||
}
|
||||
|
||||
/* package-local */ void onBatchFinished() {
|
||||
// Not actually thread safe, but should be good enough for telemetry info. (Especially
|
||||
// since we shouldn't be completing batches for the same channel on more than one thread)
|
||||
final int sent = currentStoreBatchAttempted.getAndSet(0);
|
||||
final int knownFailed = currentStoreBatchFailed.getAndSet(0);
|
||||
final int knownSucceeded = currentStoreBatchAccepted.getAndSet(0);
|
||||
|
||||
// These might be different if we "forced" a failure due to an error unrelated to uploading
|
||||
// a record.
|
||||
final int failed = Math.max(knownFailed, sent - knownSucceeded);
|
||||
|
||||
completedBatches.add(new Batch(sent, failed));
|
||||
}
|
||||
|
||||
/* package-local */ void onBatchFailed() {
|
||||
if (currentStoreBatchFailed.get() == 0 && currentStoreBatchAccepted.get() == currentStoreBatchAttempted.get()) {
|
||||
// Force the failure. It's unclear if this ever can happen for cases where we care about what
|
||||
// is inside completedBatches (cases where we're uploading).
|
||||
currentStoreBatchFailed.incrementAndGet();
|
||||
}
|
||||
onBatchFinished();
|
||||
}
|
||||
|
||||
/* package-local */ void onRecordStoreFailed() {
|
||||
currentStoreBatchFailed.incrementAndGet();
|
||||
}
|
||||
|
||||
/* package-local */ void onRecordStoreSucceeded() {
|
||||
currentStoreBatchAccepted.incrementAndGet();
|
||||
}
|
||||
|
||||
/* package-local */ void onRecordStoreAttempted() {
|
||||
currentStoreBatchAttempted.incrementAndGet();
|
||||
}
|
||||
|
||||
// Note that this finishes the current batch (if any exists).
|
||||
/* package-local */ ArrayList<Batch> getStoreBatches() {
|
||||
if (haveUncommittedBatchData()) {
|
||||
onBatchFinished();
|
||||
}
|
||||
return new ArrayList<>(completedBatches);
|
||||
}
|
||||
}
|
|
@ -5,11 +5,15 @@
|
|||
package org.mozilla.gecko.sync.synchronizer;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.mozilla.gecko.background.common.log.Logger;
|
||||
import org.mozilla.gecko.sync.SyncException;
|
||||
import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
|
||||
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
|
||||
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
|
||||
import org.mozilla.gecko.sync.repositories.RepositorySession;
|
||||
|
@ -81,6 +85,9 @@ public class SynchronizerSession implements RecordsChannelDelegate, RepositorySe
|
|||
private final AtomicInteger numOutboundRecords = new AtomicInteger(-1);
|
||||
private final AtomicInteger numOutboundRecordsStored = new AtomicInteger(-1);
|
||||
private final AtomicInteger numOutboundRecordsFailed = new AtomicInteger(-1);
|
||||
// Doesn't need to be ConcurrentLinkedQueue or anything like that since we don't do partial
|
||||
// changes to it.
|
||||
private final AtomicReference<List<Batch>> outboundBatches = new AtomicReference<>();
|
||||
|
||||
private Exception fetchFailedCauseException;
|
||||
private Exception storeFailedCauseException;
|
||||
|
@ -184,6 +191,11 @@ public class SynchronizerSession implements RecordsChannelDelegate, RepositorySe
|
|||
return numInboundRecordsReconciled.get();
|
||||
}
|
||||
|
||||
// Returns outboundBatches.
|
||||
public List<Batch> getOutboundBatches() {
|
||||
return outboundBatches.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of records fetched from the second repository (usually the
|
||||
* local store, hence outbound).
|
||||
|
@ -228,6 +240,7 @@ public class SynchronizerSession implements RecordsChannelDelegate, RepositorySe
|
|||
numOutboundRecords.set(-1);
|
||||
numOutboundRecordsStored.set(-1);
|
||||
numOutboundRecordsFailed.set(-1);
|
||||
outboundBatches.set(null);
|
||||
|
||||
// First thing: decide whether we should.
|
||||
if (sessionA.shouldSkip() ||
|
||||
|
@ -320,6 +333,7 @@ public class SynchronizerSession implements RecordsChannelDelegate, RepositorySe
|
|||
numOutboundRecords.set(recordsChannel.getFetchCount());
|
||||
numOutboundRecordsStored.set(recordsChannel.getStoreAcceptedCount());
|
||||
numOutboundRecordsFailed.set(recordsChannel.getStoreFailureCount());
|
||||
outboundBatches.set(recordsChannel.getStoreBatches());
|
||||
flowBToACompleted = true;
|
||||
|
||||
// Finish the two sessions.
|
||||
|
|
|
@ -5,6 +5,9 @@
|
|||
package org.mozilla.gecko.sync.telemetry;
|
||||
|
||||
import org.mozilla.gecko.sync.ExtendedJSONObject;
|
||||
import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Gathers telemetry details about an individual sync stage.
|
||||
|
@ -21,12 +24,10 @@ public class TelemetryStageCollector {
|
|||
public volatile int inbound = 0;
|
||||
public volatile int inboundStored = 0;
|
||||
public volatile int inboundFailed = 0;
|
||||
public volatile int outbound = 0;
|
||||
public volatile int outboundStored = 0;
|
||||
public volatile int outboundFailed = 0;
|
||||
public volatile int reconciled = 0;
|
||||
public volatile ExtendedJSONObject error = null;
|
||||
public volatile ExtendedJSONObject validation = null;
|
||||
public volatile List<StoreBatchTracker.Batch> outbound = null;
|
||||
|
||||
public TelemetryStageCollector(TelemetryCollector syncCollector) {
|
||||
this.syncCollector = syncCollector;
|
||||
|
|
|
@ -43,6 +43,11 @@ public class ExpectSuccessRepositorySessionStoreDelegate extends
|
|||
log("Store reconciled record " + guid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
log("Batch committed");
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
|
||||
return this;
|
||||
|
|
|
@ -159,6 +159,10 @@ public class BatchingUploaderTest {
|
|||
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
|
||||
return this;
|
||||
|
|
|
@ -121,6 +121,10 @@ public class PayloadUploadDelegateTest {
|
|||
public void onRecordStoreReconciled(String guid, String oldGuid, Integer newVersion) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onBatchCommitted() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
|
||||
return this;
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
/* Any copyright is dedicated to the Public Domain.
|
||||
http://creativecommons.org/publicdomain/zero/1.0/ */
|
||||
|
||||
package org.mozilla.gecko.sync.synchronizer;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mozilla.gecko.background.testhelpers.TestRunner;
|
||||
import org.mozilla.gecko.sync.synchronizer.StoreBatchTracker.Batch;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(TestRunner.class)
|
||||
public class StoreBatchTrackerTest {
|
||||
private StoreBatchTracker tracker;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
tracker = new StoreBatchTracker();
|
||||
}
|
||||
|
||||
private void recordCounts(int attempted, int succeeded, int failed) {
|
||||
for (int i = 0; i < attempted; ++i) {
|
||||
tracker.onRecordStoreAttempted();
|
||||
}
|
||||
for (int i = 0; i < succeeded; ++i) {
|
||||
tracker.onRecordStoreSucceeded();
|
||||
}
|
||||
for (int i = 0; i < failed; ++i) {
|
||||
tracker.onRecordStoreFailed();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleBatch() {
|
||||
{
|
||||
recordCounts(3, 3, 0);
|
||||
tracker.onBatchFinished();
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
assertEquals(batches.size(), 1);
|
||||
assertEquals(batches.get(0).failed, 0);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
tracker.reset();
|
||||
}
|
||||
{
|
||||
recordCounts(3, 2, 1);
|
||||
// Don't bother calling onBatchFinished, to ensure we finish it automatically.
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
assertEquals(batches.size(), 1);
|
||||
assertEquals(batches.get(0).failed, 1);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
tracker.reset();
|
||||
}
|
||||
{
|
||||
recordCounts(3, 0, 3);
|
||||
tracker.onBatchFinished();
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
assertEquals(batches.size(), 1);
|
||||
assertEquals(batches.get(0).failed, 3);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
tracker.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBatchFail() {
|
||||
recordCounts(3, 3, 0);
|
||||
tracker.onBatchFailed();
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
assertEquals(batches.size(), 1);
|
||||
// The important thing is that there's a non-zero number of failed here.
|
||||
assertEquals(batches.get(0).failed, 1);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleBatches() {
|
||||
recordCounts(3, 3, 0);
|
||||
tracker.onBatchFinished();
|
||||
recordCounts(8, 8, 0);
|
||||
tracker.onBatchFinished();
|
||||
recordCounts(5, 5, 0);
|
||||
tracker.onBatchFinished();
|
||||
// Fake an empty "commit" POST.
|
||||
tracker.onBatchFinished();
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
|
||||
assertEquals(batches.size(), 4);
|
||||
|
||||
assertEquals(batches.get(0).failed, 0);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
|
||||
assertEquals(batches.get(1).failed, 0);
|
||||
assertEquals(batches.get(1).sent, 8);
|
||||
|
||||
assertEquals(batches.get(2).failed, 0);
|
||||
assertEquals(batches.get(2).sent, 5);
|
||||
|
||||
assertEquals(batches.get(3).failed, 0);
|
||||
assertEquals(batches.get(3).sent, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDroppedStores() {
|
||||
recordCounts(3, 1, 0);
|
||||
tracker.onBatchFinished();
|
||||
ArrayList<Batch> batches = tracker.getStoreBatches();
|
||||
assertEquals(batches.size(), 1);
|
||||
assertEquals(batches.get(0).failed, 2);
|
||||
assertEquals(batches.get(0).sent, 3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() {
|
||||
assertEquals(tracker.getStoreBatches().size(), 0);
|
||||
recordCounts(2, 1, 1);
|
||||
assertEquals(tracker.getStoreBatches().size(), 1);
|
||||
tracker.reset();
|
||||
assertEquals(tracker.getStoreBatches().size(), 0);
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче