1use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
6use crate::error::*;
7use crate::Store;
8use error_support::warn;
9use rusqlite::{
10 types::{FromSql, ToSql},
11 Connection, Transaction,
12};
13use std::sync::Arc;
14use sync15::bso::{IncomingBso, OutgoingBso};
15use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
16use sync15::{telemetry, CollectionName, ServerTimestamp};
17use sync_guid::Guid;
18
19pub struct EngineConfig {
22 pub(crate) namespace: String, pub(crate) collection: CollectionName, }
25
// Meta key suffixes. The engine never stores these bare: each one is combined
// with the engine's namespace as `"<namespace>.<suffix>"` before use.

/// Suffix of the meta key holding the last-sync timestamp, in milliseconds.
pub const LAST_SYNC_META_KEY: &str = "last_sync_time";
/// Suffix of the meta key holding the global sync ID.
pub const GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";
/// Suffix of the meta key holding the collection sync ID.
pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
30
31pub trait SyncEngineStorageImpl<T> {
33 fn get_incoming_impl(
34 &self,
35 enc_key: &Option<String>,
36 ) -> Result<Box<dyn ProcessIncomingRecordImpl<Record = T>>>;
37 fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
38 fn get_outgoing_impl(
39 &self,
40 enc_key: &Option<String>,
41 ) -> Result<Box<dyn ProcessOutgoingRecordImpl<Record = T>>>;
42}
43
44pub struct ConfigSyncEngine<T> {
46 pub(crate) config: EngineConfig,
47 pub(crate) store: Arc<Store>,
48 pub(crate) storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
49 local_enc_key: Option<String>,
50}
51
52impl<T> ConfigSyncEngine<T> {
53 pub fn new(
54 config: EngineConfig,
55 store: Arc<Store>,
56 storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
57 ) -> Self {
58 Self {
59 config,
60 store,
61 storage_impl,
62 local_enc_key: None,
63 }
64 }
65 fn put_meta(&self, conn: &Connection, tail: &str, value: &dyn ToSql) -> Result<()> {
66 let key = format!("{}.{}", self.config.namespace, tail);
67 crate::db::store::put_meta(conn, &key, value)
68 }
69 fn get_meta<V: FromSql>(&self, conn: &Connection, tail: &str) -> Result<Option<V>> {
70 let key = format!("{}.{}", self.config.namespace, tail);
71 crate::db::store::get_meta(conn, &key)
72 }
73 fn delete_meta(&self, conn: &Connection, tail: &str) -> Result<()> {
74 let key = format!("{}.{}", self.config.namespace, tail);
75 crate::db::store::delete_meta(conn, &key)
76 }
77 pub fn reset_local_sync_data(&self) -> Result<()> {
79 let db = &self.store.db.lock().unwrap();
80 let tx = db.unchecked_transaction()?;
81 self.storage_impl.reset_storage(&tx)?;
82 self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
83 tx.commit()?;
84 Ok(())
85 }
86
87 pub fn reset_local_sync_data_for_verification(&self, conn: &Connection) -> Result<()> {
90 let tx = conn.unchecked_transaction()?;
91 self.storage_impl.reset_storage(&tx)?;
92 self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
93 tx.commit()?;
94 Ok(())
95 }
96}
97
98impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
99 fn collection_name(&self) -> CollectionName {
100 self.config.collection.clone()
101 }
102
103 fn set_local_encryption_key(&mut self, key: &str) -> anyhow::Result<()> {
104 self.local_enc_key = Some(key.to_string());
105 Ok(())
106 }
107
108 fn prepare_for_sync(
109 &self,
110 _get_client_data: &dyn Fn() -> sync15::ClientData,
111 ) -> anyhow::Result<()> {
112 let db = &self.store.db.lock().unwrap();
113 let signal = db.begin_interrupt_scope()?;
114 crate::db::schema::create_empty_sync_temp_tables(&db.writer)?;
115 signal.err_if_interrupted()?;
116 Ok(())
117 }
118
119 fn stage_incoming(
120 &self,
121 inbound: Vec<IncomingBso>,
122 telem: &mut telemetry::Engine,
123 ) -> anyhow::Result<()> {
124 let db = &self.store.db.lock().unwrap();
125 let signal = db.begin_interrupt_scope()?;
126
127 let mut incoming_telemetry = telemetry::EngineIncoming::new();
129 incoming_telemetry.applied(inbound.len() as u32);
130 telem.incoming(incoming_telemetry);
131 let tx = db.writer.unchecked_transaction()?;
132 let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
133
134 incoming_impl.stage_incoming(&tx, inbound, &signal)?;
135 tx.commit()?;
136 Ok(())
137 }
138
139 fn apply(
140 &self,
141 timestamp: ServerTimestamp,
142 _telem: &mut telemetry::Engine,
143 ) -> anyhow::Result<Vec<OutgoingBso>> {
144 let db = &self.store.db.lock().unwrap();
145 let signal = db.begin_interrupt_scope()?;
146 let tx = db.writer.unchecked_transaction()?;
147 let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
148 let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
149
150 for state in incoming_impl.fetch_incoming_states(&tx)? {
152 signal.err_if_interrupted()?;
153 let action = plan_incoming(&*incoming_impl, &tx, state)?;
155 super::apply_incoming_action(&*incoming_impl, &tx, action)?;
156 }
157
158 self.put_meta(&tx, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
162
163 incoming_impl.finish_incoming(&tx)?;
164
165 let outgoing = outgoing_impl.fetch_outgoing_records(&tx)?;
167 tx.commit()?;
172 Ok(outgoing)
173 }
174
175 fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
176 let db = &self.store.db.lock().unwrap();
177 self.put_meta(&db.writer, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
178 let tx = db.writer.unchecked_transaction()?;
179 let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
180 outgoing_impl.finish_synced_items(&tx, ids)?;
181 tx.commit()?;
182 Ok(())
183 }
184
185 fn get_collection_request(
186 &self,
187 server_timestamp: ServerTimestamp,
188 ) -> anyhow::Result<Option<CollectionRequest>> {
189 let db = &self.store.db.lock().unwrap();
190 let since = ServerTimestamp(
191 self.get_meta::<i64>(&db.writer, LAST_SYNC_META_KEY)?
192 .unwrap_or_default(),
193 );
194 Ok(if since == server_timestamp {
195 None
196 } else {
197 Some(
198 CollectionRequest::new(self.collection_name())
199 .full()
200 .newer_than(since),
201 )
202 })
203 }
204
205 fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
206 let db = &self.store.db.lock().unwrap();
207 let global = self.get_meta(&db.writer, GLOBAL_SYNCID_META_KEY)?;
208 let coll = self.get_meta(&db.writer, COLLECTION_SYNCID_META_KEY)?;
209 Ok(if let (Some(global), Some(coll)) = (global, coll) {
210 EngineSyncAssociation::Connected(CollSyncIds { global, coll })
211 } else {
212 EngineSyncAssociation::Disconnected
213 })
214 }
215
216 fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
217 let db = &self.store.db.lock().unwrap();
218 let tx = db.unchecked_transaction()?;
219 self.storage_impl.reset_storage(&tx)?;
220 self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
223
224 match assoc {
227 EngineSyncAssociation::Disconnected => {
228 self.delete_meta(&tx, GLOBAL_SYNCID_META_KEY)?;
229 self.delete_meta(&tx, COLLECTION_SYNCID_META_KEY)?;
230 }
231 EngineSyncAssociation::Connected(ids) => {
232 self.put_meta(&tx, GLOBAL_SYNCID_META_KEY, &ids.global)?;
233 self.put_meta(&tx, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
234 }
235 }
236
237 tx.commit()?;
238 Ok(())
239 }
240
241 fn wipe(&self) -> anyhow::Result<()> {
242 warn!("not implemented as there isn't a valid use case for it");
243 Ok(())
244 }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::credit_cards::add_internal_credit_card;
    use crate::db::credit_cards::tests::{
        get_all, insert_tombstone_record, test_insert_mirror_record,
    };
    use crate::db::models::credit_card::InternalCreditCard;
    use crate::db::schema::create_empty_sync_temp_tables;
    use crate::encryption::EncryptorDecryptor;
    use crate::sync::{IncomingBso, UnknownFields};
    use nss::ensure_initialized;
    use sql_support::ConnExt;

    impl InternalCreditCard {
        /// Test helper: converts the card into an incoming BSO whose payload
        /// carries the given `unknown_fields`.
        pub fn into_test_incoming_bso(
            self,
            encdec: &EncryptorDecryptor,
            unknown_fields: UnknownFields,
        ) -> IncomingBso {
            let mut payload = self.into_payload(encdec).expect("is json");
            payload.entry.unknown_fields = unknown_fields;
            IncomingBso::from_test_content(payload)
        }
    }

    /// Builds a credit-card engine backed by an in-memory store.
    fn create_engine() -> ConfigSyncEngine<InternalCreditCard> {
        let store = crate::db::store::Store::new_memory();
        crate::sync::credit_card::create_engine(Arc::new(store))
    }

    /// Deletes every row from the credit-card tables and from `moz_meta`.
    pub fn clear_cc_tables(conn: &Connection) -> rusqlite::Result<(), rusqlite::Error> {
        conn.execute_all(&[
            "DELETE FROM credit_cards_data;",
            "DELETE FROM credit_cards_mirror;",
            "DELETE FROM credit_cards_tombstones;",
            "DELETE FROM moz_meta;",
        ])
    }

    /// Inserts `cc` as a local record and as a mirror record, plus a
    /// tombstone with a random GUID, all in one transaction on `conn`.
    fn seed_records(
        conn: &Connection,
        cc: &InternalCreditCard,
        encdec: &EncryptorDecryptor,
    ) -> Result<()> {
        let tx = conn.unchecked_transaction()?;
        add_internal_credit_card(&tx, cc)?;
        test_insert_mirror_record(
            &tx,
            cc.clone()
                .into_test_incoming_bso(encdec, Default::default()),
        );
        insert_tombstone_record(&tx, Guid::random().to_string())?;
        tx.commit()?;
        Ok(())
    }

    #[test]
    fn test_credit_card_engine_apply_timestamp() -> Result<()> {
        ensure_initialized();
        let mut credit_card_engine = create_engine();
        let test_key = crate::encryption::create_autofill_key().unwrap();
        credit_card_engine
            .set_local_encryption_key(&test_key)
            .unwrap();
        {
            create_empty_sync_temp_tables(&credit_card_engine.store.db.lock().unwrap())?;
        }

        let mut telem = telemetry::Engine::new("whatever");
        let last_sync = 24;
        // `expect` rather than `assert!(is_ok())` so a failure shows the error.
        credit_card_engine
            .apply(ServerTimestamp::from_millis(last_sync), &mut telem)
            .expect("apply should succeed");

        // The timestamp passed to `apply` must now be the stored last-sync time.
        let conn = &credit_card_engine.store.db.lock().unwrap().writer;
        assert_eq!(
            credit_card_engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
            Some(last_sync)
        );

        Ok(())
    }

    #[test]
    fn test_credit_card_engine_get_sync_assoc() -> Result<()> {
        ensure_initialized();
        let credit_card_engine = create_engine();

        // A fresh engine has no sync IDs stored.
        let assoc = credit_card_engine
            .get_sync_assoc()
            .expect("get_sync_assoc should succeed");
        assert_eq!(assoc, EngineSyncAssociation::Disconnected);

        // Distinct IDs, so a mix-up between the two meta keys is detectable.
        let ids = CollSyncIds {
            global: Guid::new("AAAA"),
            coll: Guid::new("BBBB"),
        };
        {
            let conn = &credit_card_engine.store.db.lock().unwrap().writer;
            credit_card_engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            credit_card_engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        let assoc = credit_card_engine
            .get_sync_assoc()
            .expect("get_sync_assoc should succeed");
        assert_eq!(assoc, EngineSyncAssociation::Connected(ids));
        Ok(())
    }

    #[test]
    fn test_engine_sync_reset() -> Result<()> {
        ensure_initialized();
        let engine = create_engine();
        let encdec = EncryptorDecryptor::new_with_random_key().unwrap();

        let cc = InternalCreditCard {
            guid: Guid::random(),
            cc_name: "Ms Jane Doe".to_string(),
            cc_number_enc: encdec.encrypt("12341232412341234")?,
            cc_number_last_4: "1234".to_string(),
            cc_exp_month: 12,
            cc_exp_year: 2021,
            cc_type: "visa".to_string(),
            ..Default::default()
        };

        // Distinct IDs, so a mix-up between the two meta keys is detectable.
        let ids = CollSyncIds {
            global: Guid::new("AAAA"),
            coll: Guid::new("BBBB"),
        };
        let expected_global = ids.global.to_string();
        let expected_coll = ids.coll.to_string();

        // Seed a local record, a mirror record, a tombstone and both sync IDs.
        // The lock is scoped so it is released before `reset` takes it.
        {
            let conn = &engine.store.db.lock().unwrap().writer;
            seed_records(conn, &cc, &encdec)?;
            engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        engine
            .reset(&EngineSyncAssociation::Disconnected)
            .expect("should work");

        {
            let conn = &engine.store.db.lock().unwrap().writer;

            // Mirror and tombstone tables are emptied by the reset.
            assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
            assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());

            // The last-sync time is rewound and the sync IDs are gone.
            assert_eq!(engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?, Some(0));
            assert!(engine
                .get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?
                .is_none());
            assert!(engine
                .get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?
                .is_none());

            // Start over with fresh data for the `Connected` reset.
            clear_cc_tables(conn)?;
            seed_records(conn, &cc, &encdec)?;
        }

        engine
            .reset(&EngineSyncAssociation::Connected(ids))
            .expect("should work");

        let conn = &engine.store.db.lock().unwrap().writer;

        // The re-seeded data is cleared by this reset as well.
        assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
        assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());
        assert_eq!(engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?, Some(0));

        // Each sync ID must land under its own key.
        assert_eq!(
            engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?,
            Some(expected_global)
        );
        assert_eq!(
            engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?,
            Some(expected_coll)
        );
        Ok(())
    }
}