autofill/sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
6use crate::error::*;
7use crate::Store;
8use error_support::warn;
9use rusqlite::{
10    types::{FromSql, ToSql},
11    Connection, Transaction,
12};
13use std::sync::Arc;
14use sync15::bso::{IncomingBso, OutgoingBso};
15use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
16use sync15::{telemetry, CollectionName, ServerTimestamp};
17use sync_guid::Guid;
18
19// We have 2 engines in this crate and they are identical except for stuff
20// abstracted here!
21pub struct EngineConfig {
22    pub(crate) namespace: String,          // prefix for meta keys, etc.
23    pub(crate) collection: CollectionName, // collection name on the server.
24}
25
// Meta keys. Each one is stored prefixed by the engine's "namespace".

/// Key tail under which the last sync time is stored.
pub const LAST_SYNC_META_KEY: &str = "last_sync_time";

/// Key tail under which the global sync ID is stored.
pub const GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";

/// Key tail under which the collection sync ID is stored.
pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
30
31// A trait to abstract the broader sync processes.
32pub trait SyncEngineStorageImpl<T> {
33    fn get_incoming_impl(
34        &self,
35        enc_key: &Option<String>,
36    ) -> Result<Box<dyn ProcessIncomingRecordImpl<Record = T>>>;
37    fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
38    fn get_outgoing_impl(
39        &self,
40        enc_key: &Option<String>,
41    ) -> Result<Box<dyn ProcessOutgoingRecordImpl<Record = T>>>;
42}
43
44// A sync engine that gets functionality from an EngineConfig.
45pub struct ConfigSyncEngine<T> {
46    pub(crate) config: EngineConfig,
47    pub(crate) store: Arc<Store>,
48    pub(crate) storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
49    local_enc_key: Option<String>,
50}
51
52impl<T> ConfigSyncEngine<T> {
53    pub fn new(
54        config: EngineConfig,
55        store: Arc<Store>,
56        storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
57    ) -> Self {
58        Self {
59            config,
60            store,
61            storage_impl,
62            local_enc_key: None,
63        }
64    }
65    fn put_meta(&self, conn: &Connection, tail: &str, value: &dyn ToSql) -> Result<()> {
66        let key = format!("{}.{}", self.config.namespace, tail);
67        crate::db::store::put_meta(conn, &key, value)
68    }
69    fn get_meta<V: FromSql>(&self, conn: &Connection, tail: &str) -> Result<Option<V>> {
70        let key = format!("{}.{}", self.config.namespace, tail);
71        crate::db::store::get_meta(conn, &key)
72    }
73    fn delete_meta(&self, conn: &Connection, tail: &str) -> Result<()> {
74        let key = format!("{}.{}", self.config.namespace, tail);
75        crate::db::store::delete_meta(conn, &key)
76    }
77    // Reset the local sync data so the next server request fetches all records.
78    pub fn reset_local_sync_data(&self) -> Result<()> {
79        let db = &self.store.db.lock().unwrap();
80        let tx = db.unchecked_transaction()?;
81        self.storage_impl.reset_storage(&tx)?;
82        self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
83        tx.commit()?;
84        Ok(())
85    }
86
87    // We cannot call `reset_local_sync_data` from  the store's `scrub_undecryptable_credit_card_data_for_remote_replacement`
88    // function, so we've created this function specifically for it's use. It should not be used anywhere else.
89    pub fn reset_local_sync_data_for_verification(&self, conn: &Connection) -> Result<()> {
90        let tx = conn.unchecked_transaction()?;
91        self.storage_impl.reset_storage(&tx)?;
92        self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
93        tx.commit()?;
94        Ok(())
95    }
96}
97
98impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
99    fn collection_name(&self) -> CollectionName {
100        self.config.collection.clone()
101    }
102
103    fn set_local_encryption_key(&mut self, key: &str) -> anyhow::Result<()> {
104        self.local_enc_key = Some(key.to_string());
105        Ok(())
106    }
107
108    fn prepare_for_sync(
109        &self,
110        _get_client_data: &dyn Fn() -> sync15::ClientData,
111    ) -> anyhow::Result<()> {
112        let db = &self.store.db.lock().unwrap();
113        let signal = db.begin_interrupt_scope()?;
114        crate::db::schema::create_empty_sync_temp_tables(&db.writer)?;
115        signal.err_if_interrupted()?;
116        Ok(())
117    }
118
119    fn stage_incoming(
120        &self,
121        inbound: Vec<IncomingBso>,
122        telem: &mut telemetry::Engine,
123    ) -> anyhow::Result<()> {
124        let db = &self.store.db.lock().unwrap();
125        let signal = db.begin_interrupt_scope()?;
126
127        // Stage all incoming items.
128        let mut incoming_telemetry = telemetry::EngineIncoming::new();
129        incoming_telemetry.applied(inbound.len() as u32);
130        telem.incoming(incoming_telemetry);
131        let tx = db.writer.unchecked_transaction()?;
132        let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
133
134        incoming_impl.stage_incoming(&tx, inbound, &signal)?;
135        tx.commit()?;
136        Ok(())
137    }
138
139    fn apply(
140        &self,
141        timestamp: ServerTimestamp,
142        _telem: &mut telemetry::Engine,
143    ) -> anyhow::Result<Vec<OutgoingBso>> {
144        let db = &self.store.db.lock().unwrap();
145        let signal = db.begin_interrupt_scope()?;
146        let tx = db.writer.unchecked_transaction()?;
147        let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
148        let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
149
150        // Get "states" for each record...
151        for state in incoming_impl.fetch_incoming_states(&tx)? {
152            signal.err_if_interrupted()?;
153            // Finally get a "plan" and apply it.
154            let action = plan_incoming(&*incoming_impl, &tx, state)?;
155            super::apply_incoming_action(&*incoming_impl, &tx, action)?;
156        }
157
158        // write the timestamp now, so if we are interrupted merging or
159        // creating outgoing changesets we don't need to re-download the same
160        // records.
161        self.put_meta(&tx, LAST_SYNC_META_KEY, &timestamp.as_millis())?;
162
163        incoming_impl.finish_incoming(&tx)?;
164
165        // Finally, stage outgoing items.
166        let outgoing = outgoing_impl.fetch_outgoing_records(&tx)?;
167        // we're committing now because it may take a long time to actually perform the upload
168        // and we've already staged everything we need to complete the sync in a way that
169        // doesn't require the transaction to stay alive, so we commit now and start a new
170        // transaction once complete
171        tx.commit()?;
172        Ok(outgoing)
173    }
174
175    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
176        let db = &self.store.db.lock().unwrap();
177        self.put_meta(&db.writer, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
178        let tx = db.writer.unchecked_transaction()?;
179        let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
180        outgoing_impl.finish_synced_items(&tx, ids)?;
181        tx.commit()?;
182        Ok(())
183    }
184
185    fn get_collection_request(
186        &self,
187        server_timestamp: ServerTimestamp,
188    ) -> anyhow::Result<Option<CollectionRequest>> {
189        let db = &self.store.db.lock().unwrap();
190        let since = ServerTimestamp(
191            self.get_meta::<i64>(&db.writer, LAST_SYNC_META_KEY)?
192                .unwrap_or_default(),
193        );
194        Ok(if since == server_timestamp {
195            None
196        } else {
197            Some(
198                CollectionRequest::new(self.collection_name())
199                    .full()
200                    .newer_than(since),
201            )
202        })
203    }
204
205    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
206        let db = &self.store.db.lock().unwrap();
207        let global = self.get_meta(&db.writer, GLOBAL_SYNCID_META_KEY)?;
208        let coll = self.get_meta(&db.writer, COLLECTION_SYNCID_META_KEY)?;
209        Ok(if let (Some(global), Some(coll)) = (global, coll) {
210            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
211        } else {
212            EngineSyncAssociation::Disconnected
213        })
214    }
215
216    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
217        let db = &self.store.db.lock().unwrap();
218        let tx = db.unchecked_transaction()?;
219        self.storage_impl.reset_storage(&tx)?;
220        // Reset the last sync time, so that the next sync fetches fresh records
221        // from the server.
222        self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
223
224        // Clear the sync ID if we're signing out, or set it to whatever the
225        // server gave us if we're signing in.
226        match assoc {
227            EngineSyncAssociation::Disconnected => {
228                self.delete_meta(&tx, GLOBAL_SYNCID_META_KEY)?;
229                self.delete_meta(&tx, COLLECTION_SYNCID_META_KEY)?;
230            }
231            EngineSyncAssociation::Connected(ids) => {
232                self.put_meta(&tx, GLOBAL_SYNCID_META_KEY, &ids.global)?;
233                self.put_meta(&tx, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
234            }
235        }
236
237        tx.commit()?;
238        Ok(())
239    }
240
241    fn wipe(&self) -> anyhow::Result<()> {
242        warn!("not implemented as there isn't a valid use case for it");
243        Ok(())
244    }
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::credit_cards::add_internal_credit_card;
    use crate::db::credit_cards::tests::{
        get_all, insert_tombstone_record, test_insert_mirror_record,
    };
    use crate::db::models::credit_card::InternalCreditCard;
    use crate::db::schema::create_empty_sync_temp_tables;
    use crate::encryption::EncryptorDecryptor;
    use crate::sync::{IncomingBso, UnknownFields};
    use nss::ensure_initialized;
    use sql_support::ConnExt;

    impl InternalCreditCard {
        /// Test helper: turns this card into an incoming BSO whose payload
        /// carries the supplied unknown fields.
        pub fn into_test_incoming_bso(
            self,
            encdec: &EncryptorDecryptor,
            unknown_fields: UnknownFields,
        ) -> IncomingBso {
            let mut record = self.into_payload(encdec).expect("is json");
            record.entry.unknown_fields = unknown_fields;
            IncomingBso::from_test_content(record)
        }
    }

    /// Builds a credit-card engine on top of an in-memory store.
    fn create_engine() -> ConfigSyncEngine<InternalCreditCard> {
        let store = Arc::new(crate::db::store::Store::new_memory());
        crate::sync::credit_card::create_engine(store)
    }

    /// Deletes every row from the credit-card tables and from `moz_meta`.
    pub fn clear_cc_tables(conn: &Connection) -> rusqlite::Result<(), rusqlite::Error> {
        let statements = [
            "DELETE FROM credit_cards_data;",
            "DELETE FROM credit_cards_mirror;",
            "DELETE FROM credit_cards_tombstones;",
            "DELETE FROM moz_meta;",
        ];
        conn.execute_all(&statements)
    }

    /// Inserts a normal record and a mirror record for `cc`, plus a tombstone
    /// with a random guid, all in one transaction.
    fn populate_cc_tables(
        conn: &Connection,
        cc: &InternalCreditCard,
        encdec: &EncryptorDecryptor,
    ) -> Result<()> {
        let txn = conn.unchecked_transaction()?;
        add_internal_credit_card(&txn, cc)?;
        test_insert_mirror_record(
            &txn,
            cc.clone()
                .into_test_incoming_bso(encdec, Default::default()),
        );
        insert_tombstone_record(&txn, Guid::random().to_string())?;
        txn.commit()?;
        Ok(())
    }

    #[test]
    fn test_credit_card_engine_apply_timestamp() -> Result<()> {
        ensure_initialized();
        let mut engine = create_engine();
        let key = crate::encryption::create_autofill_key().unwrap();
        engine.set_local_encryption_key(&key).unwrap();
        // The lock guard is a temporary, so it is released as soon as this
        // statement finishes and `apply` below can take the lock again.
        create_empty_sync_temp_tables(&engine.store.db.lock().unwrap())?;

        let mut telem = telemetry::Engine::new("whatever");
        let last_sync = 24;
        let outcome = engine.apply(ServerTimestamp::from_millis(last_sync), &mut telem);
        assert!(outcome.is_ok());

        // The last sync metadata must now hold the timestamp we applied.
        let conn = &engine.store.db.lock().unwrap().writer;
        assert_eq!(
            engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
            Some(last_sync)
        );

        Ok(())
    }

    #[test]
    fn test_credit_card_engine_get_sync_assoc() -> Result<()> {
        ensure_initialized();
        let engine = create_engine();

        // With no sync IDs stored we must report being disconnected.
        let before = engine.get_sync_assoc();
        assert!(before.is_ok());
        assert_eq!(before.unwrap(), EngineSyncAssociation::Disconnected);

        // Store sync metadata.
        let ids = CollSyncIds {
            global: Guid::new("AAAA"),
            coll: Guid::new("AAAA"),
        };
        {
            // temp scope for the mutex lock.
            let conn = &engine.store.db.lock().unwrap().writer;
            engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        // Now the stored metadata must be returned.
        let after = engine.get_sync_assoc();
        assert!(after.is_ok());
        assert_eq!(after.unwrap(), EngineSyncAssociation::Connected(ids));
        Ok(())
    }

    #[test]
    fn test_engine_sync_reset() -> Result<()> {
        ensure_initialized();
        let engine = create_engine();
        let encdec = EncryptorDecryptor::new_with_random_key().unwrap();

        let cc = InternalCreditCard {
            guid: Guid::random(),
            cc_name: "Ms Jane Doe".to_string(),
            cc_number_enc: encdec.encrypt("12341232412341234")?,
            cc_number_last_4: "1234".to_string(),
            cc_exp_month: 12,
            cc_exp_year: 2021,
            cc_type: "visa".to_string(),
            ..Default::default()
        };

        {
            // temp scope for the mutex lock.
            let db = &engine.store.db.lock().unwrap();
            // create a normal record, a mirror record and a tombstone.
            populate_cc_tables(&db.writer, &cc, &encdec)?;
        }

        // create sync metadata
        let global_guid = Guid::new("AAAA");
        let coll_guid = Guid::new("AAAA");
        let ids = CollSyncIds {
            global: global_guid.clone(),
            coll: coll_guid.clone(),
        };
        {
            let conn = &engine.store.db.lock().unwrap().writer;
            engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        // call reset for sign out
        engine
            .reset(&EngineSyncAssociation::Disconnected)
            .expect("should work");

        {
            let conn = &engine.store.db.lock().unwrap().writer;

            // the mirror and tombstone tables must be empty
            let mirror_rows = get_all(conn, "credit_cards_mirror".to_string())?;
            assert!(mirror_rows.is_empty());
            let tombstone_rows = get_all(conn, "credit_cards_tombstones".to_string())?;
            assert!(tombstone_rows.is_empty());

            // the last sync time must be present and reset to 0
            assert_eq!(
                engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
                Some(0)
            );

            // both sync ID meta records must be gone
            let global_after = engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?;
            assert!(global_after.is_none());
            let coll_after = engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?;
            assert!(coll_after.is_none());

            clear_cc_tables(conn)?;

            // re-populating the tables
            populate_cc_tables(conn, &cc, &encdec)?;
        }

        // call reset for sign in
        engine
            .reset(&EngineSyncAssociation::Connected(ids))
            .expect("should work");

        let conn = &engine.store.db.lock().unwrap().writer;
        // check that the meta records were set
        let stored_global = engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?;
        assert_eq!(stored_global.unwrap_or_default(), global_guid.to_string());

        let stored_coll = engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?;
        assert_eq!(stored_coll.unwrap_or_default(), coll_guid.to_string());
        Ok(())
    }
}