autofill/sync/address/
incoming.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*/
5
6use super::AddressPayload;
7use crate::db::addresses::{add_internal_address, update_internal_address};
8use crate::db::models::address::InternalAddress;
9use crate::db::schema::ADDRESS_COMMON_COLS;
10use crate::error::*;
11use crate::sync::address::name_utils::{join_name_parts, split_name, NameParts};
12use crate::sync::common::*;
13use crate::sync::{
14    IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, IncomingState, LocalRecordInfo,
15    ProcessIncomingRecordImpl, ServerTimestamp, SyncRecord,
16};
17use interrupt_support::Interruptee;
18use rusqlite::{named_params, Transaction};
19use sql_support::ConnExt;
20use sync_guid::Guid as SyncGuid;
21
22// When an incoming record lacks the `name` field but includes any `*_name` fields, we can
23// assume that the record originates from an older device.
24
25// If the record comes from an older device, we compare the `*_name` fields with those in
26// the corresponding local record. If the values of the `*_name`
27// fields differ, it indicates that the incoming record has updated these fields. If the
28// values are the same, we replace the name field of the incoming record with the local
29// name field to ensure the completeness of the name field when reconciling.
30//
31// Here is an example:
32// Assume the local record is {"name": "Mr. John Doe"}. If an updated incoming record
33// has {"given_name": "John", "family_name": "Doe"}, we will NOT join the `*_name` fields
34// and replace the local `name` field with "John Doe". This allows us to retain the complete
35// name - "Mr. John Doe".
36// However, if the updated incoming record has {"given_name": "Jane", "family_name": "Poe"},
37// we will rebuild it and replace the local `name` field with "Jane Poe".
38fn update_name(payload_content: &mut IncomingContent<AddressPayload>, local_name: String) {
39    // Check if the kind is IncomingKind::Content and get a mutable reference to internal_address
40    let internal_address =
41        if let IncomingKind::Content(internal_address) = &mut payload_content.kind {
42            internal_address
43        } else {
44            return;
45        };
46
47    let entry = &mut internal_address.entry;
48
49    // Return early if the name is not empty or `*-name`` parts are empty
50    if !entry.name.is_empty()
51        || (entry.given_name.is_empty()
52            && entry.additional_name.is_empty()
53            && entry.family_name.is_empty())
54    {
55        return;
56    }
57
58    // Split the local name into its parts
59    let NameParts {
60        given,
61        middle,
62        family,
63    } = split_name(&local_name);
64
65    // Check if the local name matches the entry names
66    let is_local_name_matching =
67        entry.given_name == given && entry.additional_name == middle && entry.family_name == family;
68
69    // Update the name based on whether the local name matches
70    entry.name = if is_local_name_matching {
71        local_name
72    } else {
73        join_name_parts(&NameParts {
74            given: entry.given_name.clone(),
75            middle: entry.additional_name.clone(),
76            family: entry.family_name.clone(),
77        })
78    };
79}
80
81fn create_incoming_bso(id: SyncGuid, raw: String) -> IncomingContent<AddressPayload> {
82    let bso = IncomingBso {
83        envelope: IncomingEnvelope {
84            id,
85            modified: ServerTimestamp::default(),
86            sortindex: None,
87            ttl: None,
88        },
89        payload: raw,
90    };
91    bso.into_content::<AddressPayload>()
92}
93
94fn bso_to_incoming(
95    payload_content: IncomingContent<AddressPayload>,
96) -> Result<IncomingContent<InternalAddress>> {
97    Ok(match payload_content.kind {
98        IncomingKind::Content(content) => IncomingContent {
99            envelope: payload_content.envelope,
100            kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
101        },
102        IncomingKind::Tombstone => IncomingContent {
103            envelope: payload_content.envelope,
104            kind: IncomingKind::Tombstone,
105        },
106        IncomingKind::Malformed => IncomingContent {
107            envelope: payload_content.envelope,
108            kind: IncomingKind::Malformed,
109        },
110    })
111}
112
113// Takes a raw payload, as stored in our database, and returns an InternalAddress
114// or a tombstone. Addresses store the raw payload as cleartext json.
115fn raw_payload_to_incoming(id: SyncGuid, raw: String) -> Result<IncomingContent<InternalAddress>> {
116    let payload_content = create_incoming_bso(id, raw);
117
118    Ok(match payload_content.kind {
119        IncomingKind::Content(content) => IncomingContent {
120            envelope: payload_content.envelope,
121            kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
122        },
123        IncomingKind::Tombstone => IncomingContent {
124            envelope: payload_content.envelope,
125            kind: IncomingKind::Tombstone,
126        },
127        IncomingKind::Malformed => IncomingContent {
128            envelope: payload_content.envelope,
129            kind: IncomingKind::Malformed,
130        },
131    })
132}
133
134pub(super) struct IncomingAddressesImpl {}
135
136impl ProcessIncomingRecordImpl for IncomingAddressesImpl {
137    type Record = InternalAddress;
138
139    /// The first step in the "apply incoming" process - stage the records
140    fn stage_incoming(
141        &self,
142        tx: &Transaction<'_>,
143        incoming: Vec<IncomingBso>,
144        signal: &dyn Interruptee,
145    ) -> Result<()> {
146        let to_stage = incoming
147            .into_iter()
148            // We persist the entire payload as cleartext - which it already is!
149            .map(|bso| (bso.envelope.id, bso.payload, bso.envelope.modified))
150            .collect();
151        common_stage_incoming_records(tx, "addresses_sync_staging", to_stage, signal)
152    }
153
154    fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()> {
155        common_mirror_staged_records(tx, "addresses_sync_staging", "addresses_mirror")
156    }
157
158    /// The second step in the "apply incoming" process for syncing autofill address records.
159    /// Incoming items are retrieved from the temp tables, deserialized, and
160    /// assigned `IncomingState` values.
161    fn fetch_incoming_states(
162        &self,
163        tx: &Transaction<'_>,
164    ) -> Result<Vec<IncomingState<Self::Record>>> {
165        let sql = "
166        SELECT
167            s.guid as guid,
168            l.guid as l_guid,
169            t.guid as t_guid,
170            s.payload as s_payload,
171            m.payload as m_payload,
172            l.name,
173            l.organization,
174            l.street_address,
175            l.address_level3,
176            l.address_level2,
177            l.address_level1,
178            l.postal_code,
179            l.country,
180            l.tel,
181            l.email,
182            l.time_created,
183            l.time_last_used,
184            l.time_last_modified,
185            l.times_used,
186            l.sync_change_counter
187        FROM temp.addresses_sync_staging s
188        LEFT JOIN addresses_mirror m ON s.guid = m.guid
189        LEFT JOIN addresses_data l ON s.guid = l.guid
190        LEFT JOIN addresses_tombstones t ON s.guid = t.guid";
191
192        tx.query_rows_and_then(sql, [], |row| -> Result<IncomingState<Self::Record>> {
193            // the 'guid' and 's_payload' rows must be non-null.
194            let guid: SyncGuid = row.get("guid")?;
195
196            // We update the 'name' field using the update_name function.
197            // We utilize create_incoming_bso and bso_to_incoming functions
198            // instead of payload_to_incoming. This is done to avoid directly passing
199            // row.get("name") to payload_to_incoming, which would result in having to pass
200            // None parameters in a few places.
201            let mut payload_content = create_incoming_bso(guid.clone(), row.get("s_payload")?);
202            update_name(
203                &mut payload_content,
204                row.get("name").unwrap_or("".to_string()),
205            );
206            let incoming = bso_to_incoming(payload_content)?;
207
208            Ok(IncomingState {
209                incoming,
210                local: match row.get_unwrap::<_, Option<String>>("l_guid") {
211                    Some(l_guid) => {
212                        assert_eq!(l_guid, guid);
213                        // local record exists, check the state.
214                        let record = InternalAddress::from_row(row)?;
215                        let has_changes = record.metadata().sync_change_counter != 0;
216                        if has_changes {
217                            LocalRecordInfo::Modified { record }
218                        } else {
219                            LocalRecordInfo::Unmodified { record }
220                        }
221                    }
222                    None => {
223                        // no local record - maybe a tombstone?
224                        match row.get::<_, Option<String>>("t_guid")? {
225                            Some(t_guid) => {
226                                assert_eq!(guid, t_guid);
227                                LocalRecordInfo::Tombstone { guid: guid.clone() }
228                            }
229                            None => LocalRecordInfo::Missing,
230                        }
231                    }
232                },
233                mirror: {
234                    match row.get::<_, Option<String>>("m_payload")? {
235                        Some(m_payload) => {
236                            // a tombstone in the mirror can be treated as though it's missing.
237                            raw_payload_to_incoming(guid, m_payload)?.content()
238                        }
239                        None => None,
240                    }
241                },
242            })
243        })
244    }
245
246    /// Returns a local record that has the same values as the given incoming record (with the exception
247    /// of the `guid` values which should differ) that will be used as a local duplicate record for
248    /// syncing.
249    fn get_local_dupe(
250        &self,
251        tx: &Transaction<'_>,
252        incoming: &Self::Record,
253    ) -> Result<Option<Self::Record>> {
254        let sql = format!("
255            SELECT
256                {common_cols},
257                sync_change_counter
258            FROM addresses_data
259            WHERE
260                -- `guid <> :guid` is a pre-condition for this being called, but...
261                guid <> :guid
262                -- only non-synced records are candidates, which means can't already be in the mirror.
263                AND guid NOT IN (
264                    SELECT guid
265                    FROM addresses_mirror
266                )
267                -- and sql can check the field values.
268                AND name == :name
269                AND organization == :organization
270                AND street_address == :street_address
271                AND address_level3 == :address_level3
272                AND address_level2 == :address_level2
273                AND address_level1 == :address_level1
274                AND postal_code == :postal_code
275                AND country == :country
276                AND tel == :tel
277                AND email == :email", common_cols = ADDRESS_COMMON_COLS);
278
279        let params = named_params! {
280            ":guid": incoming.guid,
281            ":name": incoming.name,
282            ":organization": incoming.organization,
283            ":street_address": incoming.street_address,
284            ":address_level3": incoming.address_level3,
285            ":address_level2": incoming.address_level2,
286            ":address_level1": incoming.address_level1,
287            ":postal_code": incoming.postal_code,
288            ":country": incoming.country,
289            ":tel": incoming.tel,
290            ":email": incoming.email,
291        };
292
293        let result = tx.query_row(&sql, params, |row| {
294            Ok(Self::Record::from_row(row).expect("wtf? '?' doesn't work :("))
295        });
296
297        match result {
298            Ok(r) => Ok(Some(r)),
299            Err(e) => match e {
300                rusqlite::Error::QueryReturnedNoRows => Ok(None),
301                _ => Err(Error::SqlError(e)),
302            },
303        }
304    }
305
306    fn update_local_record(
307        &self,
308        tx: &Transaction<'_>,
309        new_record: Self::Record,
310        flag_as_changed: bool,
311    ) -> Result<()> {
312        update_internal_address(tx, &new_record, flag_as_changed)?;
313        Ok(())
314    }
315
316    fn insert_local_record(&self, tx: &Transaction<'_>, new_record: Self::Record) -> Result<()> {
317        add_internal_address(tx, &new_record)?;
318        Ok(())
319    }
320
321    /// Changes the guid of the local record for the given `old_guid` to the given `new_guid` used
322    /// for the `HasLocalDupe` incoming state, and mark the item as dirty.
323    /// We also update the mirror record if it exists in forking scenarios
324    fn change_record_guid(
325        &self,
326        tx: &Transaction<'_>,
327        old_guid: &SyncGuid,
328        new_guid: &SyncGuid,
329    ) -> Result<()> {
330        common_change_guid(tx, "addresses_data", "addresses_mirror", old_guid, new_guid)
331    }
332
333    fn remove_record(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
334        common_remove_record(tx, "addresses_data", guid)
335    }
336
337    fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
338        common_remove_record(tx, "addresses_tombstones", guid)
339    }
340}
341
342#[cfg(test)]
343mod tests {
344    use super::super::super::test::new_syncable_mem_db;
345    use super::*;
346    use crate::db::addresses::get_address;
347    use crate::sync::common::tests::*;
348
349    use error_support::info;
350    use interrupt_support::NeverInterrupts;
351    use serde_json::{json, Map, Value};
352    use sql_support::ConnExt;
353
354    impl InternalAddress {
355        fn into_test_incoming_bso(self) -> IncomingBso {
356            IncomingBso::from_test_content(self.into_payload().expect("is json"))
357        }
358    }
359
360    lazy_static::lazy_static! {
361        static ref TEST_JSON_RECORDS: Map<String, Value> = {
362            // NOTE: the JSON here is the same as stored on the sync server -
363            // the superfluous `entry` is unfortunate but from desktop.
364            // JSON from the server is kebab-style, EXCEPT the times{X} fields
365            // see PayloadEntry struct
366            let val = json! {{
367                "A" : {
368                    "id": expand_test_guid('A'),
369                    "entry": {
370                        "name": "john doe",
371                        "given-name": "john",
372                        "family-name": "doe",
373                        "street-address": "1300 Broadway",
374                        "address-level2": "New York, NY",
375                        "country": "United States",
376                        "version": 1,
377                    }
378                },
379                "C" : {
380                    "id": expand_test_guid('C'),
381                    "entry": {
382                        "name": "jane doe",
383                        "given-name": "jane",
384                        "family-name": "doe",
385                        "street-address": "3050 South La Brea Ave",
386                        "address-level2": "Los Angeles, CA",
387                        "country": "United States",
388                        "timeCreated": 0,
389                        "timeLastUsed": 0,
390                        "timeLastModified": 0,
391                        "timesUsed": 0,
392                        "version": 1,
393                    }
394                },
395                "D" : {
396                    "id": expand_test_guid('D'),
397                    "entry": {
398                        "name": "test1 test2",
399                        "given-name": "test1",
400                        "family-name": "test2",
401                        "street-address": "85 Pike St",
402                        "address-level2": "Seattle, WA",
403                        "country": "United States",
404                        "foo": "bar",
405                        "baz": "qux",
406                        "version": 1,
407                    }
408                }
409            }};
410            val.as_object().expect("literal is an object").clone()
411        };
412    }
413
414    fn test_json_record(guid_prefix: char) -> Value {
415        TEST_JSON_RECORDS
416            .get(&guid_prefix.to_string())
417            .expect("should exist")
418            .clone()
419    }
420
421    fn test_record(guid_prefix: char) -> InternalAddress {
422        let json = test_json_record(guid_prefix);
423        let address_payload = serde_json::from_value(json).unwrap();
424        InternalAddress::from_payload(address_payload).expect("should be valid")
425    }
426
427    #[test]
428    fn test_stage_incoming() -> Result<()> {
429        error_support::init_for_tests();
430        let mut db = new_syncable_mem_db();
431        struct TestCase {
432            incoming_records: Vec<Value>,
433            mirror_records: Vec<Value>,
434            expected_record_count: usize,
435            expected_tombstone_count: usize,
436        }
437
438        let test_cases = vec![
439            TestCase {
440                incoming_records: vec![test_json_record('A')],
441                mirror_records: vec![],
442                expected_record_count: 1,
443                expected_tombstone_count: 0,
444            },
445            TestCase {
446                incoming_records: vec![test_json_tombstone('A')],
447                mirror_records: vec![],
448                expected_record_count: 0,
449                expected_tombstone_count: 1,
450            },
451            TestCase {
452                incoming_records: vec![
453                    test_json_record('A'),
454                    test_json_record('C'),
455                    test_json_tombstone('B'),
456                ],
457                mirror_records: vec![],
458                expected_record_count: 2,
459                expected_tombstone_count: 1,
460            },
461            // incoming tombstone with existing tombstone in the mirror
462            TestCase {
463                incoming_records: vec![test_json_tombstone('B')],
464                mirror_records: vec![test_json_tombstone('B')],
465                expected_record_count: 0,
466                expected_tombstone_count: 1,
467            },
468        ];
469
470        for tc in test_cases {
471            info!("starting new testcase");
472            let tx = db.transaction()?;
473
474            // Add required items to the mirrors.
475            let mirror_sql = "INSERT OR REPLACE INTO addresses_mirror (guid, payload)
476                              VALUES (:guid, :payload)";
477            for payload in tc.mirror_records {
478                tx.execute(
479                    mirror_sql,
480                    rusqlite::named_params! {
481                        ":guid": payload["id"].as_str().unwrap(),
482                        ":payload": payload.to_string(),
483                    },
484                )
485                .expect("should insert mirror record");
486            }
487
488            let ri = IncomingAddressesImpl {};
489            ri.stage_incoming(
490                &tx,
491                array_to_incoming(tc.incoming_records),
492                &NeverInterrupts,
493            )?;
494
495            let records = tx.conn().query_rows_and_then(
496                "SELECT * FROM temp.addresses_sync_staging;",
497                [],
498                |row| -> Result<IncomingContent<InternalAddress>> {
499                    let guid: SyncGuid = row.get_unwrap("guid");
500                    let payload: String = row.get_unwrap("payload");
501                    raw_payload_to_incoming(guid, payload)
502                },
503            )?;
504
505            let record_count = records
506                .iter()
507                .filter(|p| !matches!(p.kind, IncomingKind::Tombstone))
508                .count();
509            let tombstone_count = records.len() - record_count;
510
511            assert_eq!(record_count, tc.expected_record_count);
512            assert_eq!(tombstone_count, tc.expected_tombstone_count);
513
514            ri.fetch_incoming_states(&tx)?;
515
516            tx.execute("DELETE FROM temp.addresses_sync_staging;", [])?;
517        }
518        Ok(())
519    }
520
521    #[test]
522    fn test_change_record_guid() -> Result<()> {
523        let mut db = new_syncable_mem_db();
524        let tx = db.transaction()?;
525        let ri = IncomingAddressesImpl {};
526
527        ri.insert_local_record(&tx, test_record('C'))?;
528
529        ri.change_record_guid(
530            &tx,
531            &SyncGuid::new(&expand_test_guid('C')),
532            &SyncGuid::new(&expand_test_guid('B')),
533        )?;
534        tx.commit()?;
535        assert!(get_address(&db.writer, &expand_test_guid('C').into()).is_err());
536        assert!(get_address(&db.writer, &expand_test_guid('B').into()).is_ok());
537        Ok(())
538    }
539
540    #[test]
541    fn test_get_incoming() {
542        let mut db = new_syncable_mem_db();
543        let tx = db.transaction().expect("should get tx");
544        let ai = IncomingAddressesImpl {};
545        let record = test_record('C');
546        let bso = record.clone().into_test_incoming_bso();
547        do_test_incoming_same(&ai, &tx, record, bso);
548    }
549
550    #[test]
551    fn test_get_incoming_unknown_fields() {
552        let json = test_json_record('D');
553        let address_payload = serde_json::from_value::<AddressPayload>(json).unwrap();
554        // The incoming payload should've correctly deserialized any unknown_fields into a Map<String,Value>
555        assert_eq!(address_payload.entry.unknown_fields.len(), 2);
556        assert_eq!(
557            address_payload
558                .entry
559                .unknown_fields
560                .get("foo")
561                .unwrap()
562                .as_str()
563                .unwrap(),
564            "bar"
565        );
566    }
567
568    #[test]
569    fn test_incoming_tombstone() {
570        let mut db = new_syncable_mem_db();
571        let tx = db.transaction().expect("should get tx");
572        let ai = IncomingAddressesImpl {};
573        do_test_incoming_tombstone(&ai, &tx, test_record('C'));
574    }
575
576    #[test]
577    fn test_staged_to_mirror() {
578        let mut db = new_syncable_mem_db();
579        let tx = db.transaction().expect("should get tx");
580        let ai = IncomingAddressesImpl {};
581        let record = test_record('C');
582        let bso = record.clone().into_test_incoming_bso();
583        do_test_staged_to_mirror(&ai, &tx, record, bso, "addresses_mirror");
584    }
585}