1use crate::bookmark_sync::BookmarksSyncEngine;
6use crate::db::db::{PlacesDb, SharedPlacesDb};
7use crate::error::*;
8use crate::history_sync::HistorySyncEngine;
9use crate::storage::{
10 self, bookmarks::bookmark_sync, delete_meta, get_meta, history::history_sync, put_meta,
11};
12use crate::util::normalize_path;
13use error_support::handle_error;
14use interrupt_support::register_interrupt;
15use lazy_static::lazy_static;
16use parking_lot::Mutex;
17use rusqlite::OpenFlags;
18use std::cell::Cell;
19use std::collections::HashMap;
20use std::path::{Path, PathBuf};
21use std::sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc, Weak,
24};
25use sync15::client::{sync_multiple, MemoryCachedState, Sync15StorageClientInit, SyncResult};
26use sync15::engine::{EngineSyncAssociation, SyncEngine, SyncEngineId};
27use sync15::{telemetry, KeyBundle};
28
/// Key passed to `get_meta` / `put_meta` / `delete_meta` when loading, storing
/// and clearing the persisted global sync state string for this database.
29pub const GLOBAL_STATE_META_KEY: &str = "global_sync_state_v2";
35
36lazy_static::lazy_static! {
38 static ref PLACES_API_FOR_SYNC_MANAGER: Mutex<Weak<PlacesApi>> = Mutex::new(Weak::new());
43}
44
45pub fn get_registered_sync_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
48 match PLACES_API_FOR_SYNC_MANAGER.lock().upgrade() {
49 None => {
50 warn!("places: get_registered_sync_engine: no PlacesApi registered");
51 None
52 }
53 Some(places_api) => match create_sync_engine(&places_api, engine_id) {
54 Ok(engine) => Some(engine),
55 Err(e) => {
56 if !matches!(e, Error::OpenDatabaseError(_)) {
61 error_support::report_error!(
62 "places-no-registered-sync-engine",
63 "places: get_registered_sync_engine: {}",
64 e
65 );
66 }
67 None
68 }
69 },
70 }
71}
72
73fn create_sync_engine(
74 places_api: &PlacesApi,
75 engine_id: &SyncEngineId,
76) -> Result<Box<dyn SyncEngine>> {
77 let conn = places_api.get_sync_connection()?;
78 match engine_id {
79 SyncEngineId::Bookmarks => Ok(Box::new(BookmarksSyncEngine::new(conn)?)),
80 SyncEngineId::History => Ok(Box::new(HistorySyncEngine::new(conn)?)),
81 _ => unreachable!("can't provide unknown engine: {}", engine_id),
82 }
83}
84
/// The kinds of database connection a `PlacesApi` hands out.
///
/// The explicit `u8` discriminants are the values accepted by
/// [`ConnectionType::from_primitive`].
#[repr(u8)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ConnectionType {
    /// A connection opened read-only.
    ReadOnly = 1,
    /// A connection opened read-write.
    ReadWrite = 2,
    /// The connection used by the sync engines.
    Sync = 3,
}

