Bug 1291821 - Allow BatchingDownloader to resume downloads using offset r=rnewman

BatchingDownloader uses provided RepositoryStateProvider instance in order to track
offset and high water mark as it performs batching.

The state holder objects are initialized by individual ServerSyncStages, and prefixes are used to ensure keys
won't clash.

Two RepositoryStateProvider implementations are used: persistent and non-persistent. Non-persistent
state provider does not allow for resuming after a sync restart, while persistent one does.

Persistent state provider is used by the history stage. It is fetched oldest-first, and records are applied
to live storage as they're downloaded. These conditions let use resume downloads. It's also possible to
resume downloads for stages which use a persistent buffer, but currently we do not have any.

Offset value and its context is reset if we hit a 412 error; it is maintained if we hit a sync deadline, allowing us to
minimize number of records we'll redownload. BatchingDownloaderController owns resuming and context checking logic.

High water mark (h.w.m.) is maintained across syncs and used instead of stage's "last-synced" timestamp if said stage is
set to fetch oldest-first and explicitely allows use of a h.w.m. Server15RepositorySession provides correct timestamp
to RecordsChannel, decoupling BatchingDownloader from this logic.

MozReview-Commit-ID: IH28YrDU4vW

--HG--
extra : rebase_source : 63bd7daaa1fd2a63e10289d6d4cd198aaf81498b
This commit is contained in:
Grisha Kruglov 2017-01-19 13:11:18 -08:00
Родитель 976fe61ec1
Коммит 283d1a3450
26 изменённых файлов: 1015 добавлений и 146 удалений

Просмотреть файл

@ -997,6 +997,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/repositories/domain/TabsRecordFactory.java',
'sync/repositories/domain/VersionConstants.java',
'sync/repositories/downloaders/BatchingDownloader.java',
'sync/repositories/downloaders/BatchingDownloaderController.java',
'sync/repositories/downloaders/BatchingDownloaderDelegate.java',
'sync/repositories/FetchFailedException.java',
'sync/repositories/HashSetStoreTracker.java',
@ -1009,15 +1010,18 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/repositories/MultipleRecordsForGuidException.java',
'sync/repositories/NoContentProviderException.java',
'sync/repositories/NoGuidForIdException.java',
'sync/repositories/NonPersistentRepositoryStateProvider.java',
'sync/repositories/NoStoreDelegateException.java',
'sync/repositories/NullCursorException.java',
'sync/repositories/ParentNotFoundException.java',
'sync/repositories/PersistentRepositoryStateProvider.java',
'sync/repositories/ProfileDatabaseException.java',
'sync/repositories/RecordFactory.java',
'sync/repositories/RecordFilter.java',
'sync/repositories/Repository.java',
'sync/repositories/RepositorySession.java',
'sync/repositories/RepositorySessionBundle.java',
'sync/repositories/RepositoryStateProvider.java',
'sync/repositories/Server15Repository.java',
'sync/repositories/Server15RepositorySession.java',
'sync/repositories/StoreFailedException.java',

Просмотреть файл

