autofill/sync/mod.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2* License, v. 2.0. If a copy of the MPL was not distributed with this
3* file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*/
5
6pub mod address;
7mod common;
8pub mod credit_card;
9pub mod engine;
10
11pub(crate) use crate::db::models::Metadata;
12use crate::error::Result;
13use error_support::{trace, warn};
14use interrupt_support::Interruptee;
15use rusqlite::Transaction;
16use sync15::bso::{IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, OutgoingBso};
17use sync15::ServerTimestamp;
18use sync_guid::Guid;
19use types::Timestamp;
20
// This type is used as a snazzy way to capture all unknown fields from the payload
// upon deserialization without having to work with a concrete type - keeping
// them means we don't drop fields written by newer clients.
type UnknownFields = serde_json::Map<String, serde_json::Value>;
24
25// The fact that credit-card numbers are encrypted makes things a little tricky
26// for sync in various ways - and one non-obvious way is that the tables that
27// store sync payloads can't just store them directly as they are not encrypted
28// in that form.
29// ie, in the database, an address record's "payload" column looks like:
30// > '{"entry":{"address-level1":"VIC", "street-address":"2/25 Somewhere St","timeCreated":1497567116554, "version":1},"id":"29ac67adae7d"}'
31// or a tombstone: '{"deleted":true,"id":"6544992973e6"}'
32// > (Note a number of fields have been removed from 'entry' for clarity)
33// and in the database a credit-card's "payload" looks like:
34// > 'eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2R0NNIn0..<snip>-<snip>.<snip lots more>'
35// > while a tombstone here remains encrypted but has the 'deleted' entry after decryption.
36// (Note also that the address entry, and the decrypted credit-card json both have an "id" in
37// the JSON, but we ignore that when deserializing and will stop persisting that soon)
38
39// Some traits that help us abstract away much of the sync functionality.
40
41// A trait that abstracts the *storage* implementation of the specific record
42// types, and must be implemented by the concrete record owners.
43// Note that it doesn't assume a SQL database or anything concrete about the
44// storage, although objects implementing this trait will live only long enough
45// to perform the sync "incoming" steps - ie, a transaction is likely to live
46// exactly as long as this object.
47// XXX - *sob* - although each method has a `&Transaction` param, which in
48// theory could be avoided if the concrete impls could keep the ref (ie, if
49// it was held behind `self`), but markh failed to make this work due to
50// lifetime woes.
pub trait ProcessIncomingRecordImpl {
    type Record;

    /// Stage the raw incoming records, exactly as they arrived from the
    /// server, ready for `fetch_incoming_states`.
    fn stage_incoming(
        &self,
        tx: &Transaction<'_>,
        incoming: Vec<IncomingBso>,
        signal: &dyn Interruptee,
    ) -> Result<()>;

    /// Finish the incoming phase. This will typically cause staged records
    /// to be written to the mirror.
    fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()>;

