Bug 1291821 - Buffering repository middleware r=rnewman

MozReview-Commit-ID: GS3M7k670Po

--HG--
extra : rebase_source : 1b3f102b011fe171f8cafab0cf47ca69b2eb9b93
This commit is contained in:
Grisha Kruglov 2017-01-19 13:11:41 -08:00
Родитель c9b5e4ff48
Коммит 481be98952
8 изменённых файлов: 625 добавлений и 0 удалений

Просмотреть файл

@ -901,10 +901,14 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil
'sync/MetaGlobalException.java',
'sync/MetaGlobalMissingEnginesException.java',
'sync/MetaGlobalNotSetException.java',
'sync/middleware/BufferingMiddlewareRepository.java',
'sync/middleware/BufferingMiddlewareRepositorySession.java',
'sync/middleware/Crypto5MiddlewareRepository.java',
'sync/middleware/Crypto5MiddlewareRepositorySession.java',
'sync/middleware/MiddlewareRepository.java',
'sync/middleware/MiddlewareRepositorySession.java',
'sync/middleware/storage/BufferStorage.java',
'sync/middleware/storage/MemoryBufferStorage.java',
'sync/net/AbstractBearerTokenAuthHeaderProvider.java',
'sync/net/AuthHeaderProvider.java',
'sync/net/BaseResource.java',

Просмотреть файл

@ -0,0 +1,60 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.middleware;
import android.content.Context;
import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate;
/**
* A buffering-enabled middleware which is intended to wrap local repositories. Configurable with
* a sync deadline, buffer storage implementation and a consistency checker implementation.
*
* @author grisha
*/
public class BufferingMiddlewareRepository extends MiddlewareRepository {
private final long syncDeadline;
private final Repository inner;
private final BufferStorage bufferStorage;
private class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate {
private final BufferingMiddlewareRepository repository;
private final RepositorySessionCreationDelegate outerDelegate;
private BufferingMiddlewareRepositorySessionCreationDelegate(BufferingMiddlewareRepository repository, RepositorySessionCreationDelegate outerDelegate) {
this.repository = repository;
this.outerDelegate = outerDelegate;
}
@Override
public void onSessionCreateFailed(Exception ex) {
this.outerDelegate.onSessionCreateFailed(ex);
}
@Override
public void onSessionCreated(RepositorySession session) {
outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession(
session, this.repository, syncDeadline, bufferStorage
));
}
}
public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) {
this.syncDeadline = syncDeadline;
this.inner = wrappedRepository;
this.bufferStorage = bufferStore;
}
@Override
public void createSession(RepositorySessionCreationDelegate delegate, Context context) {
this.inner.createSession(
new BufferingMiddlewareRepositorySessionCreationDelegate(this, delegate),
context
);
}
}

Просмотреть файл

