logins/sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::merge::{LocalLogin, MirrorLogin, SyncLoginData};
6use super::update_plan::UpdatePlan;
7use super::SyncStatus;
8use crate::db::CLONE_ENTIRE_MIRROR_SQL;
9use crate::encryption::EncryptorDecryptor;
10use crate::error::*;
11use crate::login::EncryptedLogin;
12use crate::schema;
13use crate::util;
14use crate::LoginDb;
15use crate::LoginStore;
16use interrupt_support::SqlInterruptScope;
17use rusqlite::named_params;
18use sql_support::ConnExt;
19use std::cell::RefCell;
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::time::{Duration, UNIX_EPOCH};
23use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
24use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
25use sync15::{telemetry, ServerTimestamp};
26use sync_guid::Guid;
27
// The sync engine for logins. It implements `SyncEngine` (collection name
// "passwords") on top of a shared `LoginStore`; every database access in this
// file goes through `store.lock_db()`.
pub struct LoginsSyncEngine {
    // Shared store whose database holds the `loginsL` (local) and `loginsM`
    // (mirror) tables queried by this engine.
    pub store: Arc<LoginStore>,
    // Interrupt scope opened when the engine is constructed; loops in this file
    // poll it with `err_if_interrupted()`.
    pub scope: SqlInterruptScope,
    // Encryptor/decryptor handle cloned from the database at construction time.
    pub encdec: Arc<dyn EncryptorDecryptor>,
    // Incoming records buffered by `stage_incoming` until `apply` drains them.
    pub staged: RefCell<Vec<IncomingBso>>,
}
35
36impl LoginsSyncEngine {
37    pub fn new(store: Arc<LoginStore>) -> Result<Self> {
38        let db = store.lock_db()?;
39        let scope = db.begin_interrupt_scope()?;
40        let encdec = db.encdec.clone();
41        drop(db);
42        Ok(Self {
43            store,
44            encdec,
45            scope,
46            staged: RefCell::new(vec![]),
47        })
48    }
49
50    fn reconcile(
51        &self,
52        records: Vec<SyncLoginData>,
53        server_now: ServerTimestamp,
54        telem: &mut telemetry::EngineIncoming,
55    ) -> Result<UpdatePlan> {
56        let mut plan = UpdatePlan::default();
57
58        for mut record in records {
59            self.scope.err_if_interrupted()?;
60            debug!("Processing remote change {}", record.guid());
61            let upstream = if let Some(inbound) = record.inbound.take() {
62                inbound
63            } else {
64                debug!("Processing inbound deletion (always prefer)");
65                plan.plan_delete(record.guid.clone());
66                continue;
67            };
68            let upstream_time = record.inbound_ts;
69            match (record.mirror.take(), record.local.take()) {
70                (Some(mirror), Some(local)) => {
71                    debug!("  Conflict between remote and local, Resolving with 3WM");
72                    plan.plan_three_way_merge(
73                        local,
74                        mirror,
75                        upstream,
76                        upstream_time,
77                        server_now,
78                        self.encdec.as_ref(),
79                    )?;
80                    telem.reconciled(1);
81                }
82                (Some(_mirror), None) => {
83                    debug!("  Forwarding mirror to remote");
84                    plan.plan_mirror_update(upstream, upstream_time);
85                    telem.applied(1);
86                }
87                (None, Some(local)) => {
88                    debug!("  Conflicting record without shared parent,  Resolving with 2WM");
89                    plan.plan_two_way_merge(local, (upstream, upstream_time));
90                    telem.reconciled(1);
91                }
92                (None, None) => {
93                    if let Some(dupe) = self.find_dupe_login(&upstream.login)? {
94                        debug!(
95                            "  Incoming record {} was is a dupe of local record {}",
96                            upstream.guid(),
97                            dupe.guid()
98                        );
99                        let local_modified = UNIX_EPOCH
100                            + Duration::from_millis(dupe.meta.time_password_changed as u64);
101                        let local = LocalLogin::Alive {
102                            login: dupe,
103                            local_modified,
104                        };
105                        plan.plan_two_way_merge(local, (upstream, upstream_time));
106                    } else {
107                        debug!("  No dupe found, inserting into mirror");
108                        plan.plan_mirror_insert(upstream, upstream_time, false);
109                    }
110                    telem.applied(1);
111                }
112            }
113        }
114        Ok(plan)
115    }
116
117    fn execute_plan(&self, plan: UpdatePlan) -> Result<()> {
118        // Because rusqlite want a mutable reference to create a transaction
119        // (as a way to save us from ourselves), we side-step that by creating
120        // it manually.
121        let db = self.store.lock_db()?;
122        let tx = db.unchecked_transaction()?;
123        plan.execute(&tx, &self.scope)?;
124        tx.commit()?;
125        Ok(())
126    }
127
    // Fetch all the data for the provided IDs.
    //
    // Each incoming BSO is converted into a `SyncLoginData`; records that fail
    // to convert are logged/reported, counted through `telem.failed(1)` and
    // left out of the result. The mirror (`loginsM`) and local (`loginsL`) rows
    // with matching guids are then loaded and attached to the corresponding
    // entries via `set_mirror` / `set_local`.
    //
    // Returns the entries in the order their records were successfully
    // converted. Fails if the interrupt scope was interrupted or if any
    // database / row-conversion step fails.
    //
    // TODO: Might be better taking a fn instead of returning all of it... But that func will likely
    // want to insert stuff while we're doing this so ugh.
    fn fetch_login_data(
        &self,
        records: Vec<IncomingBso>,
        telem: &mut telemetry::EngineIncoming,
    ) -> Result<Vec<SyncLoginData>> {
        let mut sync_data = Vec::with_capacity(records.len());
        {
            // NOTE(review): `seen_ids` is only ever inserted into; nothing in
            // this function reads it back.
            let mut seen_ids: HashSet<Guid> = HashSet::with_capacity(records.len());
            for incoming in records.into_iter() {
                // Cloned up front because `from_bso` consumes `incoming`.
                let id = incoming.envelope.id.clone();
                match SyncLoginData::from_bso(incoming, self.encdec.as_ref()) {
                    Ok(v) => sync_data.push(v),
                    Err(e) => {
                        match e {
                            // This is a known error with Desktop logins (see #5233), just log it
                            // rather than reporting to sentry
                            Error::InvalidLogin(InvalidLogin::IllegalOrigin) => {
                                warn!("logins-deserialize-error: {e}");
                            }
                            // For all other errors, report them to Sentry
                            _ => {
                                report_error!(
                                    "logins-deserialize-error",
                                    "Failed to deserialize record {:?}: {e}",
                                    id
                                );
                            }
                        };
                        // Ideally we'd track new_failed, but it's unclear how
                        // much value it has.
                        telem.failed(1);
                    }
                }
                seen_ids.insert(id);
            }
        }
        self.scope.err_if_interrupted()?;

        // Look up the mirror/local rows chunk by chunk; the guid strings are the
        // bound parameters of each chunk's query.
        sql_support::each_chunk(
            &sync_data
                .iter()
                .map(|s| s.guid.as_str().to_string())
                .collect::<Vec<String>>(),
            |chunk, offset| -> Result<()> {
                // pairs the bound parameter for the guid with an integer index.
                // NOTE(review): `i + offset` is later used to index `sync_data`,
                // so `offset` is presumably the chunk's starting position in the
                // full guid list — confirm against `each_chunk`.
                let values_with_idx = sql_support::repeat_display(chunk.len(), ",", |i, f| {
                    write!(f, "({},?)", i + offset)
                });
                // One result set for both tables: the columns a table does not
                // have are selected as NULL and `is_mirror` tells the rows apart.
                let query = format!(
                    "WITH to_fetch(guid_idx, fetch_guid) AS (VALUES {vals})
                     SELECT
                         {common_cols},
                         is_overridden,
                         server_modified,
                         NULL as local_modified,
                         NULL as is_deleted,
                         NULL as sync_status,
                         1 as is_mirror,
                         to_fetch.guid_idx as guid_idx
                     FROM loginsM
                     JOIN to_fetch
                         ON loginsM.guid = to_fetch.fetch_guid

                     UNION ALL

                     SELECT
                         {common_cols},
                         NULL as is_overridden,
                         NULL as server_modified,
                         local_modified,
                         is_deleted,
                         sync_status,
                         0 as is_mirror,
                         to_fetch.guid_idx as guid_idx
                     FROM loginsL
                     JOIN to_fetch
                         ON loginsL.guid = to_fetch.fetch_guid",
                    // give each VALUES item 2 entries, an index and the parameter.
                    vals = values_with_idx,
                    common_cols = schema::COMMON_COLS,
                );

                // The database lock is taken per chunk, not for the whole call.
                let db = &self.store.lock_db()?;
                let mut stmt = db.prepare(&query)?;

                let rows = stmt.query_and_then(rusqlite::params_from_iter(chunk), |row| {
                    let guid_idx_i = row.get::<_, i64>("guid_idx")?;
                    // Hitting this means our math is wrong...
                    assert!(guid_idx_i >= 0);

                    let guid_idx = guid_idx_i as usize;
                    let is_mirror: bool = row.get("is_mirror")?;
                    // Attach the row to the entry it was fetched for.
                    if is_mirror {
                        sync_data[guid_idx].set_mirror(MirrorLogin::from_row(row)?)?;
                    } else {
                        sync_data[guid_idx].set_local(LocalLogin::from_row(row)?)?;
                    }
                    self.scope.err_if_interrupted()?;
                    Ok(())
                })?;
                // `rows` is an Iterator<Item = Result<()>>, so we need to collect to handle the errors.
                rows.collect::<Result<()>>()?;
                Ok(())
            },
        )?;
        Ok(sync_data)
    }
238
239    fn fetch_outgoing(&self) -> Result<Vec<OutgoingBso>> {
240        // Taken from iOS. Arbitrarily large, so that clients that want to
241        // process deletions first can; for us it doesn't matter.
242        const TOMBSTONE_SORTINDEX: i32 = 5_000_000;
243        const DEFAULT_SORTINDEX: i32 = 1;
244        let db = self.store.lock_db()?;
245        let mut stmt = db.prepare_cached(&format!(
246            "SELECT L.*, M.enc_unknown_fields
247             FROM loginsL L LEFT JOIN loginsM M ON L.guid = M.guid
248             WHERE sync_status IS NOT {synced}",
249            synced = SyncStatus::Synced as u8
250        ))?;
251        let bsos = stmt.query_and_then([], |row| {
252            self.scope.err_if_interrupted()?;
253            Ok(if row.get::<_, bool>("is_deleted")? {
254                let envelope = OutgoingEnvelope {
255                    id: row.get::<_, String>("guid")?.into(),
256                    sortindex: Some(TOMBSTONE_SORTINDEX),
257                    ..Default::default()
258                };
259                OutgoingBso::new_tombstone(envelope)
260            } else {
261                let unknown = row.get::<_, Option<String>>("enc_unknown_fields")?;
262                let mut bso =
263                    EncryptedLogin::from_row(row)?.into_bso(self.encdec.as_ref(), unknown)?;
264                bso.envelope.sortindex = Some(DEFAULT_SORTINDEX);
265                bso
266            })
267        })?;
268        bsos.collect::<Result<_>>()
269    }
270
271    fn do_apply_incoming(
272        &self,
273        inbound: Vec<IncomingBso>,
274        timestamp: ServerTimestamp,
275        telem: &mut telemetry::Engine,
276    ) -> Result<Vec<OutgoingBso>> {
277        let mut incoming_telemetry = telemetry::EngineIncoming::new();
278        let data = self.fetch_login_data(inbound, &mut incoming_telemetry)?;
279        let plan = {
280            let result = self.reconcile(data, timestamp, &mut incoming_telemetry);
281            telem.incoming(incoming_telemetry);
282            result
283        }?;
284        self.execute_plan(plan)?;
285        self.fetch_outgoing()
286    }
287
288    // Note this receives the db to prevent a deadlock
289    pub fn set_last_sync(&self, db: &LoginDb, last_sync: ServerTimestamp) -> Result<()> {
290        debug!("Updating last sync to {}", last_sync);
291        let last_sync_millis = last_sync.as_millis();
292        db.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)
293    }
294
295    fn get_last_sync(&self, db: &LoginDb) -> Result<Option<ServerTimestamp>> {
296        let millis = db.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?.unwrap();
297        Ok(Some(ServerTimestamp(millis)))
298    }
299
300    pub fn set_global_state(&self, state: &Option<String>) -> Result<()> {
301        let to_write = match state {
302            Some(ref s) => s,
303            None => "",
304        };
305        let db = self.store.lock_db()?;
306        db.put_meta(schema::GLOBAL_STATE_META_KEY, &to_write)
307    }
308
309    pub fn get_global_state(&self) -> Result<Option<String>> {
310        let db = self.store.lock_db()?;
311        db.get_meta::<String>(schema::GLOBAL_STATE_META_KEY)
312    }
313
314    fn mark_as_synchronized(&self, guids: &[&str], ts: ServerTimestamp) -> Result<()> {
315        let db = self.store.lock_db()?;
316        let tx = db.unchecked_transaction()?;
317        sql_support::each_chunk(guids, |chunk, _| -> Result<()> {
318            db.execute(
319                &format!(
320                    "DELETE FROM loginsM WHERE guid IN ({vars})",
321                    vars = sql_support::repeat_sql_vars(chunk.len())
322                ),
323                rusqlite::params_from_iter(chunk),
324            )?;
325            self.scope.err_if_interrupted()?;
326
327            db.execute(
328                &format!(
329                    "INSERT OR IGNORE INTO loginsM (
330                         {common_cols}, is_overridden, server_modified
331                     )
332                     SELECT {common_cols}, 0, {modified_ms_i64}
333                     FROM loginsL
334                     WHERE is_deleted = 0 AND guid IN ({vars})",
335                    common_cols = schema::COMMON_COLS,
336                    modified_ms_i64 = ts.as_millis(),
337                    vars = sql_support::repeat_sql_vars(chunk.len())
338                ),
339                rusqlite::params_from_iter(chunk),
340            )?;
341            self.scope.err_if_interrupted()?;
342
343            db.execute(
344                &format!(
345                    "DELETE FROM loginsL WHERE guid IN ({vars})",
346                    vars = sql_support::repeat_sql_vars(chunk.len())
347                ),
348                rusqlite::params_from_iter(chunk),
349            )?;
350            self.scope.err_if_interrupted()?;
351            Ok(())
352        })?;
353        self.set_last_sync(&db, ts)?;
354        tx.commit()?;
355        Ok(())
356    }
357
358    // This exists here as a public function so the store can call it. Ideally
359    // the store would not do that :) Then it can go back into the sync trait
360    // and return an anyhow::Result
361    pub fn do_reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
362        info!("Executing reset on password engine!");
363        let db = self.store.lock_db()?;
364        let tx = db.unchecked_transaction()?;
365        db.execute_all(&[
366            &CLONE_ENTIRE_MIRROR_SQL,
367            "DELETE FROM loginsM",
368            &format!("UPDATE loginsL SET sync_status = {}", SyncStatus::New as u8),
369        ])?;
370        self.set_last_sync(&db, ServerTimestamp(0))?;
371        match assoc {
372            EngineSyncAssociation::Disconnected => {
373                db.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
374                db.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
375            }
376            EngineSyncAssociation::Connected(ids) => {
377                db.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global)?;
378                db.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll)?;
379            }
380        };
381        db.delete_meta(schema::GLOBAL_STATE_META_KEY)?;
382        tx.commit()?;
383        Ok(())
384    }
385
386    // It would be nice if this were a batch-ish api (e.g. takes a slice of records and finds dupes
387    // for each one if they exist)... I can't think of how to write that query, though.
388    // This is subtly different from dupe handling by the main API and maybe
389    // could be consolidated, but for now it remains sync specific.
390    pub(crate) fn find_dupe_login(&self, l: &EncryptedLogin) -> Result<Option<EncryptedLogin>> {
391        let form_submit_host_port = l
392            .fields
393            .form_action_origin
394            .as_ref()
395            .and_then(|s| util::url_host_port(s));
396        let enc_fields = l.decrypt_fields(self.encdec.as_ref())?;
397        let args = named_params! {
398            ":origin": l.fields.origin,
399            ":http_realm": l.fields.http_realm,
400            ":form_submit": form_submit_host_port,
401        };
402        let mut query = format!(
403            "SELECT {common}
404             FROM loginsL
405             WHERE origin IS :origin
406               AND httpRealm IS :http_realm",
407            common = schema::COMMON_COLS,
408        );
409        if form_submit_host_port.is_some() {
410            // Stolen from iOS
411            query += " AND (formActionOrigin = '' OR (instr(formActionOrigin, :form_submit) > 0))";
412        } else {
413            query += " AND formActionOrigin IS :form_submit"
414        }
415        let db = self.store.lock_db()?;
416        let mut stmt = db.prepare_cached(&query)?;
417        for login in stmt
418            .query_and_then(args, EncryptedLogin::from_row)?
419            .collect::<Result<Vec<EncryptedLogin>>>()?
420        {
421            let this_enc_fields = login.decrypt_fields(self.encdec.as_ref())?;
422            if enc_fields.username == this_enc_fields.username {
423                return Ok(Some(login));
424            }
425        }
426        Ok(None)
427    }
428}
429
430impl SyncEngine for LoginsSyncEngine {
431    fn collection_name(&self) -> std::borrow::Cow<'static, str> {
432        "passwords".into()
433    }
434
435    fn stage_incoming(
436        &self,
437        mut inbound: Vec<IncomingBso>,
438        _telem: &mut telemetry::Engine,
439    ) -> anyhow::Result<()> {
440        // We don't have cross-item dependencies like bookmarks does, so we can
441        // just apply now instead of "staging"
442        self.staged.borrow_mut().append(&mut inbound);
443        Ok(())
444    }
445
446    fn apply(
447        &self,
448        timestamp: ServerTimestamp,
449        telem: &mut telemetry::Engine,
450    ) -> anyhow::Result<Vec<OutgoingBso>> {
451        let inbound = (*self.staged.borrow_mut()).drain(..).collect();
452        Ok(self.do_apply_incoming(inbound, timestamp, telem)?)
453    }
454
455    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
456        Ok(self.mark_as_synchronized(
457            &ids.iter().map(Guid::as_str).collect::<Vec<_>>(),
458            new_timestamp,
459        )?)
460    }
461
462    fn get_collection_request(
463        &self,
464        server_timestamp: ServerTimestamp,
465    ) -> anyhow::Result<Option<CollectionRequest>> {
466        let db = self.store.lock_db()?;
467        let since = self.get_last_sync(&db)?.unwrap_or_default();
468        Ok(if since == server_timestamp {
469            None
470        } else {
471            Some(
472                CollectionRequest::new("passwords".into())
473                    .full()
474                    .newer_than(since),
475            )
476        })
477    }
478
479    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
480        let db = self.store.lock_db()?;
481        let global = db.get_meta(schema::GLOBAL_SYNCID_META_KEY)?;
482        let coll = db.get_meta(schema::COLLECTION_SYNCID_META_KEY)?;
483        Ok(if let (Some(global), Some(coll)) = (global, coll) {
484            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
485        } else {
486            EngineSyncAssociation::Disconnected
487        })
488    }
489
490    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
491        self.do_reset(assoc)?;
492        Ok(())
493    }
494}
495
496#[cfg(not(feature = "keydb"))]
497#[cfg(test)]
498mod tests {
499    use super::*;
500    use crate::db::test_utils::insert_login;
501    use crate::encryption::test_utils::TEST_ENCDEC;
502    use crate::login::test_utils::enc_login;
503    use crate::{LoginEntry, LoginFields, LoginMeta, SecureLoginFields};
504    use nss::ensure_initialized;
505    use std::collections::HashMap;
506    use std::sync::Arc;
507
508    // Wrap sync functions for easier testing
509    fn run_fetch_login_data(
510        engine: &mut LoginsSyncEngine,
511        records: Vec<IncomingBso>,
512    ) -> (Vec<SyncLoginData>, telemetry::EngineIncoming) {
513        let mut telem = sync15::telemetry::EngineIncoming::new();
514        (engine.fetch_login_data(records, &mut telem).unwrap(), telem)
515    }
516
517    fn run_fetch_outgoing(store: LoginStore) -> Vec<OutgoingBso> {
518        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
519        engine.fetch_outgoing().unwrap()
520    }
521
522    #[test]
523    fn test_fetch_login_data() {
524        ensure_initialized();
525        // Test some common cases with fetch_login data
526        let store = LoginStore::new_in_memory();
527        insert_login(
528            &store.lock_db().unwrap(),
529            "updated_remotely",
530            None,
531            Some("password"),
532        );
533        insert_login(
534            &store.lock_db().unwrap(),
535            "deleted_remotely",
536            None,
537            Some("password"),
538        );
539        insert_login(
540            &store.lock_db().unwrap(),
541            "three_way_merge",
542            Some("new-local-password"),
543            Some("password"),
544        );
545
546        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
547
548        let (res, _) = run_fetch_login_data(
549            &mut engine,
550            vec![
551                IncomingBso::new_test_tombstone(Guid::new("deleted_remotely")),
552                enc_login("added_remotely", "password")
553                    .into_bso(&*TEST_ENCDEC, None)
554                    .unwrap()
555                    .to_test_incoming(),
556                enc_login("updated_remotely", "new-password")
557                    .into_bso(&*TEST_ENCDEC, None)
558                    .unwrap()
559                    .to_test_incoming(),
560                enc_login("three_way_merge", "new-remote-password")
561                    .into_bso(&*TEST_ENCDEC, None)
562                    .unwrap()
563                    .to_test_incoming(),
564            ],
565        );
566        // For simpler testing, extract/decrypt passwords and put them in a hash map
567        #[derive(Debug, PartialEq)]
568        struct SyncPasswords {
569            local: Option<String>,
570            mirror: Option<String>,
571            inbound: Option<String>,
572        }
573        let extracted_passwords: HashMap<String, SyncPasswords> = res
574            .into_iter()
575            .map(|sync_login_data| {
576                let mut guids_seen = HashSet::new();
577                let passwords = SyncPasswords {
578                    local: sync_login_data.local.map(|local_login| {
579                        guids_seen.insert(local_login.guid_str().to_string());
580                        let LocalLogin::Alive { login, .. } = local_login else {
581                            unreachable!("this test is not expecting a tombstone");
582                        };
583                        login.decrypt_fields(&*TEST_ENCDEC).unwrap().password
584                    }),
585                    mirror: sync_login_data.mirror.map(|mirror_login| {
586                        guids_seen.insert(mirror_login.login.meta.id.clone());
587                        mirror_login
588                            .login
589                            .decrypt_fields(&*TEST_ENCDEC)
590                            .unwrap()
591                            .password
592                    }),
593                    inbound: sync_login_data.inbound.map(|incoming| {
594                        guids_seen.insert(incoming.login.meta.id.clone());
595                        incoming
596                            .login
597                            .decrypt_fields(&*TEST_ENCDEC)
598                            .unwrap()
599                            .password
600                    }),
601                };
602                (guids_seen.into_iter().next().unwrap(), passwords)
603            })
604            .collect();
605
606        assert_eq!(extracted_passwords.len(), 4);
607        assert_eq!(
608            extracted_passwords.get("added_remotely").unwrap(),
609            &SyncPasswords {
610                local: None,
611                mirror: None,
612                inbound: Some("password".into()),
613            }
614        );
615        assert_eq!(
616            extracted_passwords.get("updated_remotely").unwrap(),
617            &SyncPasswords {
618                local: None,
619                mirror: Some("password".into()),
620                inbound: Some("new-password".into()),
621            }
622        );
623        assert_eq!(
624            extracted_passwords.get("deleted_remotely").unwrap(),
625            &SyncPasswords {
626                local: None,
627                mirror: Some("password".into()),
628                inbound: None,
629            }
630        );
631        assert_eq!(
632            extracted_passwords.get("three_way_merge").unwrap(),
633            &SyncPasswords {
634                local: Some("new-local-password".into()),
635                mirror: Some("password".into()),
636                inbound: Some("new-remote-password".into()),
637            }
638        );
639    }
640
641    #[test]
642    fn test_fetch_outgoing() {
643        ensure_initialized();
644        let store = LoginStore::new_in_memory();
645        insert_login(
646            &store.lock_db().unwrap(),
647            "changed",
648            Some("new-password"),
649            Some("password"),
650        );
651        insert_login(
652            &store.lock_db().unwrap(),
653            "unchanged",
654            None,
655            Some("password"),
656        );
657        insert_login(&store.lock_db().unwrap(), "added", Some("password"), None);
658        insert_login(&store.lock_db().unwrap(), "deleted", None, Some("password"));
659        store.lock_db().unwrap().delete("deleted").unwrap();
660
661        let changeset = run_fetch_outgoing(store);
662        let changes: HashMap<String, serde_json::Value> = changeset
663            .into_iter()
664            .map(|b| {
665                (
666                    b.envelope.id.to_string(),
667                    serde_json::from_str(&b.payload).unwrap(),
668                )
669            })
670            .collect();
671        assert_eq!(changes.len(), 3);
672        assert_eq!(changes["added"].get("password").unwrap(), "password");
673        assert_eq!(changes["changed"].get("password").unwrap(), "new-password");
674        assert!(changes["deleted"].get("deleted").is_some());
675        assert!(changes["added"].get("deleted").is_none());
676        assert!(changes["changed"].get("deleted").is_none());
677    }
678
679    #[test]
680    fn test_bad_record() {
681        ensure_initialized();
682        let store = LoginStore::new_in_memory();
683        let test_ids = ["dummy_000001", "dummy_000002", "dummy_000003"];
684        for id in test_ids {
685            insert_login(
686                &store.lock_db().unwrap(),
687                id,
688                Some("password"),
689                Some("password"),
690            );
691        }
692        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
693        engine
694            .mark_as_synchronized(&test_ids, ServerTimestamp::from_millis(100))
695            .unwrap();
696        let (res, telem) = run_fetch_login_data(
697            &mut engine,
698            vec![
699                IncomingBso::new_test_tombstone(Guid::new("dummy_000001")),
700                // invalid
701                IncomingBso::from_test_content(serde_json::json!({
702                    "id": "dummy_000002",
703                    "garbage": "data",
704                    "etc": "not a login"
705                })),
706                // valid
707                IncomingBso::from_test_content(serde_json::json!({
708                    "id": "dummy_000003",
709                    "formSubmitURL": "https://www.example.com/submit",
710                    "hostname": "https://www.example.com",
711                    "username": "test",
712                    "password": "test",
713                })),
714            ],
715        );
716        assert_eq!(telem.get_failed(), 1);
717        assert_eq!(res.len(), 2);
718        assert_eq!(res[0].guid, "dummy_000001");
719        assert_eq!(res[1].guid, "dummy_000003");
720        assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);
721    }
722
723    fn make_enc_login(
724        username: &str,
725        password: &str,
726        fao: Option<String>,
727        realm: Option<String>,
728    ) -> EncryptedLogin {
729        ensure_initialized();
730        let id = Guid::random().to_string();
731        let sec_fields = SecureLoginFields {
732            username: username.into(),
733            password: password.into(),
734        }
735        .encrypt(&*TEST_ENCDEC, &id)
736        .unwrap();
737        EncryptedLogin {
738            meta: LoginMeta {
739                id,
740                ..Default::default()
741            },
742            fields: LoginFields {
743                form_action_origin: fao,
744                http_realm: realm,
745                origin: "http://not-relevant-here.com".into(),
746                ..Default::default()
747            },
748            sec_fields,
749        }
750    }
751
752    #[test]
753    fn find_dupe_login() {
754        ensure_initialized();
755        let store = LoginStore::new_in_memory();
756
757        let to_add = LoginEntry {
758            form_action_origin: Some("https://www.example.com".into()),
759            origin: "http://not-relevant-here.com".into(),
760            username: "test".into(),
761            password: "test".into(),
762            ..Default::default()
763        };
764        let first_id = store.add(to_add).expect("should insert first").id;
765
766        let to_add = LoginEntry {
767            form_action_origin: Some("https://www.example1.com".into()),
768            origin: "http://not-relevant-here.com".into(),
769            username: "test1".into(),
770            password: "test1".into(),
771            ..Default::default()
772        };
773        let second_id = store.add(to_add).expect("should insert second").id;
774
775        let to_add = LoginEntry {
776            http_realm: Some("http://some-realm.com".into()),
777            origin: "http://not-relevant-here.com".into(),
778            username: "test1".into(),
779            password: "test1".into(),
780            ..Default::default()
781        };
782        let no_form_origin_id = store.add(to_add).expect("should insert second").id;
783
784        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
785
786        let to_find = make_enc_login("test", "test", Some("https://www.example.com".into()), None);
787        assert_eq!(
788            engine
789                .find_dupe_login(&to_find)
790                .expect("should work")
791                .expect("should be Some()")
792                .meta
793                .id,
794            first_id
795        );
796
797        let to_find = make_enc_login(
798            "test",
799            "test",
800            Some("https://something-else.com".into()),
801            None,
802        );
803        assert!(engine
804            .find_dupe_login(&to_find)
805            .expect("should work")
806            .is_none());
807
808        let to_find = make_enc_login(
809            "test1",
810            "test1",
811            Some("https://www.example1.com".into()),
812            None,
813        );
814        assert_eq!(
815            engine
816                .find_dupe_login(&to_find)
817                .expect("should work")
818                .expect("should be Some()")
819                .meta
820                .id,
821            second_id
822        );
823
824        let to_find = make_enc_login(
825            "other",
826            "other",
827            Some("https://www.example1.com".into()),
828            None,
829        );
830        assert!(engine
831            .find_dupe_login(&to_find)
832            .expect("should work")
833            .is_none());
834
835        // no form origin.
836        let to_find = make_enc_login("test1", "test1", None, Some("http://some-realm.com".into()));
837        assert_eq!(
838            engine
839                .find_dupe_login(&to_find)
840                .expect("should work")
841                .expect("should be Some()")
842                .meta
843                .id,
844            no_form_origin_id
845        );
846    }
847
848    #[test]
849    fn test_roundtrip_unknown() {
850        ensure_initialized();
851        // A couple of helpers
852        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
853            let bso = IncomingBso::from_test_content(payload);
854            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
855            engine.stage_incoming(vec![bso], &mut telem).unwrap();
856            engine
857                .apply(ServerTimestamp::from_millis(0), &mut telem)
858                .unwrap();
859        }
860
861        fn get_outgoing_payload(engine: &LoginsSyncEngine) -> serde_json::Value {
862            // Edit it so it's considered outgoing.
863            engine
864                .store
865                .update(
866                    "dummy_000001",
867                    LoginEntry {
868                        origin: "https://www.example2.com".into(),
869                        http_realm: Some("https://www.example2.com".into()),
870                        username: "test".into(),
871                        password: "test".into(),
872                        ..Default::default()
873                    },
874                )
875                .unwrap();
876            let changeset = engine.fetch_outgoing().unwrap();
877            assert_eq!(changeset.len(), 1);
878            serde_json::from_str::<serde_json::Value>(&changeset[0].payload).unwrap()
879        }
880
881        // The test itself...
882        let store = LoginStore::new_in_memory();
883        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
884
885        apply_incoming_payload(
886            &engine,
887            serde_json::json!({
888                "id": "dummy_000001",
889                "formSubmitURL": "https://www.example.com/submit",
890                "hostname": "https://www.example.com",
891                "username": "test",
892                "password": "test",
893                "unknown1": "?",
894                "unknown2": {"sub": "object"},
895            }),
896        );
897
898        let payload = get_outgoing_payload(&engine);
899
900        // The outgoing payload for our item should have the unknown fields.
901        assert_eq!(payload.get("unknown1").unwrap().as_str().unwrap(), "?");
902        assert_eq!(
903            payload.get("unknown2").unwrap(),
904            &serde_json::json!({"sub": "object"})
905        );
906
907        // test mirror updates - record is already in our mirror, but now it's
908        // incoming with different unknown fields.
909        apply_incoming_payload(
910            &engine,
911            serde_json::json!({
912                "id": "dummy_000001",
913                "formSubmitURL": "https://www.example.com/submit",
914                "hostname": "https://www.example.com",
915                "username": "test",
916                "password": "test",
917                "unknown2": 99,
918                "unknown3": {"something": "else"},
919            }),
920        );
921        let payload = get_outgoing_payload(&engine);
922        // old unknown values were replaced.
923        assert!(payload.get("unknown1").is_none());
924        assert_eq!(payload.get("unknown2").unwrap().as_u64().unwrap(), 99);
925        assert_eq!(
926            payload
927                .get("unknown3")
928                .unwrap()
929                .as_object()
930                .unwrap()
931                .get("something")
932                .unwrap()
933                .as_str()
934                .unwrap(),
935            "else"
936        );
937    }
938
939    fn count(engine: &LoginsSyncEngine, table_name: &str) -> u32 {
940        ensure_initialized();
941        let sql = format!("SELECT COUNT(*) FROM {table_name}");
942        engine
943            .store
944            .lock_db()
945            // TODO: get rid of this unwrap
946            .unwrap()
947            .try_query_one(&sql, [], false)
948            .unwrap()
949            .unwrap()
950    }
951
952    fn do_test_incoming_with_local_unmirrored_tombstone(local_newer: bool) {
953        ensure_initialized();
954        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
955            let bso = IncomingBso::from_test_content(payload);
956            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
957            engine.stage_incoming(vec![bso], &mut telem).unwrap();
958            engine
959                .apply(ServerTimestamp::from_millis(0), &mut telem)
960                .unwrap();
961        }
962
963        // The test itself...
964        let (local_timestamp, remote_timestamp) = if local_newer { (123, 0) } else { (0, 123) };
965
966        let store = LoginStore::new_in_memory();
967        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
968
969        // apply an incoming record - will be in the mirror.
970        apply_incoming_payload(
971            &engine,
972            serde_json::json!({
973                "id": "dummy_000001",
974                "formSubmitURL": "https://www.example.com/submit",
975                "hostname": "https://www.example.com",
976                "username": "test",
977                "password": "test",
978                "timePasswordChanged": local_timestamp,
979                "unknown1": "?",
980                "unknown2": {"sub": "object"},
981            }),
982        );
983
984        // Reset the engine - this wipes the mirror.
985        engine.reset(&EngineSyncAssociation::Disconnected).unwrap();
986        // But the local record does still exist.
987        assert!(engine
988            .store
989            .get("dummy_000001")
990            .expect("should work")
991            .is_some());
992
993        // Delete the local record.
994        engine.store.delete("dummy_000001").unwrap();
995        assert!(engine
996            .store
997            .get("dummy_000001")
998            .expect("should work")
999            .is_none());
1000
1001        // double-check our test preconditions - should now have 1 in LoginsL and 0 in LoginsM
1002        assert_eq!(count(&engine, "LoginsL"), 1);
1003        assert_eq!(count(&engine, "LoginsM"), 0);
1004
1005        // Now we assume we've been reconnected to sync and have an incoming change for the record.
1006        apply_incoming_payload(
1007            &engine,
1008            serde_json::json!({
1009                "id": "dummy_000001",
1010                "formSubmitURL": "https://www.example.com/submit",
1011                "hostname": "https://www.example.com",
1012                "username": "test",
1013                "password": "test2",
1014                "timePasswordChanged": remote_timestamp,
1015                "unknown1": "?",
1016                "unknown2": {"sub": "object"},
1017            }),
1018        );
1019
1020        // Desktop semantics here are that a local tombstone is treated as though it doesn't exist at all.
1021        // ie, the remote record should be taken whether it is newer or older than the tombstone.
1022        assert!(engine
1023            .store
1024            .get("dummy_000001")
1025            .expect("should work")
1026            .is_some());
1027        // and there should never be an outgoing record.
1028        // XXX - but there is! But this is exceedingly rare, we
1029        // should fix it :)
1030        // assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);
1031
1032        // should now be no records in loginsL and 1 in loginsM
1033        assert_eq!(count(&engine, "LoginsL"), 0);
1034        assert_eq!(count(&engine, "LoginsM"), 1);
1035    }
1036
1037    #[test]
1038    fn test_incoming_non_mirror_tombstone_local_newer() {
1039        do_test_incoming_with_local_unmirrored_tombstone(true);
1040    }
1041
1042    #[test]
1043    fn test_incoming_non_mirror_tombstone_local_older() {
1044        do_test_incoming_with_local_unmirrored_tombstone(false);
1045    }
1046}