autofill/sync/
common.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*/
5
6// This contains sync functionality we've managed to share between addresses
7// and credit-cards. It's not "generic" in the way that traits are, it's
8// literally just code we can share.
9// For example, this code doesn't abstract storage away - it knows we are
10// using a sql database and knows that the schemas for addresses and cards are
11// very similar.
12
13use crate::error::*;
14use error_support::{error, info, trace};
15use interrupt_support::Interruptee;
16use rusqlite::{types::ToSql, Connection, Row};
17use sync15::bso::OutgoingBso;
18use sync15::ServerTimestamp;
19use sync_guid::Guid;
20
21/// Stages incoming records (excluding incoming tombstones) in preparation for
22/// applying incoming changes for the syncing autofill records.
23/// The incoming record is a String, because for credit-cards it is the encrypted
24/// version of a JSON record.
25pub(super) fn common_stage_incoming_records(
26    conn: &Connection,
27    table_name: &str,
28    incoming: Vec<(Guid, String, ServerTimestamp)>,
29    signal: &dyn Interruptee,
30) -> Result<()> {
31    info!(
32        "staging {} incoming records into {}",
33        incoming.len(),
34        table_name
35    );
36    let chunk_size = 2;
37    let vals: Vec<(Guid, String)> = incoming
38        .into_iter()
39        .map(|(guid, data, _)| (guid, data))
40        .collect();
41    sql_support::each_sized_chunk(
42        &vals,
43        sql_support::default_max_variable_number() / chunk_size,
44        |chunk, _| -> Result<()> {
45            signal.err_if_interrupted()?;
46            let sql = format!(
47                "INSERT OR REPLACE INTO temp.{table_name} (guid, payload)
48                 VALUES {vals}",
49                table_name = table_name,
50                vals = sql_support::repeat_multi_values(chunk.len(), 2)
51            );
52            let mut params = Vec::with_capacity(chunk.len() * chunk_size);
53            for (guid, json) in chunk {
54                params.push(guid as &dyn ToSql);
55                params.push(json);
56            }
57            conn.execute(&sql, rusqlite::params_from_iter(params))?;
58            Ok(())
59        },
60    )?;
61    trace!("staged");
62    Ok(())
63}
64
65pub(super) fn common_remove_record(conn: &Connection, table_name: &str, guid: &Guid) -> Result<()> {
66    conn.execute(
67        &format!(
68            "DELETE FROM {}
69            WHERE guid = :guid",
70            table_name
71        ),
72        rusqlite::named_params! {
73            ":guid": guid,
74        },
75    )?;
76    Ok(())
77}
78
79// This optionally takes a mirror table name to update if
80// there is a corresponding record with the same guid we'd like to update
81pub(super) fn common_change_guid(
82    conn: &Connection,
83    table_name: &str,
84    mirror_table_name: &str,
85    old_guid: &Guid,
86    new_guid: &Guid,
87) -> Result<()> {
88    assert_ne!(old_guid, new_guid);
89    let nrows = conn.execute(
90        &format!(
91            "UPDATE {}
92            SET guid = :new_guid,
93            sync_change_counter = sync_change_counter + 1
94            WHERE guid = :old_guid",
95            table_name
96        ),
97        rusqlite::named_params! {
98            ":old_guid": old_guid,
99            ":new_guid": new_guid,
100        },
101    )?;
102    // something's gone badly wrong if this didn't affect exactly 1 row.
103    assert_eq!(nrows, 1);
104
105    // If there is also a corresponding mirror row (e.g forking situations), update aswell
106    conn.execute(
107        &format!(
108            "UPDATE {}
109                SET guid = :new_guid
110                WHERE guid = :old_guid",
111            mirror_table_name
112        ),
113        rusqlite::named_params! {
114            ":old_guid": old_guid,
115            ":new_guid": new_guid,
116        },
117    )?;
118
119    Ok(())
120}
121
122/// Records in the incoming staging table need to end up in the mirror.
123pub(super) fn common_mirror_staged_records(
124    conn: &Connection,
125    staging_table_name: &str,
126    mirror_table_name: &str,
127) -> Result<()> {
128    conn.execute(
129        &format!(
130            "INSERT OR REPLACE INTO {} (guid, payload)
131             SELECT guid, payload FROM temp.{}",
132            mirror_table_name, staging_table_name,
133        ),
134        [],
135    )?;
136    Ok(())
137}
138
// A macro for our record merge implementation.
// We allow all "common" fields from the sub-types to be getters on the
// InsertableItem type.
// Macros don't have fine-grained visibility and is visible to the entire
// crate, so we give it a very specific name.
//
// This performs a 3-way merge of a single field across the incoming record,
// the local record, and (when present) the mirror record acting as the
// common ancestor. If both sides changed the field to different values, the
// whole merge is abandoned by early-returning `MergeResult::Forked`.
#[macro_export]
macro_rules! sync_merge_field_check {
    ($field_name:ident,
    $incoming:ident,
    $local:ident,
    $mirror:ident,
    $merged_record:ident
    ) => {
        let incoming_field = &$incoming.$field_name;
        let local_field = &$local.$field_name;
        let is_local_same;
        let is_incoming_same;

        match &$mirror {
            Some(m) => {
                // Compare each side against the mirror (common ancestor) to
                // determine which side actually changed the field.
                let mirror_field = &m.$field_name;
                is_local_same = mirror_field == local_field;
                is_incoming_same = mirror_field == incoming_field;
            }
            None => {
                // No mirror: treat local as the baseline, so only a
                // local/incoming difference counts as an incoming change.
                is_local_same = true;
                is_incoming_same = local_field == incoming_field;
            }
        };

        // Prefer local when incoming didn't change, or when both sides agree.
        let should_use_local = is_incoming_same || local_field == incoming_field;

        if is_local_same && !is_incoming_same {
            // Only the incoming side changed the field.
            $merged_record.$field_name = incoming_field.clone();
        } else if should_use_local {
            $merged_record.$field_name = local_field.clone();
        } else {
            // There are conflicting differences, so we "fork" the record - we
            // will end up giving the local one a new guid and save the remote
            // one with its incoming ID.
            return MergeResult::Forked {
                forked: get_forked_record($local.clone()),
            };
        }
    };
}
185
186pub(super) fn common_get_outgoing_staging_records(
187    conn: &Connection,
188    data_sql: &str,
189    tombstones_sql: &str,
190    payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
191) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
192    let outgoing_records =
193        common_get_outgoing_records(conn, data_sql, tombstones_sql, payload_from_data_row)?;
194    Ok(outgoing_records.into_iter().collect::<Vec<_>>())
195}
196
197fn get_outgoing_records(
198    conn: &Connection,
199    sql: &str,
200    record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
201) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
202    conn.prepare(sql)?
203        .query_map([], |row| Ok(record_from_data_row(row)))?
204        .map(|r| {
205            r.unwrap().map_err(|e| {
206                error!(
207                    "Failed to retrieve a record from a row with the following error: {}",
208                    e
209                );
210                e.into()
211            })
212        })
213        .collect::<std::result::Result<Vec<_>, _>>()
214}
215
216pub(super) fn common_get_outgoing_records(
217    conn: &Connection,
218    data_sql: &str,
219    tombstone_sql: &str,
220    record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
221) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
222    let mut payload = get_outgoing_records(conn, data_sql, record_from_data_row)?;
223
224    payload.append(&mut get_outgoing_records(conn, tombstone_sql, &|row| {
225        Ok((
226            OutgoingBso::new_tombstone(Guid::from_string(row.get("guid")?).into()),
227            0,
228        ))
229    })?);
230
231    Ok(payload)
232}
233
234pub(super) fn common_save_outgoing_records(
235    conn: &Connection,
236    table_name: &str,
237    staging_records: Vec<(Guid, String, i64)>,
238) -> anyhow::Result<()> {
239    let chunk_size = 3;
240    sql_support::each_sized_chunk(
241        &staging_records,
242        sql_support::default_max_variable_number() / chunk_size,
243        |chunk, _| -> anyhow::Result<()> {
244            let sql = format!(
245                "INSERT OR REPLACE INTO temp.{table_name} (guid, payload, sync_change_counter)
246                VALUES {staging_records}",
247                table_name = table_name,
248                staging_records = sql_support::repeat_multi_values(chunk.len(), chunk_size)
249            );
250            let mut params = Vec::with_capacity(chunk.len() * chunk_size);
251            for (guid, json, sync_change_counter) in chunk {
252                params.push(guid as &dyn ToSql);
253                params.push(json);
254                params.push(sync_change_counter);
255            }
256            conn.execute(&sql, rusqlite::params_from_iter(params))?;
257            Ok(())
258        },
259    )?;
260    Ok(())
261}
262
263pub(super) fn common_finish_synced_items(
264    conn: &Connection,
265    data_table_name: &str,
266    mirror_table_name: &str,
267    outgoing_staging_table_name: &str,
268    records_synced: Vec<Guid>,
269) -> anyhow::Result<()> {
270    // Update the local change counter for uploaded items.
271    reset_sync_change_counter(
272        conn,
273        data_table_name,
274        outgoing_staging_table_name,
275        records_synced,
276    )?;
277    // Copy from the outgoing staging table into the mirror.
278    let sql = format!(
279        "INSERT OR REPLACE INTO {mirror_table_name}
280            SELECT guid, payload FROM temp.{outgoing_staging_table_name}",
281        mirror_table_name = mirror_table_name,
282        outgoing_staging_table_name = outgoing_staging_table_name,
283    );
284    conn.execute(&sql, [])?;
285    Ok(())
286}
287
288// When we started syncing, we saved `sync_change_counter` in the staging
289// table for every record. Now that we've uploaded the server, we need to
290// decrement that value from the current value - anything that ends up with
291// a non-zero value must have changed since while we were uploading so remains
292// dirty. This does that decrement.
293fn reset_sync_change_counter(
294    conn: &Connection,
295    data_table_name: &str,
296    outgoing_table_name: &str,
297    records_synced: Vec<Guid>,
298) -> anyhow::Result<()> {
299    sql_support::each_chunk(&records_synced, |chunk, _| -> anyhow::Result<()> {
300        conn.execute(
301            &format!(
302                // We're making two checks that in practice should be redundant. First we're limiting the
303                // number of records that we're pulling from the outgoing staging table to one. Lastly we're
304                // ensuring that the updated local records are also in `records_synced` which should be the
305                // case since the sync will fail entirely if the server rejects individual records.
306                "UPDATE {data_table_name} AS data
307                SET sync_change_counter = sync_change_counter -
308                    (
309                        SELECT outgoing.sync_change_counter
310                        FROM temp.{outgoing_table_name} AS outgoing
311                        WHERE outgoing.guid = data.guid LIMIT 1
312                    )
313                WHERE guid IN ({values})",
314                data_table_name = data_table_name,
315                outgoing_table_name = outgoing_table_name,
316                values = sql_support::repeat_sql_values(chunk.len())
317            ),
318            rusqlite::params_from_iter(chunk),
319        )?;
320        Ok(())
321    })?;
322
323    Ok(())
324}
325
// And common helpers for tests (although no actual tests!)
#[cfg(test)]
pub(super) mod tests {
    use super::super::*;
    use interrupt_support::NeverInterrupts;
    use serde_json::{json, Value};