@ -158,11 +158,6 @@ import java.util.concurrent.Executors;
this.storeDelegate = delegate;
}
@Override
public long getHighWaterMarkTimestamp() {
return bufferStorage.latestModifiedTimestamp();
}
private boolean mayProceedToMergeBuffer() {
// If our buffer storage is not persistent, disallowing merging after buffer has been filled
// means throwing away records only to re-download them later.

Просмотреть файл

@ -27,9 +27,5 @@ public interface BufferStorage {
void clear();
// For buffers that are filled up oldest-first this is a high water mark, which enables resuming
// a sync.
long latestModifiedTimestamp();
boolean isPersistent();
}

Просмотреть файл

@ -48,23 +48,4 @@ public class MemoryBufferStorage implements BufferStorage {
public void clear() {
recordBuffer.clear();
}
@Override
public long latestModifiedTimestamp() {
long lastModified = 0;
synchronized (recordBuffer) {
if (recordBuffer.size() == 0) {
return lastModified;
}
for (Record record : recordBuffer.values()) {
if (record.lastModified > lastModified) {
lastModified = record.lastModified;
}
}
}
return lastModified;
}
}

Просмотреть файл

@ -9,10 +9,15 @@ import java.net.URISyntaxException;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.stage.ServerSyncStage;
/**
* A kind of Server15Repository that supports explicit setting of per-batch fetch limit,
* batching mode (single batch vs multi-batch), and a sort order.
* A kind of Server15Repository that supports explicit setting of:
* - per-batch fetch limit
* - batching mode (single batch vs multi-batch)
* - sort order
* - repository state provider (persistent vs non-persistent)
* - whereas use of high-water-mark is allowed
*
* @author rnewman
*
@ -20,7 +25,8 @@ import org.mozilla.gecko.sync.net.AuthHeaderProvider;
public class ConfigurableServer15Repository extends Server15Repository {
private final String sortOrder;
private final long batchLimit;
private final boolean allowMultipleBatches;
private final ServerSyncStage.MultipleBatches multipleBatches;
private final ServerSyncStage.HighWaterMark highWaterMark;
public ConfigurableServer15Repository(
String collection,
@ -31,18 +37,29 @@ public class ConfigurableServer15Repository extends Server15Repository {
InfoConfiguration infoConfiguration,
long batchLimit,
String sort,
boolean allowMultipleBatches) throws URISyntaxException {
ServerSyncStage.MultipleBatches multipleBatches,
ServerSyncStage.HighWaterMark highWaterMark,
RepositoryStateProvider stateProvider) throws URISyntaxException {
super(
collection,
syncDeadline,
storageURL,
authHeaderProvider,
infoCollections,
infoConfiguration
infoConfiguration,
stateProvider
);
this.batchLimit = batchLimit;
this.sortOrder = sort;
this.allowMultipleBatches = allowMultipleBatches;
this.multipleBatches = multipleBatches;
this.highWaterMark = highWaterMark;
// Sanity check: let's ensure we're configured correctly. At this point in time, it doesn't make
// sense to use H.W.M. with a non-persistent state provider. This might change if we start retrying
// during a download in case of 412s.
if (!stateProvider.isPersistent() && highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled)) {
throw new IllegalArgumentException("Can not use H.W.M. with NonPersistentRepositoryStateProvider");
}
}
@Override
@ -57,6 +74,11 @@ public class ConfigurableServer15Repository extends Server15Repository {
@Override
public boolean getAllowMultipleBatches() {
return allowMultipleBatches;
return multipleBatches.equals(ServerSyncStage.MultipleBatches.Enabled);
}
@Override
public boolean getAllowHighWaterMark() {
return highWaterMark.equals(ServerSyncStage.HighWaterMark.Enabled);
}
}

Просмотреть файл

@ -0,0 +1,76 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
import android.support.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* Simple non-persistent implementation of a repository state provider.
*
* Just like in the persistent implementation, changes to values are visible only after a commit.
*
* @author grisha
*/
public class NonPersistentRepositoryStateProvider implements RepositoryStateProvider {
// We'll have at least OFFSET and H.W.M. values set.
private final int INITIAL_CAPACITY = 2;
private final Map<String, Object> nonCommittedValuesMap = Collections.synchronizedMap(
new HashMap<String, Object>(INITIAL_CAPACITY)
);
// NB: Any changes are made by creating a new map instead of altering an existing one.
private volatile Map<String, Object> committedValuesMap = new HashMap<>(INITIAL_CAPACITY);
@Override
public boolean isPersistent() {
return false;
}
@Override
public boolean commit() {
committedValuesMap = new HashMap<>(nonCommittedValuesMap);
return true;
}
@Override
public NonPersistentRepositoryStateProvider clear(String key) {
nonCommittedValuesMap.remove(key);
return this;
}
@Override
public NonPersistentRepositoryStateProvider setString(String key, String value) {
nonCommittedValuesMap.put(key, value);
return this;
}
@Nullable
@Override
public String getString(String key) {
return (String) committedValuesMap.get(key);
}
@Override
public NonPersistentRepositoryStateProvider setLong(String key, Long value) {
nonCommittedValuesMap.put(key, value);
return this;
}
@Nullable
@Override
public Long getLong(String key) {
return (Long) committedValuesMap.get(key);
}
@Override
public boolean resetAndCommit() {
nonCommittedValuesMap.clear();
return commit();
}
}

Просмотреть файл

@ -0,0 +1,83 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
import android.support.annotation.Nullable;
import org.mozilla.gecko.background.common.PrefsBranch;
/**
* Simple persistent implementation of a repository state provider.
* Uses provided PrefsBranch object in order to persist values.
*
* Values must be committed before they become visible via getters.
* It is a caller's responsibility to perform a commit.
*
* @author grisha
*/
public class PersistentRepositoryStateProvider implements RepositoryStateProvider {
private final PrefsBranch prefs;
private final PrefsBranch.Editor editor;
public PersistentRepositoryStateProvider(PrefsBranch prefs) {
this.prefs = prefs;
// NB: It is a caller's responsibility to commit any changes it performs via setters.
this.editor = prefs.edit();
}
@Override
public boolean isPersistent() {
return true;
}
@Override
public boolean commit() {
return this.editor.commit();
}
@Override
public PersistentRepositoryStateProvider clear(String key) {
this.editor.remove(key);
return this;
}
@Override
public PersistentRepositoryStateProvider setString(String key, String value) {
this.editor.putString(key, value);
return this;
}
@Nullable
@Override
public String getString(String key) {
return this.prefs.getString(key, null);
}
@Override
public PersistentRepositoryStateProvider setLong(String key, Long value) {
this.editor.putLong(key, value);
return this;
}
@Nullable
@Override
public Long getLong(String key) {
if (!this.prefs.contains(key)) {
return null;
}
return this.prefs.getLong(key, 0);
}
@Override
public boolean resetAndCommit() {
return this.editor
.remove(KEY_HIGH_WATER_MARK)
.remove(KEY_OFFSET)
.remove(KEY_OFFSET_ORDER)
.remove(KEY_OFFSET_SINCE)
.commit();
}
}

Просмотреть файл

@ -76,11 +76,6 @@ public abstract class RepositorySession {
return lastSyncTimestamp;
}
// Override this in the buffering wrappers.
public long getHighWaterMarkTimestamp() {
return 0;
}
public static long now() {
return System.currentTimeMillis();
}

Просмотреть файл

@ -0,0 +1,47 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories;
import android.support.annotation.CheckResult;
import android.support.annotation.Nullable;
/**
* Interface describing a repository state provider.
* Repository's state might consist of a number of key-value pairs.
*
* Currently there are two types of implementations: persistent and non-persistent state.
* Persistent state survives between syncs, and is currently used by the BatchingDownloader to
* resume downloads in case of interruptions. Non-persistent state is used when resuming downloads
* is not possible.
*
* In order to safely use a persistent state provider for resuming downloads, a sync stage must match
* the following criteria:
* - records are downloaded with sort=oldest
* - records must be downloaded into a persistent buffer, or applied to live storage
*
* @author grisha
*/
public interface RepositoryStateProvider {
String KEY_HIGH_WATER_MARK = "highWaterMark";
String KEY_OFFSET = "offset";
String KEY_OFFSET_SINCE = "offsetSince";
String KEY_OFFSET_ORDER = "offsetOrder";
boolean isPersistent();
@CheckResult
boolean commit();
RepositoryStateProvider clear(String key);
RepositoryStateProvider setString(String key, String value);
@Nullable String getString(String key);
RepositoryStateProvider setLong(String key, Long value);
@Nullable Long getLong(String key);
@CheckResult
boolean resetAndCommit();
}

Просмотреть файл

@ -31,6 +31,8 @@ public class Server15Repository extends Repository {
protected final String collection;
protected final InfoCollections infoCollections;
protected RepositoryStateProvider stateProvider;
private final InfoConfiguration infoConfiguration;
private final static String DEFAULT_SORT_ORDER = "oldest";
private final static long DEFAULT_BATCH_LIMIT = 100;
@ -50,7 +52,8 @@ public class Server15Repository extends Repository {
@NonNull String storageURL,
AuthHeaderProvider authHeaderProvider,
@NonNull InfoCollections infoCollections,
@NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
@NonNull InfoConfiguration infoConfiguration,
@NonNull RepositoryStateProvider stateProvider) throws URISyntaxException {
if (collection == null) {
throw new IllegalArgumentException("collection must not be null");
}
@ -66,6 +69,7 @@ public class Server15Repository extends Repository {
this.authHeaderProvider = authHeaderProvider;
this.infoCollections = infoCollections;
this.infoConfiguration = infoConfiguration;
this.stateProvider = stateProvider;
}
@Override
@ -103,6 +107,10 @@ public class Server15Repository extends Repository {
return true;
}
public boolean getAllowHighWaterMark() {
return false;
}
/**
* A point in time by which this repository's session must complete fetch and store operations.
* Particularly pertinent for batching downloads performed by the session (should we fetch

Просмотреть файл

@ -13,6 +13,7 @@ import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDeleg
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionWipeDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloader;
import org.mozilla.gecko.sync.repositories.downloaders.BatchingDownloaderController;
import org.mozilla.gecko.sync.repositories.uploaders.BatchingUploader;
public class Server15RepositorySession extends RepositorySession {
@ -25,7 +26,14 @@ public class Server15RepositorySession extends RepositorySession {
public Server15RepositorySession(Repository repository) {
super(repository);
this.serverRepository = (Server15Repository) repository;
this.downloader = initializeDownloader(this);
this.downloader = new BatchingDownloader(
this.serverRepository.authHeaderProvider,
Uri.parse(this.serverRepository.collectionURI().toString()),
this.serverRepository.getSyncDeadline(),
this.serverRepository.getAllowMultipleBatches(),
this.serverRepository.getAllowHighWaterMark(),
this.serverRepository.stateProvider,
this);
}
@Override
@ -49,7 +57,9 @@ public class Server15RepositorySession extends RepositorySession {
@Override
public void fetchSince(long sinceTimestamp,
RepositorySessionFetchRecordsDelegate delegate) {
this.downloader.fetchSince(
BatchingDownloaderController.resumeFetchSinceIfPossible(
this.downloader,
this.serverRepository.stateProvider,
delegate,
sinceTimestamp,
serverRepository.getBatchLimit(),
@ -103,17 +113,32 @@ public class Server15RepositorySession extends RepositorySession {
uploader.noMoreRecordsToUpload();
}
/**
* @return Repository's high-water-mark if it's available, its use is allowed by the repository,
* repository is set to fetch oldest-first, and it's greater than collection's last-synced timestamp.
* Otherwise, returns repository's last-synced timestamp.
*/
@Override
public long getLastSyncTimestamp() {
if (!serverRepository.getAllowHighWaterMark() || !serverRepository.getSortOrder().equals("oldest")) {
return super.getLastSyncTimestamp();
}
final Long highWaterMark = serverRepository.stateProvider.getLong(
RepositoryStateProvider.KEY_HIGH_WATER_MARK);
// After a successful sync we expect that last-synced timestamp for a collection will be greater
// than the high-water-mark. High-water-mark is mostly useful in case of resuming a sync,
// and if we're resuming we did not bump our last-sync timestamps during the previous sync.
if (highWaterMark == null || super.getLastSyncTimestamp() > highWaterMark) {
return super.getLastSyncTimestamp();
}
return highWaterMark;
}
@Override
public boolean dataAvailable() {
return serverRepository.updateNeeded(getLastSyncTimestamp());
}
protected static BatchingDownloader initializeDownloader(final Server15RepositorySession serverRepositorySession) {
return new BatchingDownloader(
serverRepositorySession.serverRepository.authHeaderProvider,
Uri.parse(serverRepositorySession.serverRepository.collectionURI().toString()),
serverRepositorySession.serverRepository.getSyncDeadline(),
serverRepositorySession.serverRepository.getAllowMultipleBatches(),
serverRepositorySession);
}
}

Просмотреть файл

@ -191,6 +191,28 @@ public class AndroidBrowserHistoryRepositorySession extends AndroidBrowserReposi
storeDone(System.currentTimeMillis());
}
/**
* We need to flush our internal buffer of records in case of any interruptions of record flow
* from our "source". Downloader might be maintaining a "high-water-mark" based on the records
* it tried to store, so it's pertinent that all of the records that were queued for storage
* are eventually persisted.
*/
@Override
public void storeIncomplete() {
storeWorkQueue.execute(new Runnable() {
@Override
public void run() {
synchronized (recordsBufferMonitor) {
try {
flushNewRecords();
} catch (Exception e) {
Logger.warn(LOG_TAG, "Error flushing records to database.", e);
}
}
}
});
}
@Override
public void storeDone(final long end) {
storeWorkQueue.execute(new Runnable() {

Просмотреть файл

@ -20,6 +20,7 @@ import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import java.io.UnsupportedEncodingException;
@ -42,7 +43,7 @@ import java.util.concurrent.TimeUnit;
* we've received so far, and we perform an additional fetch, if we're allowed to do so by our
* configuration. Batching stops when offset token is no longer present (indicating that we're done).
*
* If we are not allowed to perform multiple batches, we consider batching to be succesfully complete
* If we are not allowed to perform multiple batches, we consider batching to be successfully completed
* after fist fetch request succeeds. Similarly, a trivial case of collection having less records than
* the batch limit will also successfully complete in one fetch.
*
@ -61,6 +62,9 @@ public class BatchingDownloader {
private final Uri baseCollectionUri;
private final long fetchDeadline;
private final boolean allowMultipleBatches;
private final boolean keepTrackOfHighWaterMark;
private RepositoryStateProvider stateProvider;
/* package-local */ final AuthHeaderProvider authHeaderProvider;
@ -74,12 +78,16 @@ public class BatchingDownloader {
Uri baseCollectionUri,
long fetchDeadline,
boolean allowMultipleBatches,
boolean keepTrackOfHighWaterMark,
RepositoryStateProvider stateProvider,
RepositorySession repositorySession) {
this.repositorySession = repositorySession;
this.authHeaderProvider = authHeaderProvider;
this.baseCollectionUri = baseCollectionUri;
this.allowMultipleBatches = allowMultipleBatches;
this.keepTrackOfHighWaterMark = keepTrackOfHighWaterMark;
this.fetchDeadline = fetchDeadline;
this.stateProvider = stateProvider;
}
@VisibleForTesting
@ -130,11 +138,6 @@ public class BatchingDownloader {
return new SyncStorageCollectionRequest(collectionURI);
}
public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder) {
this.fetchSince(fetchRecordsDelegate, timestamp, batchLimit, sortOrder, null);
}
@VisibleForTesting
public void fetchSince(RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, long timestamp, long batchLimit, String sortOrder, String offset) {
try {
SyncStorageCollectionRequest request = makeSyncStorageCollectionRequest(timestamp,
@ -199,6 +202,16 @@ public class BatchingDownloader {
final long normalizedTimestamp = response.normalizedTimestampForHeader(SyncResponse.X_LAST_MODIFIED);
Logger.debug(LOG_TAG, "Fetch completed. Timestamp is " + normalizedTimestamp);
// This isn't great, but shouldn't be too problematic - but do see notes below.
// Failing to reset a resume context after we're done with batching means that on next
// sync we'll erroneously try to resume downloading. If resume proceeds, we will fetch
// from an older timestamp, but offset by the amount of records we've fetched prior.
// Since we're diligent about setting a X-I-U-S header, any remote collection changes
// will be caught and we'll receive a 412.
if (!BatchingDownloaderController.resetResumeContextAndCommit(this.stateProvider)) {
Logger.warn(LOG_TAG, "Failed to reset resume context while completing a batch");
}
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
@ -209,6 +222,19 @@ public class BatchingDownloader {
return;
}
// This is unfortunate, but largely just means that in case we need to resume later on, it
// either won't be possible (and we'll fetch w/o resuming), or won't be as efficient (i.e.
// we'll download more records than necessary).
if (BatchingDownloaderController.isResumeContextSet(this.stateProvider)) {
if (!BatchingDownloaderController.updateResumeContextAndCommit(this.stateProvider, offset)) {
Logger.warn(LOG_TAG, "Failed to update resume context while processing a batch.");
}
} else {
if (!BatchingDownloaderController.setInitialResumeContextAndCommit(this.stateProvider, offset, newer, sort)) {
Logger.warn(LOG_TAG, "Failed to set initial resume context while processing a batch.");
}
}
// We need to make another batching request!
// Let the delegate know that a batch fetch just completed before we proceed.
// This operation needs to run after every call to onFetchedRecord for this batch has been
@ -233,6 +259,9 @@ public class BatchingDownloader {
limit, full, sort, ids, offset);
this.fetchWithParameters(newer, limit, full, sort, ids, newRequest, fetchRecordsDelegate);
} catch (final URISyntaxException | UnsupportedEncodingException e) {
if (!this.stateProvider.commit()) {
Logger.warn(LOG_TAG, "Failed to commit repository state while handling request creation error");
}
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
@ -253,6 +282,25 @@ public class BatchingDownloader {
@Nullable final SyncStorageCollectionRequest request) {
this.removeRequestFromPending(request);
this.abortRequests();
// Resume context is not discarded if we failed because of reaching our deadline. In this case,
// we keep it allowing us to resume our download exactly where we left off.
// Discard resume context for all other failures: 412 (concurrent modification), HTTP errors, ...
if (!(ex instanceof SyncDeadlineReachedException)) {
// Failing to reset context means that we will try to resume once we re-sync current stage.
// This won't affect X-I-U-S logic in case of 412 (it's set separately from resume context),
// and same notes apply after failing to reset context in onFetchCompleted (see above).
if (!BatchingDownloaderController.resetResumeContextAndCommit(stateProvider)) {
Logger.warn(LOG_TAG, "Failed to reset resume context while processing a non-deadline exception");
}
} else {
// Failing to commit the context here means that we didn't commit the latest high-water-mark,
// and won't be as efficient once we re-sync. That is, we might download more records than necessary.
if (!this.stateProvider.commit()) {
Logger.warn(LOG_TAG, "Failed to commit resume context while processing a deadline exception");
}
}
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
@ -265,8 +313,13 @@ public class BatchingDownloader {
public void onFetchedRecord(CryptoRecord record,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate) {
this.workTracker.incrementOutstanding();
try {
fetchRecordsDelegate.onFetchedRecord(record);
// NB: changes to stateProvider are committed in either onFetchCompleted or handleFetchFailed.
if (this.keepTrackOfHighWaterMark) {
this.stateProvider.setLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK, record.lastModified);
}
} catch (Exception ex) {
Logger.warn(LOG_TAG, "Got exception calling onFetchedRecord with WBO.", ex);
throw new RuntimeException(ex);

Просмотреть файл

@ -0,0 +1,118 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.downloaders;
import android.support.annotation.CheckResult;
import android.util.Log;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
/**
* Encapsulates logic for resuming batching downloads.
*
* It's possible to resume a batching download if we have an offset token and the context in which
* we obtained the offset token did not change. Namely, we ensure that `since` and `order` parameters
* remain the same if offset is being used. See Bug 1330839 for a discussion on this.
*
* @author grisha
*/
public class BatchingDownloaderController {
private final static String LOG_TAG = "BatchingDownloaderCtrl";
private BatchingDownloaderController() {}
private static class ResumeContext {
private final String offset;
private final Long since;
private final String order;
private ResumeContext(String offset, Long since, String order) {
this.offset = offset;
this.since = since;
this.order = order;
}
}
private static ResumeContext getResumeContext(RepositoryStateProvider stateProvider, Long since, String order) {
// Build a "default" context around passed-in values if no context is available.
if (!isResumeContextSet(stateProvider)) {
return new ResumeContext(null, since, order);
}
final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
// If context is still valid, we can use it!
if (order.equals(offsetOrder)) {
return new ResumeContext(offset, offsetSince, offsetOrder);
}
// Build a "default" context around passed-in values.
return new ResumeContext(null, since, order);
}
/**
* Resumes a fetch if there is an offset present, and offset's context matches provided values.
* Otherwise, performs a regular fetch.
*/
public static void resumeFetchSinceIfPossible(
BatchingDownloader downloader,
RepositoryStateProvider stateProvider,
RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
long since, long limit, String order) {
ResumeContext resumeContext = getResumeContext(stateProvider, since, order);
downloader.fetchSince(
fetchRecordsDelegate,
resumeContext.since,
limit,
resumeContext.order,
resumeContext.offset
);
}
@CheckResult
/* package-local */ static boolean setInitialResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset, long since, String order) {
if (isResumeContextSet(stateProvider)) {
throw new IllegalStateException("Not allowed to set resume context more than once. Use update instead.");
}
return stateProvider
.setString(RepositoryStateProvider.KEY_OFFSET, offset)
.setLong(RepositoryStateProvider.KEY_OFFSET_SINCE, since)
.setString(RepositoryStateProvider.KEY_OFFSET_ORDER, order)
.commit();
}
@CheckResult
/* package-local */ static boolean updateResumeContextAndCommit(RepositoryStateProvider stateProvider, String offset) {
if (!isResumeContextSet(stateProvider)) {
throw new IllegalStateException("Tried to update resume context before it was set.");
}
return stateProvider
.setString(RepositoryStateProvider.KEY_OFFSET, offset)
.commit();
}
@CheckResult
/* package-local */ static boolean resetResumeContextAndCommit(RepositoryStateProvider stateProvider) {
return stateProvider
.clear(RepositoryStateProvider.KEY_OFFSET)
.clear(RepositoryStateProvider.KEY_OFFSET_SINCE)
.clear(RepositoryStateProvider.KEY_OFFSET_ORDER)
.commit();
}
/*package-local */ static boolean isResumeContextSet(RepositoryStateProvider stateProvider) {
final String offset = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET);
final Long offsetSince = stateProvider.getLong(RepositoryStateProvider.KEY_OFFSET_SINCE);
final String offsetOrder = stateProvider.getString(RepositoryStateProvider.KEY_OFFSET_ORDER);
return offset != null && offsetSince != null && offsetOrder != null;
}
}

Просмотреть файл

@ -10,6 +10,7 @@ import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserBookmarksRepository;
@ -39,6 +40,27 @@ public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
return VersionConstants.BOOKMARKS_ENGINE_VERSION;
}
/**
* We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
* Once this stage is using a persistent buffer, this should change. See Bug 1318515.
*
* @return HighWaterMark.Disabled
*/
@Override
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Disabled;
}
/**
* Full batching is allowed, because we want all of the records.
*
* @return MultipleBatches.Enabled
*/
@Override
protected MultipleBatches getAllowedMultipleBatches() {
return MultipleBatches.Enabled;
}
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
return new ConfigurableServer15Repository(
@ -50,7 +72,10 @@ public class AndroidBrowserBookmarksServerSyncStage extends ServerSyncStage {
session.config.infoConfiguration,
BOOKMARKS_BATCH_LIMIT,
BOOKMARKS_SORT,
true /* allow multiple batches */);
getAllowedMultipleBatches(),
getAllowedToUseHighWaterMark(),
getRepositoryStateProvider()
);
}
@Override

Просмотреть файл

@ -8,8 +8,10 @@ import java.net.URISyntaxException;
import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
import org.mozilla.gecko.sync.repositories.domain.HistoryRecordFactory;
import org.mozilla.gecko.sync.repositories.domain.VersionConstants;
@ -42,6 +44,42 @@ public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
return new AndroidBrowserHistoryRepository();
}
/**
* We use a persistent state provider for this stage, because it lets us resume interrupted
* syncs more efficiently.
* We are able to do this because we match criteria described in {@link RepositoryStateProvider}.
*
* @return Persistent repository state provider.
*/
@Override
protected RepositoryStateProvider getRepositoryStateProvider() {
return new PersistentRepositoryStateProvider(
session.config.getBranch(statePreferencesPrefix())
);
}
/**
* We're downloading records oldest-first directly into live storage, forgoing any buffering other
* than AndroidBrowserHistoryRepository's internal records queue. These conditions allow us to use
* high-water-mark to resume downloads in case of interruptions.
*
* @return HighWaterMark.Enabled
*/
@Override
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Enabled;
}
/**
* Full batching is allowed, because we want all of the records.
*
* @return MultipleBatches.Enabled
*/
@Override
protected MultipleBatches getAllowedMultipleBatches() {
return MultipleBatches.Enabled;
}
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
return new ConfigurableServer15Repository(
@ -53,7 +91,10 @@ public class AndroidBrowserHistoryServerSyncStage extends ServerSyncStage {
session.config.infoConfiguration,
HISTORY_BATCH_LIMIT,
HISTORY_SORT,
true /* allow multiple batches */);
getAllowedMultipleBatches(),
getAllowedToUseHighWaterMark(),
getRepositoryStateProvider()
);
}
@Override

Просмотреть файл

@ -10,7 +10,10 @@ import org.mozilla.gecko.sync.SynchronizerConfiguration;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.android.AndroidBrowserHistoryRepository;
import java.io.IOException;
@ -26,7 +29,7 @@ import java.net.URISyntaxException;
public class AndroidBrowserRecentHistoryServerSyncStage extends AndroidBrowserHistoryServerSyncStage {
protected static final String LOG_TAG = "RecentHistoryStage";
// TODO: Bug 1316110 tracks follow up work to make this stage more efficient.
// Bug 1316110 tracks follow up work to generalize this stage and make it more efficient.
private static final int HISTORY_BATCH_LIMIT = 50;
// We need a custom configuration bundle name for this stage, because we want to track last-synced
// timestamp for this stage separately from that of a full history sync stage, yet their collection
@ -39,6 +42,37 @@ public class AndroidBrowserRecentHistoryServerSyncStage extends AndroidBrowserHi
return BUNDLE_NAME;
}
/**
* We use a non-persistent state provider for this stage, as it's designed to just run once.
*
* @return Non-persistent repository state provider.
*/
@Override
protected RepositoryStateProvider getRepositoryStateProvider() {
return new NonPersistentRepositoryStateProvider();
}
/**
* Force download to be limited to a single batch.
* We just to want fetch a batch-worth of records for this stage.
*
* @return MultipleBatches.Disabled
*/
@Override
protected MultipleBatches getAllowedMultipleBatches() {
return MultipleBatches.Disabled;
}
/**
* Right now this stage is designed to run just once, when there's no history data available.
*
* @return HighWaterMark.Disabled
*/
@Override
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Disabled;
}
@Override
protected Repository getLocalRepository() {
return new BufferingMiddlewareRepository(
@ -59,7 +93,9 @@ public class AndroidBrowserRecentHistoryServerSyncStage extends AndroidBrowserHi
session.config.infoConfiguration,
HISTORY_BATCH_LIMIT,
HISTORY_SORT,
false /* force single batch only */);
getAllowedMultipleBatches(),
getAllowedToUseHighWaterMark(),
getRepositoryStateProvider());
}
/**

Просмотреть файл

@ -10,6 +10,7 @@ import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.middleware.BufferingMiddlewareRepository;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.android.FormHistoryRepositorySession;
@ -39,6 +40,27 @@ public class FormHistoryServerSyncStage extends ServerSyncStage {
return VersionConstants.FORMS_ENGINE_VERSION;
}
/**
* We're downloading records into a non-persistent buffer for safety, so we can't use a H.W.M.
* Once this stage is using a persistent buffer, this should change.
*
* @return HighWaterMark.Disabled
*/
@Override
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Disabled;
}
/**
* Full batching is allowed, because we want all of the records.
*
* @return MultipleBatches.Enabled
*/
@Override
protected MultipleBatches getAllowedMultipleBatches() {
return MultipleBatches.Enabled;
}
@Override
protected Repository getRemoteRepository() throws URISyntaxException {
String collection = getCollection();
@ -51,7 +73,10 @@ public class FormHistoryServerSyncStage extends ServerSyncStage {
session.config.infoConfiguration,
FORM_HISTORY_BATCH_LIMIT,
FORM_HISTORY_SORT,
true /* allow multiple batches */);
getAllowedMultipleBatches(),
getAllowedToUseHighWaterMark(),
getRepositoryStateProvider()
);
}
@Override

