places/api/
places_api.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::bookmark_sync::BookmarksSyncEngine;
6use crate::db::db::{PlacesDb, SharedPlacesDb};
7use crate::error::*;
8use crate::history_sync::HistorySyncEngine;
9use crate::storage::{
10    self, bookmarks::bookmark_sync, delete_meta, get_meta, history::history_sync, put_meta,
11};
12use crate::util::normalize_path;
13use error_support::handle_error;
14use interrupt_support::register_interrupt;
15use lazy_static::lazy_static;
16use parking_lot::Mutex;
17use rusqlite::OpenFlags;
18use std::cell::Cell;
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use std::sync::{
22    atomic::{AtomicUsize, Ordering},
23    Arc, Weak,
24};
25use sync15::client::{sync_multiple, MemoryCachedState, Sync15StorageClientInit, SyncResult};
26use sync15::engine::{EngineSyncAssociation, SyncEngine, SyncEngineId};
27use sync15::{telemetry, KeyBundle};
28
// Not clear if this should be here, but this is the "global sync state"
// which is persisted to disk and reused for all engines.
// Note that this is only ever round-tripped, and is never changed or
// otherwise impacted by a store or collection, so it's safe to store it
// globally rather than per collection.
// This is the key passed to `get_meta`/`put_meta`/`delete_meta` when the
// state is loaded, saved or cleared.
pub const GLOBAL_STATE_META_KEY: &str = "global_sync_state_v2";
35
// Our "sync manager" will use whatever is stashed here.
// It is written by `PlacesApi::register_with_sync_manager()` and read by
// `get_registered_sync_engine()`.
lazy_static::lazy_static! {
    // Mutex: just taken long enough to update the contents - needed to wrap
    //        the Weak as it isn't `Sync`
    // [Arc/Weak]: Stores the places api used to create the connection for
    //             BookmarksSyncEngine/HistorySyncEngine
    // Starts out as an empty `Weak`, so upgrading fails until an api has been
    // registered (and again once the registered api has been dropped).
    static ref PLACES_API_FOR_SYNC_MANAGER: Mutex<Weak<PlacesApi>> = Mutex::new(Weak::new());
}
44
45// Called by the sync manager to get a sync engine via the PlacesApi previously
46// registered with the sync manager.
47pub fn get_registered_sync_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
48    match PLACES_API_FOR_SYNC_MANAGER.lock().upgrade() {
49        None => {
50            warn!("places: get_registered_sync_engine: no PlacesApi registered");
51            None
52        }
53        Some(places_api) => match create_sync_engine(&places_api, engine_id) {
54            Ok(engine) => Some(engine),
55            Err(e) => {
56                // Report this to Sentry, except if it's an open database error.  That indicates
57                // that there is a registered sync engine, but the connection is busy so we can't
58                // open it.  This is a known issue that we don't need more reports for (see
59                // https://github.com/mozilla/application-services/issues/5237 for discussion).
60                if !matches!(e, Error::OpenDatabaseError(_)) {
61                    error_support::report_error!(
62                        "places-no-registered-sync-engine",
63                        "places: get_registered_sync_engine: {}",
64                        e
65                    );
66                }
67                None
68            }
69        },
70    }
71}
72
73fn create_sync_engine(
74    places_api: &PlacesApi,
75    engine_id: &SyncEngineId,
76) -> Result<Box<dyn SyncEngine>> {
77    let conn = places_api.get_sync_connection()?;
78    match engine_id {
79        SyncEngineId::Bookmarks => Ok(Box::new(BookmarksSyncEngine::new(conn)?)),
80        SyncEngineId::History => Ok(Box::new(HistorySyncEngine::new(conn)?)),
81        _ => unreachable!("can't provide unknown engine: {}", engine_id),
82    }
83}
84
/// The kinds of database connection a `PlacesApi` can hand out.
///
/// Each variant has an explicit `u8` discriminant; `from_primitive` performs
/// the reverse mapping.
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionType {
    ReadOnly = 1,
    ReadWrite = 2,
    Sync = 3,
}

