1use crate::error::*;
6use crate::types::{ServiceStatus, SyncEngineSelection, SyncParams, SyncReason, SyncResult};
7use crate::{reset, reset_all, wipe};
8use error_support::{breadcrumb, debug, info, warn};
9use parking_lot::Mutex;
10use std::collections::{BTreeMap, HashMap, HashSet};
11use std::convert::TryFrom;
12use std::time::SystemTime;
13use sync15::client::{
14 sync_multiple_with_command_processor, MemoryCachedState, Sync15StorageClientInit,
15 SyncRequestInfo,
16};
17use sync15::clients_engine::{Command, CommandProcessor, CommandStatus, Settings};
18use sync15::engine::{EngineSyncAssociation, SyncEngine, SyncEngineId};
19
20#[derive(Default)]
21pub struct SyncManager {
22 mem_cached_state: Mutex<Option<MemoryCachedState>>,
23}
24
25impl SyncManager {
26 pub fn new() -> Self {
27 Self::default()
28 }
29
30 fn get_engine_id(engine_name: &str) -> Result<SyncEngineId> {
31 SyncEngineId::try_from(engine_name).map_err(SyncManagerError::UnknownEngine)
32 }
33
34 fn get_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
35 match engine_id {
36 SyncEngineId::History => places::get_registered_sync_engine(engine_id),
37 SyncEngineId::Bookmarks => places::get_registered_sync_engine(engine_id),
38 SyncEngineId::Addresses => autofill::get_registered_sync_engine(engine_id),
39 SyncEngineId::CreditCards => autofill::get_registered_sync_engine(engine_id),
40 SyncEngineId::Passwords => logins::get_registered_sync_engine(engine_id),
41 SyncEngineId::Tabs => tabs::get_registered_sync_engine(engine_id),
42 }
43 }
44
45 pub fn wipe(&self, engine_name: &str) -> Result<()> {
46 if let Some(engine) = Self::get_engine(&Self::get_engine_id(engine_name)?) {
47 engine.wipe()?;
48 }
49 Ok(())
50 }
51
52 pub fn reset(&self, engine_name: &str) -> Result<()> {
53 if let Some(engine) = Self::get_engine(&Self::get_engine_id(engine_name)?) {
54 engine.reset(&EngineSyncAssociation::Disconnected)?;
55 }
56 Ok(())
57 }
58
59 pub fn reset_all(&self) -> Result<()> {
60 for (_, engine) in self.iter_registered_engines() {
61 engine.reset(&EngineSyncAssociation::Disconnected)?;
62 }
63 Ok(())
64 }
65
66 pub fn disconnect(&self) {
68 breadcrumb!("SyncManager disconnect()");
69 for engine_id in SyncEngineId::iter() {
70 if let Some(engine) = Self::get_engine(&engine_id) {
71 if let Err(e) = engine.reset(&EngineSyncAssociation::Disconnected) {
72 error_support::report_error!(
73 "sync-manager-reset",
74 "Failed to reset {}: {}",
75 engine_id,
76 e
77 );
78 }
79 } else {
80 warn!("Unable to reset {}, be sure to call register_with_sync_manager before disconnect if this is surprising", engine_id);
81 }
82 }
83 }
84
85 pub fn sync(&self, params: SyncParams) -> Result<SyncResult> {
87 breadcrumb!("SyncManager::sync started");
88 let mut state = self.mem_cached_state.lock();
89 let engines = self.calc_engines_to_sync(¶ms.engines)?;
90 let next_sync_after = state.as_ref().and_then(|mcs| mcs.get_next_sync_after());
91 let result = if !backoff_in_effect(next_sync_after, ¶ms) {
92 info!("No backoff in effect (or we decided to ignore it), starting sync");
93 self.do_sync(params, &mut state, engines)
94 } else {
95 breadcrumb!(
96 "Backoff still in effect (until {:?}), bailing out early",
97 next_sync_after
98 );
99 Ok(SyncResult {
100 status: ServiceStatus::BackedOff,
101 successful: Default::default(),
102 failures: Default::default(),
103 declined: None,
104 next_sync_allowed_at: next_sync_after,
105 persisted_state: params.persisted_state.unwrap_or_default(),
106 telemetry_json: None,
108 })
109 };
110 breadcrumb!("SyncManager sync ended");
111 result
112 }
113
114 fn do_sync(
115 &self,
116 mut params: SyncParams,
117 state: &mut Option<MemoryCachedState>,
118 mut engines: Vec<Box<dyn SyncEngine>>,
119 ) -> Result<SyncResult> {
120 let key_bundle = sync15::KeyBundle::from_ksync_base64(¶ms.auth_info.sync_key)?;
121 let tokenserver_url = url::Url::parse(¶ms.auth_info.tokenserver_url)?;
122 let interruptee = interrupt_support::ShutdownInterruptee;
123 let mut mem_cached_state = state.take().unwrap_or_default();
124 let mut disk_cached_state = params.persisted_state.take();
125
126 for engine in engines.iter_mut() {
128 if let Some(key) = params.local_encryption_keys.get(&*engine.collection_name()) {
129 engine.set_local_encryption_key(key)?
130 }
131 }
132
133 let engine_refs: Vec<&dyn SyncEngine> = engines.iter().map(|s| &**s).collect();
134
135 let client_init = Sync15StorageClientInit {
136 key_id: params.auth_info.kid.clone(),
137 access_token: params.auth_info.fxa_access_token.clone(),
138 tokenserver_url,
139 };
140 let engines_to_change = if params.enabled_changes.is_empty() {
141 None
142 } else {
143 Some(¶ms.enabled_changes)
144 };
145
146 let settings = Settings {
147 fxa_device_id: params.device_settings.fxa_device_id,
148 device_name: params.device_settings.name,
149 device_type: params.device_settings.kind,
150 };
151 let c = SyncClient::new(settings);
152 let result = sync_multiple_with_command_processor(
153 Some(&c),
154 &engine_refs,
155 &mut disk_cached_state,
156 &mut mem_cached_state,
157 &client_init,
158 &key_bundle,
159 &interruptee,
160 Some(SyncRequestInfo {
161 engines_to_state_change: engines_to_change,
162 is_user_action: matches!(params.reason, SyncReason::User),
163 }),
164 );
165 *state = Some(mem_cached_state);
166
167 info!("Sync finished with status {:?}", result.service_status);
168 let status = ServiceStatus::from(result.service_status);
169 for (engine, result) in result.engine_results.iter() {
170 info!("engine {:?} status: {:?}", engine, result);
171 }
172 let mut successful: Vec<String> = Vec::new();
173 let mut failures: HashMap<String, String> = HashMap::new();
174 for (engine, result) in result.engine_results.into_iter() {
175 match result {
176 Ok(_) => {
177 successful.push(engine);
178 }
179 Err(err) => {
180 failures.insert(engine, err.to_string());
181 }
182 }
183 }
184 let telemetry_json = serde_json::to_string(&result.telemetry).unwrap();
185
186 Ok(SyncResult {
187 status,
188 successful,
189 failures,
190 declined: result.declined,
191 next_sync_allowed_at: result.next_sync_after,
192 persisted_state: disk_cached_state.unwrap_or_default(),
193 telemetry_json: Some(telemetry_json),
194 })
195 }
196
197 fn iter_registered_engines(&self) -> impl Iterator<Item = (SyncEngineId, Box<dyn SyncEngine>)> {
198 SyncEngineId::iter().filter_map(|id| Self::get_engine(&id).map(|engine| (id, engine)))
199 }
200
201 pub fn get_available_engines(&self) -> Vec<String> {
202 self.iter_registered_engines()
203 .map(|(name, _)| name.to_string())
204 .collect()
205 }
206
207 fn calc_engines_to_sync(
208 &self,
209 selection: &SyncEngineSelection,
210 ) -> Result<Vec<Box<dyn SyncEngine>>> {
211 let mut engine_map: BTreeMap<_, _> = self.iter_registered_engines().collect();
213 breadcrumb!(
214 "Checking engines requested ({:?}) vs local engines ({:?})",
215 selection,
216 engine_map
217 .keys()
218 .map(|engine_id| engine_id.name())
219 .collect::<Vec<_>>(),
220 );
221 if let SyncEngineSelection::Some {
222 engines: engine_names,
223 } = selection
224 {
225 let mut selected_engine_ids: HashSet<SyncEngineId> = HashSet::new();
227 for name in engine_names {
228 let engine_id = Self::get_engine_id(name)?;
229 if !engine_map.contains_key(&engine_id) {
230 return Err(SyncManagerError::UnsupportedFeature(name.to_string()));
231 }
232 selected_engine_ids.insert(engine_id);
233 }
234 engine_map.retain(|engine_id, _| selected_engine_ids.contains(engine_id))
236 }
237 Ok(engine_map.into_values().collect())
238 }
239}
240
241fn backoff_in_effect(next_sync_after: Option<SystemTime>, p: &SyncParams) -> bool {
242 let now = SystemTime::now();
243 if let Some(nsa) = next_sync_after {
244 if nsa > now {
245 return if matches!(p.reason, SyncReason::User | SyncReason::EnabledChange) {
246 info!(
247 "Still under backoff, but syncing anyway because reason is {:?}",
248 p.reason
249 );
250 false
251 } else if !p.enabled_changes.is_empty() {
252 info!("Still under backoff, but syncing because we have enabled state changes.");
253 false
254 } else {
255 info!("Still under backoff, and there's no compelling reason for us to ignore it");
256 true
257 };
258 }
259 }
260 debug!("Not under backoff");
261 false
262}
263
264impl From<sync15::client::ServiceStatus> for ServiceStatus {
265 fn from(s15s: sync15::client::ServiceStatus) -> Self {
266 use sync15::client::ServiceStatus::*;
267 match s15s {
268 Ok => ServiceStatus::Ok,
269 NetworkError => ServiceStatus::NetworkError,
270 ServiceError => ServiceStatus::ServiceError,
271 AuthenticationError => ServiceStatus::AuthError,
272 BackedOff => ServiceStatus::BackedOff,
273 Interrupted => ServiceStatus::OtherError, OtherError => ServiceStatus::OtherError,
275 }
276 }
277}
278
279struct SyncClient(Settings);
280
281impl SyncClient {
282 pub fn new(settings: Settings) -> SyncClient {
283 SyncClient(settings)
284 }
285}
286
287impl CommandProcessor for SyncClient {
288 fn settings(&self) -> &Settings {
289 &self.0
290 }
291
292 fn apply_incoming_command(&self, command: Command) -> anyhow::Result<CommandStatus> {
293 let result = match command {
294 Command::Wipe(engine) => wipe(&engine),
295 Command::Reset(engine) => reset(&engine),
296 Command::ResetAll => reset_all(),
297 };
298 match result {
299 Ok(()) => Ok(CommandStatus::Applied),
300 Err(err) => match err {
301 SyncManagerError::UnknownEngine(_) => Ok(CommandStatus::Unsupported),
302 _ => Err(err.into()),
303 },
304 }
305 }
306
307 fn fetch_outgoing_commands(&self) -> anyhow::Result<HashSet<Command>> {
308 Ok(HashSet::new())
309 }
310}
311
312#[cfg(test)]
313mod test {
314 use super::*;
315
316 #[test]
317 fn test_engine_id_sanity() {
318 for engine_id in SyncEngineId::iter() {
319 assert_eq!(engine_id, SyncEngineId::try_from(engine_id.name()).unwrap());
320 }
321 }
322}