sync15/engine/
sync_engine.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::CollectionRequest;
use crate::bso::{IncomingBso, OutgoingBso};
use crate::client_types::ClientData;
use crate::{telemetry, CollectionName, Guid, ServerTimestamp};
use anyhow::Result;
use std::fmt;

/// The pair of sync IDs an engine is connected with: a global ID and an ID
/// specific to this engine's collection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CollSyncIds {
    /// The global sync ID.
    pub global: Guid,
    /// The sync ID of the collection.
    pub coll: Guid,
}

/// How an engine is tied to a particular set of records on a sync storage
/// server: either not at all, or through one specific pair of sync IDs.
///
/// If the engine's IDs and the server's IDs don't match exactly, the engine
/// has to assume something drastic happened on the server. It then can no
/// longer trust anything it believes about the server's state, so it
/// "resets" and then performs a full reconcile.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum EngineSyncAssociation {
    /// The store is not connected to sync (it may become connected later).
    Disconnected,
    /// The store is connected to sync using these sync IDs.
    Connected(CollSyncIds),
}

/// The concrete `SyncEngine` implementations.
///
/// `PartialOrd`/`Ord` are derived. Derived ordering on an enum follows the order
/// in which the variants are *declared*; it is not alphabetical by name. We rely
/// on that: higher-priority engines are declared first, so they sort first.
/// This order matches desktop.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum SyncEngineId {
    // Declaration order == priority order. Keep `SyncEngineId::iter()` in the
    // same order (tests enforce this).
    Passwords,
    Tabs,
    Bookmarks,
    Addresses,
    CreditCards,
    History,
}

impl SyncEngineId {
    /// Iterates over all possible engines.
    ///
    /// By policy this yields the engines in priority order, i.e. the same
    /// order as the derived `Ord`; tests enforce this.
    pub fn iter() -> impl Iterator<Item = SyncEngineId> {
        // Explicit trait call so the array is always iterated by value.
        IntoIterator::into_iter([
            Self::Passwords,
            Self::Tabs,
            Self::Bookmarks,
            Self::Addresses,
            Self::CreditCards,
            Self::History,
        ])
    }

    /// Returns the string identifier for this engine.
    ///
    /// This must match the strings in `SyncEngineSelection`. It is the inverse
    /// of this type's `TryFrom<&str>` impl.
    pub fn name(&self) -> &'static str {
        match self {
            Self::Passwords => "passwords",
            Self::History => "history",
            Self::Bookmarks => "bookmarks",
            Self::Tabs => "tabs",
            Self::Addresses => "addresses",
            Self::CreditCards => "creditcards",
        }
    }
}

impl fmt::Display for SyncEngineId {
    /// Formats the engine as its `name()`.
    ///
    /// `Formatter::pad` is used so width/alignment flags (e.g. `{:>12}`) are
    /// honored, as they are for `str`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.name())
    }
}

impl TryFrom<&str> for SyncEngineId {
    /// On failure, the unrecognized input string is returned.
    type Error = String;

    /// Parses an engine from the identifier produced by `name()`.
    ///
    /// Implemented in terms of `iter()` and `name()` so the string <-> engine
    /// mapping has a single source of truth and can't drift apart.
    fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
        Self::iter()
            .find(|engine| engine.name() == value)
            .ok_or_else(|| value.to_string())
    }
}

/// A "sync engine" is a thing that knows how to sync. It's often implemented
/// by a "store" (the generic term for whatever is responsible for all storage
/// associated with a component, including the storage required for sync).
///
/// The model described by this trait is that engines first "stage" sets of incoming records,
/// then apply them (which returns outgoing records), then handle the success (or otherwise) of
/// each batch as it's uploaded.
///
/// Staging incoming records is (or should be ;) done in batches - e.g., 1000 record chunks.
/// Some engines will "stage" these into a database temp table, while ones expecting fewer
/// records might just store them in memory.
///
/// For outgoing records, a single vec is supplied by the engine. The sync client will use the
/// batch facilities of the server to make multiple POST requests and commit them.
/// Sadly it's not truly atomic (there's a batch size limit), so the model reflects that: the
/// engine is told each time a batch is committed, which might happen more than once for the
/// supplied vec. We should upgrade this model so the engine can avoid reading every outgoing
/// record into memory at once (i.e., we should try to better reflect the upload batch model at
/// this level).
///
/// Sync engines should not assume they live for exactly one sync, so `prepare_for_sync()` should
/// clean up any state, including staged records, from previous syncs.
///
/// Different engines will produce errors of different types. To accommodate
/// this, we force them all to return `anyhow::Error`.
pub trait SyncEngine {
    /// Returns the name of the collection this engine syncs.
    fn collection_name(&self) -> CollectionName;

    /// Prepares the engine for syncing. The tabs engine currently uses this to
    /// store the current list of clients, which it uses to look up device names
    /// and types.
    ///
    /// This is only called on the `sync_multiple` code path, and only if a
    /// command processor is registered. In practice that means it _will_ be
    /// called when the store is synced via the Sync Manager. Callers syncing a
    /// store via `sync::synchronize`, or via `sync_multiple::sync_multiple`
    /// without a command processor, should not expect it to be called.
    /// NOTE(review): the previous wording of this paragraph contradicted itself
    /// about `sync_multiple`; confirm the exact call sites.
    ///
    /// The default implementation does nothing.
    ///
    /// TODO(issue #2590): This is pretty kludgey and will be hard to extend for
    /// any case other than the tabs case. We should find another way to support
    /// tabs...
    fn prepare_for_sync(&self, _get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
        Ok(())
    }