impl ConnectionType {
    /// Convert a raw discriminant back into a `ConnectionType`.
    ///
    /// Returns `None` for any value that is not the discriminant of a variant.
    pub fn from_primitive(p: u8) -> Option<Self> {
        const ALL: [ConnectionType; 3] = [
            ConnectionType::ReadOnly,
            ConnectionType::ReadWrite,
            ConnectionType::Sync,
        ];
        ALL.iter().copied().find(|kind| *kind as u8 == p)
    }
}
103
104impl ConnectionType {
105    pub fn rusqlite_flags(self) -> OpenFlags {
106        let common_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
107        match self {
108            ConnectionType::ReadOnly => common_flags | OpenFlags::SQLITE_OPEN_READ_ONLY,
109            ConnectionType::ReadWrite => {
110                common_flags | OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE
111            }
112            ConnectionType::Sync => common_flags | OpenFlags::SQLITE_OPEN_READ_WRITE,
113        }
114    }
115}
116
// We only allow a single PlacesApi per filename.
// The map is keyed by the database name and only holds `Weak` references, so
// this registry on its own does not keep a PlacesApi alive; an entry whose
// api has been dropped is replaced the next time that name is opened.
lazy_static! {
    static ref APIS: Mutex<HashMap<PathBuf, Weak<PlacesApi>>> = Mutex::new(HashMap::new());
}
121
// Source of the `id` given to each PlacesApi (and passed on to the connections
// it opens); bumped in `new_or_existing_into`.
static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
123
/// State carried over from one sync to the next.
///
/// Both fields are `Cell`s so that a sync can `take()` the values out through
/// a shared reference, hand them to `sync_multiple`, and `replace()` them
/// afterwards.
pub struct SyncState {
    // In-memory state handed to `sync_multiple`; starts out as the default.
    pub mem_cached_state: Cell<MemoryCachedState>,
    // The string stored under `GLOBAL_STATE_META_KEY`; `None` means nothing
    // is (or should be) stored.
    pub disk_cached_state: Cell<Option<String>>,
}
128
/// For uniffi we need to expose our `Arc` returning constructor as a global function :(
/// https://github.com/mozilla/uniffi-rs/pull/1063 would fix this, but got some pushback
/// meaning we are forced into this unfortunate workaround.
///
/// This simply forwards to [`PlacesApi::new`]; the `handle_error` attribute is
/// applied to it with `crate::Error` and the function returns an `ApiResult`.
#[handle_error(crate::Error)]
pub fn places_api_new(db_name: impl AsRef<Path>) -> ApiResult<Arc<PlacesApi>> {
    PlacesApi::new(db_name)
}
136
/// The entry-point to the places API. This object gives access to database
/// connections and other helpers. It enforces that only 1 write connection
/// can exist to the database at once.
pub struct PlacesApi {
    // Name used for every connection we open: a normalized path for on-disk
    // databases, or a `file:...?mode=memory&cache=shared` name for memory ones.
    db_name: PathBuf,
    // The single read-write connection, parked here while nobody has it.
    // `None` while a caller holds it (see `open_connection`/`close_connection`).
    write_connection: Mutex<Option<PlacesDb>>,
    // Created lazily by the first sync. The mutex is also taken by the
    // wipe/reset helpers to keep a sync from running concurrently.
    sync_state: Mutex<Option<SyncState>>,
    // Handed (cloned) to every `PlacesDb` this api opens.
    // NOTE(review): presumably used to coordinate transactions between those
    // connections - confirm against `PlacesDb`.
    coop_tx_lock: Arc<Mutex<()>>,
    // Used for get_sync_connection()
    // - The inner mutex synchronizes sync operation (for example one of the [SyncEngine] methods).
    //   This avoids issues like #867
    // - The weak facilitates connection sharing.  When `get_sync_connection()` returns an Arc, we
    //   keep a weak reference to it.  If the Arc is still alive when `get_sync_connection()` is
    //   called again, we reuse it.
    // - The outer mutex synchronizes the `get_sync_connection()` operation.  If multiple threads
    //   ran that at the same time there would be issues.
    sync_connection: Mutex<Weak<SharedPlacesDb>>,
    // Taken from `ID_COUNTER`; `close_connection` compares it with the
    // connection's `api_id()` to reject connections from another api.
    id: usize,
}
156
157impl PlacesApi {
158    /// Create a new, or fetch an already open, PlacesApi backed by a file on disk.
159    pub fn new(db_name: impl AsRef<Path>) -> Result<Arc<Self>> {
160        let db_name = normalize_path(db_name)?;
161        Self::new_or_existing(db_name)
162    }
163
164    /// Create a new, or fetch an already open, memory-based PlacesApi. You must
165    /// provide a name, but you are still able to have a single writer and many
166    ///  reader connections to the same memory DB open.
167    pub fn new_memory(db_name: &str) -> Result<Arc<Self>> {
168        let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_name));
169        Self::new_or_existing(name)
170    }
171    fn new_or_existing_into(
172        target: &mut HashMap<PathBuf, Weak<PlacesApi>>,
173        db_name: PathBuf,
174    ) -> Result<Arc<Self>> {
175        let id = ID_COUNTER.fetch_add(1, Ordering::SeqCst);
176        match target.get(&db_name).and_then(Weak::upgrade) {
177            Some(existing) => Ok(existing),
178            None => {
179                // We always create a new read-write connection for an initial open so
180                // we can create the schema and/or do version upgrades.
181                let coop_tx_lock = Arc::new(Mutex::new(()));
182                let connection = PlacesDb::open(
183                    &db_name,
184                    ConnectionType::ReadWrite,
185                    id,
186                    coop_tx_lock.clone(),
187                )?;
188                let new = PlacesApi {
189                    db_name: db_name.clone(),
190                    write_connection: Mutex::new(Some(connection)),
191                    sync_state: Mutex::new(None),
192                    sync_connection: Mutex::new(Weak::new()),
193                    id,
194                    coop_tx_lock,
195                };
196                let arc = Arc::new(new);
197                target.insert(db_name, Arc::downgrade(&arc));
198                Ok(arc)
199            }
200        }
201    }
202
203    fn new_or_existing(db_name: PathBuf) -> Result<Arc<Self>> {
204        let mut guard = APIS.lock();
205        Self::new_or_existing_into(&mut guard, db_name)
206    }
207
208    /// Open a connection to the database.
209    pub fn open_connection(&self, conn_type: ConnectionType) -> Result<PlacesDb> {
210        match conn_type {
211            ConnectionType::ReadOnly => {
212                // make a new one - we can have as many of these as we want.
213                PlacesDb::open(
214                    self.db_name.clone(),
215                    ConnectionType::ReadOnly,
216                    self.id,
217                    self.coop_tx_lock.clone(),
218                )
219            }
220            ConnectionType::ReadWrite => {
221                // We only allow one of these.
222                let mut guard = self.write_connection.lock();
223                match guard.take() {
224                    None => Err(Error::ConnectionAlreadyOpen),
225                    Some(db) => Ok(db),
226                }
227            }
228            ConnectionType::Sync => {
229                panic!("Use `get_sync_connection` to open a sync connection");
230            }
231        }
232    }
233
234    // Get a database connection to sync with
235    //
236    // This function provides a couple features to facilitate sharing the connection between
237    // different sync engines:
238    //   - Each connection is wrapped in a `Mutex<>` to synchronize access.
239    //   - The mutex is then wrapped in an Arc<>.  If the last Arc<> returned is still alive, then
240    //     get_sync_connection() will reuse it.
241    pub fn get_sync_connection(&self) -> Result<Arc<SharedPlacesDb>> {
242        // First step: lock the outer mutex
243        let mut conn = self.sync_connection.lock();
244        match conn.upgrade() {
245            // If our Weak is still alive, then re-use that
246            Some(db) => Ok(db),
247            // If not, create a new connection
248            None => {
249                let db = Arc::new(SharedPlacesDb::new(PlacesDb::open(
250                    self.db_name.clone(),
251                    ConnectionType::Sync,
252                    self.id,
253                    self.coop_tx_lock.clone(),
254                )?));
255                register_interrupt(Arc::<SharedPlacesDb>::downgrade(&db));
256                // Store a weakref for next time
257                *conn = Arc::downgrade(&db);
258                Ok(db)
259            }
260        }
261    }
262
263    /// Close a connection to the database. If the connection is the write
264    /// connection, you can re-fetch it using open_connection.
265    pub fn close_connection(&self, connection: PlacesDb) -> Result<()> {
266        if connection.api_id() != self.id {
267            return Err(Error::WrongApiForClose);
268        }
269        if connection.conn_type() == ConnectionType::ReadWrite {
270            // We only allow one of these.
271            let mut guard = self.write_connection.lock();
272            assert!((*guard).is_none());
273            *guard = Some(connection);
274        }
275        Ok(())
276    }
277
278    fn get_disk_persisted_state(&self, conn: &PlacesDb) -> Result<Option<String>> {
279        get_meta::<String>(conn, GLOBAL_STATE_META_KEY)
280    }
281
282    fn set_disk_persisted_state(&self, conn: &PlacesDb, state: &Option<String>) -> Result<()> {
283        match state {
284            Some(ref s) => put_meta(conn, GLOBAL_STATE_META_KEY, s),
285            None => delete_meta(conn, GLOBAL_STATE_META_KEY),
286        }
287    }
288
289    // This allows the embedding app to say "make this instance available to
290    // the sync manager". The implementation is more like "offer to sync mgr"
291    // (thereby avoiding us needing to link with the sync manager) but
292    // `register_with_sync_manager()` is logically what's happening so that's
293    // the name it gets.
294    pub fn register_with_sync_manager(self: Arc<Self>) {
295        *PLACES_API_FOR_SYNC_MANAGER.lock() = Arc::downgrade(&self);
296    }
297
298    // NOTE: These should be deprecated as soon as possible - that will be once
299    // all consumers have been updated to use the .sync() method below, and/or
300    // we have implemented the sync manager and migrated consumers to that.
301    pub fn sync_history(
302        &self,
303        client_init: &Sync15StorageClientInit,
304        key_bundle: &KeyBundle,
305    ) -> Result<telemetry::SyncTelemetryPing> {
306        self.do_sync_one(
307            "history",
308            move |conn, mem_cached_state, disk_cached_state| {
309                let engine = HistorySyncEngine::new(conn)?;
310                Ok(sync_multiple(
311                    &[&engine],
312                    disk_cached_state,
313                    mem_cached_state,
314                    client_init,
315                    key_bundle,
316                    &interrupt_support::ShutdownInterruptee,
317                    None,
318                ))
319            },
320        )
321    }
322
323    pub fn sync_bookmarks(
324        &self,
325        client_init: &Sync15StorageClientInit,
326        key_bundle: &KeyBundle,
327    ) -> Result<telemetry::SyncTelemetryPing> {
328        self.do_sync_one(
329            "bookmarks",
330            move |conn, mem_cached_state, disk_cached_state| {
331                let engine = BookmarksSyncEngine::new(conn)?;
332                Ok(sync_multiple(
333                    &[&engine],
334                    disk_cached_state,
335                    mem_cached_state,
336                    client_init,
337                    key_bundle,
338                    &interrupt_support::ShutdownInterruptee,
339                    None,
340                ))
341            },
342        )
343    }
344
345    pub fn do_sync_one<F>(
346        &self,
347        name: &'static str,
348        syncer: F,
349    ) -> Result<telemetry::SyncTelemetryPing>
350    where
351        F: FnOnce(
352            Arc<SharedPlacesDb>,
353            &mut MemoryCachedState,
354            &mut Option<String>,
355        ) -> Result<SyncResult>,
356    {
357        let mut guard = self.sync_state.lock();
358        let conn = self.get_sync_connection()?;
359        if guard.is_none() {
360            *guard = Some(SyncState {
361                mem_cached_state: Cell::default(),
362                disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
363            });
364        }
365
366        let sync_state = guard.as_ref().unwrap();
367
368        let mut mem_cached_state = sync_state.mem_cached_state.take();
369        let mut disk_cached_state = sync_state.disk_cached_state.take();
370        let mut result = syncer(conn.clone(), &mut mem_cached_state, &mut disk_cached_state)?;
371        // even on failure we set the persisted state - sync itself takes care
372        // to ensure this has been None'd out if necessary.
373        self.set_disk_persisted_state(&conn.lock(), &disk_cached_state)?;
374        sync_state.mem_cached_state.replace(mem_cached_state);
375        sync_state.disk_cached_state.replace(disk_cached_state);
376
377        // for b/w compat reasons, we do some dances with the result.
378        if let Err(e) = result.result {
379            return Err(e.into());
380        }
381        match result.engine_results.remove(name) {
382            None | Some(Ok(())) => Ok(result.telemetry),
383            Some(Err(e)) => Err(e.into()),
384        }
385    }
386
387    // This is the new sync API until the sync manager lands. It's currently
388    // not wired up via the FFI - it's possible we'll do declined engines too
389    // before we do.
390    // Note we've made a policy decision about the return value - even though
391    // it is Result<SyncResult>, we will only return an Err() if there's a
392    // fatal error that prevents us starting a sync, such as failure to open
393    // the DB. Any errors that happen *after* sync must not escape - ie, once
394    // we have a SyncResult, we must return it.
395    pub fn sync(
396        &self,
397        client_init: &Sync15StorageClientInit,
398        key_bundle: &KeyBundle,
399    ) -> Result<SyncResult> {
400        let mut guard = self.sync_state.lock();
401        let conn = self.get_sync_connection()?;
402        if guard.is_none() {
403            *guard = Some(SyncState {
404                mem_cached_state: Cell::default(),
405                disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
406            });
407        }
408
409        let sync_state = guard.as_ref().unwrap();
410
411        let bm_engine = BookmarksSyncEngine::new(conn.clone())?;
412        let history_engine = HistorySyncEngine::new(conn.clone())?;
413        let mut mem_cached_state = sync_state.mem_cached_state.take();
414        let mut disk_cached_state = sync_state.disk_cached_state.take();
415
416        // NOTE: After here we must never return Err()!
417        let result = sync_multiple(
418            &[&history_engine, &bm_engine],
419            &mut disk_cached_state,
420            &mut mem_cached_state,
421            client_init,
422            key_bundle,
423            &interrupt_support::ShutdownInterruptee,
424            None,
425        );
426        // even on failure we set the persisted state - sync itself takes care
427        // to ensure this has been None'd out if necessary.
428        if let Err(e) = self.set_disk_persisted_state(&conn.lock(), &disk_cached_state) {
429            error_support::report_error!(
430                "places-sync-persist-failure",
431                "Failed to persist the sync state: {:?}",
432                e
433            );
434        }
435        sync_state.mem_cached_state.replace(mem_cached_state);
436        sync_state.disk_cached_state.replace(disk_cached_state);
437
438        Ok(result)
439    }
440
441    pub fn wipe_bookmarks(&self) -> Result<()> {
442        // Take the lock to prevent syncing while we're doing this.
443        let _guard = self.sync_state.lock();
444        let conn = self.get_sync_connection()?;
445
446        storage::bookmarks::delete_everything(&conn.lock())?;
447        Ok(())
448    }
449
450    pub fn reset_bookmarks(&self) -> Result<()> {
451        // Take the lock to prevent syncing while we're doing this.
452        let _guard = self.sync_state.lock();
453        let conn = self.get_sync_connection()?;
454
455        bookmark_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
456        Ok(())
457    }
458
459    #[handle_error(crate::Error)]
460    pub fn reset_history(&self) -> ApiResult<()> {
461        // Take the lock to prevent syncing while we're doing this.
462        let _guard = self.sync_state.lock();
463        let conn = self.get_sync_connection()?;
464
465        history_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
466        Ok(())
467    }
468}
469
#[cfg(test)]
pub mod test {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Gives every test api its own memory database name.
    static ATOMIC_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// A fresh memory-backed api with a name no other test uses.
    pub fn new_mem_api() -> Arc<PlacesApi> {
        // A bit hacky, but nearly every test goes through this test-only
        // function, which makes it a convenient place to set up logging.
        error_support::init_for_tests();

        let unique = ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed);
        let db_name = format!("test-api-{}", unique);
        PlacesApi::new_memory(&db_name).expect("should get an API")
    }

    /// The write connection of a fresh memory-backed api.
    pub fn new_mem_connection() -> PlacesDb {
        let api = new_mem_api();
        api.open_connection(ConnectionType::ReadWrite)
            .expect("should get a connection")
    }

    /// A reader and the writer of the same api, plus the api itself.
    pub struct MemConnections {
        pub read: PlacesDb,
        pub write: PlacesDb,
        pub api: Arc<PlacesApi>,
    }

    /// Open a read connection and then the write connection on a fresh api.
    pub fn new_mem_connections() -> MemConnections {
        let api = new_mem_api();
        MemConnections {
            read: api
                .open_connection(ConnectionType::ReadOnly)
                .expect("should get a read connection"),
            write: api
                .open_connection(ConnectionType::ReadWrite)
                .expect("should get a write connection"),
            api,
        }
    }
}
510
#[cfg(test)]
mod tests {
    use super::test::*;
    use super::*;
    use sql_support::ConnExt;