Просмотреть файл

@ -27,10 +27,12 @@ import org.mozilla.gecko.sync.net.SyncStorageRequestDelegate;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RecordFactory;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionBeginDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
@ -60,6 +62,20 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
protected long stageStartTimestamp = -1;
protected long stageCompleteTimestamp = -1;
/**
* Poor-man's boolean typing.
* These enums are used to configure {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
*/
public enum HighWaterMark {
Enabled,
Disabled
}
public enum MultipleBatches {
Enabled,
Disabled
}
/**
* Override these in your subclasses.
*
@ -141,6 +157,34 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
protected abstract Repository getLocalRepository();
protected abstract RecordFactory getRecordFactory();
/**
* Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
* Override this if you need a persistent repository state provider.
*
* @return Non-persistent state provider.
*/
protected RepositoryStateProvider getRepositoryStateProvider() {
return new NonPersistentRepositoryStateProvider();
}
/**
* Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
* Override this if you want to restrict downloader to just a single batch.
*/
protected MultipleBatches getAllowedMultipleBatches() {
return MultipleBatches.Enabled;
}
/**
* Used to configure a {@link org.mozilla.gecko.sync.repositories.ConfigurableServer15Repository}.
* Override this if you want to allow resuming record downloads from a high-water-mark.
* Ensure you're using a {@link org.mozilla.gecko.sync.repositories.PersistentRepositoryStateProvider}
* to persist high-water-mark across syncs.
*/
protected HighWaterMark getAllowedToUseHighWaterMark() {
return HighWaterMark.Disabled;
}
// Override this in subclasses.
protected Repository getRemoteRepository() throws URISyntaxException {
String collection = getCollection();
@ -149,7 +193,8 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
session.config.storageURL(),
session.getAuthHeaderProvider(),
session.config.infoCollections,
session.config.infoConfiguration);
session.config.infoConfiguration,
new NonPersistentRepositoryStateProvider());
}
/**
@ -170,6 +215,10 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
return this.getCollection() + ".";
}
protected String statePreferencesPrefix() {
return this.getCollection() + ".state.";
}
protected SynchronizerConfiguration getConfig() throws NonObjectJSONException, IOException {
return new SynchronizerConfiguration(session.config.getBranch(bundlePrefix()));
}
@ -190,11 +239,21 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
}
/**
* Reset timestamps.
* Reset timestamps and any repository state.
*/
@Override
protected void resetLocal() {
resetLocalWithSyncID(null);
if (!getRepositoryStateProvider().resetAndCommit()) {
// At the very least, we can log this.
// Failing to reset at this point means that we'll have lingering state for any stages using a
// persistent provider. In certain cases this might negatively affect first sync of this stage
// in the future.
// Our timestamp resetting code in `persistConfig` is affected by the same problem.
// A way to work around this is to further prefix our persisted SharedPreferences with
// clientID/syncID, ensuring a very defined scope for any persisted state. See Bug 1332431.
Logger.warn(LOG_TAG, "Failed to reset repository state");
}
}
/**

Просмотреть файл

@ -6,6 +6,7 @@ package org.mozilla.gecko.sync.synchronizer;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@ -176,16 +177,7 @@ public class RecordsChannel implements
this.consumer = new ConcurrentRecordConsumer(this);
ThreadPool.run(this.consumer);
waitingForQueueDone = true;
// Fetch all records that were modified since our previous flow. If our previous flow succeeded,
// we will use source's last-sync timestamp. If our previous flow didn't complete, resume it,
// starting from sink's high water mark timestamp.
// If there was no previous flow (first sync, or data was cleared...), fetch everything.
// Resuming a flow is supported for buffered RepositorySessions. We degrade gracefully otherwise.
final long highWaterMark = sink.getHighWaterMarkTimestamp();
final long lastSync = source.getLastSyncTimestamp();
final long sinceTimestamp = Math.max(highWaterMark, lastSync);
source.fetchSince(sinceTimestamp, this);
source.fetchSince(source.getLastSyncTimestamp(), this);
}
/**
@ -215,9 +207,9 @@ public class RecordsChannel implements
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
delegate.onFlowFetchFailed(this, ex);
// Sink will be informed once consumer finishes.
this.consumer.halt();
delegate.onFlowFetchFailed(this, ex);
}
@Override
@ -318,6 +310,7 @@ public class RecordsChannel implements
// If consumer is still going at it, tell it to stop.
this.consumer.halt();
delegate.onFlowStoreFailed(this, ex, null);
delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
}
@ -363,7 +356,8 @@ public class RecordsChannel implements
}
@Nullable
/* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
@VisibleForTesting
public synchronized ReflowIsNecessaryException getReflowException() {
return reflowException;
}

Просмотреть файл

@ -11,6 +11,8 @@ import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import java.net.URI;
@ -26,6 +28,7 @@ public class TestServer15Repository {
protected final InfoCollections infoCollections = new InfoCollections();
protected final InfoConfiguration infoConfiguration = new InfoConfiguration();
protected final RepositoryStateProvider stateProvider = new NonPersistentRepositoryStateProvider();
public static void assertQueryEquals(String expected, URI u) {
Assert.assertEquals(expected, u.getRawQuery());
@ -33,8 +36,8 @@ public class TestServer15Repository {
@Test
public void testCollectionURI() throws URISyntaxException {
Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration);
Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration);
Server15Repository noTrailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL, null, infoCollections, infoConfiguration, stateProvider);
Server15Repository trailingSlash = new Server15Repository(COLLECTION, SYNC_DEADLINE, COLLECTION_URL + "/", null, infoCollections, infoConfiguration, stateProvider);
Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", noTrailingSlash.collectionURI().toASCIIString());
Assert.assertEquals("http://foo.com/1.5/n6ec3u5bee3tixzp2asys7bs6fve4jfw/storage/bookmarks", trailingSlash.collectionURI().toASCIIString());
}

Просмотреть файл

@ -65,13 +65,11 @@ public class BufferingMiddlewareRepositorySessionTest {
MockRecord record1 = new MockRecord("guid2", null, 1, false);
bufferingSession.store(record1);
assertEquals(2, bufferStorage.all().size());
assertEquals(1, bufferStorage.latestModifiedTimestamp());
// record2 must replace record.
MockRecord record2 = new MockRecord("guid1", null, 2, false);
bufferingSession.store(record2);
assertEquals(2, bufferStorage.all().size());
assertEquals(2, bufferStorage.latestModifiedTimestamp());
// Ensure inner session doesn't see incoming records.
verify(innerRepositorySession, never()).store(record);
@ -177,27 +175,4 @@ public class BufferingMiddlewareRepositorySessionTest {
bufferingSession.setStoreDelegate(delegate);
verify(innerRepositorySession).setStoreDelegate(delegate);
}
@Test
public void getHighWaterMarkTimestamp() throws Exception {
// Trivial case, empty buffer.
assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
// NB: same guid as above.
MockRecord record4 = new MockRecord("guid3", null, -1, false);
bufferingSession.store(record4);
assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
}
}

Просмотреть файл

@ -0,0 +1,91 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.repositories.downloaders;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import static org.junit.Assert.*;
@RunWith(TestRunner.class)
public class BatchingDownloaderControllerTest {
private BatchingDownloaderTest.MockSever15Repository serverRepository;
private Server15RepositorySession repositorySession;
private BatchingDownloaderTest.MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
private BatchingDownloaderTest.MockDownloader mockDownloader;
private BatchingDownloaderTest.CountingShadowRepositoryState repositoryStateProvider;
@Before
public void setUp() throws Exception {
sessionFetchRecordsDelegate = new BatchingDownloaderTest.MockSessionFetchRecordsDelegate();
serverRepository = new BatchingDownloaderTest.MockSever15Repository(
"dummyCollection", "http://dummy.url/", null,
new InfoCollections(), new InfoConfiguration());
repositorySession = new Server15RepositorySession(serverRepository);
repositoryStateProvider = new BatchingDownloaderTest.CountingShadowRepositoryState();
mockDownloader = new BatchingDownloaderTest.MockDownloader(repositorySession, true, true, repositoryStateProvider);
}
@Test
public void resumeFetchSinceIfPossible() throws Exception {
assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 2L, "oldest"));
// Test that we'll resume from offset if context is correct.
BatchingDownloaderController.resumeFetchSinceIfPossible(
mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 25L, "oldest");
assertEquals("offset1", mockDownloader.offset);
// Ensure we'll use context-provided since value.
assertEquals(2L, mockDownloader.newer);
assertEquals("oldest", mockDownloader.sort);
assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
// Test that we won't resume on context mismatch. Ensure that we use new context.
BatchingDownloaderController.resumeFetchSinceIfPossible(
mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 1L, 25L, "newest");
assertEquals(null, mockDownloader.offset);
assertEquals(1L, mockDownloader.newer);
assertEquals("newest", mockDownloader.sort);
assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset3"));
// Test that we may fetch with a different limit and still resume since our context is valid.
BatchingDownloaderController.resumeFetchSinceIfPossible(
mockDownloader, repositoryStateProvider, sessionFetchRecordsDelegate, 3L, 50L, "oldest");
assertEquals("offset3", mockDownloader.offset);
assertEquals("oldest", mockDownloader.sort);
assertEquals(2L, mockDownloader.newer);
}
@Test
public void testInitialSetAndUpdateOfContext() throws Exception {
assertFalse(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
// Test that we can't update context which wasn't set yet.
try {
assertFalse(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
fail();
} catch (IllegalStateException e) {}
// Test that we can set context and check that it's set.
assertTrue(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
assertTrue(BatchingDownloaderController.isResumeContextSet(repositoryStateProvider));
// Test that we can't set context after it was already set.
try {
assertFalse(BatchingDownloaderController.setInitialResumeContextAndCommit(repositoryStateProvider, "offset1", 1L, "newest"));
fail();
} catch (IllegalStateException e) {}
// Test that we can update context after it was set.
assertTrue(BatchingDownloaderController.updateResumeContextAndCommit(repositoryStateProvider, "offset2"));
}
}

Просмотреть файл

@ -19,6 +19,7 @@ import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15Repository;
@ -54,6 +55,8 @@ public class BatchingDownloaderDelegateTest {
Uri.EMPTY,
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
true,
true,
new NonPersistentRepositoryStateProvider(),
repositorySession
);
}
@ -116,7 +119,8 @@ public class BatchingDownloaderDelegateTest {
DEFAULT_COLLECTION_URL,
null,
new InfoCollections(),
new InfoConfiguration())
new InfoConfiguration(),
new NonPersistentRepositoryStateProvider())
);
mockDownloader = new MockDownloader(repositorySession);
}
@ -128,6 +132,8 @@ public class BatchingDownloaderDelegateTest {
Uri.EMPTY,
SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
true,
true,
new NonPersistentRepositoryStateProvider(),
repositorySession
);
RepositorySessionFetchRecordsDelegate delegate = new SimpleSessionFetchRecordsDelegate();

Просмотреть файл

@ -7,19 +7,24 @@ package org.mozilla.gecko.sync.repositories.downloaders;
import android.net.Uri;
import android.os.SystemClock;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageCollectionRequest;
import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.Repository;
@ -29,8 +34,12 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import ch.boye.httpclientandroidlib.ProtocolVersion;
import ch.boye.httpclientandroidlib.message.BasicHttpResponse;
@ -46,13 +55,70 @@ public class BatchingDownloaderTest {
private MockSessionFetchRecordsDelegate sessionFetchRecordsDelegate;
private MockDownloader mockDownloader;
private String DEFAULT_COLLECTION_NAME = "dummyCollection";
private String DEFAULT_COLLECTION_URL = "http://dummy.url/";
private static String DEFAULT_COLLECTION_URL = "http://dummy.url/";
private long DEFAULT_NEWER = 1;
private String DEFAULT_SORT = "oldest";
private String DEFAULT_IDS = "1";
private String DEFAULT_LMHEADER = "12345678";
class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
private CountingShadowRepositoryState repositoryStateProvider;
// Memory-backed state implementation which keeps a shadow of current values, so that they can be
// queried by the tests. Classes under test do not have access to the shadow, and follow regular
// non-persistent state semantics (value changes visible only after commit).
static class CountingShadowRepositoryState extends NonPersistentRepositoryStateProvider {
private AtomicInteger commitCount = new AtomicInteger(0);
private final Map<String, Object> shadowMap = Collections.synchronizedMap(new HashMap<String, Object>(2));
@Override
public boolean commit() {
commitCount.incrementAndGet();
return super.commit();
}
int getCommitCount() {
return commitCount.get();
}
@Nullable
public Long getShadowedLong(String key) {
return (Long) shadowMap.get(key);
}
@Nullable
public String getShadowedString(String key) {
return (String) shadowMap.get(key);
}
@Override
public CountingShadowRepositoryState setLong(String key, Long value) {
shadowMap.put(key, value);
super.setLong(key, value);
return this;
}
@Override
public CountingShadowRepositoryState setString(String key, String value) {
shadowMap.put(key, value);
super.setString(key, value);
return this;
}
@Override
public CountingShadowRepositoryState clear(String key) {
shadowMap.remove(key);
super.clear(key);
return this;
}
@Override
public boolean resetAndCommit() {
shadowMap.clear();
return super.resetAndCommit();
}
}
static class MockSessionFetchRecordsDelegate implements RepositorySessionFetchRecordsDelegate {
public boolean isFailure;
public boolean isFetched;
public boolean isSuccess;
@ -88,9 +154,9 @@ public class BatchingDownloaderTest {
}
}
class MockRequest extends SyncStorageCollectionRequest {
static class MockRequest extends SyncStorageCollectionRequest {
public MockRequest(URI uri) {
MockRequest(URI uri) {
super(uri);
}
@ -100,7 +166,7 @@ public class BatchingDownloaderTest {
}
}
class MockDownloader extends BatchingDownloader {
static class MockDownloader extends BatchingDownloader {
public long newer;
public long limit;
public boolean full;
@ -109,8 +175,9 @@ public class BatchingDownloaderTest {
public String offset;
public boolean abort;
public MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches) {
super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), allowMultipleBatches, repositorySession);
MockDownloader(RepositorySession repositorySession, boolean allowMultipleBatches, boolean keepTrackOfHighWaterMark, RepositoryStateProvider repositoryStateProvider) {
super(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
allowMultipleBatches, keepTrackOfHighWaterMark, repositoryStateProvider, repositorySession);
}
@Override
@ -149,18 +216,20 @@ public class BatchingDownloaderTest {
}
}
class MockSever15Repository extends Server15Repository {
public MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
static class MockSever15Repository extends Server15Repository {
MockSever15Repository(@NonNull String collection, @NonNull String storageURL,
AuthHeaderProvider authHeaderProvider, @NonNull InfoCollections infoCollections,
@NonNull InfoConfiguration infoConfiguration) throws URISyntaxException {
super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), storageURL, authHeaderProvider, infoCollections, infoConfiguration);
super(collection, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
storageURL, authHeaderProvider, infoCollections, infoConfiguration,
new NonPersistentRepositoryStateProvider());
}
}
class MockRepositorySession extends Server15RepositorySession {
static class MockRepositorySession extends Server15RepositorySession {
public boolean abort;
public MockRepositorySession(Repository repository) {
MockRepositorySession(Repository repository) {
super(repository);
}
@ -177,7 +246,8 @@ public class BatchingDownloaderTest {
serverRepository = new MockSever15Repository(DEFAULT_COLLECTION_NAME, DEFAULT_COLLECTION_URL, null,
new InfoCollections(), new InfoConfiguration());
repositorySession = new Server15RepositorySession(serverRepository);
mockDownloader = new MockDownloader(repositorySession, true);
repositoryStateProvider = new CountingShadowRepositoryState();
mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
}
@Test
@ -204,12 +274,12 @@ public class BatchingDownloaderTest {
@Test
public void testBatchingTrivial() throws Exception {
MockDownloader mockDownloader = new MockDownloader(repositorySession, true);
MockDownloader mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
// Number of records == batch limit.
final long BATCH_LIMIT = 100;
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, "100");
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
@ -221,16 +291,21 @@ public class BatchingDownloaderTest {
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
// NB: we set highWaterMark as part of onFetchedRecord, so we don't expect it to be set here.
// Expect no offset to be persisted.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
public void testBatchingSingleBatchMode() throws Exception {
MockDownloader mockDownloader = new MockDownloader(repositorySession, false);
MockDownloader mockDownloader = new MockDownloader(repositorySession, false, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
// Number of records > batch limit. But, we're only allowed to make one batch request.
final long BATCH_LIMIT = 100;
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
String offsetHeader = "25";
String recordsHeader = "500";
@ -244,72 +319,80 @@ public class BatchingDownloaderTest {
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(0, sessionFetchRecordsDelegate.batchesCompleted);
// We don't care about the offset in a single batch mode.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(1, repositoryStateProvider.getCommitCount());
}
@Test
public void testBatching() throws Exception {
final long BATCH_LIMIT = 25;
mockDownloader = new MockDownloader(repositorySession, true);
mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
assertNull(mockDownloader.getLastModified());
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT);
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
String offsetHeader = "25";
String recordsHeader = "25";
SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
final String recordsHeader = "25";
performOnFetchCompleted("25", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals(offsetHeader, mockDownloader.offset);
assertEquals("25", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(1, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context set.
ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
assertEquals(1, repositoryStateProvider.getCommitCount());
// The next batch, we still have an offset token and has not exceed the total limit.
offsetHeader = "50";
response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
performOnFetchCompleted("50", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals(offsetHeader, mockDownloader.offset);
assertEquals("50", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(2, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context updated.
ensureOffsetContextIs(repositoryStateProvider, "50", "oldest", 1L);
assertEquals(2, repositoryStateProvider.getCommitCount());
// The next batch, we still have an offset token and has not exceed the total limit.
offsetHeader = "75";
response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
performOnFetchCompleted("75", recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
// Verify the same parameters are used in the next fetch.
assertSameParameters(mockDownloader, BATCH_LIMIT);
assertEquals(offsetHeader, mockDownloader.offset);
assertEquals("75", mockDownloader.offset);
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context updated.
ensureOffsetContextIs(repositoryStateProvider, "75", "oldest", 1L);
assertEquals(3, repositoryStateProvider.getCommitCount());
// No more offset token, so we complete batching.
response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, null, recordsHeader);
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
BATCH_LIMIT, true, DEFAULT_SORT, DEFAULT_IDS);
performOnFetchCompleted(null, recordsHeader, BATCH_LIMIT);
assertEquals(DEFAULT_LMHEADER, mockDownloader.getLastModified());
assertTrue(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
assertEquals(3, sessionFetchRecordsDelegate.batchesCompleted);
// Offset context cleared since we finished batching, and committed.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(4, repositoryStateProvider.getCommitCount());
}
@Test
@ -336,6 +419,10 @@ public class BatchingDownloaderTest {
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertFalse(sessionFetchRecordsDelegate.isFailure);
// Offset context set.
ensureOffsetContextIs(repositoryStateProvider, "100", "oldest", 1L);
assertEquals(1, repositoryStateProvider.getCommitCount());
// Last modified header somehow changed.
lmHeader = "10000000";
response = makeSyncStorageResponse(200, lmHeader, offsetHeader, null);
@ -347,6 +434,10 @@ public class BatchingDownloaderTest {
assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched);
assertTrue(sessionFetchRecordsDelegate.isFailure);
// Since we failed due to a remotely modified collection, we expect offset context to be reset.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(2, repositoryStateProvider.getCommitCount());
}
@Test
@ -362,6 +453,46 @@ public class BatchingDownloaderTest {
assertNull(sessionFetchRecordsDelegate.record);
}
@Test
public void testOffsetNotResetAfterDeadline() throws Exception {
final long BATCH_LIMIT = 25;
mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
// Offset context set.
ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
assertEquals(1, repositoryStateProvider.getCommitCount());
Exception ex = new SyncDeadlineReachedException();
mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
// Offset context not reset.
ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
assertEquals(2, repositoryStateProvider.getCommitCount());
}
@Test
public void testOffsetResetAfterConcurrentModification() throws Exception {
final long BATCH_LIMIT = 25;
mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
mockDownloader.fetchSince(sessionFetchRecordsDelegate, DEFAULT_NEWER, BATCH_LIMIT, DEFAULT_SORT, null);
SyncStorageCollectionRequest request = performOnFetchCompleted("25", "25", 25);
// Offset context set.
ensureOffsetContextIs(repositoryStateProvider, "25", "oldest", 1L);
assertEquals(1, repositoryStateProvider.getCommitCount());
Exception ex = new CollectionConcurrentModificationException();
mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
// Offset is reset.
ensureOffsetContextIsNull(repositoryStateProvider);
assertEquals(2, repositoryStateProvider.getCommitCount());
}
@Test
public void testFetchRecord() {
CryptoRecord record = new CryptoRecord();
@ -373,10 +504,45 @@ public class BatchingDownloaderTest {
assertEquals(record, sessionFetchRecordsDelegate.record);
}
@Test
public void testHighWaterMarkTracking() {
CryptoRecord record = new CryptoRecord();
// HWM enabled
mockDownloader = new MockDownloader(repositorySession, true, true, repositoryStateProvider);
record.lastModified = 1L;
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertEquals(Long.valueOf(1), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
record.lastModified = 5L;
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertEquals(Long.valueOf(5), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
// NB: Currently nothing is preventing HWM from "going down".
record.lastModified = 4L;
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertEquals(Long.valueOf(4), repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
// HWM disabled
mockDownloader = new MockDownloader(repositorySession, true, false, repositoryStateProvider);
assertTrue(repositoryStateProvider.resetAndCommit());
record.lastModified = 4L;
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
record.lastModified = 5L;
mockDownloader.onFetchedRecord(record, sessionFetchRecordsDelegate);
assertNull(repositoryStateProvider.getShadowedLong(RepositoryStateProvider.KEY_HIGH_WATER_MARK));
}
@Test
public void testAbortRequests() {
MockRepositorySession mockRepositorySession = new MockRepositorySession(serverRepository);
BatchingDownloader downloader = new BatchingDownloader(null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), true, mockRepositorySession);
BatchingDownloader downloader = new BatchingDownloader(
null, Uri.EMPTY, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
true, true, new NonPersistentRepositoryStateProvider(), mockRepositorySession);
assertFalse(mockRepositorySession.abort);
downloader.abortRequests();
assertTrue(mockRepositorySession.abort);
@ -398,6 +564,15 @@ public class BatchingDownloaderTest {
}
}
private SyncStorageCollectionRequest performOnFetchCompleted(String offsetHeader, String recordsHeader, long batchLimit) throws URISyntaxException {
SyncStorageResponse response = makeSyncStorageResponse(200, DEFAULT_LMHEADER, offsetHeader, recordsHeader);
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL));
mockDownloader.onFetchCompleted(response, sessionFetchRecordsDelegate, request, DEFAULT_NEWER,
batchLimit, true, DEFAULT_SORT, DEFAULT_IDS);
return request;
}
private void assertSameParameters(MockDownloader mockDownloader, long limit) {
assertEquals(DEFAULT_NEWER, mockDownloader.newer);
assertEquals(limit, mockDownloader.limit);
@ -424,4 +599,16 @@ public class BatchingDownloaderTest {
return new SyncStorageResponse(response);
}
private void ensureOffsetContextIsNull(CountingShadowRepositoryState stateProvider) {
assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
assertNull(stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
assertNull(stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
}
private void ensureOffsetContextIs(CountingShadowRepositoryState stateProvider, String offset, String order, Long since) {
assertEquals(offset, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET));
assertEquals(since, stateProvider.getShadowedLong(RepositoryStateProvider.KEY_OFFSET_SINCE));
assertEquals(order, stateProvider.getShadowedString(RepositoryStateProvider.KEY_OFFSET_ORDER));
}
}

Просмотреть файл

@ -20,6 +20,7 @@ import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
@ -594,7 +595,8 @@ public class BatchingUploaderTest {
"http://dummy.url/",
null,
infoCollections,
infoConfiguration
infoConfiguration,
new NonPersistentRepositoryStateProvider()
);
} catch (URISyntaxException e) {
// Won't throw, and this won't happen.