Bug 1291821 - Track incomplete stages and re-sync them r=rnewman

Stage re-sync is requested if:
- We hit a 412 either during batching download or batching upload
- We hit a sync deadline either during batching download or when merging records from the buffer

SessionStoreDelegate interface was expanded with onStoreFailed,
indicating that not just a particular record failed, but the whole operation did.

onFetchFailed is used to inform delegates of 412/deadline failures during downloads.
Three new exception types were added, to facilitated messaging between different layers.

MozReview-Commit-ID: Ltdi5noEvdV

--HG--
extra : rebase_source : 9d4af039198b9bc92fbbf25cf8e3d32375a2ab26
This commit is contained in:
Grisha Kruglov 2017-02-24 13:04:54 -08:00
Родитель 29a79ad111
Коммит 976fe61ec1
37 изменённых файлов: 835 добавлений и 246 удалений

Просмотреть файл

@ -866,6 +866,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/AlreadySyncingException.java', 'sync/AlreadySyncingException.java',
'sync/BackoffHandler.java', 'sync/BackoffHandler.java',
'sync/BadRequiredFieldJSONException.java', 'sync/BadRequiredFieldJSONException.java',
'sync/CollectionConcurrentModificationException.java',
'sync/CollectionKeys.java', 'sync/CollectionKeys.java',
'sync/CommandProcessor.java', 'sync/CommandProcessor.java',
'sync/CommandRunner.java', 'sync/CommandRunner.java',
@ -944,6 +945,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/NullClusterURLException.java', 'sync/NullClusterURLException.java',
'sync/PersistedMetaGlobal.java', 'sync/PersistedMetaGlobal.java',
'sync/PrefsBackoffHandler.java', 'sync/PrefsBackoffHandler.java',
'sync/ReflowIsNecessaryException.java',
'sync/repositories/android/AndroidBrowserBookmarksDataAccessor.java', 'sync/repositories/android/AndroidBrowserBookmarksDataAccessor.java',
'sync/repositories/android/AndroidBrowserBookmarksRepository.java', 'sync/repositories/android/AndroidBrowserBookmarksRepository.java',
'sync/repositories/android/AndroidBrowserBookmarksRepositorySession.java', 'sync/repositories/android/AndroidBrowserBookmarksRepositorySession.java',
@ -1060,6 +1062,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/SyncConfiguration.java', 'sync/SyncConfiguration.java',
'sync/SyncConfigurationException.java', 'sync/SyncConfigurationException.java',
'sync/SyncConstants.java', 'sync/SyncConstants.java',
'sync/SyncDeadlineReachedException.java',
'sync/SyncException.java', 'sync/SyncException.java',
'sync/synchronizer/ConcurrentRecordConsumer.java', 'sync/synchronizer/ConcurrentRecordConsumer.java',
'sync/synchronizer/RecordConsumer.java', 'sync/synchronizer/RecordConsumer.java',

Просмотреть файл

@ -53,9 +53,10 @@ import org.mozilla.gecko.tokenserver.TokenServerToken;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -123,8 +124,15 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
super.rejectSync(); super.rejectSync();
} }
/* package-local */ void requestFollowUpSync(String stage) {
this.stageNamesForFollowUpSync.add(stage);
}
protected final Collection<String> stageNamesToSync; protected final Collection<String> stageNamesToSync;
// Keeps track of incomplete stages during this sync that need to be re-synced once we're done.
private final List<String> stageNamesForFollowUpSync = Collections.synchronizedList(new ArrayList<String>());
public SyncDelegate(BlockingQueue<Result> latch, SyncResult syncResult, AndroidFxAccount fxAccount, Collection<String> stageNamesToSync) { public SyncDelegate(BlockingQueue<Result> latch, SyncResult syncResult, AndroidFxAccount fxAccount, Collection<String> stageNamesToSync) {
super(latch, syncResult); super(latch, syncResult);
this.stageNamesToSync = Collections.unmodifiableCollection(stageNamesToSync); this.stageNamesToSync = Collections.unmodifiableCollection(stageNamesToSync);
@ -183,6 +191,15 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
public void handleStageCompleted(Stage currentState, GlobalSession globalSession) { public void handleStageCompleted(Stage currentState, GlobalSession globalSession) {
} }
/**
* Schedule an incomplete stage for a follow-up sync.
*/
@Override
public void handleIncompleteStage(Stage currentState,
GlobalSession globalSession) {
syncDelegate.requestFollowUpSync(currentState.getRepositoryName());
}
@Override @Override
public void handleSuccess(GlobalSession globalSession) { public void handleSuccess(GlobalSession globalSession) {
Logger.info(LOG_TAG, "Global session succeeded."); Logger.info(LOG_TAG, "Global session succeeded.");
@ -574,7 +591,24 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter {
fxAccount.releaseSharedAccountStateLock(); fxAccount.releaseSharedAccountStateLock();
} }
// If there are any incomplete stages, request a follow-up sync. Otherwise, we're done.
// Incomplete stage is:
// - one that hit a 412 error during either upload or download of data, indicating that
// its collection has been modified remotely, or
// - one that hit a sync deadline
final String[] stagesToSyncAgain;
synchronized (syncDelegate.stageNamesForFollowUpSync) {
stagesToSyncAgain = syncDelegate.stageNamesForFollowUpSync.toArray(
new String[syncDelegate.stageNamesForFollowUpSync.size()]
);
}
if (stagesToSyncAgain.length > 0) {
Logger.info(LOG_TAG, "Syncing done. Requesting an immediate follow-up sync.");
fxAccount.requestImmediateSync(stagesToSyncAgain, null);
} else {
Logger.info(LOG_TAG, "Syncing done."); Logger.info(LOG_TAG, "Syncing done.");
}
lastSyncRealtimeMillis = SystemClock.elapsedRealtime(); lastSyncRealtimeMillis = SystemClock.elapsedRealtime();
} }
} }

Просмотреть файл

@ -0,0 +1,15 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
/**
* Thrown when a collection has been modified by another client while we were either
* downloading from it or uploading to it.
*
* @author grisha
*/
public class CollectionConcurrentModificationException extends ReflowIsNecessaryException {
private static final long serialVersionUID = 2701457832508838524L;
}

Просмотреть файл

