autofill/sync/
engine.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
6use crate::error::*;
7use crate::Store;
8use error_support::warn;
9use rusqlite::{
10    types::{FromSql, ToSql},
11    Connection, Transaction,
12};
13use std::sync::Arc;
14use sync15::bso::{IncomingBso, OutgoingBso};
15use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
16use sync15::{telemetry, CollectionName, ServerTimestamp};
17use sync_guid::Guid;
18
19// We have 2 engines in this crate and they are identical except for stuff
20// abstracted here!
21pub struct EngineConfig {
22    pub(crate) namespace: String,          // prefix for meta keys, etc.
23    pub(crate) collection: CollectionName, // collection name on the server.
24}
25
// Meta keys. Each one is used as the tail of a `<namespace>.<tail>` key, so
// the stored key is always prefixed by the engine's "namespace".

/// Tail of the meta key holding the last sync time.
pub const LAST_SYNC_META_KEY: &str = "last_sync_time";

/// Tail of the meta key holding the global sync ID.
pub const GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";

/// Tail of the meta key holding the collection sync ID.
pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
30
31// A trait to abstract the broader sync processes.
32pub trait SyncEngineStorageImpl<T> {
33    fn get_incoming_impl(
34        &self,
35        enc_key: &Option<String>,
36    ) -> Result<Box<dyn ProcessIncomingRecordImpl<Record = T>>>;
37    fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
38    fn get_outgoing_impl(
39        &self,
40        enc_key: &Option<String>,
41    ) -> Result<Box<dyn ProcessOutgoingRecordImpl<Record = T>>>;
42}
43
44// A sync engine that gets functionality from an EngineConfig.
45pub struct ConfigSyncEngine<T> {
46    pub(crate) config: EngineConfig,
47    pub(crate) store: Arc<Store>,
48    pub(crate) storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
49    local_enc_key: Option<String>,
50}
51
52impl<T> ConfigSyncEngine<T> {
53    pub fn new(
54        config: EngineConfig,
55        store: Arc<Store>,
56        storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
57    ) -> Self {
58        Self {
59            config,
60            store,
61            storage_impl,
62            local_enc_key: None,
63        }
64    }
65    fn put_meta(&self, conn: &Connection, tail: &str, value: &dyn ToSql) -> Result<()> {
66        let key = format!("{}.{}", self.config.namespace, tail);
67        crate::db::store::put_meta(conn, &key, value)
68    }
69    fn get_meta<V: FromSql>(&self, conn: &Connection, tail: &str) -> Result<Option<V>> {
70        let key = format!("{}.{}", self.config.namespace, tail);
71        crate::db::store::get_meta(conn, &key)
72    }
73    fn delete_meta(&self, conn: &Connection, tail: &str) -> Result<()> {
74        let key = format!("{}.{}", self.config.namespace, tail);
75        crate::db::store::delete_meta(conn, &key)
76    }
77    // Reset the local sync data so the next server request fetches all records.
78    pub fn reset_local_sync_data(&self) -> Result<()> {
79        let db = &self.store.db.lock().unwrap();
80        let tx = db.unchecked_transaction()?;
81        self.storage_impl.reset_storage(&tx)?;
82        self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
83        tx.commit()?;
84        Ok(())
85    }
86}
87
88impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
89    fn collection_name(&self) -> CollectionName {
90        self.config.collection.clone()
91    }
92
93    fn set_local_encryption_key(&mut self, key: &str) -> anyhow::Result<()> {
94        self.local_enc_key = Some(key.to_string());
95        Ok(())
96    }
97
98    fn prepare_for_sync(
99        &self,
100        _get_client_data: &dyn Fn() -> sync15::ClientData,
101    ) -> anyhow::Result<()> {
102        let db = &self.store.db.lock().unwrap();
103        let signal = db.begin_interrupt_scope()?;
104        crate::db::schema::create_empty_sync_temp_tables(&db.writer)?;
105        signal.err_if_interrupted()?;
106        Ok(())
107    }
108
109    fn stage_incoming(
110        &self,
111        inbound: Vec<IncomingBso>,
112        telem: &mut telemetry::Engine,
113    ) -> anyhow::Result<()> {
114        let db = &self.store.db.lock().unwrap();
115        let signal = db.begin_interrupt_scope()?;
116
117        // Stage all incoming items.
118        let mut incoming_telemetry = telemetry::EngineIncoming::new();
119        incoming_telemetry.applied(inbound.len() as u32);
120        telem.incoming(incoming_telemetry);
121        let tx = db.writer.unchecked_transaction()?;
122        let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
123
124        incoming_impl.stage_incoming(&tx, inbound, &signal)?;
125        tx.commit()?;
126        Ok(())
127    }
128
129    fn apply(
130        &self,
131        timestamp: ServerTimestamp,
132        _telem: &mut telemetry::Engine,
133    ) -> anyhow::Result<Vec<OutgoingBso>> {
134        let db = &self.store.db.lock().unwrap();
135        let signal = db.begin_interrupt_scope()?;
136        let tx = db.writer.unchecked_transaction()?;
137        let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
138        let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
139
140        // Get "states" for each record...
141        for state in incoming_impl.fetch_incoming_states(&tx)? {
142            signal.err_if_interrupted()?;
143            // Finally get a "plan" and apply it.
144            let action = plan_incoming(&*incoming_impl, &tx, state)?;
145            super::apply_incoming_action(&*incoming_impl, &tx, action)?;
146        }
147
148        // write the timestamp now, so if we are interrupted merging or
149        // creating outgoing changesets we don't need to re-download the same
150        // records.
151        self.put_meta(&tx, LAST_SYNC_META_KEY, &timestamp.as_millis())?;
152
153        incoming_impl.finish_incoming(&tx)?;
154
155        // Finally, stage outgoing items.
156        let outgoing = outgoing_impl.fetch_outgoing_records(&tx)?;
157        // we're committing now because it may take a long time to actually perform the upload
158        // and we've already staged everything we need to complete the sync in a way that
159        // doesn't require the transaction to stay alive, so we commit now and start a new
160        // transaction once complete
161        tx.commit()?;
162        Ok(outgoing)
163    }
164
165    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
166        let db = &self.store.db.lock().unwrap();
167        self.put_meta(&db.writer, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
168        let tx = db.writer.unchecked_transaction()?;
169        let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
170        outgoing_impl.finish_synced_items(&tx, ids)?;
171        tx.commit()?;
172        Ok(())
173    }
174
175    fn get_collection_request(
176        &self,
177        server_timestamp: ServerTimestamp,
178    ) -> anyhow::Result<Option<CollectionRequest>> {
179        let db = &self.store.db.lock().unwrap();
180        let since = ServerTimestamp(
181            self.get_meta::<i64>(&db.writer, LAST_SYNC_META_KEY)?
182                .unwrap_or_default(),
183        );
184        Ok(if since == server_timestamp {
185            None
186        } else {
187            Some(
188                CollectionRequest::new(self.collection_name())
189                    .full()
190                    .newer_than(since),
191            )
192        })
193    }
194
195    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
196        let db = &self.store.db.lock().unwrap();
197        let global = self.get_meta(&db.writer, GLOBAL_SYNCID_META_KEY)?;
198        let coll = self.get_meta(&db.writer, COLLECTION_SYNCID_META_KEY)?;
199        Ok(if let (Some(global), Some(coll)) = (global, coll) {
200            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
201        } else {
202            EngineSyncAssociation::Disconnected
203        })
204    }
205
206    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
207        let db = &self.store.db.lock().unwrap();
208        let tx = db.unchecked_transaction()?;
209        self.storage_impl.reset_storage(&tx)?;
210        // Reset the last sync time, so that the next sync fetches fresh records
211        // from the server.
212        self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
213
214        // Clear the sync ID if we're signing out, or set it to whatever the
215        // server gave us if we're signing in.
216        match assoc {
217            EngineSyncAssociation::Disconnected => {
218                self.delete_meta(&tx, GLOBAL_SYNCID_META_KEY)?;
219                self.delete_meta(&tx, COLLECTION_SYNCID_META_KEY)?;
220            }
221            EngineSyncAssociation::Connected(ids) => {
222                self.put_meta(&tx, GLOBAL_SYNCID_META_KEY, &ids.global)?;
223                self.put_meta(&tx, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
224            }
225        }
226
227        tx.commit()?;
228        Ok(())
229    }
230
231    fn wipe(&self) -> anyhow::Result<()> {
232        warn!("not implemented as there isn't a valid use case for it");
233        Ok(())
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::credit_cards::add_internal_credit_card;
    use crate::db::credit_cards::tests::{
        get_all, insert_tombstone_record, test_insert_mirror_record,
    };
    use crate::db::models::credit_card::InternalCreditCard;
    use crate::db::schema::create_empty_sync_temp_tables;
    use crate::encryption::EncryptorDecryptor;
    use crate::sync::{IncomingBso, UnknownFields};
    use nss::ensure_initialized;
    use sql_support::ConnExt;

    impl InternalCreditCard {
        /// Converts this card into an incoming BSO for tests, replacing the
        /// payload's unknown fields with `unknown_fields`.
        pub fn into_test_incoming_bso(
            self,
            encdec: &EncryptorDecryptor,
            unknown_fields: UnknownFields,
        ) -> IncomingBso {
            let mut payload = self.into_payload(encdec).expect("is json");
            payload.entry.unknown_fields = unknown_fields;
            IncomingBso::from_test_content(payload)
        }
    }

    // We use the credit-card engine here.
    fn create_engine() -> ConfigSyncEngine<InternalCreditCard> {
        let store = crate::db::store::Store::new_memory();
        crate::sync::credit_card::create_engine(Arc::new(store))
    }

    /// Deletes every row from the credit-card tables and from `moz_meta`.
    pub fn clear_cc_tables(conn: &Connection) -> rusqlite::Result<(), rusqlite::Error> {
        conn.execute_all(&[
            "DELETE FROM credit_cards_data;",
            "DELETE FROM credit_cards_mirror;",
            "DELETE FROM credit_cards_tombstones;",
            "DELETE FROM moz_meta;",
        ])
    }

    #[test]
    fn test_credit_card_engine_apply_timestamp() -> Result<()> {
        ensure_initialized();
        let mut credit_card_engine = create_engine();
        let test_key = crate::encryption::create_autofill_key().unwrap();
        credit_card_engine
            .set_local_encryption_key(&test_key)
            .unwrap();
        {
            // temp scope for the mutex lock; `apply` takes the lock itself.
            create_empty_sync_temp_tables(&credit_card_engine.store.db.lock().unwrap())?;
        }

        let mut telem = telemetry::Engine::new("whatever");
        let last_sync = 24;
        let result = credit_card_engine.apply(ServerTimestamp::from_millis(last_sync), &mut telem);
        assert!(result.is_ok());

        // check that last sync metadata was set
        let conn = &credit_card_engine.store.db.lock().unwrap().writer;

        assert_eq!(
            credit_card_engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
            Some(last_sync)
        );

        Ok(())
    }

    #[test]
    fn test_credit_card_engine_get_sync_assoc() -> Result<()> {
        ensure_initialized();
        let credit_card_engine = create_engine();

        let result = credit_card_engine.get_sync_assoc();
        assert!(result.is_ok());

        // check that we disconnect if sync IDs not found
        assert_eq!(result.unwrap(), EngineSyncAssociation::Disconnected);

        // create sync metadata; the two IDs differ so that reading them back
        // from the wrong meta key would fail the comparison below.
        let global_guid = Guid::new("AAAA");
        let coll_guid = Guid::new("BBBB");
        let ids = CollSyncIds {
            global: global_guid,
            coll: coll_guid,
        };
        {
            let conn = &credit_card_engine.store.db.lock().unwrap().writer;
            credit_card_engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            credit_card_engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        let result = credit_card_engine.get_sync_assoc();
        assert!(result.is_ok());

        // check that we return the metadata
        assert_eq!(result.unwrap(), EngineSyncAssociation::Connected(ids));
        Ok(())
    }

    #[test]
    fn test_engine_sync_reset() -> Result<()> {
        ensure_initialized();
        let engine = create_engine();
        let encdec = EncryptorDecryptor::new_with_random_key().unwrap();

        let cc = InternalCreditCard {
            guid: Guid::random(),
            cc_name: "Ms Jane Doe".to_string(),
            cc_number_enc: encdec.encrypt("12341232412341234")?,
            cc_number_last_4: "1234".to_string(),
            cc_exp_month: 12,
            cc_exp_year: 2021,
            cc_type: "visa".to_string(),
            ..Default::default()
        };

        {
            // temp scope for the mutex lock.
            let db = &engine.store.db.lock().unwrap();
            let tx = db.writer.unchecked_transaction()?;
            // create a normal record, a mirror record and a tombstone.
            add_internal_credit_card(&tx, &cc)?;
            test_insert_mirror_record(
                &tx,
                cc.clone()
                    .into_test_incoming_bso(&encdec, Default::default()),
            );
            insert_tombstone_record(&tx, Guid::random().to_string())?;
            tx.commit()?;
        }

        // create sync metadata; the two IDs differ so that a swap of the
        // global and collection keys would be detected.
        let global_guid = Guid::new("AAAA");
        let coll_guid = Guid::new("BBBB");
        let ids = CollSyncIds {
            global: global_guid.clone(),
            coll: coll_guid.clone(),
        };
        {
            let conn = &engine.store.db.lock().unwrap().writer;
            engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
            engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
        }

        // call reset for sign out
        engine
            .reset(&EngineSyncAssociation::Disconnected)
            .expect("should work");

        {
            let conn = &engine.store.db.lock().unwrap().writer;

            // check that the mirror and tombstone tables have no records
            assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
            assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());

            // check that the last sync time was reset to 0 (and is present)
            assert_eq!(
                engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
                Some(0)
            );

            // check that the meta records were deleted
            assert!(engine
                .get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?
                .is_none());
            assert!(engine
                .get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?
                .is_none());

            clear_cc_tables(conn)?;

            // re-populating the tables
            let tx = conn.unchecked_transaction()?;
            add_internal_credit_card(&tx, &cc)?;
            test_insert_mirror_record(&tx, cc.into_test_incoming_bso(&encdec, Default::default()));
            insert_tombstone_record(&tx, Guid::random().to_string())?;
            tx.commit()?;
        }

        // call reset for sign in
        engine
            .reset(&EngineSyncAssociation::Connected(ids))
            .expect("should work");

        let conn = &engine.store.db.lock().unwrap().writer;

        // check that the re-populated mirror and tombstone tables were
        // cleared again
        assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
        assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());

        // check that the last sync time was reset to 0
        assert_eq!(
            engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
            Some(0)
        );

        // check that the meta records were set, each under its own key
        let retrieved_global_sync_id = engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?;
        assert_eq!(retrieved_global_sync_id, Some(global_guid.to_string()));

        let retrieved_coll_sync_id = engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?;
        assert_eq!(retrieved_coll_sync_id, Some(coll_guid.to_string()));
        Ok(())
    }
}