    /// Converts JSON test fixtures into incoming BSOs.
    pub(in crate::sync) fn array_to_incoming(vals: Vec<Value>) -> Vec<IncomingBso> {
        vals.into_iter()
            .map(IncomingBso::from_test_content)
            .collect()
    }

    /// Expands a single char into a 12-char test guid, e.g. 'A' -> "AAAAAAAAAAAA".
    pub(in crate::sync) fn expand_test_guid(c: char) -> String {
        c.to_string().repeat(12)
    }

    /// A JSON tombstone payload whose id is `expand_test_guid(guid_prefix)`.
    pub(in crate::sync) fn test_json_tombstone(guid_prefix: char) -> Value {
        let t = json! {
            {
                "id": expand_test_guid(guid_prefix),
                "deleted": true,
            }
        };
        t
    }

    // Incoming record is identical to a local record.
    pub(in crate::sync) fn do_test_incoming_same<T: SyncRecord + std::fmt::Debug + Clone>(
        ri: &dyn ProcessIncomingRecordImpl<Record = T>,
        tx: &Transaction<'_>,
        record: T,
        bso: IncomingBso,
    ) {
        ri.insert_local_record(tx, record)
            .expect("insert should work");
        ri.stage_incoming(tx, vec![bso], &NeverInterrupts)
            .expect("stage should work");
        let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
        assert_eq!(states.len(), 1, "1 records == 1 state!");
        let action =
            crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
        // Even though the records are identical, we still merged the metadata
        // so treat this as an Update.
        assert!(matches!(action, crate::sync::IncomingAction::Update { .. }));
    }

