1use anyhow::Result;
6use rusqlite::Transaction;
7use std::sync::{Arc, Weak};
8use sync15::bso::{IncomingBso, OutgoingBso};
9use sync15::engine::{ApplyResults, BridgedEngine as Sync15BridgedEngine};
10use sync_guid::Guid as SyncGuid;
11
12use crate::db::{delete_meta, get_meta, put_meta, ThreadSafeStorageDb};
13use crate::schema;
14use crate::sync::incoming::{apply_actions, get_incoming, plan_incoming, stage_incoming};
15use crate::sync::outgoing::{get_outgoing, record_uploaded, stage_outgoing};
16use crate::WebExtStorageStore;
17
/// Key in the `meta` table under which the last-sync timestamp is stored.
const LAST_SYNC_META_KEY: &str = "last_sync_time";
/// Key in the `meta` table under which the current sync ID is stored.
const SYNC_ID_META_KEY: &str = "sync_id";
20
21impl WebExtStorageStore {
22 pub fn bridged_engine(self: Arc<Self>) -> Arc<WebExtStorageBridgedEngine> {
24 let engine = Box::new(BridgedEngine::new(&self.db));
25 let bridged_engine = WebExtStorageBridgedEngine {
26 bridge_impl: engine,
27 };
28 Arc::new(bridged_engine)
29 }
30}
31
32pub struct BridgedEngine {
42 db: Weak<ThreadSafeStorageDb>,
43}
44
45impl BridgedEngine {
46 pub fn new(db: &Arc<ThreadSafeStorageDb>) -> Self {
48 BridgedEngine {
49 db: Arc::downgrade(db),
50 }
51 }
52
53 fn do_reset(&self, tx: &Transaction<'_>) -> Result<()> {
54 tx.execute_batch(
55 "DELETE FROM storage_sync_mirror;
56 UPDATE storage_sync_data SET sync_change_counter = 1;",
57 )?;
58 delete_meta(tx, LAST_SYNC_META_KEY)?;
59 Ok(())
60 }
61
62 fn thread_safe_storage_db(&self) -> Result<Arc<ThreadSafeStorageDb>> {
63 self.db
64 .upgrade()
65 .ok_or_else(|| crate::error::Error::DatabaseConnectionClosed.into())
66 }
67}
68
69impl Sync15BridgedEngine for BridgedEngine {
70 fn last_sync(&self) -> Result<i64> {
71 let shared_db = self.thread_safe_storage_db()?;
72 let db = shared_db.lock();
73 let conn = db.get_connection()?;
74 Ok(get_meta(conn, LAST_SYNC_META_KEY)?.unwrap_or(0))
75 }
76
77 fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
78 let shared_db = self.thread_safe_storage_db()?;
79 let db = shared_db.lock();
80 let conn = db.get_connection()?;
81 put_meta(conn, LAST_SYNC_META_KEY, &last_sync_millis)?;
82 Ok(())
83 }
84
85 fn sync_id(&self) -> Result<Option<String>> {
86 let shared_db = self.thread_safe_storage_db()?;
87 let db = shared_db.lock();
88 let conn = db.get_connection()?;
89 Ok(get_meta(conn, SYNC_ID_META_KEY)?)
90 }
91
92 fn reset_sync_id(&self) -> Result<String> {
93 let shared_db = self.thread_safe_storage_db()?;
94 let db = shared_db.lock();
95 let conn = db.get_connection()?;
96 let tx = conn.unchecked_transaction()?;
97 let new_id = SyncGuid::random().to_string();
98 self.do_reset(&tx)?;
99 put_meta(&tx, SYNC_ID_META_KEY, &new_id)?;
100 tx.commit()?;
101 Ok(new_id)
102 }
103
104 fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
105 let shared_db = self.thread_safe_storage_db()?;
106 let db = shared_db.lock();
107 let conn = db.get_connection()?;
108 let current: Option<String> = get_meta(conn, SYNC_ID_META_KEY)?;
109 Ok(match current {
110 Some(current) if current == sync_id => current,
111 _ => {
112 let conn = db.get_connection()?;
113 let tx = conn.unchecked_transaction()?;
114 self.do_reset(&tx)?;
115 let result = sync_id.to_string();
116 put_meta(&tx, SYNC_ID_META_KEY, &result)?;
117 tx.commit()?;
118 result
119 }
120 })
121 }
122
123 fn sync_started(&self) -> Result<()> {
124 let shared_db = self.thread_safe_storage_db()?;
125 let db = shared_db.lock();
126 let conn = db.get_connection()?;
127 schema::create_empty_sync_temp_tables(conn)?;
128 Ok(())
129 }
130
131 fn store_incoming(&self, incoming_bsos: Vec<IncomingBso>) -> Result<()> {
132 let shared_db = self.thread_safe_storage_db()?;
133 let db = shared_db.lock();
134 let signal = db.begin_interrupt_scope()?;
135 let conn = db.get_connection()?;
136 let tx = conn.unchecked_transaction()?;
137 let incoming_content: Vec<_> = incoming_bsos
138 .into_iter()
139 .map(IncomingBso::into_content::<super::WebextRecord>)
140 .collect();
141 stage_incoming(&tx, &incoming_content, &signal)?;
142 tx.commit()?;
143 Ok(())
144 }
145
146 fn apply(&self) -> Result<ApplyResults> {
147 let shared_db = self.thread_safe_storage_db()?;
148 let db = shared_db.lock();
149 let signal = db.begin_interrupt_scope()?;
150 let conn = db.get_connection()?;
151 let tx = conn.unchecked_transaction()?;
152 let incoming = get_incoming(&tx)?;
153 let actions = incoming
154 .into_iter()
155 .map(|(item, state)| (item, plan_incoming(state)))
156 .collect();
157 apply_actions(&tx, actions, &signal)?;
158 stage_outgoing(&tx)?;
159 tx.commit()?;
160
161 Ok(get_outgoing(conn, &signal)?.into())
162 }
163
164 fn set_uploaded(&self, _server_modified_millis: i64, ids: &[SyncGuid]) -> Result<()> {
165 let shared_db = self.thread_safe_storage_db()?;
166 let db = shared_db.lock();
167 let conn = db.get_connection()?;
168 let signal = db.begin_interrupt_scope()?;
169 let tx = conn.unchecked_transaction()?;
170 record_uploaded(&tx, ids, &signal)?;
171 tx.commit()?;
172
173 Ok(())
174 }
175
176 fn sync_finished(&self) -> Result<()> {
177 let shared_db = self.thread_safe_storage_db()?;
178 let db = shared_db.lock();
179 let conn = db.get_connection()?;
180 schema::create_empty_sync_temp_tables(conn)?;
181 Ok(())
182 }
183
184 fn reset(&self) -> Result<()> {
185 let shared_db = self.thread_safe_storage_db()?;
186 let db = shared_db.lock();
187 let conn = db.get_connection()?;
188 let tx = conn.unchecked_transaction()?;
189 self.do_reset(&tx)?;
190 delete_meta(&tx, SYNC_ID_META_KEY)?;
191 tx.commit()?;
192 Ok(())
193 }
194
195 fn wipe(&self) -> Result<()> {
196 let shared_db = self.thread_safe_storage_db()?;
197 let db = shared_db.lock();
198 let conn = db.get_connection()?;
199 let tx = conn.unchecked_transaction()?;
200 tx.execute_batch(
202 "DELETE FROM storage_sync_data; DELETE FROM storage_sync_mirror; DELETE FROM meta;",
203 )?;
204 tx.commit()?;
205 Ok(())
206 }
207}
208
209pub struct WebExtStorageBridgedEngine {
210 bridge_impl: Box<dyn Sync15BridgedEngine>,
211}
212
213impl WebExtStorageBridgedEngine {
214 pub fn new(bridge_impl: Box<dyn Sync15BridgedEngine>) -> Self {
215 Self { bridge_impl }
216 }
217
218 pub fn last_sync(&self) -> Result<i64> {
219 self.bridge_impl.last_sync()
220 }
221
222 pub fn set_last_sync(&self, last_sync: i64) -> Result<()> {
223 self.bridge_impl.set_last_sync(last_sync)
224 }
225
226 pub fn sync_id(&self) -> Result<Option<String>> {
227 self.bridge_impl.sync_id()
228 }
229
230 pub fn reset_sync_id(&self) -> Result<String> {
231 self.bridge_impl.reset_sync_id()
232 }
233
234 pub fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
235 self.bridge_impl.ensure_current_sync_id(sync_id)
236 }
237
238 pub fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
239 self.bridge_impl.prepare_for_sync(client_data)
240 }
241
242 pub fn store_incoming(&self, incoming: Vec<String>) -> Result<()> {
243 self.bridge_impl
244 .store_incoming(self.convert_incoming_bsos(incoming)?)
245 }
246
247 pub fn apply(&self) -> Result<Vec<String>> {
248 let apply_results = self.bridge_impl.apply()?;
249 self.convert_outgoing_bsos(apply_results.records)
250 }
251
252 pub fn set_uploaded(&self, server_modified_millis: i64, guids: Vec<SyncGuid>) -> Result<()> {
253 self.bridge_impl
254 .set_uploaded(server_modified_millis, &guids)
255 }
256
257 pub fn sync_started(&self) -> Result<()> {
258 self.bridge_impl.sync_started()
259 }
260
261 pub fn sync_finished(&self) -> Result<()> {
262 self.bridge_impl.sync_finished()
263 }
264
265 pub fn reset(&self) -> Result<()> {
266 self.bridge_impl.reset()
267 }
268
269 pub fn wipe(&self) -> Result<()> {
270 self.bridge_impl.wipe()
271 }
272
273 fn convert_incoming_bsos(&self, incoming: Vec<String>) -> Result<Vec<IncomingBso>> {
274 let mut bsos = Vec::with_capacity(incoming.len());
275 for inc in incoming {
276 bsos.push(serde_json::from_str::<IncomingBso>(&inc)?);
277 }
278 Ok(bsos)
279 }
280
281 fn convert_outgoing_bsos(&self, outgoing: Vec<OutgoingBso>) -> Result<Vec<String>> {
283 let mut bsos = Vec::with_capacity(outgoing.len());
284 for e in outgoing {
285 bsos.push(serde_json::to_string(&e)?);
286 }
287 Ok(bsos)
288 }
289}
290
291impl From<anyhow::Error> for crate::error::Error {
292 fn from(value: anyhow::Error) -> Self {
293 crate::error::Error::SyncError(value.to_string())
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::test::new_mem_thread_safe_storage_db;
    use crate::db::StorageDb;
    use sync15::engine::BridgedEngine;

    /// Reads the change counter of the single row inserted by `setup_mock_data`.
    const CHANGE_COUNTER_SQL: &str =
        "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext-a';";

    /// Returns the number of rows in `table`.
    fn query_count(db: &StorageDb, table: &str) -> u32 {
        let sql = format!("SELECT COUNT(*) FROM {};", table);
        db.get_connection()
            .expect("should retrieve connection")
            .query_row_and_then(&sql, [], |row| row.get::<_, u32>(0))
            .expect("should work")
    }

    /// Writes `id` as the stored sync ID directly through the database.
    fn put_sync_id(engine: &super::BridgedEngine, id: &str) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let connection = guard.get_connection()?;
        put_meta(connection, SYNC_ID_META_KEY, &id.to_string())?;
        Ok(())
    }

    /// Inserts one data row (change counter 2), one mirror row and a last-sync
    /// timestamp, then checks each table holds exactly one row.
    fn setup_mock_data(engine: &super::BridgedEngine) -> Result<()> {
        // Scoped so the lock is released before `set_last_sync` takes it again.
        {
            let storage = engine.thread_safe_storage_db()?;
            let guard = storage.lock();
            let connection = guard.get_connection().expect("should retrieve connection");
            let inserts = [
                "INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
                 VALUES ('ext-a', 'invalid-json', 2)",
                "INSERT INTO storage_sync_mirror (guid, ext_id, data)
                 VALUES ('guid', 'ext-a', '3')",
            ];
            for sql in inserts.iter() {
                connection.execute(sql, [])?;
            }
        }
        engine.set_last_sync(1)?;

        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        for table in ["storage_sync_data", "storage_sync_mirror", "meta"].iter() {
            assert_eq!(query_count(&guard, table), 1);
        }
        Ok(())
    }

    /// Asserts the post-reset state: data row kept with counter 1, mirror
    /// empty, no last-sync timestamp.
    fn assert_reset(engine: &super::BridgedEngine) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let connection = guard.get_connection().expect("should retrieve connection");
        assert_eq!(query_count(&guard, "storage_sync_data"), 1);

        let counter =
            connection.query_row_and_then(CHANGE_COUNTER_SQL, [], |row| row.get::<_, u32>(0))?;
        assert_eq!(counter, 1);
        assert_eq!(query_count(&guard, "storage_sync_mirror"), 0);
        assert!(get_meta::<i64>(connection, LAST_SYNC_META_KEY)?.is_none());
        Ok(())
    }

    /// Asserts the untouched mock state: counter still 2, mirror row present,
    /// last-sync timestamp present.
    fn assert_not_reset(engine: &super::BridgedEngine) -> Result<()> {
        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        let connection = guard.get_connection().expect("should retrieve connection");
        assert_eq!(query_count(&guard, "storage_sync_data"), 1);

        let counter =
            connection.query_row_and_then(CHANGE_COUNTER_SQL, [], |row| row.get::<_, u32>(0))?;
        assert_eq!(counter, 2);
        assert_eq!(query_count(&guard, "storage_sync_mirror"), 1);
        assert!(get_meta::<i64>(connection, LAST_SYNC_META_KEY)?.is_some());
        Ok(())
    }

    #[test]
    fn test_wipe() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        engine.wipe()?;

        let storage = engine.thread_safe_storage_db()?;
        let guard = storage.lock();
        for table in ["storage_sync_data", "storage_sync_mirror", "meta"].iter() {
            assert_eq!(query_count(&guard, table), 0);
        }
        Ok(())
    }

    #[test]
    fn test_reset() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        put_sync_id(&engine, "sync-id")?;

        engine.reset()?;
        assert_reset(&engine)?;

        // `reset` must also have removed the stored sync ID.
        let guard = strong.lock();
        let connection = guard.get_connection()?;
        assert_eq!(get_meta::<String>(connection, SYNC_ID_META_KEY)?, None);
        Ok(())
    }

    #[test]
    fn test_ensure_missing_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        assert_eq!(engine.sync_id()?, None);

        // No ID stored yet, so adopting one triggers a reset.
        engine.ensure_current_sync_id("new-id")?;
        assert_reset(&engine)?;
        Ok(())
    }

    #[test]
    fn test_ensure_new_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        put_sync_id(&engine, "old-id")?;

        assert_not_reset(&engine)?;
        assert_eq!(engine.sync_id()?, Some("old-id".to_string()));

        // A different ID replaces the stored one and resets sync state.
        engine.ensure_current_sync_id("new-id")?;
        assert_reset(&engine)?;
        assert_eq!(engine.sync_id()?, Some("new-id".to_string()));
        Ok(())
    }

    #[test]
    fn test_ensure_same_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        assert_not_reset(&engine)?;
        put_sync_id(&engine, "sync-id")?;

        // Same ID as stored: nothing may be reset.
        engine.ensure_current_sync_id("sync-id")?;
        assert_not_reset(&engine)?;
        Ok(())
    }

    #[test]
    fn test_reset_sync_id() -> Result<()> {
        let strong = new_mem_thread_safe_storage_db();
        let engine = super::BridgedEngine::new(&strong);

        setup_mock_data(&engine)?;
        put_sync_id(&engine, "sync-id")?;
        assert_eq!(engine.sync_id()?, Some("sync-id".to_string()));

        let fresh_id = engine.reset_sync_id()?;
        assert_reset(&engine)?;
        assert_eq!(engine.sync_id()?, Some(fresh_id));
        Ok(())
    }
}