impl ConnectionType {
    /// Converts a raw discriminant back into a `ConnectionType`.
    ///
    /// Returns `None` for any value that is not the discriminant of a variant.
    pub fn from_primitive(p: u8) -> Option<Self> {
        const ALL_KINDS: [ConnectionType; 3] = [
            ConnectionType::ReadOnly,
            ConnectionType::ReadWrite,
            ConnectionType::Sync,
        ];
        ALL_KINDS.iter().copied().find(|kind| *kind as u8 == p)
    }
}
103
104impl ConnectionType {
105 pub fn rusqlite_flags(self) -> OpenFlags {
106 let common_flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
107 match self {
108 ConnectionType::ReadOnly => common_flags | OpenFlags::SQLITE_OPEN_READ_ONLY,
109 ConnectionType::ReadWrite => {
110 common_flags | OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE
111 }
112 ConnectionType::Sync => common_flags | OpenFlags::SQLITE_OPEN_READ_WRITE,
113 }
114 }
115}
116
// Registry of `PlacesApi` instances keyed by database name/path. It holds only
// weak references; `PlacesApi::new_or_existing` consults it so that asking for
// a name whose API is still alive returns that same instance.
117lazy_static! {
119 static ref APIS: Mutex<HashMap<PathBuf, Weak<PlacesApi>>> = Mutex::new(HashMap::new());
120}
121
// Source of the numeric ids given to `PlacesApi` instances. The id is also
// passed to every `PlacesDb` the API opens, which lets `close_connection`
// detect a connection that belongs to a different API.
122static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);
123
/// Sync state cached by a `PlacesApi` between syncs.
///
/// Both fields are `Cell`s: the sync methods `take()` the values out, pass
/// them to `sync_multiple` as mutable state, and store them back afterwards.
124pub struct SyncState {
// In-memory state handed to `sync_multiple`; starts out as the default value.
125 pub mem_cached_state: Cell<MemoryCachedState>,
// State string that is also persisted under `GLOBAL_STATE_META_KEY`; `None`
// means nothing is persisted.
126 pub disk_cached_state: Cell<Option<String>>,
127}
128
129#[handle_error(crate::Error)]
133pub fn places_api_new(db_name: impl AsRef<Path>) -> ApiResult<Arc<PlacesApi>> {
134 PlacesApi::new(db_name)
135}
136
/// Entry point for one places database: hands out connections and runs syncs.
///
/// Instances are created through `PlacesApi::new` / `PlacesApi::new_memory`,
/// which return the existing instance for a name while it is still alive.
137pub struct PlacesApi {
// Name/path (or in-memory URI) handed to `PlacesDb::open` for every connection.
141 db_name: PathBuf,
// The single read-write connection, opened at construction. It is `None`
// while a caller has it checked out via `open_connection(ReadWrite)` and is
// put back by `close_connection`.
142 write_connection: Mutex<Option<PlacesDb>>,
// Lazily-initialized cached sync state. The mutex is also held for the
// duration of every sync, wipe and reset operation.
143 sync_state: Mutex<Option<SyncState>>,
// Lock shared with every `PlacesDb` this API opens.
// NOTE(review): presumably coordinates write transactions across those
// connections - confirm against `PlacesDb`.
144 coop_tx_lock: Arc<Mutex<()>>,
// Weak handle to the shared sync connection; `get_sync_connection` re-opens
// the connection when no strong reference is left.
145 sync_connection: Mutex<Weak<SharedPlacesDb>>,
// Id taken from `ID_COUNTER`; compared against a connection's `api_id()`
// in `close_connection`.
154 id: usize,
155}
156
157impl PlacesApi {
158 pub fn new(db_name: impl AsRef<Path>) -> Result<Arc<Self>> {
160 let db_name = normalize_path(db_name)?;
161 Self::new_or_existing(db_name)
162 }
163
164 pub fn new_memory(db_name: &str) -> Result<Arc<Self>> {
168 let name = PathBuf::from(format!("file:{}?mode=memory&cache=shared", db_name));
169 Self::new_or_existing(name)
170 }
171 fn new_or_existing_into(
172 target: &mut HashMap<PathBuf, Weak<PlacesApi>>,
173 db_name: PathBuf,
174 ) -> Result<Arc<Self>> {
175 let id = ID_COUNTER.fetch_add(1, Ordering::SeqCst);
176 match target.get(&db_name).and_then(Weak::upgrade) {
177 Some(existing) => Ok(existing),
178 None => {
179 let coop_tx_lock = Arc::new(Mutex::new(()));
182 let connection = PlacesDb::open(
183 &db_name,
184 ConnectionType::ReadWrite,
185 id,
186 coop_tx_lock.clone(),
187 )?;
188 let new = PlacesApi {
189 db_name: db_name.clone(),
190 write_connection: Mutex::new(Some(connection)),
191 sync_state: Mutex::new(None),
192 sync_connection: Mutex::new(Weak::new()),
193 id,
194 coop_tx_lock,
195 };
196 let arc = Arc::new(new);
197 target.insert(db_name, Arc::downgrade(&arc));
198 Ok(arc)
199 }
200 }
201 }
202
203 fn new_or_existing(db_name: PathBuf) -> Result<Arc<Self>> {
204 let mut guard = APIS.lock();
205 Self::new_or_existing_into(&mut guard, db_name)
206 }
207
208 pub fn open_connection(&self, conn_type: ConnectionType) -> Result<PlacesDb> {
210 match conn_type {
211 ConnectionType::ReadOnly => {
212 PlacesDb::open(
214 self.db_name.clone(),
215 ConnectionType::ReadOnly,
216 self.id,
217 self.coop_tx_lock.clone(),
218 )
219 }
220 ConnectionType::ReadWrite => {
221 let mut guard = self.write_connection.lock();
223 match guard.take() {
224 None => Err(Error::ConnectionAlreadyOpen),
225 Some(db) => Ok(db),
226 }
227 }
228 ConnectionType::Sync => {
229 panic!("Use `get_sync_connection` to open a sync connection");
230 }
231 }
232 }
233
234 pub fn get_sync_connection(&self) -> Result<Arc<SharedPlacesDb>> {
242 let mut conn = self.sync_connection.lock();
244 match conn.upgrade() {
245 Some(db) => Ok(db),
247 None => {
249 let db = Arc::new(SharedPlacesDb::new(PlacesDb::open(
250 self.db_name.clone(),
251 ConnectionType::Sync,
252 self.id,
253 self.coop_tx_lock.clone(),
254 )?));
255 register_interrupt(Arc::<SharedPlacesDb>::downgrade(&db));
256 *conn = Arc::downgrade(&db);
258 Ok(db)
259 }
260 }
261 }
262
263 pub fn close_connection(&self, connection: PlacesDb) -> Result<()> {
266 if connection.api_id() != self.id {
267 return Err(Error::WrongApiForClose);
268 }
269 if connection.conn_type() == ConnectionType::ReadWrite {
270 let mut guard = self.write_connection.lock();
272 assert!((*guard).is_none());
273 *guard = Some(connection);
274 }
275 Ok(())
276 }
277
278 fn get_disk_persisted_state(&self, conn: &PlacesDb) -> Result<Option<String>> {
279 get_meta::<String>(conn, GLOBAL_STATE_META_KEY)
280 }
281
282 fn set_disk_persisted_state(&self, conn: &PlacesDb, state: &Option<String>) -> Result<()> {
283 match state {
284 Some(ref s) => put_meta(conn, GLOBAL_STATE_META_KEY, s),
285 None => delete_meta(conn, GLOBAL_STATE_META_KEY),
286 }
287 }
288
289 pub fn register_with_sync_manager(self: Arc<Self>) {
295 *PLACES_API_FOR_SYNC_MANAGER.lock() = Arc::downgrade(&self);
296 }
297
298 pub fn sync_history(
302 &self,
303 client_init: &Sync15StorageClientInit,
304 key_bundle: &KeyBundle,
305 ) -> Result<telemetry::SyncTelemetryPing> {
306 self.do_sync_one(
307 "history",
308 move |conn, mem_cached_state, disk_cached_state| {
309 let engine = HistorySyncEngine::new(conn)?;
310 Ok(sync_multiple(
311 &[&engine],
312 disk_cached_state,
313 mem_cached_state,
314 client_init,
315 key_bundle,
316 &interrupt_support::ShutdownInterruptee,
317 None,
318 ))
319 },
320 )
321 }
322
323 pub fn sync_bookmarks(
324 &self,
325 client_init: &Sync15StorageClientInit,
326 key_bundle: &KeyBundle,
327 ) -> Result<telemetry::SyncTelemetryPing> {
328 self.do_sync_one(
329 "bookmarks",
330 move |conn, mem_cached_state, disk_cached_state| {
331 let engine = BookmarksSyncEngine::new(conn)?;
332 Ok(sync_multiple(
333 &[&engine],
334 disk_cached_state,
335 mem_cached_state,
336 client_init,
337 key_bundle,
338 &interrupt_support::ShutdownInterruptee,
339 None,
340 ))
341 },
342 )
343 }
344
345 pub fn do_sync_one<F>(
346 &self,
347 name: &'static str,
348 syncer: F,
349 ) -> Result<telemetry::SyncTelemetryPing>
350 where
351 F: FnOnce(
352 Arc<SharedPlacesDb>,
353 &mut MemoryCachedState,
354 &mut Option<String>,
355 ) -> Result<SyncResult>,
356 {
357 let mut guard = self.sync_state.lock();
358 let conn = self.get_sync_connection()?;
359 if guard.is_none() {
360 *guard = Some(SyncState {
361 mem_cached_state: Cell::default(),
362 disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
363 });
364 }
365
366 let sync_state = guard.as_ref().unwrap();
367
368 let mut mem_cached_state = sync_state.mem_cached_state.take();
369 let mut disk_cached_state = sync_state.disk_cached_state.take();
370 let mut result = syncer(conn.clone(), &mut mem_cached_state, &mut disk_cached_state)?;
371 self.set_disk_persisted_state(&conn.lock(), &disk_cached_state)?;
374 sync_state.mem_cached_state.replace(mem_cached_state);
375 sync_state.disk_cached_state.replace(disk_cached_state);
376
377 if let Err(e) = result.result {
379 return Err(e.into());
380 }
381 match result.engine_results.remove(name) {
382 None | Some(Ok(())) => Ok(result.telemetry),
383 Some(Err(e)) => Err(e.into()),
384 }
385 }
386
387 pub fn sync(
396 &self,
397 client_init: &Sync15StorageClientInit,
398 key_bundle: &KeyBundle,
399 ) -> Result<SyncResult> {
400 let mut guard = self.sync_state.lock();
401 let conn = self.get_sync_connection()?;
402 if guard.is_none() {
403 *guard = Some(SyncState {
404 mem_cached_state: Cell::default(),
405 disk_cached_state: Cell::new(self.get_disk_persisted_state(&conn.lock())?),
406 });
407 }
408
409 let sync_state = guard.as_ref().unwrap();
410
411 let bm_engine = BookmarksSyncEngine::new(conn.clone())?;
412 let history_engine = HistorySyncEngine::new(conn.clone())?;
413 let mut mem_cached_state = sync_state.mem_cached_state.take();
414 let mut disk_cached_state = sync_state.disk_cached_state.take();
415
416 let result = sync_multiple(
418 &[&history_engine, &bm_engine],
419 &mut disk_cached_state,
420 &mut mem_cached_state,
421 client_init,
422 key_bundle,
423 &interrupt_support::ShutdownInterruptee,
424 None,
425 );
426 if let Err(e) = self.set_disk_persisted_state(&conn.lock(), &disk_cached_state) {
429 error_support::report_error!(
430 "places-sync-persist-failure",
431 "Failed to persist the sync state: {:?}",
432 e
433 );
434 }
435 sync_state.mem_cached_state.replace(mem_cached_state);
436 sync_state.disk_cached_state.replace(disk_cached_state);
437
438 Ok(result)
439 }
440
441 pub fn wipe_bookmarks(&self) -> Result<()> {
442 let _guard = self.sync_state.lock();
444 let conn = self.get_sync_connection()?;
445
446 storage::bookmarks::delete_everything(&conn.lock())?;
447 Ok(())
448 }
449
450 pub fn reset_bookmarks(&self) -> Result<()> {
451 let _guard = self.sync_state.lock();
453 let conn = self.get_sync_connection()?;
454
455 bookmark_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
456 Ok(())
457 }
458
459 #[handle_error(crate::Error)]
460 pub fn reset_history(&self) -> ApiResult<()> {
461 let _guard = self.sync_state.lock();
463 let conn = self.get_sync_connection()?;
464
465 history_sync::reset(&conn.lock(), &EngineSyncAssociation::Disconnected)?;
466 Ok(())
467 }
468}
469
/// Helpers for tests that need an in-memory `PlacesApi` or connections to one.
#[cfg(test)]
pub mod test {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Gives every in-memory database created by these helpers its own name.
    static ATOMIC_COUNTER: AtomicUsize = AtomicUsize::new(0);