@ -479,6 +479,11 @@ public class GlobalSession implements HttpResponseObserver {
this.callback.handleError(this, e); this.callback.handleError(this, e);
} }
public void handleIncompleteStage() {
// Let our delegate know that current stage is incomplete and needs to be synced again.
callback.handleIncompleteStage(this.currentState, this);
}
public void handleHTTPError(SyncStorageResponse response, String reason) { public void handleHTTPError(SyncStorageResponse response, String reason) {
// TODO: handling of 50x (backoff), 401 (node reassignment or auth error). // TODO: handling of 50x (backoff), 401 (node reassignment or auth error).
// Fall back to aborting. // Fall back to aborting.

Просмотреть файл

@ -0,0 +1,21 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
/**
* Used by SynchronizerSession to indicate that reflow of a stage is necessary.
* To reflow a stage is to request that it is synced again. Depending on the stage and its current
* state (last-synced timestamp, resume context, high-water-mark) we might resume, or sync from a
* high-water-mark if allowed, or sync regularly from last-synced timestamp.
* A re-sync of a stage is no different from a regular sync of the same stage.
*
* Stages which complete only partially due to hitting a concurrent collection modification error or
* hitting a sync deadline should be re-synced as soon as possible.
*
* @author grisha
*/
public class ReflowIsNecessaryException extends Exception {
private static final long serialVersionUID = -2614772437814638768L;
}

Просмотреть файл

@ -0,0 +1,14 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync;
/**
* Thrown when we've hit a self-imposed sync deadline, and decided not to proceed.
*
* @author grisha
*/
public class SyncDeadlineReachedException extends ReflowIsNecessaryException {
private static final long serialVersionUID = 2305367921350245484L;
}

Просмотреть файл

@ -38,6 +38,7 @@ public interface GlobalSessionCallback {
void handleError(GlobalSession globalSession, Exception ex); void handleError(GlobalSession globalSession, Exception ex);
void handleSuccess(GlobalSession globalSession); void handleSuccess(GlobalSession globalSession);
void handleStageCompleted(Stage currentState, GlobalSession globalSession); void handleStageCompleted(Stage currentState, GlobalSession globalSession);
void handleIncompleteStage(Stage currentState, GlobalSession globalSession);
/** /**
* Called when a {@link GlobalSession} wants to know if it should continue * Called when a {@link GlobalSession} wants to know if it should continue

Просмотреть файл

@ -7,6 +7,7 @@ package org.mozilla.gecko.sync.middleware;
import android.os.SystemClock; import android.os.SystemClock;
import android.support.annotation.VisibleForTesting; import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.middleware.storage.BufferStorage; import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
@ -35,8 +36,6 @@ import java.util.concurrent.Executors;
private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor(); private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
private volatile boolean storeMarkedIncomplete = false;
/* package-local */ BufferingMiddlewareRepositorySession( /* package-local */ BufferingMiddlewareRepositorySession(
RepositorySession repositorySession, MiddlewareRepository repository, RepositorySession repositorySession, MiddlewareRepository repository,
long syncDeadlineMillis, BufferStorage bufferStorage) { long syncDeadlineMillis, BufferStorage bufferStorage) {
@ -75,9 +74,23 @@ import java.util.concurrent.Executors;
bufferStorage.addOrReplace(record); bufferStorage.addOrReplace(record);
} }
/**
* When source fails to provide all records, we need to decide what to do with the buffer.
* We might fail because of a network partition, or because of a concurrent modification of a
* collection, or because we ran out of time fetching records, or some other reason.
*
* Either way we do not clear the buffer in any error scenario, but rather
* allow it to be re-filled, replacing existing records with their newer versions if necessary.
*
* If a collection has been modified, affected records' last-modified timestamps will be bumped,
* and we will receive those records during the next sync. If we already have them in our buffer,
* we replace our now-old copy. Otherwise, they are new records and we just append them.
*
* Incoming records are mapped to existing ones via GUIDs.
*/
@Override @Override
public void storeIncomplete() { public void storeIncomplete() {
storeMarkedIncomplete = true; bufferStorage.flush();
} }
@Override @Override
@ -92,69 +105,43 @@ import java.util.concurrent.Executors;
@Override @Override
public void storeDone(final long end) { public void storeDone(final long end) {
doStoreDonePrepare(); bufferStorage.flush();
// Determine if we have enough time to perform consistency checks on the buffered data and // Determine if we have enough time to merge the buffer data.
// then store it. If we don't have enough time now, we keep our buffer and try again later. // If we don't have enough time now, we keep our buffer and try again later.
// We don't store results of a buffer consistency check anywhere, so we can't treat it if (!mayProceedToMergeBuffer()) {
// separately from storage.
if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
super.abort(); super.abort();
storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end); storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreFailed(new SyncDeadlineReachedException());
return; return;
} }
// Separate actual merge, so that it may be tested without involving system clock. doMergeBuffer(end);
doStoreDone(end);
} }
@VisibleForTesting @VisibleForTesting
public void doStoreDonePrepare() { /* package-local */ void doMergeBuffer(long end) {
// Now that records stopped flowing, persist them. final Collection<Record> bufferData = bufferStorage.all();
bufferStorage.flush();
}
@VisibleForTesting
public void doStoreDone(final long end) {
final Collection<Record> buffer = bufferStorage.all();
// Trivial case of an empty buffer. // Trivial case of an empty buffer.
if (buffer.isEmpty()) { if (bufferData.isEmpty()) {
super.storeDone(end); super.storeDone(end);
return; return;
} }
// Flush our buffer to the wrapped local repository. Data goes live! // Let session handle actual storing of records as it pleases.
// See Bug 1332094 which is concerned with allowing merge to proceed transactionally.
try { try {
for (Record record : buffer) { for (Record record : bufferData) {
this.inner.store(record); this.inner.store(record);
} }
} catch (NoStoreDelegateException e) { } catch (NoStoreDelegateException e) {
// At this point we should have a delegate, so this won't happen. // At this point we should have a store delegate set on the session, so this won't happen.
} }
// And, we're done! // Let session know that there are no more records to store.
super.storeDone(end); super.storeDone(end);
} }
/**
* When source fails to provide more records, we need to decide what to do with the buffer.
* We might fail because of a network partition, or because of a concurrent modification of a
* collection. Either way we do not clear the buffer in a general case. If a collection has been
* modified, affected records' last-modified timestamps will be bumped, and we will receive those
* records during the next sync. If we already have them in our buffer, we replace our now-old
* copy. Otherwise, they are new records and we just append them.
*
* We depend on GUIDs to be a primary key for incoming records.
*
* @param e indicates reason of failure.
*/
@Override
public void sourceFailed(Exception e) {
bufferStorage.flush();
super.sourceFailed(e);
}
/** /**
* Session abnormally aborted. This doesn't mean our so-far buffered data is invalid. * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
* Clean up after ourselves, if there's anything to clean up. * Clean up after ourselves, if there's anything to clean up.

Просмотреть файл

@ -24,7 +24,7 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
* *
*<ul> *<ul>
* <li>Construct, with a reference to its parent {@link Repository}, by calling * <li>Construct, with a reference to its parent {@link Repository}, by calling
* {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.</li> * {@link Repository#createSession(org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate, android.content.Context)}.</li>
* <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li> * <li>Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.</li>
* <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code> * <li>Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. <code>begin()</code>
* is an appropriate place to initialize expensive resources.</li> * is an appropriate place to initialize expensive resources.</li>
@ -158,14 +158,6 @@ public abstract class RepositorySession {
public void storeFlush() { public void storeFlush() {
} }
/**
* During flow of records, indicates that source failed.
*
* @param e indicates reason of failure.
*/
public void sourceFailed(Exception e) {
}
/** /**
* Indicates that a flow of records have been completed. * Indicates that a flow of records have been completed.
*/ */

Просмотреть файл

@ -54,4 +54,14 @@ public class DeferredRepositorySessionStoreDelegate implements
} }
}); });
} }
@Override
public void onStoreFailed(final Exception e) {
executor.execute(new Runnable() {
@Override
public void run() {
inner.onStoreFailed(e);
}
});
}
} }

Просмотреть файл

@ -14,10 +14,11 @@ import java.util.concurrent.ExecutorService;
* *
*/ */
public interface RepositorySessionStoreDelegate { public interface RepositorySessionStoreDelegate {
public void onRecordStoreFailed(Exception ex, String recordGuid); void onRecordStoreFailed(Exception ex, String recordGuid);
// Called with a GUID when store has succeeded. // Called with a GUID when store has succeeded.
public void onRecordStoreSucceeded(String guid); void onRecordStoreSucceeded(String guid);
public void onStoreCompleted(long storeEnd); void onStoreCompleted(long storeEnd);
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor); void onStoreFailed(Exception e);
RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor);
} }

Просмотреть файл

@ -10,8 +10,10 @@ import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting; import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.DelayedWorkTracker; import org.mozilla.gecko.sync.DelayedWorkTracker;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse; import org.mozilla.gecko.sync.net.SyncResponse;
@ -24,7 +26,6 @@ import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -185,9 +186,9 @@ public class BatchingDownloader {
// We expected server to fail our request with 412 in case of concurrent modifications, so // We expected server to fail our request with 412 in case of concurrent modifications, so
// this is unexpected. However, let's treat this case just as if we received a 412. // this is unexpected. However, let's treat this case just as if we received a 412.
if (lastModifiedChanged) { if (lastModifiedChanged) {
this.abort( this.handleFetchFailed(
fetchRecordsDelegate, fetchRecordsDelegate,
new ConcurrentModificationException("Last-modified timestamp has changed unexpectedly") new CollectionConcurrentModificationException()
); );
return; return;
} }
@ -222,7 +223,7 @@ public class BatchingDownloader {
// Should we proceed, however? Do we have enough time? // Should we proceed, however? Do we have enough time?
if (!mayProceedWithBatching(fetchDeadline)) { if (!mayProceedWithBatching(fetchDeadline)) {
this.abort(fetchRecordsDelegate, new Exception("Not enough time to complete next batch")); this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException());
return; return;
} }
@ -242,10 +243,16 @@ public class BatchingDownloader {
} }
} }
public void onFetchFailed(final Exception ex, private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, final Exception ex) {
final SyncStorageCollectionRequest request) { handleFetchFailed(fetchRecordsDelegate, ex, null);
removeRequestFromPending(request); }
/* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final Exception ex,
@Nullable final SyncStorageCollectionRequest request) {
this.removeRequestFromPending(request);
this.abortRequests();
this.workTracker.delayWorkItem(new Runnable() { this.workTracker.delayWorkItem(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -291,18 +298,6 @@ public class BatchingDownloader {
return this.lastModified; return this.lastModified;
} }
private void abort(final RepositorySessionFetchRecordsDelegate delegate, final Exception exception) {
Logger.error(LOG_TAG, exception.getMessage());
this.abortRequests();
this.workTracker.delayWorkItem(new Runnable() {
@Override
public void run() {
Logger.debug(LOG_TAG, "Delayed onFetchCompleted running.");
delegate.onFetchFailed(exception);
}
});
}
private static boolean mayProceedWithBatching(long deadline) { private static boolean mayProceedWithBatching(long deadline) {
// For simplicity, allow batching to proceed if there's at least a minute left for the sync. // For simplicity, allow batching to proceed if there's at least a minute left for the sync.
// This should be enough to fetch and process records in the batch. // This should be enough to fetch and process records in the batch.

Просмотреть файл

@ -5,6 +5,7 @@
package org.mozilla.gecko.sync.repositories.downloaders; package org.mozilla.gecko.sync.repositories.downloaders;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.crypto.KeyBundle; import org.mozilla.gecko.sync.crypto.KeyBundle;
@ -15,8 +16,6 @@ import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import java.util.ConcurrentModificationException;
/** /**
* Delegate that gets passed into fetch methods to handle server response from fetch. * Delegate that gets passed into fetch methods to handle server response from fetch.
*/ */
@ -60,37 +59,38 @@ public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate {
@Override @Override
public void handleRequestSuccess(SyncStorageResponse response) { public void handleRequestSuccess(SyncStorageResponse response) {
Logger.debug(LOG_TAG, "Fetch done."); Logger.debug(LOG_TAG, "Fetch done.");
if (response.lastModified() != null) {
this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request, // Sanity check.
this.newer, this.batchLimit, this.full, this.sort, this.ids); if (response.lastModified() == null) {
this.downloader.handleFetchFailed(
this.fetchRecordsDelegate,
new IllegalStateException("Missing last modified header from response"),
this.request
);
return; return;
} }
this.downloader.onFetchFailed(
new IllegalStateException("Missing last modified header from response"), this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request,
this.fetchRecordsDelegate, this.newer, this.batchLimit, this.full, this.sort, this.ids);
this.request);
} }
@Override @Override
public void handleRequestFailure(SyncStorageResponse response) { public void handleRequestFailure(SyncStorageResponse response) {
Logger.warn(LOG_TAG, "Got a non-success response."); Logger.warn(LOG_TAG, "Got a non-success response.");
// Handle concurrent modification errors separately. We will need to signal upwards that // Handle concurrent modification errors separately.
// this happened, in case stage buffer will want to clean up. final Exception ex;
if (response.getStatusCode() == 412) { if (response.getStatusCode() == 412) {
this.downloader.onFetchFailed( ex = new CollectionConcurrentModificationException();
new ConcurrentModificationException(),
this.fetchRecordsDelegate,
this.request
);
} else { } else {
this.handleRequestError(new HTTPFailureException(response)); ex = new HTTPFailureException(response);
} }
this.handleRequestError(ex);
} }
@Override @Override
public void handleRequestError(final Exception ex) { public void handleRequestError(final Exception ex) {
Logger.warn(LOG_TAG, "Got request error.", ex); Logger.warn(LOG_TAG, "Got request error.", ex);
this.downloader.onFetchFailed(ex, this.fetchRecordsDelegate, this.request); this.downloader.handleFetchFailed(this.fetchRecordsDelegate, ex, this.request);
} }
@Override @Override

