autofill/sync/
mod.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*/
5
6pub mod address;
7mod common;
8pub mod credit_card;
9pub mod engine;
10
11pub(crate) use crate::db::models::Metadata;
12use crate::error::Result;
13use error_support::{trace, warn};
14use interrupt_support::Interruptee;
15use rusqlite::Transaction;
16use sync15::bso::{IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, OutgoingBso};
17use sync15::ServerTimestamp;
18use sync_guid::Guid;
19use types::Timestamp;
20
21// This type is used as a snazzy way to capture all unknown fields from the payload
22// upon deserialization without having to work with a concrete type
23type UnknownFields = serde_json::Map<String, serde_json::Value>;
24
25// The fact that credit-card numbers are encrypted makes things a little tricky
26// for sync in various ways - and one non-obvious way is that the tables that
27// store sync payloads can't just store them directly as they are not encrypted
28// in that form.
29// ie, in the database, an address record's "payload" column looks like:
30// > '{"entry":{"address-level1":"VIC", "street-address":"2/25 Somewhere St","timeCreated":1497567116554, "version":1},"id":"29ac67adae7d"}'
31// or a tombstone: '{"deleted":true,"id":"6544992973e6"}'
32// > (Note a number of fields have been removed from 'entry' for clarity)
33// and in the database a credit-card's "payload" looks like:
34// > 'eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2R0NNIn0..<snip>-<snip>.<snip lots more>'
35// > while a tombstone here remains encrypted but has the 'deleted' entry after decryption.
36// (Note also that the address entry, and the decrypted credit-card json both have an "id" in
37// the JSON, but we ignore that when deserializing and will stop persisting that soon)
38
39// Some traits that help us abstract away much of the sync functionality.
40
41// A trait that abstracts the *storage* implementation of the specific record
42// types, and must be implemented by the concrete record owners.
43// Note that it doesn't assume a SQL database or anything concrete about the
44// storage, although objects implementing this trait will live only long enough
45// to perform the sync "incoming" steps - ie, a transaction is likely to live
46// exactly as long as this object.
47// XXX - *sob* - although each method has a `&Transaction` param, which in
48// theory could be avoided if the concrete impls could keep the ref (ie, if
49// it was held behind `self`), but markh failed to make this work due to
50// lifetime woes.
51pub trait ProcessIncomingRecordImpl {
52    type Record;
53
54    fn stage_incoming(
55        &self,
56        tx: &Transaction<'_>,
57        incoming: Vec<IncomingBso>,
58        signal: &dyn Interruptee,
59    ) -> Result<()>;
60
61    /// Finish the incoming phase. This will typically caused staged records
62    // to be written to the mirror.
63    fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()>;
64
65    fn fetch_incoming_states(
66        &self,
67        tx: &Transaction<'_>,
68    ) -> Result<Vec<IncomingState<Self::Record>>>;
69
70    /// Returns a local record that has the same values as the given incoming record (with the exception
71    /// of the `guid` values which should differ) that will be used as a local duplicate record for
72    /// syncing.
73    fn get_local_dupe(
74        &self,
75        tx: &Transaction<'_>,
76        incoming: &Self::Record,
77    ) -> Result<Option<Self::Record>>;
78
79    fn update_local_record(
80        &self,
81        tx: &Transaction<'_>,
82        record: Self::Record,
83        was_merged: bool,
84    ) -> Result<()>;
85
86    fn insert_local_record(&self, tx: &Transaction<'_>, record: Self::Record) -> Result<()>;
87
88    fn change_record_guid(
89        &self,
90        tx: &Transaction<'_>,
91        old_guid: &Guid,
92        new_guid: &Guid,
93    ) -> Result<()>;
94
95    fn remove_record(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;
96
97    fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;
98}
99
100pub trait ProcessOutgoingRecordImpl {
101    type Record;
102
103    fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>>;
104
105    fn finish_synced_items(
106        &self,
107        tx: &Transaction<'_>,
108        records_synced: Vec<Guid>,
109    ) -> anyhow::Result<()>;
110}
111
112// A trait that abstracts the functionality in the record itself.
113pub trait SyncRecord {
114    fn record_name() -> &'static str; // "addresses" or similar, for logging/debuging.
115    fn id(&self) -> &Guid;
116    fn metadata(&self) -> &Metadata;
117    fn metadata_mut(&mut self) -> &mut Metadata;
118    // Merge or fork multiple copies of the same record. The resulting record
119    // might have the same guid as the inputs, meaning it was truly merged, or
120    // a different guid, in which case it was forked due to conflicting changes.
121    fn merge(incoming: &Self, local: &Self, mirror: &Option<Self>) -> MergeResult<Self>
122    where
123        Self: Sized;
124}
125
126impl Metadata {
127    /// Merge the metadata from `other`, and possibly `mirror`, into `self`
128    /// (which must already have valid metadata).
129    /// Note that mirror being None is an edge-case and typically means first
130    /// sync since a "reset" (eg, disconnecting and reconnecting.
131    pub fn merge(&mut self, other: &Metadata, mirror: Option<&Metadata>) {
132        match mirror {
133            Some(m) => {
134                fn get_latest_time(t1: Timestamp, t2: Timestamp, t3: Timestamp) -> Timestamp {
135                    std::cmp::max(t1, std::cmp::max(t2, t3))
136                }
137                fn get_earliest_time(t1: Timestamp, t2: Timestamp, t3: Timestamp) -> Timestamp {
138                    std::cmp::min(t1, std::cmp::min(t2, t3))
139                }
140                self.time_created =
141                    get_earliest_time(self.time_created, other.time_created, m.time_created);
142                self.time_last_used =
143                    get_latest_time(self.time_last_used, other.time_last_used, m.time_last_used);
144                self.time_last_modified = get_latest_time(
145                    self.time_last_modified,
146                    other.time_last_modified,
147                    m.time_last_modified,
148                );
149
150                self.times_used = m.times_used
151                    + std::cmp::max(other.times_used - m.times_used, 0)
152                    + std::cmp::max(self.times_used - m.times_used, 0);
153            }
154            None => {
155                fn get_latest_time(t1: Timestamp, t2: Timestamp) -> Timestamp {
156                    std::cmp::max(t1, t2)
157                }
158                fn get_earliest_time(t1: Timestamp, t2: Timestamp) -> Timestamp {
159                    std::cmp::min(t1, t2)
160                }
161                self.time_created = get_earliest_time(self.time_created, other.time_created);
162                self.time_last_used = get_latest_time(self.time_last_used, other.time_last_used);
163                self.time_last_modified =
164                    get_latest_time(self.time_last_modified, other.time_last_modified);
165                // No mirror is an edge-case that almost certainly means the
166                // client was disconnected and this is the first sync after
167                // reconnection. So we can't really do a simple sum() of the
168                // times_used values as if the disconnection was recent, it will
169                // be double the expected value.
170                // So we just take the largest.
171                self.times_used = std::cmp::max(other.times_used, self.times_used);
172            }
173        }
174    }
175}
176
177// A local record can be in any of these 5 states.
178#[derive(Debug)]
179enum LocalRecordInfo<T> {
180    Unmodified { record: T },
181    Modified { record: T },
182    // encrypted data was scrubbed from the local record and needs to be resynced from the server
183    Scrubbed { record: T },
184    Tombstone { guid: Guid },
185    Missing,
186}
187
188// An enum for the return value from our "merge" function, which might either
189// update the record, or might fork it.
190#[derive(Debug)]
191pub enum MergeResult<T> {
192    Merged { merged: T },
193    Forked { forked: T },
194}
195
196// This ties the 3 possible records together and is what we expect the
197// implementations to put together for us.
198#[derive(Debug)]
199pub struct IncomingState<T> {
200    incoming: IncomingContent<T>,
201    local: LocalRecordInfo<T>,
202    // We don't have an enum for the mirror - an Option<> is fine because
203    // although we do store tombstones there, we ignore them when reconciling
204    // (ie, we ignore tombstones in the mirror)
205    // don't store tombstones there.
206    mirror: Option<T>,
207}
208
209/// The distinct incoming sync actions to be performed for incoming records.
210#[derive(Debug, PartialEq)]
211enum IncomingAction<T> {
212    // Remove the local record with this GUID.
213    DeleteLocalRecord { guid: Guid },
214    // Insert a new record.
215    Insert { record: T },
216    // Update an existing record. If `was_merged` was true, then the updated
217    // record isn't identical to the incoming one, so needs to be flagged as
218    // dirty.
219    Update { record: T, was_merged: bool },
220    // We forked a record because we couldn't merge it. `forked` will have
221    // a new guid, while `incoming` is the unmodified version of the incoming
222    // record which we need to apply.
223    Fork { forked: T, incoming: T },
224    // An existing record with old_guid needs to be replaced with this record.
225    UpdateLocalGuid { old_guid: Guid, record: T },
226    // There's a remote tombstone, but our copy of the record is dirty. The
227    // remote tombstone should be replaced with this.
228    ResurrectRemoteTombstone { record: T },
229    // There's a local tombstone - it should be removed and replaced with this.
230    ResurrectLocalTombstone { record: T },
231    // Nothing to do.
232    DoNothing,
233}
234
235/// Convert a IncomingState to an IncomingAction - this is where the "policy"
236/// lives for when we resurrect, or merge etc.
237fn plan_incoming<T: std::fmt::Debug + SyncRecord>(
238    rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
239    tx: &Transaction<'_>,
240    staged_info: IncomingState<T>,
241) -> Result<IncomingAction<T>> {
242    trace!("plan_incoming: {:?}", staged_info);
243    let IncomingState {
244        incoming,
245        local,
246        mirror,
247    } = staged_info;
248
249    let state = match incoming.kind {
250        IncomingKind::Tombstone => {
251            match local {
252                LocalRecordInfo::Unmodified { .. } | LocalRecordInfo::Scrubbed { .. } => {
253                    // Note: On desktop, when there's a local record for an incoming tombstone, a local tombstone
254                    // would created. But we don't actually need to create a local tombstone here. If we did it would
255                    // immediately be deleted after being uploaded to the server.
256                    IncomingAction::DeleteLocalRecord {
257                        guid: incoming.envelope.id,
258                    }
259                }
260                LocalRecordInfo::Modified { record } => {
261                    // Incoming tombstone with local changes should cause us to "resurrect" the local.
262                    // At a minimum, the implementation will need to ensure the record is marked as
263                    // dirty so it's uploaded, overwriting the server's tombstone.
264                    IncomingAction::ResurrectRemoteTombstone { record }
265                }
266                LocalRecordInfo::Tombstone {
267                    guid: tombstone_guid,
268                } => {
269                    assert_eq!(incoming.envelope.id, tombstone_guid);
270                    IncomingAction::DoNothing
271                }
272                LocalRecordInfo::Missing => IncomingAction::DoNothing,
273            }
274        }
275        IncomingKind::Content(mut incoming_record) => {
276            match local {
277                LocalRecordInfo::Unmodified {
278                    record: local_record,
279                }
280                | LocalRecordInfo::Scrubbed {
281                    record: local_record,
282                } => {
283                    // The local record was either unmodified, or scrubbed of its encrypted data.
284                    // Either way we want to:
285                    //   - Merge the metadata
286                    //   - Update the local record using data from the server
287                    //   - Don't flag the local item as dirty.  We don't want to reupload for just
288                    //     metadata changes.
289                    let metadata = incoming_record.metadata_mut();
290                    metadata.merge(
291                        local_record.metadata(),
292                        mirror.as_ref().map(|m| m.metadata()),
293                    );
294                    // a micro-optimization here would be to `::DoNothing` if
295                    // the metadata was actually identical and the local data wasn't scrubbed, but
296                    // this seems like an edge-case on an edge-case?
297                    IncomingAction::Update {
298                        record: incoming_record,
299                        was_merged: false,
300                    }
301                }
302                LocalRecordInfo::Modified {
303                    record: local_record,
304                } => {
305                    match SyncRecord::merge(&incoming_record, &local_record, &mirror) {
306                        MergeResult::Merged { merged } => {
307                            // The record we save locally has material differences
308                            // from the incoming one, so we are going to need to
309                            // reupload it.
310                            IncomingAction::Update {
311                                record: merged,
312                                was_merged: true,
313                            }
314                        }
315                        MergeResult::Forked { forked } => IncomingAction::Fork {
316                            forked,
317                            incoming: incoming_record,
318                        },
319                    }
320                }
321                LocalRecordInfo::Tombstone { .. } => IncomingAction::ResurrectLocalTombstone {
322                    record: incoming_record,
323                },
324                LocalRecordInfo::Missing => {
325                    match rec_impl.get_local_dupe(tx, &incoming_record)? {
326                        None => IncomingAction::Insert {
327                            record: incoming_record,
328                        },
329                        Some(local_dupe) => {
330                            // local record is missing but we found a dupe - so
331                            // the dupe must have a different guid (or we wouldn't
332                            // consider the local record missing!)
333                            assert_ne!(incoming_record.id(), local_dupe.id());
334                            // The existing item is identical except for the metadata, so
335                            // we still merge that metadata.
336                            let metadata = incoming_record.metadata_mut();
337                            metadata.merge(
338                                local_dupe.metadata(),
339                                mirror.as_ref().map(|m| m.metadata()),
340                            );
341                            IncomingAction::UpdateLocalGuid {
342                                old_guid: local_dupe.id().clone(),
343                                record: incoming_record,
344                            }
345                        }
346                    }
347                }
348            }
349        }
350        IncomingKind::Malformed => {
351            warn!("skipping incoming record: {}", incoming.envelope.id);
352            IncomingAction::DoNothing
353        }
354    };
355    trace!("plan_incoming resulted in {:?}", state);
356    Ok(state)
357}
358
359/// Apply the incoming action
360fn apply_incoming_action<T: std::fmt::Debug + SyncRecord>(
361    rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
362    tx: &Transaction<'_>,
363    action: IncomingAction<T>,
364) -> Result<()> {
365    trace!("applying action: {:?}", action);
366    match action {
367        IncomingAction::Update { record, was_merged } => {
368            rec_impl.update_local_record(tx, record, was_merged)?;
369        }
370        IncomingAction::Fork { forked, incoming } => {
371            // `forked` exists in the DB with the same guid as `incoming`, so fix that.
372            // change_record_guid will also update the mirror (if it exists) to prevent
373            // the server from overriding the forked mirror record (and losing any unknown fields)
374            rec_impl.change_record_guid(tx, incoming.id(), forked.id())?;
375            // `incoming` has the correct new guid.
376            rec_impl.insert_local_record(tx, incoming)?;
377        }
378        IncomingAction::Insert { record } => {
379            rec_impl.insert_local_record(tx, record)?;
380        }
381        IncomingAction::UpdateLocalGuid { old_guid, record } => {
382            // expect record to have the new guid.
383            assert_ne!(old_guid, *record.id());
384            rec_impl.change_record_guid(tx, &old_guid, record.id())?;
385            // the item is identical with the item with the new guid
386            // *except* for the metadata - so we still need to update, but
387            // don't need to treat the item as dirty.
388            rec_impl.update_local_record(tx, record, false)?;
389        }
390        IncomingAction::ResurrectLocalTombstone { record } => {
391            rec_impl.remove_tombstone(tx, record.id())?;
392            rec_impl.insert_local_record(tx, record)?;
393        }
394        IncomingAction::ResurrectRemoteTombstone { record } => {
395            // This is just "ensure local record dirty", which
396            // update_local_record conveniently does.
397            rec_impl.update_local_record(tx, record, true)?;
398        }
399        IncomingAction::DeleteLocalRecord { guid } => {
400            rec_impl.remove_record(tx, &guid)?;
401        }
402        IncomingAction::DoNothing => {}
403    }
404    Ok(())
405}
406
407// Helpers for tests
408#[cfg(test)]
409mod tests; // pull in our integration tests
410
411// and a module for unit test utilities.
412#[cfg(test)]
413pub mod test {
414    use crate::db::{schema::create_empty_sync_temp_tables, test::new_mem_db, AutofillDb};
415
416    pub fn new_syncable_mem_db() -> AutofillDb {
417        error_support::init_for_tests();
418        let db = new_mem_db();
419        create_empty_sync_temp_tables(&db).expect("should work");
420        db
421    }
422}