logins/sync/engine.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::merge::{LocalLogin, MirrorLogin, SyncLoginData};
use super::update_plan::UpdatePlan;
use super::SyncStatus;
use crate::db::CLONE_ENTIRE_MIRROR_SQL;
use crate::encryption::EncryptorDecryptor;
use crate::error::*;
use crate::login::EncryptedLogin;
use crate::schema;
use crate::util;
use crate::LoginDb;
use crate::LoginStore;
use interrupt_support::SqlInterruptScope;
use rusqlite::named_params;
use sql_support::ConnExt;
use std::cell::RefCell;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
use sync15::{telemetry, ServerTimestamp};
use sync_guid::Guid;

// The sync engine.
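// `scope` lets long-running sync work be interrupted, `encdec` encrypts and
// decrypts the secure login fields, and `staged` holds incoming records
// between `stage_incoming()` and `apply()`.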
pub struct LoginsSyncEngine {
    pub store: Arc<LoginStore>,
    pub scope: SqlInterruptScope,
    pub encdec: Arc<dyn EncryptorDecryptor>,
    pub staged: RefCell<Vec<IncomingBso>>,
}

impl LoginsSyncEngine {
    pub fn new(store: Arc<LoginStore>) -> Result<Self> {
        let db = store.lock_db()?;
        let scope = db.begin_interrupt_scope()?;
        let encdec = db.encdec.clone();
        drop(db);
        Ok(Self {
            store,
            encdec,
            scope,
            staged: RefCell::new(vec![]),
        })
    }

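    // Work out what to do with each incoming record relative to its mirror and
    // local counterparts: inbound deletions always win; mirror + local means a
    // three-way merge; mirror-only just updates the mirror; local-only gets a
    // two-way merge; a record new to us is either two-way merged against a
    // local dupe (if `find_dupe_login` finds one) or inserted into the mirror.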
    fn reconcile(
        &self,
        records: Vec<SyncLoginData>,
        server_now: ServerTimestamp,
        telem: &mut telemetry::EngineIncoming,
    ) -> Result<UpdatePlan> {
        let mut plan = UpdatePlan::default();

        for mut record in records {
            self.scope.err_if_interrupted()?;
            debug!("Processing remote change {}", record.guid());
            let upstream = if let Some(inbound) = record.inbound.take() {
                inbound
            } else {
                debug!("Processing inbound deletion (always prefer)");
                plan.plan_delete(record.guid.clone());
                continue;
            };
            let upstream_time = record.inbound_ts;
            match (record.mirror.take(), record.local.take()) {
                (Some(mirror), Some(local)) => {
                    debug!("  Conflict between remote and local, resolving with 3WM");
                    plan.plan_three_way_merge(
                        local,
                        mirror,
                        upstream,
                        upstream_time,
                        server_now,
                        self.encdec.as_ref(),
                    )?;
                    telem.reconciled(1);
                }
                (Some(_mirror), None) => {
                    debug!("  Forwarding mirror to remote");
                    plan.plan_mirror_update(upstream, upstream_time);
                    telem.applied(1);
                }
                (None, Some(local)) => {
                    debug!("  Conflicting record without shared parent, resolving with 2WM");
                    plan.plan_two_way_merge(local, (upstream, upstream_time));
                    telem.reconciled(1);
                }
                (None, None) => {
                    if let Some(dupe) = self.find_dupe_login(&upstream.login)? {
                        debug!(
                            "  Incoming record {} is a dupe of local record {}",
                            upstream.guid(),
                            dupe.guid()
                        );
                        let local_modified = UNIX_EPOCH
                            + Duration::from_millis(dupe.meta.time_password_changed as u64);
                        let local = LocalLogin::Alive {
                            login: dupe,
                            local_modified,
                        };
                        plan.plan_two_way_merge(local, (upstream, upstream_time));
                    } else {
                        debug!("  No dupe found, inserting into mirror");
                        plan.plan_mirror_insert(upstream, upstream_time, false);
                    }
                    telem.applied(1);
                }
            }
        }
        Ok(plan)
    }

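    // Apply the reconciled plan to the database inside a single transaction.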
    fn execute_plan(&self, plan: UpdatePlan) -> Result<()> {
        // Because rusqlite wants a mutable reference to create a transaction
        // (as a way to save us from ourselves), we side-step that by creating
        // it manually.
        let db = self.store.lock_db()?;
        let tx = db.unchecked_transaction()?;
        plan.execute(&tx, &self.scope)?;
        tx.commit()?;
        Ok(())
    }

    // Fetch all the data for the provided IDs.
    // TODO: It might be better to take a function instead of returning all of this, but that
    // function will likely want to insert records while we're iterating, which makes it awkward.
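    // Deserializes (and decrypts) each incoming BSO into a `SyncLoginData`, then
    // runs a chunked query against both loginsM and loginsL to attach any
    // existing mirror and local state for the same guids.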
    fn fetch_login_data(
        &self,
        records: Vec<IncomingBso>,
        telem: &mut telemetry::EngineIncoming,
    ) -> Result<Vec<SyncLoginData>> {
        let mut sync_data = Vec::with_capacity(records.len());
        {
            let mut seen_ids: HashSet<Guid> = HashSet::with_capacity(records.len());
            for incoming in records.into_iter() {
                let id = incoming.envelope.id.clone();
                match SyncLoginData::from_bso(incoming, self.encdec.as_ref()) {
                    Ok(v) => sync_data.push(v),
                    Err(e) => {
                        match e {
                            // This is a known error with Desktop logins (see #5233), just log it
                            // rather than reporting to sentry
                            Error::InvalidLogin(InvalidLogin::IllegalOrigin) => {
                                warn!("logins-deserialize-error: {e}");
                            }
                            // For all other errors, report them to Sentry
                            _ => {
                                report_error!(
                                    "logins-deserialize-error",
                                    "Failed to deserialize record {:?}: {e}",
                                    id
                                );
                            }
                        };
                        // Ideally we'd track new_failed, but it's unclear how
                        // much value it has.
                        telem.failed(1);
                    }
                }
                seen_ids.insert(id);
            }
        }
        self.scope.err_if_interrupted()?;

        sql_support::each_chunk(
            &sync_data
                .iter()
                .map(|s| s.guid.as_str().to_string())
                .collect::<Vec<String>>(),
            |chunk, offset| -> Result<()> {
                // pairs the bound parameter for the guid with an integer index.
                let values_with_idx = sql_support::repeat_display(chunk.len(), ",", |i, f| {
                    write!(f, "({},?)", i + offset)
                });
                let query = format!(
                    "WITH to_fetch(guid_idx, fetch_guid) AS (VALUES {vals})
                     SELECT
                         {common_cols},
                         is_overridden,
                         server_modified,
                         NULL as local_modified,
                         NULL as is_deleted,
                         NULL as sync_status,
                         1 as is_mirror,
                         to_fetch.guid_idx as guid_idx
                     FROM loginsM
                     JOIN to_fetch
                         ON loginsM.guid = to_fetch.fetch_guid

                     UNION ALL

                     SELECT
                         {common_cols},
                         NULL as is_overridden,
                         NULL as server_modified,
                         local_modified,
                         is_deleted,
                         sync_status,
                         0 as is_mirror,
                         to_fetch.guid_idx as guid_idx
                     FROM loginsL
                     JOIN to_fetch
                         ON loginsL.guid = to_fetch.fetch_guid",
                    // give each VALUES item 2 entries, an index and the parameter.
                    vals = values_with_idx,
                    common_cols = schema::COMMON_COLS,
                );

                let db = &self.store.lock_db()?;
                let mut stmt = db.prepare(&query)?;

                let rows = stmt.query_and_then(rusqlite::params_from_iter(chunk), |row| {
                    let guid_idx_i = row.get::<_, i64>("guid_idx")?;
                    // Hitting this means our math is wrong...
                    assert!(guid_idx_i >= 0);

                    let guid_idx = guid_idx_i as usize;
                    let is_mirror: bool = row.get("is_mirror")?;
                    if is_mirror {
                        sync_data[guid_idx].set_mirror(MirrorLogin::from_row(row)?)?;
                    } else {
                        sync_data[guid_idx].set_local(LocalLogin::from_row(row)?)?;
                    }
                    self.scope.err_if_interrupted()?;
                    Ok(())
                })?;
                // `rows` is an Iterator<Item = Result<()>>, so we need to collect to handle the errors.
                rows.collect::<Result<()>>()?;
                Ok(())
            },
        )?;
        Ok(sync_data)
    }

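    // Gather every row in loginsL whose sync_status is not Synced and convert
    // it to an outgoing BSO; deleted rows become tombstones.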
    fn fetch_outgoing(&self) -> Result<Vec<OutgoingBso>> {
        // Taken from iOS. Arbitrarily large, so that clients that want to
        // process deletions first can; for us it doesn't matter.
        const TOMBSTONE_SORTINDEX: i32 = 5_000_000;
        const DEFAULT_SORTINDEX: i32 = 1;
        let db = self.store.lock_db()?;
        let mut stmt = db.prepare_cached(&format!(
            "SELECT L.*, M.enc_unknown_fields
             FROM loginsL L LEFT JOIN loginsM M ON L.guid = M.guid
             WHERE sync_status IS NOT {synced}",
            synced = SyncStatus::Synced as u8
        ))?;
        let bsos = stmt.query_and_then([], |row| {
            self.scope.err_if_interrupted()?;
            Ok(if row.get::<_, bool>("is_deleted")? {
                let envelope = OutgoingEnvelope {
                    id: row.get::<_, String>("guid")?.into(),
                    sortindex: Some(TOMBSTONE_SORTINDEX),
                    ..Default::default()
                };
                OutgoingBso::new_tombstone(envelope)
            } else {
                let unknown = row.get::<_, Option<String>>("enc_unknown_fields")?;
                let mut bso =
                    EncryptedLogin::from_row(row)?.into_bso(self.encdec.as_ref(), unknown)?;
                bso.envelope.sortindex = Some(DEFAULT_SORTINDEX);
                bso
            })
        })?;
        bsos.collect::<Result<_>>()
    }

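    // The core of the incoming half of a sync: reconcile the staged records
    // against local/mirror state, execute the resulting plan, then return
    // whatever is left to upload.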
    fn do_apply_incoming(
        &self,
        inbound: Vec<IncomingBso>,
        timestamp: ServerTimestamp,
        telem: &mut telemetry::Engine,
    ) -> Result<Vec<OutgoingBso>> {
        let mut incoming_telemetry = telemetry::EngineIncoming::new();
        let data = self.fetch_login_data(inbound, &mut incoming_telemetry)?;
        let plan = {
            let result = self.reconcile(data, timestamp, &mut incoming_telemetry);
            telem.incoming(incoming_telemetry);
            result
        }?;
        self.execute_plan(plan)?;
        self.fetch_outgoing()
    }

    // Note: this receives the db to prevent a deadlock
    pub fn set_last_sync(&self, db: &LoginDb, last_sync: ServerTimestamp) -> Result<()> {
        debug!("Updating last sync to {}", last_sync);
        let last_sync_millis = last_sync.as_millis();
        db.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)
    }

    fn get_last_sync(&self, db: &LoginDb) -> Result<Option<ServerTimestamp>> {
        Ok(db
            .get_meta::<i64>(schema::LAST_SYNC_META_KEY)?
            .map(ServerTimestamp))
    }

    pub fn set_global_state(&self, state: &Option<String>) -> Result<()> {
        let to_write = match state {
            Some(ref s) => s,
            None => "",
        };
        let db = self.store.lock_db()?;
        db.put_meta(schema::GLOBAL_STATE_META_KEY, &to_write)
    }

    pub fn get_global_state(&self) -> Result<Option<String>> {
        let db = self.store.lock_db()?;
        db.get_meta::<String>(schema::GLOBAL_STATE_META_KEY)
    }

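    // Promote the uploaded records into the mirror: for each chunk of guids we
    // delete any existing mirror rows, copy the (non-deleted) local rows into
    // loginsM with the new server timestamp, then delete the local rows.
    // Finally the last-sync timestamp is updated, all within one transaction.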
    fn mark_as_synchronized(&self, guids: &[&str], ts: ServerTimestamp) -> Result<()> {
        let db = self.store.lock_db()?;
        let tx = db.unchecked_transaction()?;
        sql_support::each_chunk(guids, |chunk, _| -> Result<()> {
            db.execute(
                &format!(
                    "DELETE FROM loginsM WHERE guid IN ({vars})",
                    vars = sql_support::repeat_sql_vars(chunk.len())
                ),
                rusqlite::params_from_iter(chunk),
            )?;
            self.scope.err_if_interrupted()?;

            db.execute(
                &format!(
                    "INSERT OR IGNORE INTO loginsM (
                         {common_cols}, is_overridden, server_modified
                     )
                     SELECT {common_cols}, 0, {modified_ms_i64}
                     FROM loginsL
                     WHERE is_deleted = 0 AND guid IN ({vars})",
                    common_cols = schema::COMMON_COLS,
                    modified_ms_i64 = ts.as_millis(),
                    vars = sql_support::repeat_sql_vars(chunk.len())
                ),
                rusqlite::params_from_iter(chunk),
            )?;
            self.scope.err_if_interrupted()?;

            db.execute(
                &format!(
                    "DELETE FROM loginsL WHERE guid IN ({vars})",
                    vars = sql_support::repeat_sql_vars(chunk.len())
                ),
                rusqlite::params_from_iter(chunk),
            )?;
            self.scope.err_if_interrupted()?;
            Ok(())
        })?;
        self.set_last_sync(&db, ts)?;
        tx.commit()?;
        Ok(())
    }

    // This exists as a public function so the store can call it. Ideally
    // the store would not do that :) Then it could go back into the sync trait
    // and return an anyhow::Result.
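    // A reset clones the entire mirror back into loginsL (so nothing is lost),
    // empties loginsM, marks every local row as New, zeroes the last-sync
    // timestamp, and either stores or clears the sync IDs depending on `assoc`.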
    pub fn do_reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
        info!("Executing reset on password engine!");
        let db = self.store.lock_db()?;
        let tx = db.unchecked_transaction()?;
        db.execute_all(&[
            &CLONE_ENTIRE_MIRROR_SQL,
            "DELETE FROM loginsM",
            &format!("UPDATE loginsL SET sync_status = {}", SyncStatus::New as u8),
        ])?;
        self.set_last_sync(&db, ServerTimestamp(0))?;
        match assoc {
            EngineSyncAssociation::Disconnected => {
                db.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
                db.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
            }
            EngineSyncAssociation::Connected(ids) => {
                db.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global)?;
                db.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll)?;
            }
        };
        db.delete_meta(schema::GLOBAL_STATE_META_KEY)?;
        tx.commit()?;
        Ok(())
    }

    // It would be nice if this were a batch-ish API (e.g. one that takes a slice of records and
    // finds a dupe for each, if one exists)... I can't think of how to write that query, though.
    // This is subtly different from dupe handling in the main API and maybe
    // could be consolidated, but for now it remains sync-specific.
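    // Here a "dupe" is a loginsL row with the same origin, the same httpRealm,
    // a matching formActionOrigin, and whose decrypted username equals that of
    // the incoming record.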
    pub(crate) fn find_dupe_login(&self, l: &EncryptedLogin) -> Result<Option<EncryptedLogin>> {
        let form_submit_host_port = l
            .fields
            .form_action_origin
            .as_ref()
            .and_then(|s| util::url_host_port(s));
        let enc_fields = l.decrypt_fields(self.encdec.as_ref())?;
        let args = named_params! {
            ":origin": l.fields.origin,
            ":http_realm": l.fields.http_realm,
            ":form_submit": form_submit_host_port,
        };
        let mut query = format!(
            "SELECT {common}
             FROM loginsL
             WHERE origin IS :origin
               AND httpRealm IS :http_realm",
            common = schema::COMMON_COLS,
        );
        if form_submit_host_port.is_some() {
            // Stolen from iOS
            query += " AND (formActionOrigin = '' OR (instr(formActionOrigin, :form_submit) > 0))";
        } else {
            query += " AND formActionOrigin IS :form_submit"
        }
        let db = self.store.lock_db()?;
        let mut stmt = db.prepare_cached(&query)?;
        for login in stmt
            .query_and_then(args, EncryptedLogin::from_row)?
            .collect::<Result<Vec<EncryptedLogin>>>()?
        {
            let this_enc_fields = login.decrypt_fields(self.encdec.as_ref())?;
            if enc_fields.username == this_enc_fields.username {
                return Ok(Some(login));
            }
        }
        Ok(None)
    }
}

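// The sync15 engine trait implementation; these are mostly thin wrappers
// around the methods above, converting our `Result` into `anyhow::Result`.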
impl SyncEngine for LoginsSyncEngine {
    fn collection_name(&self) -> std::borrow::Cow<'static, str> {
        "passwords".into()
    }

    fn stage_incoming(
        &self,
        mut inbound: Vec<IncomingBso>,
        _telem: &mut telemetry::Engine,
    ) -> anyhow::Result<()> {
        // We don't have cross-item dependencies like bookmarks does, so we can
        // just apply now instead of "staging"
        self.staged.borrow_mut().append(&mut inbound);
        Ok(())
    }

    fn apply(
        &self,
        timestamp: ServerTimestamp,
        telem: &mut telemetry::Engine,
    ) -> anyhow::Result<Vec<OutgoingBso>> {
        let inbound = (*self.staged.borrow_mut()).drain(..).collect();
        Ok(self.do_apply_incoming(inbound, timestamp, telem)?)
    }

    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
        Ok(self.mark_as_synchronized(
            &ids.iter().map(Guid::as_str).collect::<Vec<_>>(),
            new_timestamp,
        )?)
    }

    fn get_collection_request(
        &self,
        server_timestamp: ServerTimestamp,
    ) -> anyhow::Result<Option<CollectionRequest>> {
        let db = self.store.lock_db()?;
        let since = self.get_last_sync(&db)?.unwrap_or_default();
        Ok(if since == server_timestamp {
            None
        } else {
            Some(
                CollectionRequest::new("passwords".into())
                    .full()
                    .newer_than(since),
            )
        })
    }

    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
        let db = self.store.lock_db()?;
        let global = db.get_meta(schema::GLOBAL_SYNCID_META_KEY)?;
        let coll = db.get_meta(schema::COLLECTION_SYNCID_META_KEY)?;
        Ok(if let (Some(global), Some(coll)) = (global, coll) {
            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
        } else {
            EngineSyncAssociation::Disconnected
        })
    }

    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
        self.do_reset(assoc)?;
        Ok(())
    }
}

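// The tests below run against in-memory stores, using the test encryptor from
// `encryption::test_utils`.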
#[cfg(not(feature = "keydb"))]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::test_utils::insert_login;
    use crate::encryption::test_utils::TEST_ENCDEC;
    use crate::login::test_utils::enc_login;
    use crate::{LoginEntry, LoginFields, LoginMeta, SecureLoginFields};
    use nss::ensure_initialized;
    use std::collections::HashMap;
    use std::sync::Arc;

    // Wrap sync functions for easier testing
    fn run_fetch_login_data(
        engine: &mut LoginsSyncEngine,
        records: Vec<IncomingBso>,
    ) -> (Vec<SyncLoginData>, telemetry::EngineIncoming) {
        let mut telem = sync15::telemetry::EngineIncoming::new();
        (engine.fetch_login_data(records, &mut telem).unwrap(), telem)
    }

    fn run_fetch_outgoing(store: LoginStore) -> Vec<OutgoingBso> {
        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
        engine.fetch_outgoing().unwrap()
    }

    #[test]
    fn test_fetch_login_data() {
        ensure_initialized();
        // Test some common cases with fetch_login_data
        let store = LoginStore::new_in_memory();
        insert_login(
            &store.lock_db().unwrap(),
            "updated_remotely",
            None,
            Some("password"),
        );
        insert_login(
            &store.lock_db().unwrap(),
            "deleted_remotely",
            None,
            Some("password"),
        );
        insert_login(
            &store.lock_db().unwrap(),
            "three_way_merge",
            Some("new-local-password"),
            Some("password"),
        );

        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();

        let (res, _) = run_fetch_login_data(
            &mut engine,
            vec![
                IncomingBso::new_test_tombstone(Guid::new("deleted_remotely")),
                enc_login("added_remotely", "password")
                    .into_bso(&*TEST_ENCDEC, None)
                    .unwrap()
                    .to_test_incoming(),
                enc_login("updated_remotely", "new-password")
                    .into_bso(&*TEST_ENCDEC, None)
                    .unwrap()
                    .to_test_incoming(),
                enc_login("three_way_merge", "new-remote-password")
                    .into_bso(&*TEST_ENCDEC, None)
                    .unwrap()
                    .to_test_incoming(),
            ],
        );
        // For simpler testing, extract/decrypt passwords and put them in a hash map
        #[derive(Debug, PartialEq)]
        struct SyncPasswords {
            local: Option<String>,
            mirror: Option<String>,
            inbound: Option<String>,
        }
        let extracted_passwords: HashMap<String, SyncPasswords> = res
            .into_iter()
            .map(|sync_login_data| {
                let mut guids_seen = HashSet::new();
                let passwords = SyncPasswords {
                    local: sync_login_data.local.map(|local_login| {
                        guids_seen.insert(local_login.guid_str().to_string());
                        let LocalLogin::Alive { login, .. } = local_login else {
                            unreachable!("this test is not expecting a tombstone");
                        };
                        login.decrypt_fields(&*TEST_ENCDEC).unwrap().password
                    }),
                    mirror: sync_login_data.mirror.map(|mirror_login| {
                        guids_seen.insert(mirror_login.login.meta.id.clone());
                        mirror_login
                            .login
                            .decrypt_fields(&*TEST_ENCDEC)
                            .unwrap()
                            .password
                    }),
                    inbound: sync_login_data.inbound.map(|incoming| {
                        guids_seen.insert(incoming.login.meta.id.clone());
                        incoming
                            .login
                            .decrypt_fields(&*TEST_ENCDEC)
                            .unwrap()
                            .password
                    }),
                };
                (guids_seen.into_iter().next().unwrap(), passwords)
            })
            .collect();

        assert_eq!(extracted_passwords.len(), 4);
        assert_eq!(
            extracted_passwords.get("added_remotely").unwrap(),
            &SyncPasswords {
                local: None,
                mirror: None,
                inbound: Some("password".into()),
            }
        );
        assert_eq!(
            extracted_passwords.get("updated_remotely").unwrap(),
            &SyncPasswords {
                local: None,
                mirror: Some("password".into()),
                inbound: Some("new-password".into()),
            }
        );
        assert_eq!(
            extracted_passwords.get("deleted_remotely").unwrap(),
            &SyncPasswords {
                local: None,
                mirror: Some("password".into()),
                inbound: None,
            }
        );
        assert_eq!(
            extracted_passwords.get("three_way_merge").unwrap(),
            &SyncPasswords {
                local: Some("new-local-password".into()),
                mirror: Some("password".into()),
                inbound: Some("new-remote-password".into()),
            }
        );
    }

    #[test]
    fn test_sync_local_delete() {
        ensure_initialized();
        let store = LoginStore::new_in_memory();
        insert_login(
            &store.lock_db().unwrap(),
            "local-deleted",
            Some("password"),
            None,
        );
        store.lock_db().unwrap().delete("local-deleted").unwrap();
        let changeset = run_fetch_outgoing(store);
        let changes: HashMap<String, serde_json::Value> = changeset
            .into_iter()
            .map(|b| {
                (
                    b.envelope.id.to_string(),
                    serde_json::from_str(&b.payload).unwrap(),
                )
            })
            .collect();
        assert_eq!(changes.len(), 1);
        assert!(changes["local-deleted"].get("deleted").is_some());

        // hmmm. In theory, we do not need to sync a local-only deletion
    }

    #[test]
    fn test_sync_local_readd() {
        ensure_initialized();
        let store = LoginStore::new_in_memory();
        insert_login(
            &store.lock_db().unwrap(),
            "local-readded",
            Some("password"),
            None,
        );
        store.lock_db().unwrap().delete("local-readded").unwrap();
        insert_login(
            &store.lock_db().unwrap(),
            "local-readded",
            Some("password"),
            None,
        );
        let changeset = run_fetch_outgoing(store);
        let changes: HashMap<String, serde_json::Value> = changeset
            .into_iter()
            .map(|b| {
                (
                    b.envelope.id.to_string(),
                    serde_json::from_str(&b.payload).unwrap(),
                )
            })
            .collect();
        assert_eq!(changes.len(), 1);
        assert_eq!(
            changes["local-readded"].get("password").unwrap(),
            "password"
        );
    }

    #[test]
    fn test_sync_local_readd_of_remote_deletion() {
        ensure_initialized();
        let other_store = LoginStore::new_in_memory();
        let mut engine = LoginsSyncEngine::new(Arc::new(other_store)).unwrap();
        let (_res, _telem) = run_fetch_login_data(
            &mut engine,
            vec![IncomingBso::new_test_tombstone(Guid::new("remote-readded"))],
        );

        let store = LoginStore::new_in_memory();
        insert_login(
            &store.lock_db().unwrap(),
            "remote-readded",
            Some("password"),
            None,
        );
        let changeset = run_fetch_outgoing(store);
        let changes: HashMap<String, serde_json::Value> = changeset
            .into_iter()
            .map(|b| {
                (
                    b.envelope.id.to_string(),
                    serde_json::from_str(&b.payload).unwrap(),
                )
            })
            .collect();
        assert_eq!(changes.len(), 1);
        assert_eq!(
            changes["remote-readded"].get("password").unwrap(),
            "password"
        );
    }

    #[test]
    fn test_sync_local_readd_redelete_of_remote_login() {
        ensure_initialized();
        let other_store = LoginStore::new_in_memory();
        let mut engine = LoginsSyncEngine::new(Arc::new(other_store)).unwrap();
        let (_res, _telem) = run_fetch_login_data(
            &mut engine,
            vec![IncomingBso::from_test_content(serde_json::json!({
                "id": "remote-readded-redeleted",
                "formSubmitURL": "https://www.example.com/submit",
                "hostname": "https://www.example.com",
                "username": "test",
                "password": "test",
            }))],
        );

        let store = LoginStore::new_in_memory();
        store
            .lock_db()
            .unwrap()
            .delete("remote-readded-redeleted")
            .unwrap();
        insert_login(
            &store.lock_db().unwrap(),
            "remote-readded-redeleted",
            Some("password"),
            None,
        );
        store
            .lock_db()
            .unwrap()
            .delete("remote-readded-redeleted")
            .unwrap();
        let changeset = run_fetch_outgoing(store);
        let changes: HashMap<String, serde_json::Value> = changeset
            .into_iter()
            .map(|b| {
                (
                    b.envelope.id.to_string(),
                    serde_json::from_str(&b.payload).unwrap(),
                )
            })
            .collect();
        assert_eq!(changes.len(), 1);
        assert!(changes["remote-readded-redeleted"].get("deleted").is_some());
    }

    #[test]
    fn test_fetch_outgoing() {
        ensure_initialized();
        let store = LoginStore::new_in_memory();
        insert_login(
            &store.lock_db().unwrap(),
            "changed",
            Some("new-password"),
            Some("password"),
        );
        insert_login(
            &store.lock_db().unwrap(),
            "unchanged",
            None,
            Some("password"),
        );
        insert_login(&store.lock_db().unwrap(), "added", Some("password"), None);
        insert_login(&store.lock_db().unwrap(), "deleted", None, Some("password"));
        store.lock_db().unwrap().delete("deleted").unwrap();

        let changeset = run_fetch_outgoing(store);
        let changes: HashMap<String, serde_json::Value> = changeset
            .into_iter()
            .map(|b| {
                (
                    b.envelope.id.to_string(),
                    serde_json::from_str(&b.payload).unwrap(),
                )
            })
            .collect();
        assert_eq!(changes.len(), 3);
        assert_eq!(changes["added"].get("password").unwrap(), "password");
        assert_eq!(changes["changed"].get("password").unwrap(), "new-password");
        assert!(changes["deleted"].get("deleted").is_some());
        assert!(changes["added"].get("deleted").is_none());
        assert!(changes["changed"].get("deleted").is_none());
    }

    #[test]
    fn test_bad_record() {
        ensure_initialized();
        let store = LoginStore::new_in_memory();
        let test_ids = ["dummy_000001", "dummy_000002", "dummy_000003"];
        for id in test_ids {
            insert_login(
                &store.lock_db().unwrap(),
                id,
                Some("password"),
                Some("password"),
            );
        }
        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
        engine
            .mark_as_synchronized(&test_ids, ServerTimestamp::from_millis(100))
            .unwrap();
        let (res, telem) = run_fetch_login_data(
            &mut engine,
            vec![
                IncomingBso::new_test_tombstone(Guid::new("dummy_000001")),
                // invalid
                IncomingBso::from_test_content(serde_json::json!({
                    "id": "dummy_000002",
                    "garbage": "data",
                    "etc": "not a login"
                })),
                // valid
                IncomingBso::from_test_content(serde_json::json!({
                    "id": "dummy_000003",
                    "formSubmitURL": "https://www.example.com/submit",
                    "hostname": "https://www.example.com",
                    "username": "test",
                    "password": "test",
                })),
            ],
        );
        assert_eq!(telem.get_failed(), 1);
        assert_eq!(res.len(), 2);
        assert_eq!(res[0].guid, "dummy_000001");
        assert_eq!(res[1].guid, "dummy_000003");
        assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);
    }

    fn make_enc_login(
        username: &str,
        password: &str,
        fao: Option<String>,
        realm: Option<String>,
    ) -> EncryptedLogin {
        ensure_initialized();
        let id = Guid::random().to_string();
        let sec_fields = SecureLoginFields {
            username: username.into(),
            password: password.into(),
        }
        .encrypt(&*TEST_ENCDEC, &id)
        .unwrap();
        EncryptedLogin {
            meta: LoginMeta {
                id,
                ..Default::default()
            },
            fields: LoginFields {
                form_action_origin: fao,
                http_realm: realm,
                origin: "http://not-relevant-here.com".into(),
                ..Default::default()
            },
            sec_fields,
        }
    }

    #[test]
    fn find_dupe_login() {
        ensure_initialized();
        let store = LoginStore::new_in_memory();

        let to_add = LoginEntry {
            form_action_origin: Some("https://www.example.com".into()),
            origin: "http://not-relevant-here.com".into(),
            username: "test".into(),
            password: "test".into(),
            ..Default::default()
        };
        let first_id = store.add(to_add).expect("should insert first").id;

        let to_add = LoginEntry {
            form_action_origin: Some("https://www.example1.com".into()),
            origin: "http://not-relevant-here.com".into(),
            username: "test1".into(),
            password: "test1".into(),
            ..Default::default()
        };
        let second_id = store.add(to_add).expect("should insert second").id;

        let to_add = LoginEntry {
            http_realm: Some("http://some-realm.com".into()),
            origin: "http://not-relevant-here.com".into(),
            username: "test1".into(),
            password: "test1".into(),
            ..Default::default()
        };
        let no_form_origin_id = store.add(to_add).expect("should insert third").id;

        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();

        let to_find = make_enc_login("test", "test", Some("https://www.example.com".into()), None);
        assert_eq!(
            engine
                .find_dupe_login(&to_find)
                .expect("should work")
                .expect("should be Some()")
                .meta
                .id,
            first_id
        );

        let to_find = make_enc_login(
            "test",
            "test",
            Some("https://something-else.com".into()),
            None,
        );
        assert!(engine
            .find_dupe_login(&to_find)
            .expect("should work")
            .is_none());

        let to_find = make_enc_login(
            "test1",
            "test1",
            Some("https://www.example1.com".into()),
            None,
        );
        assert_eq!(
            engine
                .find_dupe_login(&to_find)
                .expect("should work")
                .expect("should be Some()")
                .meta
                .id,
            second_id
        );

        let to_find = make_enc_login(
            "other",
            "other",
            Some("https://www.example1.com".into()),
            None,
        );
        assert!(engine
            .find_dupe_login(&to_find)
            .expect("should work")
            .is_none());

        // no form origin.
        let to_find = make_enc_login("test1", "test1", None, Some("http://some-realm.com".into()));
        assert_eq!(
            engine
                .find_dupe_login(&to_find)
                .expect("should work")
                .expect("should be Some()")
                .meta
                .id,
            no_form_origin_id
        );
    }

    #[test]
    fn test_roundtrip_unknown() {
        ensure_initialized();
        // A couple of helpers
        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
            let bso = IncomingBso::from_test_content(payload);
            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
            engine.stage_incoming(vec![bso], &mut telem).unwrap();
            engine
                .apply(ServerTimestamp::from_millis(0), &mut telem)
                .unwrap();
        }

        fn get_outgoing_payload(engine: &LoginsSyncEngine) -> serde_json::Value {
            // Edit it so it's considered outgoing.
            engine
                .store
                .update(
                    "dummy_000001",
                    LoginEntry {
                        origin: "https://www.example2.com".into(),
                        http_realm: Some("https://www.example2.com".into()),
                        username: "test".into(),
                        password: "test".into(),
                        ..Default::default()
                    },
                )
                .unwrap();
            let changeset = engine.fetch_outgoing().unwrap();
            assert_eq!(changeset.len(), 1);
            serde_json::from_str::<serde_json::Value>(&changeset[0].payload).unwrap()
        }

        // The test itself...
        let store = LoginStore::new_in_memory();
        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();

        apply_incoming_payload(
            &engine,
            serde_json::json!({
                "id": "dummy_000001",
                "formSubmitURL": "https://www.example.com/submit",
                "hostname": "https://www.example.com",
                "username": "test",
                "password": "test",
                "unknown1": "?",
                "unknown2": {"sub": "object"},
            }),
        );

        let payload = get_outgoing_payload(&engine);

        // The outgoing payload for our item should have the unknown fields.
        assert_eq!(payload.get("unknown1").unwrap().as_str().unwrap(), "?");
        assert_eq!(
            payload.get("unknown2").unwrap(),
            &serde_json::json!({"sub": "object"})
        );

        // test mirror updates - record is already in our mirror, but now it's
        // incoming with different unknown fields.
        apply_incoming_payload(
            &engine,
            serde_json::json!({
                "id": "dummy_000001",
                "formSubmitURL": "https://www.example.com/submit",
                "hostname": "https://www.example.com",
                "username": "test",
                "password": "test",
                "unknown2": 99,
                "unknown3": {"something": "else"},
            }),
        );
        let payload = get_outgoing_payload(&engine);
        // old unknown values were replaced.
        assert!(payload.get("unknown1").is_none());
        assert_eq!(payload.get("unknown2").unwrap().as_u64().unwrap(), 99);
        assert_eq!(
            payload
                .get("unknown3")
                .unwrap()
                .as_object()
                .unwrap()
                .get("something")
                .unwrap()
                .as_str()
                .unwrap(),
            "else"
        );
    }

    fn count(engine: &LoginsSyncEngine, table_name: &str) -> u32 {
        ensure_initialized();
        let sql = format!("SELECT COUNT(*) FROM {table_name}");
        engine
            .store
            .lock_db()
            // TODO: get rid of this unwrap
            .unwrap()
            .try_query_one(&sql, [], false)
            .unwrap()
            .unwrap()
    }

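    // An incoming record whose guid only exists locally as an unmirrored
    // tombstone: the tombstone should be ignored and the incoming record
    // applied, regardless of which side is newer.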
    fn do_test_incoming_with_local_unmirrored_tombstone(local_newer: bool) {
        ensure_initialized();
        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
            let bso = IncomingBso::from_test_content(payload);
            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
            engine.stage_incoming(vec![bso], &mut telem).unwrap();
            engine
                .apply(ServerTimestamp::from_millis(0), &mut telem)
                .unwrap();
        }

        // The test itself...
        let (local_timestamp, remote_timestamp) = if local_newer { (123, 0) } else { (0, 123) };

        let store = LoginStore::new_in_memory();
        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();

        // apply an incoming record - will be in the mirror.
        apply_incoming_payload(
            &engine,
            serde_json::json!({
                "id": "dummy_000001",
                "formSubmitURL": "https://www.example.com/submit",
                "hostname": "https://www.example.com",
                "username": "test",
                "password": "test",
                "timePasswordChanged": local_timestamp,
                "unknown1": "?",
                "unknown2": {"sub": "object"},
            }),
        );

        // Reset the engine - this wipes the mirror.
        engine.reset(&EngineSyncAssociation::Disconnected).unwrap();
        // But the local record does still exist.
        assert!(engine
            .store
            .get("dummy_000001")
            .expect("should work")
            .is_some());

        // Delete the local record.
        engine.store.delete("dummy_000001").unwrap();
        assert!(engine
            .store
            .get("dummy_000001")
            .expect("should work")
            .is_none());

        // double-check our test preconditions - should now have 1 in LoginsL and 0 in LoginsM
        assert_eq!(count(&engine, "LoginsL"), 1);
        assert_eq!(count(&engine, "LoginsM"), 0);

        // Now we assume we've been reconnected to sync and have an incoming change for the record.
        apply_incoming_payload(
            &engine,
            serde_json::json!({
                "id": "dummy_000001",
                "formSubmitURL": "https://www.example.com/submit",
                "hostname": "https://www.example.com",
                "username": "test",
                "password": "test2",
                "timePasswordChanged": remote_timestamp,
                "unknown1": "?",
                "unknown2": {"sub": "object"},
            }),
        );

        // Desktop semantics here are that a local tombstone is treated as though it doesn't exist at all.
        // ie, the remote record should be taken whether it is newer or older than the tombstone.
        assert!(engine
            .store
            .get("dummy_000001")
            .expect("should work")
            .is_some());
        // and there should never be an outgoing record.
        // XXX - but there is! But this is exceedingly rare, we
        // should fix it :)
        // assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);

        // should now be no records in loginsL and 1 in loginsM
        assert_eq!(count(&engine, "LoginsL"), 0);
        assert_eq!(count(&engine, "LoginsM"), 1);
    }

    #[test]
    fn test_incoming_non_mirror_tombstone_local_newer() {
        do_test_incoming_with_local_unmirrored_tombstone(true);
    }

    #[test]
    fn test_incoming_non_mirror_tombstone_local_older() {
        do_test_incoming_with_local_unmirrored_tombstone(false);
    }
}