1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

pub mod address;
mod common;
pub mod credit_card;
pub mod engine;

pub(crate) use crate::db::models::Metadata;
use crate::error::Result;
use interrupt_support::Interruptee;
use rusqlite::Transaction;
use sync15::bso::{IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, OutgoingBso};
use sync15::ServerTimestamp;
use sync_guid::Guid;
use types::Timestamp;

// This type is used as a convenient way to capture all unknown fields from the
// payload upon deserialization without having to work with a concrete type:
// it is simply a JSON object mapping each field name to its raw JSON value.
type UnknownFields = serde_json::Map<String, serde_json::Value>;

// The fact that credit-card numbers are encrypted makes things a little tricky
// for sync in various ways - and one non-obvious way is that the tables that
// store sync payloads can't just store them directly, as they are not encrypted
// in that form.
// ie, in the database, an address record's "payload" column looks like:
// > '{"entry":{"address-level1":"VIC", "street-address":"2/25 Somewhere St","timeCreated":1497567116554, "version":1},"id":"29ac67adae7d"}'
// or, for a tombstone:
// > '{"deleted":true,"id":"6544992973e6"}'
// (Note a number of fields have been removed from 'entry' for clarity)
// and in the database a credit-card's "payload" looks like:
// > 'eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2R0NNIn0..<snip>-<snip>.<snip lots more>'
// while a tombstone here remains encrypted but has the 'deleted' entry after decryption.
// (Note also that the address entry, and the decrypted credit-card json both have an "id" in
// the JSON, but we ignore that when deserializing and will stop persisting that soon)

// Some traits that help us abstract away much of the sync functionality.

// XXX - *sob* - each method has a `&Transaction` param, which in theory could
// be avoided if the concrete impls could keep the ref (ie, if it was held
// behind `self`), but markh failed to make this work due to lifetime woes.

/// A trait that abstracts the *storage* implementation of the specific record
/// types, and must be implemented by the concrete record owners.
///
/// Note that it doesn't assume a SQL database or anything concrete about the
/// storage, although objects implementing this trait will live only long enough
/// to perform the sync "incoming" steps - ie, a transaction is likely to live
/// exactly as long as this object.
pub trait ProcessIncomingRecordImpl {
    /// The concrete record type handled by the implementation.
    type Record;

    /// Stage the `incoming` BSOs so the remaining incoming steps can work on
    /// them.
    /// NOTE(review): `signal` presumably lets a long-running staging operation
    /// be interrupted - confirm against the implementations.
    fn stage_incoming(
        &self,
        tx: &Transaction<'_>,
        incoming: Vec<IncomingBso>,
        signal: &dyn Interruptee,
    ) -> Result<()>;

    /// Finish the incoming phase. This will typically cause staged records
    /// to be written to the mirror.
    fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()>;

    /// Returns the `IncomingState` (incoming, local and mirror copies) of the
    /// incoming records; each state is the input `plan_incoming` expects.
    /// NOTE(review): presumably one entry per staged record - confirm.
    fn fetch_incoming_states(
        &self,
        tx: &Transaction<'_>,
    ) -> Result<Vec<IncomingState<Self::Record>>>;

    /// Returns a local record that has the same values as the given incoming record (with the exception
    /// of the `guid` values which should differ) that will be used as a local duplicate record for
    /// syncing. Returns `None` when there is no such duplicate.
    fn get_local_dupe(
        &self,
        tx: &Transaction<'_>,
        incoming: &Self::Record,
    ) -> Result<Option<Self::Record>>;

    /// Update the existing local record with the values in `record`.
    /// `was_merged` should be true when `record` isn't identical to the
    /// incoming one, in which case the local record needs to be flagged as
    /// dirty so it gets uploaded.
    fn update_local_record(
        &self,
        tx: &Transaction<'_>,
        record: Self::Record,
        was_merged: bool,
    ) -> Result<()>;

    /// Insert `record` as a new local record.
    fn insert_local_record(&self, tx: &Transaction<'_>, record: Self::Record) -> Result<()>;

    /// Change the guid of the record currently known as `old_guid` to
    /// `new_guid`. Implementations are also expected to update the mirror
    /// record (if it exists).
    fn change_record_guid(
        &self,
        tx: &Transaction<'_>,
        old_guid: &Guid,
        new_guid: &Guid,
    ) -> Result<()>;

    /// Remove the local record with this guid.
    fn remove_record(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;

    /// Remove the local tombstone with this guid.
    fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &Guid) -> Result<()>;
}

/// The outgoing counterpart of `ProcessIncomingRecordImpl`: abstracts the
/// *storage* operations used when uploading records.
///
/// Note that, unlike the incoming trait, these methods return
/// `anyhow::Result` rather than this crate's `Result`.
pub trait ProcessOutgoingRecordImpl {
    /// The concrete record type handled by the implementation.
    type Record;

