зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1336001 - Refactor BatchingUploader's state-holder objects to fix threading problems r=rnewman
Previous state: - Two threads were racing to get to batchMeta - one to reset its state, and the other to read its internal state to construct a network request, and then to update its internal state. - This resulted in data corruption when payloads had to be split into multiple batches. A core problem was that there is a lot of state shared across thread boundaries. Specifically, BatchMeta is being written and read both by record consumer threads running off of a thread pool, and by the network worker thread(s). This patch refactors BatchMeta and scheduling of network runnables to ensure that cross-thread access is minimized, and "who owns/accesses what" is explicit. - PayloadDispatcher owns scheduling payload runnables and any data they need to share between each other. - UploaderMeta owns information that's necessary to process incoming records. MozReview-Commit-ID: 9hFs3fXGaGM --HG-- rename : mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchMeta.java => mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/UploaderMeta.java rename : mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchMetaTest.java => mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/UploaderMetaTest.java extra : rebase_source : f0f1a05f19f40a6514d4f0dac9d531b086c3f3ed
This commit is contained in:
Родитель
c69d150710
Коммит
6aca49eb8a
|
@ -1022,8 +1022,10 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
|
|||
'sync/repositories/uploaders/BufferSizeTracker.java',
|
||||
'sync/repositories/uploaders/MayUploadProvider.java',
|
||||
'sync/repositories/uploaders/Payload.java',
|
||||
'sync/repositories/uploaders/PayloadDispatcher.java',
|
||||
'sync/repositories/uploaders/PayloadUploadDelegate.java',
|
||||
'sync/repositories/uploaders/RecordUploadRunnable.java',
|
||||
'sync/repositories/uploaders/UploaderMeta.java',
|
||||
'sync/Server11PreviousPostFailedException.java',
|
||||
'sync/Server11RecordPostFailedException.java',
|
||||
'sync/setup/activities/ActivityUtils.java',
|
||||
|
|
|
@ -168,10 +168,6 @@ public class SyncStorageRequest implements Resource {
|
|||
public SyncStorageRequestDelegate delegate;
|
||||
protected BaseResource resource;
|
||||
|
||||
public SyncStorageRequest() {
|
||||
super();
|
||||
}
|
||||
|
||||
// Default implementation. Override this.
|
||||
protected BaseResourceDelegate makeResourceDelegate(SyncStorageRequest request) {
|
||||
return new SyncStorageResourceDelegate(request);
|
||||
|
|
|
@ -4,82 +4,94 @@
|
|||
|
||||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import android.support.annotation.CheckResult;
|
||||
import android.support.annotation.NonNull;
|
||||
import android.support.annotation.Nullable;
|
||||
|
||||
import org.mozilla.gecko.background.common.log.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.TokenModifiedException;
|
||||
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedChangedUnexpectedly;
|
||||
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader.LastModifiedDidNotChange;
|
||||
|
||||
/**
|
||||
* Keeps track of token, Last-Modified value and GUIDs of succeeded records.
|
||||
* Keeps track of various meta information about a batch series.
|
||||
*
|
||||
* NB regarding concurrent access:
|
||||
* - this class expects access by possibly different, sequentially running threads.
|
||||
* - concurrent access is not supported.
|
||||
*/
|
||||
/* @ThreadSafe */
|
||||
public class BatchMeta extends BufferSizeTracker {
|
||||
public class BatchMeta {
|
||||
private static final String LOG_TAG = "BatchMeta";
|
||||
|
||||
// Will be set once first payload upload succeeds. We don't expect this to change until we
|
||||
// commit the batch, and which point it must change.
|
||||
/* @GuardedBy("this") */ private Long lastModified;
|
||||
private volatile Boolean inBatchingMode;
|
||||
@Nullable private volatile Long lastModified;
|
||||
private volatile String token;
|
||||
|
||||
// Will be set once first payload upload succeeds. We don't expect this to ever change until
|
||||
// a commit succeeds, at which point this gets set to null.
|
||||
/* @GuardedBy("this") */ private String token;
|
||||
// NB: many of the operations on ConcurrentLinkedQueue are not atomic (toArray, for example),
|
||||
// and so use of this queue type is only possible because this class does not support concurrent
|
||||
// access.
|
||||
private final ConcurrentLinkedQueue<String> successRecordGuids = new ConcurrentLinkedQueue<>();
|
||||
|
||||
/* @GuardedBy("accessLock") */ private boolean isUnlimited = false;
|
||||
|
||||
// Accessed by synchronously running threads.
|
||||
/* @GuardedBy("accessLock") */ private final List<String> successRecordGuids = new ArrayList<>();
|
||||
|
||||
/* @GuardedBy("accessLock") */ private boolean needsCommit = false;
|
||||
|
||||
protected final Long collectionLastModified;
|
||||
|
||||
public BatchMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords, @Nullable Long collectionLastModified) {
|
||||
super(payloadLock, maxBytes, maxRecords);
|
||||
this.collectionLastModified = collectionLastModified;
|
||||
BatchMeta(@Nullable Long initialLastModified, Boolean initialInBatchingMode) {
|
||||
lastModified = initialLastModified;
|
||||
inBatchingMode = initialInBatchingMode;
|
||||
}
|
||||
|
||||
protected void setIsUnlimited(boolean isUnlimited) {
|
||||
synchronized (accessLock) {
|
||||
this.isUnlimited = isUnlimited;
|
||||
String[] getSuccessRecordGuids() {
|
||||
// NB: This really doesn't play well with concurrent access.
|
||||
final String[] guids = new String[this.successRecordGuids.size()];
|
||||
this.successRecordGuids.toArray(guids);
|
||||
return guids;
|
||||
}
|
||||
|
||||
void recordSucceeded(final String recordGuid) {
|
||||
// Sanity check.
|
||||
if (recordGuid == null) {
|
||||
throw new IllegalStateException("Record guid is unexpectedly null");
|
||||
}
|
||||
|
||||
successRecordGuids.add(recordGuid);
|
||||
}
|
||||
|
||||
/* package-local */ void setInBatchingMode(boolean inBatchingMode) {
|
||||
this.inBatchingMode = inBatchingMode;
|
||||
}
|
||||
|
||||
/* package-local */ Boolean getInBatchingMode() {
|
||||
return inBatchingMode;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
protected Long getLastModified() {
|
||||
return lastModified;
|
||||
}
|
||||
|
||||
void setLastModified(final Long newLastModified, final boolean expectedToChange) throws BatchingUploader.LastModifiedChangedUnexpectedly, BatchingUploader.LastModifiedDidNotChange {
|
||||
if (lastModified == null) {
|
||||
lastModified = newLastModified;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!expectedToChange && !lastModified.equals(newLastModified)) {
|
||||
Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
|
||||
throw new BatchingUploader.LastModifiedChangedUnexpectedly();
|
||||
|
||||
} else if (expectedToChange && lastModified.equals(newLastModified)) {
|
||||
Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
|
||||
throw new BatchingUploader.LastModifiedDidNotChange();
|
||||
|
||||
} else {
|
||||
lastModified = newLastModified;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canFit(long recordDeltaByteCount) {
|
||||
synchronized (accessLock) {
|
||||
return isUnlimited || super.canFit(recordDeltaByteCount);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@CheckResult
|
||||
protected boolean addAndEstimateIfFull(long recordDeltaByteCount) {
|
||||
synchronized (accessLock) {
|
||||
needsCommit = true;
|
||||
boolean isFull = super.addAndEstimateIfFull(recordDeltaByteCount);
|
||||
return !isUnlimited && isFull;
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean needToCommit() {
|
||||
synchronized (accessLock) {
|
||||
return needsCommit;
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized String getToken() {
|
||||
@Nullable
|
||||
protected String getToken() {
|
||||
return token;
|
||||
}
|
||||
|
||||
protected synchronized void setToken(final String newToken, boolean isCommit) throws TokenModifiedException {
|
||||
void setToken(final String newToken, boolean isCommit) throws BatchingUploader.TokenModifiedException {
|
||||
// Set token once in a batching mode.
|
||||
// In a non-batching mode, this.token and newToken will be null, and this is a no-op.
|
||||
if (token == null) {
|
||||
|
@ -91,7 +103,7 @@ public class BatchMeta extends BufferSizeTracker {
|
|||
if (isCommit) {
|
||||
// We expect token to be null when commit payload succeeds.
|
||||
if (newToken != null) {
|
||||
throw new TokenModifiedException();
|
||||
throw new BatchingUploader.TokenModifiedException();
|
||||
} else {
|
||||
token = null;
|
||||
}
|
||||
|
@ -100,66 +112,11 @@ public class BatchMeta extends BufferSizeTracker {
|
|||
|
||||
// We expect new token to always equal current token for non-commit payloads.
|
||||
if (!token.equals(newToken)) {
|
||||
throw new TokenModifiedException();
|
||||
throw new BatchingUploader.TokenModifiedException();
|
||||
}
|
||||
}
|
||||
|
||||
protected synchronized Long getLastModified() {
|
||||
if (lastModified == null) {
|
||||
return collectionLastModified;
|
||||
}
|
||||
return lastModified;
|
||||
}
|
||||
|
||||
protected synchronized void setLastModified(final Long newLastModified, final boolean expectedToChange) throws LastModifiedChangedUnexpectedly, LastModifiedDidNotChange {
|
||||
if (lastModified == null) {
|
||||
lastModified = newLastModified;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!expectedToChange && !lastModified.equals(newLastModified)) {
|
||||
Logger.debug(LOG_TAG, "Last-Modified timestamp changed when we didn't expect it");
|
||||
throw new LastModifiedChangedUnexpectedly();
|
||||
|
||||
} else if (expectedToChange && lastModified.equals(newLastModified)) {
|
||||
Logger.debug(LOG_TAG, "Last-Modified timestamp did not change when we expected it to");
|
||||
throw new LastModifiedDidNotChange();
|
||||
|
||||
} else {
|
||||
lastModified = newLastModified;
|
||||
}
|
||||
}
|
||||
|
||||
protected ArrayList<String> getSuccessRecordGuids() {
|
||||
synchronized (accessLock) {
|
||||
return new ArrayList<>(this.successRecordGuids);
|
||||
}
|
||||
}
|
||||
|
||||
protected void recordSucceeded(final String recordGuid) {
|
||||
// Sanity check.
|
||||
if (recordGuid == null) {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
synchronized (accessLock) {
|
||||
successRecordGuids.add(recordGuid);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
|
||||
return isUnlimited || super.canFitRecordByteDelta(byteDelta, recordCount, byteCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reset() {
|
||||
synchronized (accessLock) {
|
||||
super.reset();
|
||||
token = null;
|
||||
lastModified = null;
|
||||
successRecordGuids.clear();
|
||||
needsCommit = false;
|
||||
}
|
||||
BatchMeta nextBatchMeta() {
|
||||
return new BatchMeta(lastModified, inBatchingMode);
|
||||
}
|
||||
}
|
|
@ -17,6 +17,9 @@ import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDeleg
|
|||
import org.mozilla.gecko.sync.repositories.domain.Record;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -38,14 +41,19 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
*
|
||||
* Once we go past using one batch this uploader is no longer "atomic". Partial state is exposed
|
||||
* to other clients after our first batch is committed and before our last batch is committed.
|
||||
* However, our per-batch limits are high, X-I-U-S mechanics help protect downloading clients
|
||||
* (as long as they implement X-I-U-S) with 412 error codes in case of interleaving upload and download,
|
||||
* and most mobile clients will not be uploading large-enough amounts of data (especially structured
|
||||
* data, such as bookmarks).
|
||||
* However, our per-batch limits are (hopefully) high, X-I-U-S mechanics help protect downloading clients
|
||||
* (as long as they implement X-I-U-S) with 412 error codes in case of interleaving upload and download.
|
||||
*
|
||||
* Last-Modified header returned with the first batch payload POST success is maintained for a batch,
|
||||
* to guard against concurrent-modification errors (different uploader commits before we're done).
|
||||
*
|
||||
* Implementation notes:
|
||||
* - RecordsChannel (via RepositorySession) delivers a stream of records for upload via {@link #process(Record)}
|
||||
* - UploaderMeta is used to track batch-level information necessary for processing outgoing records
|
||||
* - PayloadMeta is used to track payload-level information necessary for processing outgoing records
|
||||
* - BatchMeta within PayloadDispatcher acts as a shared whiteboard which is used for tracking
|
||||
* information across batches (last-modified, batching mode) as well as batch side-effects (stored guids)
|
||||
*
|
||||
* Non-batching mode notes:
|
||||
* We also support Sync servers which don't enable batching for uploads. In this case, we respect
|
||||
* payload limits for individual uploads, and every upload is considered a commit. Batching limits
|
||||
|
@ -56,29 +64,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
public class BatchingUploader {
|
||||
private static final String LOG_TAG = "BatchingUploader";
|
||||
|
||||
private final Uri collectionUri;
|
||||
|
||||
private volatile boolean recordUploadFailed = false;
|
||||
|
||||
private final BatchMeta batchMeta;
|
||||
private final Payload payload;
|
||||
|
||||
// Accessed by synchronously running threads, OK to not synchronize and just make it volatile.
|
||||
private volatile Boolean inBatchingMode;
|
||||
|
||||
// Used to ensure we have thread-safe access to the following:
|
||||
// - byte and record counts in both Payload and BatchMeta objects
|
||||
// - buffers in the Payload object
|
||||
private final Object payloadLock = new Object();
|
||||
|
||||
protected Executor workQueue;
|
||||
protected final RepositorySessionStoreDelegate sessionStoreDelegate;
|
||||
protected final Server11RepositorySession repositorySession;
|
||||
|
||||
protected AtomicLong uploadTimestamp = new AtomicLong(0);
|
||||
|
||||
protected static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
|
||||
protected static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
|
||||
private static final int PER_RECORD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORD_SEPARATOR.length;
|
||||
/* package-local */ static final int PER_PAYLOAD_OVERHEAD_BYTE_COUNT = RecordUploadRunnable.RECORDS_END.length;
|
||||
|
||||
// Sanity check. RECORD_SEPARATOR and RECORD_START are assumed to be of the same length.
|
||||
static {
|
||||
|
@ -87,20 +74,38 @@ public class BatchingUploader {
|
|||
}
|
||||
}
|
||||
|
||||
// Accessed by the record consumer thread pool.
|
||||
// Will be re-created, so mark it as volatile.
|
||||
private volatile Payload payload;
|
||||
|
||||
// Accessed by both the record consumer thread pool and the network worker thread(s).
|
||||
/* package-local */ final Uri collectionUri;
|
||||
/* package-local */ final RepositorySessionStoreDelegate sessionStoreDelegate;
|
||||
/* package-local */ @VisibleForTesting final PayloadDispatcher payloadDispatcher;
|
||||
private final Server11RepositorySession repositorySession;
|
||||
// Will be re-created, so mark it as volatile.
|
||||
private volatile UploaderMeta uploaderMeta;
|
||||
|
||||
// Used to ensure we have thread-safe access to the following:
|
||||
// - byte and record counts in both Payload and BatchMeta objects
|
||||
// - buffers in the Payload object
|
||||
private final Object payloadLock = new Object();
|
||||
|
||||
|
||||
public BatchingUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
|
||||
this.repositorySession = repositorySession;
|
||||
this.workQueue = workQueue;
|
||||
this.sessionStoreDelegate = sessionStoreDelegate;
|
||||
this.collectionUri = Uri.parse(repositorySession.getServerRepository().collectionURI().toString());
|
||||
|
||||
InfoConfiguration config = repositorySession.getServerRepository().getInfoConfiguration();
|
||||
this.batchMeta = new BatchMeta(
|
||||
payloadLock, config.maxTotalBytes, config.maxTotalRecords,
|
||||
repositorySession.getServerRepository().getCollectionLastModified()
|
||||
);
|
||||
this.uploaderMeta = new UploaderMeta(payloadLock, config.maxTotalBytes, config.maxTotalRecords);
|
||||
this.payload = new Payload(payloadLock, config.maxPostBytes, config.maxPostRecords);
|
||||
|
||||
this.payloadDispatcher = new PayloadDispatcher(
|
||||
workQueue, this, repositorySession.getServerRepository().getCollectionLastModified());
|
||||
}
|
||||
|
||||
// Called concurrently from the threads running off of a record consumer thread pool.
|
||||
public void process(final Record record) {
|
||||
final String guid = record.guid;
|
||||
final byte[] recordBytes = record.toJSONBytes();
|
||||
|
@ -115,7 +120,7 @@ public class BatchingUploader {
|
|||
}
|
||||
|
||||
synchronized (payloadLock) {
|
||||
final boolean canFitRecordIntoBatch = batchMeta.canFit(recordDeltaByteCount);
|
||||
final boolean canFitRecordIntoBatch = uploaderMeta.canFit(recordDeltaByteCount);
|
||||
final boolean canFitRecordIntoPayload = payload.canFit(recordDeltaByteCount);
|
||||
|
||||
// Record fits!
|
||||
|
@ -139,7 +144,6 @@ public class BatchingUploader {
|
|||
flush(true, false);
|
||||
|
||||
Logger.debug(LOG_TAG, "Recording the incoming record into a new batch");
|
||||
batchMeta.reset();
|
||||
|
||||
// Keep track of the overflow record.
|
||||
addAndFlushIfNecessary(recordDeltaByteCount, recordBytes, guid);
|
||||
|
@ -150,12 +154,11 @@ public class BatchingUploader {
|
|||
// Convenience function used from the process method; caller must hold a payloadLock.
|
||||
private void addAndFlushIfNecessary(long byteCount, byte[] recordBytes, String guid) {
|
||||
boolean isPayloadFull = payload.addAndEstimateIfFull(byteCount, recordBytes, guid);
|
||||
boolean isBatchFull = batchMeta.addAndEstimateIfFull(byteCount);
|
||||
boolean isBatchFull = uploaderMeta.addAndEstimateIfFull(byteCount);
|
||||
|
||||
// Preemptive commit batch or upload a payload if they're estimated to be full.
|
||||
if (isBatchFull) {
|
||||
flush(true, false);
|
||||
batchMeta.reset();
|
||||
} else if (isPayloadFull) {
|
||||
flush(false, false);
|
||||
}
|
||||
|
@ -164,136 +167,40 @@ public class BatchingUploader {
|
|||
public void noMoreRecordsToUpload() {
|
||||
Logger.debug(LOG_TAG, "Received 'no more records to upload' signal.");
|
||||
|
||||
// Run this after the last payload succeeds, so that we know for sure if we're in a batching
|
||||
// mode and need to commit with a potentially empty payload.
|
||||
workQueue.execute(new Runnable() {
|
||||
// If we have any pending records in the Payload, flush them!
|
||||
if (!payload.isEmpty()) {
|
||||
flush(true, true);
|
||||
return;
|
||||
}
|
||||
|
||||
// If we don't have any pending records, we still might need to send an empty "commit"
|
||||
// payload if we are in the batching mode.
|
||||
// The dispatcher will run the final flush on its executor if necessary after all payloads
|
||||
// succeed and it knows for sure if we're in a batching mode.
|
||||
payloadDispatcher.finalizeQueue(uploaderMeta.needToCommit(), new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
commitIfNecessaryAfterLastPayload();
|
||||
flush(true, true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void commitIfNecessaryAfterLastPayload() {
|
||||
// Must be called after last payload upload finishes.
|
||||
synchronized (payload) {
|
||||
// If we have any pending records in the Payload, flush them!
|
||||
if (!payload.isEmpty()) {
|
||||
flush(true, true);
|
||||
|
||||
// If we have an empty payload but need to commit the batch in the batching mode, flush!
|
||||
} else if (batchMeta.needToCommit() && Boolean.TRUE.equals(inBatchingMode)) {
|
||||
flush(true, true);
|
||||
|
||||
// Otherwise, we're done.
|
||||
} else {
|
||||
finished(uploadTimestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* We've been told by our upload delegate that a payload succeeded.
|
||||
* Depending on the type of payload and batch mode status, inform our delegate of progress.
|
||||
*
|
||||
* @param response success response to our commit post
|
||||
* @param isCommit was this a commit upload?
|
||||
* @param isLastPayload was this a very last payload we'll upload?
|
||||
*/
|
||||
public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
|
||||
// Sanity check.
|
||||
if (inBatchingMode == null) {
|
||||
throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
|
||||
}
|
||||
|
||||
// We consider records to have been committed if we're not in a batching mode or this was a commit.
|
||||
// If records have been committed, notify our store delegate.
|
||||
if (!inBatchingMode || isCommit) {
|
||||
for (String guid : batchMeta.getSuccessRecordGuids()) {
|
||||
sessionStoreDelegate.onRecordStoreSucceeded(guid);
|
||||
}
|
||||
}
|
||||
|
||||
// If this was our very last commit, we're done storing records.
|
||||
// Get Last-Modified timestamp from the response, and pass it upstream.
|
||||
if (isLastPayload) {
|
||||
finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
|
||||
}
|
||||
}
|
||||
|
||||
public void lastPayloadFailed() {
|
||||
finished(uploadTimestamp);
|
||||
}
|
||||
|
||||
private void finished(long lastModifiedTimestamp) {
|
||||
bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
|
||||
finished(uploadTimestamp);
|
||||
}
|
||||
|
||||
private void finished(AtomicLong lastModifiedTimestamp) {
|
||||
/* package-local */ void finished(AtomicLong lastModifiedTimestamp) {
|
||||
repositorySession.storeDone(lastModifiedTimestamp.get());
|
||||
}
|
||||
|
||||
public BatchMeta getCurrentBatch() {
|
||||
return batchMeta;
|
||||
}
|
||||
|
||||
public void setInBatchingMode(boolean inBatchingMode) {
|
||||
this.inBatchingMode = inBatchingMode;
|
||||
|
||||
// Will be called from a thread dispatched by PayloadDispatcher.
|
||||
// NB: Access to `uploaderMeta.isUnlimited` is guarded by the payloadLock.
|
||||
/* package-local */ void setUnlimitedMode(boolean isUnlimited) {
|
||||
// If we know for sure that we're not in a batching mode,
|
||||
// consider our batch to be of unlimited size.
|
||||
this.batchMeta.setIsUnlimited(!inBatchingMode);
|
||||
this.uploaderMeta.setIsUnlimited(isUnlimited);
|
||||
}
|
||||
|
||||
public Boolean getInBatchingMode() {
|
||||
return inBatchingMode;
|
||||
}
|
||||
|
||||
public void setLastModified(final Long lastModified, final boolean isCommit) throws BatchingUploaderException {
|
||||
// Sanity check.
|
||||
if (inBatchingMode == null) {
|
||||
throw new IllegalStateException("Can't process Last-Modified before we know we're in a batching mode.");
|
||||
}
|
||||
|
||||
// In non-batching mode, every time we receive a Last-Modified timestamp, we expect it to change
|
||||
// since records are "committed" (become visible to other clients) on every payload.
|
||||
// In batching mode, we only expect Last-Modified to change when we commit a batch.
|
||||
batchMeta.setLastModified(lastModified, isCommit || !inBatchingMode);
|
||||
}
|
||||
|
||||
public void recordSucceeded(final String recordGuid) {
|
||||
Logger.debug(LOG_TAG, "Record store succeeded: " + recordGuid);
|
||||
batchMeta.recordSucceeded(recordGuid);
|
||||
}
|
||||
|
||||
public void recordFailed(final String recordGuid) {
|
||||
recordFailed(new Server11RecordPostFailedException(), recordGuid);
|
||||
}
|
||||
|
||||
public void recordFailed(final Exception e, final String recordGuid) {
|
||||
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
|
||||
recordUploadFailed = true;
|
||||
sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
|
||||
}
|
||||
|
||||
public Server11RepositorySession getRepositorySession() {
|
||||
/* package-local */ Server11RepositorySession getRepositorySession() {
|
||||
return repositorySession;
|
||||
}
|
||||
|
||||
private static void bumpTimestampTo(final AtomicLong current, long newValue) {
|
||||
while (true) {
|
||||
long existing = current.get();
|
||||
if (existing > newValue) {
|
||||
return;
|
||||
}
|
||||
if (current.compareAndSet(existing, newValue)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void flush(final boolean isCommit, final boolean isLastPayload) {
|
||||
final ArrayList<byte[]> outgoing;
|
||||
final ArrayList<String> outgoingGuids;
|
||||
|
@ -306,39 +213,28 @@ public class BatchingUploader {
|
|||
outgoingGuids = payload.getRecordGuidsBuffer();
|
||||
byteCount = payload.getByteCount();
|
||||
}
|
||||
payload = payload.nextPayload();
|
||||
|
||||
workQueue.execute(new RecordUploadRunnable(
|
||||
new BatchingAtomicUploaderMayUploadProvider(),
|
||||
collectionUri,
|
||||
batchMeta,
|
||||
new PayloadUploadDelegate(this, outgoingGuids, isCommit, isLastPayload),
|
||||
outgoing,
|
||||
byteCount,
|
||||
isCommit
|
||||
));
|
||||
payloadDispatcher.queue(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload);
|
||||
|
||||
payload.reset();
|
||||
}
|
||||
|
||||
private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
|
||||
public boolean mayUpload() {
|
||||
return !recordUploadFailed;
|
||||
if (isCommit && !isLastPayload) {
|
||||
uploaderMeta = uploaderMeta.nextUploaderMeta();
|
||||
}
|
||||
}
|
||||
|
||||
public static class BatchingUploaderException extends Exception {
|
||||
/* package-local */ static class BatchingUploaderException extends Exception {
|
||||
private static final long serialVersionUID = 1L;
|
||||
}
|
||||
public static class RecordTooLargeToUpload extends BatchingUploaderException {
|
||||
/* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
}
|
||||
public static class LastModifiedDidNotChange extends BatchingUploaderException {
|
||||
/* package-local */ static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
}
|
||||
public static class LastModifiedChangedUnexpectedly extends BatchingUploaderException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
}
|
||||
public static class TokenModifiedException extends BatchingUploaderException {
|
||||
/* package-local */ static class TokenModifiedException extends BatchingUploaderException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
};
|
||||
private static class RecordTooLargeToUpload extends BatchingUploaderException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
}
|
||||
}
|
|
@ -19,7 +19,7 @@ public abstract class BufferSizeTracker {
|
|||
|
||||
/* @GuardedBy("accessLock") */ private long byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
|
||||
/* @GuardedBy("accessLock") */ private long recordCount = 0;
|
||||
/* @GuardedBy("accessLock") */ protected Long smallestRecordByteCount;
|
||||
/* @GuardedBy("accessLock") */ private Long smallestRecordByteCount;
|
||||
|
||||
protected final long maxBytes;
|
||||
protected final long maxRecords;
|
||||
|
@ -87,14 +87,6 @@ public abstract class BufferSizeTracker {
|
|||
}
|
||||
}
|
||||
|
||||
@CallSuper
|
||||
protected void reset() {
|
||||
synchronized (accessLock) {
|
||||
byteCount = BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT;
|
||||
recordCount = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@CallSuper
|
||||
protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
|
||||
return recordCount < maxRecords
|
||||
|
|
|
@ -37,15 +37,6 @@ public class Payload extends BufferSizeTracker {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void reset() {
|
||||
synchronized (accessLock) {
|
||||
super.reset();
|
||||
recordsBuffer.clear();
|
||||
recordGuidsBuffer.clear();
|
||||
}
|
||||
}
|
||||
|
||||
protected ArrayList<byte[]> getRecordsBuffer() {
|
||||
synchronized (accessLock) {
|
||||
return new ArrayList<>(recordsBuffer);
|
||||
|
@ -63,4 +54,8 @@ public class Payload extends BufferSizeTracker {
|
|||
return recordsBuffer.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
Payload nextPayload() {
|
||||
return new Payload(accessLock, maxBytes, maxRecords);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,184 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import android.support.annotation.Nullable;
|
||||
import android.support.annotation.VisibleForTesting;
|
||||
|
||||
import org.mozilla.gecko.background.common.log.Logger;
|
||||
import org.mozilla.gecko.sync.Server11RecordPostFailedException;
|
||||
import org.mozilla.gecko.sync.net.SyncResponse;
|
||||
import org.mozilla.gecko.sync.net.SyncStorageResponse;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Dispatches payload runnables and handles their results.
|
||||
*
|
||||
* All of the methods, except for `queue` and `finalizeQueue`, will be called from the thread(s)
|
||||
* running sequentially on the SingleThreadExecutor `executor`.
|
||||
*/
|
||||
class PayloadDispatcher {
|
||||
private static final String LOG_TAG = "PayloadDispatcher";
|
||||
|
||||
// All payload runnables share the same whiteboard.
|
||||
// It's accessed directly by the runnables; tests also make use of this direct access.
|
||||
volatile BatchMeta batchWhiteboard;
|
||||
private final AtomicLong uploadTimestamp = new AtomicLong(0);
|
||||
|
||||
// Accessed from different threads sequentially running on the 'executor'.
|
||||
private volatile boolean recordUploadFailed = false;
|
||||
|
||||
private final Executor executor;
|
||||
private final BatchingUploader uploader;
|
||||
|
||||
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
|
||||
// Initially we don't know if we're in a batching mode.
|
||||
this.batchWhiteboard = new BatchMeta(initialLastModified, null);
|
||||
this.uploader = uploader;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
void queue(
|
||||
final ArrayList<byte[]> outgoing,
|
||||
final ArrayList<String> outgoingGuids,
|
||||
final long byteCount,
|
||||
final boolean isCommit, final boolean isLastPayload) {
|
||||
|
||||
// Note that `executor` is expected to be a SingleThreadExecutor.
|
||||
executor.execute(new BatchContextRunnable(isCommit) {
|
||||
@Override
|
||||
public void run() {
|
||||
new RecordUploadRunnable(
|
||||
new BatchingAtomicUploaderMayUploadProvider(),
|
||||
uploader.collectionUri,
|
||||
batchWhiteboard.getToken(),
|
||||
new PayloadUploadDelegate(
|
||||
uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider(),
|
||||
PayloadDispatcher.this,
|
||||
outgoingGuids,
|
||||
isCommit,
|
||||
isLastPayload
|
||||
),
|
||||
outgoing,
|
||||
byteCount,
|
||||
isCommit
|
||||
).run();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void setInBatchingMode(boolean inBatchingMode) {
|
||||
batchWhiteboard.setInBatchingMode(inBatchingMode);
|
||||
uploader.setUnlimitedMode(!inBatchingMode);
|
||||
}
|
||||
|
||||
/**
|
||||
* We've been told by our upload delegate that a payload succeeded.
|
||||
* Depending on the type of payload and batch mode status, inform our delegate of progress.
|
||||
*
|
||||
* @param response success response to our commit post
|
||||
* @param guids list of successfully posted record guids
|
||||
* @param isCommit was this a commit upload?
|
||||
* @param isLastPayload was this a very last payload we'll upload?
|
||||
*/
|
||||
void payloadSucceeded(final SyncStorageResponse response, final String[] guids, final boolean isCommit, final boolean isLastPayload) {
|
||||
// Sanity check.
|
||||
if (batchWhiteboard.getInBatchingMode() == null) {
|
||||
throw new IllegalStateException("Can't process payload success until we know if we're in a batching mode");
|
||||
}
|
||||
|
||||
// We consider records to have been committed if we're not in a batching mode or this was a commit.
|
||||
// If records have been committed, notify our store delegate.
|
||||
if (!batchWhiteboard.getInBatchingMode() || isCommit) {
|
||||
for (String guid : guids) {
|
||||
uploader.sessionStoreDelegate.onRecordStoreSucceeded(guid);
|
||||
}
|
||||
}
|
||||
|
||||
// If this was our very last commit, we're done storing records.
|
||||
// Get Last-Modified timestamp from the response, and pass it upstream.
|
||||
if (isLastPayload) {
|
||||
finished(response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED));
|
||||
}
|
||||
}
|
||||
|
||||
void lastPayloadFailed() {
|
||||
uploader.finished(uploadTimestamp);
|
||||
}
|
||||
|
||||
private void finished(long lastModifiedTimestamp) {
|
||||
bumpTimestampTo(uploadTimestamp, lastModifiedTimestamp);
|
||||
uploader.finished(uploadTimestamp);
|
||||
}
|
||||
|
||||
void finalizeQueue(final boolean needToCommit, final Runnable finalRunnable) {
|
||||
executor.execute(new NonPayloadContextRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Must be called after last payload upload finishes.
|
||||
if (needToCommit && Boolean.TRUE.equals(batchWhiteboard.getInBatchingMode())) {
|
||||
finalRunnable.run();
|
||||
|
||||
// Otherwise, we're done.
|
||||
} else {
|
||||
uploader.finished(uploadTimestamp);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void recordFailed(final String recordGuid) {
|
||||
recordFailed(new Server11RecordPostFailedException(), recordGuid);
|
||||
}
|
||||
|
||||
void recordFailed(final Exception e, final String recordGuid) {
|
||||
Logger.debug(LOG_TAG, "Record store failed for guid " + recordGuid + " with exception: " + e.toString());
|
||||
recordUploadFailed = true;
|
||||
uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
|
||||
}
|
||||
|
||||
void prepareForNextBatch() {
|
||||
batchWhiteboard = batchWhiteboard.nextBatchMeta();
|
||||
}
|
||||
|
||||
private static void bumpTimestampTo(final AtomicLong current, long newValue) {
|
||||
while (true) {
|
||||
long existing = current.get();
|
||||
if (existing > newValue) {
|
||||
return;
|
||||
}
|
||||
if (current.compareAndSet(existing, newValue)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows tests to easily peek into the flow of upload tasks.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
abstract static class BatchContextRunnable implements Runnable {
|
||||
boolean isCommit;
|
||||
|
||||
BatchContextRunnable(boolean isCommit) {
|
||||
this.isCommit = isCommit;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows tests to tell apart non-payload runnables going through the executor.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
abstract static class NonPayloadContextRunnable implements Runnable {}
|
||||
|
||||
private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
|
||||
public boolean mayUpload() {
|
||||
return !recordUploadFailed;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -18,18 +18,20 @@ import org.mozilla.gecko.sync.net.SyncStorageResponse;
|
|||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
||||
class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
||||
private static final String LOG_TAG = "PayloadUploadDelegate";
|
||||
|
||||
private static final String KEY_BATCH = "batch";
|
||||
|
||||
private final BatchingUploader uploader;
|
||||
private final AuthHeaderProvider headerProvider;
|
||||
private final PayloadDispatcher dispatcher;
|
||||
private ArrayList<String> postedRecordGuids;
|
||||
private final boolean isCommit;
|
||||
private final boolean isLastPayload;
|
||||
|
||||
public PayloadUploadDelegate(BatchingUploader uploader, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
|
||||
this.uploader = uploader;
|
||||
PayloadUploadDelegate(AuthHeaderProvider headerProvider, PayloadDispatcher dispatcher, ArrayList<String> postedRecordGuids, boolean isCommit, boolean isLastPayload) {
|
||||
this.headerProvider = headerProvider;
|
||||
this.dispatcher = dispatcher;
|
||||
this.postedRecordGuids = postedRecordGuids;
|
||||
this.isCommit = isCommit;
|
||||
this.isLastPayload = isLastPayload;
|
||||
|
@ -37,16 +39,17 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
|
||||
@Override
|
||||
public AuthHeaderProvider getAuthHeaderProvider() {
|
||||
return uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider();
|
||||
return headerProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String ifUnmodifiedSince() {
|
||||
final Long lastModified = uploader.getCurrentBatch().getLastModified();
|
||||
final Long lastModified = dispatcher.batchWhiteboard.getLastModified();
|
||||
if (lastModified == null) {
|
||||
return null;
|
||||
} else {
|
||||
return Utils.millisecondsToDecimalSecondsString(lastModified);
|
||||
}
|
||||
return Utils.millisecondsToDecimalSecondsString(lastModified);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,8 +83,8 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
// If we got a 200, it could be either a non-batching result, or a batch commit.
|
||||
// - if we're in a batching mode, we expect this to be a commit.
|
||||
// If we got a 202, we expect there to be a token present in the response
|
||||
if (response.getStatusCode() == 200 && uploader.getCurrentBatch().getToken() != null) {
|
||||
if (uploader.getInBatchingMode() && !isCommit) {
|
||||
if (response.getStatusCode() == 200 && dispatcher.batchWhiteboard.getToken() != null) {
|
||||
if (dispatcher.batchWhiteboard.getInBatchingMode() && !isCommit) {
|
||||
handleRequestError(
|
||||
new IllegalStateException("Got 200 OK in batching mode, but this was not a commit payload")
|
||||
);
|
||||
|
@ -98,14 +101,14 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
|
||||
// With sanity checks out of the way, can now safely say if we're in a batching mode or not.
|
||||
// We only do this once per session.
|
||||
if (uploader.getInBatchingMode() == null) {
|
||||
uploader.setInBatchingMode(body.containsKey(KEY_BATCH));
|
||||
if (dispatcher.batchWhiteboard.getInBatchingMode() == null) {
|
||||
dispatcher.setInBatchingMode(body.containsKey(KEY_BATCH));
|
||||
}
|
||||
|
||||
// Tell current batch about the token we've received.
|
||||
// Throws if token changed after being set once, or if we got a non-null token after a commit.
|
||||
try {
|
||||
uploader.getCurrentBatch().setToken(body.getString(KEY_BATCH), isCommit);
|
||||
dispatcher.batchWhiteboard.setToken(body.getString(KEY_BATCH), isCommit);
|
||||
} catch (BatchingUploader.BatchingUploaderException e) {
|
||||
handleRequestError(e);
|
||||
return;
|
||||
|
@ -113,9 +116,14 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
|
||||
// Will throw if Last-Modified changed when it shouldn't have.
|
||||
try {
|
||||
uploader.setLastModified(
|
||||
// In non-batching mode, every time we receive a Last-Modified timestamp, we expect it
|
||||
// to change since records are "committed" (become visible to other clients) on every
|
||||
// payload.
|
||||
// In batching mode, we only expect Last-Modified to change when we commit a batch.
|
||||
dispatcher.batchWhiteboard.setLastModified(
|
||||
response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED),
|
||||
isCommit);
|
||||
isCommit || !dispatcher.batchWhiteboard.getInBatchingMode()
|
||||
);
|
||||
} catch (BatchingUploader.BatchingUploaderException e) {
|
||||
handleRequestError(e);
|
||||
return;
|
||||
|
@ -134,7 +142,7 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
Logger.trace(LOG_TAG, "Successful records: " + success.toString());
|
||||
for (Object o : success) {
|
||||
try {
|
||||
uploader.recordSucceeded((String) o);
|
||||
dispatcher.batchWhiteboard.recordSucceeded((String) o);
|
||||
} catch (ClassCastException e) {
|
||||
Logger.error(LOG_TAG, "Got exception parsing POST success guid.", e);
|
||||
// Not much to be done.
|
||||
|
@ -155,14 +163,23 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
if (failed != null && !failed.object.isEmpty()) {
|
||||
Logger.debug(LOG_TAG, "Failed records: " + failed.object.toString());
|
||||
for (String guid : failed.keySet()) {
|
||||
uploader.recordFailed(guid);
|
||||
dispatcher.recordFailed(guid);
|
||||
}
|
||||
}
|
||||
// GC
|
||||
failed = null;
|
||||
|
||||
// And we're done! Let uploader finish up.
|
||||
uploader.payloadSucceeded(response, isCommit, isLastPayload);
|
||||
dispatcher.payloadSucceeded(
|
||||
response,
|
||||
dispatcher.batchWhiteboard.getSuccessRecordGuids(),
|
||||
isCommit,
|
||||
isLastPayload
|
||||
);
|
||||
|
||||
if (isCommit && !isLastPayload) {
|
||||
dispatcher.prepareForNextBatch();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,13 +190,13 @@ public class PayloadUploadDelegate implements SyncStorageRequestDelegate {
|
|||
@Override
|
||||
public void handleRequestError(Exception e) {
|
||||
for (String guid : postedRecordGuids) {
|
||||
uploader.recordFailed(e, guid);
|
||||
dispatcher.recordFailed(e, guid);
|
||||
}
|
||||
// GC
|
||||
postedRecordGuids = null;
|
||||
|
||||
if (isLastPayload) {
|
||||
uploader.lastPayloadFailed();
|
||||
dispatcher.lastPayloadFailed();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,10 +5,12 @@
|
|||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import android.net.Uri;
|
||||
import android.support.annotation.Nullable;
|
||||
import android.support.annotation.VisibleForTesting;
|
||||
|
||||
import org.mozilla.gecko.background.common.log.Logger;
|
||||
import org.mozilla.gecko.sync.Server11PreviousPostFailedException;
|
||||
import org.mozilla.gecko.sync.Utils;
|
||||
import org.mozilla.gecko.sync.net.SyncStorageRequest;
|
||||
import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
|
||||
|
||||
|
@ -45,11 +47,11 @@ public class RecordUploadRunnable implements Runnable {
|
|||
@VisibleForTesting
|
||||
public final boolean isCommit;
|
||||
private final Uri collectionUri;
|
||||
private final BatchMeta batchMeta;
|
||||
private final String batchToken;
|
||||
|
||||
public RecordUploadRunnable(MayUploadProvider mayUploadProvider,
|
||||
Uri collectionUri,
|
||||
BatchMeta batchMeta,
|
||||
String batchToken,
|
||||
SyncStorageRequestDelegate uploadDelegate,
|
||||
ArrayList<byte[]> outgoing,
|
||||
long byteCount,
|
||||
|
@ -58,7 +60,7 @@ public class RecordUploadRunnable implements Runnable {
|
|||
this.uploadDelegate = uploadDelegate;
|
||||
this.outgoing = outgoing;
|
||||
this.byteCount = byteCount;
|
||||
this.batchMeta = batchMeta;
|
||||
this.batchToken = batchToken;
|
||||
this.collectionUri = collectionUri;
|
||||
this.isCommit = isCommit;
|
||||
}
|
||||
|
@ -144,7 +146,7 @@ public class RecordUploadRunnable implements Runnable {
|
|||
// Fortunately, BaseResource is currently synchronous.
|
||||
// If that ever changes, you'll need to block here.
|
||||
|
||||
final URI postURI = buildPostURI(isCommit, batchMeta, collectionUri);
|
||||
final URI postURI = buildPostURI(isCommit, batchToken, collectionUri);
|
||||
final SyncStorageRequest request = new SyncStorageRequest(postURI);
|
||||
request.delegate = uploadDelegate;
|
||||
|
||||
|
@ -153,9 +155,8 @@ public class RecordUploadRunnable implements Runnable {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static URI buildPostURI(boolean isCommit, BatchMeta batchMeta, Uri collectionUri) {
|
||||
public static URI buildPostURI(boolean isCommit, @Nullable String batchToken, Uri collectionUri) {
|
||||
final Uri.Builder uriBuilder = collectionUri.buildUpon();
|
||||
final String batchToken = batchMeta.getToken();
|
||||
|
||||
if (batchToken != null) {
|
||||
uriBuilder.appendQueryParameter(QUERY_PARAM_BATCH, batchToken);
|
||||
|
|
|
@ -0,0 +1,58 @@
|
|||
/* This Source Code Form is subject to the terms of the Mozilla Public
|
||||
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
||||
|
||||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import android.support.annotation.CheckResult;
|
||||
import android.support.annotation.NonNull;
|
||||
|
||||
|
||||
public class UploaderMeta extends BufferSizeTracker {
|
||||
// Will be written and read by different threads.
|
||||
/* @GuardedBy("accessLock") */ private volatile boolean isUnlimited = false;
|
||||
|
||||
/* @GuardedBy("accessLock") */ private boolean needsCommit = false;
|
||||
|
||||
public UploaderMeta(@NonNull Object payloadLock, long maxBytes, long maxRecords) {
|
||||
super(payloadLock, maxBytes, maxRecords);
|
||||
}
|
||||
|
||||
protected void setIsUnlimited(boolean isUnlimited) {
|
||||
synchronized (accessLock) {
|
||||
this.isUnlimited = isUnlimited;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canFit(long recordDeltaByteCount) {
|
||||
synchronized (accessLock) {
|
||||
return isUnlimited || super.canFit(recordDeltaByteCount);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@CheckResult
|
||||
protected boolean addAndEstimateIfFull(long recordDeltaByteCount) {
|
||||
synchronized (accessLock) {
|
||||
needsCommit = true;
|
||||
boolean isFull = super.addAndEstimateIfFull(recordDeltaByteCount);
|
||||
return !isUnlimited && isFull;
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean needToCommit() {
|
||||
synchronized (accessLock) {
|
||||
return needsCommit;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean canFitRecordByteDelta(long byteDelta, long recordCount, long byteCount) {
|
||||
return isUnlimited || super.canFitRecordByteDelta(byteDelta, recordCount, byteCount);
|
||||
}
|
||||
|
||||
UploaderMeta nextUploaderMeta() {
|
||||
return new UploaderMeta(accessLock, maxBytes, maxRecords);
|
||||
}
|
||||
}
|
|
@ -72,7 +72,12 @@ public class TestServer11RepositorySession {
|
|||
static final String SYNC_KEY = "eh7ppnb82iwr5kt3z3uyi5vr44";
|
||||
|
||||
public final AuthHeaderProvider authHeaderProvider = new BasicAuthHeaderProvider(TEST_USERNAME, TEST_PASSWORD);
|
||||
protected final InfoCollections infoCollections = new InfoCollections();
|
||||
protected final InfoCollections infoCollections = new InfoCollections() {
|
||||
@Override
|
||||
public Long getTimestamp(String collection) {
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
|
||||
|
||||
// Few-second timeout so that our longer operations don't time out and cause spurious error-handling results.
|
||||
|
|
|
@ -1,6 +1,3 @@
|
|||
/* Any copyright is dedicated to the Public Domain.
|
||||
http://creativecommons.org/publicdomain/zero/1.0/ */
|
||||
|
||||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import org.junit.Before;
|
||||
|
@ -13,92 +10,80 @@ import static org.junit.Assert.*;
|
|||
@RunWith(TestRunner.class)
|
||||
public class BatchMetaTest {
|
||||
private BatchMeta batchMeta;
|
||||
private long byteLimit = 1024;
|
||||
private long recordLimit = 5;
|
||||
private Object lock = new Object();
|
||||
private Long collectionLastModified = 123L;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
batchMeta = new BatchMeta(lock, byteLimit, recordLimit, collectionLastModified);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstructor() {
|
||||
assertEquals(batchMeta.collectionLastModified, collectionLastModified);
|
||||
|
||||
BatchMeta otherBatchMeta = new BatchMeta(lock, byteLimit, recordLimit, null);
|
||||
assertNull(otherBatchMeta.collectionLastModified);
|
||||
batchMeta = new BatchMeta(null, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetLastModified() {
|
||||
// Defaults to collection L-M
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
|
||||
assertNull(batchMeta.getLastModified());
|
||||
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {}
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
}
|
||||
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
|
||||
assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetLastModified() {
|
||||
assertEquals(batchMeta.getLastModified(), collectionLastModified);
|
||||
BatchingUploaderTest.TestRunnableWithTarget<BatchMeta> tests = new BatchingUploaderTest.TestRunnableWithTarget<BatchMeta>() {
|
||||
@Override
|
||||
void tests() {
|
||||
try {
|
||||
batchMeta.setLastModified(123L, true);
|
||||
assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
}
|
||||
|
||||
try {
|
||||
batchMeta.setLastModified(123L, true);
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
}
|
||||
try {
|
||||
batchMeta.setLastModified(123L, false);
|
||||
assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange | BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
}
|
||||
|
||||
// Now the same, but passing in 'false' for "expecting to change".
|
||||
batchMeta.reset();
|
||||
assertEquals(batchMeta.getLastModified(), collectionLastModified);
|
||||
// Test that we can't modify L-M when we're not expecting to
|
||||
try {
|
||||
batchMeta.setLastModified(333L, false);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
assertTrue("Must throw when L-M changes unexpectedly", true);
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Not expecting did-not-change throw");
|
||||
}
|
||||
assertEquals(Long.valueOf(123L), batchMeta.getLastModified());
|
||||
|
||||
try {
|
||||
batchMeta.setLastModified(123L, false);
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Should not check for modifications on first L-M set");
|
||||
}
|
||||
// Test that we can modify L-M when we're expecting to
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Not expecting changed-unexpectedly throw");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Not expecting did-not-change throw");
|
||||
}
|
||||
assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
|
||||
|
||||
// Test that we can't modify L-M when we're not expecting to
|
||||
try {
|
||||
batchMeta.setLastModified(333L, false);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
assertTrue("Must throw when L-M changes unexpectedly", true);
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Not expecting did-not-change throw");
|
||||
}
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(123L));
|
||||
// Test that we catch L-M modifications that expect to change but actually don't
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Not expecting changed-unexpectedly throw");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
|
||||
}
|
||||
assertEquals(Long.valueOf(333), batchMeta.getLastModified());
|
||||
}
|
||||
};
|
||||
|
||||
// Test that we can modify L-M when we're expecting to
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Not expecting changed-unexpectedly throw");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
fail("Not expecting did-not-change throw");
|
||||
}
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(333L));
|
||||
|
||||
// Test that we catch L-M modifications that expect to change but actually don't
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
fail("Not expecting changed-unexpectedly throw");
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {
|
||||
assertTrue("Expected-to-change-but-did-not-change didn't throw", true);
|
||||
}
|
||||
assertEquals(batchMeta.getLastModified(), Long.valueOf(333));
|
||||
tests
|
||||
.setTarget(batchMeta)
|
||||
.run()
|
||||
.setTarget(new BatchMeta(123L, null))
|
||||
.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -136,12 +121,12 @@ public class BatchMetaTest {
|
|||
|
||||
@Test
|
||||
public void testRecordSucceeded() {
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
|
||||
assertEquals(0, batchMeta.getSuccessRecordGuids().length);
|
||||
|
||||
batchMeta.recordSucceeded("guid1");
|
||||
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().contains("guid1"));
|
||||
assertEquals(1, batchMeta.getSuccessRecordGuids().length);
|
||||
assertEquals("guid1", batchMeta.getSuccessRecordGuids()[0]);
|
||||
|
||||
try {
|
||||
batchMeta.recordSucceeded(null);
|
||||
|
@ -150,133 +135,4 @@ public class BatchMetaTest {
|
|||
assertTrue("Should not be able to 'succeed' a null guid", true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteLimits() {
|
||||
assertTrue(batchMeta.canFit(0));
|
||||
|
||||
// Should just fit
|
||||
assertTrue(batchMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
|
||||
// Can't fit a record due to payload overhead.
|
||||
assertFalse(batchMeta.canFit(byteLimit));
|
||||
|
||||
assertFalse(batchMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertFalse(batchMeta.canFit(byteLimit * 1000));
|
||||
|
||||
long recordDelta = byteLimit / 2;
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(recordDelta));
|
||||
|
||||
// Record delta shouldn't fit due to payload overhead.
|
||||
assertFalse(batchMeta.canFit(recordDelta));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountLimits() {
|
||||
// Our record limit is 5, let's add 4.
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
// 5th record still fits in
|
||||
assertTrue(batchMeta.canFit(1));
|
||||
|
||||
// Add the 5th record
|
||||
assertTrue(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
// 6th record won't fit
|
||||
assertFalse(batchMeta.canFit(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedCommit() {
|
||||
assertFalse(batchMeta.needToCommit());
|
||||
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(batchMeta.needToCommit());
|
||||
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(batchMeta.needToCommit());
|
||||
|
||||
batchMeta.reset();
|
||||
|
||||
assertFalse(batchMeta.needToCommit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAdd() {
|
||||
// Ensure we account for payload overhead twice when the batch is empty.
|
||||
// Payload overhead is either RECORDS_START or RECORDS_END, and for an empty payload
|
||||
// we need both.
|
||||
assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(batchMeta.getRecordCount() == 0);
|
||||
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(batchMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertTrue(batchMeta.getRecordCount() == 1);
|
||||
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(batchMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertTrue(batchMeta.getRecordCount() == 4);
|
||||
|
||||
assertTrue(batchMeta.addAndEstimateIfFull(1));
|
||||
|
||||
try {
|
||||
assertTrue(batchMeta.addAndEstimateIfFull(1));
|
||||
fail("BatchMeta should not let us insert records that won't fit");
|
||||
} catch (IllegalStateException e) {
|
||||
assertTrue(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() {
|
||||
assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(batchMeta.getRecordCount() == 0);
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
|
||||
|
||||
// Shouldn't throw even if already empty
|
||||
batchMeta.reset();
|
||||
assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(batchMeta.getRecordCount() == 0);
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
|
||||
|
||||
assertFalse(batchMeta.addAndEstimateIfFull(1));
|
||||
batchMeta.recordSucceeded("guid1");
|
||||
try {
|
||||
batchMeta.setToken("MTIzNA", false);
|
||||
} catch (BatchingUploader.TokenModifiedException e) {}
|
||||
try {
|
||||
batchMeta.setLastModified(333L, true);
|
||||
} catch (BatchingUploader.LastModifiedChangedUnexpectedly e) {
|
||||
} catch (BatchingUploader.LastModifiedDidNotChange e) {}
|
||||
assertEquals(Long.valueOf(333L), batchMeta.getLastModified());
|
||||
assertEquals("MTIzNA", batchMeta.getToken());
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().size() == 1);
|
||||
|
||||
batchMeta.reset();
|
||||
|
||||
// Counts must be reset
|
||||
assertTrue(batchMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(batchMeta.getRecordCount() == 0);
|
||||
assertTrue(batchMeta.getSuccessRecordGuids().isEmpty());
|
||||
|
||||
// Collection L-M shouldn't change
|
||||
assertEquals(batchMeta.collectionLastModified, collectionLastModified);
|
||||
|
||||
// Token must be reset
|
||||
assertNull(batchMeta.getToken());
|
||||
|
||||
// L-M must be reverted to collection L-M
|
||||
assertEquals(batchMeta.getLastModified(), collectionLastModified);
|
||||
}
|
||||
}
|
|
@ -28,22 +28,28 @@ import java.util.concurrent.ExecutorService;
|
|||
@RunWith(TestRunner.class)
|
||||
public class BatchingUploaderTest {
|
||||
class MockExecutorService implements Executor {
|
||||
public int totalPayloads = 0;
|
||||
public int commitPayloads = 0;
|
||||
int totalPayloads = 0;
|
||||
int commitPayloads = 0;
|
||||
|
||||
@Override
|
||||
public void execute(@NonNull Runnable command) {
|
||||
if (command instanceof PayloadDispatcher.NonPayloadContextRunnable) {
|
||||
command.run();
|
||||
return;
|
||||
}
|
||||
|
||||
++totalPayloads;
|
||||
if (((RecordUploadRunnable) command).isCommit) {
|
||||
if (((PayloadDispatcher.BatchContextRunnable) command).isCommit) {
|
||||
++commitPayloads;
|
||||
}
|
||||
command.run();
|
||||
}
|
||||
}
|
||||
|
||||
class MockStoreDelegate implements RepositorySessionStoreDelegate {
|
||||
public int storeFailed = 0;
|
||||
public int storeSucceeded = 0;
|
||||
public int storeCompleted = 0;
|
||||
int storeFailed = 0;
|
||||
int storeSucceeded = 0;
|
||||
int storeCompleted = 0;
|
||||
|
||||
@Override
|
||||
public void onRecordStoreFailed(Exception ex, String recordGuid) {
|
||||
|
@ -76,62 +82,76 @@ public class BatchingUploaderTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testProcessEvenPayloadBatch() {
|
||||
BatchingUploader uploader = makeConstrainedUploader(2, 4);
|
||||
public void testProcessEvenPayloadBatch() throws Exception {
|
||||
TestRunnableWithTarget<BatchingUploader> tests = new TestRunnableWithTarget<BatchingUploader>() {
|
||||
@Override
|
||||
public void tests() {
|
||||
MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
|
||||
// 1st
|
||||
target.process(record);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 2nd -> payload full
|
||||
target.process(record);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 3rd
|
||||
target.process(record);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 4th -> batch & payload full
|
||||
target.process(record);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 5th
|
||||
target.process(record);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 6th -> payload full
|
||||
target.process(record);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 7th
|
||||
target.process(record);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 8th -> batch & payload full
|
||||
target.process(record);
|
||||
assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 9th
|
||||
target.process(record);
|
||||
assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 10th -> payload full
|
||||
target.process(record);
|
||||
assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 11th
|
||||
target.process(record);
|
||||
assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 12th -> batch & payload full
|
||||
target.process(record);
|
||||
assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 13th
|
||||
target.process(record);
|
||||
assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
|
||||
}
|
||||
};
|
||||
|
||||
MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
|
||||
// 1st
|
||||
uploader.process(record);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 2nd -> payload full
|
||||
uploader.process(record);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 3rd
|
||||
uploader.process(record);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 4th -> batch & payload full
|
||||
uploader.process(record);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 5th
|
||||
uploader.process(record);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 6th -> payload full
|
||||
uploader.process(record);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 7th
|
||||
uploader.process(record);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 8th -> batch & payload full
|
||||
uploader.process(record);
|
||||
assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 9th
|
||||
uploader.process(record);
|
||||
assertEquals(4, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 10th -> payload full
|
||||
uploader.process(record);
|
||||
assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 11th
|
||||
uploader.process(record);
|
||||
assertEquals(5, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(2, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 12th -> batch & payload full
|
||||
uploader.process(record);
|
||||
assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
|
||||
// 13th
|
||||
uploader.process(record);
|
||||
assertEquals(6, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(3, ((MockExecutorService) workQueue).commitPayloads);
|
||||
tests
|
||||
.setTarget(makeConstrainedUploader(2, 4))
|
||||
.run();
|
||||
|
||||
// clear up between test runs
|
||||
setUp();
|
||||
|
||||
tests
|
||||
.setTarget(makeConstrainedUploader(2, 4, true))
|
||||
.run();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -214,7 +234,7 @@ public class BatchingUploaderTest {
|
|||
|
||||
// And now we tell uploader that batching isn't supported.
|
||||
// It shouldn't bother with batches from now on, just payloads.
|
||||
uploader.setInBatchingMode(false);
|
||||
uploader.setUnlimitedMode(true);
|
||||
|
||||
// 6th
|
||||
uploader.process(record);
|
||||
|
@ -294,7 +314,7 @@ public class BatchingUploaderTest {
|
|||
BatchingUploader uploader = makeConstrainedUploader(2, 4);
|
||||
|
||||
final Random random = new Random();
|
||||
uploader.setInBatchingMode(false);
|
||||
uploader.setUnlimitedMode(true);
|
||||
for (int i = 0; i < 15000; i++) {
|
||||
uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
|
||||
}
|
||||
|
@ -310,7 +330,7 @@ public class BatchingUploaderTest {
|
|||
final int delay = random.nextInt(20);
|
||||
for (int i = 0; i < 15000; i++) {
|
||||
if (delay == i) {
|
||||
uploader.setInBatchingMode(false);
|
||||
uploader.setUnlimitedMode(true);
|
||||
}
|
||||
uploader.process(new MockRecord(Utils.generateGuid(), null, 0, false, random.nextInt(15000)));
|
||||
}
|
||||
|
@ -325,8 +345,8 @@ public class BatchingUploaderTest {
|
|||
MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
|
||||
uploader.process(record);
|
||||
uploader.process(record);
|
||||
uploader.setInBatchingMode(true);
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.payloadDispatcher.setInBatchingMode(true);
|
||||
uploader.noMoreRecordsToUpload();
|
||||
// One will be a payload post, the other one is batch commit (empty payload)
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
|
@ -342,7 +362,7 @@ public class BatchingUploaderTest {
|
|||
uploader.process(record);
|
||||
uploader.process(record);
|
||||
uploader.process(record);
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.noMoreRecordsToUpload();
|
||||
// One will be a payload post, the other one is batch commit (one record payload)
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
|
@ -352,7 +372,7 @@ public class BatchingUploaderTest {
|
|||
public void testNoMoreRecordsNoOp() {
|
||||
BatchingUploader uploader = makeConstrainedUploader(2, 4);
|
||||
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.noMoreRecordsToUpload();
|
||||
assertEquals(0, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
}
|
||||
|
@ -369,7 +389,7 @@ public class BatchingUploaderTest {
|
|||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.noMoreRecordsToUpload();
|
||||
assertEquals(2, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
}
|
||||
|
@ -383,8 +403,8 @@ public class BatchingUploaderTest {
|
|||
MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
|
||||
uploader.process(record);
|
||||
uploader.process(record);
|
||||
uploader.setInBatchingMode(false);
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.setUnlimitedMode(true);
|
||||
uploader.noMoreRecordsToUpload();
|
||||
// One will be a payload post, the other one is batch commit (one record payload)
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(0, ((MockExecutorService) workQueue).commitPayloads);
|
||||
|
@ -398,24 +418,28 @@ public class BatchingUploaderTest {
|
|||
MockRecord record = new MockRecord(Utils.generateGuid(), null, 0, false);
|
||||
uploader.process(record);
|
||||
|
||||
uploader.commitIfNecessaryAfterLastPayload();
|
||||
uploader.noMoreRecordsToUpload();
|
||||
assertEquals(1, ((MockExecutorService) workQueue).totalPayloads);
|
||||
assertEquals(1, ((MockExecutorService) workQueue).commitPayloads);
|
||||
}
|
||||
|
||||
private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords) {
|
||||
return makeConstrainedUploader(maxPostRecords, maxTotalRecords, false);
|
||||
}
|
||||
|
||||
private BatchingUploader makeConstrainedUploader(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
|
||||
Server11RepositorySession server11RepositorySession = new Server11RepositorySession(
|
||||
makeCountConstrainedRepository(maxPostRecords, maxTotalRecords)
|
||||
makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync)
|
||||
);
|
||||
server11RepositorySession.setStoreDelegate(storeDelegate);
|
||||
return new BatchingUploader(server11RepositorySession, workQueue, storeDelegate);
|
||||
}
|
||||
|
||||
private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords) {
|
||||
return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords);
|
||||
private Server11Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
|
||||
return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync);
|
||||
}
|
||||
|
||||
private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords) {
|
||||
private Server11Repository makeConstrainedRepository(long maxRequestBytes, long maxPostBytes, long maxPostRecords, long maxTotalBytes, long maxTotalRecords, boolean firstSync) {
|
||||
ExtendedJSONObject infoConfigurationJSON = new ExtendedJSONObject();
|
||||
infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_BYTES, maxTotalBytes);
|
||||
infoConfigurationJSON.put(InfoConfiguration.MAX_TOTAL_RECORDS, maxTotalRecords);
|
||||
|
@ -425,12 +449,29 @@ public class BatchingUploaderTest {
|
|||
|
||||
InfoConfiguration infoConfiguration = new InfoConfiguration(infoConfigurationJSON);
|
||||
|
||||
InfoCollections infoCollections;
|
||||
if (firstSync) {
|
||||
infoCollections = new InfoCollections() {
|
||||
@Override
|
||||
public Long getTimestamp(String collection) {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
infoCollections = new InfoCollections() {
|
||||
@Override
|
||||
public Long getTimestamp(String collection) {
|
||||
return 0L;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
return new Server11Repository(
|
||||
"dummyCollection",
|
||||
"http://dummy.url/",
|
||||
null,
|
||||
new InfoCollections(),
|
||||
infoCollections,
|
||||
infoConfiguration
|
||||
);
|
||||
} catch (URISyntaxException e) {
|
||||
|
@ -438,4 +479,22 @@ public class BatchingUploaderTest {
|
|||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
static abstract class TestRunnableWithTarget<T> {
|
||||
T target;
|
||||
|
||||
TestRunnableWithTarget() {}
|
||||
|
||||
TestRunnableWithTarget<T> setTarget(T target) {
|
||||
this.target = target;
|
||||
return this;
|
||||
}
|
||||
|
||||
TestRunnableWithTarget<T> run() {
|
||||
tests();
|
||||
return this;
|
||||
}
|
||||
|
||||
abstract void tests();
|
||||
}
|
||||
}
|
|
@ -106,32 +106,4 @@ public class PayloadTest {
|
|||
assertTrue(true);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() {
|
||||
assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(payload.getRecordCount() == 0);
|
||||
assertTrue(payload.getRecordsBuffer().isEmpty());
|
||||
assertTrue(payload.getRecordGuidsBuffer().isEmpty());
|
||||
assertTrue(payload.isEmpty());
|
||||
|
||||
// Shouldn't throw even if already empty
|
||||
payload.reset();
|
||||
assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(payload.getRecordCount() == 0);
|
||||
assertTrue(payload.getRecordsBuffer().isEmpty());
|
||||
assertTrue(payload.getRecordGuidsBuffer().isEmpty());
|
||||
assertTrue(payload.isEmpty());
|
||||
|
||||
byte[] recordBytes1 = new byte[100];
|
||||
assertFalse(payload.addAndEstimateIfFull(1, recordBytes1, "guid1"));
|
||||
assertFalse(payload.isEmpty());
|
||||
payload.reset();
|
||||
|
||||
assertTrue(payload.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(payload.getRecordCount() == 0);
|
||||
assertTrue(payload.getRecordsBuffer().isEmpty());
|
||||
assertTrue(payload.getRecordGuidsBuffer().isEmpty());
|
||||
assertTrue(payload.isEmpty());
|
||||
}
|
||||
}
|
|
@ -11,11 +11,14 @@ import org.mozilla.gecko.sync.HTTPFailureException;
|
|||
import org.mozilla.gecko.sync.InfoCollections;
|
||||
import org.mozilla.gecko.sync.InfoConfiguration;
|
||||
import org.mozilla.gecko.sync.NonObjectJSONException;
|
||||
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
|
||||
import org.mozilla.gecko.sync.net.SyncResponse;
|
||||
import org.mozilla.gecko.sync.net.SyncStorageResponse;
|
||||
import org.mozilla.gecko.sync.repositories.RepositorySession;
|
||||
import org.mozilla.gecko.sync.repositories.Server11Repository;
|
||||
import org.mozilla.gecko.sync.repositories.Server11RepositorySession;
|
||||
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.util.ArrayList;
|
||||
|
@ -28,14 +31,16 @@ import ch.boye.httpclientandroidlib.entity.BasicHttpEntity;
|
|||
import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
|
||||
import ch.boye.httpclientandroidlib.message.BasicStatusLine;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(TestRunner.class)
|
||||
public class PayloadUploadDelegateTest {
|
||||
private BatchingUploader batchingUploader;
|
||||
private PayloadDispatcher payloadDispatcher;
|
||||
private AuthHeaderProvider authHeaderProvider;
|
||||
|
||||
class MockUploader extends BatchingUploader {
|
||||
public final ArrayList<String> successRecords = new ArrayList<>();
|
||||
class MockPayloadDispatcher extends PayloadDispatcher {
|
||||
public final HashMap<String, Exception> failedRecords = new HashMap<>();
|
||||
public boolean didLastPayloadFail = false;
|
||||
|
||||
|
@ -43,26 +48,24 @@ public class PayloadUploadDelegateTest {
|
|||
public int commitPayloadsSucceeded = 0;
|
||||
public int lastPayloadsSucceeded = 0;
|
||||
|
||||
public MockUploader(final Server11RepositorySession repositorySession, final Executor workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate) {
|
||||
super(repositorySession, workQueue, sessionStoreDelegate);
|
||||
public int committedGuids = 0;
|
||||
|
||||
public MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader) {
|
||||
super(workQueue, uploader, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void payloadSucceeded(final SyncStorageResponse response, final boolean isCommit, final boolean isLastPayload) {
|
||||
public void payloadSucceeded(final SyncStorageResponse response, String[] guids, final boolean isCommit, final boolean isLastPayload) {
|
||||
successResponses.add(response);
|
||||
if (isCommit) {
|
||||
++commitPayloadsSucceeded;
|
||||
committedGuids += guids.length;
|
||||
}
|
||||
if (isLastPayload) {
|
||||
++lastPayloadsSucceeded;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordSucceeded(final String recordGuid) {
|
||||
successRecords.add(recordGuid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void recordFailed(final String recordGuid) {
|
||||
recordFailed(new Exception(), recordGuid);
|
||||
|
@ -85,14 +88,20 @@ public class PayloadUploadDelegateTest {
|
|||
"dummyCollection",
|
||||
"http://dummy.url/",
|
||||
null,
|
||||
new InfoCollections(),
|
||||
new InfoCollections() {
|
||||
@Override
|
||||
public Long getTimestamp(String collection) {
|
||||
return 0L;
|
||||
}
|
||||
},
|
||||
new InfoConfiguration()
|
||||
);
|
||||
batchingUploader = new MockUploader(
|
||||
new Server11RepositorySession(server11Repository),
|
||||
payloadDispatcher = new MockPayloadDispatcher(
|
||||
null,
|
||||
null
|
||||
mock(BatchingUploader.class)
|
||||
);
|
||||
|
||||
authHeaderProvider = mock(AuthHeaderProvider.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -101,16 +110,16 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
postedGuids.add("testGuid2");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
// Test that non-2* responses aren't processed
|
||||
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(404, null, null));
|
||||
assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -119,16 +128,16 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
postedGuids.add("testGuid2");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
// Test that responses without X-Last-Modified header aren't processed
|
||||
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, null, null));
|
||||
assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -137,16 +146,16 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
postedGuids.add("testGuid2");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, true);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
|
||||
// Test that we catch json processing errors
|
||||
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "non json body", "123"));
|
||||
assertEquals(2, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(NonObjectJSONException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
assertEquals(NonObjectJSONException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -154,13 +163,13 @@ public class PayloadUploadDelegateTest {
|
|||
ArrayList<String> postedGuids = new ArrayList<>(1);
|
||||
postedGuids.add("testGuid1");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, true);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
|
||||
// Test that we catch absent tokens in 202 responses
|
||||
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(202, "{\"success\": []}", "123"));
|
||||
assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -169,19 +178,19 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
// Test that if in batching mode and saw the token, 200 must be a response to a commit
|
||||
try {
|
||||
batchingUploader.getCurrentBatch().setToken("MTIzNA", true);
|
||||
payloadDispatcher.batchWhiteboard.setToken("MTIzNA", true);
|
||||
} catch (BatchingUploader.BatchingUploaderException e) {}
|
||||
batchingUploader.setInBatchingMode(true);
|
||||
payloadDispatcher.setInBatchingMode(true);
|
||||
|
||||
// not a commit, so should fail
|
||||
payloadUploadDelegate.handleRequestSuccess(makeSyncStorageResponse(200, "{\"success\": []}", "123"));
|
||||
assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(IllegalStateException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -191,32 +200,32 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("guid2");
|
||||
postedGuids.add("guid3");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"]}", "123"));
|
||||
assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
|
||||
assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
|
||||
assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(3, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
|
||||
|
||||
// These should fail, because we're returning a non-changed L-M in a non-batching mode
|
||||
postedGuids.add("guid4");
|
||||
postedGuids.add("guid6");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid4\", 5, \"guid6\"]}", "123"));
|
||||
assertEquals(5, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(3, ((MockUploader) batchingUploader).successRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(1, ((MockUploader) batchingUploader).successResponses.size());
|
||||
assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
|
||||
assertEquals(0, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
|
||||
assertEquals(5, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(3, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
|
||||
assertEquals(BatchingUploader.LastModifiedDidNotChange.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("guid4").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("guid4").getClass());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -226,7 +235,7 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("guid2");
|
||||
postedGuids.add("guid3");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123"));
|
||||
|
||||
|
@ -234,94 +243,98 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("guid4");
|
||||
postedGuids.add("guid5");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid4\", \"guid5\"], \"failed\": {}}", "333"));
|
||||
|
||||
postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid6");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, true);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "444"));
|
||||
|
||||
assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(6, ((MockUploader) batchingUploader).successRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(3, ((MockUploader) batchingUploader).successResponses.size());
|
||||
assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
|
||||
assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
|
||||
assertFalse(batchingUploader.getInBatchingMode());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(6, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
|
||||
assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
|
||||
postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid7");
|
||||
postedGuids.add("guid8");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, true);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid8\"], \"failed\": {\"guid7\": \"reason\"}}", "555"));
|
||||
assertEquals(1, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertTrue(((MockUploader) batchingUploader).failedRecords.containsKey("guid7"));
|
||||
assertEquals(7, ((MockUploader) batchingUploader).successRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
|
||||
assertEquals(0, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
|
||||
assertEquals(2, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
|
||||
assertFalse(batchingUploader.getInBatchingMode());
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertTrue(((MockPayloadDispatcher) payloadDispatcher).failedRecords.containsKey("guid7"));
|
||||
assertEquals(7, payloadDispatcher.batchWhiteboard.getSuccessRecordGuids().length);
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
|
||||
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
|
||||
assertFalse(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleRequestSuccessBatching() {
|
||||
assertNull(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
|
||||
ArrayList<String> postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid1");
|
||||
postedGuids.add("guid2");
|
||||
postedGuids.add("guid3");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid1\", \"guid2\", \"guid3\"], \"failed\": {}}", "123"));
|
||||
|
||||
assertTrue(batchingUploader.getInBatchingMode());
|
||||
assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
|
||||
assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
assertEquals("MTIzNA", payloadDispatcher.batchWhiteboard.getToken());
|
||||
|
||||
postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid4");
|
||||
postedGuids.add("guid5");
|
||||
postedGuids.add("guid6");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, false, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(202, "{\"batch\": \"MTIzNA\", \"success\": [\"guid4\", \"guid5\", \"guid6\"], \"failed\": {}}", "123"));
|
||||
|
||||
assertTrue(batchingUploader.getInBatchingMode());
|
||||
assertEquals("MTIzNA", batchingUploader.getCurrentBatch().getToken());
|
||||
assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
assertEquals("MTIzNA", payloadDispatcher.batchWhiteboard.getToken());
|
||||
|
||||
postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid7");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, true, false);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, true, false);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid6\"], \"failed\": {}}", "222"));
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid7\"], \"failed\": {}}", "222"));
|
||||
|
||||
assertEquals(7, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
|
||||
|
||||
// Even though everything indicates we're not in a batching, we were, so test that
|
||||
// we don't reset the flag.
|
||||
assertTrue(batchingUploader.getInBatchingMode());
|
||||
assertNull(batchingUploader.getCurrentBatch().getToken());
|
||||
assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
assertNull(payloadDispatcher.batchWhiteboard.getToken());
|
||||
|
||||
postedGuids = new ArrayList<>();
|
||||
postedGuids.add("guid8");
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, postedGuids, true, true);
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, true, true);
|
||||
payloadUploadDelegate.handleRequestSuccess(
|
||||
makeSyncStorageResponse(200, "{\"success\": [\"guid7\"], \"failed\": {}}", "333"));
|
||||
|
||||
assertEquals(0, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(8, ((MockUploader) batchingUploader).successRecords.size());
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(4, ((MockUploader) batchingUploader).successResponses.size());
|
||||
assertEquals(2, ((MockUploader) batchingUploader).commitPayloadsSucceeded);
|
||||
assertEquals(1, ((MockUploader) batchingUploader).lastPayloadsSucceeded);
|
||||
assertTrue(batchingUploader.getInBatchingMode());
|
||||
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(8, ((MockPayloadDispatcher) payloadDispatcher).committedGuids);
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
assertEquals(4, ((MockPayloadDispatcher) payloadDispatcher).successResponses.size());
|
||||
assertEquals(2, ((MockPayloadDispatcher) payloadDispatcher).commitPayloadsSucceeded);
|
||||
assertEquals(1, ((MockPayloadDispatcher) payloadDispatcher).lastPayloadsSucceeded);
|
||||
assertTrue(payloadDispatcher.batchWhiteboard.getInBatchingMode());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -330,21 +343,23 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
postedGuids.add("testGuid2");
|
||||
postedGuids.add("testGuid3");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
IllegalStateException e = new IllegalStateException();
|
||||
payloadUploadDelegate.handleRequestError(e);
|
||||
|
||||
assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid1"));
|
||||
assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid2"));
|
||||
assertEquals(e, ((MockUploader) batchingUploader).failedRecords.get("testGuid3"));
|
||||
assertFalse(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1"));
|
||||
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2"));
|
||||
assertEquals(e, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3"));
|
||||
assertFalse(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
payloadUploadDelegate.handleRequestError(e);
|
||||
assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -353,35 +368,44 @@ public class PayloadUploadDelegateTest {
|
|||
postedGuids.add("testGuid1");
|
||||
postedGuids.add("testGuid2");
|
||||
postedGuids.add("testGuid3");
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, false);
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, false);
|
||||
|
||||
final HttpResponse response = new BasicHttpResponse(
|
||||
new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 503, "Illegal method/protocol"));
|
||||
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
|
||||
assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertEquals(HTTPFailureException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid1").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid1").getClass());
|
||||
assertEquals(HTTPFailureException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid2").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid2").getClass());
|
||||
assertEquals(HTTPFailureException.class,
|
||||
((MockUploader) batchingUploader).failedRecords.get("testGuid3").getClass());
|
||||
((MockPayloadDispatcher) payloadDispatcher).failedRecords.get("testGuid3").getClass());
|
||||
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(batchingUploader, postedGuids, false, true);
|
||||
payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
authHeaderProvider, payloadDispatcher, postedGuids, false, true);
|
||||
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
|
||||
assertEquals(3, ((MockUploader) batchingUploader).failedRecords.size());
|
||||
assertTrue(((MockUploader) batchingUploader).didLastPayloadFail);
|
||||
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
|
||||
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIfUnmodifiedSince() {
|
||||
public void testIfUnmodifiedSinceNoLM() {
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
batchingUploader, new ArrayList<String>(), false, false);
|
||||
authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);
|
||||
|
||||
assertNull(payloadUploadDelegate.ifUnmodifiedSince());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIfUnmodifiedSinceWithLM() {
|
||||
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(
|
||||
authHeaderProvider, payloadDispatcher, new ArrayList<String>(), false, false);
|
||||
try {
|
||||
batchingUploader.getCurrentBatch().setLastModified(1471645412480L, true);
|
||||
} catch (BatchingUploader.BatchingUploaderException e) {}
|
||||
payloadDispatcher.batchWhiteboard.setLastModified(1471645412480L, true);
|
||||
} catch (BatchingUploader.BatchingUploaderException e) {
|
||||
fail();
|
||||
}
|
||||
|
||||
assertEquals("1471645412.480", payloadUploadDelegate.ifUnmodifiedSince());
|
||||
}
|
||||
|
|
|
@ -17,22 +17,20 @@ import static org.junit.Assert.*;
|
|||
public class RecordUploadRunnableTest {
|
||||
@Test
|
||||
public void testBuildPostURI() throws Exception {
|
||||
BatchMeta batchMeta = new BatchMeta(new Object(), 1, 1, null);
|
||||
URI postURI = RecordUploadRunnable.buildPostURI(
|
||||
false, batchMeta, Uri.parse("http://example.com/"));
|
||||
false, null, Uri.parse("http://example.com/"));
|
||||
assertEquals("http://example.com/?batch=true", postURI.toString());
|
||||
|
||||
postURI = RecordUploadRunnable.buildPostURI(
|
||||
true, batchMeta, Uri.parse("http://example.com/"));
|
||||
true, null, Uri.parse("http://example.com/"));
|
||||
assertEquals("http://example.com/?batch=true&commit=true", postURI.toString());
|
||||
|
||||
batchMeta.setToken("MTIzNA", false);
|
||||
postURI = RecordUploadRunnable.buildPostURI(
|
||||
false, batchMeta, Uri.parse("http://example.com/"));
|
||||
false, "MTIzNA", Uri.parse("http://example.com/"));
|
||||
assertEquals("http://example.com/?batch=MTIzNA", postURI.toString());
|
||||
|
||||
postURI = RecordUploadRunnable.buildPostURI(
|
||||
true, batchMeta, Uri.parse("http://example.com/"));
|
||||
true, "MTIzNA", Uri.parse("http://example.com/"));
|
||||
assertEquals("http://example.com/?batch=MTIzNA&commit=true", postURI.toString());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/* Any copyright is dedicated to the Public Domain.
|
||||
http://creativecommons.org/publicdomain/zero/1.0/ */
|
||||
|
||||
package org.mozilla.gecko.sync.repositories.uploaders;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mozilla.gecko.background.testhelpers.TestRunner;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@RunWith(TestRunner.class)
|
||||
public class UploaderMetaTest {
|
||||
private UploaderMeta uploaderMeta;
|
||||
private long byteLimit = 1024;
|
||||
private long recordLimit = 5;
|
||||
private Object lock = new Object();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
uploaderMeta = new UploaderMeta(lock, byteLimit, recordLimit);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testByteLimits() {
|
||||
assertTrue(uploaderMeta.canFit(0));
|
||||
|
||||
// Should just fit
|
||||
assertTrue(uploaderMeta.canFit(byteLimit - BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
|
||||
// Can't fit a record due to payload overhead.
|
||||
assertFalse(uploaderMeta.canFit(byteLimit));
|
||||
|
||||
assertFalse(uploaderMeta.canFit(byteLimit + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertFalse(uploaderMeta.canFit(byteLimit * 1000));
|
||||
|
||||
long recordDelta = byteLimit / 2;
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(recordDelta));
|
||||
|
||||
// Record delta shouldn't fit due to payload overhead.
|
||||
assertFalse(uploaderMeta.canFit(recordDelta));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountLimits() {
|
||||
// Our record limit is 5, let's add 4.
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
// 5th record still fits in
|
||||
assertTrue(uploaderMeta.canFit(1));
|
||||
|
||||
// Add the 5th record
|
||||
assertTrue(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
// 6th record won't fit
|
||||
assertFalse(uploaderMeta.canFit(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNeedCommit() {
|
||||
assertFalse(uploaderMeta.needToCommit());
|
||||
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(uploaderMeta.needToCommit());
|
||||
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(uploaderMeta.needToCommit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAdd() {
|
||||
// Ensure we account for payload overhead twice when the batch is empty.
|
||||
// Payload overhead is either RECORDS_START or RECORDS_END, and for an empty payload
|
||||
// we need both.
|
||||
assertTrue(uploaderMeta.getByteCount() == 2 * BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT);
|
||||
assertTrue(uploaderMeta.getRecordCount() == 0);
|
||||
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(uploaderMeta.getByteCount() == (1 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertTrue(uploaderMeta.getRecordCount() == 1);
|
||||
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
assertFalse(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
assertTrue(uploaderMeta.getByteCount() == (4 + BatchingUploader.PER_PAYLOAD_OVERHEAD_BYTE_COUNT));
|
||||
assertTrue(uploaderMeta.getRecordCount() == 4);
|
||||
|
||||
assertTrue(uploaderMeta.addAndEstimateIfFull(1));
|
||||
|
||||
try {
|
||||
assertTrue(uploaderMeta.addAndEstimateIfFull(1));
|
||||
fail("BatchMeta should not let us insert records that won't fit");
|
||||
} catch (IllegalStateException e) {
|
||||
assertTrue(true);
|
||||
}
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче