Merge pull request #596 from mozilla/push-storage

feat: schema and basic crud of push's storage
This commit is contained in:
JR Conlin 2019-02-01 09:16:33 -08:00 коммит произвёл GitHub
Родитель 933a55d1e3 ab13427691
Коммит bf5aedc616
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
7 изменённых файлов: 552 добавлений и 155 удалений

Просмотреть файл

@ -5,9 +5,16 @@ authors = ["jrconlin <me+crypt@jrconlin.com>"]
edition = "2018"
[dependencies]
crypto = {path = "../crypto"}
crypto = { path = "../crypto" }
lazy_static = "1.2"
log = "0.4"
failure = "0.1.5"
failure_derive = "0.1.5"
openssl = "0.10"
lazy_static = "1.2"
sql-support = {path = "../../support/sql"}
rusqlite = "0.16.0"
serde = { version = "1.0.85", features = ["derive"] }
serde_json = "1.0.37"
sql-support = { path = "../../support/sql" }
[dev-dependencies]
env_logger = "0.5.13"

Просмотреть файл

@ -0,0 +1,205 @@
use std::{ops::Deref, path::Path};
use rusqlite::{types::ToSql, Connection, NO_PARAMS};
use sql_support::ConnExt;
use crate::{
error::{Error, Result},
record::PushRecord,
schema,
};
// TODO: Add broadcasts storage
pub trait Storage {
fn get_record(&self, uaid: &str, chid: &str) -> Result<Option<PushRecord>>;
fn put_record(&self, uaid: &str, record: &PushRecord) -> Result<bool>;
fn delete_record(&self, uaid: &str, chid: &str) -> Result<bool>;
fn delete_all_records(&self, uaid: &str) -> Result<()>;
}
pub struct PushDb {
pub db: Connection,
}
impl PushDb {
pub fn with_connection(db: Connection) -> Result<Self> {
// XXX: consider the init_test_logging call in other components
schema::init(&db)?;
Ok(Self { db })
}
pub fn open(path: impl AsRef<Path>) -> Result<impl Storage> {
Ok(Self::with_connection(Connection::open(path)?)?)
}
fn open_in_memory() -> Result<impl Storage> {
let conn = Connection::open_in_memory()?;
Ok(Self::with_connection(conn)?)
}
}
impl Deref for PushDb {
type Target = Connection;
fn deref(&self) -> &Connection {
&self.db
}
}
impl ConnExt for PushDb {
fn conn(&self) -> &Connection {
*&self
}
}
impl Storage for PushDb {
fn get_record(&self, _uaid: &str, chid: &str) -> Result<Option<PushRecord>> {
let query = format!(
"SELECT {common_cols}
FROM push_record WHERE channel_id = :chid",
common_cols = schema::COMMON_COLS,
);
Ok(self.try_query_row(
&query,
&[(":chid", &chid as &ToSql)],
PushRecord::from_row,
false,
)?)
}
fn put_record(&self, _uaid: &str, record: &PushRecord) -> Result<bool> {
let query = format!(
"INSERT INTO push_record
({common_cols})
VALUES
(:channel_id, :endpoint, :scope, :origin_attributes, :key, :system_record,
:recent_message_ids, :push_count, :last_push, :ctime, :quota, :app_server_key,
:native_id)
ON CONFLICT(channel_id) DO UPDATE SET
endpoint = :endpoint,
scope = :scope,
origin_attributes = :origin_attributes,
key = :key,
system_record = :system_record,
recent_message_ids = :recent_message_ids,
push_count = :push_count,
last_push = :last_push,
ctime = :ctime,
quota = :quota,
app_server_key = :app_server_key,
native_id = :native_id",
common_cols = schema::COMMON_COLS,
);
let affected_rows = self.execute_named(
&query,
&[
(":channel_id", &record.channel_id),
(":endpoint", &record.endpoint),
(":scope", &record.scope),
(":origin_attributes", &record.origin_attributes),
(":key", &record.key),
(":system_record", &record.system_record),
(
":recent_message_ids",
&serde_json::to_string(&record.recent_message_ids).map_err(|e| {
Error::internal(&format!("Serializing recent_message_ids: {}", e))
})?,
),
(":push_count", &record.push_count),
(":last_push", &record.last_push),
(":ctime", &record.ctime),
(":quota", &record.quota),
(":app_server_key", &record.app_server_key),
(":native_id", &record.native_id),
],
)?;
Ok(affected_rows == 1)
}
fn delete_record(&self, _uaid: &str, chid: &str) -> Result<bool> {
let affected_rows = self.execute_named(
"DELETE FROM push_record
WHERE channel_id = :chid",
&[(":chid", &chid as &ToSql)],
)?;
Ok(affected_rows == 1)
}
fn delete_all_records(&self, _uaid: &str) -> Result<()> {
self.execute("DELETE FROM push_record", NO_PARAMS)?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::PushDb;
use crate::{db::Storage, error::Result, record::PushRecord};
use crypto::{Crypto, Cryptography};
fn prec() -> PushRecord {
PushRecord::new(
"",
"deadbeef00000000decafbad00000000",
"https://example.com/update",
"https://example.com/1",
"appId=1",
Crypto::generate_key().expect("Couldn't generate_key"),
false,
)
}
#[test]
fn basic() -> Result<()> {
let db = PushDb::open_in_memory()?;
let rec = prec();
let chid = &rec.channel_id;
assert!(db.get_record("", chid)?.is_none());
assert!(db.put_record("", &rec)?);
assert!(db.get_record("", chid)?.is_some());
assert_eq!(db.get_record("", chid)?, Some(rec.clone()));
let mut rec2 = rec.clone();
rec2.increment()?;
assert!(db.put_record("", &rec2)?);
let result = db.get_record("", chid)?.unwrap();
assert_ne!(result, rec);
assert_eq!(result, rec2);
Ok(())
}
#[test]
fn delete() -> Result<()> {
let db = PushDb::open_in_memory()?;
let rec = prec();
let chid = &rec.channel_id;
assert!(db.put_record("", &rec)?);
assert!(db.get_record("", chid)?.is_some());
assert!(db.delete_record("", chid)?);
assert!(db.get_record("", chid)?.is_none());
Ok(())
}
#[test]
fn delete_all_records() -> Result<()> {
let db = PushDb::open_in_memory()?;
let rec = prec();
let mut rec2 = rec.clone();
rec2.channel_id = "deadbeef00000002".to_owned();
rec2.endpoint = "https://example.com/update2".to_owned();
assert!(db.put_record("", &rec)?);
assert!(db.put_record("", &rec2)?);
assert!(db.get_record("", &rec.channel_id)?.is_some());
db.delete_all_records("")?;
assert!(db.get_record("", &rec.channel_id)?.is_none());
assert!(db.get_record("", &rec2.channel_id)?.is_none());
Ok(())
}
}

Просмотреть файл

@ -0,0 +1,64 @@
use failure::{Context, Fail};
use rusqlite;
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub struct Error {
inner: Context<ErrorKind>,
}
#[derive(Debug, Fail)]
pub enum ErrorKind {
#[fail(display = "Unexpected error: {}", _0)]
Internal(String),
#[fail(display = "Error executing SQL: {}", _0)]
Sql(#[fail(cause)] rusqlite::Error),
}
impl Error {
#[inline]
pub fn kind(&self) -> &ErrorKind {
self.inner.get_context()
}
pub fn internal(msg: &str) -> Self {
ErrorKind::Internal(msg.to_owned()).into()
}
}
impl From<ErrorKind> for Error {
#[inline]
fn from(kind: ErrorKind) -> Error {
Context::new(kind).into()
}
}
impl From<Context<ErrorKind>> for Error {
fn from(inner: Context<ErrorKind>) -> Error {
Error { inner }
}
}
macro_rules! impl_from_error {
($(($variant:ident, $type:ty)),+) => ($(
impl From<$type> for ErrorKind {
#[inline]
fn from(e: $type) -> ErrorKind {
ErrorKind::$variant(e)
}
}
impl From<$type> for Error {
#[inline]
fn from(e: $type) -> Error {
ErrorKind::from(e).into()
}
}
)*);
}
impl_from_error! {
(Sql, rusqlite::Error)
}

Просмотреть файл

@ -6,156 +6,13 @@
*/
extern crate crypto;
use openssl::ec::EcKey;
use openssl::pkey::Private;
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
mod db;
mod error;
mod record;
mod schema;
mod types;
use crypto::Key;
pub type ChannelID = String;
#[derive(Clone, Debug, PartialEq)]
pub struct PushRecord {
// Endpoint provided from the push server
pub endpoint: String,
// Designation label provided by the subscribing service
pub designator: String,
// List of origin Host attributes.
pub origin_attributes: HashMap<String, String>,
// Number of pushes for this record
pub push_count: u8,
// Last push rec'vd
pub last_push: u64,
// Private EC Prime256v1 key info. (Public key can be derived from this)
pub key: Vec<u8>,
// Is this as priviledged system record
pub system_record: bool,
// VAPID public key to restrict subscription updates for only those that sign
// using the private VAPID key.
pub app_server_key: Option<String>,
// List of the most recent message IDs from the server.
pub recent_message_ids: Vec<String>,
// Time this subscription was created.
pub ctime: u64,
// Max quota count for sub
pub quota: u8,
// (if this is a bridged connection (e.g. on Android), this is the native OS Push ID)
pub native_id: Option<String>,
}
fn now_u64() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}
impl PushRecord {
fn increment(&mut self) -> Result<Self, StorageError> {
self.push_count += 1;
self.last_push = now_u64();
// TODO check for quotas, etc
// write to storage.
Ok(self.clone())
}
}
//TODO: Add broadcasts storage
pub struct StorageError;
pub trait Storage {
// Connect to the storage system
// fn connect<S: Storage>() -> S;
// Generate a Push Record from the Subscription info, which has the endpoint,
// encryption keys, etc.
fn create_record(
&self,
uaid: &str,
chid: &str,
origin_attributes: HashMap<String, String>,
endpoint: &str,
auth: &str,
private_key: &Key,
system_record: bool,
) -> PushRecord;
fn get_record(&self, uaid: &str, chid: &str) -> Option<PushRecord>;
fn put_record(&self, uaid: &str, chid: &str, record: &PushRecord)
-> Result<bool, StorageError>;
fn purge(&self, uaid: &str, chid: Option<&str>) -> Result<bool, StorageError>;
fn generate_channel_id(&self) -> String;
}
pub struct Store;
impl Store {
fn connect() -> impl Storage {
Store
}
}
// TODO: Fill this out (pretty skeletal)
impl Storage for Store {
fn create_record(
&self,
_uaid: &str,
chid: &str,
origin_attributes: HashMap<String, String>,
endpoint: &str,
server_auth: &str,
private_key: &Key,
_system_record: bool,
) -> PushRecord {
// TODO: fill this out properly
PushRecord {
endpoint: String::from(endpoint),
designator: String::from(chid),
origin_attributes: origin_attributes.clone(),
push_count: 0,
last_push: 0,
key: private_key.serialize().unwrap(),
system_record: false,
app_server_key: None,
recent_message_ids: Vec::new(),
// do we need sub second resolution?
ctime: now_u64(),
quota: 0,
native_id: None,
}
}
fn get_record(&self, _uaid: &str, _chid: &str) -> Option<PushRecord> {
None
}
fn put_record(
&self,
_uaid: &str,
_chid: &str,
_record: &PushRecord,
) -> Result<bool, StorageError> {
Ok(false)
}
fn purge(&self, _uaid: &str, _chid: Option<&str>) -> Result<bool, StorageError> {
Ok(false)
}
fn generate_channel_id(&self) -> String {
String::from("deadbeef00000000decafbad00000000")
}
}
pub use crate::{
db::{PushDb as Store, Storage},
record::PushRecord,
};

Просмотреть файл

@ -0,0 +1,111 @@
use rusqlite::Row;
use crate::{
error::{Error, Result},
types::Timestamp,
};
use crypto::Key;
pub type ChannelID = String;
#[derive(Clone, Debug, PartialEq)]
pub struct PushRecord {
// Designation label provided by the subscribing service
pub channel_id: ChannelID,
// Endpoint provided from the push server
pub endpoint: String,
// XXX:
pub scope: String,
// An originAttributes to suffix string (XXX: related to scope)
pub origin_attributes: String,
// Private EC Prime256v1 key info. (Public key can be derived from this)
pub key: Vec<u8>,
// Is this as priviledged system record
pub system_record: bool,
// List of the most recent message IDs from the server.
pub recent_message_ids: Vec<String>,
// Number of pushes for this record
pub push_count: u8,
// Last push rec'vd
pub last_push: Timestamp,
// Time this subscription was created.
pub ctime: Timestamp,
// Max quota count for sub
pub quota: u8,
// VAPID public key to restrict subscription updates for only those that sign
// using the private VAPID key.
pub app_server_key: Option<String>,
// (if this is a bridged connection (e.g. on Android), this is the native OS Push ID)
pub native_id: Option<String>,
}
impl PushRecord {
/// Create a Push Record from the Subscription info: endpoint, encryption
/// keys, etc.
pub fn new(
_uaid: &str,
chid: &str,
endpoint: &str,
scope: &str,
origin_attributes: &str,
private_key: Key,
system_record: bool,
) -> Self {
// XXX: unwrap
Self {
channel_id: chid.to_owned(),
endpoint: endpoint.to_owned(),
scope: scope.to_owned(),
origin_attributes: origin_attributes.to_owned(),
key: private_key.serialize().unwrap(),
system_record: system_record,
recent_message_ids: vec![],
push_count: 0,
last_push: 0.into(), // XXX: consider null instead
ctime: Timestamp::now(),
quota: 0,
app_server_key: None,
native_id: None,
}
}
pub(crate) fn from_row(row: &Row) -> Result<Self> {
Ok(PushRecord {
channel_id: row.get_checked("channel_id")?,
endpoint: row.get_checked("endpoint")?,
scope: row.get_checked("scope")?,
origin_attributes: row.get_checked("origin_attributes")?,
key: row.get_checked("key")?,
system_record: row.get_checked("system_record")?,
recent_message_ids: serde_json::from_str(
&row.get_checked::<_, String>("recent_message_ids")?,
)
.map_err(|e| Error::internal(&format!("Deserializing recent_message_ids: {}", e)))?,
push_count: row.get_checked("push_count")?,
last_push: row.get_checked("last_push")?,
ctime: row.get_checked("ctime")?,
quota: row.get_checked("quota")?,
app_server_key: row.get_checked("app_server_key")?,
native_id: row.get_checked("native_id")?,
})
}
pub(crate) fn increment(&mut self) -> Result<Self> {
self.push_count += 1;
self.last_push = Timestamp::now();
// TODO: check for quotas, etc
Ok(self.clone())
}
}

Просмотреть файл

@ -0,0 +1,89 @@
use rusqlite::Connection;
use sql_support::ConnExt;
use crate::error::Result;
const VERSION: i64 = 1;
// XXX: named "pushapi", maybe push_sub?
const CREATE_TABLE_PUSH_SQL: &'static str = "
CREATE TABLE IF NOT EXISTS push_record (
channel_id TEXT NOT NULL PRIMARY KEY,
endpoint TEXT NOT NULL UNIQUE,
scope TEXT NOT NULL,
origin_attributes TEXT NOT NULL,
key TEXT NOT NULL,
system_record TINYINT NOT NULL,
recent_message_ids TEXT NOT NULL,
push_count SMALLINT NOT NULL,
last_push INTEGER NOT NULL,
ctime INTEGER NOT NULL,
quota TINYINT NOT NULL,
app_server_key TEXT,
native_id TEXT
);
-- index to fetch records based on endpoints. used by unregister
CREATE INDEX idx_endpoint ON push_record (endpoint);
-- index to fetch records by identifiers. In the current security
-- model, the originAttributes distinguish between different 'apps' on
-- the same origin. Since ServiceWorkers are same-origin to the scope
-- they are registered for, the attributes and scope are enough to
-- reconstruct a valid principal.
CREATE UNIQUE INDEX idx_identifiers ON push_record (scope, origin_attributes);
CREATE INDEX idx_origin_attributes ON push_record (origin_attributes);
";
pub const COMMON_COLS: &'static str = "
channel_id,
endpoint,
scope,
origin_attributes,
key,
system_record,
recent_message_ids,
push_count,
last_push,
ctime,
quota,
app_server_key,
native_id
";
pub fn init(db: &Connection) -> Result<()> {
let user_version = db.query_one::<i64>("PRAGMA user_version")?;
if user_version == 0 {
create(db)?;
} else if user_version != VERSION {
if user_version < VERSION {
upgrade(db, user_version)?;
} else {
log::warn!(
"Loaded future schema version {} (we only understand version {}). \
Optimisitically ",
user_version,
VERSION
)
}
}
Ok(())
}
fn upgrade(_db: &Connection, from: i64) -> Result<()> {
log::debug!("Upgrading schema from {} to {}", from, VERSION);
if from == VERSION {
return Ok(());
}
panic!("sorry, no upgrades yet - delete your db!");
}
pub fn create(db: &Connection) -> Result<()> {
log::debug!("Creating schema");
db.execute_all(&[
CREATE_TABLE_PUSH_SQL,
&format!("PRAGMA user_version = {version}", version = VERSION),
])?;
Ok(())
}

Просмотреть файл

@ -0,0 +1,64 @@
use std::fmt;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rusqlite::types::{FromSql, FromSqlResult, ToSql, ToSqlOutput, ValueRef};
use rusqlite::Result as RusqliteResult;
use serde::{Deserialize, Serialize};
// Typesafe way to manage timestamps.
// XXX: copied from places, consolidate the impls later
#[derive(
Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize, Default,
)]
pub struct Timestamp(pub u64);
impl Timestamp {
pub fn now() -> Self {
SystemTime::now().into()
}
}
impl From<Timestamp> for u64 {
fn from(ts: Timestamp) -> Self {
ts.0
}
}
impl From<SystemTime> for Timestamp {
fn from(st: SystemTime) -> Self {
let d = st.duration_since(UNIX_EPOCH).unwrap_or_default();
Timestamp((d.as_secs() as u64) * 1000 + ((d.subsec_nanos() as u64) / 1_000_000))
}
}
impl From<Timestamp> for SystemTime {
fn from(ts: Timestamp) -> Self {
UNIX_EPOCH + Duration::from_millis(ts.into())
}
}
impl From<u64> for Timestamp {
fn from(ts: u64) -> Self {
// XXX: we use 0.. maybe null instead
//assert!(ts != 0);
Timestamp(ts)
}
}
impl fmt::Display for Timestamp {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl ToSql for Timestamp {
fn to_sql(&self) -> RusqliteResult<ToSqlOutput> {
Ok(ToSqlOutput::from(self.0 as i64)) // hrm - no u64 in rusqlite
}
}
impl FromSql for Timestamp {
fn column_result(value: ValueRef) -> FromSqlResult<Self> {
value.as_i64().map(|v| Timestamp(v as u64)) // hrm - no u64
}
}