Просмотреть файл

@ -8,7 +8,9 @@ import android.net.Uri;
import android.support.annotation.VisibleForTesting; import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Server15PreviousPostFailedException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession; import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
@ -16,6 +18,7 @@ import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
@ -70,6 +73,7 @@ public class BatchingUploader {
} }
// Accessed by the record consumer thread pool. // Accessed by the record consumer thread pool.
private final ExecutorService executor;
// Will be re-created, so mark it as volatile. // Will be re-created, so mark it as volatile.
private volatile Payload payload; private volatile Payload payload;
@ -88,7 +92,7 @@ public class BatchingUploader {
private final Object payloadLock = new Object(); private final Object payloadLock = new Object();
public BatchingUploader( public BatchingUploader(
final RepositorySession repositorySession, final Executor workQueue, final RepositorySession repositorySession, final ExecutorService workQueue,
final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri, final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
final Long localCollectionLastModified, final InfoConfiguration infoConfiguration, final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
final AuthHeaderProvider authHeaderProvider) { final AuthHeaderProvider authHeaderProvider) {
@ -102,12 +106,29 @@ public class BatchingUploader {
this.payload = new Payload( this.payload = new Payload(
payloadLock, infoConfiguration.maxPostBytes, infoConfiguration.maxPostRecords); payloadLock, infoConfiguration.maxPostBytes, infoConfiguration.maxPostRecords);
this.payloadDispatcher = new PayloadDispatcher(workQueue, this, localCollectionLastModified); this.payloadDispatcher = createPayloadDispatcher(workQueue, localCollectionLastModified);
this.executor = workQueue;
} }
// Called concurrently from the threads running off of a record consumer thread pool. // Called concurrently from the threads running off of a record consumer thread pool.
public void process(final Record record) { public void process(final Record record) {
final String guid = record.guid; final String guid = record.guid;
// If store failed entirely, just bail out. We've already told our delegate that we failed.
if (payloadDispatcher.storeFailed) {
return;
}
// If a record or a payload failed, we won't let subsequent requests proceed.'
// This means that we may bail much earlier.
if (payloadDispatcher.recordUploadFailed) {
sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed(
new Server15PreviousPostFailedException(), guid
);
return;
}
final byte[] recordBytes = record.toJSONBytes(); final byte[] recordBytes = record.toJSONBytes();
final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT; final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT;
@ -218,7 +239,15 @@ public class BatchingUploader {
} }
} }
/* package-local */ static class BatchingUploaderException extends Exception { /**
* Allows tests to define their own PayloadDispatcher.
*/
@VisibleForTesting
PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
return new PayloadDispatcher(workQueue, this, localCollectionLastModified);
}
public static class BatchingUploaderException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
} }
/* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException { /* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException {

Просмотреть файл

@ -8,6 +8,7 @@ import android.support.annotation.Nullable;
import android.support.annotation.VisibleForTesting; import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.Server15RecordPostFailedException; import org.mozilla.gecko.sync.Server15RecordPostFailedException;
import org.mozilla.gecko.sync.net.SyncResponse; import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse;
@ -30,12 +31,15 @@ class PayloadDispatcher {
volatile BatchMeta batchWhiteboard; volatile BatchMeta batchWhiteboard;
private final AtomicLong uploadTimestamp = new AtomicLong(0); private final AtomicLong uploadTimestamp = new AtomicLong(0);
// Accessed from different threads sequentially running on the 'executor'.
private volatile boolean recordUploadFailed = false;
private final Executor executor; private final Executor executor;
private final BatchingUploader uploader; private final BatchingUploader uploader;
// For both of these flags:
// Written from sequentially running thread(s) on the SingleThreadExecutor `executor`.
// Read by many threads running concurrently on the records consumer thread pool.
volatile boolean recordUploadFailed = false;
volatile boolean storeFailed = false;
PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) { PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) {
// Initially we don't know if we're in a batching mode. // Initially we don't know if we're in a batching mode.
this.batchWhiteboard = new BatchMeta(initialLastModified, null); this.batchWhiteboard = new BatchMeta(initialLastModified, null);
@ -53,21 +57,7 @@ class PayloadDispatcher {
executor.execute(new BatchContextRunnable(isCommit) { executor.execute(new BatchContextRunnable(isCommit) {
@Override @Override
public void run() { public void run() {
new RecordUploadRunnable( createRecordUploadRunnable(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload).run();
new BatchingAtomicUploaderMayUploadProvider(),
uploader.collectionUri,
batchWhiteboard.getToken(),
new PayloadUploadDelegate(
uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider(),
PayloadDispatcher.this,
outgoingGuids,
isCommit,
isLastPayload
),
outgoing,
byteCount,
isCommit
).run();
} }
}); });
} }
@ -142,6 +132,12 @@ class PayloadDispatcher {
uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid); uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid);
} }
void concurrentModificationDetected() {
recordUploadFailed = true;
storeFailed = true;
uploader.sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException());
}
void prepareForNextBatch() { void prepareForNextBatch() {
batchWhiteboard = batchWhiteboard.nextBatchMeta(); batchWhiteboard = batchWhiteboard.nextBatchMeta();
} }
@ -158,6 +154,31 @@ class PayloadDispatcher {
} }
} }
/**
* Allows tests to define their own RecordUploadRunnable.
*/
@VisibleForTesting
Runnable createRecordUploadRunnable(final ArrayList<byte[]> outgoing,
final ArrayList<String> outgoingGuids,
final long byteCount,
final boolean isCommit, final boolean isLastPayload) {
return new RecordUploadRunnable(
new BatchingAtomicUploaderMayUploadProvider(),
uploader.collectionUri,
batchWhiteboard.getToken(),
new PayloadUploadDelegate(
uploader.authHeaderProvider,
PayloadDispatcher.this,
outgoingGuids,
isCommit,
isLastPayload
),
outgoing,
byteCount,
isCommit
);
}
/** /**
* Allows tests to easily peek into the flow of upload tasks. * Allows tests to easily peek into the flow of upload tasks.
*/ */
@ -176,6 +197,7 @@ class PayloadDispatcher {
@VisibleForTesting @VisibleForTesting
abstract static class NonPayloadContextRunnable implements Runnable {} abstract static class NonPayloadContextRunnable implements Runnable {}
// Instances of this class must be accessed from threads running on the `executor`.
private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider { private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider {
public boolean mayUpload() { public boolean mayUpload() {
return !recordUploadFailed; return !recordUploadFailed;

Просмотреть файл

@ -184,8 +184,12 @@ class PayloadUploadDelegate implements SyncStorageRequestDelegate {
@Override @Override
public void handleRequestFailure(final SyncStorageResponse response) { public void handleRequestFailure(final SyncStorageResponse response) {
if (response.getStatusCode() == 412) {
dispatcher.concurrentModificationDetected();
} else {
this.handleRequestError(new HTTPFailureException(response)); this.handleRequestError(new HTTPFailureException(response));
} }
}
@Override @Override
public void handleRequestError(Exception e) { public void handleRequestError(Exception e) {

Просмотреть файл

@ -14,6 +14,7 @@ import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.MetaGlobalException; import org.mozilla.gecko.sync.MetaGlobalException;
import org.mozilla.gecko.sync.NoCollectionKeysSetException; import org.mozilla.gecko.sync.NoCollectionKeysSetException;
import org.mozilla.gecko.sync.NonObjectJSONException; import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.SynchronizerConfiguration; import org.mozilla.gecko.sync.SynchronizerConfiguration;
import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.crypto.KeyBundle; import org.mozilla.gecko.sync.crypto.KeyBundle;
@ -622,6 +623,12 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i
} }
} }
// Let global session know that this stage is not complete (due to a 412 or hitting a deadline).
// This stage will be re-synced once current sync is complete.
if (lastException instanceof ReflowIsNecessaryException) {
session.handleIncompleteStage();
}
Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() + Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() +
"). Timestamps not persisted."); "). Timestamps not persisted.");
session.advance(); session.advance();

Просмотреть файл

@ -65,7 +65,11 @@ class ConcurrentRecordConsumer extends RecordConsumer {
private void consumerIsDone() { private void consumerIsDone() {
Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records.")); Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records."));
delegate.consumerIsDone(allRecordsQueued); if (allRecordsQueued) {
delegate.consumerIsDoneFull();
} else {
delegate.consumerIsDonePartial();
}
} }
@Override @Override

Просмотреть файл

@ -4,11 +4,15 @@
package org.mozilla.gecko.sync.synchronizer; package org.mozilla.gecko.sync.synchronizer;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.ThreadPool; import org.mozilla.gecko.sync.ThreadPool;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
@ -72,6 +76,8 @@ public class RecordsChannel implements
private final RecordsChannelDelegate delegate; private final RecordsChannelDelegate delegate;
private long fetchEnd = -1; private long fetchEnd = -1;
private volatile ReflowIsNecessaryException reflowException;
protected final AtomicInteger numFetched = new AtomicInteger(); protected final AtomicInteger numFetched = new AtomicInteger();
protected final AtomicInteger numFetchFailed = new AtomicInteger(); protected final AtomicInteger numFetchFailed = new AtomicInteger();
protected final AtomicInteger numStored = new AtomicInteger(); protected final AtomicInteger numStored = new AtomicInteger();
@ -93,7 +99,7 @@ public class RecordsChannel implements
* Then we notify our delegate of completion. * Then we notify our delegate of completion.
*/ */
private RecordConsumer consumer; private RecordConsumer consumer;
private boolean waitingForQueueDone = false; private volatile boolean waitingForQueueDone = false;
private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>(); private final ConcurrentLinkedQueue<Record> toProcess = new ConcurrentLinkedQueue<Record>();
@Override @Override
@ -204,9 +210,12 @@ public class RecordsChannel implements
@Override @Override
public void onFetchFailed(Exception ex) { public void onFetchFailed(Exception ex) {
Logger.warn(LOG_TAG, "onFetchFailed. Informing sink, calling for immediate stop.", ex); Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex);
sink.sourceFailed(ex);
numFetchFailed.incrementAndGet(); numFetchFailed.incrementAndGet();
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
// Sink will be informed once consumer finishes.
this.consumer.halt(); this.consumer.halt();
delegate.onFlowFetchFailed(this, ex); delegate.onFlowFetchFailed(this, ex);
} }
@ -246,16 +255,27 @@ public class RecordsChannel implements
this.consumer.stored(); this.consumer.stored();
} }
@Override @Override
public void consumerIsDone(boolean allRecordsQueued) { public void consumerIsDoneFull() {
Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone); Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone);
if (waitingForQueueDone) { if (waitingForQueueDone) {
waitingForQueueDone = false; waitingForQueueDone = false;
if (!allRecordsQueued) {
this.sink.storeIncomplete(); // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed.
this.sink.storeDone();
} }
this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted. }
@Override
public void consumerIsDonePartial() {
Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone);
if (waitingForQueueDone) {
waitingForQueueDone = false;
// Let sink clean up or flush records if necessary.
this.sink.storeIncomplete();
delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
} }
} }
@ -265,9 +285,40 @@ public class RecordsChannel implements
"Fetch end is " + fetchEnd + ", store end is " + storeEnd); "Fetch end is " + fetchEnd + ", store end is " + storeEnd);
// Source might have used caches used to facilitate flow of records, so now is a good // Source might have used caches used to facilitate flow of records, so now is a good
// time to clean up. Particularly pertinent for buffered sources. // time to clean up. Particularly pertinent for buffered sources.
// Rephrasing this in a more concrete way, buffers are cleared only once records have been merged
// locally and results of the merge have been uploaded to the server successfully.
this.source.performCleanup(); this.source.performCleanup();
// TODO: synchronize on consumer callback?
delegate.onFlowCompleted(this, fetchEnd, storeEnd); delegate.onFlowCompleted(this, fetchEnd, storeEnd);
}
@Override
public void onStoreFailed(Exception ex) {
Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex);
if (ex instanceof ReflowIsNecessaryException) {
setReflowException((ReflowIsNecessaryException) ex);
}
// NB: consumer might or might not be running at this point. There are two cases here:
// 1) If we're storing records remotely, we might fail due to a 412.
// -- we might hit 412 at any point, so consumer might be in either state.
// Action: ignore consumer state, we have nothing else to do other to inform our delegate
// that we're done with this flow. Based on the reflow exception, it'll determine what to do.
// 2) If we're storing (merging) records locally, we might fail due to a sync deadline.
// -- we might hit a deadline only prior to attempting to merge records,
// -- at which point consumer would have finished already, and storeDone was called.
// Action: consumer state is known (done), so we can ignore it safely and inform our delegate
// that we're done.
// Prevent "once consumer is done..." actions from taking place. They already have (case 2), or
// we don't need them (case 1).
waitingForQueueDone = false;
// If consumer is still going at it, tell it to stop.
this.consumer.halt();
delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis());
} }
@Override @Override
@ -310,4 +361,17 @@ public class RecordsChannel implements
// Lie outright. We know that all of our fetch methods are safe. // Lie outright. We know that all of our fetch methods are safe.
return this; return this;
} }
@Nullable
/* package-local */ synchronized ReflowIsNecessaryException getReflowException() {
return reflowException;
}
private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) {
// It is a mistake to set reflow exception multiple times.
if (reflowException != null) {
throw new IllegalStateException("Reflow exception already set: " + reflowException);
}
reflowException = e;
}
} }

