autofill/sync/address/
outgoing.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*/
5
6use crate::db::models::address::InternalAddress;
7use crate::db::schema::ADDRESS_COMMON_COLS;
8use crate::error::*;
9use crate::sync::{address::AddressPayload, common::*};
10use crate::sync::{OutgoingBso, ProcessOutgoingRecordImpl};
11use rusqlite::{Row, Transaction};
12use sync_guid::Guid as SyncGuid;
13
14const DATA_TABLE_NAME: &str = "addresses_data";
15const MIRROR_TABLE_NAME: &str = "addresses_mirror";
16const STAGING_TABLE_NAME: &str = "addresses_sync_outgoing_staging";
17
18pub(super) struct OutgoingAddressesImpl {}
19
20impl ProcessOutgoingRecordImpl for OutgoingAddressesImpl {
21    type Record = InternalAddress;
22
23    /// Gets the local records that have unsynced changes or don't have corresponding mirror
24    /// records and upserts them to the mirror table
25    fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>> {
26        // We left join the mirror table since we'll need to know if
27        // there were any unknown fields from the server we need to roundtrip
28        let data_sql = format!(
29            "SELECT
30                l.{common_cols},
31                m.payload,
32                l.sync_change_counter
33            FROM addresses_data l
34            LEFT JOIN addresses_mirror m
35            ON l.guid = m.guid
36            WHERE sync_change_counter > 0
37                OR l.guid NOT IN (
38                    SELECT m.guid
39                    FROM addresses_mirror m
40                )",
41            common_cols = ADDRESS_COMMON_COLS,
42        );
43        let record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)> = &|row| {
44            let mut record = InternalAddress::from_row(row)?.into_payload()?;
45            // If the server had unknown fields we fetch it and add it to the record
46            // we'll be uploading
47            if let Some(s) = row.get::<_, Option<String>>("payload")? {
48                let mirror_payload: AddressPayload = serde_json::from_str(&s)?;
49                record.entry.unknown_fields = mirror_payload.entry.unknown_fields;
50            };
51
52            Ok((
53                OutgoingBso::from_content_with_id(record)?,
54                row.get::<_, i64>("sync_change_counter")?,
55            ))
56        };
57
58        let tombstones_sql = "SELECT guid FROM addresses_tombstones";
59
60        // save outgoing records to the mirror table.
61        // unlike credit-cards, which stores records encrypted as they are
62        // on the server to protect the sensitive fields, we just store the
63        // plaintext payload.
64        let staging_records = common_get_outgoing_staging_records(
65            tx,
66            &data_sql,
67            tombstones_sql,
68            record_from_data_row,
69        )?
70        .into_iter()
71        .map(|(bso, change_counter)| (bso.envelope.id, bso.payload, change_counter))
72        .collect::<Vec<_>>();
73        common_save_outgoing_records(tx, STAGING_TABLE_NAME, staging_records)?;
74
75        // return outgoing changes
76        Ok(
77            common_get_outgoing_records(tx, &data_sql, tombstones_sql, record_from_data_row)?
78                .into_iter()
79                .map(|(bso, _change_counter)| bso)
80                .collect::<Vec<OutgoingBso>>(),
81        )
82    }
83
84    fn finish_synced_items(
85        &self,
86        tx: &Transaction<'_>,
87        records_synced: Vec<SyncGuid>,
88    ) -> anyhow::Result<()> {
89        common_finish_synced_items(
90            tx,
91            DATA_TABLE_NAME,
92            MIRROR_TABLE_NAME,
93            STAGING_TABLE_NAME,
94            records_synced,
95        )?;
96        Ok(())
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103    use crate::db::{addresses::add_internal_address, models::address::InternalAddress};
104    use crate::sync::{common::tests::*, test::new_syncable_mem_db, UnknownFields};
105    use rusqlite::Connection;
106    use serde_json::{json, Map, Value};
107    use types::Timestamp;
108
109    fn test_insert_mirror_record(
110        conn: &Connection,
111        address: InternalAddress,
112        unknown_fields: UnknownFields,
113    ) {
114        // This should probably be in the sync module, but it's used here.
115        let guid = address.guid.clone();
116        let mut addr_payload = address.into_payload().unwrap();
117        addr_payload.entry.unknown_fields = unknown_fields;
118        let payload = serde_json::to_string(&addr_payload).expect("is json");
119        conn.execute(
120            "INSERT OR IGNORE INTO addresses_mirror (guid, payload)
121             VALUES (:guid, :payload)",
122            rusqlite::named_params! {
123                ":guid": guid,
124                ":payload": &payload,
125            },
126        )
127        .expect("should insert");
128    }
129
130    lazy_static::lazy_static! {
131        static ref TEST_JSON_RECORDS: Map<String, Value> = {
132            // NOTE: the JSON here is the same as stored on the sync server -
133            // the superfluous `entry` is unfortunate but from desktop.
134            let val = json! {{
135                "C" : {
136                    "id": expand_test_guid('C'),
137                    "entry": {
138                        "name": "jane doe",
139                        "streetAddress": "3050 South La Brea Ave",
140                        "addressLevel2": "Los Angeles, CA",
141                        "country": "United States",
142                        "timeCreated": 0,
143                        "timeLastUsed": 0,
144                        "timeLastModified": 0,
145                        "timesUsed": 0,
146                        "version": 1,
147                    }
148                },
149                "D" : {
150                    "id": expand_test_guid('D'),
151                    "entry": {
152                        "name": "john doe",
153                        "street-address": "85 Pike St",
154                        "address-level2": "Seattle, WA",
155                        "country": "United States",
156                        "timeCreated": 0,
157                        "timeLastUsed": 0,
158                        "timeLastModified": 0,
159                        "timesUsed": 0,
160                        "version": 1,
161                        // Fields we don't understand from the server
162                        "foo": "bar",
163                        "baz": "qux",
164                    }
165                }
166            }};
167            val.as_object().expect("literal is an object").clone()
168        };
169    }
170
171    fn test_json_record(guid_prefix: char) -> Value {
172        TEST_JSON_RECORDS
173            .get(&guid_prefix.to_string())
174            .expect("should exist")
175            .clone()
176    }
177
178    fn test_record(guid_prefix: char) -> InternalAddress {
179        let json = test_json_record(guid_prefix);
180        let payload = serde_json::from_value(json).unwrap();
181        InternalAddress::from_payload(payload).expect("should be valid")
182    }
183
184    #[test]
185    fn test_outgoing_never_synced() {
186        let mut db = new_syncable_mem_db();
187        let tx = db.transaction().expect("should get tx");
188        let ao = OutgoingAddressesImpl {};
189        let test_record = test_record('C');
190
191        // create data record
192        assert!(add_internal_address(&tx, &test_record).is_ok());
193        do_test_outgoing_never_synced(
194            &tx,
195            &ao,
196            &test_record.guid,
197            DATA_TABLE_NAME,
198            MIRROR_TABLE_NAME,
199            STAGING_TABLE_NAME,
200        );
201    }
202
203    #[test]
204    fn test_outgoing_tombstone() {
205        let mut db = new_syncable_mem_db();
206        let tx = db.transaction().expect("should get tx");
207        let ao = OutgoingAddressesImpl {};
208        let test_record = test_record('C');
209
210        // create tombstone record
211        assert!(tx
212            .execute(
213                "INSERT INTO addresses_tombstones (
214                    guid,
215                    time_deleted
216                ) VALUES (
217                    :guid,
218                    :time_deleted
219                )",
220                rusqlite::named_params! {
221                    ":guid": test_record.guid,
222                    ":time_deleted": Timestamp::now(),
223                },
224            )
225            .is_ok());
226        do_test_outgoing_tombstone(
227            &tx,
228            &ao,
229            &test_record.guid,
230            DATA_TABLE_NAME,
231            MIRROR_TABLE_NAME,
232            STAGING_TABLE_NAME,
233        );
234    }
235
236    #[test]
237    fn test_outgoing_synced_with_local_change() {
238        let mut db = new_syncable_mem_db();
239        let tx = db.transaction().expect("should get tx");
240        let ao = OutgoingAddressesImpl {};
241
242        // create synced record with non-zero sync_change_counter
243        let mut test_record = test_record('C');
244        let initial_change_counter_val = 2;
245        test_record.metadata.sync_change_counter = initial_change_counter_val;
246        assert!(add_internal_address(&tx, &test_record).is_ok());
247        test_insert_mirror_record(&tx, test_record.clone(), Default::default());
248        exists_with_counter_value_in_table(
249            &tx,
250            DATA_TABLE_NAME,
251            &test_record.guid,
252            initial_change_counter_val,
253        );
254
255        do_test_outgoing_synced_with_local_change(
256            &tx,
257            &ao,
258            &test_record.guid,
259            DATA_TABLE_NAME,
260            MIRROR_TABLE_NAME,
261            STAGING_TABLE_NAME,
262        );
263    }
264
265    #[test]
266    fn test_outgoing_synced_with_no_change() {
267        let mut db = new_syncable_mem_db();
268        let tx = db.transaction().expect("should get tx");
269        let ao = OutgoingAddressesImpl {};
270
271        // create synced record with no changes (sync_change_counter = 0)
272        let test_record = test_record('C');
273        assert!(add_internal_address(&tx, &test_record).is_ok());
274        test_insert_mirror_record(&tx, test_record.clone(), Default::default());
275
276        do_test_outgoing_synced_with_no_change(
277            &tx,
278            &ao,
279            &test_record.guid,
280            DATA_TABLE_NAME,
281            STAGING_TABLE_NAME,
282        );
283    }
284
285    #[test]
286    fn test_outgoing_roundtrip_unknown() {
287        let mut db = new_syncable_mem_db();
288        let tx = db.transaction().expect("should get tx");
289        let ao = OutgoingAddressesImpl {};
290
291        // create synced record with non-zero sync_change_counter
292        let mut test_record = test_record('D');
293        let initial_change_counter_val = 2;
294        test_record.metadata.sync_change_counter = initial_change_counter_val;
295        assert!(add_internal_address(&tx, &test_record).is_ok());
296        // put "unknown_fields" into the mirror payload to imitate the server
297        let unknown_fields: UnknownFields =
298            serde_json::from_value(json! {{ "foo": "bar", "baz": "qux"}}).unwrap();
299        test_insert_mirror_record(&tx, test_record.clone(), unknown_fields);
300        exists_with_counter_value_in_table(
301            &tx,
302            DATA_TABLE_NAME,
303            &test_record.guid,
304            initial_change_counter_val,
305        );
306
307        let outgoing = &ao.fetch_outgoing_records(&tx).unwrap();
308        // Ensure we have our unknown values for the roundtrip
309        let bso_payload: Map<String, Value> = serde_json::from_str(&outgoing[0].payload).unwrap();
310        let entry = bso_payload.get("entry").unwrap();
311        assert_eq!(entry.get("foo").unwrap(), "bar");
312        assert_eq!(entry.get("baz").unwrap(), "qux");
313        do_test_outgoing_synced_with_local_change(
314            &tx,
315            &ao,
316            &test_record.guid,
317            DATA_TABLE_NAME,
318            MIRROR_TABLE_NAME,
319            STAGING_TABLE_NAME,
320        );
321    }
322
323    #[test]
324    fn test_outgoing_with_migrated_fields() {
325        let mut db = new_syncable_mem_db();
326        let tx = db.transaction().expect("should get tx");
327        let ao = OutgoingAddressesImpl {};
328        let mut test_record = test_record('C');
329        let initial_change_counter_val = 2;
330        test_record.metadata.sync_change_counter = initial_change_counter_val;
331        assert!(add_internal_address(&tx, &test_record).is_ok());
332
333        let outgoing = ao.fetch_outgoing_records(&tx).unwrap();
334        // *-name fields are: {"given-name": "john", "family-name": "doe"}
335        let bso_payload: Map<String, Value> = serde_json::from_str(&outgoing[0].payload).unwrap();
336        let entry = bso_payload.get("entry").unwrap();
337        assert_eq!(entry.get("name").unwrap(), "jane doe");
338        assert_eq!(entry.get("given-name").unwrap(), "jane");
339        assert_eq!(entry.get("additional-name").unwrap(), "");
340        assert_eq!(entry.get("family-name").unwrap(), "doe");
341    }
342}