remerge: Implement remaining pieces of storage API

This commit is contained in:
Thom Chiovoloni 2019-11-25 13:19:13 -08:00
Родитель 67712161d7
Коммит cb9cd4eccb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 31F01AEBD799934A
15 изменённых файлов: 879 добавлений и 92 удалений

2
Cargo.lock сгенерированный
Просмотреть файл

@ -2186,6 +2186,7 @@ dependencies = [
"libsqlite3-sys 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
]
@ -2318,6 +2319,7 @@ name = "serde_json"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"indexmap 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",

Просмотреть файл

@ -6,10 +6,10 @@ edition = "2018"
license = "MPL-2.0"
[dependencies]
serde = { version = "1.0.102", features = ["derive"] }
serde = { version = "1.0.102", features = ["derive", "rc"] }
failure = { version = "0.1.6" }
url = "2.1.0"
serde_json = "1.0.44"
serde_json = { version = "1.0.44", features = ["preserve_order"] }
log = "0.4.8"
semver = { version = "0.9.0", features = ["serde"] }
lazy_static = "1.4.0"
@ -25,4 +25,4 @@ num-traits = "0.2.8"
[dependencies.rusqlite]
version = "0.20.0"
features = ["functions", "bundled"]
features = ["functions", "bundled", "serde_json"]

Просмотреть файл

