sync15/engine/bridged_engine.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::error::debug;
6use crate::{telemetry, ServerTimestamp};
7use anyhow::Result;
8
9use crate::bso::{IncomingBso, OutgoingBso};
10use crate::Guid;
11
12use super::{CollSyncIds, EngineSyncAssociation, SyncEngine};
13
14/// A BridgedEngine acts as a bridge between application-services, rust
15/// implemented sync engines and sync engines as defined by Desktop Firefox.
16///
17/// [Desktop Firefox has an abstract implementation of a Sync
18/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js)
19/// with a number of functions each engine is expected to override. Engines
20/// implemented in Rust use a different shape (specifically, the
21/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts
22/// between the 2.
23pub trait BridgedEngine: Send + Sync {
24 /// Returns the last sync time, in milliseconds, for this engine's
25 /// collection. This is called before each sync, to determine the lower
26 /// bound for new records to fetch from the server.
27 fn last_sync(&self) -> Result<i64>;
28
29 /// Sets the last sync time, in milliseconds. This is called throughout
30 /// the sync, to fast-forward the stored last sync time to match the
31 /// timestamp on the uploaded records.
32 fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
33
34 /// Returns the sync ID for this engine's collection. This is only used in
35 /// tests.
36 fn sync_id(&self) -> Result<Option<String>>;
37
38 /// Resets the sync ID for this engine's collection, returning the new ID.
39 /// As a side effect, implementations should reset all local Sync state,
40 /// as in `reset`.
41 /// (Note that bridged engines never maintain the "global" guid - that's all managed
42 /// by the bridged_engine consumer (ie, desktop). bridged_engines only care about
43 /// the per-collection one.)
44 fn reset_sync_id(&self) -> Result<String>;
45
46 /// Ensures that the locally stored sync ID for this engine's collection
47 /// matches the `new_sync_id` from the server. If the two don't match,
48 /// implementations should reset all local Sync state, as in `reset`.
49 /// This method returns the assigned sync ID, which can be either the
50 /// `new_sync_id`, or a different one if the engine wants to force other
51 /// devices to reset their Sync state for this collection the next time they
52 /// sync.
53 fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>;
54
55 /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstraction as it only
56 /// makes sense for tabs.
57 /// The arg is a json serialized `ClientData` struct.
58 fn prepare_for_sync(&self, _client_data: &str) -> Result<()> {
59 Ok(())
60 }
61
62 /// Indicates that the engine is about to start syncing. This is called
63 /// once per sync, and always before `store_incoming`.
64 fn sync_started(&self) -> Result<()>;
65
66 /// Stages a batch of incoming Sync records. This is called multiple
67 /// times per sync, once for each batch. Implementations can use the
68 /// signal to check if the operation was aborted, and cancel any
69 /// pending work.
70 fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>;
71
72 /// Applies all staged records, reconciling changes on both sides and
73 /// resolving conflicts. Returns a list of records to upload.
74 fn apply(&self) -> Result<ApplyResults>;
75
76 /// Indicates that the given record IDs were uploaded successfully to the
77 /// server. This is called multiple times per sync, once for each batch
78 /// upload.
79 fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>;
80
81 /// Indicates that all records have been uploaded. At this point, any record
82 /// IDs marked for upload that haven't been passed to `set_uploaded`, can be
83 /// assumed to have failed: for example, because the server rejected a record
84 /// with an invalid TTL or sort index.
85 fn sync_finished(&self) -> Result<()>;
86
87 /// Resets all local Sync state, including any change flags, mirrors, and
88 /// the last sync time, such that the next sync is treated as a first sync
89 /// with all new local data. Does not erase any local user data.
90 fn reset(&self) -> Result<()>;
91
92 /// Erases all local user data for this collection, and any Sync metadata.
93 /// This method is destructive, and unused for most collections.
94 fn wipe(&self) -> Result<()>;
95}
96
97// This is an adaptor trait - the idea is that engines can implement this
98// trait along with SyncEngine and get a BridgedEngine for free. It's temporary
99// so we can land this trait without needing to update desktop.
100// Longer term, we should remove both this trait and BridgedEngine entirely, sucking up
101// the breaking change for desktop. The main blocker to this is moving desktop away
102// from the explicit timestamp handling and moving closer to the `get_collection_request`
103// model.
104pub trait BridgedEngineAdaptor: Send + Sync {
105 // These are the main mismatches between the 2 engines
106 fn last_sync(&self) -> Result<i64>;
107 fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
108 fn sync_started(&self) -> Result<()> {
109 Ok(())
110 }
111
112 fn engine(&self) -> &dyn SyncEngine;
113}
114
115impl<A: BridgedEngineAdaptor> BridgedEngine for A {
116 fn last_sync(&self) -> Result<i64> {
117 self.last_sync()
118 }
119
120 fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
121 self.set_last_sync(last_sync_millis)
122 }
123
124 fn sync_id(&self) -> Result<Option<String>> {
125 Ok(match self.engine().get_sync_assoc()? {
126 EngineSyncAssociation::Disconnected => None,
127 EngineSyncAssociation::Connected(c) => Some(c.coll.into()),
128 })
129 }
130
131 fn reset_sync_id(&self) -> Result<String> {
132 // Note that bridged engines never maintain the "global" guid - that's all managed
133 // by desktop. bridged_engines only care about the per-collection one.
134 let global = Guid::empty();
135 let coll = Guid::random();
136 self.engine()
137 .reset(&EngineSyncAssociation::Connected(CollSyncIds {
138 global,
139 coll: coll.clone(),
140 }))?;
141 Ok(coll.to_string())
142 }
143
144 fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
145 let engine = self.engine();
146 let assoc = engine.get_sync_assoc()?;
147 if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) {
148 debug!("ensure_current_sync_id is current");
149 } else {
150 let new_coll_ids = CollSyncIds {
151 global: Guid::empty(),
152 coll: sync_id.into(),
153 };
154 engine.reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
155 }
156 Ok(sync_id.to_string())
157 }
158
159 fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
160 // unwrap here is unfortunate, but can hopefully go away if we can
161 // start using the ClientData type instead of the string.
162 self.engine()
163 .prepare_for_sync(&|| serde_json::from_str::<crate::ClientData>(client_data).unwrap())
164 }
165
166 fn sync_started(&self) -> Result<()> {
167 A::sync_started(self)
168 }
169
170 fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()> {
171 let engine = self.engine();
172 let mut telem = telemetry::Engine::new(engine.collection_name());
173 engine.stage_incoming(incoming_records, &mut telem)
174 }
175
176 fn apply(&self) -> Result<ApplyResults> {
177 let engine = self.engine();
178 let mut telem = telemetry::Engine::new(engine.collection_name());
179 // Desktop tells a bridged engine to apply the records without telling it
180 // the server timestamp, and once applied, explicitly calls `set_last_sync()`
181 // with that timestamp. So this adaptor needs to call apply with an invalid
182 // timestamp, and hope that later call with the correct timestamp does come.
183 // This isn't ideal as it means the timestamp is updated in a different transaction,
184 // but nothing too bad should happen if it doesn't - we'll just end up applying
185 // the same records again next sync.
186 let records = engine.apply(ServerTimestamp::from_millis(0), &mut telem)?;
187 Ok(ApplyResults {
188 records,
189 num_reconciled: telem
190 .get_incoming()
191 .as_ref()
192 .map(|i| i.get_reconciled() as usize),
193 })
194 }
195
196 fn set_uploaded(&self, millis: i64, ids: &[Guid]) -> Result<()> {
197 self.engine()
198 .set_uploaded(ServerTimestamp::from_millis(millis), ids.to_vec())
199 }
200
201 fn sync_finished(&self) -> Result<()> {
202 self.engine().sync_finished()
203 }
204
205 fn reset(&self) -> Result<()> {
206 self.engine().reset(&EngineSyncAssociation::Disconnected)
207 }
208
209 fn wipe(&self) -> Result<()> {
210 self.engine().wipe()
211 }
212}
213
214// TODO: We should see if we can remove this to reduce the number of types engines need to deal
215// with. num_reconciled is only used for telemetry on desktop.
216#[derive(Debug, Default)]
217pub struct ApplyResults {
218 /// List of records
219 pub records: Vec<OutgoingBso>,
220 /// The number of incoming records whose contents were merged because they
221 /// changed on both sides. None indicates we aren't reporting this
222 /// information.
223 pub num_reconciled: Option<usize>,
224}
225
226impl ApplyResults {
227 pub fn new(records: Vec<OutgoingBso>, num_reconciled: impl Into<Option<usize>>) -> Self {
228 Self {
229 records,
230 num_reconciled: num_reconciled.into(),
231 }
232 }
233}
234
235// Shorthand for engines that don't care.
236impl From<Vec<OutgoingBso>> for ApplyResults {
237 fn from(records: Vec<OutgoingBso>) -> Self {
238 Self {
239 records,
240 num_reconciled: None,
241 }
242 }
243}