Added outgoing logic for the autofill component (#3885)

This commit is contained in:
lougeniaC64 2021-03-03 17:52:06 -05:00 committed by GitHub
Parent a051c845fe
Commit 0b485208b4
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 915 additions and 57 deletions

Просмотреть файл

@ -2,12 +2,28 @@
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
CREATE TEMP TABLE IF NOT EXISTS addresses_sync_staging (
DROP TABLE IF EXISTS addresses_sync_staging;
CREATE TEMP TABLE addresses_sync_staging (
guid TEXT NOT NULL PRIMARY KEY CHECK(length(guid) != 0),
payload TEXT NOT NULL CHECK(length(payload) != 0)
);
CREATE TEMP TABLE IF NOT EXISTS credit_cards_sync_staging (
DROP TABLE IF EXISTS credit_cards_sync_staging;
CREATE TEMP TABLE credit_cards_sync_staging (
guid TEXT NOT NULL PRIMARY KEY CHECK(length(guid) != 0),
payload TEXT NOT NULL CHECK(length(payload) != 0)
);
DROP TABLE IF EXISTS addresses_sync_outgoing_staging;
CREATE TEMP TABLE addresses_sync_outgoing_staging (
guid TEXT NOT NULL PRIMARY KEY CHECK(length(guid) != 0),
payload TEXT NOT NULL CHECK(length(payload) != 0),
sync_change_counter INTEGER NOT NULL
);
DROP TABLE IF EXISTS credit_cards_sync_outgoing_staging;
CREATE TEMP TABLE credit_cards_sync_outgoing_staging (
guid TEXT NOT NULL PRIMARY KEY CHECK(length(guid) != 0),
payload TEXT NOT NULL CHECK(length(payload) != 0),
sync_change_counter INTEGER NOT NULL
);

Просмотреть файл

@ -13,9 +13,9 @@ use interrupt_support::Interruptee;
use rusqlite::{named_params, Transaction};
use sync_guid::Guid as SyncGuid;
pub(super) struct AddressesImpl {}
pub(super) struct IncomingAddressesImpl {}
impl ProcessIncomingRecordImpl for AddressesImpl {
impl ProcessIncomingRecordImpl for IncomingAddressesImpl {
type Record = InternalAddress;
/// The first step in the "apply incoming" process - stage the records
@ -267,7 +267,7 @@ mod tests {
for tc in test_cases {
log::info!("starting new testcase");
let tx = db.transaction()?;
let ri = AddressesImpl {};
let ri = IncomingAddressesImpl {};
ri.stage_incoming(
&tx,
array_to_incoming(tc.incoming_records),
@ -298,7 +298,7 @@ mod tests {
fn test_change_local_guid() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let ri = AddressesImpl {};
let ri = IncomingAddressesImpl {};
ri.insert_local_record(&tx, test_record('C'))?;
@ -317,7 +317,7 @@ mod tests {
fn test_get_incoming() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = AddressesImpl {};
let ai = IncomingAddressesImpl {};
do_test_incoming_same(&ai, &tx, test_record('C'));
}
@ -325,7 +325,7 @@ mod tests {
fn test_incoming_tombstone() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = AddressesImpl {};
let ai = IncomingAddressesImpl {};
do_test_incoming_tombstone(&ai, &tx, test_record('C'));
}
@ -333,7 +333,7 @@ mod tests {
fn test_staged_to_mirror() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = AddressesImpl {};
let ai = IncomingAddressesImpl {};
do_test_staged_to_mirror(&ai, &tx, test_record('C'), "addresses_mirror");
}
}

Просмотреть файл

@ -4,13 +4,17 @@
*/
pub mod incoming;
pub mod outgoing;
use super::engine::{ConfigSyncEngine, EngineConfig, SyncEngineStorageImpl};
use super::{MergeResult, Metadata, ProcessIncomingRecordImpl, SyncRecord};
use super::{
MergeResult, Metadata, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord,
};
use crate::db::models::address::InternalAddress;
use crate::error::*;
use crate::sync_merge_field_check;
use incoming::AddressesImpl;
use incoming::IncomingAddressesImpl;
use outgoing::OutgoingAddressesImpl;
use rusqlite::Transaction;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
@ -33,17 +37,20 @@ pub(super) struct AddressesEngineStorageImpl {}
impl SyncEngineStorageImpl<InternalAddress> for AddressesEngineStorageImpl {
fn get_incoming_impl(&self) -> Box<dyn ProcessIncomingRecordImpl<Record = InternalAddress>> {
Box::new(AddressesImpl {})
Box::new(IncomingAddressesImpl {})
}
fn reset_storage(&self, tx: &Transaction<'_>) -> Result<()> {
tx.execute_batch(
"DELETE FROM addresses_mirror;
DELETE FROM addresses_tombstones;
UPDATE addresses_data SET sync_change_counter = 1",
DELETE FROM addresses_tombstones;",
)?;
Ok(())
}
fn get_outgoing_impl(&self) -> Box<dyn ProcessOutgoingRecordImpl<Record = InternalAddress>> {
Box::new(OutgoingAddressesImpl {})
}
}
// These structs are what's stored on the sync server.
@ -134,7 +141,7 @@ impl SyncRecord for InternalAddress {
merged_record.metadata = incoming.metadata;
merged_record
.metadata
.merge(&local.metadata, &mirror.as_ref().map(|m| m.metadata()));
.merge(&local.metadata, mirror.as_ref().map(|m| m.metadata()));
MergeResult::Merged {
merged: merged_record,

Просмотреть файл

@ -0,0 +1,256 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
use crate::db::models::address::InternalAddress;
use crate::db::schema::ADDRESS_COMMON_COLS;
use crate::error::*;
use crate::sync::common::*;
use crate::sync::{
OutgoingChangeset, Payload, ProcessOutgoingRecordImpl, ServerTimestamp, SyncRecord,
};
use rusqlite::{Row, Transaction};
use sync_guid::Guid as SyncGuid;
const DATA_TABLE_NAME: &str = "addresses_data";
const MIRROR_TABLE_NAME: &str = "addresses_mirror";
const STAGING_TABLE_NAME: &str = "addresses_sync_outgoing_staging";
pub(super) struct OutgoingAddressesImpl {}

impl ProcessOutgoingRecordImpl for OutgoingAddressesImpl {
    type Record = InternalAddress;

    /// Gets the local records that have unsynced changes (`sync_change_counter > 0`) or
    /// that are missing from the mirror table, stages them in the outgoing staging
    /// table, and returns them as an `OutgoingChangeset`.
    fn fetch_outgoing_records(
        &self,
        tx: &Transaction<'_>,
        collection_name: String,
        timestamp: ServerTimestamp,
    ) -> anyhow::Result<OutgoingChangeset> {
        let mut outgoing = OutgoingChangeset::new(collection_name, timestamp);
        let data_sql = format!(
            "SELECT
                {common_cols},
                sync_change_counter
            FROM addresses_data
            WHERE sync_change_counter > 0
                OR guid NOT IN (
                    SELECT m.guid
                    FROM addresses_mirror m
                )",
            common_cols = ADDRESS_COMMON_COLS,
        );
        let payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<Payload> =
            &|row| Ok(InternalAddress::from_row(row)?.to_payload()?);
        let tombstones_sql = "SELECT guid FROM addresses_tombstones";

        // Query the outgoing records once and derive both the staging rows and the
        // returned changeset from the same result set (previously the same data and
        // tombstone SQL was executed twice).
        let outgoing_records: Vec<(Payload, i64)> =
            common_get_outgoing_records(tx, &data_sql, tombstones_sql, payload_from_data_row)?;

        // Stage the outgoing records so `push_synced_items` can later copy them to the
        // mirror and reset the local change counters.
        let staging_records: Vec<(String, String, i64)> = outgoing_records
            .iter()
            .map(|(payload, sync_change_counter)| {
                (
                    payload.id.to_string(),
                    payload.clone().into_json_string(),
                    *sync_change_counter,
                )
            })
            .collect();
        common_save_outgoing_records(tx, STAGING_TABLE_NAME, staging_records)?;

        // Return the outgoing changes.
        outgoing.changes = outgoing_records
            .into_iter()
            .map(|(payload, _)| payload)
            .collect();
        Ok(outgoing)
    }

    /// Resets the uploaded records' change counters and copies the staged outgoing
    /// records into the mirror table after a successful upload.
    fn push_synced_items(
        &self,
        tx: &Transaction<'_>,
        records_synced: Vec<SyncGuid>,
    ) -> anyhow::Result<()> {
        common_push_synced_items(
            tx,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            records_synced,
        )?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{addresses::add_internal_address, models::address::InternalAddress};
    use crate::sync::{common::tests::*, test::new_syncable_mem_db, SyncRecord};
    use rusqlite::Connection;
    use serde_json::{json, Map, Value};
    use types::Timestamp;

    const COLLECTION_NAME: &str = "addresses";

    /// Inserts `address` into `addresses_mirror` as a serialized payload, keyed by its
    /// guid. Uses `INSERT OR IGNORE`, so an existing mirror row for the guid wins.
    fn insert_mirror_record(conn: &Connection, address: InternalAddress) {
        // This should probably be in the sync module, but it's used here.
        let guid = address.guid.clone();
        let payload = address.to_payload().expect("is json").into_json_string();
        conn.execute_named(
            "INSERT OR IGNORE INTO addresses_mirror (guid, payload)
             VALUES (:guid, :payload)",
            rusqlite::named_params! {
                ":guid": guid,
                ":payload": &payload,
            },
        )
        .expect("should insert");
    }

    lazy_static::lazy_static! {
        // Canned address records, keyed by guid prefix.
        static ref TEST_JSON_RECORDS: Map<String, Value> = {
            // NOTE: the JSON here is the same as stored on the sync server -
            // the superfluous `entry` is unfortunate but from desktop.
            let val = json! {{
                "C" : {
                    "id": expand_test_guid('C'),
                    "entry": {
                        "givenName": "jane",
                        "familyName": "doe",
                        "streetAddress": "3050 South La Brea Ave",
                        "addressLevel2": "Los Angeles, CA",
                        "country": "United States",
                        "timeCreated": 0,
                        "timeLastUsed": 0,
                        "timeLastModified": 0,
                        "timesUsed": 0,
                        "version": 1,
                    }
                }
            }};
            val.as_object().expect("literal is an object").clone()
        };
    }

    /// Looks up the canned JSON record whose guid starts with `guid_prefix`.
    fn test_json_record(guid_prefix: char) -> Value {
        TEST_JSON_RECORDS
            .get(&guid_prefix.to_string())
            .expect("should exist")
            .clone()
    }

    /// Converts the canned JSON record into an `InternalAddress`.
    fn test_record(guid_prefix: char) -> InternalAddress {
        let json = test_json_record(guid_prefix);
        let sync_payload = sync15::Payload::from_json(json).unwrap();
        InternalAddress::to_record(sync_payload).expect("should be valid")
    }

    // A record that exists locally but was never synced should be staged, uploaded,
    // and mirrored.
    #[test]
    fn test_outgoing_never_synced() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingAddressesImpl {};
        let test_record = test_record('C');

        // create data record
        assert!(add_internal_address(&tx, &test_record).is_ok());
        do_test_outgoing_never_synced(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A local tombstone should be uploaded as a deletion without resurrecting the
    // record in the data table.
    #[test]
    fn test_outgoing_tombstone() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingAddressesImpl {};
        let test_record = test_record('C');

        // create tombstone record
        assert!(tx
            .execute_named(
                "INSERT INTO addresses_tombstones (
                    guid,
                    time_deleted
                ) VALUES (
                    :guid,
                    :time_deleted
                )",
                rusqlite::named_params! {
                    ":guid": test_record.guid,
                    ":time_deleted": Timestamp::now(),
                },
            )
            .is_ok());
        do_test_outgoing_tombstone(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A previously-synced record with local changes should be re-uploaded and its
    // change counter reset.
    #[test]
    fn test_outgoing_synced_with_local_change() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingAddressesImpl {};

        // create synced record with non-zero sync_change_counter
        let mut test_record = test_record('C');
        let initial_change_counter_val = 2;
        test_record.metadata.sync_change_counter = initial_change_counter_val;
        assert!(add_internal_address(&tx, &test_record).is_ok());
        insert_mirror_record(&tx, test_record.clone());
        exists_with_counter_value_in_table(
            &tx,
            DATA_TABLE_NAME,
            &test_record.guid,
            initial_change_counter_val,
        );
        do_test_outgoing_synced_with_local_change(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A previously-synced record with no local changes should not be uploaded again.
    #[test]
    fn test_outgoing_synced_with_no_change() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingAddressesImpl {};

        // create synced record with no changes (sync_change_counter = 0)
        let test_record = test_record('C');
        assert!(add_internal_address(&tx, &test_record).is_ok());
        insert_mirror_record(&tx, test_record.clone());
        do_test_outgoing_synced_with_no_change(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }
}

Просмотреть файл

@ -222,6 +222,150 @@ macro_rules! sync_merge_field_check {
};
}
/// Fetches the outgoing records (data rows plus tombstones) and converts each into a
/// row ready for a `*_sync_outgoing_staging` table: `(guid, payload JSON,
/// sync_change_counter)`.
pub(super) fn common_get_outgoing_staging_records(
    conn: &Connection,
    data_sql: &str,
    tombstones_sql: &str,
    payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<Payload>,
) -> anyhow::Result<Vec<(String, String, i64)>> {
    let outgoing_records =
        common_get_outgoing_records(conn, data_sql, tombstones_sql, payload_from_data_row)?;
    // The counter is already an `i64`, so no cast is needed; the target type is fixed
    // by the function's return type.
    Ok(outgoing_records
        .into_iter()
        .map(|(payload, sync_change_counter)| {
            (
                payload.id.to_string(),
                payload.into_json_string(),
                sync_change_counter,
            )
        })
        .collect())
}
/// Runs `sql` and converts each row into a `(Payload, sync_change_counter)` pair via
/// `payload_from_data_row`. Tombstone payloads always get a counter of 0 - the
/// tombstone queries don't select a `sync_change_counter` column.
fn get_outgoing_records(
    conn: &Connection,
    sql: &str,
    payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<Payload>,
) -> anyhow::Result<Vec<(Payload, i64)>> {
    Ok(conn
        .prepare(sql)?
        .query_map(NO_PARAMS, |row| {
            // NOTE(review): this `unwrap` panics on a row the mapper can't convert
            // instead of propagating the error - consider surfacing it to the caller.
            let payload = payload_from_data_row(row).unwrap();
            let sync_change_counter = if payload.deleted {
                0
            } else {
                row.get::<_, i64>("sync_change_counter")?
            };
            Ok((payload, sync_change_counter))
        })?
        .collect::<std::result::Result<Vec<(Payload, i64)>, _>>()?)
}
/// Returns the outgoing records from both the data table (via `data_sql`) and the
/// tombstones table (via `tombstone_sql`), each paired with its sync change counter.
/// Data records come first, followed by the tombstones.
pub(super) fn common_get_outgoing_records(
    conn: &Connection,
    data_sql: &str,
    tombstone_sql: &str,
    payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<Payload>,
) -> anyhow::Result<Vec<(Payload, i64)>> {
    let mut records = get_outgoing_records(conn, data_sql, payload_from_data_row)?;
    let tombstones = get_outgoing_records(conn, tombstone_sql, &|row| {
        Ok(Payload::new_tombstone(Guid::from_string(row.get("guid")?)))
    })?;
    records.extend(tombstones);
    Ok(records)
}
/// Upserts the staging rows into the temp `table_name`, chunked so the number of bound
/// SQL variables stays under SQLite's per-statement limit.
pub(super) fn common_save_outgoing_records(
    conn: &Connection,
    table_name: &str,
    staging_records: Vec<(String, String, i64)>,
) -> anyhow::Result<()> {
    // Three bound variables per record: guid, payload and sync_change_counter.
    let chunk_size = 3;
    sql_support::each_sized_chunk(
        &staging_records,
        sql_support::default_max_variable_number() / chunk_size,
        |chunk, _| -> anyhow::Result<()> {
            let sql = format!(
                "INSERT OR REPLACE INTO temp.{table_name} (guid, payload, sync_change_counter)
                VALUES {staging_records}",
                table_name = table_name,
                staging_records = sql_support::repeat_multi_values(chunk.len(), chunk_size)
            );
            // Flatten the tuples into one positional parameter list matching the
            // generated `(?, ?, ?)` groups.
            let mut params = Vec::with_capacity(chunk.len() * chunk_size);
            for (guid, json, sync_change_counter) in chunk {
                params.push(guid as &dyn ToSql);
                params.push(json);
                params.push(sync_change_counter);
            }
            conn.execute(&sql, params)?;
            Ok(())
        },
    )?;
    Ok(())
}
/// Records the result of a successful upload: resets the local sync change counters for
/// the uploaded records, then copies the staged outgoing records into the mirror table.
pub(super) fn common_push_synced_items(
    conn: &Connection,
    data_table_name: &str,
    mirror_table_name: &str,
    outgoing_table_name: &str,
    records_synced: Vec<Guid>,
) -> anyhow::Result<()> {
    reset_sync_change_counter(conn, data_table_name, outgoing_table_name, records_synced)?;
    push_outgoing_records(conn, mirror_table_name, outgoing_table_name)?;
    Ok(())
}
/// Decrements each uploaded record's `sync_change_counter` by the value it had when it
/// was staged, so changes made while the upload was in flight still count as unsynced.
/// Processes `records_synced` in chunks to respect SQLite's bound-variable limit.
fn reset_sync_change_counter(
    conn: &Connection,
    data_table_name: &str,
    outgoing_table_name: &str,
    records_synced: Vec<Guid>,
) -> anyhow::Result<()> {
    sql_support::each_chunk(&records_synced, |chunk, _| -> anyhow::Result<()> {
        conn.execute(
            &format!(
                // We're making two checks that in practice should be redundant. First we're limiting the
                // number of records that we're pulling from the outgoing staging table to one. Lastly we're
                // ensuring that the updated local records are also in `records_synced` which should be the
                // case since the sync will fail entirely if the server rejects individual records.
                // NOTE(review): if a guid in `records_synced` had no staging row, the
                // subselect would yield NULL and NULL out the counter - presumed
                // impossible since everything uploaded was staged; confirm.
                "UPDATE {data_table_name} AS data
                SET sync_change_counter = sync_change_counter -
                    (
                        SELECT outgoing.sync_change_counter
                        FROM temp.{outgoing_table_name} AS outgoing
                        WHERE outgoing.guid = data.guid LIMIT 1
                    )
                WHERE guid IN ({values})",
                data_table_name = data_table_name,
                outgoing_table_name = outgoing_table_name,
                values = sql_support::repeat_sql_values(chunk.len())
            ),
            chunk,
        )?;
        Ok(())
    })?;
    Ok(())
}
/// Copies every row from the temp outgoing staging table into the mirror table,
/// replacing any existing mirror rows with the same guid.
fn push_outgoing_records(
    conn: &Connection,
    mirror_table_name: &str,
    outgoing_staging_table_name: &str,
) -> Result<()> {
    conn.execute(
        &format!(
            "INSERT OR REPLACE INTO {mirror_table_name}
            SELECT guid, payload FROM temp.{outgoing_staging_table_name}",
            mirror_table_name = mirror_table_name,
            outgoing_staging_table_name = outgoing_staging_table_name,
        ),
        NO_PARAMS,
    )?;
    Ok(())
}
// And common helpers for tests (although no actual tests!)
#[cfg(test)]
pub(super) mod tests {
@ -338,4 +482,177 @@ pub(super) mod tests {
.unwrap();
assert_eq!(num_rows, 2);
}
fn exists_in_table(tx: &Transaction<'_>, table_name: &str, guid: &Guid) {
let sql = format!(
"SELECT COUNT(*) FROM {} where guid = '{}'",
table_name, guid
);
let num_rows = tx
.query_row(&sql, NO_PARAMS, |row| Ok(row.get::<_, u32>(0).unwrap()))
.unwrap();
assert_eq!(num_rows, 1);
}
/// Asserts that exactly one row in `table_name` has both `guid` and the expected
/// `sync_change_counter` value.
pub(in crate::sync) fn exists_with_counter_value_in_table(
    tx: &Transaction<'_>,
    table_name: &str,
    guid: &Guid,
    expected_counter_value: i64,
) {
    let sql = format!(
        "SELECT COUNT(*)
        FROM {table_name}
        WHERE sync_change_counter = {expected_counter_value}
            AND guid = :guid",
        table_name = table_name,
        expected_counter_value = expected_counter_value,
    );
    let num_rows = tx
        .query_row(&sql, &[guid], |row| Ok(row.get::<_, u32>(0).unwrap()))
        .unwrap();
    assert_eq!(num_rows, 1);
}
/// Drives a full outgoing pass for a record that has never been synced and asserts that
/// it is staged, its change counter is reset after "upload", and it lands in the mirror.
pub(in crate::sync) fn do_test_outgoing_never_synced<
    T: SyncRecord + std::fmt::Debug + Clone,
>(
    tx: &Transaction<'_>,
    ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
    guid: &Guid,
    data_table_name: &str,
    mirror_table_name: &str,
    staging_table_name: &str,
    collection_name: &str,
) {
    // call fetch outgoing records
    assert!(ro
        .fetch_outgoing_records(
            &tx,
            collection_name.to_string(),
            ServerTimestamp::from_millis(0)
        )
        .is_ok());

    // check that the record is in the outgoing table
    exists_in_table(&tx, &format!("temp.{}", staging_table_name), guid);

    // call push synced items
    assert!(ro.push_synced_items(&tx, vec![guid.clone()]).is_ok());

    // check that the sync change counter was reset to 0
    exists_with_counter_value_in_table(&tx, data_table_name, guid, 0);

    // check that the outgoing record is in the mirror
    exists_in_table(&tx, mirror_table_name, guid);
}
/// Drives a full outgoing pass for a locally-deleted record (tombstone) and asserts the
/// deletion is staged and mirrored without resurrecting a row in the data table.
pub(in crate::sync) fn do_test_outgoing_tombstone<T: SyncRecord + std::fmt::Debug + Clone>(
    tx: &Transaction<'_>,
    ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
    guid: &Guid,
    data_table_name: &str,
    mirror_table_name: &str,
    staging_table_name: &str,
    collection_name: &str,
) {
    // call fetch outgoing records
    assert!(ro
        .fetch_outgoing_records(
            &tx,
            collection_name.to_string(),
            ServerTimestamp::from_millis(0),
        )
        .is_ok());

    // check that the record is in the outgoing table
    exists_in_table(&tx, &format!("temp.{}", staging_table_name), guid);

    // call push synced items
    assert!(ro.push_synced_items(&tx, vec![guid.clone()]).is_ok());

    // check that the record wasn't copied to the data table
    let sql = format!(
        "SELECT COUNT(*) FROM {} where guid = '{}'",
        data_table_name, guid
    );
    let num_rows = tx
        .query_row(&sql, NO_PARAMS, |row| Ok(row.get::<_, u32>(0).unwrap()))
        .unwrap();
    assert_eq!(num_rows, 0);

    // check that the outgoing record is in the mirror
    exists_in_table(&tx, mirror_table_name, guid);
}
/// Drives an outgoing pass for a previously-synced record that has local changes:
/// the record must be staged, its counter reset, and the mirror updated.
/// (The steps mirror `do_test_outgoing_never_synced`; callers differ in setup.)
pub(in crate::sync) fn do_test_outgoing_synced_with_local_change<
    T: SyncRecord + std::fmt::Debug + Clone,
>(
    tx: &Transaction<'_>,
    ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
    guid: &Guid,
    data_table_name: &str,
    mirror_table_name: &str,
    staging_table_name: &str,
    collection_name: &str,
) {
    // call fetch outgoing records
    assert!(ro
        .fetch_outgoing_records(
            &tx,
            collection_name.to_string(),
            ServerTimestamp::from_millis(0),
        )
        .is_ok());

    // check that the record is in the outgoing table
    exists_in_table(&tx, &format!("temp.{}", staging_table_name), guid);

    // call push synced items
    assert!(ro.push_synced_items(&tx, vec![guid.clone()]).is_ok());

    // check that the sync change counter was reset to 0
    exists_with_counter_value_in_table(&tx, data_table_name, guid, 0);

    // check that the outgoing record is in the mirror
    exists_in_table(&tx, mirror_table_name, guid);
}
/// Drives an outgoing pass for a previously-synced record with no local changes and
/// asserts that nothing is staged for upload and the counter stays at 0.
pub(in crate::sync) fn do_test_outgoing_synced_with_no_change<
    T: SyncRecord + std::fmt::Debug + Clone,
>(
    tx: &Transaction<'_>,
    ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
    guid: &Guid,
    data_table_name: &str,
    staging_table_name: &str,
    collection_name: &str,
) {
    // call fetch outgoing records
    assert!(ro
        .fetch_outgoing_records(
            &tx,
            collection_name.to_string(),
            ServerTimestamp::from_millis(0),
        )
        .is_ok());

    // check that the record is not in the outgoing table
    let sql = format!(
        "SELECT COUNT(*) FROM {} where guid = '{}'",
        &format!("temp.{}", staging_table_name),
        guid
    );
    let num_rows = tx
        .query_row(&sql, NO_PARAMS, |row| Ok(row.get::<_, u32>(0).unwrap()))
        .unwrap();
    assert_eq!(num_rows, 0);

    // call push synced items
    assert!(ro.push_synced_items(&tx, Vec::<Guid>::new()).is_ok());

    // check that the sync change counter is unchanged
    exists_with_counter_value_in_table(&tx, data_table_name, guid, 0);
}
}

Просмотреть файл

@ -13,9 +13,9 @@ use interrupt_support::Interruptee;
use rusqlite::{named_params, Transaction};
use sync_guid::Guid as SyncGuid;
pub(super) struct CreditCardsImpl {}
pub(super) struct IncomingCreditCardsImpl {}
impl ProcessIncomingRecordImpl for CreditCardsImpl {
impl ProcessIncomingRecordImpl for IncomingCreditCardsImpl {
type Record = InternalCreditCard;
/// The first step in the "apply incoming" process - stage the records
@ -246,7 +246,7 @@ mod tests {
for tc in test_cases {
log::info!("starting new testcase");
let tx = db.transaction()?;
let ri = CreditCardsImpl {};
let ri = IncomingCreditCardsImpl {};
ri.stage_incoming(
&tx,
array_to_incoming(tc.incoming_records),
@ -277,7 +277,7 @@ mod tests {
fn test_change_local_guid() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let ri = CreditCardsImpl {};
let ri = IncomingCreditCardsImpl {};
ri.insert_local_record(&tx, test_record('C'))?;
@ -296,15 +296,15 @@ mod tests {
fn test_get_incoming() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ci = CreditCardsImpl {};
do_test_incoming_same(&ci, &tx, test_record('C'));
let ai = IncomingCreditCardsImpl {};
do_test_incoming_same(&ai, &tx, test_record('C'));
}
#[test]
fn test_incoming_tombstone() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ci = CreditCardsImpl {};
let ci = IncomingCreditCardsImpl {};
do_test_incoming_tombstone(&ci, &tx, test_record('C'));
}
@ -312,7 +312,7 @@ mod tests {
fn test_staged_to_mirror() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ci = CreditCardsImpl {};
let ci = IncomingCreditCardsImpl {};
do_test_staged_to_mirror(&ci, &tx, test_record('C'), "credit_cards_mirror");
}
}

Просмотреть файл

@ -4,13 +4,17 @@
*/
pub mod incoming;
pub mod outgoing;
use super::engine::{ConfigSyncEngine, EngineConfig, SyncEngineStorageImpl};
use super::{MergeResult, Metadata, ProcessIncomingRecordImpl, SyncRecord};
use super::{
MergeResult, Metadata, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord,
};
use crate::db::models::credit_card::InternalCreditCard;
use crate::error::*;
use crate::sync_merge_field_check;
use incoming::CreditCardsImpl;
use incoming::IncomingCreditCardsImpl;
use outgoing::OutgoingCreditCardsImpl;
use rusqlite::Transaction;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
@ -35,17 +39,20 @@ pub(super) struct CreditCardsEngineStorageImpl {}
impl SyncEngineStorageImpl<InternalCreditCard> for CreditCardsEngineStorageImpl {
fn get_incoming_impl(&self) -> Box<dyn ProcessIncomingRecordImpl<Record = InternalCreditCard>> {
Box::new(CreditCardsImpl {})
Box::new(IncomingCreditCardsImpl {})
}
fn reset_storage(&self, tx: &Transaction<'_>) -> Result<()> {
tx.execute_batch(
"DELETE FROM credit_cards_mirror;
DELETE FROM credit_cards_tombstones;
UPDATE credit_cards_data SET sync_change_counter = 1",
DELETE FROM credit_cards_tombstones;",
)?;
Ok(())
}
fn get_outgoing_impl(&self) -> Box<dyn ProcessOutgoingRecordImpl<Record = InternalCreditCard>> {
Box::new(OutgoingCreditCardsImpl {})
}
}
// These structs are what's stored on the sync server.
@ -122,7 +129,7 @@ impl SyncRecord for InternalCreditCard {
merged_record.metadata = incoming.metadata;
merged_record
.metadata
.merge(&local.metadata, &mirror.as_ref().map(|m| m.metadata()));
.merge(&local.metadata, mirror.as_ref().map(|m| m.metadata()));
MergeResult::Merged {
merged: merged_record,

Просмотреть файл

@ -0,0 +1,241 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
use crate::db::models::credit_card::InternalCreditCard;
use crate::db::schema::CREDIT_CARD_COMMON_COLS;
use crate::error::*;
use crate::sync::common::*;
use crate::sync::{
OutgoingChangeset, Payload, ProcessOutgoingRecordImpl, ServerTimestamp, SyncRecord,
};
use rusqlite::{Row, Transaction};
use sync_guid::Guid as SyncGuid;
const DATA_TABLE_NAME: &str = "credit_cards_data";
const MIRROR_TABLE_NAME: &str = "credit_cards_mirror";
const STAGING_TABLE_NAME: &str = "credit_cards_sync_outgoing_staging";
pub(super) struct OutgoingCreditCardsImpl {}

impl ProcessOutgoingRecordImpl for OutgoingCreditCardsImpl {
    type Record = InternalCreditCard;

    /// Gets the local records that have unsynced changes (`sync_change_counter > 0`) or
    /// that are missing from the mirror table, stages them in the outgoing staging
    /// table, and returns them as an `OutgoingChangeset`.
    fn fetch_outgoing_records(
        &self,
        tx: &Transaction<'_>,
        collection_name: String,
        timestamp: ServerTimestamp,
    ) -> anyhow::Result<OutgoingChangeset> {
        let mut outgoing = OutgoingChangeset::new(collection_name, timestamp);
        let data_sql = format!(
            "SELECT
                {common_cols},
                sync_change_counter
            FROM credit_cards_data
            WHERE sync_change_counter > 0
                OR guid NOT IN (
                    SELECT m.guid
                    FROM credit_cards_mirror m
                )",
            common_cols = CREDIT_CARD_COMMON_COLS,
        );
        let payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<Payload> =
            &|row| Ok(InternalCreditCard::from_row(row)?.to_payload()?);
        let tombstones_sql = "SELECT guid FROM credit_cards_tombstones";

        // Query the outgoing records once and derive both the staging rows and the
        // returned changeset from the same result set (previously the same data and
        // tombstone SQL was executed twice).
        let outgoing_records: Vec<(Payload, i64)> =
            common_get_outgoing_records(tx, &data_sql, tombstones_sql, payload_from_data_row)?;

        // Stage the outgoing records so `push_synced_items` can later copy them to the
        // mirror and reset the local change counters.
        let staging_records: Vec<(String, String, i64)> = outgoing_records
            .iter()
            .map(|(payload, sync_change_counter)| {
                (
                    payload.id.to_string(),
                    payload.clone().into_json_string(),
                    *sync_change_counter,
                )
            })
            .collect();
        common_save_outgoing_records(tx, STAGING_TABLE_NAME, staging_records)?;

        // Return the outgoing changes.
        outgoing.changes = outgoing_records
            .into_iter()
            .map(|(payload, _)| payload)
            .collect();
        Ok(outgoing)
    }

    /// Resets the uploaded records' change counters and copies the staged outgoing
    /// records into the mirror table after a successful upload.
    fn push_synced_items(
        &self,
        tx: &Transaction<'_>,
        records_synced: Vec<SyncGuid>,
    ) -> anyhow::Result<()> {
        common_push_synced_items(
            tx,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            records_synced,
        )?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::credit_cards::{add_internal_credit_card, tests::insert_mirror_record};
    use crate::sync::{common::tests::*, test::new_syncable_mem_db, SyncRecord};
    use serde_json::{json, Map, Value};
    use types::Timestamp;

    const COLLECTION_NAME: &str = "creditcards";

    lazy_static::lazy_static! {
        // Canned credit-card records, keyed by guid prefix.
        static ref TEST_JSON_RECORDS: Map<String, Value> = {
            // NOTE: the JSON here is the same as stored on the sync server -
            // the superfluous `entry` is unfortunate but from desktop.
            let val = json! {{
                "C" : {
                    "id": expand_test_guid('C'),
                    "entry": {
                        "cc-name": "Mr Me Another Person",
                        "cc-number": "87654321",
                        "cc-exp-month": 1,
                        "cc-exp-year": 2020,
                        "cc-type": "visa",
                        "timeCreated": 0,
                        "timeLastUsed": 0,
                        "timeLastModified": 0,
                        "timesUsed": 0,
                        "version": 3,
                    }
                }
            }};
            val.as_object().expect("literal is an object").clone()
        };
    }

    /// Looks up the canned JSON record whose guid starts with `guid_prefix`.
    fn test_json_record(guid_prefix: char) -> Value {
        TEST_JSON_RECORDS
            .get(&guid_prefix.to_string())
            .expect("should exist")
            .clone()
    }

    /// Converts the canned JSON record into an `InternalCreditCard`.
    fn test_record(guid_prefix: char) -> InternalCreditCard {
        let json = test_json_record(guid_prefix);
        let sync_payload = sync15::Payload::from_json(json).unwrap();
        InternalCreditCard::to_record(sync_payload).expect("should be valid")
    }

    // A record that exists locally but was never synced should be staged, uploaded,
    // and mirrored.
    #[test]
    fn test_outgoing_never_synced() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingCreditCardsImpl {};
        let test_record = test_record('C');

        // create data record
        assert!(add_internal_credit_card(&tx, &test_record).is_ok());
        do_test_outgoing_never_synced(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A local tombstone should be uploaded as a deletion without resurrecting the
    // record in the data table.
    #[test]
    fn test_outgoing_tombstone() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingCreditCardsImpl {};
        let test_record = test_record('C');

        // create tombstone record
        assert!(tx
            .execute_named(
                "INSERT INTO credit_cards_tombstones (
                    guid,
                    time_deleted
                ) VALUES (
                    :guid,
                    :time_deleted
                )",
                rusqlite::named_params! {
                    ":guid": test_record.guid,
                    ":time_deleted": Timestamp::now(),
                },
            )
            .is_ok());
        do_test_outgoing_tombstone(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A previously-synced record with local changes should be re-uploaded and its
    // change counter reset.
    #[test]
    fn test_outgoing_synced_with_local_change() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingCreditCardsImpl {};

        // create synced record with non-zero sync_change_counter
        let mut test_record = test_record('C');
        let initial_change_counter_val = 2;
        test_record.metadata.sync_change_counter = initial_change_counter_val;
        assert!(add_internal_credit_card(&tx, &test_record).is_ok());
        insert_mirror_record(&tx, test_record.clone());
        exists_with_counter_value_in_table(
            &tx,
            DATA_TABLE_NAME,
            &test_record.guid,
            initial_change_counter_val,
        );
        do_test_outgoing_synced_with_local_change(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }

    // A previously-synced record with no local changes should not be uploaded again.
    #[test]
    fn test_outgoing_synced_with_no_change() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let ao = OutgoingCreditCardsImpl {};

        // create synced record with no changes (sync_change_counter = 0)
        let test_record = test_record('C');
        assert!(add_internal_credit_card(&tx, &test_record).is_ok());
        insert_mirror_record(&tx, test_record.clone());
        do_test_outgoing_synced_with_no_change(
            &tx,
            &ao,
            &test_record.guid,
            DATA_TABLE_NAME,
            STAGING_TABLE_NAME,
            COLLECTION_NAME,
        );
    }
}

Просмотреть файл

@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::{plan_incoming, ProcessIncomingRecordImpl, SyncRecord};
use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
use crate::db::AutofillDb;
use crate::error::*;
use rusqlite::{
@ -32,6 +32,7 @@ pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
pub trait SyncEngineStorageImpl<T> {
fn get_incoming_impl(&self) -> Box<dyn ProcessIncomingRecordImpl<Record = T>>;
fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
fn get_outgoing_impl(&self) -> Box<dyn ProcessOutgoingRecordImpl<Record = T>>;
}
// A sync engine that gets functionality from an EngineConfig.
@ -81,6 +82,7 @@ impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
let num_incoming = inbound.changes.len() as u32;
let tx = db.writer.unchecked_transaction()?;
let incoming_impl = self.storage_impl.get_incoming_impl();
let outgoing_impl = self.storage_impl.get_outgoing_impl();
// The first step in the "apply incoming" process for syncing autofill records.
incoming_impl.stage_incoming(&tx, inbound.changes, &signal)?;
@ -100,20 +102,25 @@ impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
self.put_meta(&tx, LAST_SYNC_META_KEY, &(timestamp.as_millis() as i64))?;
incoming_impl.finish_incoming(&tx)?;
// Not quite sure if we should commit now and then stage outgoing?
tx.commit()?;
// Finally, stage outgoing items.
// TODO: Call yet-to-be-implemented stage outgoing code
// let outgoing = self.fetch_outgoing_records(timestamp)?;
// Ok(outgoing)
Ok(OutgoingChangeset::new(self.collection_name(), timestamp))
let outgoing = outgoing_impl.fetch_outgoing_records(
&tx,
self.config.collection.to_string(),
timestamp,
)?;
// we're committing now because it may take a long time to actually perform the upload
// and we've already staged everything we need to complete the sync in a way that
// doesn't require the transaction to stay alive, so we commit now and start a new
// transaction once complete
tx.commit()?;
Ok(outgoing)
}
fn sync_finished(
&self,
new_timestamp: ServerTimestamp,
_records_synced: Vec<Guid>,
records_synced: Vec<Guid>,
) -> anyhow::Result<()> {
let db = self.db.lock().unwrap();
self.put_meta(
@ -121,8 +128,10 @@ impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
LAST_SYNC_META_KEY,
&(new_timestamp.as_millis() as i64),
)?;
// TODO: Call yet-to-be implement stage outgoing code
db.pragma_update(None, "wal_checkpoint", &"PASSIVE")?;
let tx = db.writer.unchecked_transaction()?;
let outgoing_impl = self.storage_impl.get_outgoing_impl();
outgoing_impl.push_synced_items(&tx, records_synced)?;
tx.commit()?;
Ok(())
}
@ -191,8 +200,7 @@ mod tests {
use crate::db::credit_cards::add_internal_credit_card;
use crate::db::credit_cards::tests::{get_all, insert_mirror_record, insert_tombstone_record};
use crate::db::models::credit_card::InternalCreditCard;
use crate::db::test::new_mem_db;
use rusqlite::NO_PARAMS;
use crate::db::{schema::create_empty_sync_temp_tables, test::new_mem_db};
use sql_support::ConnExt;
// We use the credit-card engine here.
@ -212,6 +220,8 @@ mod tests {
#[test]
fn test_credit_card_engine_sync_finished() -> Result<()> {
let db = new_mem_db();
create_empty_sync_temp_tables(&db).expect("should create temp tables");
let credit_card_engine = create_engine(db);
let last_sync = 24;
@ -302,19 +312,8 @@ mod tests {
.reset(&EngineSyncAssociation::Disconnected)
.expect("should work");
// check that sync change counter has been reset
{
let conn = &engine.db.lock().unwrap().writer;
let reset_record_exists: bool = conn.query_row(
"SELECT EXISTS (
SELECT 1
FROM credit_cards_data
WHERE sync_change_counter = 1
)",
NO_PARAMS,
|row| row.get(0),
)?;
assert!(reset_record_exists);
// check that the mirror and tombstone tables have no records
assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());

Просмотреть файл

@ -12,7 +12,7 @@ pub(crate) use crate::db::models::Metadata;
use crate::error::Result;
use interrupt_support::Interruptee;
use rusqlite::Transaction;
use sync15::{Payload, ServerTimestamp};
use sync15::{OutgoingChangeset, Payload, ServerTimestamp};
use sync_guid::Guid;
use types::Timestamp;
@ -77,7 +77,22 @@ pub trait ProcessIncomingRecordImpl {
fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;
}
// TODO: Will need new trait for outgoing.
pub trait ProcessOutgoingRecordImpl {
type Record;
fn fetch_outgoing_records(
&self,
tx: &Transaction<'_>,
collection_name: String,
timestamp: ServerTimestamp,
) -> anyhow::Result<OutgoingChangeset>;
fn push_synced_items(
&self,
tx: &Transaction<'_>,
records_synced: Vec<Guid>,
) -> anyhow::Result<()>;
}
// A trait that abstracts the functionality in the record itself.
pub trait SyncRecord {
@ -102,7 +117,7 @@ impl Metadata {
/// (which must already have valid metadata).
/// Note that mirror being None is an edge-case and typically means first
/// sync since a "reset" (eg, disconnecting and reconnecting.
pub fn merge(&mut self, other: &Metadata, mirror: &Option<&Metadata>) {
pub fn merge(&mut self, other: &Metadata, mirror: Option<&Metadata>) {
match mirror {
Some(m) => {
fn get_latest_time(t1: Timestamp, t2: Timestamp, t3: Timestamp) -> Timestamp {
@ -265,7 +280,7 @@ fn plan_incoming<T: std::fmt::Debug + SyncRecord>(
let metadata = incoming_record.metadata_mut();
metadata.merge(
&local_record.metadata(),
&mirror.as_ref().map(|m| m.metadata()),
mirror.as_ref().map(|m| m.metadata()),
);
// a micro-optimization here would be to `::DoNothing` if
// the metadata was actually identical, but this seems like
@ -309,7 +324,7 @@ fn plan_incoming<T: std::fmt::Debug + SyncRecord>(
let metadata = incoming_record.metadata_mut();
metadata.merge(
&local_dupe.metadata(),
&mirror.as_ref().map(|m| m.metadata()),
mirror.as_ref().map(|m| m.metadata()),
);
IncomingAction::UpdateLocalGuid {
old_guid,