diff --git a/mobile/android/base/android-services.mozbuild b/mobile/android/base/android-services.mozbuild index 34b4fc7588e2..7186f9f92033 100644 --- a/mobile/android/base/android-services.mozbuild +++ b/mobile/android/base/android-services.mozbuild @@ -866,6 +866,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil 'sync/AlreadySyncingException.java', 'sync/BackoffHandler.java', 'sync/BadRequiredFieldJSONException.java', + 'sync/CollectionConcurrentModificationException.java', 'sync/CollectionKeys.java', 'sync/CommandProcessor.java', 'sync/CommandRunner.java', @@ -944,6 +945,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil 'sync/NullClusterURLException.java', 'sync/PersistedMetaGlobal.java', 'sync/PrefsBackoffHandler.java', + 'sync/ReflowIsNecessaryException.java', 'sync/repositories/android/AndroidBrowserBookmarksDataAccessor.java', 'sync/repositories/android/AndroidBrowserBookmarksRepository.java', 'sync/repositories/android/AndroidBrowserBookmarksRepositorySession.java', @@ -1060,6 +1062,7 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil 'sync/SyncConfiguration.java', 'sync/SyncConfigurationException.java', 'sync/SyncConstants.java', + 'sync/SyncDeadlineReachedException.java', 'sync/SyncException.java', 'sync/synchronizer/ConcurrentRecordConsumer.java', 'sync/synchronizer/RecordConsumer.java', diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java b/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java index f4fcc3e312bc..6df68c6a82b0 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/fxa/sync/FxAccountSyncAdapter.java @@ -53,9 +53,10 @@ import org.mozilla.gecko.tokenserver.TokenServerToken; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumSet; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -123,8 +124,15 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter { super.rejectSync(); } + /* package-local */ void requestFollowUpSync(String stage) { + this.stageNamesForFollowUpSync.add(stage); + } + protected final Collection stageNamesToSync; + // Keeps track of incomplete stages during this sync that need to be re-synced once we're done. + private final List stageNamesForFollowUpSync = Collections.synchronizedList(new ArrayList()); + public SyncDelegate(BlockingQueue latch, SyncResult syncResult, AndroidFxAccount fxAccount, Collection stageNamesToSync) { super(latch, syncResult); this.stageNamesToSync = Collections.unmodifiableCollection(stageNamesToSync); @@ -183,6 +191,15 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter { public void handleStageCompleted(Stage currentState, GlobalSession globalSession) { } + /** + * Schedule an incomplete stage for a follow-up sync. + */ + @Override + public void handleIncompleteStage(Stage currentState, + GlobalSession globalSession) { + syncDelegate.requestFollowUpSync(currentState.getRepositoryName()); + } + @Override public void handleSuccess(GlobalSession globalSession) { Logger.info(LOG_TAG, "Global session succeeded."); @@ -574,7 +591,24 @@ public class FxAccountSyncAdapter extends AbstractThreadedSyncAdapter { fxAccount.releaseSharedAccountStateLock(); } - Logger.info(LOG_TAG, "Syncing done."); + // If there are any incomplete stages, request a follow-up sync. Otherwise, we're done. + // Incomplete stage is: + // - one that hit a 412 error during either upload or download of data, indicating that + // its collection has been modified remotely, or + // - one that hit a sync deadline + final String[] stagesToSyncAgain; + synchronized (syncDelegate.stageNamesForFollowUpSync) { + stagesToSyncAgain = syncDelegate.stageNamesForFollowUpSync.toArray( + new String[syncDelegate.stageNamesForFollowUpSync.size()] + ); + } + + if (stagesToSyncAgain.length > 0) { + Logger.info(LOG_TAG, "Syncing done. Requesting an immediate follow-up sync."); + fxAccount.requestImmediateSync(stagesToSyncAgain, null); + } else { + Logger.info(LOG_TAG, "Syncing done."); + } lastSyncRealtimeMillis = SystemClock.elapsedRealtime(); } } diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java new file mode 100644 index 000000000000..ef77a950c5c4 --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/CollectionConcurrentModificationException.java @@ -0,0 +1,15 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync; + +/** + * Thrown when a collection has been modified by another client while we were either + * downloading from it or uploading to it. + * + * @author grisha + */ +public class CollectionConcurrentModificationException extends ReflowIsNecessaryException { + private static final long serialVersionUID = 2701457832508838524L; +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java index 370eba961b7e..a3f4e00cddec 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/GlobalSession.java @@ -479,6 +479,11 @@ public class GlobalSession implements HttpResponseObserver { this.callback.handleError(this, e); } + public void handleIncompleteStage() { + // Let our delegate know that current stage is incomplete and needs to be synced again. + callback.handleIncompleteStage(this.currentState, this); + } + public void handleHTTPError(SyncStorageResponse response, String reason) { // TODO: handling of 50x (backoff), 401 (node reassignment or auth error). // Fall back to aborting. diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java new file mode 100644 index 000000000000..f01ceb86d138 --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/ReflowIsNecessaryException.java @@ -0,0 +1,21 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync; + +/** + * Used by SynchronizerSession to indicate that reflow of a stage is necessary. + * To reflow a stage is to request that it is synced again. Depending on the stage and its current + * state (last-synced timestamp, resume context, high-water-mark) we might resume, or sync from a + * high-water-mark if allowed, or sync regularly from last-synced timestamp. + * A re-sync of a stage is no different from a regular sync of the same stage. + * + * Stages which complete only partially due to hitting a concurrent collection modification error or + * hitting a sync deadline should be re-synced as soon as possible. + * + * @author grisha + */ +public class ReflowIsNecessaryException extends Exception { + private static final long serialVersionUID = -2614772437814638768L; +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java new file mode 100644 index 000000000000..be5028e06ea9 --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/SyncDeadlineReachedException.java @@ -0,0 +1,14 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync; + +/** + * Thrown when we've hit a self-imposed sync deadline, and decided not to proceed. + * + * @author grisha + */ +public class SyncDeadlineReachedException extends ReflowIsNecessaryException { + private static final long serialVersionUID = 2305367921350245484L; +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java index 9829f5b34665..2c569fc735b0 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/delegates/GlobalSessionCallback.java @@ -38,6 +38,7 @@ public interface GlobalSessionCallback { void handleError(GlobalSession globalSession, Exception ex); void handleSuccess(GlobalSession globalSession); void handleStageCompleted(Stage currentState, GlobalSession globalSession); + void handleIncompleteStage(Stage currentState, GlobalSession globalSession); /** * Called when a {@link GlobalSession} wants to know if it should continue diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java index 9bb640d5d53c..ca107d647240 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java @@ -7,6 +7,7 @@ package org.mozilla.gecko.sync.middleware; import android.os.SystemClock; import android.support.annotation.VisibleForTesting; +import org.mozilla.gecko.sync.SyncDeadlineReachedException; import org.mozilla.gecko.sync.middleware.storage.BufferStorage; import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; @@ -35,8 +36,6 @@ import java.util.concurrent.Executors; private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor(); - private volatile boolean storeMarkedIncomplete = false; - /* package-local */ BufferingMiddlewareRepositorySession( RepositorySession repositorySession, MiddlewareRepository repository, long syncDeadlineMillis, BufferStorage bufferStorage) { @@ -75,9 +74,23 @@ import java.util.concurrent.Executors; bufferStorage.addOrReplace(record); } + /** + * When source fails to provide all records, we need to decide what to do with the buffer. + * We might fail because of a network partition, or because of a concurrent modification of a + * collection, or because we ran out of time fetching records, or some other reason. + * + * Either way we do not clear the buffer in any error scenario, but rather + * allow it to be re-filled, replacing existing records with their newer versions if necessary. + * + * If a collection has been modified, affected records' last-modified timestamps will be bumped, + * and we will receive those records during the next sync. If we already have them in our buffer, + * we replace our now-old copy. Otherwise, they are new records and we just append them. + * + * Incoming records are mapped to existing ones via GUIDs. + */ @Override public void storeIncomplete() { - storeMarkedIncomplete = true; + bufferStorage.flush(); } @Override @@ -92,69 +105,43 @@ import java.util.concurrent.Executors; @Override public void storeDone(final long end) { - doStoreDonePrepare(); + bufferStorage.flush(); - // Determine if we have enough time to perform consistency checks on the buffered data and - // then store it. If we don't have enough time now, we keep our buffer and try again later. - // We don't store results of a buffer consistency check anywhere, so we can't treat it - // separately from storage. - if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) { + // Determine if we have enough time to merge the buffer data. + // If we don't have enough time now, we keep our buffer and try again later. + if (!mayProceedToMergeBuffer()) { super.abort(); - storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end); + storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreFailed(new SyncDeadlineReachedException()); return; } - // Separate actual merge, so that it may be tested without involving system clock. - doStoreDone(end); + doMergeBuffer(end); } @VisibleForTesting - public void doStoreDonePrepare() { - // Now that records stopped flowing, persist them. - bufferStorage.flush(); - } - - @VisibleForTesting - public void doStoreDone(final long end) { - final Collection buffer = bufferStorage.all(); + /* package-local */ void doMergeBuffer(long end) { + final Collection bufferData = bufferStorage.all(); // Trivial case of an empty buffer. - if (buffer.isEmpty()) { + if (bufferData.isEmpty()) { super.storeDone(end); return; } - // Flush our buffer to the wrapped local repository. Data goes live! + // Let session handle actual storing of records as it pleases. + // See Bug 1332094 which is concerned with allowing merge to proceed transactionally. try { - for (Record record : buffer) { + for (Record record : bufferData) { this.inner.store(record); } } catch (NoStoreDelegateException e) { - // At this point we should have a delegate, so this won't happen. + // At this point we should have a store delegate set on the session, so this won't happen. } - // And, we're done! + // Let session know that there are no more records to store. super.storeDone(end); } - /** - * When source fails to provide more records, we need to decide what to do with the buffer. - * We might fail because of a network partition, or because of a concurrent modification of a - * collection. Either way we do not clear the buffer in a general case. If a collection has been - * modified, affected records' last-modified timestamps will be bumped, and we will receive those - * records during the next sync. If we already have them in our buffer, we replace our now-old - * copy. Otherwise, they are new records and we just append them. - * - * We depend on GUIDs to be a primary key for incoming records. - * - * @param e indicates reason of failure. - */ - @Override - public void sourceFailed(Exception e) { - bufferStorage.flush(); - super.sourceFailed(e); - } - /** * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid. * Clean up after ourselves, if there's anything to clean up. diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java index c80ff8705c1f..880984188860 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java @@ -24,7 +24,7 @@ import org.mozilla.gecko.sync.repositories.domain.Record; * *
    *
  • Construct, with a reference to its parent {@link Repository}, by calling - * {@link Repository#createSession(RepositorySessionCreationDelegate, android.content.Context)}.
  • + * {@link Repository#createSession(org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate, android.content.Context)}. *
  • Populate with saved information by calling {@link #unbundle(RepositorySessionBundle)}.
  • *
  • Begin a sync by calling {@link #begin(RepositorySessionBeginDelegate)}. begin() * is an appropriate place to initialize expensive resources.
  • @@ -158,14 +158,6 @@ public abstract class RepositorySession { public void storeFlush() { } - /** - * During flow of records, indicates that source failed. - * - * @param e indicates reason of failure. - */ - public void sourceFailed(Exception e) { - } - /** * Indicates that a flow of records have been completed. */ diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java index 2f659c733f68..3e2d104392aa 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/DeferredRepositorySessionStoreDelegate.java @@ -54,4 +54,14 @@ public class DeferredRepositorySessionStoreDelegate implements } }); } + + @Override + public void onStoreFailed(final Exception e) { + executor.execute(new Runnable() { + @Override + public void run() { + inner.onStoreFailed(e); + } + }); + } } diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java index 01e44c3aea96..578ebda6feb3 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/delegates/RepositorySessionStoreDelegate.java @@ -14,10 +14,11 @@ import java.util.concurrent.ExecutorService; * */ public interface RepositorySessionStoreDelegate { - public void onRecordStoreFailed(Exception ex, String recordGuid); + void onRecordStoreFailed(Exception ex, String recordGuid); // Called with a GUID when store has succeeded. - public void onRecordStoreSucceeded(String guid); - public void onStoreCompleted(long storeEnd); - public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor); + void onRecordStoreSucceeded(String guid); + void onStoreCompleted(long storeEnd); + void onStoreFailed(Exception e); + RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor); } diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java index 13358b1e87be..0254853077df 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloader.java @@ -10,8 +10,10 @@ import android.support.annotation.Nullable; import android.support.annotation.VisibleForTesting; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.DelayedWorkTracker; +import org.mozilla.gecko.sync.SyncDeadlineReachedException; import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.net.SyncResponse; @@ -24,7 +26,6 @@ import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; -import java.util.ConcurrentModificationException; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -185,9 +186,9 @@ public class BatchingDownloader { // We expected server to fail our request with 412 in case of concurrent modifications, so // this is unexpected. However, let's treat this case just as if we received a 412. if (lastModifiedChanged) { - this.abort( + this.handleFetchFailed( fetchRecordsDelegate, - new ConcurrentModificationException("Last-modified timestamp has changed unexpectedly") + new CollectionConcurrentModificationException() ); return; } @@ -222,7 +223,7 @@ public class BatchingDownloader { // Should we proceed, however? Do we have enough time? if (!mayProceedWithBatching(fetchDeadline)) { - this.abort(fetchRecordsDelegate, new Exception("Not enough time to complete next batch")); + this.handleFetchFailed(fetchRecordsDelegate, new SyncDeadlineReachedException()); return; } @@ -242,10 +243,16 @@ public class BatchingDownloader { } } - public void onFetchFailed(final Exception ex, - final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, - final SyncStorageCollectionRequest request) { - removeRequestFromPending(request); + private void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, + final Exception ex) { + handleFetchFailed(fetchRecordsDelegate, ex, null); + } + + /* package-local */ void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, + final Exception ex, + @Nullable final SyncStorageCollectionRequest request) { + this.removeRequestFromPending(request); + this.abortRequests(); this.workTracker.delayWorkItem(new Runnable() { @Override public void run() { @@ -291,18 +298,6 @@ public class BatchingDownloader { return this.lastModified; } - private void abort(final RepositorySessionFetchRecordsDelegate delegate, final Exception exception) { - Logger.error(LOG_TAG, exception.getMessage()); - this.abortRequests(); - this.workTracker.delayWorkItem(new Runnable() { - @Override - public void run() { - Logger.debug(LOG_TAG, "Delayed onFetchCompleted running."); - delegate.onFetchFailed(exception); - } - }); - } - private static boolean mayProceedWithBatching(long deadline) { // For simplicity, allow batching to proceed if there's at least a minute left for the sync. // This should be enough to fetch and process records in the batch. diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java index 274decc8e706..91723bd4adca 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegate.java @@ -5,6 +5,7 @@ package org.mozilla.gecko.sync.repositories.downloaders; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.crypto.KeyBundle; @@ -15,8 +16,6 @@ import org.mozilla.gecko.sync.net.WBOCollectionRequestDelegate; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; -import java.util.ConcurrentModificationException; - /** * Delegate that gets passed into fetch methods to handle server response from fetch. */ @@ -60,37 +59,38 @@ public class BatchingDownloaderDelegate extends WBOCollectionRequestDelegate { @Override public void handleRequestSuccess(SyncStorageResponse response) { Logger.debug(LOG_TAG, "Fetch done."); - if (response.lastModified() != null) { - this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request, - this.newer, this.batchLimit, this.full, this.sort, this.ids); + + // Sanity check. + if (response.lastModified() == null) { + this.downloader.handleFetchFailed( + this.fetchRecordsDelegate, + new IllegalStateException("Missing last modified header from response"), + this.request + ); return; } - this.downloader.onFetchFailed( - new IllegalStateException("Missing last modified header from response"), - this.fetchRecordsDelegate, - this.request); + + this.downloader.onFetchCompleted(response, this.fetchRecordsDelegate, this.request, + this.newer, this.batchLimit, this.full, this.sort, this.ids); } @Override public void handleRequestFailure(SyncStorageResponse response) { Logger.warn(LOG_TAG, "Got a non-success response."); - // Handle concurrent modification errors separately. We will need to signal upwards that - // this happened, in case stage buffer will want to clean up. + // Handle concurrent modification errors separately. + final Exception ex; if (response.getStatusCode() == 412) { - this.downloader.onFetchFailed( - new ConcurrentModificationException(), - this.fetchRecordsDelegate, - this.request - ); + ex = new CollectionConcurrentModificationException(); } else { - this.handleRequestError(new HTTPFailureException(response)); + ex = new HTTPFailureException(response); } + this.handleRequestError(ex); } @Override public void handleRequestError(final Exception ex) { Logger.warn(LOG_TAG, "Got request error.", ex); - this.downloader.onFetchFailed(ex, this.fetchRecordsDelegate, this.request); + this.downloader.handleFetchFailed(this.fetchRecordsDelegate, ex, this.request); } @Override diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java index 0a4a832da4dc..c23397cc24d8 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploader.java @@ -8,7 +8,9 @@ import android.net.Uri; import android.support.annotation.VisibleForTesting; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.InfoConfiguration; +import org.mozilla.gecko.sync.Server15PreviousPostFailedException; import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.repositories.RepositorySession; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; @@ -16,6 +18,7 @@ import org.mozilla.gecko.sync.repositories.domain.Record; import java.util.ArrayList; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -70,6 +73,7 @@ public class BatchingUploader { } // Accessed by the record consumer thread pool. + private final ExecutorService executor; // Will be re-created, so mark it as volatile. private volatile Payload payload; @@ -88,7 +92,7 @@ public class BatchingUploader { private final Object payloadLock = new Object(); public BatchingUploader( - final RepositorySession repositorySession, final Executor workQueue, + final RepositorySession repositorySession, final ExecutorService workQueue, final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri, final Long localCollectionLastModified, final InfoConfiguration infoConfiguration, final AuthHeaderProvider authHeaderProvider) { @@ -102,12 +106,29 @@ public class BatchingUploader { this.payload = new Payload( payloadLock, infoConfiguration.maxPostBytes, infoConfiguration.maxPostRecords); - this.payloadDispatcher = new PayloadDispatcher(workQueue, this, localCollectionLastModified); + this.payloadDispatcher = createPayloadDispatcher(workQueue, localCollectionLastModified); + + this.executor = workQueue; } // Called concurrently from the threads running off of a record consumer thread pool. public void process(final Record record) { final String guid = record.guid; + + // If store failed entirely, just bail out. We've already told our delegate that we failed. + if (payloadDispatcher.storeFailed) { + return; + } + + // If a record or a payload failed, we won't let subsequent requests proceed.' + // This means that we may bail much earlier. + if (payloadDispatcher.recordUploadFailed) { + sessionStoreDelegate.deferredStoreDelegate(executor).onRecordStoreFailed( + new Server15PreviousPostFailedException(), guid + ); + return; + } + final byte[] recordBytes = record.toJSONBytes(); final long recordDeltaByteCount = recordBytes.length + PER_RECORD_OVERHEAD_BYTE_COUNT; @@ -218,7 +239,15 @@ public class BatchingUploader { } } - /* package-local */ static class BatchingUploaderException extends Exception { + /** + * Allows tests to define their own PayloadDispatcher. + */ + @VisibleForTesting + PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) { + return new PayloadDispatcher(workQueue, this, localCollectionLastModified); + } + + public static class BatchingUploaderException extends Exception { private static final long serialVersionUID = 1L; } /* package-local */ static class LastModifiedDidNotChange extends BatchingUploaderException { diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java index e81d9e9a2a9a..798f2b6b6a5b 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadDispatcher.java @@ -8,6 +8,7 @@ import android.support.annotation.Nullable; import android.support.annotation.VisibleForTesting; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.Server15RecordPostFailedException; import org.mozilla.gecko.sync.net.SyncResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse; @@ -30,12 +31,15 @@ class PayloadDispatcher { volatile BatchMeta batchWhiteboard; private final AtomicLong uploadTimestamp = new AtomicLong(0); - // Accessed from different threads sequentially running on the 'executor'. - private volatile boolean recordUploadFailed = false; - private final Executor executor; private final BatchingUploader uploader; + // For both of these flags: + // Written from sequentially running thread(s) on the SingleThreadExecutor `executor`. + // Read by many threads running concurrently on the records consumer thread pool. + volatile boolean recordUploadFailed = false; + volatile boolean storeFailed = false; + PayloadDispatcher(Executor executor, BatchingUploader uploader, @Nullable Long initialLastModified) { // Initially we don't know if we're in a batching mode. this.batchWhiteboard = new BatchMeta(initialLastModified, null); @@ -53,21 +57,7 @@ class PayloadDispatcher { executor.execute(new BatchContextRunnable(isCommit) { @Override public void run() { - new RecordUploadRunnable( - new BatchingAtomicUploaderMayUploadProvider(), - uploader.collectionUri, - batchWhiteboard.getToken(), - new PayloadUploadDelegate( - uploader.getRepositorySession().getServerRepository().getAuthHeaderProvider(), - PayloadDispatcher.this, - outgoingGuids, - isCommit, - isLastPayload - ), - outgoing, - byteCount, - isCommit - ).run(); + createRecordUploadRunnable(outgoing, outgoingGuids, byteCount, isCommit, isLastPayload).run(); } }); } @@ -142,6 +132,12 @@ class PayloadDispatcher { uploader.sessionStoreDelegate.onRecordStoreFailed(e, recordGuid); } + void concurrentModificationDetected() { + recordUploadFailed = true; + storeFailed = true; + uploader.sessionStoreDelegate.onStoreFailed(new CollectionConcurrentModificationException()); + } + void prepareForNextBatch() { batchWhiteboard = batchWhiteboard.nextBatchMeta(); } @@ -158,6 +154,31 @@ class PayloadDispatcher { } } + /** + * Allows tests to define their own RecordUploadRunnable. + */ + @VisibleForTesting + Runnable createRecordUploadRunnable(final ArrayList outgoing, + final ArrayList outgoingGuids, + final long byteCount, + final boolean isCommit, final boolean isLastPayload) { + return new RecordUploadRunnable( + new BatchingAtomicUploaderMayUploadProvider(), + uploader.collectionUri, + batchWhiteboard.getToken(), + new PayloadUploadDelegate( + uploader.authHeaderProvider, + PayloadDispatcher.this, + outgoingGuids, + isCommit, + isLastPayload + ), + outgoing, + byteCount, + isCommit + ); + } + /** * Allows tests to easily peek into the flow of upload tasks. */ @@ -176,6 +197,7 @@ class PayloadDispatcher { @VisibleForTesting abstract static class NonPayloadContextRunnable implements Runnable {} + // Instances of this class must be accessed from threads running on the `executor`. private class BatchingAtomicUploaderMayUploadProvider implements MayUploadProvider { public boolean mayUpload() { return !recordUploadFailed; diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java index 8bfaf51bfef6..0974c65da0e7 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegate.java @@ -184,7 +184,11 @@ class PayloadUploadDelegate implements SyncStorageRequestDelegate { @Override public void handleRequestFailure(final SyncStorageResponse response) { - this.handleRequestError(new HTTPFailureException(response)); + if (response.getStatusCode() == 412) { + dispatcher.concurrentModificationDetected(); + } else { + this.handleRequestError(new HTTPFailureException(response)); + } } @Override diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java index 98011df0873b..fdc0100b76cf 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/stage/ServerSyncStage.java @@ -14,6 +14,7 @@ import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.MetaGlobalException; import org.mozilla.gecko.sync.NoCollectionKeysSetException; import org.mozilla.gecko.sync.NonObjectJSONException; +import org.mozilla.gecko.sync.ReflowIsNecessaryException; import org.mozilla.gecko.sync.SynchronizerConfiguration; import org.mozilla.gecko.sync.Utils; import org.mozilla.gecko.sync.crypto.KeyBundle; @@ -622,6 +623,12 @@ public abstract class ServerSyncStage extends AbstractSessionManagingSyncStage i } } + // Let global session know that this stage is not complete (due to a 412 or hitting a deadline). + // This stage will be re-synced once current sync is complete. + if (lastException instanceof ReflowIsNecessaryException) { + session.handleIncompleteStage(); + } + Logger.info(LOG_TAG, "Advancing session even though stage failed (took " + getStageDurationString() + "). Timestamps not persisted."); session.advance(); diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java index 2f8763e7a4e8..c3a2bfd1ffdb 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ConcurrentRecordConsumer.java @@ -65,7 +65,11 @@ class ConcurrentRecordConsumer extends RecordConsumer { private void consumerIsDone() { Logger.debug(LOG_TAG, "Consumer is done. Processed " + counter + ((counter == 1) ? " record." : " records.")); - delegate.consumerIsDone(allRecordsQueued); + if (allRecordsQueued) { + delegate.consumerIsDoneFull(); + } else { + delegate.consumerIsDonePartial(); + } } @Override diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java index 62363c4da21e..4b429a5e67cd 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java @@ -4,11 +4,15 @@ package org.mozilla.gecko.sync.synchronizer; +import android.support.annotation.NonNull; +import android.support.annotation.Nullable; + import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.ReflowIsNecessaryException; import org.mozilla.gecko.sync.ThreadPool; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; @@ -72,6 +76,8 @@ public class RecordsChannel implements private final RecordsChannelDelegate delegate; private long fetchEnd = -1; + private volatile ReflowIsNecessaryException reflowException; + protected final AtomicInteger numFetched = new AtomicInteger(); protected final AtomicInteger numFetchFailed = new AtomicInteger(); protected final AtomicInteger numStored = new AtomicInteger(); @@ -93,7 +99,7 @@ public class RecordsChannel implements * Then we notify our delegate of completion. */ private RecordConsumer consumer; - private boolean waitingForQueueDone = false; + private volatile boolean waitingForQueueDone = false; private final ConcurrentLinkedQueue toProcess = new ConcurrentLinkedQueue(); @Override @@ -204,9 +210,12 @@ public class RecordsChannel implements @Override public void onFetchFailed(Exception ex) { - Logger.warn(LOG_TAG, "onFetchFailed. Informing sink, calling for immediate stop.", ex); - sink.sourceFailed(ex); + Logger.warn(LOG_TAG, "onFetchFailed. Calling for immediate stop.", ex); numFetchFailed.incrementAndGet(); + if (ex instanceof ReflowIsNecessaryException) { + setReflowException((ReflowIsNecessaryException) ex); + } + // Sink will be informed once consumer finishes. this.consumer.halt(); delegate.onFlowFetchFailed(this, ex); } @@ -246,16 +255,27 @@ public class RecordsChannel implements this.consumer.stored(); } - @Override - public void consumerIsDone(boolean allRecordsQueued) { - Logger.trace(LOG_TAG, "Consumer is done. Are we waiting for it? " + waitingForQueueDone); + public void consumerIsDoneFull() { + Logger.trace(LOG_TAG, "Consumer is done, processed all records. Are we waiting for it? " + waitingForQueueDone); if (waitingForQueueDone) { waitingForQueueDone = false; - if (!allRecordsQueued) { - this.sink.storeIncomplete(); - } - this.sink.storeDone(); // Now we'll be waiting for onStoreCompleted. + + // Now we'll be waiting for sink to call its delegate's onStoreCompleted or onStoreFailed. + this.sink.storeDone(); + } + } + + @Override + public void consumerIsDonePartial() { + Logger.trace(LOG_TAG, "Consumer is done, processed some records. Are we waiting for it? " + waitingForQueueDone); + if (waitingForQueueDone) { + waitingForQueueDone = false; + + // Let sink clean up or flush records if necessary. + this.sink.storeIncomplete(); + + delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis()); } } @@ -265,9 +285,40 @@ public class RecordsChannel implements "Fetch end is " + fetchEnd + ", store end is " + storeEnd); // Source might have used caches used to facilitate flow of records, so now is a good // time to clean up. Particularly pertinent for buffered sources. + // Rephrasing this in a more concrete way, buffers are cleared only once records have been merged + // locally and results of the merge have been uploaded to the server successfully. this.source.performCleanup(); - // TODO: synchronize on consumer callback? delegate.onFlowCompleted(this, fetchEnd, storeEnd); + + } + + @Override + public void onStoreFailed(Exception ex) { + Logger.warn(LOG_TAG, "onStoreFailed. Calling for immediate stop.", ex); + if (ex instanceof ReflowIsNecessaryException) { + setReflowException((ReflowIsNecessaryException) ex); + } + + // NB: consumer might or might not be running at this point. There are two cases here: + // 1) If we're storing records remotely, we might fail due to a 412. + // -- we might hit 412 at any point, so consumer might be in either state. + // Action: ignore consumer state, we have nothing else to do other to inform our delegate + // that we're done with this flow. Based on the reflow exception, it'll determine what to do. + + // 2) If we're storing (merging) records locally, we might fail due to a sync deadline. + // -- we might hit a deadline only prior to attempting to merge records, + // -- at which point consumer would have finished already, and storeDone was called. + // Action: consumer state is known (done), so we can ignore it safely and inform our delegate + // that we're done. + + // Prevent "once consumer is done..." actions from taking place. They already have (case 2), or + // we don't need them (case 1). + waitingForQueueDone = false; + + // If consumer is still going at it, tell it to stop. + this.consumer.halt(); + + delegate.onFlowCompleted(this, fetchEnd, System.currentTimeMillis()); } @Override @@ -310,4 +361,17 @@ public class RecordsChannel implements // Lie outright. We know that all of our fetch methods are safe. return this; } + + @Nullable + /* package-local */ synchronized ReflowIsNecessaryException getReflowException() { + return reflowException; + } + + private synchronized void setReflowException(@NonNull ReflowIsNecessaryException e) { + // It is a mistake to set reflow exception multiple times. + if (reflowException != null) { + throw new IllegalStateException("Reflow exception already set: " + reflowException); + } + reflowException = e; + } } diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java index a00abf8483aa..5667bd740b97 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsConsumerDelegate.java @@ -9,15 +9,19 @@ import java.util.concurrent.ConcurrentLinkedQueue; import org.mozilla.gecko.sync.repositories.domain.Record; interface RecordsConsumerDelegate { - public abstract ConcurrentLinkedQueue getQueue(); + ConcurrentLinkedQueue getQueue(); /** * Called when no more items will be processed. - * If forced is true, the consumer is terminating because it was told to halt; - * not all items will necessarily have been processed. - * If forced is false, the consumer has invoked store and received an onStoreCompleted callback. - * @param forced + * Indicates that all items have been processed. */ - public abstract void consumerIsDone(boolean forced); - public abstract void store(Record record); + void consumerIsDoneFull(); + + /** + * Called when no more items will be processed. + * Indicates that only some of the items have been processed. + */ + void consumerIsDonePartial(); + + void store(Record record); } \ No newline at end of file diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java index dc9eb01a0c0c..511088bdbc9c 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/ServerLocalSynchronizerSession.java @@ -5,6 +5,7 @@ package org.mozilla.gecko.sync.synchronizer; import org.mozilla.gecko.background.common.log.Logger; +import org.mozilla.gecko.sync.ReflowIsNecessaryException; import org.mozilla.gecko.sync.repositories.FetchFailedException; import org.mozilla.gecko.sync.repositories.StoreFailedException; @@ -29,6 +30,15 @@ public class ServerLocalSynchronizerSession extends SynchronizerSession { @Override public void onFirstFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { + // If a "reflow exception" was thrown, consider this synchronization failed. + final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException(); + if (reflowException != null) { + final String message = "Reflow is necessary: " + reflowException; + Logger.warn(LOG_TAG, message + " Aborting session."); + delegate.onSynchronizeFailed(this, reflowException, message); + return; + } + // Fetch failures always abort. int numRemoteFetchFailed = recordsChannel.getFetchFailureCount(); if (numRemoteFetchFailed > 0) { @@ -53,6 +63,15 @@ public class ServerLocalSynchronizerSession extends SynchronizerSession { @Override public void onSecondFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { + // If a "reflow exception" was thrown, consider this synchronization failed. + final ReflowIsNecessaryException reflowException = recordsChannel.getReflowException(); + if (reflowException != null) { + final String message = "Reflow is necessary: " + reflowException; + Logger.warn(LOG_TAG, message + " Aborting session."); + delegate.onSynchronizeFailed(this, reflowException, message); + return; + } + // Fetch failures always abort. int numLocalFetchFailed = recordsChannel.getFetchFailureCount(); if (numLocalFetchFailed > 0) { diff --git a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java index 64834e6d124d..019c9f8f8531 100644 --- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java +++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/db/TestBookmarks.java @@ -746,6 +746,11 @@ public class TestBookmarks extends AndroidSyncTestCase { @Override public void onRecordStoreSucceeded(String guid) { } + + @Override + public void onStoreFailed(Exception e) { + + } }; session.setStoreDelegate(storeDelegate); for (BookmarkRecord record : records) { diff --git a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java index 2041c1f258a7..3d2726d1ec04 100644 --- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java +++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/TestStoreTracking.java @@ -121,6 +121,11 @@ public class TestStoreTracking extends AndroidSyncTestCase { performNotify(e); } } + + @Override + public void onStoreFailed(Exception e) { + + } }; session.setStoreDelegate(storeDelegate); diff --git a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java index 7ba2e6df60e7..eb0488d3a535 100644 --- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java +++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/sync/helpers/DefaultStoreDelegate.java @@ -11,7 +11,7 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS @Override public void onRecordStoreFailed(Exception ex, String guid) { - performNotify("Store failed", ex); + performNotify("Record store failed", ex); } @Override @@ -24,6 +24,11 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS performNotify("DefaultStoreDelegate used", null); } + @Override + public void onStoreFailed(Exception ex) { + performNotify("Store failed", ex); + } + @Override public RepositorySessionStoreDelegate deferredStoreDelegate(final ExecutorService executor) { final RepositorySessionStoreDelegate self = this; @@ -59,6 +64,11 @@ public class DefaultStoreDelegate extends DefaultDelegate implements RepositoryS }); } + @Override + public void onStoreFailed(Exception e) { + + } + @Override public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService newExecutor) { if (newExecutor == executor) { diff --git a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java index c8be7e330598..afe43c8642d6 100644 --- a/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java +++ b/mobile/android/tests/background/junit3/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java @@ -28,6 +28,12 @@ public class DefaultGlobalSessionCallback implements GlobalSessionCallback { public void informMigrated(GlobalSession globalSession) { } + @Override + public void handleIncompleteStage(Stage currentState, + GlobalSession globalSession) { + + } + @Override public void handleAborted(GlobalSession globalSession, String reason) { } diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java index 95027085c517..d927af9378b5 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/SynchronizerHelpers.java @@ -6,6 +6,8 @@ package org.mozilla.android.sync.test; import android.content.Context; import org.mozilla.gecko.background.common.log.Logger; import org.mozilla.gecko.background.testhelpers.WBORepository; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; +import org.mozilla.gecko.sync.SyncDeadlineReachedException; import org.mozilla.gecko.sync.repositories.FetchFailedException; import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; @@ -23,10 +25,38 @@ import java.util.concurrent.ExecutorService; public class SynchronizerHelpers { public static final String FAIL_SENTINEL = "Fail"; + enum FailMode { + COLLECTION_MODIFIED, + DEADLINE_REACHED, + FETCH, + STORE + } + + private static Exception getFailException(FailMode failMode) { + switch (failMode) { + case COLLECTION_MODIFIED: + return new CollectionConcurrentModificationException(); + case DEADLINE_REACHED: + return new SyncDeadlineReachedException(); + case FETCH: + return new FetchFailedException(); + case STORE: + return new StoreFailedException(); + default: + throw new IllegalStateException(); + } + } + /** * Store one at a time, failing if the guid contains FAIL_SENTINEL. */ public static class FailFetchWBORepository extends WBORepository { + private final FailMode failMode; + + public FailFetchWBORepository(FailMode failMode) { + this.failMode = failMode; + } + @Override public void createSession(RepositorySessionCreationDelegate delegate, Context context) { @@ -38,7 +68,7 @@ public class SynchronizerHelpers { @Override public void onFetchedRecord(Record record) { if (record.guid.contains(FAIL_SENTINEL)) { - delegate.onFetchFailed(new FetchFailedException()); + delegate.onFetchFailed(getFailException(failMode)); } else { delegate.onFetchedRecord(record); } @@ -73,6 +103,12 @@ public class SynchronizerHelpers { * Store one at a time, failing if the guid contains FAIL_SENTINEL. */ public static class SerialFailStoreWBORepository extends WBORepository { + private final FailMode failMode; + + public SerialFailStoreWBORepository(FailMode failMode) { + this.failMode = failMode; + } + @Override public void createSession(RepositorySessionCreationDelegate delegate, Context context) { @@ -83,7 +119,12 @@ public class SynchronizerHelpers { throw new NoStoreDelegateException(); } if (record.guid.contains(FAIL_SENTINEL)) { - storeDelegate.onRecordStoreFailed(new StoreFailedException(), record.guid); + Exception ex = getFailException(failMode); + if (ex instanceof CollectionConcurrentModificationException) { + storeDelegate.onStoreFailed(ex); + } else { + storeDelegate.onRecordStoreFailed(ex, record.guid); + } } else { super.store(record); } diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java index 69d3c32e7df7..c5c0866c526c 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestRecordsChannel.java @@ -3,6 +3,7 @@ package org.mozilla.android.sync.test; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mozilla.android.sync.test.SynchronizerHelpers.FailFetchWBORepository; @@ -11,9 +12,10 @@ import org.mozilla.android.sync.test.helpers.ExpectSuccessRepositorySessionFinis import org.mozilla.gecko.background.testhelpers.TestRunner; import org.mozilla.gecko.background.testhelpers.WBORepository; import org.mozilla.gecko.background.testhelpers.WaitHelper; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; +import org.mozilla.gecko.sync.SyncDeadlineReachedException; import org.mozilla.gecko.sync.repositories.InactiveSessionException; import org.mozilla.gecko.sync.repositories.InvalidSessionTransitionException; -import org.mozilla.gecko.sync.repositories.Repository; import org.mozilla.gecko.sync.repositories.RepositorySession; import org.mozilla.gecko.sync.repositories.RepositorySessionBundle; import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord; @@ -30,57 +32,48 @@ import static org.junit.Assert.assertTrue; @RunWith(TestRunner.class) public class TestRecordsChannel { - protected WBORepository remote; - protected WBORepository local; + private WBORepository sourceRepository; + private RepositorySession sourceSession; + private WBORepository sinkRepository; + private RepositorySession sinkSession; - protected RepositorySession source; - protected RepositorySession sink; - protected RecordsChannelDelegate rcDelegate; + private RecordsChannelDelegate rcDelegate; - protected AtomicInteger numFlowFetchFailed; - protected AtomicInteger numFlowStoreFailed; - protected AtomicInteger numFlowCompleted; - protected AtomicBoolean flowBeginFailed; - protected AtomicBoolean flowFinishFailed; + private AtomicInteger numFlowFetchFailed; + private AtomicInteger numFlowStoreFailed; + private AtomicInteger numFlowCompleted; + private AtomicBoolean flowBeginFailed; + private AtomicBoolean flowFinishFailed; - public void doFlow(final Repository remote, final Repository local) throws Exception { - WaitHelper.getTestWaiter().performWait(new Runnable() { - @Override - public void run() { - remote.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) { - @Override - public void onSessionCreated(RepositorySession session) { - source = session; - local.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) { - @Override - public void onSessionCreated(RepositorySession session) { - sink = session; - WaitHelper.getTestWaiter().performNotify(); - } - }, null); - } - }, null); - } - }); - - assertNotNull(source); - assertNotNull(sink); + private volatile RecordsChannel recordsChannel; + private volatile Exception fetchException; + private volatile Exception storeException; + @Before + public void setUp() throws Exception { numFlowFetchFailed = new AtomicInteger(0); numFlowStoreFailed = new AtomicInteger(0); numFlowCompleted = new AtomicInteger(0); flowBeginFailed = new AtomicBoolean(false); flowFinishFailed = new AtomicBoolean(false); + // Repositories and sessions will be set/created by tests. + sourceRepository = null; + sourceSession = null; + sinkRepository = null; + sinkSession = null; + rcDelegate = new RecordsChannelDelegate() { @Override public void onFlowFetchFailed(RecordsChannel recordsChannel, Exception ex) { numFlowFetchFailed.incrementAndGet(); + fetchException = ex; } @Override public void onFlowStoreFailed(RecordsChannel recordsChannel, Exception ex, String recordGuid) { numFlowStoreFailed.incrementAndGet(); + storeException = ex; } @Override @@ -93,11 +86,11 @@ public class TestRecordsChannel { public void onFlowCompleted(RecordsChannel recordsChannel, long fetchEnd, long storeEnd) { numFlowCompleted.incrementAndGet(); try { - sink.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { + sinkSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { @Override public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) { try { - source.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { + sourceSession.finish(new ExpectSuccessRepositorySessionFinishDelegate(WaitHelper.getTestWaiter()) { @Override public void onFinishSucceeded(RepositorySession session, RepositorySessionBundle bundle) { performNotify(); @@ -119,13 +112,39 @@ public class TestRecordsChannel { WaitHelper.getTestWaiter().performNotify(); } }; + } - final RecordsChannel rc = new RecordsChannel(source, sink, rcDelegate); + private void createSessions() { + WaitHelper.getTestWaiter().performWait(new Runnable() { + @Override + public void run() { + sourceRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) { + @Override + public void onSessionCreated(RepositorySession session) { + sourceSession = session; + sinkRepository.createSession(new ExpectSuccessRepositorySessionCreationDelegate(WaitHelper.getTestWaiter()) { + @Override + public void onSessionCreated(RepositorySession session) { + sinkSession = session; + WaitHelper.getTestWaiter().performNotify(); + } + }, null); + } + }, null); + } + }); + } + + public void doFlow() throws Exception { + createSessions(); + assertNotNull(sourceSession); + assertNotNull(sinkSession); + recordsChannel = new RecordsChannel(sourceSession, sinkSession, rcDelegate); WaitHelper.getTestWaiter().performWait(new Runnable() { @Override public void run() { try { - rc.beginAndFlow(); + recordsChannel.beginAndFlow(); } catch (InvalidSessionTransitionException e) { WaitHelper.getTestWaiter().performNotify(e); } @@ -133,6 +152,7 @@ public class TestRecordsChannel { }); } + // NB: records in WBORepository are stored in a HashMap, so don't assume an order. public static final BookmarkRecord[] inbounds = new BookmarkRecord[] { new BookmarkRecord("inboundSucc1", "bookmarks", 1, false), new BookmarkRecord("inboundSucc2", "bookmarks", 1, false), @@ -145,9 +165,9 @@ public class TestRecordsChannel { new BookmarkRecord("outboundSucc1", "bookmarks", 1, false), new BookmarkRecord("outboundSucc2", "bookmarks", 1, false), new BookmarkRecord("outboundSucc3", "bookmarks", 1, false), + new BookmarkRecord("outboundFail6", "bookmarks", 1, false), new BookmarkRecord("outboundSucc4", "bookmarks", 1, false), new BookmarkRecord("outboundSucc5", "bookmarks", 1, false), - new BookmarkRecord("outboundFail6", "bookmarks", 1, false), }; protected WBORepository empty() { @@ -163,8 +183,9 @@ public class TestRecordsChannel { return repo; } - protected WBORepository failingFetch() { - WBORepository repo = new FailFetchWBORepository(); + protected WBORepository failingFetch(SynchronizerHelpers.FailMode failMode) { + WBORepository repo = new FailFetchWBORepository(failMode); + for (BookmarkRecord outbound : outbounds) { repo.wbos.put(outbound.guid, outbound); } @@ -173,57 +194,143 @@ public class TestRecordsChannel { @Test public void testSuccess() throws Exception { - WBORepository source = full(); - WBORepository sink = empty(); - doFlow(source, sink); + sourceRepository = full(); + sinkRepository = empty(); + doFlow(); assertEquals(1, numFlowCompleted.get()); assertEquals(0, numFlowFetchFailed.get()); assertEquals(0, numFlowStoreFailed.get()); - assertEquals(source.wbos, sink.wbos); + assertEquals(sourceRepository.wbos, sinkRepository.wbos); + assertEquals(0, recordsChannel.getFetchFailureCount()); + assertEquals(0, recordsChannel.getStoreFailureCount()); + assertEquals(6, recordsChannel.getStoreCount()); } @Test public void testFetchFail() throws Exception { - WBORepository source = failingFetch(); - WBORepository sink = empty(); - doFlow(source, sink); + sourceRepository = failingFetch(SynchronizerHelpers.FailMode.FETCH); + sinkRepository = empty(); + doFlow(); assertEquals(1, numFlowCompleted.get()); assertTrue(numFlowFetchFailed.get() > 0); assertEquals(0, numFlowStoreFailed.get()); - assertTrue(sink.wbos.size() < 6); + assertTrue(sinkRepository.wbos.size() < 6); + assertTrue(recordsChannel.getFetchFailureCount() > 0); + assertEquals(0, recordsChannel.getStoreFailureCount()); + assertTrue(recordsChannel.getStoreCount() < 6); + } + + @Test + public void testStoreFetchFailedCollectionModified() throws Exception { + sourceRepository = failingFetch(SynchronizerHelpers.FailMode.COLLECTION_MODIFIED); + sinkRepository = empty(); + doFlow(); + assertEquals(1, numFlowCompleted.get()); + assertTrue(numFlowFetchFailed.get() > 0); + assertEquals(0, numFlowStoreFailed.get()); + assertTrue(sinkRepository.wbos.size() < 6); + + assertTrue(recordsChannel.getFetchFailureCount() > 0); + assertEquals(0, recordsChannel.getStoreFailureCount()); + assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size()); + + assertEquals(CollectionConcurrentModificationException.class, fetchException.getClass()); + final Exception ex = recordsChannel.getReflowException(); + assertNotNull(ex); + assertEquals(CollectionConcurrentModificationException.class, ex.getClass()); + } + + @Test + public void testStoreFetchFailedDeadline() throws Exception { + sourceRepository = failingFetch(SynchronizerHelpers.FailMode.DEADLINE_REACHED); + sinkRepository = empty(); + doFlow(); + assertEquals(1, numFlowCompleted.get()); + assertTrue(numFlowFetchFailed.get() > 0); + assertEquals(0, numFlowStoreFailed.get()); + assertTrue(sinkRepository.wbos.size() < 6); + + assertTrue(recordsChannel.getFetchFailureCount() > 0); + assertEquals(0, recordsChannel.getStoreFailureCount()); + assertTrue(recordsChannel.getStoreCount() < sourceRepository.wbos.size()); + + assertEquals(SyncDeadlineReachedException.class, fetchException.getClass()); + final Exception ex = recordsChannel.getReflowException(); + assertNotNull(ex); + assertEquals(SyncDeadlineReachedException.class, ex.getClass()); } @Test public void testStoreSerialFail() throws Exception { - WBORepository source = full(); - WBORepository sink = new SynchronizerHelpers.SerialFailStoreWBORepository(); - doFlow(source, sink); + sourceRepository = full(); + sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository( + SynchronizerHelpers.FailMode.STORE); + doFlow(); assertEquals(1, numFlowCompleted.get()); assertEquals(0, numFlowFetchFailed.get()); assertEquals(1, numFlowStoreFailed.get()); - assertEquals(5, sink.wbos.size()); + // We will fail to store one of the records but expect flow to continue. + assertEquals(5, sinkRepository.wbos.size()); + + assertEquals(0, recordsChannel.getFetchFailureCount()); + assertEquals(1, recordsChannel.getStoreFailureCount()); + // Number of store attempts. + assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount()); + } + + @Test + public void testStoreSerialFailCollectionModified() throws Exception { + sourceRepository = full(); + sinkRepository = new SynchronizerHelpers.SerialFailStoreWBORepository( + SynchronizerHelpers.FailMode.COLLECTION_MODIFIED); + doFlow(); + assertEquals(1, numFlowCompleted.get()); + assertEquals(0, numFlowFetchFailed.get()); + assertEquals(1, numFlowStoreFailed.get()); + // One of the records will fail, at which point we'll stop flowing them. + final int sunkenRecords = sinkRepository.wbos.size(); + assertTrue(sunkenRecords > 0 && sunkenRecords < 6); + + assertEquals(0, recordsChannel.getFetchFailureCount()); + // RecordChannel's storeFail count is only incremented for failures of individual records. + assertEquals(0, recordsChannel.getStoreFailureCount()); + + assertEquals(CollectionConcurrentModificationException.class, storeException.getClass()); + final Exception ex = recordsChannel.getReflowException(); + assertNotNull(ex); + assertEquals(CollectionConcurrentModificationException.class, ex.getClass()); } @Test public void testStoreBatchesFail() throws Exception { - WBORepository source = full(); - WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(3); - doFlow(source, sink); + sourceRepository = full(); + sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(3); + doFlow(); assertEquals(1, numFlowCompleted.get()); assertEquals(0, numFlowFetchFailed.get()); assertEquals(3, numFlowStoreFailed.get()); // One batch fails. - assertEquals(3, sink.wbos.size()); // One batch succeeds. + assertEquals(3, sinkRepository.wbos.size()); // One batch succeeds. + + assertEquals(0, recordsChannel.getFetchFailureCount()); + assertEquals(3, recordsChannel.getStoreFailureCount()); + // Number of store attempts. + assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount()); } @Test public void testStoreOneBigBatchFail() throws Exception { - WBORepository source = full(); - WBORepository sink = new SynchronizerHelpers.BatchFailStoreWBORepository(50); - doFlow(source, sink); + sourceRepository = full(); + sinkRepository = new SynchronizerHelpers.BatchFailStoreWBORepository(50); + doFlow(); assertEquals(1, numFlowCompleted.get()); assertEquals(0, numFlowFetchFailed.get()); assertEquals(6, numFlowStoreFailed.get()); // One (big) batch fails. - assertEquals(0, sink.wbos.size()); // No batches succeed. + assertEquals(0, sinkRepository.wbos.size()); // No batches succeed. + + assertEquals(0, recordsChannel.getFetchFailureCount()); + assertEquals(6, recordsChannel.getStoreFailureCount()); + // Number of store attempts. + assertEquals(sourceRepository.wbos.size(), recordsChannel.getStoreCount()); } } diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java index b05608f0313f..29be0a1c7f7e 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServer15RepositorySession.java @@ -12,6 +12,7 @@ import org.mozilla.android.sync.test.helpers.BaseTestStorageRequestDelegate; import org.mozilla.android.sync.test.helpers.HTTPServerTestHelper; import org.mozilla.android.sync.test.helpers.MockServer; import org.mozilla.gecko.background.testhelpers.TestRunner; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.InfoCollections; import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.Utils; @@ -22,6 +23,7 @@ import org.mozilla.gecko.sync.net.BaseResource; import org.mozilla.gecko.sync.net.BasicAuthHeaderProvider; import org.mozilla.gecko.sync.net.SyncStorageResponse; import org.mozilla.gecko.sync.repositories.FetchFailedException; +import org.mozilla.gecko.sync.repositories.NonPersistentRepositoryStateProvider; import org.mozilla.gecko.sync.repositories.Server15Repository; import org.mozilla.gecko.sync.repositories.StoreFailedException; import org.mozilla.gecko.sync.repositories.domain.BookmarkRecord; @@ -112,7 +114,8 @@ public class TestServer15RepositorySession { final TrackingWBORepository local = getLocal(100); final Server15Repository remote = new Server15Repository( COLLECTION, SystemClock.elapsedRealtime() + TimeUnit.MINUTES.toMillis(30), - getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration); + getCollectionURL(COLLECTION), authHeaderProvider, infoCollections, infoConfiguration, + new NonPersistentRepositoryStateProvider()); KeyBundle collectionKey = new KeyBundle(TEST_USERNAME, SYNC_KEY); Crypto5MiddlewareRepository cryptoRepo = new Crypto5MiddlewareRepository(remote, collectionKey); cryptoRepo.recordFactory = new BookmarkRecordFactory(); @@ -142,6 +145,14 @@ public class TestServer15RepositorySession { assertEquals(FetchFailedException.class, e.getClass()); } + @Test + public void testFetch412Failure() throws Exception { + MockServer server = new MockServer(412, "error"); + Exception e = doSynchronize(server); + assertNotNull(e); + assertEquals(CollectionConcurrentModificationException.class, e.getClass()); + } + @Test public void testStorePostSuccessWithFailingRecords() throws Exception { MockServer server = new MockServer(200, "{ modified: \" + " + Utils.millisecondsToDecimalSeconds(System.currentTimeMillis()) + ", " + diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java index 267798672cdd..19c8e8d34449 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/TestServerLocalSynchronizer.java @@ -107,7 +107,7 @@ public class TestServerLocalSynchronizer { @Test public void testLocalFetchErrors() { WBORepository remote = new TrackingWBORepository(); - WBORepository local = new FailFetchWBORepository(); + WBORepository local = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH); Synchronizer synchronizer = getSynchronizer(remote, local); Exception e = doSynchronize(synchronizer); @@ -121,7 +121,7 @@ public class TestServerLocalSynchronizer { @Test public void testRemoteFetchErrors() { - WBORepository remote = new FailFetchWBORepository(); + WBORepository remote = new FailFetchWBORepository(SynchronizerHelpers.FailMode.FETCH); WBORepository local = new TrackingWBORepository(); Synchronizer synchronizer = getSynchronizer(remote, local); @@ -137,7 +137,7 @@ public class TestServerLocalSynchronizer { @Test public void testLocalSerialStoreErrorsAreIgnored() { WBORepository remote = new TrackingWBORepository(); - WBORepository local = new SerialFailStoreWBORepository(); + WBORepository local = new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.FETCH); Synchronizer synchronizer = getSynchronizer(remote, local); assertNull(doSynchronize(synchronizer)); @@ -158,7 +158,7 @@ public class TestServerLocalSynchronizer { @Test public void testRemoteSerialStoreErrorsAreNotIgnored() throws Exception { - Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(), new TrackingWBORepository()); // Tracking so we don't send incoming records back. + Synchronizer synchronizer = getSynchronizer(new SerialFailStoreWBORepository(SynchronizerHelpers.FailMode.STORE), new TrackingWBORepository()); // Tracking so we don't send incoming records back. Exception e = doSynchronize(synchronizer); assertNotNull(e); diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java index cfca180fa11e..c02e90c67058 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/ExpectSuccessRepositorySessionStoreDelegate.java @@ -32,6 +32,12 @@ public class ExpectSuccessRepositorySessionStoreDelegate extends log("Record store completed at " + storeEnd); } + @Override + public void onStoreFailed(Exception e) { + log("Store failed.", e); + performNotify(new AssertionFailedError("onStoreFailed: store should not have failed.")); + } + @Override public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) { return this; diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java index 5d7e8edd12cf..2d219f582a32 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/android/sync/test/helpers/MockGlobalSessionCallback.java @@ -56,6 +56,12 @@ public class MockGlobalSessionCallback implements GlobalSessionCallback { this.testWaiter().performNotify(); } + @Override + public void handleIncompleteStage(Stage currentState, + GlobalSession globalSession) { + + } + @Override public void handleStageCompleted(Stage currentState, GlobalSession globalSession) { diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java index 373dd4eab700..379ce8d4d8fd 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/background/testhelpers/DefaultGlobalSessionCallback.java @@ -44,6 +44,12 @@ public class DefaultGlobalSessionCallback implements GlobalSessionCallback { GlobalSession globalSession) { } + @Override + public void handleIncompleteStage(Stage currentState, + GlobalSession globalSession) { + + } + @Override public boolean shouldBackOffStorage() { return false; diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java index 07ae13d791f0..5960c1f5b442 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java @@ -81,13 +81,8 @@ public class BufferingMiddlewareRepositorySessionTest { @Test public void storeDone() throws Exception { - // Verify that storage's flush is called. - verify(bufferStorageMocked, times(0)).flush(); - bufferingSessionMocked.doStoreDonePrepare(); - verify(bufferStorageMocked, times(1)).flush(); - // Trivial case, no records to merge. - bufferingSession.doStoreDone(123L); + bufferingSession.doMergeBuffer(123L); verify(innerRepositorySession, times(1)).storeDone(123L); verify(innerRepositorySession, never()).store(any(Record.class)); @@ -109,7 +104,7 @@ public class BufferingMiddlewareRepositorySessionTest { bufferingSession.store(record4); // Done storing. - bufferingSession.doStoreDone(123L); + bufferingSession.doMergeBuffer(123L); // Ensure all records where stored in the wrapped session. verify(innerRepositorySession, times(1)).store(record); @@ -154,32 +149,6 @@ public class BufferingMiddlewareRepositorySessionTest { assertEquals(0, bufferStorage.all().size()); } - @Test - public void sourceFailed() throws Exception { - // Source failes before any records have been stored. - bufferingSession.sourceFailed(new Exception()); - assertEquals(0, bufferStorage.all().size()); - - // Store some records now. - MockRecord record = new MockRecord("guid1", null, 1, false); - bufferingSession.store(record); - - MockRecord record2 = new MockRecord("guid2", null, 13, false); - bufferingSession.store(record2); - - MockRecord record3 = new MockRecord("guid3", null, 5, false); - bufferingSession.store(record3); - - // Verify that buffer is intact after source fails. - bufferingSession.sourceFailed(new Exception()); - assertEquals(3, bufferStorage.all().size()); - - // Verify that buffer is flushed after source fails. - verify(bufferStorageMocked, times(0)).flush(); - bufferingSessionMocked.sourceFailed(new Exception()); - verify(bufferStorageMocked, times(1)).flush(); - } - @Test public void abort() throws Exception { MockRecord record = new MockRecord("guid1", null, 1, false); diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java index c72f214f1b20..3c8e92ddb266 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderDelegateTest.java @@ -11,6 +11,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mozilla.gecko.background.testhelpers.TestRunner; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.CryptoRecord; import org.mozilla.gecko.sync.HTTPFailureException; import org.mozilla.gecko.sync.InfoCollections; @@ -66,8 +67,8 @@ public class BatchingDownloaderDelegateTest { } @Override - public void onFetchFailed(final Exception ex, - final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, + public void handleFetchFailed(final RepositorySessionFetchRecordsDelegate fetchRecordsDelegate, + final Exception ex, final SyncStorageCollectionRequest request) { this.isFailure = true; this.ex = ex; @@ -173,6 +174,18 @@ public class BatchingDownloaderDelegateTest { assertFalse(mockDownloader.isFetched); } + @Test + public void testFailure412() throws Exception { + BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null, + new SyncStorageCollectionRequest(new URI(DEFAULT_COLLECTION_URL)), 0, 0, true, null, null); + SyncStorageResponse response = makeSyncStorageResponse(412, null); + downloaderDelegate.handleRequestFailure(response); + assertTrue(mockDownloader.isFailure); + assertEquals(CollectionConcurrentModificationException.class, mockDownloader.ex.getClass()); + assertFalse(mockDownloader.isSuccess); + assertFalse(mockDownloader.isFetched); + } + @Test public void testFailureRequestError() throws Exception { BatchingDownloaderDelegate downloaderDelegate = new BatchingDownloaderDelegate(mockDownloader, null, diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java index 7f82ec0f3433..89aa1ebbef4e 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/downloaders/BatchingDownloaderTest.java @@ -353,7 +353,7 @@ public class BatchingDownloaderTest { public void testFailureException() throws Exception { Exception ex = new IllegalStateException(); SyncStorageCollectionRequest request = new SyncStorageCollectionRequest(new URI("http://dummy.url")); - mockDownloader.onFetchFailed(ex, sessionFetchRecordsDelegate, request); + mockDownloader.handleFetchFailed(sessionFetchRecordsDelegate, ex, request); assertFalse(sessionFetchRecordsDelegate.isSuccess); assertFalse(sessionFetchRecordsDelegate.isFetched); diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java index 7ff38943c47a..0c632b07782f 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/BatchingUploaderTest.java @@ -18,19 +18,28 @@ import org.mozilla.gecko.sync.ExtendedJSONObject; import org.mozilla.gecko.sync.InfoCollections; import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.Utils; +import org.mozilla.gecko.sync.net.AuthHeaderProvider; +import org.mozilla.gecko.sync.repositories.RepositorySession; import org.mozilla.gecko.sync.repositories.Server15Repository; import org.mozilla.gecko.sync.repositories.Server15RepositorySession; import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @RunWith(TestRunner.class) public class BatchingUploaderTest { - class MockExecutorService implements Executor { + class MockExecutorService implements ExecutorService { int totalPayloads = 0; int commitPayloads = 0; @@ -47,6 +56,71 @@ public class BatchingUploaderTest { } command.run(); } + + @Override + public void shutdown() {} + + @NonNull + @Override + public List shutdownNow() { + return null; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return false; + } + + @NonNull + @Override + public Future submit(Callable task) { + return null; + } + + @NonNull + @Override + public Future submit(Runnable task, T result) { + return null; + } + + @NonNull + @Override + public Future submit(Runnable task) { + return null; + } + + @NonNull + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return null; + } + + @NonNull + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + return null; + } + + @NonNull + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return null; + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return null; + } } class MockStoreDelegate implements RepositorySessionStoreDelegate { @@ -69,13 +143,18 @@ public class BatchingUploaderTest { ++storeCompleted; } + @Override + public void onStoreFailed(Exception e) { + + } + @Override public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) { return null; } } - private Executor workQueue; + private ExecutorService workQueue; private RepositorySessionStoreDelegate storeDelegate; @Before @@ -442,11 +521,41 @@ public class BatchingUploaderTest { makeCountConstrainedRepository(maxPostRecords, maxTotalRecords, firstSync) ); server15RepositorySession.setStoreDelegate(storeDelegate); - return new BatchingUploader( - server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, null, + return new MockUploader( + server15RepositorySession, workQueue, storeDelegate, Uri.EMPTY, 0L, new InfoConfiguration(infoConfigurationJSON), null); } + class MockPayloadDispatcher extends PayloadDispatcher { + MockPayloadDispatcher(final Executor workQueue, final BatchingUploader uploader, Long lastModified) { + super(workQueue, uploader, lastModified); + } + + @Override + Runnable createRecordUploadRunnable(ArrayList outgoing, ArrayList outgoingGuids, long byteCount, boolean isCommit, boolean isLastPayload) { + // No-op runnable. We don't want this to actually do any work for these tests. + return new Runnable() { + @Override + public void run() {} + }; + } + } + + class MockUploader extends BatchingUploader { + MockUploader(final RepositorySession repositorySession, final ExecutorService workQueue, + final RepositorySessionStoreDelegate sessionStoreDelegate, final Uri baseCollectionUri, + final Long localCollectionLastModified, final InfoConfiguration infoConfiguration, + final AuthHeaderProvider authHeaderProvider) { + super(repositorySession, workQueue, sessionStoreDelegate, baseCollectionUri, + localCollectionLastModified, infoConfiguration, authHeaderProvider); + } + + @Override + PayloadDispatcher createPayloadDispatcher(ExecutorService workQueue, Long localCollectionLastModified) { + return new MockPayloadDispatcher(workQueue, this, localCollectionLastModified); + } + } + private Server15Repository makeCountConstrainedRepository(long maxPostRecords, long maxTotalRecords, boolean firstSync) { return makeConstrainedRepository(1024, 1024, maxPostRecords, 4096, maxTotalRecords, firstSync); } diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java index ba7ec083189f..e409f7e6aa10 100644 --- a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/repositories/uploaders/PayloadUploadDelegateTest.java @@ -3,15 +3,22 @@ package org.mozilla.gecko.sync.repositories.uploaders; +import android.net.Uri; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mozilla.gecko.background.testhelpers.TestRunner; +import org.mozilla.gecko.sync.CollectionConcurrentModificationException; import org.mozilla.gecko.sync.HTTPFailureException; +import org.mozilla.gecko.sync.InfoConfiguration; import org.mozilla.gecko.sync.NonObjectJSONException; import org.mozilla.gecko.sync.net.AuthHeaderProvider; import org.mozilla.gecko.sync.net.SyncResponse; import org.mozilla.gecko.sync.net.SyncStorageResponse; +import org.mozilla.gecko.sync.repositories.RepositorySession; +import org.mozilla.gecko.sync.repositories.RepositoryStateProvider; +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; import static org.mockito.Mockito.mock; @@ -19,6 +26,7 @@ import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import ch.boye.httpclientandroidlib.HttpResponse; import ch.boye.httpclientandroidlib.ProtocolVersion; @@ -33,6 +41,8 @@ public class PayloadUploadDelegateTest { private PayloadDispatcher payloadDispatcher; private AuthHeaderProvider authHeaderProvider; + private RepositorySessionStoreDelegate sessionStoreDelegate; + class MockPayloadDispatcher extends PayloadDispatcher { public final HashMap failedRecords = new HashMap<>(); public boolean didLastPayloadFail = false; @@ -66,6 +76,7 @@ public class PayloadUploadDelegateTest { @Override public void recordFailed(final Exception e, final String recordGuid) { + recordUploadFailed = true; failedRecords.put(recordGuid, e); } @@ -75,11 +86,44 @@ public class PayloadUploadDelegateTest { } } + class MockRepositorySessionStoreDelegate implements RepositorySessionStoreDelegate { + Exception storeFailedException; + ArrayList succeededGuids = new ArrayList<>(); + HashMap failedGuids = new HashMap<>(); + + @Override + public void onRecordStoreFailed(Exception ex, String recordGuid) { + failedGuids.put(recordGuid, ex); + } + + @Override + public void onRecordStoreSucceeded(String guid) { + succeededGuids.add(guid); + } + + @Override + public void onStoreCompleted(long storeEnd) { + + } + + @Override + public void onStoreFailed(Exception e) { + storeFailedException = e; + } + + @Override + public RepositorySessionStoreDelegate deferredStoreDelegate(ExecutorService executor) { + return null; + } + } + @Before public void setUp() throws Exception { + sessionStoreDelegate = new MockRepositorySessionStoreDelegate(); + payloadDispatcher = new MockPayloadDispatcher( null, - mock(BatchingUploader.class) + new BatchingUploader(mock(RepositorySession.class), null, sessionStoreDelegate, Uri.parse("https://example.com"), 0L, mock(InfoConfiguration.class), mock(AuthHeaderProvider.class)) ); authHeaderProvider = mock(AuthHeaderProvider.class); @@ -368,6 +412,26 @@ public class PayloadUploadDelegateTest { payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response)); assertEquals(3, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size()); assertTrue(((MockPayloadDispatcher) payloadDispatcher).didLastPayloadFail); + assertTrue(payloadDispatcher.recordUploadFailed); + } + + @Test + public void testHandleRequestFailure412() { + ArrayList postedGuids = new ArrayList<>(3); + postedGuids.add("testGuid1"); + postedGuids.add("testGuid2"); + postedGuids.add("testGuid3"); + PayloadUploadDelegate payloadUploadDelegate = new PayloadUploadDelegate(authHeaderProvider, payloadDispatcher, postedGuids, false, false); + + final HttpResponse response = new BasicHttpResponse( + new BasicStatusLine(new ProtocolVersion("HTTP", 1, 1), 412, "Precondition failed")); + payloadUploadDelegate.handleRequestFailure(new SyncStorageResponse(response)); + + assertEquals(0, ((MockPayloadDispatcher) payloadDispatcher).failedRecords.size()); + assertTrue(payloadDispatcher.recordUploadFailed); + assertTrue(payloadDispatcher.storeFailed); + + assertTrue(((MockRepositorySessionStoreDelegate) sessionStoreDelegate).storeFailedException instanceof CollectionConcurrentModificationException); } @Test