    // Incoming tombstone for an existing local record.
    pub(in crate::sync) fn do_test_incoming_tombstone<T: SyncRecord + std::fmt::Debug + Clone>(
        ri: &dyn ProcessIncomingRecordImpl<Record = T>,
        tx: &Transaction<'_>,
        record: T,
    ) {
        let guid = record.id().clone();
        ri.insert_local_record(tx, record)
            .expect("insert should work");
        ri.stage_incoming(
            tx,
            vec![IncomingBso::new_test_tombstone(guid)],
            &NeverInterrupts,
        )
        .expect("stage should work");
        let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
        assert_eq!(states.len(), 1, "1 records == 1 state!");
        let action =
            crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
        // An incoming tombstone for an existing local record must plan the
        // deletion of that local record.
        assert!(matches!(
            action,
            crate::sync::IncomingAction::DeleteLocalRecord { .. }
        ));
    }

    // local record was scrubbed of encrypted data -- we should update it using server data
    pub(in crate::sync) fn do_test_scrubbed_local_data<T: SyncRecord + std::fmt::Debug + Clone>(
        ri: &dyn ProcessIncomingRecordImpl<Record = T>,
        tx: &Transaction<'_>,
        record: T,
        bso: IncomingBso,
    ) {
        ri.insert_local_record(tx, record)
            .expect("insert should work");
        ri.stage_incoming(tx, vec![bso], &NeverInterrupts)
            .expect("stage should work");
        let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
        assert_eq!(states.len(), 1, "1 records == 1 state!");
        assert!(
            matches!(states[0].local, LocalRecordInfo::Scrubbed { .. }),
            "state should be LocalRecordInfo::Scrubbed but it is: {:?}",
            states[0].local
        );

        let action =
            crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
        assert!(matches!(action, crate::sync::IncomingAction::Update { .. }));
    }

