Refactored webext-storage database close function

lougeniac64 2024-10-21 18:58:06 -04:00 committed by lougeniaC64
Parent 1cb0a99432
Commit 39c34efb3c
10 changed files with 257 additions and 181 deletions
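
For orientation, every file below makes the same call-site change. A minimal sketch of the new pattern (`new_mem_db` and `get_connection` are the names used in this diff; error handling simplified):

fn example() -> Result<()> {
    // Before this commit: `let mut db = new_mem_db(); let tx = db.transaction()?;`,
    // which required `&mut Connection`. Now the connection is borrowed through
    // a fallible accessor, and the transaction is started with rusqlite's
    // `unchecked_transaction`, which needs only `&Connection`.
    let db = new_mem_db();
    let conn = db.get_connection()?; // fails once close() has run
    let tx = conn.unchecked_transaction()?;
    // ... reads and writes against `tx` ...
    tx.commit()?;
    Ok(())
}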

View file

@@ -438,8 +438,9 @@ mod tests {
#[test]
fn test_simple() -> Result<()> {
let ext_id = "x";
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
// an empty store.
for q in vec![JsonValue::Null, json!("foo"), json!(["foo"])].into_iter() {
@@ -529,8 +530,9 @@ mod tests {
fn test_check_get_impl() -> Result<()> {
// This is a port of checkGetImpl in test_ext_storage.js in Desktop.
let ext_id = "x";
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let prop = "test-prop";
let value = "test-value";
@@ -584,8 +586,9 @@ mod tests {
fn test_bug_1621162() -> Result<()> {
// apparently Firefox, unlike Chrome, will not optimize the changes.
// See bug 1621162 for more!
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let ext_id = "xyz";
set(&tx, ext_id, json!({"foo": "bar" }))?;
@@ -599,8 +602,9 @@ mod tests {
#[test]
fn test_quota_maxitems() -> Result<()> {
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let ext_id = "xyz";
for i in 1..SYNC_MAX_ITEMS + 1 {
set(
@@ -619,8 +623,9 @@ mod tests {
#[test]
fn test_quota_bytesperitem() -> Result<()> {
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let ext_id = "xyz";
// A string 5 bytes less than the max. This should be counted as being
// 3 bytes less than the max as the quotes are counted. Plus the length
@@ -645,8 +650,9 @@ mod tests {
#[test]
fn test_quota_bytes() -> Result<()> {
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let ext_id = "xyz";
let val = "x".repeat(SYNC_QUOTA_BYTES + 1);
@@ -682,8 +688,9 @@ mod tests {
#[test]
fn test_get_bytes_in_use() -> Result<()> {
let mut db = new_mem_db();
let tx = db.transaction()?;
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let ext_id = "xyz";
assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 0);
@@ -714,8 +721,9 @@ mod tests {
#[test]
fn test_usage() {
let mut db = new_mem_db();
let tx = db.transaction().unwrap();
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction().unwrap();
// '{"a":"a","b":"bb","c":"ccc","n":999999}': 39 bytes
set(&tx, "xyz", json!({ "a": "a" })).unwrap();
set(&tx, "xyz", json!({ "b": "bb" })).unwrap();
@@ -727,7 +735,7 @@ mod tests {
tx.commit().unwrap();
let usage = usage(&db).unwrap();
let usage = usage(conn).unwrap();
let expect = [
UsageInfo {
ext_id: "abc".to_string(),

View file

@@ -11,7 +11,7 @@ use rusqlite::Connection;
use rusqlite::OpenFlags;
use sql_support::open_database::open_database_with_flags;
use sql_support::ConnExt;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use url::Url;
@@ -23,10 +23,16 @@ use url::Url;
///
/// We only support a single writer connection - so that's the only thing we
/// store. It's still a bit overkill, but there's only so many yaks in a day.
pub enum WebExtStorageDb {
Open(Connection),
Closed,
}
pub struct StorageDb {
writer: Connection,
pub writer: WebExtStorageDb,
interrupt_handle: Arc<SqlInterruptHandle>,
}
impl StorageDb {
/// Create a new, or fetch an already open, StorageDb backed by a file on disk.
pub fn new(db_path: impl AsRef<Path>) -> Result<Self> {
@@ -54,7 +60,7 @@ impl StorageDb {
let conn = open_database_with_flags(db_path, flags, &schema::WebExtMigrationLogin)?;
Ok(Self {
interrupt_handle: Arc::new(SqlInterruptHandle::new(&conn)),
writer: conn,
writer: WebExtStorageDb::Open(conn),
})
}
@@ -73,29 +79,20 @@ impl StorageDb {
/// underlying connection so the caller can retry but (a) that's very tricky
/// in an Arc<Mutex<>> world and (b) we never actually took advantage of
/// that retry capability.
pub fn close(self) -> Result<()> {
self.writer.close().map_err(|(writer, err)| {
// In rusqlite 0.28.0 and earlier, if we just let `writer` drop,
// the close would panic on failure.
// Later rusqlite versions will not panic, but this behavior doesn't
// hurt there.
std::mem::forget(writer);
err.into()
})
pub fn close(&mut self) -> Result<()> {
let conn = match std::mem::replace(&mut self.writer, WebExtStorageDb::Closed) {
WebExtStorageDb::Open(conn) => conn,
WebExtStorageDb::Closed => return Ok(()),
};
conn.close().map_err(|(_, y)| Error::SqlError(y))
}
}
impl Deref for StorageDb {
type Target = Connection;
fn deref(&self) -> &Self::Target {
&self.writer
}
}
impl DerefMut for StorageDb {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.writer
pub(crate) fn get_connection(&self) -> Result<&Connection> {
let db = &self.writer;
match db {
WebExtStorageDb::Open(y) => Ok(y),
WebExtStorageDb::Closed => Err(Error::DatabaseConnectionClosed),
}
}
}
@@ -290,12 +287,13 @@ mod tests {
#[test]
fn test_meta() -> Result<()> {
let writer = new_mem_db();
assert_eq!(get_meta::<String>(&writer, "foo")?, None);
put_meta(&writer, "foo", &"bar".to_string())?;
assert_eq!(get_meta(&writer, "foo")?, Some("bar".to_string()));
delete_meta(&writer, "foo")?;
assert_eq!(get_meta::<String>(&writer, "foo")?, None);
let db = new_mem_db();
let conn = &db.get_connection()?;
assert_eq!(get_meta::<String>(conn, "foo")?, None);
put_meta(conn, "foo", &"bar".to_string())?;
assert_eq!(get_meta(conn, "foo")?, Some("bar".to_string()));
delete_meta(conn, "foo")?;
assert_eq!(get_meta::<String>(conn, "foo")?, None);
Ok(())
}
}
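
The heart of the refactor above is an idempotent, non-consuming close. A self-contained sketch of the same take-and-close pattern against rusqlite directly (the crate wraps the error in its own `Error::SqlError`; plain `rusqlite::Error` is used here):

use rusqlite::Connection;

enum Db {
    Open(Connection),
    Closed,
}

struct Storage {
    writer: Db,
}

impl Storage {
    // Take the connection out and leave `Closed` behind, so a second
    // call to close() is a harmless no-op rather than a panic or error.
    fn close(&mut self) -> rusqlite::Result<()> {
        match std::mem::replace(&mut self.writer, Db::Closed) {
            Db::Open(conn) => conn.close().map_err(|(_conn, err)| err),
            Db::Closed => Ok(()),
        }
    }

    // Borrow the live connection; fails after close(). This accessor is
    // what replaces the old Deref/DerefMut impls that exposed it directly.
    fn get_connection(&self) -> Result<&Connection, &'static str> {
        match &self.writer {
            Db::Open(conn) => Ok(conn),
            Db::Closed => Err("database connection closed"),
        }
    }
}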

View file

@@ -346,8 +346,9 @@ mod tests {
init_source_db(path, f);
// now migrate
let mut db = new_mem_db();
let tx = db.transaction().expect("tx should work");
let db = new_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction().expect("tx should work");
let mi = migrate(&tx, &tmpdir.path().join("source.db")).expect("migrate should work");
tx.commit().expect("should work");
@@ -384,17 +385,18 @@ mod tests {
#[test]
fn test_happy_paths() {
// some real data.
let conn = do_migrate(HAPPY_PATH_MIGRATION_INFO, |c| {
let db = do_migrate(HAPPY_PATH_MIGRATION_INFO, |c| {
c.execute_batch(HAPPY_PATH_SQL).expect("should populate")
});
let conn = db.get_connection().expect("should retrieve connection");
assert_has(
&conn,
conn,
"{e7fefcf3-b39c-4f17-5215-ebfe120a7031}",
json!({"userWelcomed": 1570659224457u64, "isWho": "4ec8109f"}),
);
assert_has(
&conn,
conn,
"https-everywhere@eff.org",
json!({"userRules": [], "ruleActiveStates": {}, "migration_version": 2}),
);

View file

@@ -94,22 +94,24 @@ mod tests {
#[test]
fn test_create_schema_twice() {
let db = new_mem_db();
db.execute_batch(CREATE_SCHEMA_SQL)
let conn = db.get_connection().expect("should retrieve connection");
conn.execute_batch(CREATE_SCHEMA_SQL)
.expect("should allow running twice");
}
#[test]
fn test_create_empty_sync_temp_tables_twice() {
let db = new_mem_db();
create_empty_sync_temp_tables(&db).expect("should work first time");
let conn = db.get_connection().expect("should retrieve connection");
create_empty_sync_temp_tables(conn).expect("should work first time");
// insert something into our new temp table and check it's there.
db.execute_batch(
conn.execute_batch(
"INSERT INTO temp.storage_sync_staging
(guid, ext_id) VALUES
('guid', 'ext_id');",
)
.expect("should work once");
let count = db
let count = conn
.query_row_and_then(
"SELECT COUNT(*) FROM temp.storage_sync_staging;",
[],
@@ -119,9 +121,9 @@ mod tests {
assert_eq!(count, 1, "should be one row");
// re-execute
create_empty_sync_temp_tables(&db).expect("should second first time");
create_empty_sync_temp_tables(conn).expect("should second first time");
// and it should have deleted existing data.
let count = db
let count = conn
.query_row_and_then(
"SELECT COUNT(*) FROM temp.storage_sync_staging;",
[],

View file

@@ -59,8 +59,9 @@ impl WebExtStorageStore {
/// Sets one or more JSON key-value pairs for an extension ID. Returns a
/// list of changes, with existing and new values for each key in `val`.
pub fn set(&self, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let db = &self.db.lock();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let result = api::set(&tx, ext_id, val)?;
tx.commit()?;
Ok(result)
@@ -68,8 +69,9 @@ impl WebExtStorageStore {
/// Returns information about per-extension usage
pub fn usage(&self) -> Result<Vec<crate::UsageInfo>> {
let db = self.db.lock();
api::usage(&db)
let db = &self.db.lock();
let conn = db.get_connection()?;
api::usage(conn)
}
/// Returns the values for one or more keys. `keys` can be:
@@ -90,8 +92,9 @@ impl WebExtStorageStore {
/// `serde_json::Value::Object`).
pub fn get(&self, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
// Don't care about transactions here.
let db = self.db.lock();
api::get(&db, ext_id, keys)
let db = &self.db.lock();
let conn = db.get_connection()?;
api::get(conn, ext_id, keys)
}
/// Deletes the values for one or more keys. As with `get`, `keys` can be
@@ -99,8 +102,9 @@ impl WebExtStorageStore {
/// of changes, where each change contains the old value for each deleted
/// key.
pub fn remove(&self, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let db = &self.db.lock();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let result = api::remove(&tx, ext_id, keys)?;
tx.commit()?;
Ok(result)
@@ -110,8 +114,9 @@ impl WebExtStorageStore {
/// a list of changes, where each change contains the old value for each
/// deleted key.
pub fn clear(&self, ext_id: &str) -> Result<StorageChanges> {
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let db = &self.db.lock();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let result = api::clear(&tx, ext_id)?;
tx.commit()?;
Ok(result)
@@ -120,8 +125,9 @@ impl WebExtStorageStore {
/// Returns the bytes in use for the specified items (which can be null,
/// a string, or an array)
pub fn get_bytes_in_use(&self, ext_id: &str, keys: JsonValue) -> Result<usize> {
let db = self.db.lock();
api::get_bytes_in_use(&db, ext_id, keys)
let db = &self.db.lock();
let conn = db.get_connection()?;
api::get_bytes_in_use(conn, ext_id, keys)
}
/// Returns a bridged sync engine for Desktop for this store.
@@ -135,8 +141,8 @@ impl WebExtStorageStore {
// Even though this consumes `self`, the fact we use an Arc<> means
// we can't guarantee we can actually consume the inner DB - so do
// the best we can.
let shared: ThreadSafeStorageDb = match Arc::try_unwrap(self.db) {
Ok(shared) => shared,
let shared: ThreadSafeStorageDb = match Arc::into_inner(self.db) {
Some(shared) => shared,
_ => {
// The only way this is possible is if the sync engine has an operation
// running - but that shouldn't be possible in practice because desktop
@@ -157,7 +163,7 @@ impl WebExtStorageStore {
}
};
// consume the mutex and get back the inner.
let db = shared.into_inner();
let mut db = shared.into_inner();
db.close()
}
@@ -177,12 +183,13 @@ impl WebExtStorageStore {
///
/// Note that `filename` isn't normalized or canonicalized.
pub fn migrate(&self, filename: impl AsRef<Path>) -> Result<()> {
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let db = &self.db.lock();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let result = migrate(&tx, filename.as_ref())?;
tx.commit()?;
// Failing to store this information should not cause migration failure.
if let Err(e) = result.store(&db) {
if let Err(e) = result.store(conn) {
debug_assert!(false, "Migration error: {:?}", e);
log::warn!("Failed to record migration telmetry: {}", e);
}
@@ -192,8 +199,9 @@ impl WebExtStorageStore {
/// Read-and-delete (e.g. `take` in rust parlance, see Option::take)
/// operation for any MigrationInfo stored in this database.
pub fn take_migration_info(&self) -> Result<Option<MigrationInfo>> {
let db = self.db.lock();
let tx = db.unchecked_transaction()?;
let db = &self.db.lock();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let result = MigrationInfo::take(&tx)?;
tx.commit()?;
Ok(result)
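
One subtlety in `close` above: `Arc::try_unwrap` hands the `Arc` back on failure (`Err(self)`), while `Arc::into_inner` (stable since Rust 1.70) returns an `Option` and guarantees that exactly one caller receives the value even when several threads drop their clones concurrently. A minimal illustration:

use std::sync::Arc;

fn main() {
    let a = Arc::new(5);
    let b = Arc::clone(&a);
    // `a` is not the last strong reference: it is consumed and dropped,
    // and we get None (try_unwrap would return Err with the Arc back).
    assert!(Arc::into_inner(a).is_none());
    // `b` is now the last reference, so the value comes out.
    assert_eq!(Arc::into_inner(b), Some(5));
}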

View file

@@ -58,26 +58,30 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
fn last_sync(&self) -> Result<i64> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
Ok(get_meta(&db, LAST_SYNC_META_KEY)?.unwrap_or(0))
let conn = db.get_connection()?;
Ok(get_meta(conn, LAST_SYNC_META_KEY)?.unwrap_or(0))
}
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
put_meta(&db, LAST_SYNC_META_KEY, &last_sync_millis)?;
let conn = db.get_connection()?;
put_meta(conn, LAST_SYNC_META_KEY, &last_sync_millis)?;
Ok(())
}
fn sync_id(&self) -> Result<Option<String>> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
Ok(get_meta(&db, SYNC_ID_META_KEY)?)
let conn = db.get_connection()?;
Ok(get_meta(conn, SYNC_ID_META_KEY)?)
}
fn reset_sync_id(&self) -> Result<String> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let new_id = SyncGuid::random().to_string();
self.do_reset(&tx)?;
put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
@@ -88,11 +92,13 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let current: Option<String> = get_meta(&db, SYNC_ID_META_KEY)?;
let conn = db.get_connection()?;
let current: Option<String> = get_meta(conn, SYNC_ID_META_KEY)?;
Ok(match current {
Some(current) if current == sync_id => current,
_ => {
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
self.do_reset(&tx)?;
let result = sync_id.to_string();
put_meta(&tx, SYNC_ID_META_KEY, &result)?;
@@ -105,7 +111,8 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
fn sync_started(&self) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
schema::create_empty_sync_temp_tables(&db)?;
let conn = db.get_connection()?;
schema::create_empty_sync_temp_tables(conn)?;
Ok(())
}
@@ -113,7 +120,8 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let signal = db.begin_interrupt_scope()?;
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let incoming_content: Vec<_> = incoming_bsos
.into_iter()
.map(IncomingBso::into_content::<super::WebextRecord>)
@@ -127,8 +135,8 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let signal = db.begin_interrupt_scope()?;
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let incoming = get_incoming(&tx)?;
let actions = incoming
.into_iter()
@@ -138,14 +146,15 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
stage_outgoing(&tx)?;
tx.commit()?;
Ok(get_outgoing(&db, &signal)?.into())
Ok(get_outgoing(conn, &signal)?.into())
}
fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let conn = db.get_connection()?;
let signal = db.begin_interrupt_scope()?;
let tx = db.unchecked_transaction()?;
let tx = conn.unchecked_transaction()?;
record_uploaded(&tx, ids, &signal)?;
tx.commit()?;
@@ -155,14 +164,16 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
fn sync_finished(&self) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
schema::create_empty_sync_temp_tables(&db)?;
let conn = db.get_connection()?;
schema::create_empty_sync_temp_tables(conn)?;
Ok(())
}
fn reset(&self) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
self.do_reset(&tx)?;
delete_meta(&tx, SYNC_ID_META_KEY)?;
tx.commit()?;
@@ -172,7 +183,8 @@ impl sync15::engine::BridgedEngine for BridgedEngine {
fn wipe(&self) -> Result<()> {
let shared_db = self.thread_safe_storage_db()?;
let db = shared_db.lock();
let tx = db.unchecked_transaction()?;
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
// We assume the meta table is only used by sync.
tx.execute_batch(
"DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
@@ -195,7 +207,8 @@ mod tests {
use crate::db::StorageDb;
use sync15::engine::BridgedEngine;
fn query_count(conn: &StorageDb, table: &str) -> u32 {
fn query_count(db: &StorageDb, table: &str) -> u32 {
let conn = db.get_connection().expect("should retrieve connection");
conn.query_row_and_then(&format!("SELECT COUNT(*) FROM {};", table), [], |row| {
row.get::<_, u32>(0)
})
@@ -207,12 +220,13 @@ mod tests {
{
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
db.execute(
let conn = db.get_connection().expect("should retrieve connection");
conn.execute(
"INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
VALUES ('ext-a', 'invalid-json', 2)",
[],
)?;
db.execute(
conn.execute(
"INSERT INTO storage_sync_mirror (guid, ext_id, data)
VALUES ('guid', 'ext-a', '3')",
[],
@@ -234,10 +248,11 @@ mod tests {
// A reset never wipes data...
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
let conn = db.get_connection().expect("should retrieve connection");
assert_eq!(query_count(&db, "storage_sync_data"), 1);
// But did reset the change counter.
let cc = db.query_row_and_then(
let cc = conn.query_row_and_then(
"SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
[],
|row| row.get::<_, u32>(0),
@@ -246,7 +261,7 @@ mod tests {
// But did wipe the mirror...
assert_eq!(query_count(&db, "storage_sync_mirror"), 0);
// And the last_sync should have been wiped.
assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_none());
assert!(get_meta::<i64>(conn, LAST_SYNC_META_KEY)?.is_none());
Ok(())
}
@@ -254,8 +269,9 @@ mod tests {
fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
let shared = engine.thread_safe_storage_db()?;
let db = shared.lock();
let conn = db.get_connection().expect("should retrieve connection");
assert_eq!(query_count(&db, "storage_sync_data"), 1);
let cc = db.query_row_and_then(
let cc = conn.query_row_and_then(
"SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
[],
|row| row.get::<_, u32>(0),
@@ -263,7 +279,7 @@ mod tests {
assert_eq!(cc, 2);
assert_eq!(query_count(&db, "storage_sync_mirror"), 1);
// And the last_sync should remain.
assert!(get_meta::<i64>(&db, LAST_SYNC_META_KEY)?.is_some());
assert!(get_meta::<i64>(conn, LAST_SYNC_META_KEY)?.is_some());
Ok(())
}
@@ -287,23 +303,25 @@ mod tests {
#[test]
fn test_reset() -> Result<()> {
let strong = new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(&strong);
let strong = &new_mem_thread_safe_storage_db();
let engine = super::BridgedEngine::new(strong);
setup_mock_data(&engine)?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
{
let db = strong.lock();
let conn = db.get_connection()?;
put_meta(conn, SYNC_ID_META_KEY, &"sync-id".to_string())?;
}
engine.reset()?;
assert_reset(&engine)?;
// Only an explicit reset kills the sync-id, so check that here.
assert_eq!(
get_meta::<String>(&engine.thread_safe_storage_db()?.lock(), SYNC_ID_META_KEY)?,
None
);
{
let db = strong.lock();
let conn = db.get_connection()?;
// Only an explicit reset kills the sync-id, so check that here.
assert_eq!(get_meta::<String>(conn, SYNC_ID_META_KEY)?, None);
}
Ok(())
}
@@ -330,11 +348,13 @@ mod tests {
setup_mock_data(&engine)?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"old-id".to_string(),
)?;
{
let storage_db = &engine.thread_safe_storage_db()?;
let db = storage_db.lock();
let conn = db.get_connection()?;
put_meta(conn, SYNC_ID_META_KEY, &"old-id".to_string())?;
}
assert_not_reset(&engine)?;
assert_eq!(engine.sync_id()?, Some("old-id".to_string()));
@@ -354,11 +374,12 @@ mod tests {
setup_mock_data(&engine)?;
assert_not_reset(&engine)?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
{
let storage_db = &engine.thread_safe_storage_db()?;
let db = storage_db.lock();
let conn = db.get_connection()?;
put_meta(conn, SYNC_ID_META_KEY, &"sync-id".to_string())?;
}
engine.ensure_current_sync_id("sync-id")?;
// should not have reset.
@@ -372,11 +393,13 @@ mod tests {
let engine = super::BridgedEngine::new(&strong);
setup_mock_data(&engine)?;
put_meta(
&engine.thread_safe_storage_db()?.lock(),
SYNC_ID_META_KEY,
&"sync-id".to_string(),
)?;
{
let storage_db = &engine.thread_safe_storage_db()?;
let db = storage_db.lock();
let conn = db.get_connection()?;
put_meta(conn, SYNC_ID_META_KEY, &"sync-id".to_string())?;
}
assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));
let new_id = engine.reset_sync_id()?;
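
Note how the test changes in this file bind the mutex guard in an explicit block: the old code locked and unlocked inside a single `put_meta(&engine.thread_safe_storage_db()?.lock(), ...)` expression, while the new code makes the guard's lifetime visible so it is dropped before the engine re-locks the database. Roughly (names as in the diff):

{
    let db = storage_db.lock();       // guard held only for this block
    let conn = db.get_connection()?;  // fails once the DB is closed
    put_meta(conn, SYNC_ID_META_KEY, &"sync-id".to_string())?;
}   // guard dropped here, before the next engine call re-locks
engine.ensure_current_sync_id("sync-id")?;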

View file

@@ -548,8 +548,9 @@ mod tests {
#[test]
fn test_incoming_populates_staging() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
let incoming = json! {[
{
@@ -570,8 +571,9 @@ mod tests {
#[test]
fn test_fetch_incoming_state() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
// Start with an item just in staging.
tx.execute(
@@ -631,8 +633,9 @@ mod tests {
// Like test_fetch_incoming_state, but check NULLs are handled correctly.
#[test]
fn test_fetch_incoming_state_nulls() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
// Start with a tombstone just in staging.
tx.execute(
@@ -735,10 +738,13 @@ mod tests {
#[test]
fn test_apply_actions() -> Result<()> {
let mut db = new_syncable_mem_db();
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("connection should be retrieved");
// DeleteLocally - row should be entirely removed.
let tx = db.transaction().expect("transaction should work");
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
@@ -759,7 +765,9 @@ mod tests {
tx.rollback()?;
// TakeRemote - replace local data with remote and marked as not dirty.
let tx = db.transaction().expect("transaction should work");
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
@@ -794,7 +802,9 @@ mod tests {
tx.rollback()?;
// Merge - like ::TakeRemote, but data remains dirty.
let tx = db.transaction().expect("transaction should work");
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,
@@ -828,7 +838,9 @@ mod tests {
tx.rollback()?;
// Same - data stays the same but is marked not dirty.
let tx = db.transaction().expect("transaction should work");
let tx = conn
.unchecked_transaction()
.expect("transaction should begin");
api::set(&tx, "ext_id", json!({"foo": "local"}))?;
assert_eq!(
api::get(&tx, "ext_id", json!(null))?,

View file

@@ -163,7 +163,8 @@ pub struct SyncedExtensionChange {
pub fn get_synced_changes(db: &StorageDb) -> Result<Vec<SyncedExtensionChange>> {
let signal = db.begin_interrupt_scope()?;
let sql = "SELECT ext_id, changes FROM temp.storage_sync_applied";
db.conn().query_rows_and_then(sql, [], |row| -> Result<_> {
let conn = db.get_connection()?;
conn.query_rows_and_then(sql, [], |row| -> Result<_> {
signal.err_if_interrupted()?;
Ok(SyncedExtensionChange {
ext_id: row.get("ext_id")?,
@@ -181,7 +182,8 @@ pub mod test {
pub fn new_syncable_mem_db() -> StorageDb {
let _ = env_logger::try_init();
let db = new_mem_db();
create_empty_sync_temp_tables(&db).expect("should work");
let conn = db.get_connection().expect("should retrieve connection");
create_empty_sync_temp_tables(conn).expect("should work");
db
}
}
@@ -382,7 +384,8 @@ mod tests {
#[test]
fn test_get_synced_changes() -> Result<()> {
let db = new_syncable_mem_db();
db.execute_batch(&format!(
let conn = db.get_connection()?;
conn.execute_batch(&format!(
r#"INSERT INTO temp.storage_sync_applied (ext_id, changes)
VALUES
('an-extension', '{change1}'),

View file

@@ -124,8 +124,9 @@ mod tests {
#[test]
fn test_simple() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection()?;
let tx = conn.unchecked_transaction()?;
tx.execute_batch(
r#"

View file

@@ -140,8 +140,9 @@ fn make_incoming_tombstone(guid: &Guid) -> IncomingContent<WebextRecord> {
#[test]
fn test_simple_outgoing_sync() -> Result<()> {
// So we are starting with an empty local store and empty server store.
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data.clone())?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -151,8 +152,9 @@ fn test_simple_outgoing_sync() -> Result<()> {
#[test]
fn test_simple_incoming_sync() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
let bridge_record = make_incoming(&Guid::new("guid"), "ext-id", &data);
assert_eq!(do_sync(&tx, &[bridge_record])?.len(), 0);
@@ -166,8 +168,9 @@ fn test_simple_incoming_sync() -> Result<()> {
fn test_outgoing_tombstone() -> Result<()> {
// Tombstones are only kept when the mirror has that record - so first
// test that, then arrange for the mirror to have the record.
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data.clone())?;
assert_eq!(
@@ -197,8 +200,9 @@ fn test_outgoing_tombstone() -> Result<()> {
fn test_incoming_tombstone_exists() -> Result<()> {
// An incoming tombstone for a record we've previously synced (and thus
// have data for)
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data.clone())?;
assert_eq!(
@@ -224,8 +228,9 @@ fn test_incoming_tombstone_exists() -> Result<()> {
#[test]
fn test_incoming_tombstone_not_exists() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
// An incoming tombstone for something that's not anywhere locally.
let guid = Guid::new("guid");
let tombstone = make_incoming_tombstone(&guid);
@@ -242,8 +247,9 @@ fn test_incoming_tombstone_not_exists() -> Result<()> {
#[test]
fn test_reconciled() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value"});
set(&tx, "ext-id", data)?;
// Incoming payload with the same data
@@ -258,8 +264,9 @@ fn test_reconciled() -> Result<()> {
/// identical to what is in the mirrored table.
#[test]
fn test_reconcile_with_null_payload() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value"});
set(&tx, "ext-id", data.clone())?;
// We try to push this change on the next sync.
@@ -278,8 +285,9 @@ fn test_reconcile_with_null_payload() -> Result<()> {
#[test]
fn test_accept_incoming_when_local_is_deleted() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
// We only record an extension as deleted locally if it has been
// uploaded before being deleted.
let data = json!({"key1": "key1-value"});
@@ -299,8 +307,9 @@ fn test_accept_incoming_when_local_is_deleted() -> Result<()> {
#[test]
fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value"});
set(&tx, "ext-id", data)?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -319,8 +328,9 @@ fn test_accept_incoming_when_local_is_deleted_no_mirror() -> Result<()> {
#[test]
fn test_accept_deleted_key_mirrored() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data)?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -336,8 +346,9 @@ fn test_accept_deleted_key_mirrored() -> Result<()> {
#[test]
fn test_merged_no_mirror() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value"});
set(&tx, "ext-id", data)?;
// Incoming payload without 'key1' and some data for 'key2'.
@@ -355,8 +366,9 @@ fn test_merged_no_mirror() -> Result<()> {
#[test]
fn test_merged_incoming() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let old_data = json!({"key1": "key1-value", "key2": "key2-value", "doomed_key": "deletable"});
set(&tx, "ext-id", old_data)?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -385,8 +397,9 @@ fn test_merged_incoming() -> Result<()> {
#[test]
fn test_merged_with_null_payload() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let old_data = json!({"key1": "key1-value"});
set(&tx, "ext-id", old_data.clone())?;
// Push this change remotely.
@@ -409,8 +422,9 @@ fn test_merged_with_null_payload() -> Result<()> {
#[test]
fn test_deleted_mirrored_object_accept() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data)?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -427,8 +441,9 @@ fn test_deleted_mirrored_object_accept() -> Result<()> {
#[test]
fn test_deleted_mirrored_object_merged() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
let guid = get_mirror_guid(&tx, "ext-id")?;
@@ -450,8 +465,9 @@ fn test_deleted_mirrored_object_merged() -> Result<()> {
/// Like the above test, but with a mirrored tombstone.
#[test]
fn test_deleted_mirrored_tombstone_merged() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
// Sync some data so we can get the guid for this extension.
set(&tx, "ext-id", json!({"key1": "key1-value"}))?;
assert_eq!(do_sync(&tx, &[])?.len(), 1);
@@ -473,8 +489,9 @@ fn test_deleted_mirrored_tombstone_merged() -> Result<()> {
#[test]
fn test_deleted_not_mirrored_object_merged() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data)?;
// Incoming payload with data deleted.
@@ -493,8 +510,9 @@ fn test_deleted_not_mirrored_object_merged() -> Result<()> {
#[test]
fn test_conflicting_incoming() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let data = json!({"key1": "key1-value", "key2": "key2-value"});
set(&tx, "ext-id", data)?;
// Incoming payload without 'key1' and conflicting for 'key2'.
@@ -517,8 +535,9 @@ fn test_conflicting_incoming() -> Result<()> {
#[test]
fn test_invalid_incoming() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let db = new_syncable_mem_db();
let conn = db.get_connection().expect("should retrieve connection");
let tx = conn.unchecked_transaction()?;
let json = json!({"id": "id", "payload": json!("").to_string()});
let bso = serde_json::from_value::<IncomingBso>(json).unwrap();
let record = bso.into_content();