    // Only one write connection may be handed out at a time, and closing it
    // makes it available again.
    #[test]
    fn test_multi_writers_fails() {
        let api = new_mem_api();
        let writer1 = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        api.open_connection(ConnectionType::ReadWrite)
            .expect_err("should fail to get second writer");
        // But we should be able to re-get it after closing it.
        api.close_connection(writer1)
            .expect("should be able to close");
        api.open_connection(ConnectionType::ReadWrite)
            .expect("should get a writer after closing the other");
    }

    // Data written through the writer is visible to a reader opened
    // afterwards on the same memory database.
    #[test]
    fn test_shared_memory() {
        let api = new_mem_api();
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        writer
            .execute_batch(
                "CREATE TABLE test_table (test_value INTEGER);
                              INSERT INTO test_table VALUES (999)",
            )
            .expect("should insert");
        let reader = api
            .open_connection(ConnectionType::ReadOnly)
            .expect("should get reader");
        let val = reader
            .query_one::<i64>("SELECT test_value FROM test_table")
            .expect("should get value");
        assert_eq!(val, 999);
    }

    // Same as above, but with the reader opened before the writer has
    // written anything.
    #[test]
    fn test_reader_before_writer() {
        let api = new_mem_api();
        let reader = api
            .open_connection(ConnectionType::ReadOnly)
            .expect("should get reader");
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        writer
            .execute_batch(
                "CREATE TABLE test_table (test_value INTEGER);
                              INSERT INTO test_table VALUES (999)",
            )
            .expect("should insert");
        let val = reader
            .query_one::<i64>("SELECT test_value FROM test_table")
            .expect("should get value");
        assert_eq!(val, 999);
    }

    // Closing a connection on an api that did not open it must be rejected.
    #[test]
    fn test_wrong_writer_close() {
        let api = new_mem_api();
        // Grab this so `api` doesn't think it still has a writer.
        let _writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");

        let fake_api = new_mem_api();
        let fake_writer = fake_api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer 2");

        assert!(matches!(
            api.close_connection(fake_writer).unwrap_err(),
            Error::WrongApiForClose
        ));
    }

    // Closing the writer on the api that opened it succeeds, and the writer
    // can then be opened again.
    #[test]
    fn test_valid_writer_close() {
        let api = new_mem_api();
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");

        api.close_connection(writer)
            .expect("Should allow closing own connection");

        // Make sure we can open it again.
        assert!(api.open_connection(ConnectionType::ReadWrite).is_ok());
    }
}
606}