webext_storage/sync/
bridge.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use anyhow::Result;
6use rusqlite::Transaction;
7use std::sync::{Arc, Weak};
8use sync15::bso::{IncomingBso, OutgoingBso};
9use sync15::engine::{ApplyResults, BridgedEngine as Sync15BridgedEngine};
10use sync_guid::Guid as SyncGuid;
11
12use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb};
13use crate::schema;
14use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
15use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
16use crate::WebExtStorageStore;
17
/// Metadata key for the last-sync timestamp; used with `get_meta`,
/// `put_meta` and `delete_meta` by `last_sync`, `set_last_sync` and `do_reset`.
const LAST_SYNC_META_KEY: &str = "last_sync_time";
/// Metadata key for the engine's current sync ID.
const SYNC_ID_META_KEY: &str = "sync_id";
20
21impl WebExtStorageStore {
22    // Returns a bridged sync engine for this store.
23    pub fn bridged_engine(self: Arc<Self>) -> Arc<WebExtStorageBridgedEngine> {
24        let engine = Box::new(BridgedEngine::new(&self.db));
25        let bridged_engine = WebExtStorageBridgedEngine {
26            bridge_impl: engine,
27        };
28        Arc::new(bridged_engine)
29    }
30}
31
/// A bridged engine implements all the methods needed to make the
/// `storage.sync` store work with Desktop's Sync implementation.
/// Conceptually, it's similar to `sync15::Store`, which we
/// should eventually rename and unify with this trait (#2841).
///
/// Unlike most of our other implementations, which hold a strong reference
/// to the store, this engine keeps a weak reference in an attempt to keep
/// the desktop semantics as close as possible to what they were when the
/// engines all took lifetime params to ensure they don't outlive the store.
pub struct BridgedEngine {
    /// Weak handle to the shared database; upgraded on every call, and the
    /// call fails with `DatabaseConnectionClosed` once the database is gone.
    db: Weak<ThreadSafeStorageDb>,
}
44
45impl BridgedEngine {
46    /// Creates a bridged engine for syncing.
47    pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self {
48        BridgedEngine {
49            db: Arc::downgrade(db),
50        }
51    }
52
53    fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> {
54        tx.execute_batch(
55            "DELETE FROM storage_sync_mirror;
56             UPDATE storage_sync_data SET sync_change_counter = 1;",
57        )?;
58        delete_meta(tx, LAST_SYNC_META_KEY)?;
59        Ok(())
60    }
61
62    fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> {
63        self.db
64            .upgrade()
65            .ok_or_else(|| crate::error::Error::DatabaseConnectionClosed.into())
66    }
67}
68
69impl Sync15BridgedEngine for BridgedEngine {
70    fn last_sync(&self) -> Result<i64> {
71        let shared_db = self.thread_safe_storage_db()?;
72        let db = shared_db.lock();
73        let conn = db.get_connection()?;
74        Ok(get_meta(conn, LAST_SYNC_META_KEY)?.unwrap_or(0))
75    }
76
77    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
78        let shared_db = self.thread_safe_storage_db()?;
79        let db = shared_db.lock();
80        let conn = db.get_connection()?;
81        put_meta(conn, LAST_SYNC_META_KEY, &last_sync_millis)?;
82        Ok(())
83    }
84
85    fn sync_id(&self) -> Result<Option<String>> {
86        let shared_db = self.thread_safe_storage_db()?;
87        let db = shared_db.lock();
88        let conn = db.get_connection()?;
89        Ok(get_meta(conn, SYNC_ID_META_KEY)?)
90    }
91
92    fn reset_sync_id(&self) -> Result<String> {
93        let shared_db = self.thread_safe_storage_db()?;
94        let db = shared_db.lock();
95        let conn = db.get_connection()?;
96        let tx = conn.unchecked_transaction()?;
97        let new_id = SyncGuid::random().to_string();
98        self.do_reset(&tx)?;
99        put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
100        tx.commit()?;
101        Ok(new_id)
102    }
103
104    fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
105        let shared_db = self.thread_safe_storage_db()?;
106        let db = shared_db.lock();
107        let conn = db.get_connection()?;
108        let current: Option<String> = get_meta(conn, SYNC_ID_META_KEY)?;
109        Ok(match current {
110            Some(current) if current == sync_id => current,
111            _ => {
112                let conn = db.get_connection()?;
113                let tx = conn.unchecked_transaction()?;
114                self.do_reset(&tx)?;
115                let result = sync_id.to_string();
116                put_meta(&tx, SYNC_ID_META_KEY, &result)?;
117                tx.commit()?;
118                result
119            }
120        })
121    }
122
123    fn sync_started(&self) -> Result<()> {
124        let shared_db = self.thread_safe_storage_db()?;
125        let db = shared_db.lock();
126        let conn = db.get_connection()?;
127        schema::create_empty_sync_temp_tables(conn)?;
128        Ok(())
129    }
130
131    fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> {
132        let shared_db = self.thread_safe_storage_db()?;
133        let db = shared_db.lock();
134        let signal = db.begin_interrupt_scope()?;
135        let conn = db.get_connection()?;
136        let tx = conn.unchecked_transaction()?;
137        let incoming_content: Vec<_> = incoming_bsos
138            .into_iter()
139            .map(IncomingBso::into_content::<super::WebextRecord>)
140            .collect();
141        stage_incoming(&tx, &incoming_content, &signal)?;
142        tx.commit()?;
143        Ok(())
144    }
145
146    fn apply(&self) -> Result<ApplyResults> {
147        let shared_db = self.thread_safe_storage_db()?;
148        let db = shared_db.lock();
149        let signal = db.begin_interrupt_scope()?;
150        let conn = db.get_connection()?;
151        let tx = conn.unchecked_transaction()?;
152        let incoming = get_incoming(&tx)?;
153        let actions = incoming
154            .into_iter()
155            .map(|(item, state)| (item, plan_incoming(state)))
156            .collect();
157        apply_actions(&tx, actions, &signal)?;
158        stage_outgoing(&tx)?;
159        tx.commit()?;
160
161        Ok(get_outgoing(conn, &signal)?.into())
162    }
163
164    fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
165        let shared_db = self.thread_safe_storage_db()?;
166        let db = shared_db.lock();
167        let conn = db.get_connection()?;
168        let signal = db.begin_interrupt_scope()?;
169        let tx = conn.unchecked_transaction()?;
170        record_uploaded(&tx, ids, &signal)?;
171        tx.commit()?;
172
173        Ok(())
174    }
175
176    fn sync_finished(&self) -> Result<()> {
177        let shared_db = self.thread_safe_storage_db()?;
178        let db = shared_db.lock();
179        let conn = db.get_connection()?;
180        schema::create_empty_sync_temp_tables(conn)?;
181        Ok(())
182    }
183
184    fn reset(&self) -> Result<()> {
185        let shared_db = self.thread_safe_storage_db()?;
186        let db = shared_db.lock();
187        let conn = db.get_connection()?;
188        let tx = conn.unchecked_transaction()?;
189        self.do_reset(&tx)?;
190        delete_meta(&tx, SYNC_ID_META_KEY)?;
191        tx.commit()?;
192        Ok(())
193    }
194
195    fn wipe(&self) -> Result<()> {
196        let shared_db = self.thread_safe_storage_db()?;
197        let db = shared_db.lock();
198        let conn = db.get_connection()?;
199        let tx = conn.unchecked_transaction()?;
200        // We assume the meta table is only used by sync.
201        tx.execute_batch(
202            "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
203        )?;
204        tx.commit()?;
205        Ok(())
206    }
207}
208
/// Wrapper around a boxed [`Sync15BridgedEngine`]. Its methods forward to the
/// wrapped engine; incoming and outgoing BSOs cross this API as JSON strings.
pub struct WebExtStorageBridgedEngine {
    /// The engine every call is delegated to.
    bridge_impl: Box<dyn Sync15BridgedEngine>,
}
212
213impl WebExtStorageBridgedEngine {
214    pub fn new(bridge_impl: Box<dyn Sync15BridgedEngine>) -> Self {
215        Self { bridge_impl }
216    }
217
218    pub fn last_sync(&self) -> Result<i64> {
219        self.bridge_impl.last_sync()
220    }
221
222    pub fn set_last_sync(&self, last_sync: i64) -> Result<()> {
223        self.bridge_impl.set_last_sync(last_sync)
224    }
225
226    pub fn sync_id(&self) -> Result<Option<String>> {
227        self.bridge_impl.sync_id()
228    }
229
230    pub fn reset_sync_id(&self) -> Result<String> {
231        self.bridge_impl.reset_sync_id()
232    }
233
234    pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
235        self.bridge_impl.ensure_current_sync_id(sync_id)
236    }
237
238    pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
239        self.bridge_impl.prepare_for_sync(client_data)
240    }
241
242    pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> {
243        self.bridge_impl
244            .store_incoming(self.convert_incoming_bsos(incoming)?)
245    }
246
247    pub fn apply(&self) -> Result<Vec<String>> {
248        let apply_results = self.bridge_impl.apply()?;
249        self.convert_outgoing_bsos(apply_results.records)
250    }
251
252    pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> {
253        self.bridge_impl
254            .set_uploaded(server_modified_millis, &guids)
255    }
256
257    pub fn sync_started(&self) -> Result<()> {
258        self.bridge_impl.sync_started()
259    }
260
261    pub fn sync_finished(&self) -> Result<()> {
262        self.bridge_impl.sync_finished()
263    }
264
265    pub fn reset(&self) -> Result<()> {
266        self.bridge_impl.reset()
267    }
268
269    pub fn wipe(&self) -> Result<()> {
270        self.bridge_impl.wipe()
271    }
272
273    fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> {
274        let mut bsos = Vec::with_capacity(incoming.len());
275        for inc in incoming {
276            bsos.push(serde_json::from_str::<IncomingBso>(&inc)?);
277        }
278        Ok(bsos)
279    }
280
281    // Encode OutgoingBso's into JSON for UniFFI
282    fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> {
283        let mut bsos = Vec::with_capacity(outgoing.len());
284        for e in outgoing {
285            bsos.push(serde_json::to_string(&e)?);
286        }
287        Ok(bsos)
288    }
289}
290
291impl From<anyhow::Error> for crate::error::Error {
292    fn from(value: anyhow::Error) -> Self {
293        crate::error::Error::SyncError(value.to_string())
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::test::new_mem_thread_safe_storage_db;
    use crate::db::StorageDb;
    use sync15::engine::BridgedEngine;

    // Counts the rows in `table`, panicking if the query fails.
    fn query_count(db: &StorageDb, table: &str) -> u32 {
        let sql = format!("SELECT COUNT(*) FROM {};", table);
        db.get_connection()
            .expect("should retrieve connection")
            .query_row_and_then(&sql, [], |row| row.get::<_, u32>(0))
            .expect("should work")
    }

    // Reads the change counter of the single mock row ('ext-a').
    fn ext_a_change_counter(db: &StorageDb) -> Result<u32> {
        let conn = db.get_connection().expect("should retrieve connection");
        let counter = conn.query_row_and_then(
            "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';",
            [],
            |row| row.get::<_, u32>(0),
        )?;
        Ok(counter)
    }

    // Writes `id` as the stored sync ID. The lock is released on return, so
    // engine methods can be called straight afterwards.
    fn store_sync_id(engine: &super::BridgedEngine, id: &str) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let conn = guard.get_connection()?;
        put_meta(conn, SYNC_ID_META_KEY, &id.to_string())?;
        Ok(())
    }

    // Sets up mock data for the tests here.
    fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> {
        {
            // Scoped so the lock is dropped before `set_last_sync` takes it.
            let storage = engine.thread_safe_storage_db()?;
            let guard = storage.lock();
            let conn = guard.get_connection().expect("should retrieve connection");
            conn.execute(
                "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
                    VALUES ('ext-a', 'invalid-json', 2)",
                [],
            )?;
            conn.execute(
                "INSERT INTO storage_sync_mirror (guid, ext_id, data)
                    VALUES ('guid', 'ext-a', '3')",
                [],
            )?;
        }
        engine.set_last_sync(1)?;

        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        // and assert we wrote what we think we did.
        for table in ["storage_sync_data", "storage_sync_mirror", "meta"] {
            assert_eq!(query_count(&guard, table), 1);
        }
        Ok(())
    }

    // Assuming a DB setup with setup_mock_data, assert it was correctly reset.
    fn assert_reset(engine: &super::BridgedEngine) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let conn = guard.get_connection().expect("should retrieve connection");
        // A reset never wipes data...
        assert_eq!(query_count(&guard, "storage_sync_data"), 1);

        // But did reset the change counter.
        assert_eq!(ext_a_change_counter(&guard)?, 1);
        // But did wipe the mirror...
        assert_eq!(query_count(&guard, "storage_sync_mirror"), 0);
        // And the last_sync should have been wiped.
        let last_sync: Option<i64> = get_meta(conn, LAST_SYNC_META_KEY)?;
        assert!(last_sync.is_none());
        Ok(())
    }

    // Assuming a DB setup with setup_mock_data, assert it has not been reset.
    fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let conn = guard.get_connection().expect("should retrieve connection");
        assert_eq!(query_count(&guard, "storage_sync_data"), 1);
        assert_eq!(ext_a_change_counter(&guard)?, 2);
        assert_eq!(query_count(&guard, "storage_sync_mirror"), 1);
        // And the last_sync should remain.
        let last_sync: Option<i64> = get_meta(conn, LAST_SYNC_META_KEY)?;
        assert!(last_sync.is_some());
        Ok(())
    }

    #[test]
    fn test_wipe() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;

        engine.wipe()?;

        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();

        for table in ["storage_sync_data", "storage_sync_mirror", "meta"] {
            assert_eq!(query_count(&guard, table), 0);
        }
        Ok(())
    }

    #[test]
    fn test_reset() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        store_sync_id(&engine, "sync-id")?;

        engine.reset()?;
        assert_reset(&engine)?;

        {
            let guard = strong.lock();
            let conn = guard.get_connection()?;
            // Only an explicit reset kills the sync-id, so check that here.
            let remaining: Option<String> = get_meta(conn, SYNC_ID_META_KEY)?;
            assert_eq!(remaining, None);
        }

        Ok(())
    }

    #[test]
    fn test_ensure_missing_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;

        assert_eq!(engine.sync_id()?, None);
        // We don't have a sync ID - so setting one should reset.
        engine.ensure_current_sync_id("new-id")?;
        // should have caused a reset.
        assert_reset(&engine)?;
        Ok(())
    }

    #[test]
    fn test_ensure_new_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        store_sync_id(&engine, "old-id")?;

        assert_not_reset(&engine)?;
        assert_eq!(engine.sync_id()?, Some("old-id".to_string()));

        engine.ensure_current_sync_id("new-id")?;
        // should have caused a reset.
        assert_reset(&engine)?;
        // should have the new id.
        assert_eq!(engine.sync_id()?, Some("new-id".to_string()));
        Ok(())
    }

    #[test]
    fn test_ensure_same_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        assert_not_reset(&engine)?;

        store_sync_id(&engine, "sync-id")?;

        engine.ensure_current_sync_id("sync-id")?;
        // should not have reset.
        assert_not_reset(&engine)?;
        Ok(())
    }

    #[test]
    fn test_reset_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        store_sync_id(&engine, "sync-id")?;

        assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));
        let new_id = engine.reset_sync_id()?;
        // should have caused a reset.
        assert_reset(&engine)?;
        assert_eq!(engine.sync_id()?, Some(new_id));
        Ok(())
    }
}