@ -0,0 +1,193 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.middleware;
import android.os.SystemClock;
import android.support.annotation.VisibleForTesting;
import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
import org.mozilla.gecko.sync.repositories.InactiveSessionException;
import org.mozilla.gecko.sync.repositories.NoStoreDelegateException;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Buffering middleware which is intended to wrap local RepositorySessions.
*
* Configure it:
* - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc).
*
* Fetch is pass-through, store is buffered.
*
* @author grisha
*/
/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession {
private final BufferStorage bufferStorage;
private final long syncDeadlineMillis;
private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor();
private volatile boolean storeMarkedIncomplete = false;
/* package-local */ BufferingMiddlewareRepositorySession(
RepositorySession repositorySession, MiddlewareRepository repository,
long syncDeadlineMillis, BufferStorage bufferStorage) {
super(repositorySession, repository);
this.syncDeadlineMillis = syncDeadlineMillis;
this.bufferStorage = bufferStorage;
}
@Override
public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) {
this.inner.fetchSince(timestamp, delegate);
}
@Override
public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException {
this.inner.fetch(guids, delegate);
}
@Override
public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) {
this.inner.fetchAll(delegate);
}
/**
* Will be called when this repository is acting as a `source`, and a flow of records into `sink`
* was completed. That is, we've uploaded merged records to the server, so now is a good time
* to clean up our buffer for this repository.
*/
@Override
public void performCleanup() {
bufferStorage.clear();
}
@Override
public void store(Record record) throws NoStoreDelegateException {
bufferStorage.addOrReplace(record);
}
@Override
public void storeIncomplete() {
storeMarkedIncomplete = true;
}
@Override
public void storeDone() {
storeDone(System.currentTimeMillis());
}
@Override
public void storeFlush() {
bufferStorage.flush();
}
@Override
public void storeDone(final long end) {
doStoreDonePrepare();
// Determine if we have enough time to perform consistency checks on the buffered data and
// then store it. If we don't have enough time now, we keep our buffer and try again later.
// We don't store results of a buffer consistency check anywhere, so we can't treat it
// separately from storage.
if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) {
super.abort();
storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end);
return;
}
// Separate actual merge, so that it may be tested without involving system clock.
doStoreDone(end);
}
@VisibleForTesting
public void doStoreDonePrepare() {
// Now that records stopped flowing, persist them.
bufferStorage.flush();
}
@VisibleForTesting
public void doStoreDone(final long end) {
final Collection<Record> buffer = bufferStorage.all();
// Trivial case of an empty buffer.
if (buffer.isEmpty()) {
super.storeDone(end);
return;
}
// Flush our buffer to the wrapped local repository. Data goes live!
try {
for (Record record : buffer) {
this.inner.store(record);
}
} catch (NoStoreDelegateException e) {
// At this point we should have a delegate, so this won't happen.
}
// And, we're done!
super.storeDone(end);
}
/**
* When source fails to provide more records, we need to decide what to do with the buffer.
* We might fail because of a network partition, or because of a concurrent modification of a
* collection. Either way we do not clear the buffer in a general case. If a collection has been
* modified, affected records' last-modified timestamps will be bumped, and we will receive those
* records during the next sync. If we already have them in our buffer, we replace our now-old
* copy. Otherwise, they are new records and we just append them.
*
* We depend on GUIDs to be a primary key for incoming records.
*
* @param e indicates reason of failure.
*/
@Override
public void sourceFailed(Exception e) {
bufferStorage.flush();
super.sourceFailed(e);
}
/**
* Session abnormally aborted. This doesn't mean our so-far buffered data is invalid.
* Clean up after ourselves, if there's anything to clean up.
*/
@Override
public void abort() {
bufferStorage.flush();
super.abort();
}
@Override
public void setStoreDelegate(RepositorySessionStoreDelegate delegate) {
inner.setStoreDelegate(delegate);
this.storeDelegate = delegate;
}
@Override
public long getHighWaterMarkTimestamp() {
return bufferStorage.latestModifiedTimestamp();
}
private boolean mayProceedToMergeBuffer() {
// If our buffer storage is not persistent, disallowing merging after buffer has been filled
// means throwing away records only to re-download them later.
// In this case allow merge to proceed even if we're past the deadline.
if (!bufferStorage.isPersistent()) {
return true;
}
// While actual runtime of a merge operation is a function of record type, buffer size, etc.,
// let's do a simple thing for now and say that we may proceed if we have couple of minutes
// of runtime left. That surely is enough, right?
final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime();
return timeLeftMillis > 1000 * 60 * 2;
}
}

Просмотреть файл

@ -0,0 +1,35 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.middleware.storage;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.Collection;
/**
* A contract between BufferingMiddleware and specific storage implementations.
*
* @author grisha
*/
public interface BufferStorage {
// Returns all of the records currently present in the buffer.
Collection<Record> all();
// Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace
// what's already present in the storage layer.
// NB: For a database-backed storage, "replace" happens at a transaction level.
void addOrReplace(Record record);
// For database-backed implementations, commits any records that came in up to this point.
void flush();
void clear();
// For buffers that are filled up oldest-first this is a high water mark, which enables resuming
// a sync.
long latestModifiedTimestamp();
boolean isPersistent();
}

Просмотреть файл

