sync15/engine/
bridged_engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::error::debug;
6use crate::{telemetry, ServerTimestamp};
7use anyhow::Result;
8
9use crate::bso::{IncomingBso, OutgoingBso};
10use crate::Guid;
11
12use super::{CollSyncIds, EngineSyncAssociation, SyncEngine};
13
14/// A BridgedEngine acts as a bridge between application-services, rust
15/// implemented sync engines and sync engines as defined by Desktop Firefox.
16///
17/// [Desktop Firefox has an abstract implementation of a Sync
18/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js)
19/// with a number of functions each engine is expected to override. Engines
20/// implemented in Rust use a different shape (specifically, the
21/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts
22/// between the 2.
23pub trait BridgedEngine: Send + Sync {
24    /// Returns the last sync time, in milliseconds, for this engine's
25    /// collection. This is called before each sync, to determine the lower
26    /// bound for new records to fetch from the server.
27    fn last_sync(&self) -> Result<i64>;
28
29    /// Sets the last sync time, in milliseconds. This is called throughout
30    /// the sync, to fast-forward the stored last sync time to match the
31    /// timestamp on the uploaded records.
32    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
33
34    /// Returns the sync ID for this engine's collection. This is only used in
35    /// tests.
36    fn sync_id(&self) -> Result<Option<String>>;
37
38    /// Resets the sync ID for this engine's collection, returning the new ID.
39    /// As a side effect, implementations should reset all local Sync state,
40    /// as in `reset`.
41    /// (Note that bridged engines never maintain the "global" guid - that's all managed
42    /// by the bridged_engine consumer (ie, desktop). bridged_engines only care about
43    /// the per-collection one.)
44    fn reset_sync_id(&self) -> Result<String>;
45
46    /// Ensures that the locally stored sync ID for this engine's collection
47    /// matches the `new_sync_id` from the server. If the two don't match,
48    /// implementations should reset all local Sync state, as in `reset`.
49    /// This method returns the assigned sync ID, which can be either the
50    /// `new_sync_id`, or a different one if the engine wants to force other
51    /// devices to reset their Sync state for this collection the next time they
52    /// sync.
53    fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>;
54
55    /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstraction as it only
56    /// makes sense for tabs.
57    /// The arg is a json serialized `ClientData` struct.
58    fn prepare_for_sync(&self, _client_data: &str) -> Result<()> {
59        Ok(())
60    }
61
62    /// Indicates that the engine is about to start syncing. This is called
63    /// once per sync, and always before `store_incoming`.
64    fn sync_started(&self) -> Result<()>;
65
66    /// Stages a batch of incoming Sync records. This is called multiple
67    /// times per sync, once for each batch. Implementations can use the
68    /// signal to check if the operation was aborted, and cancel any
69    /// pending work.
70    fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>;
71
72    /// Applies all staged records, reconciling changes on both sides and
73    /// resolving conflicts. Returns a list of records to upload.
74    fn apply(&self) -> Result<ApplyResults>;
75
76    /// Indicates that the given record IDs were uploaded successfully to the
77    /// server. This is called multiple times per sync, once for each batch
78    /// upload.
79    fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>;
80
81    /// Indicates that all records have been uploaded. At this point, any record
82    /// IDs marked for upload that haven't been passed to `set_uploaded`, can be
83    /// assumed to have failed: for example, because the server rejected a record
84    /// with an invalid TTL or sort index.
85    fn sync_finished(&self) -> Result<()>;
86
87    /// Resets all local Sync state, including any change flags, mirrors, and
88    /// the last sync time, such that the next sync is treated as a first sync
89    /// with all new local data. Does not erase any local user data.
90    fn reset(&self) -> Result<()>;
91
92    /// Erases all local user data for this collection, and any Sync metadata.
93    /// This method is destructive, and unused for most collections.
94    fn wipe(&self) -> Result<()>;
95}
96
97// This is an adaptor trait - the idea is that engines can implement this
98// trait along with SyncEngine and get a BridgedEngine for free. It's temporary
99// so we can land this trait without needing to update desktop.
100// Longer term, we should remove both this trait and BridgedEngine entirely, sucking up
101// the breaking change for desktop. The main blocker to this is moving desktop away
102// from the explicit timestamp handling and moving closer to the `get_collection_request`
103// model.
104pub trait BridgedEngineAdaptor: Send + Sync {
105    // These are the main mismatches between the 2 engines
106    fn last_sync(&self) -> Result<i64>;
107    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
108    fn sync_started(&self) -> Result<()> {
109        Ok(())
110    }
111
112    fn engine(&self) -> &dyn SyncEngine;
113}
114
115impl<A: BridgedEngineAdaptor> BridgedEngine for A {
116    fn last_sync(&self) -> Result<i64> {
117        self.last_sync()
118    }
119
120    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
121        self.set_last_sync(last_sync_millis)
122    }
123
124    fn sync_id(&self) -> Result<Option<String>> {
125        Ok(match self.engine().get_sync_assoc()? {
126            EngineSyncAssociation::Disconnected => None,
127            EngineSyncAssociation::Connected(c) => Some(c.coll.into()),
128        })
129    }
130
131    fn reset_sync_id(&self) -> Result<String> {
132        // Note that bridged engines never maintain the "global" guid - that's all managed
133        // by desktop. bridged_engines only care about the per-collection one.
134        let global = Guid::empty();
135        let coll = Guid::random();
136        self.engine()
137            .reset(&EngineSyncAssociation::Connected(CollSyncIds {
138                global,
139                coll: coll.clone(),
140            }))?;
141        Ok(coll.to_string())
142    }
143
144    fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
145        let engine = self.engine();
146        let assoc = engine.get_sync_assoc()?;
147        if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) {
148            debug!("ensure_current_sync_id is current");
149        } else {
150            let new_coll_ids = CollSyncIds {
151                global: Guid::empty(),
152                coll: sync_id.into(),
153            };
154            engine.reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
155        }
156        Ok(sync_id.to_string())
157    }
158
159    fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
160        // unwrap here is unfortunate, but can hopefully go away if we can
161        // start using the ClientData type instead of the string.
162        self.engine()
163            .prepare_for_sync(&|| serde_json::from_str::<crate::ClientData>(client_data).unwrap())
164    }
165
166    fn sync_started(&self) -> Result<()> {
167        A::sync_started(self)
168    }
169
170    fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()> {
171        let engine = self.engine();
172        let mut telem = telemetry::Engine::new(engine.collection_name());
173        engine.stage_incoming(incoming_records, &mut telem)
174    }
175
176    fn apply(&self) -> Result<ApplyResults> {
177        let engine = self.engine();
178        let mut telem = telemetry::Engine::new(engine.collection_name());
179        // Desktop tells a bridged engine to apply the records without telling it
180        // the server timestamp, and once applied, explicitly calls `set_last_sync()`
181        // with that timestamp. So this adaptor needs to call apply with an invalid
182        // timestamp, and hope that later call with the correct timestamp does come.
183        // This isn't ideal as it means the timestamp is updated in a different transaction,
184        // but nothing too bad should happen if it doesn't - we'll just end up applying
185        // the same records again next sync.
186        let records = engine.apply(ServerTimestamp::from_millis(0), &mut telem)?;
187        Ok(ApplyResults {
188            records,
189            num_reconciled: telem
190                .get_incoming()
191                .as_ref()
192                .map(|i| i.get_reconciled() as usize),
193        })
194    }
195
196    fn set_uploaded(&self, millis: i64, ids: &[Guid]) -> Result<()> {
197        self.engine()
198            .set_uploaded(ServerTimestamp::from_millis(millis), ids.to_vec())
199    }
200
201    fn sync_finished(&self) -> Result<()> {
202        self.engine().sync_finished()
203    }
204
205    fn reset(&self) -> Result<()> {
206        self.engine().reset(&EngineSyncAssociation::Disconnected)
207    }
208
209    fn wipe(&self) -> Result<()> {
210        self.engine().wipe()
211    }
212}
213
214// TODO: We should see if we can remove this to reduce the number of types engines need to deal
215// with. num_reconciled is only used for telemetry on desktop.
216#[derive(Debug, Default)]
217pub struct ApplyResults {
218    /// List of records
219    pub records: Vec<OutgoingBso>,
220    /// The number of incoming records whose contents were merged because they
221    /// changed on both sides. None indicates we aren't reporting this
222    /// information.
223    pub num_reconciled: Option<usize>,
224}
225
226impl ApplyResults {
227    pub fn new(records: Vec<OutgoingBso>, num_reconciled: impl Into<Option<usize>>) -> Self {
228        Self {
229            records,
230            num_reconciled: num_reconciled.into(),
231        }
232    }
233}
234
235// Shorthand for engines that don't care.
236impl From<Vec<OutgoingBso>> for ApplyResults {
237    fn from(records: Vec<OutgoingBso>) -> Self {
238        Self {
239            records,
240            num_reconciled: None,
241        }
242    }
243}