logins/sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::merge::{LocalLogin, MirrorLogin, SyncLoginData};
6use super::update_plan::UpdatePlan;
7use super::SyncStatus;
8use crate::db::CLONE_ENTIRE_MIRROR_SQL;
9use crate::encryption::EncryptorDecryptor;
10use crate::error::*;
11use crate::login::EncryptedLogin;
12use crate::schema;
13use crate::util;
14use crate::LoginDb;
15use crate::LoginStore;
16use interrupt_support::SqlInterruptScope;
17use rusqlite::named_params;
18use sql_support::ConnExt;
19use std::cell::RefCell;
20use std::collections::HashSet;
21use std::sync::Arc;
22use std::time::{Duration, UNIX_EPOCH};
23use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
24use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
25use sync15::{telemetry, ServerTimestamp};
26use sync_guid::Guid;
27
/// The sync engine for logins.
///
/// Holds everything the sync methods below need: the store to lock, an
/// interrupt scope, the encryptor/decryptor and the staged incoming records.
pub struct LoginsSyncEngine {
    /// Store whose database is locked (via `lock_db`) for each operation.
    pub store: Arc<LoginStore>,
    /// Interrupt scope, checked with `err_if_interrupted` during long-running work.
    pub scope: SqlInterruptScope,
    /// Encryptor/decryptor cloned from the database in `new`.
    pub encdec: Arc<dyn EncryptorDecryptor>,
    /// Incoming records queued by `stage_incoming` until `apply` consumes them.
    pub staged: RefCell<Vec<IncomingBso>>,
}
35
36impl LoginsSyncEngine {
37    pub fn new(store: Arc<LoginStore>) -> Result<Self> {
38        let db = store.lock_db()?;
39        let scope = db.begin_interrupt_scope()?;
40        let encdec = db.encdec.clone();
41        drop(db);
42        Ok(Self {
43            store,
44            encdec,
45            scope,
46            staged: RefCell::new(vec![]),
47        })
48    }
49
50    fn reconcile(
51        &self,
52        records: Vec<SyncLoginData>,
53        server_now: ServerTimestamp,
54        telem: &mut telemetry::EngineIncoming,
55    ) -> Result<UpdatePlan> {
56        let mut plan = UpdatePlan::default();
57
58        for mut record in records {
59            self.scope.err_if_interrupted()?;
60            debug!("Processing remote change {}", record.guid());
61            let upstream = if let Some(inbound) = record.inbound.take() {
62                inbound
63            } else {
64                debug!("Processing inbound deletion (always prefer)");
65                plan.plan_delete(record.guid.clone());
66                continue;
67            };
68            let upstream_time = record.inbound_ts;
69            match (record.mirror.take(), record.local.take()) {
70                (Some(mirror), Some(local)) => {
71                    debug!("  Conflict between remote and local, Resolving with 3WM");
72                    plan.plan_three_way_merge(
73                        local,
74                        mirror,
75                        upstream,
76                        upstream_time,
77                        server_now,
78                        self.encdec.as_ref(),
79                    )?;
80                    telem.reconciled(1);
81                }
82                (Some(_mirror), None) => {
83                    debug!("  Forwarding mirror to remote");
84                    plan.plan_mirror_update(upstream, upstream_time);
85                    telem.applied(1);
86                }
87                (None, Some(local)) => {
88                    debug!("  Conflicting record without shared parent,  Resolving with 2WM");
89                    plan.plan_two_way_merge(local, (upstream, upstream_time));
90                    telem.reconciled(1);
91                }
92                (None, None) => {
93                    if let Some(dupe) = self.find_dupe_login(&upstream.login)? {
94                        debug!(
95                            "  Incoming recordĀ {} was is a dupe of local record {}",
96                            upstream.guid(),
97                            dupe.guid()
98                        );
99                        let local_modified = UNIX_EPOCH
100                            + Duration::from_millis(dupe.meta.time_password_changed as u64);
101                        let local = LocalLogin::Alive {
102                            login: Box::new(dupe),
103                            local_modified,
104                        };
105                        plan.plan_two_way_merge(local, (upstream, upstream_time));
106                    } else {
107                        debug!("  No dupe found, inserting into mirror");
108                        plan.plan_mirror_insert(upstream, upstream_time, false);
109                    }
110                    telem.applied(1);
111                }
112            }
113        }
114        Ok(plan)
115    }
116
117    fn execute_plan(&self, plan: UpdatePlan) -> Result<()> {
118        // Because rusqlite want a mutable reference to create a transaction
119        // (as a way to save us from ourselves), we side-step that by creating
120        // it manually.
121        let db = self.store.lock_db()?;
122        let tx = db.unchecked_transaction()?;
123        plan.execute(&tx, &self.scope)?;
124        tx.commit()?;
125        Ok(())
126    }
127
128    // Fetch all the data for the provided IDs.
129    // TODO: Might be better taking a fn instead of returning all of it... But that func will likely
130    // want to insert stuff while we're doing this so ugh.
131    fn fetch_login_data(
132        &self,
133        records: Vec<IncomingBso>,
134        telem: &mut telemetry::EngineIncoming,
135    ) -> Result<Vec<SyncLoginData>> {
136        let mut sync_data = Vec::with_capacity(records.len());
137        {
138            let mut seen_ids: HashSet<Guid> = HashSet::with_capacity(records.len());
139            for incoming in records.into_iter() {
140                let id = incoming.envelope.id.clone();
141                match SyncLoginData::from_bso(incoming, self.encdec.as_ref()) {
142                    Ok(v) => sync_data.push(v),
143                    Err(e) => {
144                        match e {
145                            // This is a known error with Desktop logins (see #5233), just log it
146                            // rather than reporting to sentry
147                            Error::InvalidLogin(InvalidLogin::IllegalOrigin { reason: _ }) => {
148                                warn!("logins-deserialize-error: {e}");
149                            }
150                            // For all other errors, report them to Sentry
151                            _ => {
152                                report_error!(
153                                    "logins-deserialize-error",
154                                    "Failed to deserialize record {:?}: {e}",
155                                    id
156                                );
157                            }
158                        };
159                        // Ideally we'd track new_failed, but it's unclear how
160                        // much value it has.
161                        telem.failed(1);
162                    }
163                }
164                seen_ids.insert(id);
165            }
166        }
167        self.scope.err_if_interrupted()?;
168
169        sql_support::each_chunk(
170            &sync_data
171                .iter()
172                .map(|s| s.guid.as_str().to_string())
173                .collect::<Vec<String>>(),
174            |chunk, offset| -> Result<()> {
175                // pairs the bound parameter for the guid with an integer index.
176                let values_with_idx = sql_support::repeat_display(chunk.len(), ",", |i, f| {
177                    write!(f, "({},?)", i + offset)
178                });
179                let query = format!(
180                    "WITH to_fetch(guid_idx, fetch_guid) AS (VALUES {vals})
181                     SELECT
182                         {common_cols},
183                         is_overridden,
184                         server_modified,
185                         NULL as local_modified,
186                         NULL as is_deleted,
187                         NULL as sync_status,
188                         1 as is_mirror,
189                         to_fetch.guid_idx as guid_idx
190                     FROM loginsM
191                     JOIN to_fetch
192                         ON loginsM.guid = to_fetch.fetch_guid
193
194                     UNION ALL
195
196                     SELECT
197                         {common_cols},
198                         NULL as is_overridden,
199                         NULL as server_modified,
200                         local_modified,
201                         is_deleted,
202                         sync_status,
203                         0 as is_mirror,
204                         to_fetch.guid_idx as guid_idx
205                     FROM loginsL
206                     JOIN to_fetch
207                         ON loginsL.guid = to_fetch.fetch_guid",
208                    // give each VALUES item 2 entries, an index and the parameter.
209                    vals = values_with_idx,
210                    common_cols = schema::COMMON_COLS,
211                );
212
213                let db = &self.store.lock_db()?;
214                let mut stmt = db.prepare(&query)?;
215
216                let rows = stmt.query_and_then(rusqlite::params_from_iter(chunk), |row| {
217                    let guid_idx_i = row.get::<_, i64>("guid_idx")?;
218                    // Hitting this means our math is wrong...
219                    assert!(guid_idx_i >= 0);
220
221                    let guid_idx = guid_idx_i as usize;
222                    let is_mirror: bool = row.get("is_mirror")?;
223                    if is_mirror {
224                        sync_data[guid_idx].set_mirror(MirrorLogin::from_row(row)?)?;
225                    } else {
226                        sync_data[guid_idx].set_local(LocalLogin::from_row(row)?)?;
227                    }
228                    self.scope.err_if_interrupted()?;
229                    Ok(())
230                })?;
231                // `rows` is an Iterator<Item = Result<()>>, so we need to collect to handle the errors.
232                rows.collect::<Result<()>>()?;
233                Ok(())
234            },
235        )?;
236        Ok(sync_data)
237    }
238
239    fn fetch_outgoing(&self) -> Result<Vec<OutgoingBso>> {
240        // Taken from iOS. Arbitrarily large, so that clients that want to
241        // process deletions first can; for us it doesn't matter.
242        const TOMBSTONE_SORTINDEX: i32 = 5_000_000;
243        const DEFAULT_SORTINDEX: i32 = 1;
244        let db = self.store.lock_db()?;
245        let mut stmt = db.prepare_cached(&format!(
246            "SELECT L.*, M.enc_unknown_fields
247             FROM loginsL L LEFT JOIN loginsM M ON L.guid = M.guid
248             WHERE sync_status IS NOT {synced}",
249            synced = SyncStatus::Synced as u8
250        ))?;
251        let bsos = stmt.query_and_then([], |row| {
252            self.scope.err_if_interrupted()?;
253            Ok(if row.get::<_, bool>("is_deleted")? {
254                let envelope = OutgoingEnvelope {
255                    id: row.get::<_, String>("guid")?.into(),
256                    sortindex: Some(TOMBSTONE_SORTINDEX),
257                    ..Default::default()
258                };
259                OutgoingBso::new_tombstone(envelope)
260            } else {
261                let unknown = row.get::<_, Option<String>>("enc_unknown_fields")?;
262                let mut bso =
263                    EncryptedLogin::from_row(row)?.into_bso(self.encdec.as_ref(), unknown)?;
264                bso.envelope.sortindex = Some(DEFAULT_SORTINDEX);
265                bso
266            })
267        })?;
268        bsos.collect::<Result<_>>()
269    }
270
271    fn do_apply_incoming(
272        &self,
273        inbound: Vec<IncomingBso>,
274        timestamp: ServerTimestamp,
275        telem: &mut telemetry::Engine,
276    ) -> Result<Vec<OutgoingBso>> {
277        let mut incoming_telemetry = telemetry::EngineIncoming::new();
278        let data = self.fetch_login_data(inbound, &mut incoming_telemetry)?;
279        let plan = {
280            let result = self.reconcile(data, timestamp, &mut incoming_telemetry);
281            telem.incoming(incoming_telemetry);
282            result
283        }?;
284        self.execute_plan(plan)?;
285        self.fetch_outgoing()
286    }
287
288    // Note this receives the db to prevent a deadlock
289    pub fn set_last_sync(&self, db: &LoginDb, last_sync: ServerTimestamp) -> Result<()> {
290        debug!("Updating last sync to {}", last_sync);
291        let last_sync_millis = last_sync.as_millis();
292        db.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)
293    }
294
295    fn get_last_sync(&self, db: &LoginDb) -> Result<Option<ServerTimestamp>> {
296        let millis = db.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?.unwrap();
297        Ok(Some(ServerTimestamp(millis)))
298    }
299
300    fn mark_as_synchronized(&self, guids: &[&str], ts: ServerTimestamp) -> Result<()> {
301        let db = self.store.lock_db()?;
302        let tx = db.unchecked_transaction()?;
303        sql_support::each_chunk(guids, |chunk, _| -> Result<()> {
304            db.execute(
305                &format!(
306                    "DELETE FROM loginsM WHERE guid IN ({vars})",
307                    vars = sql_support::repeat_sql_vars(chunk.len())
308                ),
309                rusqlite::params_from_iter(chunk),
310            )?;
311            self.scope.err_if_interrupted()?;
312
313            db.execute(
314                &format!(
315                    "INSERT OR IGNORE INTO loginsM (
316                         {common_cols}, is_overridden, server_modified
317                     )
318                     SELECT {common_cols}, 0, {modified_ms_i64}
319                     FROM loginsL
320                     WHERE is_deleted = 0 AND guid IN ({vars})",
321                    common_cols = schema::COMMON_COLS,
322                    modified_ms_i64 = ts.as_millis(),
323                    vars = sql_support::repeat_sql_vars(chunk.len())
324                ),
325                rusqlite::params_from_iter(chunk),
326            )?;
327            self.scope.err_if_interrupted()?;
328
329            db.execute(
330                &format!(
331                    "DELETE FROM loginsL WHERE guid IN ({vars})",
332                    vars = sql_support::repeat_sql_vars(chunk.len())
333                ),
334                rusqlite::params_from_iter(chunk),
335            )?;
336            self.scope.err_if_interrupted()?;
337            Ok(())
338        })?;
339        self.set_last_sync(&db, ts)?;
340        tx.commit()?;
341        Ok(())
342    }
343
344    // This exists here as a public function so the store can call it. Ideally
345    // the store would not do that :) Then it can go back into the sync trait
346    // and return an anyhow::Result
347    pub fn do_reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
348        info!("Executing reset on password engine!");
349        let db = self.store.lock_db()?;
350        let tx = db.unchecked_transaction()?;
351        db.execute_all(&[
352            &CLONE_ENTIRE_MIRROR_SQL,
353            "DELETE FROM loginsM",
354            &format!("UPDATE loginsL SET sync_status = {}", SyncStatus::New as u8),
355        ])?;
356        self.set_last_sync(&db, ServerTimestamp(0))?;
357        match assoc {
358            EngineSyncAssociation::Disconnected => {
359                db.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
360                db.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
361            }
362            EngineSyncAssociation::Connected(ids) => {
363                db.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global)?;
364                db.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll)?;
365            }
366        };
367        tx.commit()?;
368        Ok(())
369    }
370
371    // It would be nice if this were a batch-ish api (e.g. takes a slice of records and finds dupes
372    // for each one if they exist)... I can't think of how to write that query, though.
373    // This is subtly different from dupe handling by the main API and maybe
374    // could be consolidated, but for now it remains sync specific.
375    pub(crate) fn find_dupe_login(&self, l: &EncryptedLogin) -> Result<Option<EncryptedLogin>> {
376        let form_submit_host_port = l
377            .fields
378            .form_action_origin
379            .as_ref()
380            .and_then(|s| util::url_host_port(s));
381        let enc_fields = l.decrypt_fields(self.encdec.as_ref())?;
382        let args = named_params! {
383            ":origin": l.fields.origin,
384            ":http_realm": l.fields.http_realm,
385            ":form_submit": form_submit_host_port,
386        };
387        let mut query = format!(
388            "SELECT {common}
389             FROM loginsL
390             WHERE origin IS :origin
391               AND httpRealm IS :http_realm",
392            common = schema::COMMON_COLS,
393        );
394        if form_submit_host_port.is_some() {
395            // Stolen from iOS
396            query += " AND (formActionOrigin = '' OR (instr(formActionOrigin, :form_submit) > 0))";
397        } else {
398            query += " AND formActionOrigin IS :form_submit"
399        }
400        let db = self.store.lock_db()?;
401        let mut stmt = db.prepare_cached(&query)?;
402        for login in stmt
403            .query_and_then(args, EncryptedLogin::from_row)?
404            .collect::<Result<Vec<EncryptedLogin>>>()?
405        {
406            let this_enc_fields = login.decrypt_fields(self.encdec.as_ref())?;
407            if enc_fields.username == this_enc_fields.username {
408                return Ok(Some(login));
409            }
410        }
411        Ok(None)
412    }
413}
414
415impl SyncEngine for LoginsSyncEngine {
416    fn collection_name(&self) -> std::borrow::Cow<'static, str> {
417        "passwords".into()
418    }
419
420    fn stage_incoming(
421        &self,
422        mut inbound: Vec<IncomingBso>,
423        _telem: &mut telemetry::Engine,
424    ) -> anyhow::Result<()> {
425        // We don't have cross-item dependencies like bookmarks does, so we can
426        // just apply now instead of "staging"
427        self.staged.borrow_mut().append(&mut inbound);
428        Ok(())
429    }
430
431    fn apply(
432        &self,
433        timestamp: ServerTimestamp,
434        telem: &mut telemetry::Engine,
435    ) -> anyhow::Result<Vec<OutgoingBso>> {
436        let inbound = (*self.staged.borrow_mut()).drain(..).collect();
437        Ok(self.do_apply_incoming(inbound, timestamp, telem)?)
438    }
439
440    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
441        Ok(self.mark_as_synchronized(
442            &ids.iter().map(Guid::as_str).collect::<Vec<_>>(),
443            new_timestamp,
444        )?)
445    }
446
447    fn get_collection_request(
448        &self,
449        server_timestamp: ServerTimestamp,
450    ) -> anyhow::Result<Option<CollectionRequest>> {
451        let db = self.store.lock_db()?;
452        let since = self.get_last_sync(&db)?.unwrap_or_default();
453        Ok(if since == server_timestamp {
454            None
455        } else {
456            Some(
457                CollectionRequest::new("passwords".into())
458                    .full()
459                    .newer_than(since),
460            )
461        })
462    }
463
464    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
465        let db = self.store.lock_db()?;
466        let global = db.get_meta(schema::GLOBAL_SYNCID_META_KEY)?;
467        let coll = db.get_meta(schema::COLLECTION_SYNCID_META_KEY)?;
468        Ok(if let (Some(global), Some(coll)) = (global, coll) {
469            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
470        } else {
471            EngineSyncAssociation::Disconnected
472        })
473    }
474
475    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
476        self.do_reset(assoc)?;
477        Ok(())
478    }
479}
480
481#[cfg(not(feature = "keydb"))]
482#[cfg(test)]
483mod tests {
484    use super::*;
485    use crate::db::test_utils::insert_login;
486    use crate::encryption::test_utils::TEST_ENCDEC;
487    use crate::login::test_utils::enc_login;
488    use crate::{LoginEntry, LoginFields, LoginMeta, SecureLoginFields};
489    use nss::ensure_initialized;
490    use std::collections::HashMap;
491    use std::sync::Arc;
492
493    // Wrap sync functions for easier testing
494    fn run_fetch_login_data(
495        engine: &mut LoginsSyncEngine,
496        records: Vec<IncomingBso>,
497    ) -> (Vec<SyncLoginData>, telemetry::EngineIncoming) {
498        let mut telem = sync15::telemetry::EngineIncoming::new();
499        (engine.fetch_login_data(records, &mut telem).unwrap(), telem)
500    }
501
502    fn run_fetch_outgoing(store: LoginStore) -> Vec<OutgoingBso> {
503        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
504        engine.fetch_outgoing().unwrap()
505    }
506
507    #[test]
508    fn test_fetch_login_data() {
509        ensure_initialized();
510        // Test some common cases with fetch_login data
511        let store = LoginStore::new_in_memory();
512        insert_login(
513            &store.lock_db().unwrap(),
514            "updated_remotely",
515            None,
516            Some("password"),
517        );
518        insert_login(
519            &store.lock_db().unwrap(),
520            "deleted_remotely",
521            None,
522            Some("password"),
523        );
524        insert_login(
525            &store.lock_db().unwrap(),
526            "three_way_merge",
527            Some("new-local-password"),
528            Some("password"),
529        );
530
531        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
532
533        let (res, _) = run_fetch_login_data(
534            &mut engine,
535            vec![
536                IncomingBso::new_test_tombstone(Guid::new("deleted_remotely")),
537                enc_login("added_remotely", "password")
538                    .into_bso(&*TEST_ENCDEC, None)
539                    .unwrap()
540                    .to_test_incoming(),
541                enc_login("updated_remotely", "new-password")
542                    .into_bso(&*TEST_ENCDEC, None)
543                    .unwrap()
544                    .to_test_incoming(),
545                enc_login("three_way_merge", "new-remote-password")
546                    .into_bso(&*TEST_ENCDEC, None)
547                    .unwrap()
548                    .to_test_incoming(),
549            ],
550        );
551        // For simpler testing, extract/decrypt passwords and put them in a hash map
552        #[derive(Debug, PartialEq)]
553        struct SyncPasswords {
554            local: Option<String>,
555            mirror: Option<String>,
556            inbound: Option<String>,
557        }
558        let extracted_passwords: HashMap<String, SyncPasswords> = res
559            .into_iter()
560            .map(|sync_login_data| {
561                let mut guids_seen = HashSet::new();
562                let passwords = SyncPasswords {
563                    local: sync_login_data.local.map(|local_login| {
564                        guids_seen.insert(local_login.guid_str().to_string());
565                        let LocalLogin::Alive { login, .. } = local_login else {
566                            unreachable!("this test is not expecting a tombstone");
567                        };
568                        login.decrypt_fields(&*TEST_ENCDEC).unwrap().password
569                    }),
570                    mirror: sync_login_data.mirror.map(|mirror_login| {
571                        guids_seen.insert(mirror_login.login.meta.id.clone());
572                        mirror_login
573                            .login
574                            .decrypt_fields(&*TEST_ENCDEC)
575                            .unwrap()
576                            .password
577                    }),
578                    inbound: sync_login_data.inbound.map(|incoming| {
579                        guids_seen.insert(incoming.login.meta.id.clone());
580                        incoming
581                            .login
582                            .decrypt_fields(&*TEST_ENCDEC)
583                            .unwrap()
584                            .password
585                    }),
586                };
587                (guids_seen.into_iter().next().unwrap(), passwords)
588            })
589            .collect();
590
591        assert_eq!(extracted_passwords.len(), 4);
592        assert_eq!(
593            extracted_passwords.get("added_remotely").unwrap(),
594            &SyncPasswords {
595                local: None,
596                mirror: None,
597                inbound: Some("password".into()),
598            }
599        );
600        assert_eq!(
601            extracted_passwords.get("updated_remotely").unwrap(),
602            &SyncPasswords {
603                local: None,
604                mirror: Some("password".into()),
605                inbound: Some("new-password".into()),
606            }
607        );
608        assert_eq!(
609            extracted_passwords.get("deleted_remotely").unwrap(),
610            &SyncPasswords {
611                local: None,
612                mirror: Some("password".into()),
613                inbound: None,
614            }
615        );
616        assert_eq!(
617            extracted_passwords.get("three_way_merge").unwrap(),
618            &SyncPasswords {
619                local: Some("new-local-password".into()),
620                mirror: Some("password".into()),
621                inbound: Some("new-remote-password".into()),
622            }
623        );
624    }
625
626    #[test]
627    fn test_sync_local_delete() {
628        ensure_initialized();
629        let store = LoginStore::new_in_memory();
630        insert_login(
631            &store.lock_db().unwrap(),
632            "local-deleted",
633            Some("password"),
634            None,
635        );
636        store.lock_db().unwrap().delete("local-deleted").unwrap();
637        let changeset = run_fetch_outgoing(store);
638        let changes: HashMap<String, serde_json::Value> = changeset
639            .into_iter()
640            .map(|b| {
641                (
642                    b.envelope.id.to_string(),
643                    serde_json::from_str(&b.payload).unwrap(),
644                )
645            })
646            .collect();
647        assert_eq!(changes.len(), 1);
648        assert!(changes["local-deleted"].get("deleted").is_some());
649
650        // hmmm. In theory, we do not need to sync a local-only deletion
651    }
652
653    #[test]
654    fn test_sync_local_readd() {
655        ensure_initialized();
656        let store = LoginStore::new_in_memory();
657        insert_login(
658            &store.lock_db().unwrap(),
659            "local-readded",
660            Some("password"),
661            None,
662        );
663        store.lock_db().unwrap().delete("local-readded").unwrap();
664        insert_login(
665            &store.lock_db().unwrap(),
666            "local-readded",
667            Some("password"),
668            None,
669        );
670        let changeset = run_fetch_outgoing(store);
671        let changes: HashMap<String, serde_json::Value> = changeset
672            .into_iter()
673            .map(|b| {
674                (
675                    b.envelope.id.to_string(),
676                    serde_json::from_str(&b.payload).unwrap(),
677                )
678            })
679            .collect();
680        assert_eq!(changes.len(), 1);
681        assert_eq!(
682            changes["local-readded"].get("password").unwrap(),
683            "password"
684        );
685    }
686
687    #[test]
688    fn test_sync_local_readd_of_remote_deletion() {
689        ensure_initialized();
690        let other_store = LoginStore::new_in_memory();
691        let mut engine = LoginsSyncEngine::new(Arc::new(other_store)).unwrap();
692        let (_res, _telem) = run_fetch_login_data(
693            &mut engine,
694            vec![IncomingBso::new_test_tombstone(Guid::new("remote-readded"))],
695        );
696
697        let store = LoginStore::new_in_memory();
698        insert_login(
699            &store.lock_db().unwrap(),
700            "remote-readded",
701            Some("password"),
702            None,
703        );
704        let changeset = run_fetch_outgoing(store);
705        let changes: HashMap<String, serde_json::Value> = changeset
706            .into_iter()
707            .map(|b| {
708                (
709                    b.envelope.id.to_string(),
710                    serde_json::from_str(&b.payload).unwrap(),
711                )
712            })
713            .collect();
714        assert_eq!(changes.len(), 1);
715        assert_eq!(
716            changes["remote-readded"].get("password").unwrap(),
717            "password"
718        );
719    }
720
721    #[test]
722    fn test_sync_local_readd_redelete_of_remote_login() {
723        ensure_initialized();
724        let other_store = LoginStore::new_in_memory();
725        let mut engine = LoginsSyncEngine::new(Arc::new(other_store)).unwrap();
726        let (_res, _telem) = run_fetch_login_data(
727            &mut engine,
728            vec![IncomingBso::from_test_content(serde_json::json!({
729                "id": "remote-readded-redeleted",
730                "formSubmitURL": "https://www.example.com/submit",
731                "hostname": "https://www.example.com",
732                "username": "test",
733                "password": "test",
734            }))],
735        );
736
737        let store = LoginStore::new_in_memory();
738        store
739            .lock_db()
740            .unwrap()
741            .delete("remote-readded-redeleted")
742            .unwrap();
743        insert_login(
744            &store.lock_db().unwrap(),
745            "remote-readded-redeleted",
746            Some("password"),
747            None,
748        );
749        store
750            .lock_db()
751            .unwrap()
752            .delete("remote-readded-redeleted")
753            .unwrap();
754        let changeset = run_fetch_outgoing(store);
755        let changes: HashMap<String, serde_json::Value> = changeset
756            .into_iter()
757            .map(|b| {
758                (
759                    b.envelope.id.to_string(),
760                    serde_json::from_str(&b.payload).unwrap(),
761                )
762            })
763            .collect();
764        assert_eq!(changes.len(), 1);
765        assert!(changes["remote-readded-redeleted"].get("deleted").is_some());
766    }
767
768    #[test]
769    fn test_fetch_outgoing() {
770        ensure_initialized();
771        let store = LoginStore::new_in_memory();
772        insert_login(
773            &store.lock_db().unwrap(),
774            "changed",
775            Some("new-password"),
776            Some("password"),
777        );
778        insert_login(
779            &store.lock_db().unwrap(),
780            "unchanged",
781            None,
782            Some("password"),
783        );
784        insert_login(&store.lock_db().unwrap(), "added", Some("password"), None);
785        insert_login(&store.lock_db().unwrap(), "deleted", None, Some("password"));
786        store.lock_db().unwrap().delete("deleted").unwrap();
787
788        let changeset = run_fetch_outgoing(store);
789        let changes: HashMap<String, serde_json::Value> = changeset
790            .into_iter()
791            .map(|b| {
792                (
793                    b.envelope.id.to_string(),
794                    serde_json::from_str(&b.payload).unwrap(),
795                )
796            })
797            .collect();
798        assert_eq!(changes.len(), 3);
799        assert_eq!(changes["added"].get("password").unwrap(), "password");
800        assert_eq!(changes["changed"].get("password").unwrap(), "new-password");
801        assert!(changes["deleted"].get("deleted").is_some());
802        assert!(changes["added"].get("deleted").is_none());
803        assert!(changes["changed"].get("deleted").is_none());
804    }
805
806    #[test]
807    fn test_bad_record() {
808        ensure_initialized();
809        let store = LoginStore::new_in_memory();
810        let test_ids = ["dummy_000001", "dummy_000002", "dummy_000003"];
811        for id in test_ids {
812            insert_login(
813                &store.lock_db().unwrap(),
814                id,
815                Some("password"),
816                Some("password"),
817            );
818        }
819        let mut engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
820        engine
821            .mark_as_synchronized(&test_ids, ServerTimestamp::from_millis(100))
822            .unwrap();
823        let (res, telem) = run_fetch_login_data(
824            &mut engine,
825            vec![
826                IncomingBso::new_test_tombstone(Guid::new("dummy_000001")),
827                // invalid
828                IncomingBso::from_test_content(serde_json::json!({
829                    "id": "dummy_000002",
830                    "garbage": "data",
831                    "etc": "not a login"
832                })),
833                // valid
834                IncomingBso::from_test_content(serde_json::json!({
835                    "id": "dummy_000003",
836                    "formSubmitURL": "https://www.example.com/submit",
837                    "hostname": "https://www.example.com",
838                    "username": "test",
839                    "password": "test",
840                })),
841            ],
842        );
843        assert_eq!(telem.get_failed(), 1);
844        assert_eq!(res.len(), 2);
845        assert_eq!(res[0].guid, "dummy_000001");
846        assert_eq!(res[1].guid, "dummy_000003");
847        assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);
848    }
849
850    fn make_enc_login(
851        username: &str,
852        password: &str,
853        fao: Option<String>,
854        realm: Option<String>,
855    ) -> EncryptedLogin {
856        ensure_initialized();
857        let id = Guid::random().to_string();
858        let sec_fields = SecureLoginFields {
859            username: username.into(),
860            password: password.into(),
861        }
862        .encrypt(&*TEST_ENCDEC, &id)
863        .unwrap();
864        EncryptedLogin {
865            meta: LoginMeta {
866                id,
867                ..Default::default()
868            },
869            fields: LoginFields {
870                form_action_origin: fao,
871                http_realm: realm,
872                origin: "http://not-relevant-here.com".into(),
873                ..Default::default()
874            },
875            sec_fields,
876        }
877    }
878
879    #[test]
880    fn find_dupe_login() {
881        ensure_initialized();
882        let store = LoginStore::new_in_memory();
883
884        let to_add = LoginEntry {
885            form_action_origin: Some("https://www.example.com".into()),
886            origin: "http://not-relevant-here.com".into(),
887            username: "test".into(),
888            password: "test".into(),
889            ..Default::default()
890        };
891        let first_id = store.add(to_add).expect("should insert first").id;
892
893        let to_add = LoginEntry {
894            form_action_origin: Some("https://www.example1.com".into()),
895            origin: "http://not-relevant-here.com".into(),
896            username: "test1".into(),
897            password: "test1".into(),
898            ..Default::default()
899        };
900        let second_id = store.add(to_add).expect("should insert second").id;
901
902        let to_add = LoginEntry {
903            http_realm: Some("http://some-realm.com".into()),
904            origin: "http://not-relevant-here.com".into(),
905            username: "test1".into(),
906            password: "test1".into(),
907            ..Default::default()
908        };
909        let no_form_origin_id = store.add(to_add).expect("should insert second").id;
910
911        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
912
913        let to_find = make_enc_login("test", "test", Some("https://www.example.com".into()), None);
914        assert_eq!(
915            engine
916                .find_dupe_login(&to_find)
917                .expect("should work")
918                .expect("should be Some()")
919                .meta
920                .id,
921            first_id
922        );
923
924        let to_find = make_enc_login(
925            "test",
926            "test",
927            Some("https://something-else.com".into()),
928            None,
929        );
930        assert!(engine
931            .find_dupe_login(&to_find)
932            .expect("should work")
933            .is_none());
934
935        let to_find = make_enc_login(
936            "test1",
937            "test1",
938            Some("https://www.example1.com".into()),
939            None,
940        );
941        assert_eq!(
942            engine
943                .find_dupe_login(&to_find)
944                .expect("should work")
945                .expect("should be Some()")
946                .meta
947                .id,
948            second_id
949        );
950
951        let to_find = make_enc_login(
952            "other",
953            "other",
954            Some("https://www.example1.com".into()),
955            None,
956        );
957        assert!(engine
958            .find_dupe_login(&to_find)
959            .expect("should work")
960            .is_none());
961
962        // no form origin.
963        let to_find = make_enc_login("test1", "test1", None, Some("http://some-realm.com".into()));
964        assert_eq!(
965            engine
966                .find_dupe_login(&to_find)
967                .expect("should work")
968                .expect("should be Some()")
969                .meta
970                .id,
971            no_form_origin_id
972        );
973    }
974
975    #[test]
976    fn test_roundtrip_unknown() {
977        ensure_initialized();
978        // A couple of helpers
979        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
980            let bso = IncomingBso::from_test_content(payload);
981            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
982            engine.stage_incoming(vec![bso], &mut telem).unwrap();
983            engine
984                .apply(ServerTimestamp::from_millis(0), &mut telem)
985                .unwrap();
986        }
987
988        fn get_outgoing_payload(engine: &LoginsSyncEngine) -> serde_json::Value {
989            // Edit it so it's considered outgoing.
990            engine
991                .store
992                .update(
993                    "dummy_000001",
994                    LoginEntry {
995                        origin: "https://www.example2.com".into(),
996                        http_realm: Some("https://www.example2.com".into()),
997                        username: "test".into(),
998                        password: "test".into(),
999                        ..Default::default()
1000                    },
1001                )
1002                .unwrap();
1003            let changeset = engine.fetch_outgoing().unwrap();
1004            assert_eq!(changeset.len(), 1);
1005            serde_json::from_str::<serde_json::Value>(&changeset[0].payload).unwrap()
1006        }
1007
1008        // The test itself...
1009        let store = LoginStore::new_in_memory();
1010        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
1011
1012        apply_incoming_payload(
1013            &engine,
1014            serde_json::json!({
1015                "id": "dummy_000001",
1016                "formSubmitURL": "https://www.example.com/submit",
1017                "hostname": "https://www.example.com",
1018                "username": "test",
1019                "password": "test",
1020                "unknown1": "?",
1021                "unknown2": {"sub": "object"},
1022            }),
1023        );
1024
1025        let payload = get_outgoing_payload(&engine);
1026
1027        // The outgoing payload for our item should have the unknown fields.
1028        assert_eq!(payload.get("unknown1").unwrap().as_str().unwrap(), "?");
1029        assert_eq!(
1030            payload.get("unknown2").unwrap(),
1031            &serde_json::json!({"sub": "object"})
1032        );
1033
1034        // test mirror updates - record is already in our mirror, but now it's
1035        // incoming with different unknown fields.
1036        apply_incoming_payload(
1037            &engine,
1038            serde_json::json!({
1039                "id": "dummy_000001",
1040                "formSubmitURL": "https://www.example.com/submit",
1041                "hostname": "https://www.example.com",
1042                "username": "test",
1043                "password": "test",
1044                "unknown2": 99,
1045                "unknown3": {"something": "else"},
1046            }),
1047        );
1048        let payload = get_outgoing_payload(&engine);
1049        // old unknown values were replaced.
1050        assert!(payload.get("unknown1").is_none());
1051        assert_eq!(payload.get("unknown2").unwrap().as_u64().unwrap(), 99);
1052        assert_eq!(
1053            payload
1054                .get("unknown3")
1055                .unwrap()
1056                .as_object()
1057                .unwrap()
1058                .get("something")
1059                .unwrap()
1060                .as_str()
1061                .unwrap(),
1062            "else"
1063        );
1064    }
1065
1066    fn count(engine: &LoginsSyncEngine, table_name: &str) -> u32 {
1067        ensure_initialized();
1068        let sql = format!("SELECT COUNT(*) FROM {table_name}");
1069        engine
1070            .store
1071            .lock_db()
1072            // TODO: get rid of this unwrap
1073            .unwrap()
1074            .try_query_one(&sql, [], false)
1075            .unwrap()
1076            .unwrap()
1077    }
1078
1079    fn do_test_incoming_with_local_unmirrored_tombstone(local_newer: bool) {
1080        ensure_initialized();
1081        fn apply_incoming_payload(engine: &LoginsSyncEngine, payload: serde_json::Value) {
1082            let bso = IncomingBso::from_test_content(payload);
1083            let mut telem = sync15::telemetry::Engine::new(engine.collection_name());
1084            engine.stage_incoming(vec![bso], &mut telem).unwrap();
1085            engine
1086                .apply(ServerTimestamp::from_millis(0), &mut telem)
1087                .unwrap();
1088        }
1089
1090        // The test itself...
1091        let (local_timestamp, remote_timestamp) = if local_newer { (123, 0) } else { (0, 123) };
1092
1093        let store = LoginStore::new_in_memory();
1094        let engine = LoginsSyncEngine::new(Arc::new(store)).unwrap();
1095
1096        // apply an incoming record - will be in the mirror.
1097        apply_incoming_payload(
1098            &engine,
1099            serde_json::json!({
1100                "id": "dummy_000001",
1101                "formSubmitURL": "https://www.example.com/submit",
1102                "hostname": "https://www.example.com",
1103                "username": "test",
1104                "password": "test",
1105                "timePasswordChanged": local_timestamp,
1106                "unknown1": "?",
1107                "unknown2": {"sub": "object"},
1108            }),
1109        );
1110
1111        // Reset the engine - this wipes the mirror.
1112        engine.reset(&EngineSyncAssociation::Disconnected).unwrap();
1113        // But the local record does still exist.
1114        assert!(engine
1115            .store
1116            .get("dummy_000001")
1117            .expect("should work")
1118            .is_some());
1119
1120        // Delete the local record.
1121        engine.store.delete("dummy_000001").unwrap();
1122        assert!(engine
1123            .store
1124            .get("dummy_000001")
1125            .expect("should work")
1126            .is_none());
1127
1128        // double-check our test preconditions - should now have 1 in LoginsL and 0 in LoginsM
1129        assert_eq!(count(&engine, "LoginsL"), 1);
1130        assert_eq!(count(&engine, "LoginsM"), 0);
1131
1132        // Now we assume we've been reconnected to sync and have an incoming change for the record.
1133        apply_incoming_payload(
1134            &engine,
1135            serde_json::json!({
1136                "id": "dummy_000001",
1137                "formSubmitURL": "https://www.example.com/submit",
1138                "hostname": "https://www.example.com",
1139                "username": "test",
1140                "password": "test2",
1141                "timePasswordChanged": remote_timestamp,
1142                "unknown1": "?",
1143                "unknown2": {"sub": "object"},
1144            }),
1145        );
1146
1147        // Desktop semantics here are that a local tombstone is treated as though it doesn't exist at all.
1148        // ie, the remote record should be taken whether it is newer or older than the tombstone.
1149        assert!(engine
1150            .store
1151            .get("dummy_000001")
1152            .expect("should work")
1153            .is_some());
1154        // and there should never be an outgoing record.
1155        // XXX - but there is! But this is exceedingly rare, we
1156        // should fix it :)
1157        // assert_eq!(engine.fetch_outgoing().unwrap().len(), 0);
1158
1159        // should now be no records in loginsL and 1 in loginsM
1160        assert_eq!(count(&engine, "LoginsL"), 0);
1161        assert_eq!(count(&engine, "LoginsM"), 1);
1162    }
1163
1164    #[test]
1165    fn test_incoming_non_mirror_tombstone_local_newer() {
1166        do_test_incoming_with_local_unmirrored_tombstone(true);
1167    }
1168
1169    #[test]
1170    fn test_incoming_non_mirror_tombstone_local_older() {
1171        do_test_incoming_with_local_unmirrored_tombstone(false);
1172    }
1173}