    // "Staged" records are moved to the mirror by finish_incoming().
    pub(in crate::sync) fn do_test_staged_to_mirror<T: SyncRecord + std::fmt::Debug + Clone>(
        ri: &dyn ProcessIncomingRecordImpl<Record = T>,
        tx: &Transaction<'_>,
        record: T,
        bso1: IncomingBso,
        mirror_table_name: &str,
    ) {
        let guid1 = record.id().clone();
        let guid2 = Guid::random();
        let bso2 = IncomingBso::new_test_tombstone(guid2.clone());

        ri.stage_incoming(tx, vec![bso1, bso2], &NeverInterrupts)
            .expect("stage should work");

        ri.finish_incoming(tx).expect("finish should work");

        // Both the record and the tombstone should have been mirrored.
        // (Test-only: guids are interpolated directly into the SQL.)
        let sql = format!(
            "SELECT COUNT(*) FROM {} where guid = '{}' OR guid = '{}'",
            mirror_table_name, guid1, guid2
        );
        let num_rows = tx
            .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
            .unwrap();
        assert_eq!(num_rows, 2);
    }

    /// Asserts exactly one row with `guid` exists in `table_name`.
    fn exists_in_table(tx: &Transaction<'_>, table_name: &str, guid: &Guid) {
        let sql = format!(
            "SELECT COUNT(*) FROM {} where guid = '{}'",
            table_name, guid
        );
        let num_rows = tx
            .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
            .unwrap();
        assert_eq!(num_rows, 1);
    }