Просмотреть файл

@ -9,15 +9,19 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.mozilla.gecko.sync.repositories.domain.Record; import org.mozilla.gecko.sync.repositories.domain.Record;
interface RecordsConsumerDelegate { interface RecordsConsumerDelegate {
public abstract ConcurrentLinkedQueue<Record> getQueue(); ConcurrentLinkedQueue<Record> getQueue();
/** /**
* Called when no more items will be processed. * Called when no more items will be processed.
* If forced is true, the consumer is terminating because it was told to halt; * Indicates that all items have been processed.
* not all items will necessarily have been processed.
* If forced is false, the consumer has invoked store and received an onStoreCompleted callback.
* @param forced
*/ */
public abstract void consumerIsDone(boolean forced); void consumerIsDoneFull();
public abstract void store(Record record);
/**
* Called when no more items will be processed.
* Indicates that only some of the items have been processed.
*/
void consumerIsDonePartial();
void store(Record record);
} }

Просмотреть файл

@ -5,6 +5,7 @@
package org.mozilla.gecko.sync.synchronizer; package org.mozilla.gecko.sync.synchronizer;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.sync.ReflowIsNecessaryException;
import org.mozilla.gecko.sync.repositories.FetchFailedException; import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.StoreFailedException; import org.mozilla.gecko.sync.repositories.StoreFailedException;
@ -29,6 +30,15 @@ public class ServerLocalSynchronizerSession extends SynchronizerSession {
@Override @Override
public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
// If a "reflow exception" was thrown, consider this synchronization failed.
final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
// Fetch failures always abort. // Fetch failures always abort.
int numRemoteFetchFailed = recordsChannel.getFetchFailureCount(); int numRemoteFetchFailed = recordsChannel.getFetchFailureCount();
if (numRemoteFetchFailed > 0) { if (numRemoteFetchFailed > 0) {
@ -53,6 +63,15 @@ public class ServerLocalSynchronizerSession extends SynchronizerSession {
@Override @Override
public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
// If a "reflow exception" was thrown, consider this synchronization failed.
final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException();
if (reflowException != null) {
final String message = "Reflow is necessary: " + reflowException;
Logger.warn(LOG_TAG, message + " Aborting session.");
delegate.onSynchronizeFailed(this, reflowException, message);
return;
}
// Fetch failures always abort. // Fetch failures always abort.
int numLocalFetchFailed = recordsChannel.getFetchFailureCount(); int numLocalFetchFailed = recordsChannel.getFetchFailureCount();
if (numLocalFetchFailed > 0) { if (numLocalFetchFailed > 0) {

Просмотреть файл

@ -746,6 +746,11 @@ public class TestBookmarks extends AndroidSyncTestCase {
@Override @Override
public void onRecordStoreSucceeded(String guid) { public void onRecordStoreSucceeded(String guid) {
} }
@Override
public void onStoreFailed(Exception e) {
}
}; };
session.setStoreDelegate(storeDelegate); session.setStoreDelegate(storeDelegate);
for (BookmarkRecord record : records) { for (BookmarkRecord record : records) {

Просмотреть файл

@ -121,6 +121,11 @@ public class TestStoreTracking extends AndroidSyncTestCase {
performNotify(e); performNotify(e);
} }
} }
@Override
public void onStoreFailed(Exception e) {
}
}; };
session.setStoreDelegate(storeDelegate); session.setStoreDelegate(storeDelegate);

Просмотреть файл

@ -11,7 +11,7 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS
@Override @Override
public void onRecordStoreFailed(Exception ex, String guid) { public void onRecordStoreFailed(Exception ex, String guid) {
performNotify("Store failed", ex); performNotify("Record store failed", ex);
} }
@Override @Override
@ -24,6 +24,11 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS
performNotify("DefaultStoreDelegate used", null); performNotify("DefaultStoreDelegate used", null);
} }
@Override
public void onStoreFailed(Exception ex) {
performNotify("Store failed", ex);
}
@Override @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) { public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) {
final RepositorySessionStoreDelegate self = this; final RepositorySessionStoreDelegate self = this;
@ -59,6 +64,11 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS
}); });
} }
@Override
public void onStoreFailed(Exception e) {
}
@Override @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) { public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) {
if (newExecutor == executor) { if (newExecutor == executor) {

Просмотреть файл

@ -28,6 +28,12 @@ public class DefaultGlobalSessionCallback implements GlobalSessionCallback {
public void informMigrated(GlobalSession globalSession) { public void informMigrated(GlobalSession globalSession) {
} }
@Override
public void handleIncompleteStage(Stage currentState,
GlobalSession globalSession) {
}
@Override @Override
public void handleAborted(GlobalSession globalSession, String reason) { public void handleAborted(GlobalSession globalSession, String reason) {
} }

Просмотреть файл

@ -6,6 +6,8 @@ package org.mozilla.android.sync.test;
import android.content.Context; import android.content.Context;
import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.common.log.Logger;
import org.mozilla.gecko.background.testhelpers.WBORepository; import org.mozilla.gecko.background.testhelpers.WBORepository;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.repositories.FetchFailedException; import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
@ -23,10 +25,38 @@ import java.util.concurrent.ExecutorService;
public class SynchronizerHelpers { public class SynchronizerHelpers {
public static final String FAIL_SENTINEL = "Fail"; public static final String FAIL_SENTINEL = "Fail";
enum FailMode {
COLLECTION_MODIFIED,
DEADLINE_REACHED,
FETCH,
STORE
}
private static Exception getFailException(FailMode failMode) {
switch (failMode) {
case COLLECTION_MODIFIED:
return new CollectionConcurrentModificationException();
case DEADLINE_REACHED:
return new SyncDeadlineReachedException();
case FETCH:
return new FetchFailedException();
case STORE:
return new StoreFailedException();
default:
throw new IllegalStateException();
}
}
/** /**
* Store one at a time, failing if the guid contains FAIL_SENTINEL. * Store one at a time, failing if the guid contains FAIL_SENTINEL.
*/ */
public static class FailFetchWBORepository extends WBORepository { public static class FailFetchWBORepository extends WBORepository {
private final FailMode failMode;
public FailFetchWBORepository(FailMode failMode) {
this.failMode = failMode;
}
@Override @Override
public void createSession(RepositorySessionCreationDelegate delegate, public void createSession(RepositorySessionCreationDelegate delegate,
Context context) { Context context) {
@ -38,7 +68,7 @@ public class SynchronizerHelpers {
@Override @Override
public void onFetchedRecord(Record record) { public void onFetchedRecord(Record record) {
if (record.guid.contains(FAIL_SENTINEL)) { if (record.guid.contains(FAIL_SENTINEL)) {
delegate.onFetchFailed(new FetchFailedException()); delegate.onFetchFailed(getFailException(failMode));
} else { } else {
delegate.onFetchedRecord(record); delegate.onFetchedRecord(record);
} }
@ -73,6 +103,12 @@ public class SynchronizerHelpers {
* Store one at a time, failing if the guid contains FAIL_SENTINEL. * Store one at a time, failing if the guid contains FAIL_SENTINEL.
*/ */
public static class SerialFailStoreWBORepository extends WBORepository { public static class SerialFailStoreWBORepository extends WBORepository {
private final FailMode failMode;
public SerialFailStoreWBORepository(FailMode failMode) {
this.failMode = failMode;
}
@Override @Override
public void createSession(RepositorySessionCreationDelegate delegate, public void createSession(RepositorySessionCreationDelegate delegate,
Context context) { Context context) {
@ -83,7 +119,12 @@ public class SynchronizerHelpers {
throw new NoStoreDelegateException(); throw new NoStoreDelegateException();
} }
if (record.guid.contains(FAIL_SENTINEL)) { if (record.guid.contains(FAIL_SENTINEL)) {
storeDelegate.onRecordStoreFailed(new StoreFailedException(), record.guid); Exception ex = getFailException(failMode);
if (ex instanceof CollectionConcurrentModificationException) {
storeDelegate.onStoreFailed(ex);
} else {
storeDelegate.onRecordStoreFailed(ex, record.guid);
}
} else { } else {
super.store(record); super.store(record);
} }

Просмотреть файл

@ -3,6 +3,7 @@
package org.mozilla.android.sync.test; package org.mozilla.android.sync.test;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mozilla.android.sync.test.SynchronizerHelpers.FailFetchWBORepository; import org.mozilla.android.sync.test.SynchronizerHelpers.FailFetchWBORepository;
@ -11,9 +12,10 @@ import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionFinis
import org.mozilla.gecko.background.testhelpers.TestRunner; import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.background.testhelpers.WBORepository; import org.mozilla.gecko.background.testhelpers.WBORepository;
import org.mozilla.gecko.background.testhelpers.WaitHelper; import org.mozilla.gecko.background.testhelpers.WaitHelper;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.SyncDeadlineReachedException;
import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession; import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositorySessionBundle; import org.mozilla.gecko.sync.repositories.RepositorySessionBundle;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord; import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
@ -30,57 +32,48 @@ import static org.junit.Assert.assertTrue;
@RunWith(TestRunner.class) @RunWith(TestRunner.class)
public class TestRecordsChannel { public class TestRecordsChannel {
protected WBORepository remote; private WBORepository sourceRepository;
protected WBORepository local; private RepositorySession sourceSession;
private WBORepository sinkRepository;
private RepositorySession sinkSession;
protected RepositorySession source; private RecordsChannelDelegate rcDelegate;
protected RepositorySession sink;
protected RecordsChannelDelegate rcDelegate;
protected AtomicInteger numFlowFetchFailed; private AtomicInteger numFlowFetchFailed;
protected AtomicInteger numFlowStoreFailed; private AtomicInteger numFlowStoreFailed;
protected AtomicInteger numFlowCompleted; private AtomicInteger numFlowCompleted;
protected AtomicBoolean flowBeginFailed; private AtomicBoolean flowBeginFailed;
protected AtomicBoolean flowFinishFailed; private AtomicBoolean flowFinishFailed;
public void doFlow(final Repository remote, final Repository local) throws Exception { private volatile RecordsChannel recordsChannel;
WaitHelper.getTestWaiter().performWait(new Runnable() { private volatile Exception fetchException;
@Override private volatile Exception storeException;
public void run() {
remote.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onSessionCreated(RepositorySession session) {
source = session;
local.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onSessionCreated(RepositorySession session) {
sink = session;
WaitHelper.getTestWaiter().performNotify();
}
}, null);
}
}, null);
}
});
assertNotNull(source);
assertNotNull(sink);
@Before
public void setUp() throws Exception {
numFlowFetchFailed = new AtomicInteger(0); numFlowFetchFailed = new AtomicInteger(0);
numFlowStoreFailed = new AtomicInteger(0); numFlowStoreFailed = new AtomicInteger(0);
numFlowCompleted = new AtomicInteger(0); numFlowCompleted = new AtomicInteger(0);
flowBeginFailed = new AtomicBoolean(false); flowBeginFailed = new AtomicBoolean(false);
flowFinishFailed = new AtomicBoolean(false); flowFinishFailed = new AtomicBoolean(false);
// Repositories and sessions will be set/created by tests.
sourceRepository = null;
sourceSession = null;
sinkRepository = null;
sinkSession = null;
rcDelegate = new RecordsChannelDelegate() { rcDelegate = new RecordsChannelDelegate() {
@Override @Override
public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) {
numFlowFetchFailed.incrementAndGet(); numFlowFetchFailed.incrementAndGet();
fetchException = ex;
} }
@Override @Override
public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) {
numFlowStoreFailed.incrementAndGet(); numFlowStoreFailed.incrementAndGet();
storeException = ex;
} }
@Override @Override
@ -93,11 +86,11 @@ public class TestRecordsChannel {
public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) {
numFlowCompleted.incrementAndGet(); numFlowCompleted.incrementAndGet();
try { try {
sink.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { sinkSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override @Override
public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) { public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
try { try {
source.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { sourceSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) {
@Override @Override
public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) { public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) {
performNotify(); performNotify();
@ -119,13 +112,39 @@ public class TestRecordsChannel {
WaitHelper.getTestWaiter().performNotify(); WaitHelper.getTestWaiter().performNotify();
} }
}; };
}
final RecordsChannel rc = new RecordsChannel(source, sink, rcDelegate); private void createSessions() {
WaitHelper.getTestWaiter().performWait(new Runnable() {
@Override
public void run() {
sourceRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onSessionCreated(RepositorySession session) {
sourceSession = session;
sinkRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) {
@Override
public void onSessionCreated(RepositorySession session) {
sinkSession = session;
WaitHelper.getTestWaiter().performNotify();
}
}, null);
}
}, null);
}
});
}
public void doFlow() throws Exception {
createSessions();
assertNotNull(sourceSession);
assertNotNull(sinkSession);
recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate);
WaitHelper.getTestWaiter().performWait(new Runnable() { WaitHelper.getTestWaiter().performWait(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
rc.beginAndFlow(); recordsChannel.beginAndFlow();
} catch (InvalidSessionTransitionException e) { } catch (InvalidSessionTransitionException e) {
WaitHelper.getTestWaiter().performNotify(e); WaitHelper.getTestWaiter().performNotify(e);
} }
@ -133,6 +152,7 @@ public class TestRecordsChannel {
}); });
} }
// NB: records in WBORepository are stored in a HashMap, so don't assume an order.
public static final BookmarkRecord[] inbounds = new BookmarkRecord[] { public static final BookmarkRecord[] inbounds = new BookmarkRecord[] {
new BookmarkRecord("inboundSucc1", "bookmarks", 1, false), new BookmarkRecord("inboundSucc1", "bookmarks", 1, false),
new BookmarkRecord("inboundSucc2", "bookmarks", 1, false), new BookmarkRecord("inboundSucc2", "bookmarks", 1, false),
@ -145,9 +165,9 @@ public class TestRecordsChannel {
new BookmarkRecord("outboundSucc1", "bookmarks", 1, false), new BookmarkRecord("outboundSucc1", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc2", "bookmarks", 1, false), new BookmarkRecord("outboundSucc2", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc3", "bookmarks", 1, false), new BookmarkRecord("outboundSucc3", "bookmarks", 1, false),
new BookmarkRecord("outboundFail6", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc4", "bookmarks", 1, false), new BookmarkRecord("outboundSucc4", "bookmarks", 1, false),
new BookmarkRecord("outboundSucc5", "bookmarks", 1, false), new BookmarkRecord("outboundSucc5", "bookmarks", 1, false),
new BookmarkRecord("outboundFail6", "bookmarks", 1, false),
}; };
protected WBORepository empty() { protected WBORepository empty() {
@ -163,8 +183,9 @@ public class TestRecordsChannel {
return repo; return repo;
} }
protected WBORepository failingFetch() { protected WBORepository failingFetch(SynchronizerHelpers.FailMode failMode) {
WBORepository repo = new FailFetchWBORepository(); WBORepository repo = new FailFetchWBORepository(failMode);
for (BookmarkRecord outbound : outbounds) { for (BookmarkRecord outbound : outbounds) {
repo.wbos.put(outbound.guid, outbound); repo.wbos.put(outbound.guid, outbound);
} }
@ -173,57 +194,143 @@ public class TestRecordsChannel {
@Test @Test
public void testSuccess() throws Exception { public void testSuccess() throws Exception {
WBORepository source = full(); sourceRepository = full();
WBORepository sink = empty(); sinkRepository = empty();
doFlow(source, sink); doFlow();
assertEquals(1, numFlowCompleted.get()); assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get()); assertEquals(0, numFlowFetchFailed.get());
assertEquals(0, numFlowStoreFailed.get()); assertEquals(0, numFlowStoreFailed.get());
assertEquals(source.wbos, sink.wbos); assertEquals(sourceRepository.wbos, sinkRepository.wbos);
assertEquals(0, recordsChannel.getFetchFailureCount());
assertEquals(0, recordsChannel.getStoreFailureCount());
assertEquals(6, recordsChannel.getStoreCount());
} }
@Test @Test
public void testFetchFail() throws Exception { public void testFetchFail() throws Exception {
WBORepository source = failingFetch(); sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH);
WBORepository sink = empty(); sinkRepository = empty();
doFlow(source, sink); doFlow();
assertEquals(1, numFlowCompleted.get()); assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0); assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get()); assertEquals(0, numFlowStoreFailed.get());
assertTrue(sink.wbos.size() < 6); assertTrue(sinkRepository.wbos.size() < 6);
assertTrue(recordsChannel.getFetchFailureCount() > 0);
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreCount() < 6);
}
@Test
public void testStoreFetchFailedCollectionModified() throws Exception {
sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
sinkRepository = empty();
doFlow();
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
assertTrue(sinkRepository.wbos.size() < 6);
assertTrue(recordsChannel.getFetchFailureCount() > 0);
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
}
@Test
public void testStoreFetchFailedDeadline() throws Exception {
sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED);
sinkRepository = empty();
doFlow();
assertEquals(1, numFlowCompleted.get());
assertTrue(numFlowFetchFailed.get() > 0);
assertEquals(0, numFlowStoreFailed.get());
assertTrue(sinkRepository.wbos.size() < 6);
assertTrue(recordsChannel.getFetchFailureCount() > 0);
assertEquals(0, recordsChannel.getStoreFailureCount());
assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size());
assertEquals(SyncDeadlineReachedException.class, fetchException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(SyncDeadlineReachedException.class, ex.getClass());
} }
@Test @Test
public void testStoreSerialFail() throws Exception { public void testStoreSerialFail() throws Exception {
WBORepository source = full(); sourceRepository = full();
WBORepository sink = new SynchronizerHelpers.SerialFailStoreWBORepository(); sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
doFlow(source, sink); SynchronizerHelpers.FailMode.STORE);
doFlow();
assertEquals(1, numFlowCompleted.get()); assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get()); assertEquals(0, numFlowFetchFailed.get());
assertEquals(1, numFlowStoreFailed.get()); assertEquals(1, numFlowStoreFailed.get());
assertEquals(5, sink.wbos.size()); // We will fail to store one of the records but expect flow to continue.
assertEquals(5, sinkRepository.wbos.size());
assertEquals(0, recordsChannel.getFetchFailureCount());
assertEquals(1, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
}
@Test
public void testStoreSerialFailCollectionModified() throws Exception {
sourceRepository = full();
sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository(
SynchronizerHelpers.FailMode.COLLECTION_MODIFIED);
doFlow();
assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get());
assertEquals(1, numFlowStoreFailed.get());
// One of the records will fail, at which point we'll stop flowing them.
final int sunkenRecords = sinkRepository.wbos.size();
assertTrue(sunkenRecords > 0 && sunkenRecords < 6);
assertEquals(0, recordsChannel.getFetchFailureCount());
// RecordChannel's storeFail count is only incremented for failures of individual records.
assertEquals(0, recordsChannel.getStoreFailureCount());
assertEquals(CollectionConcurrentModificationException.class, storeException.getClass());
final Exception ex = recordsChannel.getReflowException();
assertNotNull(ex);
assertEquals(CollectionConcurrentModificationException.class, ex.getClass());
} }
@Test @Test
public void testStoreBatchesFail() throws Exception { public void testStoreBatchesFail() throws Exception {
WBORepository source = full(); sourceRepository = full();
WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(3); sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3);
doFlow(source, sink); doFlow();
assertEquals(1, numFlowCompleted.get()); assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get()); assertEquals(0, numFlowFetchFailed.get());
assertEquals(3, numFlowStoreFailed.get()); // One batch fails. assertEquals(3, numFlowStoreFailed.get()); // One batch fails.
assertEquals(3, sink.wbos.size()); // One batch succeeds. assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds.
assertEquals(0, recordsChannel.getFetchFailureCount());
assertEquals(3, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
} }
@Test @Test
public void testStoreOneBigBatchFail() throws Exception { public void testStoreOneBigBatchFail() throws Exception {
WBORepository source = full(); sourceRepository = full();
WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(50); sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50);
doFlow(source, sink); doFlow();
assertEquals(1, numFlowCompleted.get()); assertEquals(1, numFlowCompleted.get());
assertEquals(0, numFlowFetchFailed.get()); assertEquals(0, numFlowFetchFailed.get());
assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails. assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails.
assertEquals(0, sink.wbos.size()); // No batches succeed. assertEquals(0, sinkRepository.wbos.size()); // No batches succeed.
assertEquals(0, recordsChannel.getFetchFailureCount());
assertEquals(6, recordsChannel.getStoreFailureCount());
// Number of store attempts.
assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount());
} }
} }

Просмотреть файл

@ -12,6 +12,7 @@ import org.mozilla.android.sync.test.helpers.BaseTestStorageRequestDelegate;
import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper; import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper;
import org.mozilla.android.sync.test.helpers.MockServer; import org.mozilla.android.sync.test.helpers.MockServer;
import org.mozilla.gecko.background.testhelpers.TestRunner; import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.InfoCollections; import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.Utils;
@ -22,6 +23,7 @@ import org.mozilla.gecko.sync.net.BaseResource;
import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider; import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncStorageResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.FetchFailedException; import org.mozilla.gecko.sync.repositories.FetchFailedException;
import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.Server15Repository; import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.StoreFailedException; import org.mozilla.gecko.sync.repositories.StoreFailedException;
import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord; import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord;
@ -112,7 +114,8 @@ public class TestServer15RepositorySession {
final TrackingWBORepository local = getLocal(100); final TrackingWBORepository local = getLocal(100);
final Server15Repository remote = new Server15Repository( final Server15Repository remote = new Server15Repository(
COLLECTION, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), COLLECTION, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30),
getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration); getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration,
new NonPersistentRepositoryStateProvider());
KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY); KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY);
Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey); Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey);
cryptoRepo.recordFactory = new BookmarkRecordFactory(); cryptoRepo.recordFactory = new BookmarkRecordFactory();
@ -142,6 +145,14 @@ public class TestServer15RepositorySession {
assertEquals(FetchFailedException.class, e.getClass()); assertEquals(FetchFailedException.class, e.getClass());
} }
@Test
public void testFetch412Failure() throws Exception {
MockServer server = new MockServer(412, "error");
Exception e = doSynchronize(server);
assertNotNull(e);
assertEquals(CollectionConcurrentModificationException.class, e.getClass());
}
@Test @Test
public void testStorePostSuccessWithFailingRecords() throws Exception { public void testStorePostSuccessWithFailingRecords() throws Exception {
MockServer server = new MockServer(200, "{ modified: \" + " + Utils.millisecondsToDecimalSeconds(System.currentTimeMillis()) + ", " + MockServer server = new MockServer(200, "{ modified: \" + " + Utils.millisecondsToDecimalSeconds(System.currentTimeMillis()) + ", " +

Просмотреть файл

@ -107,7 +107,7 @@ public class TestServerLocalSynchronizer {
@Test @Test
public void testLocalFetchErrors() { public void testLocalFetchErrors() {
WBORepository remote = new TrackingWBORepository(); WBORepository remote = new TrackingWBORepository();
WBORepository local = new FailFetchWBORepository(); WBORepository local = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
Synchronizer synchronizer = getSynchronizer(remote, local); Synchronizer synchronizer = getSynchronizer(remote, local);
Exception e = doSynchronize(synchronizer); Exception e = doSynchronize(synchronizer);
@ -121,7 +121,7 @@ public class TestServerLocalSynchronizer {
@Test @Test
public void testRemoteFetchErrors() { public void testRemoteFetchErrors() {
WBORepository remote = new FailFetchWBORepository(); WBORepository remote = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH);
WBORepository local = new TrackingWBORepository(); WBORepository local = new TrackingWBORepository();
Synchronizer synchronizer = getSynchronizer(remote, local); Synchronizer synchronizer = getSynchronizer(remote, local);
@ -137,7 +137,7 @@ public class TestServerLocalSynchronizer {
@Test @Test
public void testLocalSerialStoreErrorsAreIgnored() { public void testLocalSerialStoreErrorsAreIgnored() {
WBORepository remote = new TrackingWBORepository(); WBORepository remote = new TrackingWBORepository();
WBORepository local = new SerialFailStoreWBORepository(); WBORepository local = new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.FETCH);
Synchronizer synchronizer = getSynchronizer(remote, local); Synchronizer synchronizer = getSynchronizer(remote, local);
assertNull(doSynchronize(synchronizer)); assertNull(doSynchronize(synchronizer));
@ -158,7 +158,7 @@ public class TestServerLocalSynchronizer {
@Test @Test
public void testRemoteSerialStoreErrorsAreNotIgnored() throws Exception { public void testRemoteSerialStoreErrorsAreNotIgnored() throws Exception {
Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(), new TrackingWBORepository()); // Tracking so we don't send incoming records back. Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.STORE), new TrackingWBORepository()); // Tracking so we don't send incoming records back.
Exception e = doSynchronize(synchronizer); Exception e = doSynchronize(synchronizer);
assertNotNull(e); assertNotNull(e);

Просмотреть файл

@ -32,6 +32,12 @@ public class ExpectSuccessRepositorySessionStoreDelegate extends
log("Record store completed at " + storeEnd); log("Record store completed at " + storeEnd);
} }
@Override
public void onStoreFailed(Exception e) {
log("Store failed.", e);
performNotify(new AssertionFailedError("onStoreFailed: store should not have failed."));
}
@Override @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) { public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return this; return this;

Просмотреть файл

@ -56,6 +56,12 @@ public class MockGlobalSessionCallback implements GlobalSessionCallback {
this.testWaiter().performNotify(); this.testWaiter().performNotify();
} }
@Override
public void handleIncompleteStage(Stage currentState,
GlobalSession globalSession) {
}
@Override @Override
public void handleStageCompleted(Stage currentState, public void handleStageCompleted(Stage currentState,
GlobalSession globalSession) { GlobalSession globalSession) {

Просмотреть файл

@ -44,6 +44,12 @@ public class DefaultGlobalSessionCallback implements GlobalSessionCallback {
GlobalSession globalSession) { GlobalSession globalSession) {
} }
@Override
public void handleIncompleteStage(Stage currentState,
GlobalSession globalSession) {
}
@Override @Override
public boolean shouldBackOffStorage() { public boolean shouldBackOffStorage() {
return false; return false;

Просмотреть файл

@ -81,13 +81,8 @@ public class BufferingMiddlewareRepositorySessionTest {
@Test @Test
public void storeDone() throws Exception { public void storeDone() throws Exception {
// Verify that storage's flush is called.
verify(bufferStorageMocked, times(0)).flush();
bufferingSessionMocked.doStoreDonePrepare();
verify(bufferStorageMocked, times(1)).flush();
// Trivial case, no records to merge. // Trivial case, no records to merge.
bufferingSession.doStoreDone(123L); bufferingSession.doMergeBuffer(123L);
verify(innerRepositorySession, times(1)).storeDone(123L); verify(innerRepositorySession, times(1)).storeDone(123L);
verify(innerRepositorySession, never()).store(any(Record.class)); verify(innerRepositorySession, never()).store(any(Record.class));
@ -109,7 +104,7 @@ public class BufferingMiddlewareRepositorySessionTest {
bufferingSession.store(record4); bufferingSession.store(record4);
// Done storing. // Done storing.
bufferingSession.doStoreDone(123L); bufferingSession.doMergeBuffer(123L);
// Ensure all records where stored in the wrapped session. // Ensure all records where stored in the wrapped session.
verify(innerRepositorySession, times(1)).store(record); verify(innerRepositorySession, times(1)).store(record);
@ -154,32 +149,6 @@ public class BufferingMiddlewareRepositorySessionTest {
assertEquals(0, bufferStorage.all().size()); assertEquals(0, bufferStorage.all().size());
} }
@Test
public void sourceFailed() throws Exception {
// Source failes before any records have been stored.
bufferingSession.sourceFailed(new Exception());
assertEquals(0, bufferStorage.all().size());
// Store some records now.
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
// Verify that buffer is intact after source fails.
bufferingSession.sourceFailed(new Exception());
assertEquals(3, bufferStorage.all().size());
// Verify that buffer is flushed after source fails.
verify(bufferStorageMocked, times(0)).flush();
bufferingSessionMocked.sourceFailed(new Exception());
verify(bufferStorageMocked, times(1)).flush();
}
@Test @Test
public void abort() throws Exception { public void abort() throws Exception {
MockRecord record = new MockRecord("guid1", null, 1, false); MockRecord record = new MockRecord("guid1", null, 1, false);

Просмотреть файл

@ -11,6 +11,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner; import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.CryptoRecord;
import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.InfoCollections; import org.mozilla.gecko.sync.InfoCollections;
@ -66,8 +67,8 @@ public class BatchingDownloaderDelegateTest {
} }
@Override @Override
public void onFetchFailed(final Exception ex, public void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate,
final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, final Exception ex,
final SyncStorageCollectionRequest request) { final SyncStorageCollectionRequest request) {
this.isFailure = true; this.isFailure = true;
this.ex = ex; this.ex = ex;
@ -173,6 +174,18 @@ public class BatchingDownloaderDelegateTest {
assertFalse(mockDownloader.isFetched); assertFalse(mockDownloader.isFetched);
} }
@Test
public void testFailure412() throws Exception {
BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,
new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null);
SyncStorageResponse response = makeSyncStorageResponse(412, null);
downloaderDelegate.handleRequestFailure(response);
assertTrue(mockDownloader.isFailure);
assertEquals(CollectionConcurrentModificationException.class, mockDownloader.ex.getClass());
assertFalse(mockDownloader.isSuccess);
assertFalse(mockDownloader.isFetched);
}
@Test @Test
public void testFailureRequestError() throws Exception { public void testFailureRequestError() throws Exception {
BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null, BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null,

Просмотреть файл

@ -353,7 +353,7 @@ public class BatchingDownloaderTest {
public void testFailureException() throws Exception { public void testFailureException() throws Exception {
Exception ex = new IllegalStateException(); Exception ex = new IllegalStateException();
SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url")); SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url"));
mockDownloader.onFetchFailed(ex, sessionFetchRecordsDelegate, request); mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request);
assertFalse(sessionFetchRecordsDelegate.isSuccess); assertFalse(sessionFetchRecordsDelegate.isSuccess);
assertFalse(sessionFetchRecordsDelegate.isFetched); assertFalse(sessionFetchRecordsDelegate.isFetched);

Просмотреть файл

@ -18,19 +18,28 @@ import org.mozilla.gecko.sync.ExtendedJSONObject;
import org.mozilla.gecko.sync.InfoCollections; import org.mozilla.gecko.sync.InfoCollections;
import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.Utils;
import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.Server15Repository; import org.mozilla.gecko.sync.repositories.Server15Repository;
import org.mozilla.gecko.sync.repositories.Server15RepositorySession; import org.mozilla.gecko.sync.repositories.Server15RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@RunWith(TestRunner.class) @RunWith(TestRunner.class)
public class BatchingUploaderTest { public class BatchingUploaderTest {
class MockExecutorService implements Executor { class MockExecutorService implements ExecutorService {
int totalPayloads = 0; int totalPayloads = 0;
int commitPayloads = 0; int commitPayloads = 0;
@ -47,6 +56,71 @@ public class BatchingUploaderTest {
} }
command.run(); command.run();
} }
@Override
public void shutdown() {}
@NonNull
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@NonNull
@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}
@NonNull
@Override
public <T> Future<T> submit(Runnable task, T result) {
return null;
}
@NonNull
@Override
public Future<?> submit(Runnable task) {
return null;
}
@NonNull
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@NonNull
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@NonNull
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
} }
class MockStoreDelegate implements RepositorySessionStoreDelegate { class MockStoreDelegate implements RepositorySessionStoreDelegate {
@ -69,13 +143,18 @@ public class BatchingUploaderTest {
++storeCompleted; ++storeCompleted;
} }
@Override
public void onStoreFailed(Exception e) {
}
@Override @Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) { public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return null; return null;
} }
} }
private Executor workQueue; private ExecutorService workQueue;
private RepositorySessionStoreDelegate storeDelegate; private RepositorySessionStoreDelegate storeDelegate;
@Before @Before
@ -442,11 +521,41 @@ public class BatchingUploaderTest {
makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync) makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync)
); );
server15RepositorySession.setStoreDelegate(storeDelegate); server15RepositorySession.setStoreDelegate(storeDelegate);
return new BatchingUploader( return new MockUploader(
server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, null, server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, 0L,
new InfoConfiguration(infoConfigurationJSON), null); new InfoConfiguration(infoConfigurationJSON), null);
} }
class MockPayloadDispatcher extends PayloadDispatcher {
MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader, Long lastModified) {
super(workQueue, uploader, lastModified);
}
@Override
Runnable createRecordUploadRunnable(ArrayList<byte[]> outgoing, ArrayList<String> outgoingGuids, long byteCount, boolean isCommit, boolean isLastPayload) {
// No-op runnable. We don't want this to actually do any work for these tests.
return new Runnable() {
@Override
public void run() {}
};
}
}
class MockUploader extends BatchingUploader {
MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue,
final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri,
final Long localCollectionLastModified, final InfoConfiguration infoConfiguration,
final AuthHeaderProvider authHeaderProvider) {
super(repositorySession, workQueue, sessionStoreDelegate, baseCollectionUri,
localCollectionLastModified, infoConfiguration, authHeaderProvider);
}
@Override
PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) {
return new MockPayloadDispatcher(workQueue, this, localCollectionLastModified);
}
}
private Server15Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) { private Server15Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) {
return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync); return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync);
} }

Просмотреть файл

@ -3,15 +3,22 @@
package org.mozilla.gecko.sync.repositories.uploaders; package org.mozilla.gecko.sync.repositories.uploaders;
import android.net.Uri;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mozilla.gecko.background.testhelpers.TestRunner; import org.mozilla.gecko.background.testhelpers.TestRunner;
import org.mozilla.gecko.sync.CollectionConcurrentModificationException;
import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.HTTPFailureException;
import org.mozilla.gecko.sync.InfoConfiguration;
import org.mozilla.gecko.sync.NonObjectJSONException; import org.mozilla.gecko.sync.NonObjectJSONException;
import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.net.AuthHeaderProvider;
import org.mozilla.gecko.sync.net.SyncResponse; import org.mozilla.gecko.sync.net.SyncResponse;
import org.mozilla.gecko.sync.net.SyncStorageResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.RepositoryStateProvider;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -19,6 +26,7 @@ import java.io.ByteArrayInputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import ch.boye.httpclientandroidlib.HttpResponse; import ch.boye.httpclientandroidlib.HttpResponse;
import ch.boye.httpclientandroidlib.ProtocolVersion; import ch.boye.httpclientandroidlib.ProtocolVersion;
@ -33,6 +41,8 @@ public class PayloadUploadDelegateTest {
private PayloadDispatcher payloadDispatcher; private PayloadDispatcher payloadDispatcher;
private AuthHeaderProvider authHeaderProvider; private AuthHeaderProvider authHeaderProvider;
private RepositorySessionStoreDelegate sessionStoreDelegate;
class MockPayloadDispatcher extends PayloadDispatcher { class MockPayloadDispatcher extends PayloadDispatcher {
public final HashMap<String, Exception> failedRecords = new HashMap<>(); public final HashMap<String, Exception> failedRecords = new HashMap<>();
public boolean didLastPayloadFail = false; public boolean didLastPayloadFail = false;
@ -66,6 +76,7 @@ public class PayloadUploadDelegateTest {
@Override @Override
public void recordFailed(final Exception e, final String recordGuid) { public void recordFailed(final Exception e, final String recordGuid) {
recordUploadFailed = true;
failedRecords.put(recordGuid, e); failedRecords.put(recordGuid, e);
} }
@ -75,11 +86,44 @@ public class PayloadUploadDelegateTest {
} }
} }
class MockRepositorySessionStoreDelegate implements RepositorySessionStoreDelegate {
Exception storeFailedException;
ArrayList<String> succeededGuids = new ArrayList<>();
HashMap<String, Exception> failedGuids = new HashMap<>();
@Override
public void onRecordStoreFailed(Exception ex, String recordGuid) {
failedGuids.put(recordGuid, ex);
}
@Override
public void onRecordStoreSucceeded(String guid) {
succeededGuids.add(guid);
}
@Override
public void onStoreCompleted(long storeEnd) {
}
@Override
public void onStoreFailed(Exception e) {
storeFailedException = e;
}
@Override
public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) {
return null;
}
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
sessionStoreDelegate = new MockRepositorySessionStoreDelegate();
payloadDispatcher = new MockPayloadDispatcher( payloadDispatcher = new MockPayloadDispatcher(
null, null,
mock(BatchingUploader.class) new BatchingUploader(mock(RepositorySession.class), null, sessionStoreDelegate, Uri.parse("https://example.com"), 0L, mock(InfoConfiguration.class), mock(AuthHeaderProvider.class))
); );
authHeaderProvider = mock(AuthHeaderProvider.class); authHeaderProvider = mock(AuthHeaderProvider.class);
@ -368,6 +412,26 @@ public class PayloadUploadDelegateTest {
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response)); payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size()); assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail); assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail);
assertTrue(payloadDispatcher.recordUploadFailed);
}
@Test
public void testHandleRequestFailure412() {
ArrayList<String> postedGuids = new ArrayList<>(3);
postedGuids.add("testGuid1");
postedGuids.add("testGuid2");
postedGuids.add("testGuid3");
PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(authHeaderProvider, payloadDispatcher, postedGuids, false, false);
final HttpResponse response = new BasicHttpResponse(
new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 412, "Precondition failed"));
payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response));
assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size());
assertTrue(payloadDispatcher.recordUploadFailed);
assertTrue(payloadDispatcher.storeFailed);
assertTrue(((MockRepositorySessionStoreDelegate) sessionStoreDelegate).storeFailedException instanceof CollectionConcurrentModificationException);
} }
@Test @Test