@ -0,0 +1,70 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package org.mozilla.gecko.sync.middleware.storage;
import org.mozilla.gecko.sync.repositories.domain.Record;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* A trivial, memory-backed, transient implementation of a BufferStorage.
* Its intended use is to buffer syncing of small collections.
* Thread-safe.
*
* @author grisha
*/
public class MemoryBufferStorage implements BufferStorage {
private final Map<String, Record> recordBuffer = Collections.synchronizedMap(new HashMap<String, Record>());
@Override
public boolean isPersistent() {
return false;
}
@Override
public Collection<Record> all() {
synchronized (recordBuffer) {
return new ArrayList<>(recordBuffer.values());
}
}
@Override
public void addOrReplace(Record record) {
recordBuffer.put(record.guid, record);
}
@Override
public void flush() {
// This is a no-op; flush intended for database-backed stores.
}
@Override
public void clear() {
recordBuffer.clear();
}
@Override
public long latestModifiedTimestamp() {
long lastModified = 0;
synchronized (recordBuffer) {
if (recordBuffer.size() == 0) {
return lastModified;
}
for (Record record : recordBuffer.values()) {
if (record.lastModified > lastModified) {
lastModified = record.lastModified;
}
}
}
return lastModified;
}
}

Просмотреть файл

@ -76,6 +76,11 @@ public abstract class RepositorySession {
return lastSyncTimestamp;
}
// Override this in the buffering wrappers.
public long getHighWaterMarkTimestamp() {
return 0;
}
public static long now() {
return System.currentTimeMillis();
}
@ -146,6 +151,27 @@ public abstract class RepositorySession {
storeWorkQueue.execute(command);
}
/**
* Indicates that a number of records have been stored, more are still to come but after some time,
* and now would be a good time to flush records and perform any other similar operations.
*/
public void storeFlush() {
}
/**
* During flow of records, indicates that source failed.
*
* @param e indicates reason of failure.
*/
public void sourceFailed(Exception e) {
}
/**
* Indicates that a flow of records have been completed.
*/
public void performCleanup() {
}
public abstract void wipe(RepositorySessionWipeDelegate delegate);
/**

Просмотреть файл

@ -248,6 +248,9 @@ public class RecordsChannel implements
public void onStoreCompleted(long storeEnd) {
Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " +
"Fetch end is " + fetchEnd + ", store end is " + storeEnd);
// Source might have used caches used to facilitate flow of records, so now is a good
// time to clean up. Particularly pertinent for buffered sources.
this.source.performCleanup();
// TODO: synchronize on consumer callback?
delegate.onFlowCompleted(this, fetchEnd, storeEnd);
}

Просмотреть файл

@ -0,0 +1,234 @@
/* Any copyright is dedicated to the Public Domain.
http://creativecommons.org/publicdomain/zero/1.0/ */
package org.mozilla.gecko.sync.middleware;
import org.junit.Before;
import org.junit.Test;
import org.mozilla.gecko.background.testhelpers.MockRecord;
import org.mozilla.gecko.sync.middleware.storage.BufferStorage;
import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage;
import org.mozilla.gecko.sync.repositories.Repository;
import org.mozilla.gecko.sync.repositories.RepositorySession;
import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate;
import org.mozilla.gecko.sync.repositories.domain.Record;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.junit.Assert.*;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class BufferingMiddlewareRepositorySessionTest {
private RepositorySession innerRepositorySession;
private BufferingMiddlewareRepositorySession bufferingSession;
private BufferingMiddlewareRepositorySession bufferingSessionMocked;
private BufferStorage bufferStorage;
private BufferStorage bufferStorageMocked;
@Before
public void setUp() throws Exception {
BufferingMiddlewareRepository bufferingRepository;
Repository innerRepositoy;
innerRepositoy = mock(Repository.class);
innerRepositorySession = mock(RepositorySession.class);
bufferingRepository = new BufferingMiddlewareRepository(
0L,
new MemoryBufferStorage(),
innerRepositoy
);
bufferStorage = new MemoryBufferStorage();
bufferStorageMocked = mock(MemoryBufferStorage.class);
bufferingSession = new BufferingMiddlewareRepositorySession(
innerRepositorySession, bufferingRepository, 0L,
bufferStorage);
bufferingSessionMocked = new BufferingMiddlewareRepositorySession(
innerRepositorySession, bufferingRepository, 0L,
bufferStorageMocked);
}
@Test
public void store() throws Exception {
assertEquals(0, bufferStorage.all().size());
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
assertEquals(1, bufferStorage.all().size());
MockRecord record1 = new MockRecord("guid2", null, 1, false);
bufferingSession.store(record1);
assertEquals(2, bufferStorage.all().size());
assertEquals(1, bufferStorage.latestModifiedTimestamp());
// record2 must replace record.
MockRecord record2 = new MockRecord("guid1", null, 2, false);
bufferingSession.store(record2);
assertEquals(2, bufferStorage.all().size());
assertEquals(2, bufferStorage.latestModifiedTimestamp());
// Ensure inner session doesn't see incoming records.
verify(innerRepositorySession, never()).store(record);
verify(innerRepositorySession, never()).store(record1);
verify(innerRepositorySession, never()).store(record2);
}
@Test
public void storeDone() throws Exception {
// Verify that storage's flush is called.
verify(bufferStorageMocked, times(0)).flush();
bufferingSessionMocked.doStoreDonePrepare();
verify(bufferStorageMocked, times(1)).flush();
// Trivial case, no records to merge.
bufferingSession.doStoreDone(123L);
verify(innerRepositorySession, times(1)).storeDone(123L);
verify(innerRepositorySession, never()).store(any(Record.class));
// Reset call counters.
reset(innerRepositorySession);
// Now store some records.
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
// NB: same guid as above.
MockRecord record4 = new MockRecord("guid3", null, -1, false);
bufferingSession.store(record4);
// Done storing.
bufferingSession.doStoreDone(123L);
// Ensure all records where stored in the wrapped session.
verify(innerRepositorySession, times(1)).store(record);
verify(innerRepositorySession, times(1)).store(record2);
verify(innerRepositorySession, times(1)).store(record4);
// Ensure storeDone was called on the wrapped session.
verify(innerRepositorySession, times(1)).storeDone(123L);
// Ensure buffer wasn't cleared on the wrapped session.
assertEquals(3, bufferStorage.all().size());
}
@Test
public void storeFlush() throws Exception {
verify(bufferStorageMocked, times(0)).flush();
bufferingSessionMocked.storeFlush();
verify(bufferStorageMocked, times(1)).flush();
}
@Test
public void performCleanup() throws Exception {
// Baseline.
assertEquals(0, bufferStorage.all().size());
// Test that we can call cleanup with an empty buffer storage.
bufferingSession.performCleanup();
assertEquals(0, bufferStorage.all().size());
// Store a couple of records.
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
// Confirm it worked.
assertEquals(2, bufferStorage.all().size());
// Test that buffer storage is cleaned up.
bufferingSession.performCleanup();
assertEquals(0, bufferStorage.all().size());
}
@Test
public void sourceFailed() throws Exception {
// Source failes before any records have been stored.
bufferingSession.sourceFailed(new Exception());
assertEquals(0, bufferStorage.all().size());
// Store some records now.
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
// Verify that buffer is intact after source fails.
bufferingSession.sourceFailed(new Exception());
assertEquals(3, bufferStorage.all().size());
// Verify that buffer is flushed after source fails.
verify(bufferStorageMocked, times(0)).flush();
bufferingSessionMocked.sourceFailed(new Exception());
verify(bufferStorageMocked, times(1)).flush();
}
@Test
public void abort() throws Exception {
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
// NB: same guid as above.
MockRecord record4 = new MockRecord("guid3", null, -1, false);
bufferingSession.store(record4);
bufferingSession.abort();
// Verify number of records didn't change.
// Abort shouldn't clear the buffer.
assertEquals(3, bufferStorage.all().size());
}
@Test
public void setStoreDelegate() throws Exception {
RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class);
bufferingSession.setStoreDelegate(delegate);
verify(innerRepositorySession).setStoreDelegate(delegate);
}
@Test
public void getHighWaterMarkTimestamp() throws Exception {
// Trivial case, empty buffer.
assertEquals(0, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record = new MockRecord("guid1", null, 1, false);
bufferingSession.store(record);
assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record3 = new MockRecord("guid3", null, 5, false);
bufferingSession.store(record3);
assertEquals(5, bufferingSession.getHighWaterMarkTimestamp());
// NB: same guid as above.
MockRecord record4 = new MockRecord("guid3", null, -1, false);
bufferingSession.store(record4);
assertEquals(1, bufferingSession.getHighWaterMarkTimestamp());
MockRecord record2 = new MockRecord("guid2", null, 13, false);
bufferingSession.store(record2);
assertEquals(13, bufferingSession.getHighWaterMarkTimestamp());
}
}