    /// Tells the engine what the local encryption key is for the data managed
    /// by the engine. This is only used by collections that store data
    /// encrypted locally, and is unrelated to the encryption used by Sync.
    /// The intent is that for such collections, this key can be used to
    /// decrypt local data before it is re-encrypted by Sync and sent to the
    /// storage servers. Similarly, data from the storage servers will be
    /// decrypted by Sync, then encrypted with the local encryption key before
    /// being added to the local database.
    ///
    /// The expectation is that the key value is maintained by the embedding
    /// application in some secure way suitable for the environment in which
    /// the app is running - e.g., the OS "keychain". The value of the key is
    /// implementation dependent: the engine and embedding application are
    /// expected to already have some external agreement about how to generate
    /// keys and in what form they are exchanged. Finally, there's an assumption
    /// that sync engines are short-lived and only live for a single sync. This
    /// means sync doesn't hold on to the key for an extended period. In
    /// practice, all sync engines which aren't a "bridged engine" are
    /// short-lived; we might need to rethink this later if we need engines with
    /// local encryption keys to be used on desktop.
    ///
    /// # Panics
    ///
    /// The default implementation panics (via `unimplemented!`). An engine
    /// that doesn't explicitly support local encryption keys should never be
    /// given one. That implies a degree of confusion which shouldn't be
    /// possible to ignore.
    fn set_local_encryption_key(&mut self, _key: &str) -> Result<()> {
        unimplemented!("This engine does not support local encryption");
    }

    /// Stages some incoming records. This might be called multiple times in the same sync
    /// if we fetch the incoming records in batches.
    ///
    /// Note there is no timestamp provided here. When fetching in batches, if
    /// the timestamp advances during a batch we must abort and start again.
    /// The final collection timestamp after staging all records is supplied to `apply()`.
    fn stage_incoming(
        &self,
        inbound: Vec<IncomingBso>,
        telem: &mut telemetry::Engine,
    ) -> Result<()>;

    /// Applies the staged records, returning outgoing records.
    ///
    /// `timestamp` is the final collection timestamp after all records were staged.
    /// Ideally we would adjust this model to better support batching of outgoing records
    /// without needing to keep them all in memory (i.e., an iterator or similar?).
    fn apply(
        &self,
        timestamp: ServerTimestamp,
        telem: &mut telemetry::Engine,
    ) -> Result<Vec<OutgoingBso>>;

    /// Indicates that the given record IDs were uploaded successfully to the server.
    /// This may be called multiple times per sync, once for each batch. Batching is determined
    /// dynamically based on payload sizes and counts via the server's advertised limits.
    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> Result<()>;

    /// Called once the sync is finished. Not currently called if uploads fail (which
    /// seems sad, but the other batching confusion there needs sorting out first).
    /// Many engines will have nothing to do here, as most "post upload" work should be
    /// done in `set_uploaded()`. The default implementation does nothing.
    fn sync_finished(&self) -> Result<()> {
        Ok(())
    }

    /// The engine is responsible for building a single collection request. Engines
    /// typically store a lastModified timestamp and use it to build a request
    /// saying "give me full records since that date"; however, other engines
    /// might do something fancier. It can return `None` if the server timestamp
    /// has not advanced since the last sync.
    /// This could even later be extended to handle "backfills", and we might end up
    /// wanting one engine to use multiple collections (e.g., as a "foreign key" via guid), etc.
    fn get_collection_request(
        &self,
        server_timestamp: ServerTimestamp,
    ) -> Result<Option<CollectionRequest>>;

    /// Gets the persisted sync IDs. If they don't match the global state, we'll be
    /// `reset()` with the new IDs.
    fn get_sync_assoc(&self) -> Result<EngineSyncAssociation>;

    /// Resets the engine (and associated store) without wiping local data,
    /// ready for a "first sync".
    /// `assoc` defines how this store is to be associated with sync.
    fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()>;

    /// Wipes the engine's data.
    ///
    /// This is typically triggered by a client command, which, at the time of
    /// writing, was only supported for wiping bookmarks.
    ///
    /// # Panics
    ///
    /// The default implementation panics (via `unimplemented!`). A wipe should
    /// never be requested of an engine that doesn't explicitly implement it;
    /// doing so implies a confusion that shouldn't occur.
    fn wipe(&self) -> Result<()> {
        unimplemented!("The engine does not implement wipe, no wipe should be requested")
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_engine_priority() {
        fn sorted(mut engines: Vec<SyncEngineId>) -> Vec<SyncEngineId> {
            engines.sort();
            engines
        }
        assert_eq!(
            vec![SyncEngineId::Passwords, SyncEngineId::Tabs],
            sorted(vec![SyncEngineId::Passwords, SyncEngineId::Tabs])
        );
        assert_eq!(
            vec![SyncEngineId::Passwords, SyncEngineId::Tabs],
            sorted(vec![SyncEngineId::Tabs, SyncEngineId::Passwords])
        );
    }

    #[test]
    fn test_engine_enum_order() {
        let unsorted = SyncEngineId::iter().collect::<Vec<SyncEngineId>>();
        let mut sorted = unsorted.clone();
        sorted.sort();

        // `iter()` must already yield engines in priority (i.e. `Ord`) order.
        // Comparing whole vectors also checks lengths and reports a useful diff
        // on failure.
        assert_eq!(unsorted, sorted);
    }

    #[test]
    fn test_engine_name_round_trip() {
        // Every engine's `name()` must parse back to the same engine, and
        // `Display` must render exactly that name.
        for engine in SyncEngineId::iter() {
            assert_eq!(SyncEngineId::try_from(engine.name()), Ok(engine.clone()));
            assert_eq!(engine.to_string(), engine.name());
        }
        assert!(SyncEngineId::try_from("not-an-engine").is_err());
    }
}