From 481be98952b085d93c9124b9bb7ffc819a0721d8 Mon Sep 17 00:00:00 2001 From: Grisha Kruglov Date: Thu, 19 Jan 2017 13:11:41 -0800 Subject: [PATCH] Bug 1291821 - Buffering repository middleware r=rnewman MozReview-Commit-ID: GS3M7k670Po --HG-- extra : rebase_source : 1b3f102b011fe171f8cafab0cf47ca69b2eb9b93 --- mobile/android/base/android-services.mozbuild | 4 + .../BufferingMiddlewareRepository.java | 60 +++++ .../BufferingMiddlewareRepositorySession.java | 193 +++++++++++++++ .../middleware/storage/BufferStorage.java | 35 +++ .../storage/MemoryBufferStorage.java | 70 ++++++ .../sync/repositories/RepositorySession.java | 26 ++ .../sync/synchronizer/RecordsChannel.java | 3 + ...feringMiddlewareRepositorySessionTest.java | 234 ++++++++++++++++++ 8 files changed, 625 insertions(+) create mode 100644 mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java create mode 100644 mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java create mode 100644 mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java create mode 100644 mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java create mode 100644 mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java diff --git a/mobile/android/base/android-services.mozbuild b/mobile/android/base/android-services.mozbuild index 9b6a70c226a4..5396029e4401 100644 --- a/mobile/android/base/android-services.mozbuild +++ b/mobile/android/base/android-services.mozbuild @@ -901,10 +901,14 @@ sync_java_files = [TOPSRCDIR + '/mobile/android/services/src/main/java/org/mozil 'sync/MetaGlobalException.java', 'sync/MetaGlobalMissingEnginesException.java', 'sync/MetaGlobalNotSetException.java', + 'sync/middleware/BufferingMiddlewareRepository.java', + 'sync/middleware/BufferingMiddlewareRepositorySession.java', 'sync/middleware/Crypto5MiddlewareRepository.java', 'sync/middleware/Crypto5MiddlewareRepositorySession.java', 'sync/middleware/MiddlewareRepository.java', 'sync/middleware/MiddlewareRepositorySession.java', + 'sync/middleware/storage/BufferStorage.java', + 'sync/middleware/storage/MemoryBufferStorage.java', 'sync/net/AbstractBearerTokenAuthHeaderProvider.java', 'sync/net/AuthHeaderProvider.java', 'sync/net/BaseResource.java', diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java new file mode 100644 index 000000000000..186b6d7a25a7 --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepository.java @@ -0,0 +1,60 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync.middleware; + +import android.content.Context; + +import org.mozilla.gecko.sync.middleware.storage.BufferStorage; +import org.mozilla.gecko.sync.repositories.Repository; +import org.mozilla.gecko.sync.repositories.RepositorySession; +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionCreationDelegate; + +/** + * A buffering-enabled middleware which is intended to wrap local repositories. Configurable with + * a sync deadline, buffer storage implementation and a consistency checker implementation. + * + * @author grisha + */ +public class BufferingMiddlewareRepository extends MiddlewareRepository { + private final long syncDeadline; + private final Repository inner; + private final BufferStorage bufferStorage; + + private class BufferingMiddlewareRepositorySessionCreationDelegate extends MiddlewareRepository.SessionCreationDelegate { + private final BufferingMiddlewareRepository repository; + private final RepositorySessionCreationDelegate outerDelegate; + + private BufferingMiddlewareRepositorySessionCreationDelegate(BufferingMiddlewareRepository repository, RepositorySessionCreationDelegate outerDelegate) { + this.repository = repository; + this.outerDelegate = outerDelegate; + } + + @Override + public void onSessionCreateFailed(Exception ex) { + this.outerDelegate.onSessionCreateFailed(ex); + } + + @Override + public void onSessionCreated(RepositorySession session) { + outerDelegate.onSessionCreated(new BufferingMiddlewareRepositorySession( + session, this.repository, syncDeadline, bufferStorage + )); + } + } + + public BufferingMiddlewareRepository(long syncDeadline, BufferStorage bufferStore, Repository wrappedRepository) { + this.syncDeadline = syncDeadline; + this.inner = wrappedRepository; + this.bufferStorage = bufferStore; + } + + @Override + public void createSession(RepositorySessionCreationDelegate delegate, Context context) { + this.inner.createSession( + new BufferingMiddlewareRepositorySessionCreationDelegate(this, delegate), + context + ); + } +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java new file mode 100644 index 000000000000..9bb640d5d53c --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySession.java @@ -0,0 +1,193 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync.middleware; + +import android.os.SystemClock; +import android.support.annotation.VisibleForTesting; + +import org.mozilla.gecko.sync.middleware.storage.BufferStorage; +import org.mozilla.gecko.sync.repositories.InactiveSessionException; +import org.mozilla.gecko.sync.repositories.NoStoreDelegateException; +import org.mozilla.gecko.sync.repositories.RepositorySession; +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionFetchRecordsDelegate; +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; +import org.mozilla.gecko.sync.repositories.domain.Record; + +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Buffering middleware which is intended to wrap local RepositorySessions. + * + * Configure it: + * - with an appropriate BufferStore (in-memory, record-type-aware database-backed, etc). + * + * Fetch is pass-through, store is buffered. + * + * @author grisha + */ +/* package-local */ class BufferingMiddlewareRepositorySession extends MiddlewareRepositorySession { + private final BufferStorage bufferStorage; + private final long syncDeadlineMillis; + + private ExecutorService storeDelegateExecutor = Executors.newSingleThreadExecutor(); + + private volatile boolean storeMarkedIncomplete = false; + + /* package-local */ BufferingMiddlewareRepositorySession( + RepositorySession repositorySession, MiddlewareRepository repository, + long syncDeadlineMillis, BufferStorage bufferStorage) { + super(repositorySession, repository); + this.syncDeadlineMillis = syncDeadlineMillis; + this.bufferStorage = bufferStorage; + } + + @Override + public void fetchSince(long timestamp, RepositorySessionFetchRecordsDelegate delegate) { + this.inner.fetchSince(timestamp, delegate); + } + + @Override + public void fetch(String[] guids, RepositorySessionFetchRecordsDelegate delegate) throws InactiveSessionException { + this.inner.fetch(guids, delegate); + } + + @Override + public void fetchAll(RepositorySessionFetchRecordsDelegate delegate) { + this.inner.fetchAll(delegate); + } + + /** + * Will be called when this repository is acting as a `source`, and a flow of records into `sink` + * was completed. That is, we've uploaded merged records to the server, so now is a good time + * to clean up our buffer for this repository. + */ + @Override + public void performCleanup() { + bufferStorage.clear(); + } + + @Override + public void store(Record record) throws NoStoreDelegateException { + bufferStorage.addOrReplace(record); + } + + @Override + public void storeIncomplete() { + storeMarkedIncomplete = true; + } + + @Override + public void storeDone() { + storeDone(System.currentTimeMillis()); + } + + @Override + public void storeFlush() { + bufferStorage.flush(); + } + + @Override + public void storeDone(final long end) { + doStoreDonePrepare(); + + // Determine if we have enough time to perform consistency checks on the buffered data and + // then store it. If we don't have enough time now, we keep our buffer and try again later. + // We don't store results of a buffer consistency check anywhere, so we can't treat it + // separately from storage. + if (storeMarkedIncomplete || !mayProceedToMergeBuffer()) { + super.abort(); + storeDelegate.deferredStoreDelegate(storeDelegateExecutor).onStoreCompleted(end); + return; + } + + // Separate actual merge, so that it may be tested without involving system clock. + doStoreDone(end); + } + + @VisibleForTesting + public void doStoreDonePrepare() { + // Now that records stopped flowing, persist them. + bufferStorage.flush(); + } + + @VisibleForTesting + public void doStoreDone(final long end) { + final Collection buffer = bufferStorage.all(); + + // Trivial case of an empty buffer. + if (buffer.isEmpty()) { + super.storeDone(end); + return; + } + + // Flush our buffer to the wrapped local repository. Data goes live! + try { + for (Record record : buffer) { + this.inner.store(record); + } + } catch (NoStoreDelegateException e) { + // At this point we should have a delegate, so this won't happen. + } + + // And, we're done! + super.storeDone(end); + } + + /** + * When source fails to provide more records, we need to decide what to do with the buffer. + * We might fail because of a network partition, or because of a concurrent modification of a + * collection. Either way we do not clear the buffer in a general case. If a collection has been + * modified, affected records' last-modified timestamps will be bumped, and we will receive those + * records during the next sync. If we already have them in our buffer, we replace our now-old + * copy. Otherwise, they are new records and we just append them. + * + * We depend on GUIDs to be a primary key for incoming records. + * + * @param e indicates reason of failure. + */ + @Override + public void sourceFailed(Exception e) { + bufferStorage.flush(); + super.sourceFailed(e); + } + + /** + * Session abnormally aborted. This doesn't mean our so-far buffered data is invalid. + * Clean up after ourselves, if there's anything to clean up. + */ + @Override + public void abort() { + bufferStorage.flush(); + super.abort(); + } + + @Override + public void setStoreDelegate(RepositorySessionStoreDelegate delegate) { + inner.setStoreDelegate(delegate); + this.storeDelegate = delegate; + } + + @Override + public long getHighWaterMarkTimestamp() { + return bufferStorage.latestModifiedTimestamp(); + } + + private boolean mayProceedToMergeBuffer() { + // If our buffer storage is not persistent, disallowing merging after buffer has been filled + // means throwing away records only to re-download them later. + // In this case allow merge to proceed even if we're past the deadline. + if (!bufferStorage.isPersistent()) { + return true; + } + + // While actual runtime of a merge operation is a function of record type, buffer size, etc., + // let's do a simple thing for now and say that we may proceed if we have couple of minutes + // of runtime left. That surely is enough, right? + final long timeLeftMillis = syncDeadlineMillis - SystemClock.elapsedRealtime(); + return timeLeftMillis > 1000 * 60 * 2; + } +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java new file mode 100644 index 000000000000..5f40f107d1d6 --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/BufferStorage.java @@ -0,0 +1,35 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync.middleware.storage; + +import org.mozilla.gecko.sync.repositories.domain.Record; + +import java.util.Collection; + +/** + * A contract between BufferingMiddleware and specific storage implementations. + * + * @author grisha + */ +public interface BufferStorage { + // Returns all of the records currently present in the buffer. + Collection all(); + + // Implementations are responsible to ensure that any incoming records with duplicate GUIDs replace + // what's already present in the storage layer. + // NB: For a database-backed storage, "replace" happens at a transaction level. + void addOrReplace(Record record); + + // For database-backed implementations, commits any records that came in up to this point. + void flush(); + + void clear(); + + // For buffers that are filled up oldest-first this is a high water mark, which enables resuming + // a sync. + long latestModifiedTimestamp(); + + boolean isPersistent(); +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java new file mode 100644 index 000000000000..e156614379ee --- /dev/null +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/middleware/storage/MemoryBufferStorage.java @@ -0,0 +1,70 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +package org.mozilla.gecko.sync.middleware.storage; + +import org.mozilla.gecko.sync.repositories.domain.Record; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * A trivial, memory-backed, transient implementation of a BufferStorage. + * Its intended use is to buffer syncing of small collections. + * Thread-safe. + * + * @author grisha + */ +public class MemoryBufferStorage implements BufferStorage { + private final Map recordBuffer = Collections.synchronizedMap(new HashMap()); + + @Override + public boolean isPersistent() { + return false; + } + + @Override + public Collection all() { + synchronized (recordBuffer) { + return new ArrayList<>(recordBuffer.values()); + } + } + + @Override + public void addOrReplace(Record record) { + recordBuffer.put(record.guid, record); + } + + @Override + public void flush() { + // This is a no-op; flush intended for database-backed stores. + } + + @Override + public void clear() { + recordBuffer.clear(); + } + + @Override + public long latestModifiedTimestamp() { + long lastModified = 0; + + synchronized (recordBuffer) { + if (recordBuffer.size() == 0) { + return lastModified; + } + + for (Record record : recordBuffer.values()) { + if (record.lastModified > lastModified) { + lastModified = record.lastModified; + } + } + } + + return lastModified; + } +} diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java index 0029d793ddd9..948a3db4e556 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/repositories/RepositorySession.java @@ -76,6 +76,11 @@ public abstract class RepositorySession { return lastSyncTimestamp; } + // Override this in the buffering wrappers. + public long getHighWaterMarkTimestamp() { + return 0; + } + public static long now() { return System.currentTimeMillis(); } @@ -146,6 +151,27 @@ public abstract class RepositorySession { storeWorkQueue.execute(command); } + /** + * Indicates that a number of records have been stored, more are still to come but after some time, + * and now would be a good time to flush records and perform any other similar operations. + */ + public void storeFlush() { + } + + /** + * During flow of records, indicates that source failed. + * + * @param e indicates reason of failure. + */ + public void sourceFailed(Exception e) { + } + + /** + * Indicates that a flow of records have been completed. + */ + public void performCleanup() { + } + public abstract void wipe(RepositorySessionWipeDelegate delegate); /** diff --git a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java index cafeae8ca771..2f52936ccdbb 100644 --- a/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java +++ b/mobile/android/services/src/main/java/org/mozilla/gecko/sync/synchronizer/RecordsChannel.java @@ -248,6 +248,9 @@ public class RecordsChannel implements public void onStoreCompleted(long storeEnd) { Logger.trace(LOG_TAG, "onStoreCompleted. Notifying delegate of onFlowCompleted. " + "Fetch end is " + fetchEnd + ", store end is " + storeEnd); + // Source might have used caches used to facilitate flow of records, so now is a good + // time to clean up. Particularly pertinent for buffered sources. + this.source.performCleanup(); // TODO: synchronize on consumer callback? delegate.onFlowCompleted(this, fetchEnd, storeEnd); } diff --git a/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java new file mode 100644 index 000000000000..07ae13d791f0 --- /dev/null +++ b/mobile/android/tests/background/junit4/src/org/mozilla/gecko/sync/middleware/BufferingMiddlewareRepositorySessionTest.java @@ -0,0 +1,234 @@ +/* Any copyright is dedicated to the Public Domain. + http://creativecommons.org/publicdomain/zero/1.0/ */ + +package org.mozilla.gecko.sync.middleware; + +import org.junit.Before; +import org.junit.Test; +import org.mozilla.gecko.background.testhelpers.MockRecord; +import org.mozilla.gecko.sync.middleware.storage.BufferStorage; +import org.mozilla.gecko.sync.middleware.storage.MemoryBufferStorage; +import org.mozilla.gecko.sync.repositories.Repository; +import org.mozilla.gecko.sync.repositories.RepositorySession; +import org.mozilla.gecko.sync.repositories.delegates.RepositorySessionStoreDelegate; +import org.mozilla.gecko.sync.repositories.domain.Record; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class BufferingMiddlewareRepositorySessionTest { + private RepositorySession innerRepositorySession; + private BufferingMiddlewareRepositorySession bufferingSession; + private BufferingMiddlewareRepositorySession bufferingSessionMocked; + private BufferStorage bufferStorage; + private BufferStorage bufferStorageMocked; + + @Before + public void setUp() throws Exception { + BufferingMiddlewareRepository bufferingRepository; + Repository innerRepositoy; + + innerRepositoy = mock(Repository.class); + innerRepositorySession = mock(RepositorySession.class); + bufferingRepository = new BufferingMiddlewareRepository( + 0L, + new MemoryBufferStorage(), + innerRepositoy + ); + + bufferStorage = new MemoryBufferStorage(); + bufferStorageMocked = mock(MemoryBufferStorage.class); + + bufferingSession = new BufferingMiddlewareRepositorySession( + innerRepositorySession, bufferingRepository, 0L, + bufferStorage); + + bufferingSessionMocked = new BufferingMiddlewareRepositorySession( + innerRepositorySession, bufferingRepository, 0L, + bufferStorageMocked); + } + + @Test + public void store() throws Exception { + assertEquals(0, bufferStorage.all().size()); + + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + assertEquals(1, bufferStorage.all().size()); + + MockRecord record1 = new MockRecord("guid2", null, 1, false); + bufferingSession.store(record1); + assertEquals(2, bufferStorage.all().size()); + assertEquals(1, bufferStorage.latestModifiedTimestamp()); + + // record2 must replace record. + MockRecord record2 = new MockRecord("guid1", null, 2, false); + bufferingSession.store(record2); + assertEquals(2, bufferStorage.all().size()); + assertEquals(2, bufferStorage.latestModifiedTimestamp()); + + // Ensure inner session doesn't see incoming records. + verify(innerRepositorySession, never()).store(record); + verify(innerRepositorySession, never()).store(record1); + verify(innerRepositorySession, never()).store(record2); + } + + @Test + public void storeDone() throws Exception { + // Verify that storage's flush is called. + verify(bufferStorageMocked, times(0)).flush(); + bufferingSessionMocked.doStoreDonePrepare(); + verify(bufferStorageMocked, times(1)).flush(); + + // Trivial case, no records to merge. + bufferingSession.doStoreDone(123L); + verify(innerRepositorySession, times(1)).storeDone(123L); + verify(innerRepositorySession, never()).store(any(Record.class)); + + // Reset call counters. + reset(innerRepositorySession); + + // Now store some records. + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + + MockRecord record2 = new MockRecord("guid2", null, 13, false); + bufferingSession.store(record2); + + MockRecord record3 = new MockRecord("guid3", null, 5, false); + bufferingSession.store(record3); + + // NB: same guid as above. + MockRecord record4 = new MockRecord("guid3", null, -1, false); + bufferingSession.store(record4); + + // Done storing. + bufferingSession.doStoreDone(123L); + + // Ensure all records where stored in the wrapped session. + verify(innerRepositorySession, times(1)).store(record); + verify(innerRepositorySession, times(1)).store(record2); + verify(innerRepositorySession, times(1)).store(record4); + + // Ensure storeDone was called on the wrapped session. + verify(innerRepositorySession, times(1)).storeDone(123L); + + // Ensure buffer wasn't cleared on the wrapped session. + assertEquals(3, bufferStorage.all().size()); + } + + @Test + public void storeFlush() throws Exception { + verify(bufferStorageMocked, times(0)).flush(); + bufferingSessionMocked.storeFlush(); + verify(bufferStorageMocked, times(1)).flush(); + } + + @Test + public void performCleanup() throws Exception { + // Baseline. + assertEquals(0, bufferStorage.all().size()); + + // Test that we can call cleanup with an empty buffer storage. + bufferingSession.performCleanup(); + assertEquals(0, bufferStorage.all().size()); + + // Store a couple of records. + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + + MockRecord record2 = new MockRecord("guid2", null, 13, false); + bufferingSession.store(record2); + + // Confirm it worked. + assertEquals(2, bufferStorage.all().size()); + + // Test that buffer storage is cleaned up. + bufferingSession.performCleanup(); + assertEquals(0, bufferStorage.all().size()); + } + + @Test + public void sourceFailed() throws Exception { + // Source failes before any records have been stored. + bufferingSession.sourceFailed(new Exception()); + assertEquals(0, bufferStorage.all().size()); + + // Store some records now. + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + + MockRecord record2 = new MockRecord("guid2", null, 13, false); + bufferingSession.store(record2); + + MockRecord record3 = new MockRecord("guid3", null, 5, false); + bufferingSession.store(record3); + + // Verify that buffer is intact after source fails. + bufferingSession.sourceFailed(new Exception()); + assertEquals(3, bufferStorage.all().size()); + + // Verify that buffer is flushed after source fails. + verify(bufferStorageMocked, times(0)).flush(); + bufferingSessionMocked.sourceFailed(new Exception()); + verify(bufferStorageMocked, times(1)).flush(); + } + + @Test + public void abort() throws Exception { + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + + MockRecord record2 = new MockRecord("guid2", null, 13, false); + bufferingSession.store(record2); + + MockRecord record3 = new MockRecord("guid3", null, 5, false); + bufferingSession.store(record3); + + // NB: same guid as above. + MockRecord record4 = new MockRecord("guid3", null, -1, false); + bufferingSession.store(record4); + + bufferingSession.abort(); + + // Verify number of records didn't change. + // Abort shouldn't clear the buffer. + assertEquals(3, bufferStorage.all().size()); + } + + @Test + public void setStoreDelegate() throws Exception { + RepositorySessionStoreDelegate delegate = mock(RepositorySessionStoreDelegate.class); + bufferingSession.setStoreDelegate(delegate); + verify(innerRepositorySession).setStoreDelegate(delegate); + } + + @Test + public void getHighWaterMarkTimestamp() throws Exception { + // Trivial case, empty buffer. + assertEquals(0, bufferingSession.getHighWaterMarkTimestamp()); + + MockRecord record = new MockRecord("guid1", null, 1, false); + bufferingSession.store(record); + assertEquals(1, bufferingSession.getHighWaterMarkTimestamp()); + + MockRecord record3 = new MockRecord("guid3", null, 5, false); + bufferingSession.store(record3); + assertEquals(5, bufferingSession.getHighWaterMarkTimestamp()); + + // NB: same guid as above. + MockRecord record4 = new MockRecord("guid3", null, -1, false); + bufferingSession.store(record4); + assertEquals(1, bufferingSession.getHighWaterMarkTimestamp()); + + MockRecord record2 = new MockRecord("guid2", null, 13, false); + bufferingSession.store(record2); + assertEquals(13, bufferingSession.getHighWaterMarkTimestamp()); + } +} \ No newline at end of file