@ -23,7 +23,7 @@ CREATE TABLE rec_local (
guid TEXT NOT NULL UNIQUE,
remerge_schema_version TEXT,
-- XXX Should this be nullable for the case where is_deleted == true?
record_data TEXT NOT NULL,
-- A local timestamp
local_modified_ms INTEGER NOT NULL DEFAULT 0,

Просмотреть файл

@ -24,6 +24,18 @@ pub enum ErrorKind {
#[fail(display = "Invalid record: {}", _0)]
InvalidRecord(#[fail(cause)] InvalidRecord),
#[fail(
display = "No record with guid exists (when one was required): {:?}",
_0
)]
NoSuchRecord(String),
#[fail(
display = "Failed to convert local record to native record (may indicate bad remerge schema): {}",
_0
)]
LocalToNativeError(String),
#[fail(display = "Error parsing JSON data: {}", _0)]
JsonError(#[fail(cause)] serde_json::Error),
@ -60,6 +72,11 @@ pub enum InvalidRecord {
InvalidRecordSet(String),
#[fail(display = "The field {:?} is not a valid guid", _0)]
InvalidGuid(String),
// TODO(issue 2232): Should be more specific.
#[fail(display = "The field {:?} is invalid: {}", _0, _1)]
InvalidField(String, String),
#[fail(display = "A record with the given guid already exists")]
IdNotUnique,
#[fail(display = "Record violates a `dedupe_on` constraint")]
Duplicate,
}

Просмотреть файл

@ -5,3 +5,13 @@ pub mod ms_time;
pub mod schema;
pub mod storage;
pub mod vclock;
// Some re-exports we use frequently for local convenience
pub(crate) use sync_guid::Guid;
pub(crate) use serde_json::Value as JsonValue;
pub(crate) type JsonObject<Val = JsonValue> = serde_json::Map<String, Val>;
pub use crate::error::*;
pub use crate::ms_time::MsTime;
pub use crate::vclock::VClock;

Просмотреть файл

@ -5,17 +5,15 @@
use super::merge_kinds::*;
use crate::error::*;
use crate::ms_time::EARLIEST_SANE_TIME;
use crate::{JsonObject, JsonValue};
use index_vec::IndexVec;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::{HashMap, HashSet};
use url::Url;
/// The set of features understood by this client.
pub const REMERGE_FEATURES_UNDERSTOOD: &[&str] = &["record_set", "untyped_map"];
pub type JsonObject = serde_json::Map<String, JsonValue>;
index_vec::define_index_type! {
/// Newtype wrapper around usize, referring into the `fields` vec in a
/// RecordSchema
@ -48,6 +46,13 @@ pub struct RecordSchema {
pub field_own_guid: Option<FieldIndex>,
}
impl RecordSchema {
    /// Look up a field through `field_map`, returning a reference into
    /// `fields`.
    ///
    /// Returns `None` when `field_map` has no entry for `name`.
    pub fn field<'a, S: ?Sized + AsRef<str>>(&'a self, name: &S) -> Option<&'a Field> {
        self.field_map
            .get(name.as_ref())
            .map(|&idx| &self.fields[idx])
    }
}
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum CompositeInfo {
Member { root: FieldIndex },
@ -58,6 +63,7 @@ pub enum CompositeInfo {
#[derive(Clone, Debug, PartialEq)]
pub struct Field {
pub name: String,
// Note: frequently equal to name.
pub local_name: String,
pub required: bool,
@ -73,6 +79,7 @@ pub struct Field {
impl Field {
pub fn validate(&self, v: JsonValue) -> Result<JsonValue> {
// TODO(issue 2232): most errors should be more specific.
use InvalidRecord::*;
if !self.required && v.is_null() {
return Ok(v);
@ -170,6 +177,8 @@ impl Field {
}
FieldType::Timestamp { .. } => {
// XXX should we check `semantic` here? See also comments in
// `native_to_local` in `storage::info`.
if let JsonValue::Number(n) = v {
let v = n
.as_i64()
@ -249,6 +258,17 @@ impl Field {
Ok(val)
}
}
/// Returns the `semantic` stored on this field's type when it is a
/// `FieldType::Timestamp`, and `None` for every other field type.
pub fn timestamp_semantic(&self) -> Option<TimestampSemantic> {
    if let FieldType::Timestamp { semantic, .. } = &self.ty {
        *semantic
    } else {
        None
    }
}
/// Returns whether this field's type is of kind `k`; simply forwards to
/// `is_kind` on the field's `ty`.
pub fn is_kind(&self, k: FieldKind) -> bool {
self.ty.is_kind(k)
}
}
#[derive(Clone, Debug, PartialEq)]

Просмотреть файл

@ -15,10 +15,10 @@ use super::desc::*;
use super::error::*;
use super::merge_kinds::*;
use crate::util::is_default;
use crate::{JsonObject, JsonValue};
use index_vec::IndexVec;
use matches::matches;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::{HashMap, HashSet};
use url::Url;

Просмотреть файл

@ -14,25 +14,16 @@
//! - native-schema-version
//!
use super::meta;
use super::{meta, RemergeInfo};
use crate::error::*;
use crate::schema::RecordSchema;
use crate::Guid;
use rusqlite::Connection;
use std::sync::Arc;
#[derive(Clone)]
pub struct RemergeInfo {
pub(crate) collection_name: String,
pub(crate) native: Arc<RecordSchema>,
pub(crate) local: Arc<RecordSchema>,
pub(crate) client_id: sync_guid::Guid,
}
pub(super) fn load_or_bootstrap(
db: &Connection,
native: super::NativeSchemaInfo<'_>,
) -> Result<RemergeInfo> {
) -> Result<(RemergeInfo, Guid)> {
if let Some(name) = meta::try_get::<String>(db, meta::COLLECTION_NAME)? {
let native = native.parsed;
if name != native.name {
@ -60,12 +51,14 @@ pub(super) fn load_or_bootstrap(
// is yes, we should probably have more tests to ensure we never begin
// rejecting a schema we previously considered valid!
let parsed = crate::schema::parse_from_string(&local_schema, false)?;
Ok(RemergeInfo {
local: Arc::new(parsed),
native,
collection_name: name,
Ok((
RemergeInfo {
local: Arc::new(parsed),
native,
collection_name: name,
},
client_id,
})
))
} else {
bootstrap(db, native)
}
@ -74,7 +67,7 @@ pub(super) fn load_or_bootstrap(
pub(super) fn bootstrap(
db: &Connection,
native: super::NativeSchemaInfo<'_>,
) -> Result<RemergeInfo> {
) -> Result<(RemergeInfo, Guid)> {
let guid = sync_guid::Guid::random();
meta::put(db, meta::OWN_CLIENT_ID, &guid)?;
let sql = "
@ -95,10 +88,12 @@ pub(super) fn bootstrap(
meta::put(db, meta::NATIVE_SCHEMA_VERSION, &ver_str)?;
meta::put(db, meta::COLLECTION_NAME, &native.parsed.name)?;
meta::put(db, meta::CHANGE_COUNTER, &1)?;
Ok(RemergeInfo {
collection_name: native.parsed.name.clone(),
native: native.parsed.clone(),
local: native.parsed.clone(),
client_id: guid,
})
Ok((
RemergeInfo {
collection_name: native.parsed.name.clone(),
native: native.parsed.clone(),
local: native.parsed.clone(),
},
guid,
))
}

Просмотреть файл

@ -2,22 +2,25 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::SyncStatus;
use super::{info::ToLocalReason, LocalRecord, NativeRecord, RemergeInfo, SyncStatus};
use crate::error::*;
use crate::ms_time::MsTime;
use crate::vclock::{Counter, VClock};
use rusqlite::Connection;
use serde_json::Value as JsonValue;
use crate::Guid;
use rusqlite::{named_params, Connection};
use sql_support::ConnExt;
use std::convert::TryFrom;
use std::sync::Mutex;
use sync_guid::Guid;
pub struct RemergeDb {
db: Connection,
info: super::bootstrap::RemergeInfo,
info: RemergeInfo,
client_id: sync_guid::Guid,
}
lazy_static::lazy_static! {
// A single, lazily-created global lock.
// NOTE(review): presumably held while a database is opened/initialized; the
// code taking the lock is not visible here -- confirm.
//
// XXX: We should replace this with something like the PlacesApi path-based
// hashmap, but for now this is better than nothing.
static ref DB_INIT_MUTEX: Mutex<()> = Mutex::new(());
}
@ -59,9 +62,13 @@ impl RemergeDb {
db.execute_batch(pragmas)?;
let tx = db.transaction()?;
super::schema::init(&tx)?;
let info = super::bootstrap::load_or_bootstrap(&tx, native)?;
let (info, client_id) = super::bootstrap::load_or_bootstrap(&tx, native)?;
tx.commit()?;
Ok(RemergeDb { db, info })
Ok(RemergeDb {
db,
info,
client_id,
})
}
pub fn exists(&self, id: &str) -> Result<bool> {
@ -73,55 +80,27 @@ impl RemergeDb {
SELECT 1 FROM rec_mirror
WHERE guid = :guid AND is_overridden IS NOT 1
)",
rusqlite::named_params! { ":guid": id },
named_params! { ":guid": id },
|row| row.get(0),
)?)
}
pub fn create(&self, record_info: JsonValue) -> Result<sync_guid::Guid> {
let mut id = Guid::random();
let mut to_insert = serde_json::Map::default();
let record_obj = record_info
.as_object()
.ok_or_else(|| InvalidRecord::NotJsonObject)?;
for field in &self.info.local.fields {
let native_field = &self
.info
.native
.fields
.iter()
.find(|f| f.name == field.name);
let local_name = native_field
.map(|n| n.local_name.as_str())
.unwrap_or_else(|| field.name.as_str());
let is_guid = crate::schema::FieldKind::OwnGuid == field.ty.kind();
if let Some(v) = record_obj.get(local_name) {
let fixed = field.validate(v.clone())?;
if is_guid {
if let JsonValue::String(s) = &fixed {
id = Guid::from(s.as_str());
} else {
unreachable!("Field::validate checks this");
}
}
to_insert.insert(field.name.clone(), fixed);
} else if let Some(def) = field.ty.get_default() {
to_insert.insert(field.name.clone(), def);
} else if is_guid {
to_insert.insert(field.name.clone(), id.to_string().into());
} else if field.required {
throw!(InvalidRecord::MissingRequiredField(local_name.to_owned()));
}
}
pub fn create(&self, native: &NativeRecord) -> Result<Guid> {
let (id, record) = self
.info
.native_to_local(&native, ToLocalReason::Creation)?;
let tx = self.db.unchecked_transaction()?;
// TODO: Search DB for dupes based on the value of the fields listed in dedupe_on.
let id_exists = self.exists(id.as_ref())?;
if id_exists {
throw!(InvalidRecord::IdNotUnique);
}
let ctr = super::meta::get::<i64>(&self.db, super::meta::CHANGE_COUNTER)? + 1;
super::meta::put(&self.db, super::meta::CHANGE_COUNTER, &ctr)?;
let vclock = VClock::new(self.info.client_id.clone(), ctr as Counter);
if self.dupe_exists(&record)? {
throw!(InvalidRecord::Duplicate);
}
let ctr = self.counter_bump()?;
let vclock = VClock::new(self.client_id(), ctr as Counter);
let now = MsTime::now();
self.db.execute_named(
"INSERT INTO rec_local (
@ -143,16 +122,238 @@ impl RemergeDb {
:vclock,
:client_id
)",
rusqlite::named_params! {
named_params! {
":guid": id,
":schema_ver": self.info.local.version.to_string(),
":record": record,
":now": now,
":status": SyncStatus::New as u8,
":vclock": vclock,
":client_id": self.info.client_id,
":client_id": self.client_id,
},
)?;
tx.commit()?;
Ok(id)
}
/// Increment the persisted global change counter and return the new value.
///
/// Reads `meta::CHANGE_COUNTER`, adds one, stores the result back under the
/// same key, and converts it to a `Counter`.
///
/// # Errors
///
/// Propagates any error from reading or writing the meta table.
///
/// # Panics
///
/// Panics if the stored counter is negative, or if incrementing it would
/// overflow an `i64`. Both are treated as signs of a corrupt database.
fn counter_bump(&self) -> Result<Counter> {
    use super::meta;
    let stored = meta::get::<i64>(&self.db, meta::CHANGE_COUNTER)?;
    assert!(
        stored >= 0,
        "Corrupt db? negative global change counter: {:?}",
        stored
    );
    // Overflowing i64 takes around 9 quintillion (!!) writes, so the only
    // way it can realistically happen is on db corruption.
    //
    // `checked_add` detects the overflow in release builds as well (where a
    // plain `+= 1` would silently wrap), and does so *before* the bad value
    // is written back.
    //
    // FIXME: We should be returning a specific error for DB corruption
    // instead of panicking, and have a maintenance routine (a la places).
    let ctr = stored.checked_add(1).expect("Corrupt db? i64 overflow");
    meta::put(&self.db, meta::CHANGE_COUNTER, &ctr)?;
    Ok(Counter::try_from(ctr).expect("Corrupt db? i64 overflow"))
}
/// Load the vector clock stored for `id`.
///
/// The query looks at non-deleted `rec_local` rows first, followed by
/// `rec_mirror` rows that are not overridden; the first row returned is used.
/// An error is returned if the query fails or yields no row.
fn get_vclock(&self, id: &str) -> Result<VClock> {
    let sql = "SELECT vector_clock FROM rec_local
WHERE guid = :guid AND is_deleted = 0
UNION ALL
SELECT vector_clock FROM rec_mirror
WHERE guid = :guid AND is_overridden IS NOT 1";
    let clock: VClock =
        self.db
            .query_row_named(sql, named_params! { ":guid": id }, |row| row.get(0))?;
    Ok(clock)
}
/// Delete the record with the given `id`, leaving a tombstone behind.
///
/// Returns `Ok(false)` without modifying anything when `exists(id)` reports
/// that there is no such record, and `Ok(true)` once the deletion has been
/// committed.
///
/// Inside a single transaction this:
/// - turns an existing `rec_local` row into a tombstone (`is_deleted = 1`,
///   `record_data` reset to `'{}'`, bumped vector clock),
/// - flags the `rec_mirror` row (if any) as overridden, and
/// - inserts a tombstone into `rec_local` when the record was only present
///   in the mirror.
///
/// # Errors
///
/// Propagates database errors, and errors from bumping the vector clock.
pub fn delete_by_id(&self, id: &str) -> Result<bool> {
    let tx = self.db.unchecked_transaction()?;
    let exists = self.exists(id)?;
    if !exists {
        // Hrm, is there anything else we should do here? Logins goes
        // through the whole process (which is trickier for us...)
        return Ok(false);
    }
    let now_ms = MsTime::now();
    let vclock = self.get_bumped_vclock(id)?;

    // Locally, mark is_deleted and clear sensitive fields
    self.db.execute_named(
        "UPDATE rec_local
SET local_modified_ms = :now_ms,
sync_status = :changed,
is_deleted = 1,
record_data = '{}',
vector_clock = :vclock,
last_writer_id = :own_id
WHERE guid = :guid",
        named_params! {
            ":now_ms": now_ms,
            ":changed": SyncStatus::Changed as u8,
            ":guid": id,
            ":vclock": vclock,
            ":own_id": self.client_id,
        },
    )?;

    // Mark the mirror as overridden. XXX should we clear `record_data` here too?
    self.db.execute_named(
        "UPDATE rec_mirror SET is_overridden = 1 WHERE guid = :guid",
        named_params! { ":guid": id },
    )?;

    // If we don't have a local record for this ID, but do have it in the
    // mirror, insert tombstone.
    //
    // Note: the SELECT list must NOT be wrapped in parentheses -- that would
    // make it a single row value, which SQLite rejects in a result column
    // list. Every named parameter used in the SQL (including `:own_id`) must
    // be bound below.
    self.db.execute_named(
        "INSERT OR IGNORE INTO rec_local
(guid, local_modified_ms, is_deleted, sync_status, record_data, vector_clock, last_writer_id, remerge_schema_version)
SELECT guid, :now_ms, 1, :changed, '{}', :vclock, :own_id, :schema_ver
FROM rec_mirror
WHERE guid = :guid",
        named_params! {
            ":now_ms": now_ms,
            ":guid": id,
            ":schema_ver": self.info.local.version.to_string(),
            ":vclock": vclock,
            ":changed": SyncStatus::Changed as u8,
            ":own_id": self.client_id,
        },
    )?;
    tx.commit()?;
    Ok(exists)
}
/// Fetch the record with the given `id`, converted to the native format.
///
/// Non-deleted `rec_local` rows are considered first, then `rec_mirror`
/// rows with `is_overridden = 0`. Returns `Ok(None)` when neither table
/// yields a row.
///
/// # Errors
///
/// Propagates database errors and any error from converting the stored
/// local record into a native record.
pub fn get_by_id(&self, id: &str) -> Result<Option<NativeRecord>> {
    // NOTE(review): `exists` and `get_vclock` test the mirror with
    // `is_overridden IS NOT 1`, whereas this query uses `is_overridden = 0`.
    // These only differ if the column can be NULL -- confirm against the
    // table definition.
    let r: Option<LocalRecord> = self.db.try_query_row(
        "SELECT record_data FROM rec_local WHERE guid = :guid AND is_deleted = 0
UNION ALL
SELECT record_data FROM rec_mirror WHERE guid = :guid AND is_overridden = 0
LIMIT 1",
        named_params! { ":guid": id },
        |r| r.get(0),
        true, // cache
    )?;
    r.map(|v| self.info.local_to_native(&v)).transpose()
}
/// Return every live record, converted to the native format.
///
/// Rows come from non-deleted `rec_local` entries followed by `rec_mirror`
/// entries with `is_overridden = 0`. The first row that fails to load or
/// convert aborts the whole call with that error.
pub fn get_all(&self) -> Result<Vec<NativeRecord>> {
    let sql = "SELECT record_data FROM rec_local WHERE is_deleted = 0
UNION ALL
SELECT record_data FROM rec_mirror WHERE is_overridden = 0";
    let mut statement = self.db.prepare_cached(sql)?;
    let converted =
        statement.query_and_then(rusqlite::NO_PARAMS, |row| -> Result<NativeRecord> {
            let local: LocalRecord = row.get("record_data")?;
            self.info.local_to_native(&local)
        })?;
    converted.collect()
}
/// Make sure `rec_local` holds a row for `guid`, copying it over from the
/// mirror when it is missing.
///
/// Fails with `ErrorKind::NoSuchRecord` if there was no local row and the
/// copy from the mirror inserted nothing.
fn ensure_local_overlay_exists(&self, guid: &str) -> Result<()> {
    let has_overlay: bool = self.db.query_row_named(
        "SELECT EXISTS(SELECT 1 FROM rec_local WHERE guid = :guid)",
        named_params! { ":guid": guid },
        |row| row.get(0),
    )?;
    if !has_overlay {
        log::debug!("No overlay; cloning one for {:?}.", guid);
        let inserted = self.clone_mirror_to_overlay(guid)?;
        if inserted == 0 {
            log::error!("Failed to create local overlay for GUID {:?}.", guid);
            throw!(ErrorKind::NoSuchRecord(guid.to_owned()));
        }
    }
    Ok(())
}
/// Copy the `rec_mirror` row for `guid` into `rec_local` (ignoring the
/// insert on conflict), with `local_modified_ms`, `is_deleted` and
/// `sync_status` all set to 0.
///
/// Returns the row count reported by the insert statement.
fn clone_mirror_to_overlay(&self, guid: &str) -> Result<usize> {
    let inserted = self.db.execute_named_cached(
        "
INSERT OR IGNORE INTO rec_local
(guid, record_data, vector_clock, last_writer_id, local_modified_ms, is_deleted, sync_status)
SELECT
guid, record_data, vector_clock, last_writer_id, 0 as local_modified_ms, 0 AS is_deleted, 0 AS sync_status
FROM rec_mirror
WHERE guid = :guid
",
        named_params! { ":guid": guid },
    )?;
    Ok(inserted)
}
/// Set `is_overridden = 1` on the `rec_mirror` row for `guid`.
///
/// The number of affected rows is not inspected, so a missing mirror row
/// is not an error.
fn mark_mirror_overridden(&self, guid: &str) -> Result<()> {
    let sql = "UPDATE rec_mirror SET is_overridden = 1 WHERE guid = :guid";
    let _affected = self
        .db
        .execute_named_cached(sql, named_params! { ":guid": guid })?;
    Ok(())
}
/// Read the vector clock currently stored for `id`, bump the global change
/// counter, and return the clock with the new counter applied for our own
/// client id.
fn get_bumped_vclock(&self, id: &str) -> Result<VClock> {
    let current = self.get_vclock(id)?;
    let bumped = self.counter_bump()?;
    Ok(current.apply(self.client_id(), bumped))
}
/// Overwrite an existing record with the contents of `record`.
///
/// The native record is converted to the local format (as an `Update`),
/// then, inside one transaction, a local overlay row is ensured, the mirror
/// row is marked overridden, and the `rec_local` row is rewritten with the
/// new data, a bumped vector clock and the current schema version.
///
/// # Errors
///
/// - `InvalidRecord::Duplicate` if `dupe_exists` reports a duplicate.
/// - `ErrorKind::NoSuchRecord` if no record with this guid exists.
/// - Conversion and database errors are propagated.
pub fn update_record(&self, record: &NativeRecord) -> Result<()> {
    let (guid, local) = self.info.native_to_local(record, ToLocalReason::Update)?;
    let tx = self.db.unchecked_transaction()?;

    let is_dupe = self.dupe_exists(&local)?;
    if is_dupe {
        throw!(InvalidRecord::Duplicate);
    }

    // Note: These fail with NoSuchRecord if the record doesn't exist.
    let id = guid.as_str();
    self.ensure_local_overlay_exists(id)?;
    self.mark_mirror_overridden(id)?;

    let now_ms = MsTime::now();
    let vclock = self.get_bumped_vclock(id)?;

    self.db.execute_named(
        "
UPDATE rec_local
SET local_modified_ms = :now_millis,
record_data = :record,
vector_clock = :vclock,
last_writer_id = :own_id,
remerge_schema_version = :schema_ver,
sync_status = max(sync_status, :changed)
WHERE guid = :guid
",
        named_params! {
            ":guid": guid,
            ":changed": SyncStatus::Changed as u8,
            ":record": local,
            ":schema_ver": self.info.local.version.to_string(),
            ":now_millis": now_ms,
            ":own_id": self.client_id,
            ":vclock": vclock,
        },
    )?;
    tx.commit()?;
    Ok(())
}
pub fn client_id(&self) -> Guid {
// Guid are essentially free unless the Guid ends up in the "large guid"
// path, which should never happen for remerge client ids, so it should
// be fine to always clone this.
self.client_id.clone()
}
/// Borrow the `RemergeInfo` held by this database handle.
pub fn info(&self) -> &RemergeInfo {
&self.info
}
/// Placeholder for the `dedupe_on` check: currently ignores `_record` and
/// always reports that no duplicate exists.
fn dupe_exists(&self, _record: &LocalRecord) -> Result<bool> {
// XXX FIXME: this is obviously wrong, but should work for
// extension-storage / engines that don't do deduping. (Is it correct
// that ext-storage won't want to dedupe on anything?)
Ok(false)
}
}

Просмотреть файл

@ -0,0 +1,307 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::{LocalRecord, NativeRecord};
use crate::error::*;
use crate::schema::{FieldKind, FieldType, RecordSchema};
use crate::{Guid, JsonObject, JsonValue};
use std::sync::Arc;
/// Reason for converting a native record to a local record. Essentially a
/// typesafe (and three-valued) replacement for an `is_creation: bool` flag.
/// Exists just to be passed to `native_to_local`; see that function's docs
/// for how each variant changes the conversion.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ToLocalReason {
/// The record is going to be compared with existing records, and won't be
/// inserted into the DB. This means we're going to perform deduping.
Comparison,
/// The record is being created.
Creation,
/// The record is expected to exist, and is being updated.
Update,
}
/// The collection name plus the two schemas used to convert records between
/// the native and local formats (see `native_to_local` / `local_to_native`).
#[derive(Clone, Debug, PartialEq)]
pub struct RemergeInfo {
// Name of the collection.
pub(crate) collection_name: String,
// Schema describing records in the native format.
pub(crate) native: Arc<RecordSchema>,
// Schema describing records in the local format.
pub(crate) local: Arc<RecordSchema>,
}
impl RemergeInfo {
/// Convert a native record to a local record.
///
/// Returns the record's guid together with the converted record.
///
/// The reason parameter influences the behavior of this function.
///
/// ### if `reason == Creation`
///
/// - We assume we need to generate an ID for this record, unless it has a
/// non-auto OwnGuid field (in which case we ensure this is present).
///
/// - If we have a timestamp field with the created_at semantic, we populate
/// that.
///
/// - Note that if you use this, you will likely need to check that no such
/// record is in the database already (this function can't).
///
/// ### if `reason == Update`
///
/// - We require the OwnGuid field to be populated.
/// - If we have a timestamp field with the updated_at semantic, it's
/// updated.
///
/// ### if `reason == Comparison`
///
/// - The OwnGuid field may optionally be populated. If it's not populated,
/// the resulting LocalRecord will not have a populated guid, and the
/// first member of the tuple will be an empty guid.
///
/// - The OwnGuid field is optional for comparison, since for deduping
/// you might want to validate an existing record. If this is the
/// case, the guid allows us to avoid saying an item is its own dupe.
///
/// However, if validating prior to creation, you wouldn't provide a
/// guid (unless the own_guid is not auto)
///
/// - Semantic timestamps are not filled in (Hrm...)
///
/// ### Errors
///
/// - Any error returned by `Field::validate` for a provided value.
/// - `InvalidRecord::InvalidField` if a value is provided for a semantic
/// timestamp that the native schema does not consider a timestamp, or if
/// the guid is missing for an `Update`.
/// - `InvalidRecord::MissingRequiredField` if a required field (or, on
/// creation, a non-auto guid) is missing.
pub fn native_to_local(
&self,
record: &NativeRecord,
reason: ToLocalReason,
) -> Result<(Guid, LocalRecord)> {
// Start from a random guid; it is replaced if the record supplies one.
let mut id = Guid::random();
let mut fields = JsonObject::default();
// TODO: Maybe we should ensure this for all `Record`s?
let mut inserted_guid = false;
// One timestamp is shared by every semantic timestamp we fill in below.
let now_ms = crate::MsTime::now();
for field in &self.local.fields {
let native_field = &self.native.field(&field.name);
// XXX `local_name` in the schema should be renamed to something
// else. It's the property used to rename a field locally, while
// leaving its canonical name the same. For example, this is
// (eventually) what logins wants to do for hostname/origin.
//
// All good so far, the confusion is that `local` generally refers
// to the on-disk type, and `native` refers to the values coming
// from the running local application (which will use `local_name`).
//
// Or maybe renaming `LocalRecord` and such would be enough.
let native_name = native_field.map(|n| n.local_name.as_str());
let is_guid = FieldKind::OwnGuid == field.ty.kind();
let ts_sema = field.timestamp_semantic();
if let Some(v) = native_name.and_then(|s| record.get(s)) {
let mut fixed = field.validate(v.clone())?;
if is_guid {
if let JsonValue::String(s) = &fixed {
id = Guid::from(s.as_str());
inserted_guid = true;
} else {
unreachable!(
"Field::validate checks that OwnGuid fields have string values."
);
}
} else if let Some(semantic) = ts_sema {
use crate::schema::TimestampSemantic::*;
// Consider a format where in v1 there's a timestamp field
// which has no semantic, but the devs are manually making
// it behave like it had the `updated_at` semantic.
//
// Then, in v2, they did a closer read of the remerge docs
// (or something) and changed it to have the `updated_at`
// semantic.
//
// Erroring here would make this a breaking change. However,
// we don't really want to just support it blindly, so we
// check and see if the native schema version thinks this
// should be a timestamp field too, and if so we allow it.
//
// However, we use our own timestamps, so that they're
// consistent with timestamps we generate elsewhere.
if native_field.map_or(false, |nf| !nf.is_kind(FieldKind::Timestamp)) {
throw!(InvalidRecord::InvalidField(
native_name
.unwrap_or_else(|| field.name.as_str())
.to_owned(),
format!(
"A value was provided for timestamp with {:?} semantic",
semantic
),
));
}
match (reason, semantic) {
(ToLocalReason::Creation, _) => {
// Initialize both CreatedAt/UpdatedAt to now_ms on creation
fixed = now_ms.into();
}
(ToLocalReason::Update, UpdatedAt) => {
fixed = now_ms.into();
}
// Keep these here explicitly to ensure this gets
// updated if the enums changed.
(ToLocalReason::Update, CreatedAt) => {}
(ToLocalReason::Comparison, _) => {
// XXX The result of this won't be "fully" valid...
// Shouldn't matter for deduping (what Comparison is
// currently used for), since you can't dedupe_on a
// semantic timestamp (validation checks this).
}
}
}
fields.insert(field.name.clone(), fixed);
} else if let Some(def) = field.ty.get_default() {
// No value was provided, but the field type has a default.
fields.insert(field.name.clone(), def);
} else if is_guid {
match reason {
ToLocalReason::Update => {
throw!(InvalidRecord::InvalidField(
native_name
.unwrap_or_else(|| field.name.as_str())
.to_owned(),
"no value provided in ID field for update".into()
));
}
ToLocalReason::Creation => {
// Note: auto guids are handled below
fields.insert(field.name.clone(), id.to_string().into());
}
ToLocalReason::Comparison => {
// Records from Comparison are allowed to omit their
// guids. Motivation for this is in fn header comment
// (tldr: you'll want to omit it when running a
// validation/dupe check for a fresh record, and provide
// it for an existing record)
// Clear the `id`. This isn't great, but I doubt anybody
// will care about it. Using an Option<Guid> for the
// return where it will always be Some(id) for
// Creation/Update, and None for Comparison seems worse
// to me.
//
// eh. Comparison is only half-implemented for now
// anyway.
id = Guid::empty();
}
}
} else if field.required {
throw!(InvalidRecord::MissingRequiredField(
native_name
.unwrap_or_else(|| field.name.as_str())
.to_owned()
));
}
}
// XXX We should error if there are any fields in the native record we
// don't know about, instead of silently dropping them.
if !inserted_guid && reason == ToLocalReason::Creation {
self.complain_unless_auto_guid()?;
}
Ok((id, LocalRecord::new_unchecked(fields)))
}
/// Convert a record in the local (on-disk) format into the native format.
///
/// For every field of the native schema the value is taken, in order of
/// preference, from:
///
/// 1. the local record (looked up by the field's `name`),
/// 2. the default of the native field's type,
/// 3. the default of the same-named field in the local schema, provided
///    the native field validates it without changing it.
///
/// Values are stored in the result under the native field's `local_name`.
/// Optional fields for which none of the above yields a value are omitted.
///
/// # Errors
///
/// Returns `ErrorKind::LocalToNativeError` if a required native field
/// cannot be populated.
pub fn local_to_native(&self, record: &LocalRecord) -> Result<NativeRecord> {
    let mut fields = JsonObject::default();
    // Note: we should probably report special telemetry for many of these
    // errors, as they indicate either a bug in remerge or a bad schema.
    for native_field in &self.native.fields {
        // First try the record. Note that the `name` property isn't
        // supposed to change, barring removal or similar. (This is why
        // `local_name` exists)
        if let Some(value) = record.get(&native_field.name) {
            fields.insert(native_field.local_name.clone(), value.clone());
            continue;
        } else if let Some(default) = native_field.ty.get_default() {
            // Otherwise, we see if the field has a default value specified
            // in the native schema.
            fields.insert(native_field.local_name.clone(), default);
            continue;
        }
        // If not, see if it has a default specified in the local schema.
        // Even though we apply defaults when writing local records into the
        // DB, this can happen if the local schema we wrote `record` with is
        // an older version than our current local schema version.
        if let Some(default) = self
            .local
            .field(&native_field.name)
            .and_then(|lf| lf.ty.get_default())
        {
            // Make sure that that default is valid. If it's not, we
            // ignore it (unless it's a required native field, in which
            // case we complain).
            if let Ok(fixed) = native_field.validate(default.clone()) {
                if fixed == default {
                    fields.insert(native_field.local_name.clone(), default);
                    continue;
                }
                // If this is actually a problem (e.g. the field is
                // required), we'll complain loudly below (this is likely a
                // schema issue if the field is required).
                log::error!(
                    "More recent schema has default record for field {:?}, but it required fixups according to the native schema!",
                    native_field.local_name,
                );
            } else {
                // Same -- this is bad, but if it really matters we report
                // below.
                //
                // (That said, for this one, it might be worth always
                // erroring, since the local schema's default value for
                // some field being invalid according to the native schema
                // should be a breaking change)
                //
                // This branch is a validation *failure*, not a fixup, so it
                // gets its own message to keep the two cases apart in logs.
                log::error!(
                    "More recent schema has default record for field {:?}, but it is invalid according to the native schema!",
                    native_field.local_name,
                );
            }
        }

        if !native_field.required {
            // We didn't have it, but it's optional.
            continue;
        }
        // Everything we tried failed, which means we have a bad record in
        // our DB. This is probably caused by an incompatible schema update
        // that didn't specify the right required version. :(
        //
        // In practice this can be fixed by pushing an updated schema with a
        // default value / fixed default value for this, so it's unclear
        // what to actually do here until we see what kinds of things cause
        // it in the wild, if any.
        throw!(ErrorKind::LocalToNativeError(format!(
            "Local record is missing or has invalid required field {:?}",
            native_field.local_name
        )));
    }
    Ok(NativeRecord::new_unchecked(fields))
}
/// Called when no guid was provided for a record being created.
///
/// Returns `Ok(())` if either schema (local is checked first, then native)
/// declares its own-guid field as `auto`, or if neither schema has an
/// own-guid field at all. Otherwise the guid was required, and
/// `InvalidRecord::MissingRequiredField` is returned, naming the last
/// non-auto own-guid field seen.
///
/// Panics if a schema's `field_own_guid` points at a field whose type is
/// not `FieldType::OwnGuid`.
fn complain_unless_auto_guid(&self) -> Result<()> {
    let mut missing_guid_field = None;
    for &schema in &[&*self.local, &*self.native] {
        let idx = match schema.field_own_guid {
            Some(idx) => idx,
            None => continue,
        };
        let guid_field = &schema.fields[idx];
        match &guid_field.ty {
            FieldType::OwnGuid { auto } => {
                if *auto {
                    return Ok(());
                }
                missing_guid_field = Some(guid_field.name.as_str());
            }
            // Validation ensures this.
            _ => panic!("bug: field_own_guid refers to non-OwnGuid field"),
        }
    }
    match missing_guid_field {
        Some(name) => throw!(InvalidRecord::MissingRequiredField(name.to_string())),
        None => Ok(()),
    }
}
}

Просмотреть файл

@ -4,9 +4,14 @@
pub mod bootstrap;
pub mod db;
mod info;
mod meta;
pub mod records;
pub mod schema;
pub use info::RemergeInfo;
pub use records::{LocalRecord, NativeRecord};
use crate::schema::RecordSchema;
use std::sync::Arc;
@ -20,6 +25,9 @@ use std::sync::Arc;
/// stable serialization format for schemas, and don't need two).
///
/// Initialize with TryFrom.
///
// XXX the name NativeSchemaInfo is kind of confusing with RemergeSchemaInfo...
// Neither are great names
#[derive(Clone)]
pub struct NativeSchemaInfo<'a> {
pub parsed: Arc<RecordSchema>,

Просмотреть файл

@ -0,0 +1,196 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
//! This module provides wrappers around JsonValue that allow for better
//! documentation and type safety for the format a method (usually in `db.rs`)
//! is expected to take/return.
//!
//! XXX The names "Local" vs "Native" here (and around) are confusing, but
//! they beat passing `serde_json::Value`s around directly everywhere, and they
//! match the terms used in the RFC. I'm open to name suggestions, though.
use crate::{error::*, JsonObject, JsonValue};
use std::marker::PhantomData;
// Private module holding the `Sealed` supertrait. Because this module is not
// `pub`, the trait cannot be named (and so cannot be implemented) elsewhere.
mod private {
/// Sealed trait to prevent code from outside from implementing RecordFormat
/// for anything other than the implementations here.
pub trait Sealed {}
impl Sealed for super::LocalFormat {}
impl Sealed for super::NativeFormat {}
}
/// Used to distinguish different categories of records.
///
/// This is a marker trait with no methods. It is sealed (via
/// `private::Sealed`), so only the format types defined in this module can
/// implement it.
///
/// Most of the bounds are just so that we don't have to manually implement
/// traits we could otherwise derive -- in practice we just use this in a
/// PhantomData.
pub trait RecordFormat:
private::Sealed + Copy + std::fmt::Debug + PartialEq + 'static + Sync + Send
{
}
/// Marker type: record format for records in the current local schema. This
/// is the format which we insert into the database, and it should always be
/// newer or equal to the native format.
#[derive(Debug, Clone, PartialEq, Copy, Eq, Ord, Hash, PartialOrd)]
pub struct LocalFormat;
/// Marker type: record format for records in the native format understood by
/// the local application using remerge. Data that comes from the FFI, and that
/// is returned over the FFI should be in this format.
#[derive(Debug, Clone, PartialEq, Copy, Eq, Ord, Hash, PartialOrd)]
pub struct NativeFormat;
// Note: For sync we'll likely want a RemoteFormat/RemoteRecord too.
impl RecordFormat for LocalFormat {}
impl RecordFormat for NativeFormat {}
/// A [`Record`] in [`LocalFormat`].
pub type LocalRecord = Record<LocalFormat>;
/// A [`Record`] in [`NativeFormat`].
pub type NativeRecord = Record<NativeFormat>;
/// A wrapper around a JSON object (`JsonObject`) which indicates what format
/// the record is in. Note that converting between formats cannot be done
/// without schema information, so this is a paper-thin wrapper.
///
/// # Which record format to use
///
/// - Data coming from the FFI, or being returned to the FFI is always in
/// [`NativeFormat`], so use [`NativeRecord`].
///
/// - Data going into the database, or that came out of the database is in
/// [`LocalFormat`], so use [`LocalRecord`].
///
/// - Data from remote servers will likely be in a future `RemoteFormat`, and
/// you'd use a matching `RemoteRecord` (neither exists yet).
///
/// Converting a record from one format to another requires schema
/// information. This can generally be done by methods on `RemergeInfo`.
#[repr(transparent)]
#[derive(Debug, Clone, PartialEq)]
pub struct Record<F: RecordFormat>(pub(crate) JsonObject, PhantomData<F>);
impl<F: RecordFormat> Record<F> {
/// Create a new record with the format `F` directly.
///
/// The name of this function contains `unchecked` as it's up to the caller
/// to ensure that the `record_json` is actually in the requested format.
/// See the [`Record`] docs for how to make this determination.
#[inline]
pub fn new_unchecked(record_json: JsonObject) -> Self {
Self(record_json, PhantomData)
}
/// If `record` is a JSON Object, returns `Ok(Self::new_unchecked(record))`,
/// otherwise, returns `Err(InvalidRecord::NotJsonObject)`
///
/// The name of this function contains `unchecked` as it's up to the caller
/// to ensure that the `record_json` is actually in the requested format.
/// See the [`Record`] docs for how to make this determination.
pub fn from_value_unchecked(record_json: JsonValue) -> Result<Self, InvalidRecord> {
if let JsonValue::Object(m) = record_json {
Ok(Self::new_unchecked(m))
} else {
Err(crate::error::InvalidRecord::NotJsonObject)
}
}
#[inline]
pub fn as_obj(&self) -> &JsonObject {
&self.0
}
#[inline]
pub fn into_obj(self) -> JsonObject {
self.0
}
#[inline]
pub fn into_val(self) -> JsonValue {
self.into_obj().into()
}
}
impl NativeRecord {
/// Parse a record from a str given to us over the FFI, returning an error
/// if it's obviously bad (not a json object).
pub fn from_native_str(s: &str) -> Result<Self> {
let record: JsonValue = serde_json::from_str(s)?;
if let JsonValue::Object(m) = record {
Ok(Self(m, PhantomData))
} else {
Err(crate::error::InvalidRecord::NotJsonObject.into())
}
}
}
/// Lets a record be used directly wherever a `&JsonObject` is expected, by
/// dereferencing to the wrapped object.
impl<F: RecordFormat> std::ops::Deref for Record<F> {
    type Target = JsonObject;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// Cheap borrow of the wrapped JSON object.
impl<F: RecordFormat> AsRef<JsonObject> for Record<F> {
    #[inline]
    fn as_ref(&self) -> &JsonObject {
        &self.0
    }
}
impl<F: RecordFormat> From<Record<F>> for JsonValue {
#[inline]
fn from(r: Record<F>) -> JsonValue {
r.into_val()
}
}
/// Consumes the record and yields the wrapped JSON object, dropping the format
/// tag.
impl<F: RecordFormat> From<Record<F>> for JsonObject {
    #[inline]
    fn from(r: Record<F>) -> JsonObject {
        r.0
    }
}
/// Borrows the wrapped JSON object for as long as the record itself is
/// borrowed.
impl<'a, F: RecordFormat> From<&'a Record<F>> for &'a JsonObject {
    #[inline]
    fn from(r: &'a Record<F>) -> &'a JsonObject {
        r.as_obj()
    }
}
/// Formats the record as the JSON serialization of its underlying object.
///
/// The JSON is streamed straight into the formatter through
/// `crate::util::FormatWriter`; any serialization or write failure is reported
/// as `std::fmt::Error`.
impl<F: RecordFormat> std::fmt::Display for Record<F> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let sink = crate::util::FormatWriter(f);
        match serde_json::to_writer(sink, self.as_obj()) {
            Ok(()) => Ok(()),
            Err(_) => Err(std::fmt::Error),
        }
    }
}
// Kept in its own module so that `rusqlite::types::*` can be glob imported
// (nearly all of those types are needed) without leaking into the parent
// module's namespace.
mod sql_impls {
    use super::LocalRecord;
    use rusqlite::{types::*, Result};

    /// A `LocalRecord` is bound to SQL as its JSON text (the output of its
    /// `Display` impl).
    impl ToSql for LocalRecord {
        fn to_sql(&self) -> Result<ToSqlOutput<'_>> {
            let json_text = self.to_string();
            Ok(ToSqlOutput::from(json_text))
        }
    }

    /// A `LocalRecord` is read from a text or blob column by parsing the
    /// column's bytes as JSON. Any other column type is rejected with
    /// `FromSqlError::InvalidType`, and a JSON parse failure is surfaced as
    /// `FromSqlError::Other`.
    impl FromSql for LocalRecord {
        fn column_result(value: ValueRef<'_>) -> FromSqlResult<Self> {
            let parsed = match value {
                ValueRef::Text(text) => serde_json::from_slice(text),
                ValueRef::Blob(blob) => serde_json::from_slice(blob),
                _ => return Err(FromSqlError::InvalidType),
            };
            match parsed {
                // No format validation happens here: what is stored in the
                // database is taken to already be in the local format.
                Ok(fields) => Ok(LocalRecord::new_unchecked(fields)),
                Err(err) => Err(FromSqlError::Other(err.into())),
            }
        }
    }
}

Просмотреть файл

@ -1,6 +1,7 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::io::{Error as IoError, ErrorKind as IoErrorKind, Result as IoResult, Write};
/// For use with `#[serde(skip_serializing_if = )]`
#[inline]
@ -48,3 +49,28 @@ macro_rules! ensure {
}
};
}
/// Adapter that lets a `std::fmt::Formatter` be passed to code that wants a
/// `std::io::Write`.
///
/// Mainly used to implement `std::fmt::Display` for the Record types without
/// having to clone them first (a clone would otherwise be needed because it is
/// `serde_json::Value` that implements `Display`, not `serde_json::Map`, aka
/// `JsonObject`).
///
/// Alternatively we could have done `serde_json::to_string(self).unwrap()` or
/// something similar, but this is cleaner.
pub struct FormatWriter<'a, 'b>(pub &'a mut std::fmt::Formatter<'b>);

impl<'a, 'b> Write for FormatWriter<'a, 'b> {
    /// Forwards `buf` to the wrapped formatter as a string.
    ///
    /// The whole buffer must be valid UTF-8. On success the full length of
    /// `buf` is reported as written. If the bytes are not valid UTF-8, or the
    /// formatter rejects the write, an `ErrorKind::Other` I/O error wrapping
    /// `std::fmt::Error` is returned.
    fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
        // Both failure modes are deliberately collapsed into the same error.
        let failed = || IoError::new(IoErrorKind::Other, std::fmt::Error);
        let text = std::str::from_utf8(buf).map_err(|_| failed())?;
        self.0.write_str(text).map_err(|_| failed())?;
        Ok(buf.len())
    }

    /// Nothing is buffered here, so flushing always succeeds.
    fn flush(&mut self) -> IoResult<()> {
        Ok(())
    }
}

Просмотреть файл

@ -5,9 +5,9 @@
//! Our implementation of vector clocks. See the remerge RFC's appendix for an
//! overview of how these work if you're unfamiliar.
use crate::Guid;
use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
use std::collections::BTreeMap;
use sync_guid::Guid;
pub type Counter = u64;
@ -105,27 +105,32 @@ impl VClock {
/// instead. Otherwise, do nothing.
///
/// Notes that this clock has seen the `value`th event of `client_id`.
pub fn apply(&mut self, client_id: Guid, value: Counter) {
#[must_use]
pub fn apply(mut self, client_id: Guid, value: Counter) -> Self {
if value == 0 {
// Avoid inserting 0 if we can help it.
return;
return self;
}
let old_value = self.0.entry(client_id).or_default();
if *old_value < value {
*old_value = value;
}
self
}
pub fn apply_all(&mut self, o: &VClock) {
for (id, ctr) in &o.0 {
self.apply(id.clone(), *ctr)
}
#[must_use]
pub fn combine(self, o: &VClock) -> Self {
o.0.iter()
.fold(self, |accum, (id, ctr)| accum.apply(id.clone(), *ctr))
}
}
pub fn merge(&self, o: &VClock) -> Self {
let mut res = self.clone();
res.apply_all(o);
res
impl<'a> IntoIterator for &'a VClock {
type IntoIter = std::collections::btree_map::Iter<'a, Guid, Counter>;
type Item = (&'a Guid, &'a Counter);
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.0.iter()
}
}

Просмотреть файл

@ -11,7 +11,7 @@ macro_rules! define_error_wrapper {
/// method, which we don't use much but should *really* use more.
pub use failure::ResultExt;
pub type Result<T> = std::result::Result<T, Error>;
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct Error(Box<failure::Context<$Kind>>);