    /// Build an `IncomingState` (the incoming/local/mirror triple) for every
    /// staged record, ready for `plan_incoming`.
    fn fetch_incoming_states(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<Vec<IncomingState<Self::Record>>>;

    /// Returns a local record that has the same values as the given incoming record (with the exception
    /// of the `guid` values which should differ) that will be used as a local duplicate record for
    /// syncing.
    fn get_local_dupe(
        &self,
        tx: &Transaction<'_>,
        incoming: &Self::Record,
    ) -> Result<Option<Self::Record>>;

    /// Update the local copy of `record`. If `was_merged` is true the record
    /// differs from the incoming server copy, so it must also be flagged as
    /// dirty so it gets re-uploaded.
    fn update_local_record(
        &self,
        tx: &Transaction<'_>,
        record: Self::Record,
        was_merged: bool,
    ) -> Result<()>;

    /// Insert `record` as a brand-new local record.
    fn insert_local_record(&self, tx: &Transaction<'_>, record: Self::Record) -> Result<()>;

    /// Change the guid of an existing record from `old_guid` to `new_guid`
    /// (used when de-duping and when forking).
    fn change_record_guid(
        &self,
        tx: &Transaction<'_>,
        old_guid: &Guid,
        new_guid: &Guid,
    ) -> Result<()>;

    /// Delete the local record with `guid`.
    fn remove_record(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;

    /// Delete the local tombstone with `guid`.
    fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;
}
99
// The "outgoing" counterpart of `ProcessIncomingRecordImpl` - abstracts the
// storage operations needed to upload local changes.
pub trait ProcessOutgoingRecordImpl {
    type Record;

    /// Fetch all records which need to be uploaded to the server.
    fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>>;

    /// Called once the server has accepted `records_synced`, finishing the
    /// outgoing phase.
    fn finish_synced_items(
        &self,
        tx: &Transaction<'_>,
        records_synced: Vec<Guid>,
    ) -> anyhow::Result<()>;
}
111
// A trait that abstracts the functionality in the record itself.
pub trait SyncRecord {
    /// A short name for the record type - "addresses" or similar, for logging/debugging.
    fn record_name() -> &'static str;
    /// The record's guid.
    fn id(&self) -> &Guid;
    fn metadata(&self) -> &Metadata;
    fn metadata_mut(&mut self) -> &mut Metadata;
    // Merge or fork multiple copies of the same record. The resulting record
    // might have the same guid as the inputs, meaning it was truly merged, or
    // a different guid, in which case it was forked due to conflicting changes.
    fn merge(incoming: &Self, local: &Self, mirror: &Option<Self>) -> MergeResult<Self>
    where
        Self: Sized;
}
125
126impl Metadata {
127 /// Merge the metadata from `other`, and possibly `mirror`, into `self`
128 /// (which must already have valid metadata).
129 /// Note that mirror being None is an edge-case and typically means first
130 /// sync since a "reset" (eg, disconnecting and reconnecting.
131 pub fn merge(&mut self, other: &Metadata, mirror: Option<&Metadata>) {
132 match mirror {
133 Some(m) => {
134 fn get_latest_time(t1: Timestamp, t2: Timestamp, t3: Timestamp) -> Timestamp {
135 std::cmp::max(t1, std::cmp::max(t2, t3))
136 }
137 fn get_earliest_time(t1: Timestamp, t2: Timestamp, t3: Timestamp) -> Timestamp {
138 std::cmp::min(t1, std::cmp::min(t2, t3))
139 }
140 self.time_created =
141 get_earliest_time(self.time_created, other.time_created, m.time_created);
142 self.time_last_used =
143 get_latest_time(self.time_last_used, other.time_last_used, m.time_last_used);
144 self.time_last_modified = get_latest_time(
145 self.time_last_modified,
146 other.time_last_modified,
147 m.time_last_modified,
148 );
149
150 self.times_used = m.times_used
151 + std::cmp::max(other.times_used - m.times_used, 0)
152 + std::cmp::max(self.times_used - m.times_used, 0);
153 }
154 None => {
155 fn get_latest_time(t1: Timestamp, t2: Timestamp) -> Timestamp {
156 std::cmp::max(t1, t2)
157 }
158 fn get_earliest_time(t1: Timestamp, t2: Timestamp) -> Timestamp {
159 std::cmp::min(t1, t2)
160 }
161 self.time_created = get_earliest_time(self.time_created, other.time_created);
162 self.time_last_used = get_latest_time(self.time_last_used, other.time_last_used);
163 self.time_last_modified =
164 get_latest_time(self.time_last_modified, other.time_last_modified);
165 // No mirror is an edge-case that almost certainly means the
166 // client was disconnected and this is the first sync after
167 // reconnection. So we can't really do a simple sum() of the
168 // times_used values as if the disconnection was recent, it will
169 // be double the expected value.
170 // So we just take the largest.
171 self.times_used = std::cmp::max(other.times_used, self.times_used);
172 }
173 }
174 }
175}
176
// A local record can be in any of these 5 states.
#[derive(Debug)]
enum LocalRecordInfo<T> {
    // a local record exists and is unchanged since the last sync
    Unmodified { record: T },
    // a local record exists and has local changes which need uploading
    Modified { record: T },
    // encrypted data was scrubbed from the local record and needs to be resynced from the server
    Scrubbed { record: T },
    // the local record was deleted and only a tombstone remains
    Tombstone { guid: Guid },
    // no local record (or tombstone) exists for this guid
    Missing,
}
187
// An enum for the return value from our "merge" function, which might either
// update the record, or might fork it.
#[derive(Debug)]
pub enum MergeResult<T> {
    // the copies were truly merged into one record
    Merged { merged: T },
    // conflicting changes couldn't be merged; `forked` carries a new guid
    Forked { forked: T },
}
195
// This ties the 3 possible records together and is what we expect the
// implementations to put together for us.
#[derive(Debug)]
pub struct IncomingState<T> {
    // the record (or tombstone) as it arrived from the server
    incoming: IncomingContent<T>,
    // the state of the corresponding local record - see `LocalRecordInfo`
    local: LocalRecordInfo<T>,
    // We don't have an enum for the mirror - an Option<> is fine because
    // although we do store tombstones there, we ignore them when reconciling
    // (ie, we ignore tombstones in the mirror)
    mirror: Option<T>,
}
208
/// The distinct incoming sync actions to be performed for incoming records.
#[derive(Debug, PartialEq)]
enum IncomingAction<T> {
    // Remove the local record with this GUID.
    DeleteLocalRecord { guid: Guid },
    // Insert a new record.
    Insert { record: T },
    // Update an existing record. If `was_merged` was true, then the updated
    // record isn't identical to the incoming one, so needs to be flagged as
    // dirty.
    Update { record: T, was_merged: bool },
    // We forked a record because we couldn't merge it. `forked` will have
    // a new guid, while `incoming` is the unmodified version of the incoming
    // record which we need to apply.
    Fork { forked: T, incoming: T },
    // An existing record with old_guid needs to be replaced with this record
    // (ie, a de-dupe - the incoming record's guid wins).
    UpdateLocalGuid { old_guid: Guid, record: T },
    // There's a remote tombstone, but our copy of the record is dirty. The
    // remote tombstone should be replaced with this.
    ResurrectRemoteTombstone { record: T },
    // There's a local tombstone - it should be removed and replaced with this.
    ResurrectLocalTombstone { record: T },
    // Nothing to do.
    DoNothing,
}
234
/// Convert a IncomingState to an IncomingAction - this is where the "policy"
/// lives for when we resurrect, or merge etc.
///
/// This is a pure decision - no storage is mutated here (we only read via
/// `rec_impl.get_local_dupe`); the resulting action is applied later by
/// `apply_incoming_action`.
fn plan_incoming<T: std::fmt::Debug + SyncRecord>(
    rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
    tx: &Transaction<'_>,
    staged_info: IncomingState<T>,
) -> Result<IncomingAction<T>> {
    trace!("plan_incoming: {:?}", staged_info);
    let IncomingState {
        incoming,
        local,
        mirror,
    } = staged_info;

    let state = match incoming.kind {
        IncomingKind::Tombstone => {
            match local {
                LocalRecordInfo::Unmodified { .. } | LocalRecordInfo::Scrubbed { .. } => {
                    // Note: On desktop, when there's a local record for an incoming tombstone, a local tombstone
                    // would be created. But we don't actually need to create a local tombstone here. If we did it would
                    // immediately be deleted after being uploaded to the server.
                    IncomingAction::DeleteLocalRecord {
                        guid: incoming.envelope.id,
                    }
                }
                LocalRecordInfo::Modified { record } => {
                    // Incoming tombstone with local changes should cause us to "resurrect" the local.
                    // At a minimum, the implementation will need to ensure the record is marked as
                    // dirty so it's uploaded, overwriting the server's tombstone.
                    IncomingAction::ResurrectRemoteTombstone { record }
                }
                LocalRecordInfo::Tombstone {
                    guid: tombstone_guid,
                } => {
                    // Tombstones on both sides - they should agree on the guid.
                    assert_eq!(incoming.envelope.id, tombstone_guid);
                    IncomingAction::DoNothing
                }
                // Incoming tombstone for a record we never had - nothing to do.
                LocalRecordInfo::Missing => IncomingAction::DoNothing,
            }
        }
        IncomingKind::Content(mut incoming_record) => {
            match local {
                LocalRecordInfo::Unmodified {
                    record: local_record,
                }
                | LocalRecordInfo::Scrubbed {
                    record: local_record,
                } => {
                    // The local record was either unmodified, or scrubbed of its encrypted data.
                    // Either way we want to:
                    // - Merge the metadata
                    // - Update the local record using data from the server
                    // - Don't flag the local item as dirty. We don't want to reupload for just
                    //   metadata changes.
                    let metadata = incoming_record.metadata_mut();
                    metadata.merge(
                        local_record.metadata(),
                        mirror.as_ref().map(|m| m.metadata()),
                    );
                    // a micro-optimization here would be to `::DoNothing` if
                    // the metadata was actually identical and the local data wasn't scrubbed, but
                    // this seems like an edge-case on an edge-case?
                    IncomingAction::Update {
                        record: incoming_record,
                        was_merged: false,
                    }
                }
                LocalRecordInfo::Modified {
                    record: local_record,
                } => {
                    // Changed on both sides - delegate to the record's 3-way merge.
                    match SyncRecord::merge(&incoming_record, &local_record, &mirror) {
                        MergeResult::Merged { merged } => {
                            // The record we save locally has material differences
                            // from the incoming one, so we are going to need to
                            // reupload it.
                            IncomingAction::Update {
                                record: merged,
                                was_merged: true,
                            }
                        }
                        MergeResult::Forked { forked } => IncomingAction::Fork {
                            forked,
                            incoming: incoming_record,
                        },
                    }
                }
                // Incoming content for a locally-deleted record - the content wins.
                LocalRecordInfo::Tombstone { .. } => IncomingAction::ResurrectLocalTombstone {
                    record: incoming_record,
                },
                LocalRecordInfo::Missing => {
                    match rec_impl.get_local_dupe(tx, &incoming_record)? {
                        None => IncomingAction::Insert {
                            record: incoming_record,
                        },
                        Some(local_dupe) => {
                            // local record is missing but we found a dupe - so
                            // the dupe must have a different guid (or we wouldn't
                            // consider the local record missing!)
                            assert_ne!(incoming_record.id(), local_dupe.id());
                            // The existing item is identical except for the metadata, so
                            // we still merge that metadata.
                            let metadata = incoming_record.metadata_mut();
                            metadata.merge(
                                local_dupe.metadata(),
                                mirror.as_ref().map(|m| m.metadata()),
                            );
                            IncomingAction::UpdateLocalGuid {
                                old_guid: local_dupe.id().clone(),
                                record: incoming_record,
                            }
                        }
                    }
                }
            }
        }
        IncomingKind::Malformed => {
            warn!("skipping incoming record: {}", incoming.envelope.id);
            IncomingAction::DoNothing
        }
    };
    trace!("plan_incoming resulted in {:?}", state);
    Ok(state)
}
358
359/// Apply the incoming action
360fn apply_incoming_action<T: std::fmt::Debug + SyncRecord>(
361 rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
362 tx: &Transaction<'_>,
363 action: IncomingAction<T>,
364) -> Result<()> {
365 trace!("applying action: {:?}", action);
366 match action {
367 IncomingAction::Update { record, was_merged } => {
368 rec_impl.update_local_record(tx, record, was_merged)?;
369 }
370 IncomingAction::Fork { forked, incoming } => {
371 // `forked` exists in the DB with the same guid as `incoming`, so fix that.
372 // change_record_guid will also update the mirror (if it exists) to prevent
373 // the server from overriding the forked mirror record (and losing any unknown fields)
374 rec_impl.change_record_guid(tx, incoming.id(), forked.id())?;
375 // `incoming` has the correct new guid.
376 rec_impl.insert_local_record(tx, incoming)?;
377 }
378 IncomingAction::Insert { record } => {
379 rec_impl.insert_local_record(tx, record)?;
380 }
381 IncomingAction::UpdateLocalGuid { old_guid, record } => {
382 // expect record to have the new guid.
383 assert_ne!(old_guid, *record.id());
384 rec_impl.change_record_guid(tx, &old_guid, record.id())?;
385 // the item is identical with the item with the new guid
386 // *except* for the metadata - so we still need to update, but
387 // don't need to treat the item as dirty.
388 rec_impl.update_local_record(tx, record, false)?;
389 }
390 IncomingAction::ResurrectLocalTombstone { record } => {
391 rec_impl.remove_tombstone(tx, record.id())?;
392 rec_impl.insert_local_record(tx, record)?;
393 }
394 IncomingAction::ResurrectRemoteTombstone { record } => {
395 // This is just "ensure local record dirty", which
396 // update_local_record conveniently does.
397 rec_impl.update_local_record(tx, record, true)?;
398 }
399 IncomingAction::DeleteLocalRecord { guid } => {
400 rec_impl.remove_record(tx, &guid)?;
401 }
402 IncomingAction::DoNothing => {}
403 }
404 Ok(())
405}
406
407// Helpers for tests
408#[cfg(test)]
409mod tests; // pull in our integration tests
410
// and a module for unit test utilities.
#[cfg(test)]
pub mod test {
    use crate::db::{schema::create_empty_sync_temp_tables, test::new_mem_db, AutofillDb};

    /// Create an in-memory database with the sync temp tables already
    /// created, ready for use by the sync tests.
    pub fn new_syncable_mem_db() -> AutofillDb {
        // Set up logging so test failures are debuggable.
        error_support::init_for_tests();
        let db = new_mem_db();
        create_empty_sync_temp_tables(&db).expect("should work");
        db
    }
}
422}