From 02592aad3da238e3d872f27d71380c69af86cfc7 Mon Sep 17 00:00:00 2001 From: lougeniac64 Date: Thu, 10 Dec 2020 17:49:08 -0500 Subject: [PATCH] addresses incoming logic --- .../autofill/sql/create_shared_schema.sql | 2 - .../autofill/sql/create_sync_temp_tables.sql | 27 + components/autofill/src/db/schema.rs | 7 + components/autofill/src/lib.rs | 1 + .../autofill/src/sync/address/incoming.rs | 1068 +++++++++++++++++ components/autofill/src/sync/address/mod.rs | 89 ++ components/autofill/src/sync/mod.rs | 157 +++ 7 files changed, 1349 insertions(+), 2 deletions(-) create mode 100644 components/autofill/sql/create_sync_temp_tables.sql create mode 100644 components/autofill/src/sync/address/incoming.rs create mode 100644 components/autofill/src/sync/address/mod.rs create mode 100644 components/autofill/src/sync/mod.rs diff --git a/components/autofill/sql/create_shared_schema.sql b/components/autofill/sql/create_shared_schema.sql index e42525fce..1218b2f54 100644 --- a/components/autofill/sql/create_shared_schema.sql +++ b/components/autofill/sql/create_shared_schema.sql @@ -61,7 +61,6 @@ CREATE TABLE IF NOT EXISTS credit_cards_data ( cc_exp_month INTEGER, cc_exp_year INTEGER, cc_type TEXT NOT NULL, - -- cc_exp TEXT NOT NULL, -- text format of the expiration date e.g. "[cc_exp_year]-[cc_exp_month]" time_created INTEGER NOT NULL, time_last_used INTEGER, @@ -79,7 +78,6 @@ CREATE TABLE IF NOT EXISTS credit_cards_mirror ( cc_exp_month INTEGER, cc_exp_year INTEGER, cc_type TEXT NOT NULL, - -- cc_exp TEXT NOT NULL, -- text format of the expiration date e.g. "[cc_exp_year]-[cc_exp_month]" time_created INTEGER NOT NULL, time_last_used INTEGER, diff --git a/components/autofill/sql/create_sync_temp_tables.sql b/components/autofill/sql/create_sync_temp_tables.sql new file mode 100644 index 000000000..3f71486e8 --- /dev/null +++ b/components/autofill/sql/create_sync_temp_tables.sql @@ -0,0 +1,27 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at http://mozilla.org/MPL/2.0/. + +CREATE TEMP TABLE addresses_sync_staging ( + guid TEXT NOT NULL PRIMARY KEY, + given_name TEXT NOT NULL, + additional_name TEXT NOT NULL, + family_name TEXT NOT NULL, + organization TEXT NOT NULL, + street_address TEXT NOT NULL, + address_level3 TEXT NOT NULL, + address_level2 TEXT NOT NULL, + address_level1 TEXT NOT NULL, + postal_code TEXT NOT NULL, + country TEXT NOT NULL, + tel TEXT NOT NULL, + email TEXT NOT NULL, + time_created INTEGER NOT NULL, + time_last_used INTEGER, + time_last_modified INTEGER NOT NULL, + times_used INTEGER NOT NULL DEFAULT 0 +); + +CREATE TEMP TABLE addresses_tombstone_sync_staging ( + guid TEXT NOT NULL PRIMARY KEY +); diff --git a/components/autofill/src/db/schema.rs b/components/autofill/src/db/schema.rs index 6521b5c62..a10ce5d63 100644 --- a/components/autofill/src/db/schema.rs +++ b/components/autofill/src/db/schema.rs @@ -74,6 +74,7 @@ pub const CREDIT_CARD_COMMON_VALS: &str = " #[allow(dead_code)] const CREATE_SHARED_SCHEMA_SQL: &str = include_str!("../../sql/create_shared_schema.sql"); const CREATE_SHARED_TRIGGERS_SQL: &str = include_str!("../../sql/create_shared_triggers.sql"); +const CREATE_SYNC_TEMP_TABLES_SQL: &str = include_str!("../../sql/create_sync_temp_tables.sql"); #[allow(dead_code)] pub fn init(db: &Connection) -> Result<()> { @@ -94,3 +95,9 @@ fn create(db: &Connection) -> Result<()> { Ok(()) } + +pub fn create_empty_sync_temp_tables(db: &Connection) -> Result<()> { + log::debug!("Initializing sync temp tables"); + db.execute_batch(CREATE_SYNC_TEMP_TABLES_SQL)?; + Ok(()) +} diff --git a/components/autofill/src/lib.rs b/components/autofill/src/lib.rs index b0d511729..ba3a4b558 100644 --- a/components/autofill/src/lib.rs +++ b/components/autofill/src/lib.rs @@ -8,6 +8,7 @@ pub mod db; pub mod error; +pub mod sync; // Expose stuff needed by the uniffi generated code. use crate::db::models::address::*; diff --git a/components/autofill/src/sync/address/incoming.rs b/components/autofill/src/sync/address/incoming.rs new file mode 100644 index 000000000..ecbf278cd --- /dev/null +++ b/components/autofill/src/sync/address/incoming.rs @@ -0,0 +1,1068 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at http://mozilla.org/MPL/2.0/. +*/ + +use super::{Record, RecordData}; +use crate::error::*; +use interrupt_support::Interruptee; +use rusqlite::{named_params, types::ToSql, Connection}; +use sql_support::ConnExt; +use sync15::Payload; +use sync_guid::Guid as SyncGuid; +use types::Timestamp; + +type IncomingState = crate::sync::IncomingState; +type IncomingAction = crate::sync::IncomingAction; + +/// The first step in the "apply incoming" process for syncing autofill address records. +/// Incoming tombstones will saved in the `temp.addresses_tombstone_sync_staging` table +/// and incoming records will be saved to the `temp.addresses_sync_staging` table. +pub fn stage_incoming( + conn: &Connection, + incoming_payloads: Vec, + signal: &dyn Interruptee, +) -> Result<()> { + let mut incoming_records = Vec::with_capacity(incoming_payloads.len()); + let mut incoming_tombstones = Vec::with_capacity(incoming_payloads.len()); + + for payload in incoming_payloads { + match payload.deleted { + true => incoming_tombstones.push(payload.into_record::().unwrap()), + false => incoming_records.push(payload.into_record::().unwrap()), + }; + } + save_incoming_records(conn, incoming_records, signal)?; + save_incoming_tombstones(conn, incoming_tombstones, signal)?; + Ok(()) +} + +/// Saves incoming records (excluding incoming tombstones) in preparation for applying +/// incoming changes for the syncing autofill address records. +fn save_incoming_records( + conn: &Connection, + incoming_records: Vec, + signal: &dyn Interruptee, +) -> Result<()> { + let chunk_size = 17; + sql_support::each_sized_chunk( + &incoming_records, + sql_support::default_max_variable_number() / chunk_size, + |chunk, _| -> Result<()> { + let sql = format!( + "INSERT OR REPLACE INTO temp.addresses_sync_staging ( + guid, + given_name, + additional_name, + family_name, + organization, + street_address, + address_level3, + address_level2, + address_level1, + postal_code, + country, + tel, + email, + time_created, + time_last_used, + time_last_modified, + times_used + ) VALUES {}", + sql_support::repeat_multi_values(chunk.len(), chunk_size) + ); + let mut params = Vec::with_capacity(chunk.len() * chunk_size); + for record in chunk { + signal.err_if_interrupted()?; + params.push(&record.guid as &dyn ToSql); + params.push(&record.data.given_name); + params.push(&record.data.additional_name); + params.push(&record.data.family_name); + params.push(&record.data.organization); + params.push(&record.data.street_address); + params.push(&record.data.address_level3); + params.push(&record.data.address_level2); + params.push(&record.data.address_level1); + params.push(&record.data.postal_code); + params.push(&record.data.country); + params.push(&record.data.tel); + params.push(&record.data.email); + params.push(&record.data.time_created); + params.push(&record.data.time_last_used); + params.push(&record.data.time_last_modified); + params.push(&record.data.times_used); + } + conn.execute(&sql, ¶ms)?; + Ok(()) + }, + ) +} + +/// Saves incoming tombstones (excluding incoming records) in preparation for applying +/// incoming changes for the syncing autofill address records. +fn save_incoming_tombstones( + conn: &Connection, + incoming_tombstones: Vec, + signal: &dyn Interruptee, +) -> Result<()> { + let chunk_size = 1; + sql_support::each_sized_chunk( + &incoming_tombstones, + sql_support::default_max_variable_number() / chunk_size, + |chunk, _| -> Result<()> { + let sql = format!( + "INSERT OR REPLACE INTO temp.addresses_tombstone_sync_staging ( + guid + ) VALUES {}", + sql_support::repeat_multi_values(chunk.len(), chunk_size) + ); + let mut params = Vec::with_capacity(chunk.len() * chunk_size); + for record in chunk { + signal.err_if_interrupted()?; + params.push(&record.guid as &dyn ToSql); + } + conn.execute(&sql, ¶ms)?; + Ok(()) + }, + ) +} + +/// The second step in the "apply incoming" process for syncing autofill address records. +/// Incoming tombstones and records are retrieved from the temp tables and assigned +/// `IncomingState` values. +pub fn get_incoming(conn: &Connection) -> Result> { + let mut incoming_states = get_incoming_tombstone_states(conn)?; + let mut incoming_record_states = get_incoming_record_states(conn)?; + incoming_states.append(&mut incoming_record_states); + + Ok(incoming_states) +} + +/// Incoming tombstones are retrieved from the `addresses_tombstone_sync_staging` table +/// and assigned `IncomingState` values. +#[allow(dead_code)] +fn get_incoming_tombstone_states(conn: &Connection) -> Result> { + Ok(conn.conn().query_rows_and_then_named( + "SELECT + s.guid as s_guid, + l.guid as l_guid, + t.guid as t_guid, + l.given_name, + l.additional_name, + l.family_name, + l.organization, + l.street_address, + l.address_level3, + l.address_level2, + l.address_level1, + l.postal_code, + l.country, + l.tel, + l.email, + l.time_created, + l.time_last_used, + l.time_last_modified, + l.times_used, + l.sync_change_counter + FROM temp.addresses_tombstone_sync_staging s + LEFT JOIN addresses_data l ON s.guid = l.guid + LEFT JOIN addresses_tombstones t ON s.guid = t.guid", + &[], + |row| -> Result<(SyncGuid, IncomingState)> { + let incoming_guid: SyncGuid = row.get_unwrap("s_guid"); + let local_guid: Option = row.get("l_guid")?; + let tombstone_guid: Option = row.get("t_guid")?; + + Ok(( + incoming_guid.clone(), + IncomingState::IncomingTombstone { + guid: incoming_guid, + local: match local_guid { + Some(_) => Some(RecordData::from_row(row, "")?), + None => None, + }, + has_local_changes: match local_guid { + Some(_) => { + RecordData::from_row(row, "")? + .sync_change_counter + .unwrap_or(0) + != 0 + } + None => false, + }, + has_local_tombstone: tombstone_guid.is_some(), + }, + )) + }, + )?) +} + +/// Incoming records (excluding tombstones) are retrieved from the `addresses_sync_staging` table +/// and assigned `IncomingState` values. +#[allow(dead_code)] +fn get_incoming_record_states(conn: &Connection) -> Result> { + let sql_query = " + SELECT + s.guid as s_guid, + m.guid as m_guid, + l.guid as l_guid, + s.given_name as s_given_name, + m.given_name as m_given_name, + l.given_name as l_given_name, + s.additional_name as s_additional_name, + m.additional_name as m_additional_name, + l.additional_name as l_additional_name, + s.family_name as s_family_name, + m.family_name as m_family_name, + l.family_name as l_family_name, + s.organization as s_organization, + m.organization as m_organization, + l.organization as l_organization, + s.street_address as s_street_address, + m.street_address as m_street_address, + l.street_address as l_street_address, + s.address_level3 as s_address_level3, + m.address_level3 as m_address_level3, + l.address_level3 as l_address_level3, + s.address_level2 as s_address_level2, + m.address_level2 as m_address_level2, + l.address_level2 as l_address_level2, + s.address_level1 as s_address_level1, + m.address_level1 as m_address_level1, + l.address_level1 as l_address_level1, + s.postal_code as s_postal_code, + m.postal_code as m_postal_code, + l.postal_code as l_postal_code, + s.country as s_country, + m.country as m_country, + l.country as l_country, + s.tel as s_tel, + m.tel as m_tel, + l.tel as l_tel, + s.email as s_email, + m.email as m_email, + l.email as l_email, + s.time_created as s_time_created, + m.time_created as m_time_created, + l.time_created as l_time_created, + s.time_last_used as s_time_last_used, + m.time_last_used as m_time_last_used, + l.time_last_used as l_time_last_used, + s.time_last_modified as s_time_last_modified, + m.time_last_modified as m_time_last_modified, + l.time_last_modified as l_time_last_modified, + s.times_used as s_times_used, + m.times_used as m_times_used, + l.times_used as l_times_used, + l.sync_change_counter as l_sync_change_counter + FROM temp.addresses_sync_staging s + LEFT JOIN addresses_mirror m ON s.guid = m.guid + LEFT JOIN addresses_data l ON s.guid = l.guid"; + + Ok(conn.conn().query_rows_and_then_named( + sql_query, + &[], + |row| -> Result<(SyncGuid, IncomingState)> { + let guid: SyncGuid = row.get_unwrap("s_guid"); + let mirror_guid: Option = row.get_unwrap("m_guid"); + let local_guid: Option = row.get_unwrap("l_guid"); + + let incoming = RecordData::from_row(row, "s_")?; + + let mirror = match mirror_guid { + Some(_) => Some(RecordData::from_row(row, "m_")?), + None => None, + }; + + let incoming_state = match local_guid { + Some(_) => { + let local = RecordData::from_row(row, "l_")?; + IncomingState::HasLocal { + guid: guid.clone(), + incoming: incoming.clone(), + merged: merge(guid.clone(), incoming.clone(), local.clone(), mirror), + has_local_changes: local.sync_change_counter.unwrap_or(0) != 0, + } + } + None => { + let local_dupe = get_local_dupe( + conn, + Record { + guid: guid.clone(), + data: incoming.clone(), + }, + )?; + + match local_dupe { + Some(d) => IncomingState::HasLocalDupe { + guid: guid.clone(), + dupe_guid: d.guid, + merged: merge(guid.clone(), incoming.clone(), d.data, mirror), + }, + None => match has_local_tombstone(conn, &guid)? { + true => IncomingState::NonDeletedIncoming { + guid: guid.clone(), + incoming, + }, + false => IncomingState::IncomingOnly { + guid: guid.clone(), + incoming, + }, + }, + } + } + }; + + Ok((guid, incoming_state)) + }, + )?) +} + +/// Returns a local record that has the same values as the given incoming record (with the exception +/// of the `guid` values which should differ) that will be used as a local duplicate record for +/// syncing. +#[allow(dead_code)] +fn get_local_dupe(conn: &Connection, incoming: Record) -> Result> { + let sql = " + SELECT + guid, + given_name, + additional_name, + family_name, + organization, + street_address, + address_level3, + address_level2, + address_level1, + postal_code, + country, + tel, + email, + time_created, + time_last_used, + time_last_modified, + times_used, + sync_change_counter + FROM addresses_data + WHERE guid <> :guid + AND guid NOT IN ( + SELECT guid + FROM addresses_mirror + ) + AND given_name == :given_name + AND additional_name == :additional_name + AND family_name == :family_name + AND organization == :organization + AND street_address == :street_address + AND address_level3 == :address_level3 + AND address_level2 == :address_level2 + AND address_level1 == :address_level1 + AND postal_code == :postal_code + AND country == :country + AND tel == :tel + AND email == :email"; + + let params = named_params! { + ":guid": incoming.guid.as_str(), + ":given_name": incoming.data.given_name, + ":additional_name": incoming.data.additional_name, + ":family_name": incoming.data.family_name, + ":organization": incoming.data.organization, + ":street_address": incoming.data.street_address, + ":address_level3": incoming.data.address_level3, + ":address_level2": incoming.data.address_level2, + ":address_level1": incoming.data.address_level1, + ":postal_code": incoming.data.postal_code, + ":country": incoming.data.country, + ":tel": incoming.data.tel, + ":email": incoming.data.email, + }; + + let result = conn.conn().query_row_named(&sql, params, |row| { + Ok(Record { + guid: row.get_unwrap("guid"), + data: RecordData::from_row(&row, "").unwrap(), + }) + }); + + match result { + Ok(r) => Ok(Some(r)), + Err(e) => match e { + rusqlite::Error::QueryReturnedNoRows => Ok(None), + _ => Err(Error::SqlError(e)), + }, + } +} + +/// Determines if a local tombstone exists for a given GUID. +#[allow(dead_code)] +fn has_local_tombstone(conn: &Connection, guid: &str) -> Result { + Ok(conn.conn().query_row( + "SELECT EXISTS ( + SELECT 1 + FROM addresses_tombstones + WHERE guid = :guid + )", + &[guid], + |row| row.get(0), + )?) +} + +// We allow all "common" fields from the sub-types to be getters on the +// InsertableItem type. +macro_rules! field_check { + ($field_name:ident, + $guid:ident, + $incoming:ident, + $local:ident, + $mirror:ident, + $merged_record:ident + ) => { + let incoming_field = &$incoming.$field_name; + let local_field = &$local.$field_name; + let is_local_same; + let is_incoming_same; + + match &$mirror { + Some(m) => { + let mirror_field = &m.$field_name; + is_local_same = mirror_field == local_field; + is_incoming_same = mirror_field == incoming_field; + } + None => { + is_local_same = true; + is_incoming_same = local_field == incoming_field; + } + }; + + let should_use_local = is_incoming_same || local_field == incoming_field; + + if is_local_same && !is_incoming_same { + $merged_record.$field_name = incoming_field.clone(); + } else if should_use_local { + $merged_record.$field_name = local_field.clone(); + } else { + return get_forked_record(Record { + guid: $guid, + data: $local, + }); + } + }; +} + +/// Performs a three-way merge between an incoming, local, and mirror record. If a merge +/// cannot be successfully completed, the local record data is returned with a new guid +/// and sync metadata. +fn merge( + guid: SyncGuid, + incoming: RecordData, + local: RecordData, + mirror: Option, +) -> Record { + let mut merged_record: RecordData = Default::default(); + + field_check!(given_name, guid, incoming, local, mirror, merged_record); + field_check!( + additional_name, + guid, + incoming, + local, + mirror, + merged_record + ); + field_check!(family_name, guid, incoming, local, mirror, merged_record); + field_check!(organization, guid, incoming, local, mirror, merged_record); + field_check!(street_address, guid, incoming, local, mirror, merged_record); + field_check!(address_level3, guid, incoming, local, mirror, merged_record); + field_check!(address_level2, guid, incoming, local, mirror, merged_record); + field_check!(address_level1, guid, incoming, local, mirror, merged_record); + field_check!(postal_code, guid, incoming, local, mirror, merged_record); + field_check!(country, guid, incoming, local, mirror, merged_record); + field_check!(tel, guid, incoming, local, mirror, merged_record); + field_check!(email, guid, incoming, local, mirror, merged_record); + + set_sync_times(&mut merged_record, incoming, local, mirror); + + Record { + guid, + data: merged_record.clone(), + } +} + +fn set_sync_times( + merged_record: &mut RecordData, + incoming: RecordData, + local: RecordData, + mirror: Option, +) { + fn get_latest_time(times: &mut [Timestamp]) -> Timestamp { + times.sort(); + times[times.len() - 1] + } + + match mirror { + Some(m) => { + merged_record.time_created = + get_latest_time(&mut [incoming.time_created, local.time_created, m.time_created]); + merged_record.time_last_used = get_latest_time(&mut [ + incoming.time_last_used, + local.time_last_used, + m.time_last_used, + ]); + merged_record.time_last_modified = get_latest_time(&mut [ + incoming.time_last_modified, + local.time_last_modified, + m.time_last_modified, + ]); + + merged_record.times_used = m.times_used + + (local.times_used - m.times_used) + + (incoming.times_used - m.times_used); + } + None => { + merged_record.time_created = + get_latest_time(&mut [incoming.time_created, local.time_created]); + merged_record.time_last_used = + get_latest_time(&mut [incoming.time_last_used, local.time_last_used]); + merged_record.time_last_modified = + get_latest_time(&mut [incoming.time_last_modified, local.time_last_modified]); + merged_record.times_used = local.times_used + incoming.times_used; + } + } +} + +/// Returns a with the given local record's data but with a new guid and +/// fresh sync metadata. +fn get_forked_record(local_record: Record) -> Record { + let mut local_record_data = local_record.data; + local_record_data.time_created = Timestamp::now(); + local_record_data.time_last_used = Timestamp::now(); + local_record_data.time_last_modified = Timestamp::now(); + local_record_data.times_used = 0; + local_record_data.sync_change_counter = Some(1); + + Record { + guid: SyncGuid::random(), + data: local_record_data, + } +} + +/// Changes the guid of the local record for the given `old_guid` to the given `new_guid` used +/// for the `HasLocalDupe` incoming state. +fn change_local_guid(conn: &Connection, old_guid: SyncGuid, new_guid: SyncGuid) -> Result<()> { + conn.conn().execute_named( + "UPDATE addresses_data + SET guid = :new_guid + WHERE guid = :old_guid + AND guid NOT IN ( + SELECT guid + FROM addressess_mirror m + WHERE m.guid = :old_guid + ) + AND NOT EXISTS ( + SELECT 1 + FROM addresses_data d + WHERE d.guid = :new_guid + )", + rusqlite::named_params! { + ":old_guid": old_guid, + ":new_guid": new_guid, + }, + )?; + + Ok(()) +} + +fn update_local_record(conn: &Connection, new_record: Record) -> Result<()> { + conn.execute_named( + "UPDATE addresses_data + SET given_name = :given_name, + additional_name = :additional_name, + family_name = :family_name, + organization = :organization, + street_address = :street_address, + address_level3 = :address_level3, + address_level2 = :address_level2, + address_level1 = :address_level1, + postal_code = :postal_code, + country = :country, + tel = :tel, + email = :email, + sync_change_counter = 0 + WHERE guid = :guid", + rusqlite::named_params! { + ":given_name": new_record.data.given_name, + ":additional_name": new_record.data.additional_name, + ":family_name": new_record.data.family_name, + ":organization": new_record.data.organization, + ":street_address": new_record.data.street_address, + ":address_level3": new_record.data.address_level3, + ":address_level2": new_record.data.address_level2, + ":address_level1": new_record.data.address_level1, + ":postal_code": new_record.data.postal_code, + ":country": new_record.data.country, + ":tel": new_record.data.tel, + ":email": new_record.data.email, + ":guid": new_record.guid, + }, + )?; + + Ok(()) +} + +fn insert_local_record(conn: &Connection, new_record: Record) -> Result<()> { + conn.execute_named( + "INSERT OR IGNORE INTO addresses_data ( + guid, + given_name, + additional_name, + family_name, + organization, + street_address, + address_level3, + address_level2, + address_level1, + postal_code, + country, + tel, + email, + time_created, + time_last_used, + time_last_modified, + times_used, + sync_change_counter + ) VALUES ( + :guid, + :given_name, + :additional_name, + :family_name, + :organization, + :street_address, + :address_level3, + :address_level2, + :address_level1, + :postal_code, + :country, + :tel, + :email, + :time_created, + :time_last_used, + :time_last_modified, + :times_used, + :sync_change_counter + )", + rusqlite::named_params! { + ":guid": SyncGuid::random(), + ":given_name": new_record.data.given_name, + ":additional_name": new_record.data.additional_name, + ":family_name": new_record.data.family_name, + ":organization": new_record.data.organization, + ":street_address": new_record.data.street_address, + ":address_level3": new_record.data.address_level3, + ":address_level2": new_record.data.address_level2, + ":address_level1": new_record.data.address_level1, + ":postal_code": new_record.data.postal_code, + ":country": new_record.data.country, + ":tel": new_record.data.tel, + ":email": new_record.data.email, + ":time_created": Timestamp::now(), + ":time_last_used": Some(Timestamp::now()), + ":time_last_modified": Timestamp::now(), + ":times_used": 0, + ":sync_change_counter": 0, + }, + )?; + + Ok(()) +} + +fn upsert_local_record(conn: &Connection, new_record: Record) -> Result<()> { + let exists = conn.query_row( + "SELECT EXISTS ( + SELECT 1 + FROM addresses_data d + WHERE guid = :guid + )", + &[new_record.clone().guid], + |row| row.get(0), + )?; + + if exists { + update_local_record(conn, new_record)?; + } else { + insert_local_record(conn, new_record)?; + } + Ok(()) +} + +/// Apply the actions necessary to fully process the incoming items +pub fn apply_actions( + conn: &Connection, + actions: Vec<(SyncGuid, IncomingAction)>, + signal: &dyn Interruptee, +) -> Result<()> { + for (item, action) in actions { + signal.err_if_interrupted()?; + + log::trace!("action for '{:?}': {:?}", item, action); + match action { + IncomingAction::TakeMergedRecord { new_record } => { + update_local_record(conn, new_record)?; + } + IncomingAction::UpdateLocalGuid { + dupe_guid, + old_guid, + new_record, + } => { + change_local_guid(conn, old_guid, dupe_guid)?; + update_local_record(conn, new_record)?; + } + IncomingAction::TakeRemote { new_record } => { + upsert_local_record(conn, new_record)?; + } + IncomingAction::DeleteLocalTombstone { remote_record } => { + conn.execute_named( + "DELETE FROM addresses_tombstones WHERE guid = :guid", + rusqlite::named_params! { + ":guid": remote_record.guid, + }, + )?; + + insert_local_record(conn, remote_record)?; + } + IncomingAction::DeleteLocalRecord { guid } => { + conn.execute_named( + "DELETE FROM addresses_data + WHERE guid = :guid", + rusqlite::named_params! { + ":guid": guid, + }, + )?; + } + IncomingAction::DoNothing => {} + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::super::super::test::new_syncable_mem_db; + use super::*; + + use interrupt_support::NeverInterrupts; + use serde_json::{json, Value}; + + fn array_to_incoming(mut array: Value) -> Vec { + let jv = array.as_array_mut().expect("you must pass a json array"); + let mut result = Vec::with_capacity(jv.len()); + for elt in jv { + result.push(Payload::from_json(elt.take()).expect("must be valid")); + } + result + } + + #[test] + fn test_stage_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + struct TestCase { + incoming_records: Value, + expected_record_count: u32, + expected_tombstone_count: u32, + } + + let test_cases = vec![ + TestCase { + incoming_records: json! {[ + { + "id": "AAAAAAAAAAAAAAAAA", + "deleted": false, + "givenName": "john", + "additionalName": "", + "familyName": "doe", + "organization": "", + "streetAddress": "1300 Broadway", + "addressLevel3": "", + "addressLevel2": "New York, NY", + "addressLevel1": "", + "postalCode": "", + "country": "United States", + "tel": "", + "email": "", + "timeCreated": 0, + "timeLastUsed": 0, + "timeLastModified": 0, + "timesUsed": 0, + } + ]}, + expected_record_count: 1, + expected_tombstone_count: 0, + }, + TestCase { + incoming_records: json! {[ + { + "id": "AAAAAAAAAAAAAA", + "deleted": true, + "givenName": "", + "additionalName": "", + "familyName": "", + "organization": "", + "streetAddress": "", + "addressLevel3": "", + "addressLevel2": "", + "addressLevel1": "", + "postalCode": "", + "country": "", + "tel": "", + "email": "", + "timeCreated": 0, + "timeLastUsed": 0, + "timeLastModified": 0, + "timesUsed": 0, + } + ]}, + expected_record_count: 0, + expected_tombstone_count: 1, + }, + TestCase { + incoming_records: json! {[ + { + "id": "AAAAAAAAAAAAAAAAA", + "deleted": false, + "givenName": "john", + "additionalName": "", + "familyName": "doe", + "organization": "", + "streetAddress": "1300 Broadway", + "addressLevel3": "", + "addressLevel2": "New York, NY", + "addressLevel1": "", + "postalCode": "", + "country": "United States", + "tel": "", + "email": "", + "timeCreated": 0, + "timeLastUsed": 0, + "timeLastModified": 0, + "timesUsed": 0, + }, + { + "id": "CCCCCCCCCCCCCCCCCC", + "deleted": false, + "givenName": "jane", + "additionalName": "", + "familyName": "doe", + "organization": "", + "streetAddress": "3050 South La Brea Ave", + "addressLevel3": "", + "addressLevel2": "Los Angeles, CA", + "addressLevel1": "", + "postalCode": "", + "country": "United States", + "tel": "", + "email": "", + "timeCreated": 0, + "timeLastUsed": 0, + "timeLastModified": 0, + "timesUsed": 0, + }, + { + "id": "BBBBBBBBBBBBBBBBB", + "deleted": true, + "givenName": "", + "additionalName": "", + "familyName": "", + "organization": "", + "streetAddress": "", + "addressLevel3": "", + "addressLevel2": "", + "addressLevel1": "", + "postalCode": "", + "country": "", + "tel": "", + "email": "", + "timeCreated": 0, + "timeLastUsed": 0, + "timeLastModified": 0, + "timesUsed": 0, + } + ]}, + expected_record_count: 2, + expected_tombstone_count: 1, + }, + ]; + + for tc in test_cases { + stage_incoming( + &tx, + array_to_incoming(tc.incoming_records), + &NeverInterrupts, + )?; + + let record_count: u32 = tx + .try_query_one( + "SELECT COUNT(*) FROM temp.addresses_sync_staging", + &[], + false, + ) + .expect("get incoming record count") + .unwrap_or_default(); + + let tombstone_count: u32 = tx + .try_query_one( + "SELECT COUNT(*) FROM temp.addresses_tombstone_sync_staging", + &[], + false, + ) + .expect("get incoming tombstone count") + .unwrap_or_default(); + + assert_eq!(record_count, tc.expected_record_count); + assert_eq!(tombstone_count, tc.expected_tombstone_count); + + tx.execute_all(&[ + "DELETE FROM temp.addresses_tombstone_sync_staging;", + "DELETE FROM temp.addresses_sync_staging;", + ])?; + } + Ok(()) + } + + #[test] + fn test_get_incoming() -> Result<()> { + let mut db = new_syncable_mem_db(); + let tx = db.transaction()?; + + tx.execute_named( + "INSERT OR IGNORE INTO addresses_data ( + guid, + given_name, + additional_name, + family_name, + organization, + street_address, + address_level3, + address_level2, + address_level1, + postal_code, + country, + tel, + email, + time_created, + time_last_used, + time_last_modified, + times_used, + sync_change_counter + ) VALUES ( + :guid, + :given_name, + :additional_name, + :family_name, + :organization, + :street_address, + :address_level3, + :address_level2, + :address_level1, + :postal_code, + :country, + :tel, + :email, + :time_created, + :time_last_used, + :time_last_modified, + :times_used, + :sync_change_counter + )", + rusqlite::named_params! { + ":guid": "CCCCCCCCCCCCCCCCCC", + ":given_name": "jane", + ":additional_name": "", + ":family_name": "doe", + ":organization": "", + ":street_address": "3050 South La Brea Ave", + ":address_level3": "", + ":address_level2": "Los Angeles, CA", + ":address_level1": "", + ":postal_code": "", + ":country": "United States", + ":tel": "", + ":email": "", + ":time_created": Timestamp::now(), + ":time_last_used": Some(Timestamp::now()), + ":time_last_modified": Timestamp::now(), + ":times_used": 0, + ":sync_change_counter": 1, + }, + )?; + + tx.execute_named( + "INSERT OR IGNORE INTO temp.addresses_sync_staging ( + guid, + given_name, + additional_name, + family_name, + organization, + street_address, + address_level3, + address_level2, + address_level1, + postal_code, + country, + tel, + email, + time_created, + time_last_used, + time_last_modified, + times_used + ) VALUES ( + :guid, + :given_name, + :additional_name, + :family_name, + :organization, + :street_address, + :address_level3, + :address_level2, + :address_level1, + :postal_code, + :country, + :tel, + :email, + :time_created, + :time_last_used, + :time_last_modified, + :times_used + )", + rusqlite::named_params! { + ":guid": "CCCCCCCCCCCCCCCCCC", + ":given_name": "jane", + ":additional_name": "", + ":family_name": "doe", + ":organization": "", + ":street_address": "3050 South La Brea Ave", + ":address_level3": "", + ":address_level2": "Los Angeles, CA", + ":address_level1": "", + ":postal_code": "", + ":country": "United States", + ":tel": "", + ":email": "", + ":time_created": 0, + ":time_last_used": 0, + ":time_last_modified": 0, + ":times_used": 0, + + }, + )?; + + get_incoming(&tx)?; + + tx.execute_all(&[ + "DELETE FROM addresses_data;", + "DELETE FROM temp.addresses_sync_staging;", + ])?; + + Ok(()) + } +} diff --git a/components/autofill/src/sync/address/mod.rs b/components/autofill/src/sync/address/mod.rs new file mode 100644 index 000000000..a04a54a9c --- /dev/null +++ b/components/autofill/src/sync/address/mod.rs @@ -0,0 +1,89 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at http://mozilla.org/MPL/2.0/. +*/ + +pub mod incoming; + +use crate::error::*; +use rusqlite::Row; +use serde::Serialize; +use serde_derive::*; +use types::Timestamp; + +type Record = crate::sync::Record; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] +#[serde(rename_all = "camelCase")] +#[serde(default)] +pub struct RecordData { + pub given_name: String, + + pub additional_name: String, + + pub family_name: String, + + pub organization: String, + + pub street_address: String, + + pub address_level3: String, + + pub address_level2: String, + + pub address_level1: String, + + pub postal_code: String, + + pub country: String, + + pub tel: String, + + pub email: String, + + pub time_created: Timestamp, + + pub time_last_used: Timestamp, + + pub time_last_modified: Timestamp, + + pub times_used: i64, + + pub sync_change_counter: Option, +} + +impl RecordData { + pub fn from_row(row: &Row<'_>, column_prefix: &str) -> Result { + Ok(RecordData { + given_name: row + .get::<_, String>(format!("{}{}", column_prefix, "given_name").as_str())?, + additional_name: row + .get::<_, String>(format!("{}{}", column_prefix, "additional_name").as_str())?, + family_name: row + .get::<_, String>(format!("{}{}", column_prefix, "family_name").as_str())?, + organization: row + .get::<_, String>(format!("{}{}", column_prefix, "organization").as_str())?, + street_address: row + .get::<_, String>(format!("{}{}", column_prefix, "street_address").as_str())?, + address_level3: row + .get::<_, String>(format!("{}{}", column_prefix, "address_level3").as_str())?, + address_level2: row + .get::<_, String>(format!("{}{}", column_prefix, "address_level2").as_str())?, + address_level1: row + .get::<_, String>(format!("{}{}", column_prefix, "address_level1").as_str())?, + postal_code: row + .get::<_, String>(format!("{}{}", column_prefix, "postal_code").as_str())?, + country: row.get::<_, String>(format!("{}{}", column_prefix, "country").as_str())?, + tel: row.get::<_, String>(format!("{}{}", column_prefix, "tel").as_str())?, + email: row.get::<_, String>(format!("{}{}", column_prefix, "email").as_str())?, + time_created: row.get(format!("{}{}", column_prefix, "time_created").as_str())?, + time_last_used: row.get(format!("{}{}", column_prefix, "time_last_used").as_str())?, + time_last_modified: row + .get(format!("{}{}", column_prefix, "time_last_modified").as_str())?, + times_used: row.get(format!("{}{}", column_prefix, "times_used").as_str())?, + sync_change_counter: row + .get(format!("{}{}", column_prefix, "sync_change_counter").as_str()) + .ok(), + }) + } +} diff --git a/components/autofill/src/sync/mod.rs b/components/autofill/src/sync/mod.rs new file mode 100644 index 000000000..e686bd360 --- /dev/null +++ b/components/autofill/src/sync/mod.rs @@ -0,0 +1,157 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at http://mozilla.org/MPL/2.0/. +*/ + +pub mod address; +// pub mod credit_card; + +use serde::Serialize; +use serde_derive::*; +use sync_guid::Guid as SyncGuid; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Default)] +#[serde(rename_all = "camelCase")] +pub struct Record { + #[serde(rename = "id", default)] + pub guid: SyncGuid, + + #[serde(flatten)] + data: T, +} + +impl Record { + fn new(guid: SyncGuid, data: T) -> Record { + Record { guid, data } + } +} + +// Helpers for tests +#[cfg(test)] +pub mod test { + use crate::db::{schema::create_empty_sync_temp_tables, test::new_mem_db, AutofillDb}; + + pub fn new_syncable_mem_db() -> AutofillDb { + let _ = env_logger::try_init(); + let db = new_mem_db(); + create_empty_sync_temp_tables(&db).expect("should work"); + db + } +} + +/// The distinct states of records to be synced which determine the `IncomingAction` to be taken. +#[derive(Debug, PartialEq)] +#[allow(clippy::large_enum_variant)] +pub enum IncomingState { + // Only the incoming record exists. An associated local or mirror record doesn't exist. + IncomingOnly { + guid: SyncGuid, + incoming: T, + }, + // The incoming record is a tombstone. + IncomingTombstone { + guid: SyncGuid, + local: Option, + has_local_changes: bool, + has_local_tombstone: bool, + }, + // The incoming record has an associated local record. + HasLocal { + guid: SyncGuid, + incoming: T, + merged: Record, + has_local_changes: bool, + }, + // The incoming record doesn't have an associated local record with the same GUID. + // A local record with the same data but a different GUID has been located. + HasLocalDupe { + guid: SyncGuid, + dupe_guid: SyncGuid, + merged: Record, + }, + // The incoming record doesn't have an associated local or local duplicate record but does + // have a local tombstone. + NonDeletedIncoming { + guid: SyncGuid, + incoming: T, + }, +} + +/// The distinct incoming sync actions to be preformed for incoming records. +#[derive(Debug, PartialEq)] +#[allow(clippy::large_enum_variant)] +pub enum IncomingAction { + DeleteLocalRecord { + guid: SyncGuid, + }, + TakeMergedRecord { + new_record: Record, + }, + UpdateLocalGuid { + old_guid: SyncGuid, + dupe_guid: SyncGuid, + new_record: Record, + }, + TakeRemote { + new_record: Record, + }, + DeleteLocalTombstone { + remote_record: Record, + }, + DoNothing, +} + +/// Given an `IncomingState` returns the `IncomingAction` that should be performed. +pub fn plan_incoming(s: IncomingState) -> IncomingAction { + match s { + IncomingState::IncomingOnly { guid, incoming } => IncomingAction::TakeRemote { + new_record: Record::::new(guid, incoming), + }, + IncomingState::IncomingTombstone { + guid, + local, + has_local_changes, + has_local_tombstone, + } => match local { + Some(_) => { + // Note: On desktop, when there's a local record for an incoming tombstone, a local tombstone + // would created. But we don't actually need to create a local tombstone here. If we did it would + // immediately be deleted after being uploaded to the server. + + if has_local_changes || has_local_tombstone { + IncomingAction::DoNothing + } else { + IncomingAction::DeleteLocalRecord { + guid: SyncGuid::new(&guid), + } + } + } + None => IncomingAction::DoNothing, + }, + IncomingState::HasLocal { + guid, + incoming, + merged, + has_local_changes, + } => match has_local_changes { + true => IncomingAction::TakeMergedRecord { new_record: merged }, + false => IncomingAction::TakeRemote { + new_record: Record::::new(guid, incoming), + }, + }, + IncomingState::HasLocalDupe { + guid, + dupe_guid, + merged, + } => IncomingAction::UpdateLocalGuid { + old_guid: guid, + dupe_guid, + new_record: merged, + }, + IncomingState::NonDeletedIncoming { guid, incoming } => { + IncomingAction::DeleteLocalTombstone { + remote_record: Record::::new(guid, incoming), + } + } + } +}