    /// Creates an API on a fresh, uniquely named in-memory database.
    pub fn new_mem_api() -> Arc<PlacesApi> {
        error_support::init_for_tests();

        let unique_name = format!(
            "test-api-{}",
            ATOMIC_COUNTER.fetch_add(1, Ordering::Relaxed)
        );
        PlacesApi::new_memory(&unique_name).expect("should get an API")
    }

    /// Creates a fresh in-memory API and returns its read-write connection.
    pub fn new_mem_connection() -> PlacesDb {
        let api = new_mem_api();
        api.open_connection(ConnectionType::ReadWrite)
            .expect("should get a connection")
    }

    /// A read-only and a read-write connection to the same in-memory
    /// database, together with the API that owns them.
    pub struct MemConnections {
        pub read: PlacesDb,
        pub write: PlacesDb,
        pub api: Arc<PlacesApi>,
    }

    /// Creates a fresh in-memory API and opens a reader, then a writer, on it.
    pub fn new_mem_connections() -> MemConnections {
        let api = new_mem_api();
        MemConnections {
            read: api
                .open_connection(ConnectionType::ReadOnly)
                .expect("should get a read connection"),
            write: api
                .open_connection(ConnectionType::ReadWrite)
                .expect("should get a write connection"),
            api,
        }
    }
}
510
#[cfg(test)]
mod tests {
    use super::test::*;
    use super::*;
    use sql_support::ConnExt;

