Adding shutdown support to sql-support

When the user wants to shutdown the application, we should:
  - Interrupt all current `SqlInterruptScope`s
  - Interrupt all future `SqlInterruptScope`s when they're created.

The nice thing about this approach is that it didn't require invasive
changes in places to support it.  The main new requirement was we need
to have a way to get a `Weak<AsRef<SqlInterruptHandler>>` for each
database.  In order to support that, I needed to:

 - For the read/write and read-only connections: have `PlacesConnection`
   store an `SqlInterruptHandler` and implement `AsRef`.
 - For the sync connection: Added struct that wraps the
   `Mutex<PlacesDb>` and also stores a `SqlInterruptHandler` and
   implements `AsRef`.

Updated `places-utils` so that ctrl-c starts shutdown mode.
This commit is contained in:
Ben Dean-Kawamura 2022-02-01 17:33:45 -05:00
Родитель d5c490946e
Коммит 91d0bab23e
19 изменённых файлов: 202 добавлений и 53 удалений

2
Cargo.lock сгенерированный
Просмотреть файл

@ -893,6 +893,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"sql-support",
"structopt",
"sync-guid",
"sync15",
@ -3315,6 +3316,7 @@ dependencies = [
"lazy_static",
"log",
"nss_build_common",
"parking_lot 0.5.5",
"rusqlite",
"tempfile",
"thiserror",

Просмотреть файл

@ -86,7 +86,7 @@ pub fn search_frecent(conn: &PlacesDb, params: SearchParams) -> Result<Vec<Searc
}
pub fn match_url(conn: &PlacesDb, query: impl AsRef<str>) -> Result<Option<Url>> {
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
let matcher = OriginOrUrl::new(query.as_ref());
// Note: The matcher ignores the limit argument (it's a trait method)
let results = matcher.search(conn, 1)?;
@ -107,7 +107,7 @@ fn match_with_limit(
) -> Result<Vec<SearchResult>> {
let mut results = Vec::new();
let mut rem_results = max_results;
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
for m in matchers {
if rem_results == 0 {
break;

Просмотреть файл

@ -3,7 +3,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::bookmark_sync::BookmarksSyncEngine;
use crate::db::db::PlacesDb;
use crate::db::db::{PlacesDb, SharedPlacesDb};
use crate::error::*;
use crate::history_sync::HistorySyncEngine;
use crate::storage::{
@ -13,7 +13,7 @@ use crate::util::normalize_path;
use lazy_static::lazy_static;
use parking_lot::Mutex;
use rusqlite::OpenFlags;
use sql_support::{SqlInterruptHandle, SqlInterruptScope};
use sql_support::{register_interrupt, SqlInterruptHandle};
use std::cell::Cell;
use std::collections::HashMap;
use std::mem;
@ -129,7 +129,7 @@ pub struct PlacesApi {
// called again, we reuse it.
// - The outer mutex synchronizes the `get_sync_connection()` operation. If multiple threads
// ran that at the same time there would be issues.
sync_connection: Mutex<Weak<Mutex<PlacesDb>>>,
sync_connection: Mutex<Weak<SharedPlacesDb>>,
id: usize,
}
@ -217,7 +217,7 @@ impl PlacesApi {
// - Each connection is wrapped in a `Mutex<>` to synchronize access.
// - The mutex is then wrapped in an Arc<>. If the last Arc<> returned is still alive, then
// get_sync_connection() will reuse it.
pub fn get_sync_connection(&self) -> Result<Arc<Mutex<PlacesDb>>> {
pub fn get_sync_connection(&self) -> Result<Arc<SharedPlacesDb>> {
// First step: lock the outer mutex
let mut conn = self.sync_connection.lock();
match conn.upgrade() {
@ -225,12 +225,13 @@ impl PlacesApi {
Some(db) => Ok(db),
// If not, create a new connection
None => {
let db = Arc::new(Mutex::new(PlacesDb::open(
let db = Arc::new(SharedPlacesDb::new(PlacesDb::open(
self.db_name.clone(),
ConnectionType::Sync,
self.id,
self.coop_tx_lock.clone(),
)?));
register_interrupt(Arc::<SharedPlacesDb>::downgrade(&db));
// Store a weakref for next time
*conn = Arc::downgrade(&db);
Ok(db)
@ -330,7 +331,7 @@ impl PlacesApi {
syncer: F,
) -> Result<telemetry::SyncTelemetryPing>
where
F: FnOnce(Arc<Mutex<PlacesDb>>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
F: FnOnce(Arc<SharedPlacesDb>, &mut MemoryCachedState, &mut Option<String>) -> SyncResult,
{
let mut guard = self.sync_state.lock();
let conn = self.get_sync_connection()?;
@ -450,15 +451,6 @@ impl PlacesApi {
Ok(())
}
/// Create a new SqlInterruptScope for the syncing code
///
/// The syncing code expects a SqlInterruptScope, but it's never been actually hooked up to
/// anything. This method returns something to make the compiler happy, but we should replace
/// this with working code as part of #1684.
pub fn dummy_sync_interrupt_scope(&self) -> SqlInterruptScope {
SqlInterruptScope::new(Arc::new(AtomicUsize::new(0)))
}
// Deprecated/Broken interrupt handler method
// This should be removed as part of https://github.com/mozilla/application-services/issues/1684
//

Просмотреть файл

@ -8,7 +8,7 @@ use super::record::{
SeparatorRecord,
};
use super::{SyncedBookmarkKind, SyncedBookmarkValidity};
use crate::db::{GlobalChangeCounterTracker, PlacesDb};
use crate::db::{GlobalChangeCounterTracker, PlacesDb, SharedPlacesDb};
use crate::error::*;
use crate::frecency::{calculate_frecency, DEFAULT_FRECENCY_SETTINGS};
use crate::storage::{
@ -23,7 +23,6 @@ use dogear::{
self, AbortSignal, CompletionOps, Content, Item, MergedRoot, TelemetryEvent, Tree, UploadItem,
UploadTombstone,
};
use parking_lot::Mutex;
use rusqlite::{Row, NO_PARAMS};
use sql_support::{self, ConnExt, SqlInterruptScope};
use std::cell::RefCell;
@ -887,14 +886,14 @@ pub(crate) fn update_frecencies(db: &PlacesDb, scope: &SqlInterruptScope) -> Res
// Short-lived struct that's constructed each sync
pub struct BookmarksSyncEngine {
db: Arc<Mutex<PlacesDb>>,
db: Arc<SharedPlacesDb>,
// Pub so that it can be used by the PlacesApi methods. Once all syncing goes through the
// `SyncManager` we should be able to make this private.
pub(crate) scope: SqlInterruptScope,
}
impl BookmarksSyncEngine {
pub fn new(db: Arc<Mutex<PlacesDb>>) -> Self {
pub fn new(db: Arc<SharedPlacesDb>) -> Self {
Self {
db,
scope: SqlInterruptScope::new(Arc::new(AtomicUsize::new(0))),
@ -1902,7 +1901,7 @@ mod tests {
let conn = db.lock();
// suck records into the database.
let interrupt_scope = api.dummy_sync_interrupt_scope();
let interrupt_scope = conn.begin_interrupt_scope()?;
let mut incoming = IncomingChangeset::new(COLLECTION_NAME, ServerTimestamp(0));
@ -1991,7 +1990,7 @@ mod tests {
}),
);
let interrupt_scope = syncer.begin_interrupt_scope();
let interrupt_scope = syncer.begin_interrupt_scope()?;
let merger =
Merger::with_localtime(&syncer, &interrupt_scope, ServerTimestamp(0), now.into());
@ -2126,7 +2125,7 @@ mod tests {
let sync_db = api.get_sync_connection().unwrap();
let syncer = sync_db.lock();
let interrupt_scope = api.dummy_sync_interrupt_scope();
let interrupt_scope = syncer.begin_interrupt_scope().unwrap();
update_frecencies(&syncer, &interrupt_scope).expect("Should update frecencies");
@ -2707,7 +2706,7 @@ mod tests {
},
)?;
let interrupt_scope = db.begin_interrupt_scope();
let interrupt_scope = db.begin_interrupt_scope()?;
let mut merger = Merger::new(&db, &interrupt_scope, ServerTimestamp(0));
merger.merge()?;

Просмотреть файл

@ -158,8 +158,10 @@ impl PlacesDb {
}
#[inline]
pub fn begin_interrupt_scope(&self) -> SqlInterruptScope {
SqlInterruptScope::new(self.interrupt_counter.clone())
pub fn begin_interrupt_scope(&self) -> Result<SqlInterruptScope> {
Ok(SqlInterruptScope::new_with_shutdown_check(
self.interrupt_counter.clone(),
)?)
}
#[inline]
@ -207,6 +209,46 @@ impl Deref for PlacesDb {
}
}
/// PlacesDB that's behind a Mutex so it can be shared between threads
pub struct SharedPlacesDb {
db: Mutex<PlacesDb>,
interrupt_handle: SqlInterruptHandle,
}
impl SharedPlacesDb {
pub fn new(db: PlacesDb) -> Self {
Self {
interrupt_handle: db.new_interrupt_handle(),
db: Mutex::new(db),
}
}
/// Interrupt any current DB operation
///
/// This method doesn't need to take the Mutex lock (which would defeat the point of
/// interruption, since taking the lock would need to wait for the operation to complete).
pub fn interrupt(&self) {
self.interrupt_handle.interrupt();
}
}
// Deref to a Mutex<PlacesDb>, which is how we will use SharedPlacesDb most of the time
impl Deref for SharedPlacesDb {
type Target = Mutex<PlacesDb>;
#[inline]
fn deref(&self) -> &Mutex<PlacesDb> {
&self.db
}
}
// Also implement AsRef<SqlInterruptHandle> so that we can interrupt this at shutdown
impl AsRef<SqlInterruptHandle> for SharedPlacesDb {
fn as_ref(&self) -> &SqlInterruptHandle {
&self.interrupt_handle
}
}
/// An object that can tell you whether a bookmark changing operation has
/// happened since the object was created.
pub struct GlobalChangeCounterTracker {

Просмотреть файл

@ -9,4 +9,4 @@ mod schema;
mod tx;
pub use self::tx::PlacesTransaction;
pub use crate::db::db::{GlobalChangeCounterTracker, PlacesDb};
pub use crate::db::db::{GlobalChangeCounterTracker, PlacesDb, SharedPlacesDb};

Просмотреть файл

@ -24,7 +24,7 @@ use crate::VisitObservation;
use crate::VisitTransition;
use crate::{PlacesApi, PlacesDb};
use parking_lot::Mutex;
use sql_support::SqlInterruptHandle;
use sql_support::{register_interrupt, SqlInterruptHandle};
use std::sync::Arc;
use sync_guid::Guid;
use types::Timestamp;
@ -106,8 +106,9 @@ impl UniffiCustomTypeWrapper for Guid {
impl PlacesApi {
fn new_connection(&self, conn_type: ConnectionType) -> Result<Arc<PlacesConnection>> {
let db = self.open_connection(conn_type)?;
let connection = PlacesConnection { db: Mutex::new(db) };
Ok(Arc::new(connection))
let connection = Arc::new(PlacesConnection::new(db));
register_interrupt(Arc::<PlacesConnection>::downgrade(&connection));
Ok(connection)
}
// NOTE: These methods are unused on Android but will remain needed for
@ -188,9 +189,17 @@ impl PlacesApi {
pub struct PlacesConnection {
db: Mutex<PlacesDb>,
interrupt_handle: SqlInterruptHandle,
}
impl PlacesConnection {
pub fn new(db: PlacesDb) -> Self {
Self {
interrupt_handle: db.new_interrupt_handle(),
db: Mutex::new(db),
}
}
// A helper that gets the connection from the mutex and converts errors.
fn with_conn<F, T>(&self, f: F) -> Result<T>
where
@ -513,6 +522,12 @@ impl PlacesConnection {
}
}
impl AsRef<SqlInterruptHandle> for PlacesConnection {
fn as_ref(&self) -> &SqlInterruptHandle {
&self.interrupt_handle
}
}
#[derive(Clone, PartialEq)]
pub struct HistoryVisitInfo {
pub url: Url,
@ -587,20 +602,14 @@ mod tests {
#[test]
fn test_accept_result_with_invalid_url() {
let api = new_mem_connection();
let conn = PlacesConnection {
db: Mutex::new(api),
};
let conn = PlacesConnection::new(new_mem_connection());
let invalid_url = "http://1234.56.78.90".to_string();
assert!(PlacesConnection::accept_result(&conn, "ample".to_string(), invalid_url).is_ok());
}
#[test]
fn test_bookmarks_get_all_with_url_with_invalid_url() {
let api = new_mem_connection();
let conn = PlacesConnection {
db: Mutex::new(api),
};
let conn = PlacesConnection::new(new_mem_connection());
let invalid_url = "http://1234.56.78.90".to_string();
assert!(PlacesConnection::bookmarks_get_all_with_url(&conn, invalid_url).is_ok());
}

Просмотреть файл

@ -2,11 +2,10 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::db::PlacesDb;
use crate::db::{PlacesDb, SharedPlacesDb};
use crate::error::*;
use crate::storage::history::{delete_everything, history_sync::reset};
use crate::storage::{get_meta, put_meta};
use parking_lot::Mutex;
use sql_support::SqlInterruptScope;
use std::sync::{atomic::AtomicUsize, Arc};
use sync15::telemetry;
@ -65,14 +64,14 @@ fn do_sync_finished(
// Short-lived struct that's constructed each sync
pub struct HistorySyncEngine {
pub db: Arc<Mutex<PlacesDb>>,
pub db: Arc<SharedPlacesDb>,
// Public because we use it in the [PlacesApi] sync methods. We can probably make this private
// once all syncing goes through the sync manager.
pub(crate) scope: SqlInterruptScope,
}
impl HistorySyncEngine {
pub fn new(db: Arc<Mutex<PlacesDb>>) -> Self {
pub fn new(db: Arc<SharedPlacesDb>) -> Self {
Self {
db,
scope: SqlInterruptScope::new(Arc::new(AtomicUsize::new(0))),

Просмотреть файл

@ -58,7 +58,7 @@ fn do_import(places_api: &PlacesApi, fennec_db_file_url: Url) -> Result<Bookmark
let conn_mutex = places_api.get_sync_connection()?;
let conn = conn_mutex.lock();
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
sql_fns::define_functions(&conn)?;
@ -163,7 +163,7 @@ fn do_pinned_sites_import(
) -> Result<Vec<BookmarkData>> {
let conn_mutex = places_api.get_sync_connection()?;
let conn = conn_mutex.lock();
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
sql_fns::define_functions(&conn)?;

Просмотреть файл

@ -43,7 +43,7 @@ fn do_import(places_api: &PlacesApi, android_db_file_url: Url) -> Result<History
let conn_mutex = places_api.get_sync_connection()?;
let conn = conn_mutex.lock();
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
define_sql_functions(&conn)?;

Просмотреть файл

@ -87,7 +87,7 @@ fn do_import_ios_bookmarks(places_api: &PlacesApi, ios_db_file_url: Url) -> Resu
let conn_mutex = places_api.get_sync_connection()?;
let conn = conn_mutex.lock();
let scope = conn.begin_interrupt_scope();
let scope = conn.begin_interrupt_scope()?;
sql_fns::define_functions(&conn)?;

Просмотреть файл

@ -316,7 +316,7 @@ fn bookmark_from_row(row: &Row<'_>) -> Result<Option<BookmarkData>> {
}
pub fn search_bookmarks(db: &PlacesDb, search: &str, limit: u32) -> Result<Vec<BookmarkData>> {
let scope = db.begin_interrupt_scope();
let scope = db.begin_interrupt_scope()?;
Ok(db
.query_rows_into_cached::<Vec<Option<BookmarkData>>, _, _, _>(
&SEARCH_QUERY,
@ -332,7 +332,7 @@ pub fn search_bookmarks(db: &PlacesDb, search: &str, limit: u32) -> Result<Vec<B
}
pub fn recent_bookmarks(db: &PlacesDb, limit: u32) -> Result<Vec<BookmarkData>> {
let scope = db.begin_interrupt_scope();
let scope = db.begin_interrupt_scope()?;
Ok(db
.query_rows_into_cached::<Vec<Option<BookmarkData>>, _, _, _>(
&RECENT_BOOKMARKS_QUERY,

Просмотреть файл

@ -531,7 +531,7 @@ pub fn fetch_tree(
LEFT JOIN moz_places h ON h.id = d.fk
ORDER BY d.level, d.parent, d.position"#;
let scope = db.begin_interrupt_scope();
let scope = db.begin_interrupt_scope()?;
let mut stmt = db.conn().prepare(sql)?;

Просмотреть файл

@ -14,6 +14,7 @@ log = "0.4"
lazy_static = "1.4"
interrupt-support = { path = "../interrupt" }
ffi-support = "0.4"
parking_lot = "0.5"
thiserror = "1.0"
tempfile = "3.1.0"

Просмотреть файл

@ -2,6 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use crate::in_shutdown;
use ffi_support::implement_into_ffi_by_pointer;
use interrupt_support::Interruptee;
use rusqlite::InterruptHandle;
@ -55,11 +56,27 @@ pub struct SqlInterruptScope {
}
impl SqlInterruptScope {
/// Create a new `SqlInterruptScope`
#[inline]
pub fn new(ptr: Arc<AtomicUsize>) -> Self {
let start_value = ptr.load(Ordering::SeqCst);
Self { start_value, ptr }
}
/// Create a new `SqlInterruptScope`, checking if we're in shutdown mode
///
/// If we're in shutdown mode, then this will return `Err(interrupt_support::Interrupted)`
#[inline]
pub fn new_with_shutdown_check(
ptr: Arc<AtomicUsize>,
) -> Result<Self, interrupt_support::Interrupted> {
if in_shutdown() {
Err(interrupt_support::Interrupted)
} else {
Ok(Self::new(ptr))
}
}
/// Add this as an inherent method to reduce the amount of things users have to bring in.
#[inline]
pub fn err_if_interrupted(&self) -> Result<(), interrupt_support::Interrupted> {

Просмотреть файл

@ -12,6 +12,7 @@ mod maybe_cached;
pub mod open_database;
mod query_plan;
mod repeat;
mod shutdown;
pub use crate::conn_ext::*;
pub use crate::each_chunk::*;
@ -19,6 +20,7 @@ pub use crate::interrupt::*;
pub use crate::maybe_cached::*;
pub use crate::query_plan::*;
pub use crate::repeat::*;
pub use crate::shutdown::*;
/// In PRAGMA foo='bar', `'bar'` must be a constant string (it cannot be a
/// bound parameter), so we need to escape manually. According to

Просмотреть файл

@ -0,0 +1,81 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use interrupt_support::Interruptee;
/// Shutdown handling for database operations
///
/// This crate allows us to enter shutdown mode, causing all `SqlInterruptScope` instances that opt-in to
/// shutdown support to be permanently interrupted. This means:
///
/// - All current scopes will be interrupted
/// - Any attempt to create a new scope will be interrupted
///
/// Here's how add shutdown support to a component:
///
/// - Use `SqlInterruptScope::new_with_shutdown_check()` to create a new
/// `SqlInterruptScope`
/// - Database connections need to be wrapped in a type that:
/// - Implements `AsRef<SqlInterruptHandle>`.
/// - Gets wrapped in an `Arc<>`. This is needed so the shutdown code can get a weak reference to
/// the instance.
/// - Calls `shutdown::register_interrupt_handle()` on creation
///
/// See `PlacesDb::begin_interrupt_scope()` and `PlacesApi::new_connection()` for an example of
/// how this works.
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Weak;
use crate::SqlInterruptHandle;
// Bool that tracks if we're in shutdown mode or not. We use Ordering::Relaxed to read/write to
// variable. It's just a flag so we don't need stronger synchronization guarentees.
static IN_SHUTDOWN: AtomicBool = AtomicBool::new(false);
// `SqlInterruptHandle` instances to interrupt when we shutdown
lazy_static::lazy_static! {
static ref REGISTERED_INTERRUPTS: Mutex<Vec<Weak<dyn AsRef<SqlInterruptHandle> + Send + Sync>>> = Mutex::new(Vec::new());
}
/// Initiate shutdown mode
pub fn shutdown() {
IN_SHUTDOWN.store(true, Ordering::Relaxed);
for weak in REGISTERED_INTERRUPTS.lock().iter() {
if let Some(interrupt) = weak.upgrade() {
interrupt.as_ref().as_ref().interrupt()
}
}
}
/// Check if we're currently in shutdown mode
pub fn in_shutdown() -> bool {
IN_SHUTDOWN.load(Ordering::Relaxed)
}
/// Register a ShutdownInterrupt implementation
///
/// Call this function to ensure that the `SqlInterruptHandle::interrupt()` method will be called
/// at shutdown.
pub fn register_interrupt(interrupt: Weak<dyn AsRef<SqlInterruptHandle> + Send + Sync>) {
// Try to find an existing entry that's been dropped to replace. This keeps the vector growth
// in check
let mut interrupts = REGISTERED_INTERRUPTS.lock();
for weak in interrupts.iter_mut() {
if weak.strong_count() == 0 {
*weak = interrupt;
return;
}
}
// No empty slots, push the new value
interrupts.push(interrupt);
}
// implements Interruptee by checking if we've entered shutdown mode
pub struct ShutdownInterruptee;
impl Interruptee for ShutdownInterruptee {
#[inline]
fn was_interrupted(&self) -> bool {
in_shutdown()
}
}

Просмотреть файл

@ -14,6 +14,7 @@ path = "src/places-utils.rs"
places = { path = "../../components/places" }
sync-guid = { path = "../../components/support/guid" }
types = { path = "../../components/support/types" }
sql-support = { path = "../../components/support/sql" }
sync15 = { path = "../../components/sync15" }
viaduct-reqwest = { path = "../../components/support/viaduct-reqwest" }
serde = "1"

Просмотреть файл

@ -163,7 +163,6 @@ fn run_native_export(db: &PlacesDb, filename: String) -> Result<()> {
#[allow(clippy::too_many_arguments)]
fn sync(
api: &PlacesApi,
mut engine_names: Vec<String>,
cred_file: String,
wipe_all: bool,
@ -227,7 +226,7 @@ fn sync(
&mut mem_cached_state,
&cli_fxa.client_init.clone(),
&cli_fxa.root_sync_key,
&api.dummy_sync_interrupt_scope(),
&sql_support::ShutdownInterruptee,
None,
);
@ -370,6 +369,12 @@ fn main() -> Result<()> {
// Needed to make the get_registered_sync_engine() calls work.
api.clone().register_with_sync_manager();
ctrlc::set_handler(move || {
println!("\nCTRL-C detected, enabling shutdown mode\n");
sql_support::shutdown();
})
.unwrap();
match opts.cmd {
Command::Sync {
engines,
@ -380,7 +385,6 @@ fn main() -> Result<()> {
nsyncs,
wait,
} => sync(
&api,
engines,
credential_file,
wipe_all,