    /// Returns the outgoing BSOs to upload.
    /// NOTE(review): presumably only locally changed records and tombstones -
    /// confirm against the implementations.
    fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>>;

    /// Finish the outgoing phase for the records with the guids in
    /// `records_synced`.
    /// NOTE(review): presumably called after a successful upload to do the
    /// local bookkeeping for those records - confirm against the caller.
    fn finish_synced_items(
        &self,
        tx: &Transaction<'_>,
        records_synced: Vec<Guid>,
    ) -> anyhow::Result<()>;
}

/// A trait that abstracts the functionality in the record itself.
pub trait SyncRecord {
    /// A name for this kind of record - "addresses" or similar - for
    /// logging/debugging.
    fn record_name() -> &'static str;
    /// The guid of this record.
    fn id(&self) -> &Guid;
    /// The sync metadata of this record.
    fn metadata(&self) -> &Metadata;
    /// Mutable access to the sync metadata of this record.
    fn metadata_mut(&mut self) -> &mut Metadata;
    /// Merge or fork multiple copies of the same record. The resulting record
    /// might have the same guid as the inputs, meaning it was truly merged, or
    /// a different guid, in which case it was forked due to conflicting changes.
    /// `mirror` is `None` when there is no mirror copy of the record.
    fn merge(incoming: &Self, local: &Self, mirror: &Option<Self>) -> MergeResult<Self>
    where
        Self: Sized;
}

impl Metadata {
    /// Fold the metadata from `other` (and from `mirror`, when there is one)
    /// into `self`, which must already have valid metadata.
    ///
    /// * `time_created` becomes the earliest of the inputs.
    /// * `time_last_used` and `time_last_modified` become the latest.
    /// * `times_used` is, with a mirror, the mirror's count plus however far
    ///   each side has advanced past it (a side that is behind the mirror
    ///   contributes nothing); without a mirror it is the larger count.
    ///
    /// A missing mirror is an edge-case and typically means this is the first
    /// sync since a "reset" (eg, disconnecting and reconnecting).
    pub fn merge(&mut self, other: &Metadata, mirror: Option<&Metadata>) {
        use std::cmp::{max, min};

        if let Some(base) = mirror {
            // Three-way merge against the mirror.
            self.time_created = min(
                self.time_created,
                min(other.time_created, base.time_created),
            );
            self.time_last_used = max(
                self.time_last_used,
                max(other.time_last_used, base.time_last_used),
            );
            self.time_last_modified = max(
                self.time_last_modified,
                max(other.time_last_modified, base.time_last_modified),
            );

            // How many uses each side added since the mirror was recorded;
            // never negative.
            let other_delta = max(other.times_used - base.times_used, 0);
            let own_delta = max(self.times_used - base.times_used, 0);
            self.times_used = base.times_used + other_delta + own_delta;
        } else {
            // Two-way merge.
            self.time_created = min(self.time_created, other.time_created);
            self.time_last_used = max(self.time_last_used, other.time_last_used);
            self.time_last_modified = max(self.time_last_modified, other.time_last_modified);
            // With no mirror we almost certainly were disconnected and this is
            // the first sync after reconnecting. Summing the two counts would
            // double the expected value if the disconnection was recent, so
            // just keep the largest.
            self.times_used = max(other.times_used, self.times_used);
        }
    }
}

/// The state of the local copy of a record that has an incoming counterpart.
/// A local record can be in any of these 5 states.
#[derive(Debug)]
enum LocalRecordInfo<T> {
    /// A local record exists and has no local changes.
    Unmodified { record: T },
    /// A local record exists and has local changes.
    Modified { record: T },
    /// Encrypted data was scrubbed from the local record and needs to be
    /// resynced from the server.
    Scrubbed { record: T },
    /// There is no local record, but there is a local tombstone with this guid.
    Tombstone { guid: Guid },
    /// There is neither a local record nor a local tombstone for the incoming
    /// guid.
    Missing,
}

/// The return value from our "merge" function (`SyncRecord::merge`), which
/// might either update the record, or might fork it.
#[derive(Debug)]
pub enum MergeResult<T> {
    /// The copies were merged into a single record.
    Merged { merged: T },
    /// The copies couldn't be merged due to conflicting changes, so the record
    /// was forked; `forked` has a new guid.
    Forked { forked: T },
}

/// This ties the 3 possible copies of a record together (incoming, local and
/// mirror) and is what we expect the implementations to put together for us.
#[derive(Debug)]
pub struct IncomingState<T> {
    /// The incoming item: a record, a tombstone, or something malformed.
    incoming: IncomingContent<T>,
    /// What we know about the local copy of the same record.
    local: LocalRecordInfo<T>,
    // We don't have an enum for the mirror - an Option<> is fine because
    // although we do store tombstones there, we ignore them when reconciling
    // (ie, we ignore tombstones in the mirror).
    // NOTE(review): an older version of this comment apparently claimed we
    // don't store tombstones in the mirror at all - confirm which is accurate.
    mirror: Option<T>,
}

/// The distinct incoming sync actions to be performed for incoming records.
/// `plan_incoming` produces one of these and `apply_incoming_action` carries
/// it out.
#[derive(Debug, PartialEq)]
enum IncomingAction<T> {
    /// Remove the local record with this GUID.
    DeleteLocalRecord { guid: Guid },
    /// Insert a new record.
    Insert { record: T },
    /// Update an existing record. If `was_merged` was true, then the updated
    /// record isn't identical to the incoming one, so needs to be flagged as
    /// dirty.
    Update { record: T, was_merged: bool },
    /// We forked a record because we couldn't merge it. `forked` will have
    /// a new guid, while `incoming` is the unmodified version of the incoming
    /// record which we need to apply.
    Fork { forked: T, incoming: T },
    /// An existing record with old_guid needs to be replaced with this record.
    UpdateLocalGuid { old_guid: Guid, record: T },
    /// There's a remote tombstone, but our copy of the record is dirty. The
    /// remote tombstone should be replaced with this.
    ResurrectRemoteTombstone { record: T },
    /// There's a local tombstone - it should be removed and replaced with this.
    ResurrectLocalTombstone { record: T },
    /// Nothing to do.
    DoNothing,
}

/// Convert a IncomingState to an IncomingAction - this is where the "policy"
/// lives for when we resurrect, or merge etc.
fn plan_incoming<T: std::fmt::Debug + SyncRecord>(
    rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
    tx: &Transaction<'_>,
    staged_info: IncomingState<T>,
) -> Result<IncomingAction<T>> {
    log::trace!("plan_incoming: {:?}", staged_info);
    let IncomingState {
        incoming,
        local,
        mirror,
    } = staged_info;

    let state = match incoming.kind {
        IncomingKind::Tombstone => {
            match local {
                LocalRecordInfo::Unmodified { .. } | LocalRecordInfo::Scrubbed { .. } => {
                    // Note: On desktop, when there's a local record for an incoming tombstone, a local tombstone
                    // would created. But we don't actually need to create a local tombstone here. If we did it would
                    // immediately be deleted after being uploaded to the server.
                    IncomingAction::DeleteLocalRecord {
                        guid: incoming.envelope.id,
                    }
                }
                LocalRecordInfo::Modified { record } => {
                    // Incoming tombstone with local changes should cause us to "resurrect" the local.
                    // At a minimum, the implementation will need to ensure the record is marked as
                    // dirty so it's uploaded, overwriting the server's tombstone.
                    IncomingAction::ResurrectRemoteTombstone { record }
                }
                LocalRecordInfo::Tombstone {
                    guid: tombstone_guid,
                } => {
                    assert_eq!(incoming.envelope.id, tombstone_guid);
                    IncomingAction::DoNothing
                }
                LocalRecordInfo::Missing => IncomingAction::DoNothing,
            }
        }
        IncomingKind::Content(mut incoming_record) => {
            match local {
                LocalRecordInfo::Unmodified {
                    record: local_record,
                }
                | LocalRecordInfo::Scrubbed {
                    record: local_record,
                } => {
                    // The local record was either unmodified, or scrubbed of its encrypted data.
                    // Either way we want to:
                    //   - Merge the metadata
                    //   - Update the local record using data from the server
                    //   - Don't flag the local item as dirty.  We don't want to reupload for just
                    //     metadata changes.
                    let metadata = incoming_record.metadata_mut();
                    metadata.merge(
                        local_record.metadata(),
                        mirror.as_ref().map(|m| m.metadata()),
                    );
                    // a micro-optimization here would be to `::DoNothing` if
                    // the metadata was actually identical and the local data wasn't scrubbed, but
                    // this seems like an edge-case on an edge-case?
                    IncomingAction::Update {
                        record: incoming_record,
                        was_merged: false,
                    }
                }
                LocalRecordInfo::Modified {
                    record: local_record,
                } => {
                    match SyncRecord::merge(&incoming_record, &local_record, &mirror) {
                        MergeResult::Merged { merged } => {
                            // The record we save locally has material differences
                            // from the incoming one, so we are going to need to
                            // reupload it.
                            IncomingAction::Update {
                                record: merged,
                                was_merged: true,
                            }
                        }
                        MergeResult::Forked { forked } => IncomingAction::Fork {
                            forked,
                            incoming: incoming_record,
                        },
                    }
                }
                LocalRecordInfo::Tombstone { .. } => IncomingAction::ResurrectLocalTombstone {
                    record: incoming_record,
                },
                LocalRecordInfo::Missing => {
                    match rec_impl.get_local_dupe(tx, &incoming_record)? {
                        None => IncomingAction::Insert {
                            record: incoming_record,
                        },
                        Some(local_dupe) => {
                            // local record is missing but we found a dupe - so
                            // the dupe must have a different guid (or we wouldn't
                            // consider the local record missing!)
                            assert_ne!(incoming_record.id(), local_dupe.id());
                            // The existing item is identical except for the metadata, so
                            // we still merge that metadata.
                            let metadata = incoming_record.metadata_mut();
                            metadata.merge(
                                local_dupe.metadata(),
                                mirror.as_ref().map(|m| m.metadata()),
                            );
                            IncomingAction::UpdateLocalGuid {
                                old_guid: local_dupe.id().clone(),
                                record: incoming_record,
                            }
                        }
                    }
                }
            }
        }
        IncomingKind::Malformed => {
            log::warn!("skipping incoming record: {}", incoming.envelope.id);
            IncomingAction::DoNothing
        }
    };
    log::trace!("plan_incoming resulted in {:?}", state);
    Ok(state)
}

/// Carry out a previously planned `IncomingAction` by calling the matching
/// storage operations on `rec_impl` inside the transaction `tx`.
///
/// Returns the first error reported by `rec_impl`; later steps of a
/// multi-step action are not attempted after a failure.
fn apply_incoming_action<T: std::fmt::Debug + SyncRecord>(
    rec_impl: &dyn ProcessIncomingRecordImpl<Record = T>,
    tx: &Transaction<'_>,
    action: IncomingAction<T>,
) -> Result<()> {
    log::trace!("applying action: {:?}", action);
    match action {
        IncomingAction::DoNothing => Ok(()),
        IncomingAction::Insert { record } => rec_impl.insert_local_record(tx, record),
        IncomingAction::Update { record, was_merged } => {
            rec_impl.update_local_record(tx, record, was_merged)
        }
        IncomingAction::DeleteLocalRecord { guid } => rec_impl.remove_record(tx, &guid),
        IncomingAction::ResurrectRemoteTombstone { record } => {
            // All that's needed is "ensure the local record is dirty", which
            // update_local_record does when told the record was merged.
            rec_impl.update_local_record(tx, record, true)
        }
        IncomingAction::ResurrectLocalTombstone { record } => {
            rec_impl.remove_tombstone(tx, record.id())?;
            rec_impl.insert_local_record(tx, record)
        }
        IncomingAction::Fork { forked, incoming } => {
            // The record that became `forked` still lives in the DB under the
            // guid of `incoming`, so move it to its new guid first.
            // change_record_guid also updates the mirror (if it exists), which
            // stops the server from overriding the forked mirror record (and
            // losing any unknown fields).
            rec_impl.change_record_guid(tx, incoming.id(), forked.id())?;
            // That guid is free again, so `incoming` can be inserted under it.
            rec_impl.insert_local_record(tx, incoming)
        }
        IncomingAction::UpdateLocalGuid { old_guid, record } => {
            // `record` is expected to carry the new guid.
            assert_ne!(old_guid, *record.id());
            rec_impl.change_record_guid(tx, &old_guid, record.id())?;
            // The renamed item matches `record` *except* for the metadata, so
            // an update is still needed - but it isn't treated as dirty.
            rec_impl.update_local_record(tx, record, false)
        }
    }
}

// Helpers for tests
#[cfg(test)]
mod tests; // pull in our integration tests

// A module of utilities shared by the unit tests.
#[cfg(test)]
pub mod test {
    use crate::db::{schema::create_empty_sync_temp_tables, test::new_mem_db, AutofillDb};

    /// Build an in-memory `AutofillDb` which also has the (empty) temp tables
    /// used by sync created.
    ///
    /// Panics with "should work" if creating the temp tables fails.
    pub fn new_syncable_mem_db() -> AutofillDb {
        // The result of initialising logging is deliberately discarded.
        let _ = env_logger::try_init();
        let mem_db = new_mem_db();
        create_empty_sync_temp_tables(&mem_db).expect("should work");
        mem_db
    }
}