    /// Creates a one-column table and inserts the value the tests read back.
    const CREATE_AND_FILL_TEST_TABLE: &str = "CREATE TABLE test_table (test_value INTEGER);
                 INSERT INTO test_table VALUES (999)";

    /// Only one read-write connection may be checked out at a time; closing
    /// it makes it available again.
    #[test]
    fn test_multi_writers_fails() {
        let api = new_mem_api();
        let first_writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        api.open_connection(ConnectionType::ReadWrite)
            .expect_err("should fail to get second writer");
        api.close_connection(first_writer)
            .expect("should be able to close");
        api.open_connection(ConnectionType::ReadWrite)
            .expect("should get a writer after closing the other");
    }

    /// A reader opened after the write sees the writer's data.
    #[test]
    fn test_shared_memory() {
        let api = new_mem_api();
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        writer
            .execute_batch(CREATE_AND_FILL_TEST_TABLE)
            .expect("should insert");

        let reader = api
            .open_connection(ConnectionType::ReadOnly)
            .expect("should get reader");
        let stored = reader
            .query_one::<i64>("SELECT test_value FROM test_table")
            .expect("should get value");
        assert_eq!(stored, 999);
    }

    /// A reader opened before the write still sees the writer's data.
    #[test]
    fn test_reader_before_writer() {
        let api = new_mem_api();
        let reader = api
            .open_connection(ConnectionType::ReadOnly)
            .expect("should get reader");
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");
        writer
            .execute_batch(CREATE_AND_FILL_TEST_TABLE)
            .expect("should insert");

        let stored = reader
            .query_one::<i64>("SELECT test_value FROM test_table")
            .expect("should get value");
        assert_eq!(stored, 999);
    }

    /// Closing a connection on an API that did not open it is rejected.
    #[test]
    fn test_wrong_writer_close() {
        let api = new_mem_api();
        // Keep this API's own writer checked out for the duration of the test.
        let _writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");

        let other_api = new_mem_api();
        let foreign_writer = other_api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer 2");

        let err = api.close_connection(foreign_writer).unwrap_err();
        assert!(matches!(err, Error::WrongApiForClose));
    }

    /// Closing an API's own writer succeeds and lets it be opened again.
    #[test]
    fn test_valid_writer_close() {
        let api = new_mem_api();
        let writer = api
            .open_connection(ConnectionType::ReadWrite)
            .expect("should get writer");

        api.close_connection(writer)
            .expect("Should allow closing own connection");

        let reopened = api.open_connection(ConnectionType::ReadWrite);
        assert!(reopened.is_ok());
    }
}