    /// Asserts exactly one row with `guid` exists in `table_name` whose
    /// `sync_change_counter` equals `expected_counter_value`.
    pub(in crate::sync) fn exists_with_counter_value_in_table(
        tx: &Transaction<'_>,
        table_name: &str,
        guid: &Guid,
        expected_counter_value: i64,
    ) {
        let sql = format!(
            "SELECT COUNT(*)
            FROM {table_name}
            WHERE sync_change_counter = {expected_counter_value}
                AND guid = :guid",
            table_name = table_name,
            expected_counter_value = expected_counter_value,
        );

        let num_rows = tx
            .query_row(&sql, [guid], |row| Ok(row.get::<_, u32>(0).unwrap()))
            .unwrap();
        assert_eq!(num_rows, 1);
    }

    // A never-synced record should be uploaded, its counter reset, and the
    // payload copied to the mirror.
    pub(in crate::sync) fn do_test_outgoing_never_synced<
        T: SyncRecord + std::fmt::Debug + Clone,
    >(
        tx: &Transaction<'_>,
        ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
        guid: &Guid,
        data_table_name: &str,
        mirror_table_name: &str,
        staging_table_name: &str,
    ) {
        // call fetch outgoing records
        assert!(ro.fetch_outgoing_records(tx).is_ok());

        // check that the record is in the outgoing table
        exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);

        // call push synced items
        assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());

        // check that the sync change counter was reset to 0
        exists_with_counter_value_in_table(tx, data_table_name, guid, 0);

        // check that the outgoing record is in the mirror
        exists_in_table(tx, mirror_table_name, guid);
    }

    // An outgoing tombstone should reach the mirror without resurrecting a
    // row in the data table.
    pub(in crate::sync) fn do_test_outgoing_tombstone<T: SyncRecord + std::fmt::Debug + Clone>(
        tx: &Transaction<'_>,
        ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
        guid: &Guid,
        data_table_name: &str,
        mirror_table_name: &str,
        staging_table_name: &str,
    ) {
        // call fetch outgoing records
        assert!(ro.fetch_outgoing_records(tx).is_ok());

        // check that the record is in the outgoing table
        exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);

        // call push synced items
        assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());

        // check that the record wasn't copied to the data table
        let sql = format!(
            "SELECT COUNT(*) FROM {} where guid = '{}'",
            data_table_name, guid
        );
        let num_rows = tx
            .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
            .unwrap();
        assert_eq!(num_rows, 0);

        // check that the outgoing record is in the mirror
        exists_in_table(tx, mirror_table_name, guid);
    }

    // A locally-changed, previously-synced record should be re-uploaded and
    // its counter reset.
    pub(in crate::sync) fn do_test_outgoing_synced_with_local_change<
        T: SyncRecord + std::fmt::Debug + Clone,
    >(
        tx: &Transaction<'_>,
        ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
        guid: &Guid,
        data_table_name: &str,
        mirror_table_name: &str,
        staging_table_name: &str,
    ) {
        // call fetch outgoing records
        assert!(ro.fetch_outgoing_records(tx).is_ok());

        // check that the record is in the outgoing table
        exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);

        // call push synced items
        assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());

        // check that the sync change counter was reset to 0
        exists_with_counter_value_in_table(tx, data_table_name, guid, 0);

        // check that the outgoing record is in the mirror
        exists_in_table(tx, mirror_table_name, guid);
    }

    // An unchanged, previously-synced record should not be staged for upload.
    pub(in crate::sync) fn do_test_outgoing_synced_with_no_change<
        T: SyncRecord + std::fmt::Debug + Clone,
    >(
        tx: &Transaction<'_>,
        ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
        guid: &Guid,
        data_table_name: &str,
        staging_table_name: &str,
    ) {
        // call fetch outgoing records
        assert!(ro.fetch_outgoing_records(tx).is_ok());

        // check that the record is not in the outgoing table
        let sql = format!(
            "SELECT COUNT(*) FROM {} where guid = '{}'",
            &format!("temp.{}", staging_table_name),
            guid
        );
        let num_rows = tx
            .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
            .unwrap();
        assert_eq!(num_rows, 0);

        // call push synced items
        assert!(ro.finish_synced_items(tx, Vec::<Guid>::new()).is_ok());

        // check that the sync change counter remains 0
        exists_with_counter_value_in_table(tx, data_table_name, guid, 0);
    }
}