1use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
6use crate::error::*;
7use crate::Store;
8use error_support::warn;
9use rusqlite::{
10 types::{FromSql, ToSql},
11 Connection, Transaction,
12};
13use std::sync::Arc;
14use sync15::bso::{IncomingBso, OutgoingBso};
15use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
16use sync15::{telemetry, CollectionName, ServerTimestamp};
17use sync_guid::Guid;
18
19pub struct EngineConfig {
22 pub(crate) namespace: String, pub(crate) collection: CollectionName, }
25
/// Metadata key suffix under which the engine stores the last sync
/// timestamp, in milliseconds.
pub const LAST_SYNC_META_KEY: &str = "last_sync_time";
/// Metadata key suffix under which the engine stores the global sync ID.
pub const GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";
/// Metadata key suffix under which the engine stores the collection sync ID.
pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
30
31pub trait SyncEngineStorageImpl<T> {
33 fn get_incoming_impl(
34 &self,
35 enc_key: &Option<String>,
36 ) -> Result<Box<dyn ProcessIncomingRecordImpl<Record = T>>>;
37 fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
38 fn get_outgoing_impl(
39 &self,
40 enc_key: &Option<String>,
41 ) -> Result<Box<dyn ProcessOutgoingRecordImpl<Record = T>>>;
42}
43
44pub struct ConfigSyncEngine<T> {
46 pub(crate) config: EngineConfig,
47 pub(crate) store: Arc<Store>,
48 pub(crate) storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
49 local_enc_key: Option<String>,
50}
51
52impl<T> ConfigSyncEngine<T> {
53 pub fn new(
54 config: EngineConfig,
55 store: Arc<Store>,
56 storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
57 ) -> Self {
58 Self {
59 config,
60 store,
61 storage_impl,
62 local_enc_key: None,
63 }
64 }
65 fn put_meta(&self, conn: &Connection, tail: &str, value: &dyn ToSql) -> Result<()> {
66 let key = format!("{}.{}", self.config.namespace, tail);
67 crate::db::store::put_meta(conn, &key, value)
68 }
69 fn get_meta<V: FromSql>(&self, conn: &Connection, tail: &str) -> Result<Option<V>> {
70 let key = format!("{}.{}", self.config.namespace, tail);
71 crate::db::store::get_meta(conn, &key)
72 }
73 fn delete_meta(&self, conn: &Connection, tail: &str) -> Result<()> {
74 let key = format!("{}.{}", self.config.namespace, tail);
75 crate::db::store::delete_meta(conn, &key)
76 }
77 pub fn reset_local_sync_data(&self) -> Result<()> {
79 let db = &self.store.db.lock().unwrap();
80 let tx = db.unchecked_transaction()?;
81 self.storage_impl.reset_storage(&tx)?;
82 self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
83 tx.commit()?;
84 Ok(())
85 }
86}
87
88impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
89 fn collection_name(&self) -> CollectionName {
90 self.config.collection.clone()
91 }
92
93 fn set_local_encryption_key(&mut self, key: &str) -> anyhow::Result<()> {
94 self.local_enc_key = Some(key.to_string());
95 Ok(())
96 }
97
98 fn prepare_for_sync(
99 &self,
100 _get_client_data: &dyn Fn() -> sync15::ClientData,
101 ) -> anyhow::Result<()> {
102 let db = &self.store.db.lock().unwrap();
103 let signal = db.begin_interrupt_scope()?;
104 crate::db::schema::create_empty_sync_temp_tables(&db.writer)?;
105 signal.err_if_interrupted()?;
106 Ok(())
107 }
108
109 fn stage_incoming(
110 &self,
111 inbound: Vec<IncomingBso>,
112 telem: &mut telemetry::Engine,
113 ) -> anyhow::Result<()> {
114 let db = &self.store.db.lock().unwrap();
115 let signal = db.begin_interrupt_scope()?;
116
117 let mut incoming_telemetry = telemetry::EngineIncoming::new();
119 incoming_telemetry.applied(inbound.len() as u32);
120 telem.incoming(incoming_telemetry);
121 let tx = db.writer.unchecked_transaction()?;
122 let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
123
124 incoming_impl.stage_incoming(&tx, inbound, &signal)?;
125 tx.commit()?;
126 Ok(())
127 }
128
129 fn apply(
130 &self,
131 timestamp: ServerTimestamp,
132 _telem: &mut telemetry::Engine,
133 ) -> anyhow::Result<Vec<OutgoingBso>> {
134 let db = &self.store.db.lock().unwrap();
135 let signal = db.begin_interrupt_scope()?;
136 let tx = db.writer.unchecked_transaction()?;
137 let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
138 let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
139
140 for state in incoming_impl.fetch_incoming_states(&tx)? {
142 signal.err_if_interrupted()?;
143 let action = plan_incoming(&*incoming_impl, &tx, state)?;
145 super::apply_incoming_action(&*incoming_impl, &tx, action)?;
146 }
147
148 self.put_meta(&tx, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
152
153 incoming_impl.finish_incoming(&tx)?;
154
155 let outgoing = outgoing_impl.fetch_outgoing_records(&tx)?;
157 tx.commit()?;
162 Ok(outgoing)
163 }
164
165 fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
166 let db = &self.store.db.lock().unwrap();
167 self.put_meta(&db.writer, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
168 let tx = db.writer.unchecked_transaction()?;
169 let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
170 outgoing_impl.finish_synced_items(&tx, ids)?;
171 tx.commit()?;
172 Ok(())
173 }
174
175 fn get_collection_request(
176 &self,
177 server_timestamp: ServerTimestamp,
178 ) -> anyhow::Result<Option<CollectionRequest>> {
179 let db = &self.store.db.lock().unwrap();
180 let since = ServerTimestamp(
181 self.get_meta::<i64>(&db.writer, LAST_SYNC_META_KEY)?
182 .unwrap_or_default(),
183 );
184 Ok(if since == server_timestamp {
185 None
186 } else {
187 Some(
188 CollectionRequest::new(self.collection_name())
189 .full()
190 .newer_than(since),
191 )
192 })
193 }
194
195 fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
196 let db = &self.store.db.lock().unwrap();
197 let global = self.get_meta(&db.writer, GLOBAL_SYNCID_META_KEY)?;
198 let coll = self.get_meta(&db.writer, COLLECTION_SYNCID_META_KEY)?;
199 Ok(if let (Some(global), Some(coll)) = (global, coll) {
200 EngineSyncAssociation::Connected(CollSyncIds { global, coll })
201 } else {
202 EngineSyncAssociation::Disconnected
203 })
204 }
205
206 fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
207 let db = &self.store.db.lock().unwrap();
208 let tx = db.unchecked_transaction()?;
209 self.storage_impl.reset_storage(&tx)?;
210 self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
213
214 match assoc {
217 EngineSyncAssociation::Disconnected => {
218 self.delete_meta(&tx, GLOBAL_SYNCID_META_KEY)?;
219 self.delete_meta(&tx, COLLECTION_SYNCID_META_KEY)?;
220 }
221 EngineSyncAssociation::Connected(ids) => {
222 self.put_meta(&tx, GLOBAL_SYNCID_META_KEY, &ids.global)?;
223 self.put_meta(&tx, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
224 }
225 }
226
227 tx.commit()?;
228 Ok(())
229 }
230
231 fn wipe(&self) -> anyhow::Result<()> {
232 warn!("not implemented as there isn't a valid use case for it");
233 Ok(())
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::credit_cards::add_internal_credit_card;
    use crate::db::credit_cards::tests::{
        get_all, insert_tombstone_record, test_insert_mirror_record,
    };
    use crate::db::models::credit_card::InternalCreditCard;
    use crate::db::schema::create_empty_sync_temp_tables;
    use crate::encryption::EncryptorDecryptor;
    use crate::sync::{IncomingBso, UnknownFields};
    use nss::ensure_initialized;
    use sql_support::ConnExt;

    impl InternalCreditCard {
        /// Converts the card into an incoming BSO for tests, attaching the
        /// given unknown fields to the payload entry.
        pub fn into_test_incoming_bso(
            self,
            encdec: &EncryptorDecryptor,
            unknown_fields: UnknownFields,
        ) -> IncomingBso {
            let mut payload = self.into_payload(encdec).expect("is json");
            payload.entry.unknown_fields = unknown_fields;
            IncomingBso::from_test_content(payload)
        }
    }

    /// Builds a credit-card engine backed by an in-memory store.
    fn create_engine() -> ConfigSyncEngine<InternalCreditCard> {
        let store = crate::db::store::Store::new_memory();
        crate::sync::credit_card::create_engine(Arc::new(store))
    }

    /// Deletes every row from the credit-card tables and from `moz_meta`.
    pub fn clear_cc_tables(conn: &Connection) -> rusqlite::Result<(), rusqlite::Error> {
        conn.execute_all(&[
            "DELETE FROM credit_cards_data;",
            "DELETE FROM credit_cards_mirror;",
            "DELETE FROM credit_cards_tombstones;",
            "DELETE FROM moz_meta;",
        ])
    }

    /// `apply` must persist the timestamp it is given as the last-sync time.
    #[test]
    fn test_credit_card_engine_apply_timestamp() -> Result<()> {
        ensure_initialized();
        let mut credit_card_engine = create_engine();
        let test_key = crate::encryption::create_autofill_key().unwrap();
        credit_card_engine
            .set_local_encryption_key(&test_key)
            .unwrap();
        {
            // Scoped so the database lock is released before `apply` takes it.
            create_empty_sync_temp_tables(&credit_card_engine.store.db.lock().unwrap())?;
        }

        let mut telem = telemetry::Engine::new("whatever");
        let last_sync = 24;
        let result = credit_card_engine.apply(ServerTimestamp::from_millis(last_sync), &mut telem);
        assert!(result.is_ok());

        let conn = &credit_card_engine.store.db.lock().unwrap().writer;

        assert_eq!(
            credit_card_engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
            Some(last_sync)
        );

        Ok(())
    }

    /// `get_sync_assoc` is `Disconnected` without stored IDs and `Connected`
    /// with exactly the stored IDs once both are present.
    #[test]
    fn test_credit_card_engine_get_sync_assoc() -> Result<()> {
        ensure_initialized();
        let credit_card_engine = create_engine();

        let result = credit_card_engine.get_sync_assoc();
        assert!(result.is_ok());

        assert_eq!(result.unwrap(), EngineSyncAssociation::Disconnected);

        // Distinct values so a mix-up between the global and collection keys
        // would make the comparison below fail.
        let global_guid = Guid::new("AAAA");
        let coll_guid = Guid::new("BBBB");
        let ids = CollSyncIds {
            global: global_guid,
            coll: coll_guid,
        };
        {
            let conn = &credit_card_engine.store.db.lock().unwrap().writer;
            credit_card_engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            credit_card_engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        let result = credit_card_engine.get_sync_assoc();
        assert!(result.is_ok());

        assert_eq!(result.unwrap(), EngineSyncAssociation::Connected(ids));
        Ok(())
    }

    /// `reset` clears mirror/tombstone rows and the last-sync time, and
    /// either removes or installs the sync IDs depending on the association.
    #[test]
    fn test_engine_sync_reset() -> Result<()> {
        ensure_initialized();
        let engine = create_engine();
        let encdec = EncryptorDecryptor::new_with_random_key().unwrap();

        let cc = InternalCreditCard {
            guid: Guid::random(),
            cc_name: "Ms Jane Doe".to_string(),
            cc_number_enc: encdec.encrypt("12341232412341234")?,
            cc_number_last_4: "1234".to_string(),
            cc_exp_month: 12,
            cc_exp_year: 2021,
            cc_type: "visa".to_string(),
            ..Default::default()
        };

        {
            // Seed a local record, a mirror record and a tombstone.
            let db = &engine.store.db.lock().unwrap();
            let tx = db.writer.unchecked_transaction()?;
            add_internal_credit_card(&tx, &cc)?;
            test_insert_mirror_record(
                &tx,
                cc.clone()
                    .into_test_incoming_bso(&encdec, Default::default()),
            );
            insert_tombstone_record(&tx, Guid::random().to_string())?;
            tx.commit()?;
        }

        // Distinct values so a mix-up between the global and collection keys
        // would be caught by the assertions at the end of the test.
        let global_guid = Guid::new("AAAA");
        let coll_guid = Guid::new("BBBB");
        let ids = CollSyncIds {
            global: global_guid.clone(),
            coll: coll_guid.clone(),
        };
        {
            let conn = &engine.store.db.lock().unwrap().writer;
            engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        engine
            .reset(&EngineSyncAssociation::Disconnected)
            .expect("should work");

        {
            let conn = &engine.store.db.lock().unwrap().writer;

            // The mirror and tombstone tables must be empty after a reset.
            assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
            assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());

            // The last-sync time must be present and zero; the `1` fallback
            // makes a missing value fail the comparison.
            let expected_sync_time = 0;
            assert_eq!(
                engine
                    .get_meta::<i64>(conn, LAST_SYNC_META_KEY)?
                    .unwrap_or(1),
                expected_sync_time
            );

            // A disconnected reset removes both sync IDs.
            assert!(engine
                .get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?
                .is_none());
            assert!(engine
                .get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?
                .is_none());

            clear_cc_tables(conn)?;

            // Re-seed the tables before exercising the connected reset.
            let tx = conn.unchecked_transaction()?;
            add_internal_credit_card(&tx, &cc)?;
            test_insert_mirror_record(&tx, cc.into_test_incoming_bso(&encdec, Default::default()));
            insert_tombstone_record(&tx, Guid::random().to_string())?;
            tx.commit()?;
        }

        engine
            .reset(&EngineSyncAssociation::Connected(ids))
            .expect("should work");

        let conn = &engine.store.db.lock().unwrap().writer;

        // The connected reset must also clear the re-seeded sync tables.
        assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
        assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());

        let retrieved_global_sync_id = engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?;
        assert_eq!(
            retrieved_global_sync_id.unwrap_or_default(),
            global_guid.to_string()
        );

        let retrieved_coll_sync_id = engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?;
        assert_eq!(
            retrieved_coll_sync_id.unwrap_or_default(),
            coll_guid.to_string()
        );
        Ok(())
    }
}