1use super::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
9use super::status::{ServiceStatus, SyncResult};
10use super::storage_client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
11use crate::clients_engine::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
12use crate::engine::{EngineSyncAssociation, SyncEngine};
13use crate::error::{debug, info, trace, warn, Error};
14use crate::telemetry;
15use crate::KeyBundle;
16use interrupt_support::Interruptee;
17use std::collections::HashMap;
18use std::result;
19use std::time::{Duration, SystemTime};
20
21#[derive(Debug)]
24struct ClientInfo {
25 client_init: Sync15StorageClientInit,
27 client: Sync15StorageClient,
29}
30
31impl ClientInfo {
32 fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
33 Ok(Self {
34 client_init: ci.clone(),
35 client: Sync15StorageClient::new(ci.clone())?,
36 })
37 }
38}
39
40#[derive(Debug, Default)]
44pub struct MemoryCachedState {
45 last_client_info: Option<ClientInfo>,
46 last_global_state: Option<GlobalState>,
47 next_sync_after: Option<SystemTime>,
50 next_client_refresh_after: Option<SystemTime>,
51}
52
53impl MemoryCachedState {
54 pub fn clear_sensitive_info(&mut self) {
56 self.last_client_info = None;
57 self.last_global_state = None;
58 }
61 pub fn get_next_sync_after(&self) -> Option<SystemTime> {
62 self.next_sync_after
63 }
64 pub fn should_refresh_client(&self) -> bool {
65 match self.next_client_refresh_after {
66 Some(t) => SystemTime::now() > t,
67 None => true,
68 }
69 }
70 pub fn note_client_refresh(&mut self) {
71 self.next_client_refresh_after =
72 Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH));
73 }
74}
75
76pub fn sync_multiple(
94 engines: &[&dyn SyncEngine],
95 persisted_global_state: &mut Option<String>,
96 mem_cached_state: &mut MemoryCachedState,
97 storage_init: &Sync15StorageClientInit,
98 root_sync_key: &KeyBundle,
99 interruptee: &dyn Interruptee,
100 req_info: Option<SyncRequestInfo<'_>>,
101) -> SyncResult {
102 sync_multiple_with_command_processor(
103 None,
104 engines,
105 persisted_global_state,
106 mem_cached_state,
107 storage_init,
108 root_sync_key,
109 interruptee,
110 req_info,
111 )
112}
113
114#[allow(clippy::too_many_arguments)]
118pub fn sync_multiple_with_command_processor(
119 command_processor: Option<&dyn CommandProcessor>,
120 engines: &[&dyn SyncEngine],
121 persisted_global_state: &mut Option<String>,
122 mem_cached_state: &mut MemoryCachedState,
123 storage_init: &Sync15StorageClientInit,
124 root_sync_key: &KeyBundle,
125 interruptee: &dyn Interruptee,
126 req_info: Option<SyncRequestInfo<'_>>,
127) -> SyncResult {
128 info!("Syncing {} engines", engines.len());
129 let mut sync_result = SyncResult {
130 service_status: ServiceStatus::OtherError,
131 result: Ok(()),
132 declined: None,
133 next_sync_after: None,
134 engine_results: HashMap::with_capacity(engines.len()),
135 telemetry: telemetry::SyncTelemetryPing::new(),
136 };
137 let backoff = super::storage_client::new_backoff_listener();
138 let req_info = req_info.unwrap_or_default();
139 let driver = SyncMultipleDriver {
140 command_processor,
141 engines,
142 storage_init,
143 interruptee,
144 engines_to_state_change: req_info.engines_to_state_change,
145 backoff: backoff.clone(),
146 root_sync_key,
147 result: &mut sync_result,
148 persisted_global_state,
149 mem_cached_state,
150 saw_auth_error: false,
151 ignore_soft_backoff: req_info.is_user_action,
152 };
153 match driver.sync() {
154 Ok(()) => {
155 debug!(
156 "sync was successful, final status={:?}",
157 sync_result.service_status
158 );
159 }
160 Err(e) => {
161 warn!(
162 "sync failed: {}, final status={:?}",
163 e, sync_result.service_status,
164 );
165 sync_result.result = Err(e);
166 }
167 }
168 sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
171 mem_cached_state.next_sync_after = sync_result.next_sync_after;
172 trace!("Sync result: {:?}", sync_result);
173 sync_result
174}
175
176#[derive(Debug, Default)]
180pub struct SyncRequestInfo<'a> {
181 pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
182 pub is_user_action: bool,
183}
184
185struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
187 command_processor: Option<&'info dyn CommandProcessor>,
188 engines: &'info [&'info dyn SyncEngine],
189 storage_init: &'info Sync15StorageClientInit,
190 root_sync_key: &'info KeyBundle,
191 interruptee: &'info dyn Interruptee,
192 backoff: BackoffListener,
193 engines_to_state_change: Option<&'info HashMap<String, bool>>,
194 result: &'res mut SyncResult,
195 persisted_global_state: &'pgs mut Option<String>,
196 mem_cached_state: &'mcs mut MemoryCachedState,
197 ignore_soft_backoff: bool,
198 saw_auth_error: bool,
199}
200
201impl SyncMultipleDriver<'_, '_, '_, '_> {
202 fn sync(mut self) -> result::Result<(), Error> {
204 info!("Loading/initializing persisted state");
205 let mut pgs = self.prepare_persisted_state();
206
207 info!("Preparing client info");
208 let client_info = self.prepare_client_info()?;
209
210 if self.was_interrupted() {
211 return Ok(());
212 }
213
214 info!("Entering sync state machine");
215 let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;
218
219 if self.was_interrupted() {
220 return Ok(());
221 }
222
223 self.result.service_status = ServiceStatus::Ok;
226
227 let clients_engine = if let Some(command_processor) = self.command_processor {
228 info!("Synchronizing clients engine");
229 let should_refresh = self.mem_cached_state.should_refresh_client();
230 let mut engine = clients_engine::Engine::new(command_processor, self.interruptee);
231 if let Err(e) = engine.sync(
232 &client_info.client,
233 &global_state,
234 self.root_sync_key,
235 should_refresh,
236 ) {
237 let mut telem_sync = telemetry::SyncTelemetry::new();
239 let mut telem_engine = telemetry::Engine::new("clients");
240 telem_engine.failure(&e);
241 telem_sync.engine(telem_engine);
242 self.result.service_status = ServiceStatus::from_err(&e);
243
244 return Err(e);
246 }
247 if self.was_interrupted() {
252 return Ok(());
253 }
254 self.mem_cached_state.note_client_refresh();
255 Some(engine)
256 } else {
257 None
258 };
259
260 info!("Synchronizing engines");
261
262 let telem_sync =
263 self.sync_engines(&client_info, &mut global_state, clients_engine.as_ref());
264 self.result.telemetry.sync(telem_sync);
265
266 info!("Finished syncing engines.");
267
268 if !self.saw_auth_error {
269 trace!("Updating persisted global state");
270 self.mem_cached_state.last_client_info = Some(client_info);
271 self.mem_cached_state.last_global_state = Some(global_state);
272 }
273
274 Ok(())
275 }
276
277 fn was_interrupted(&mut self) -> bool {
278 if self.interruptee.was_interrupted() {
279 info!("Interrupted, bailing out");
280 self.result.service_status = ServiceStatus::Interrupted;
281 true
282 } else {
283 false
284 }
285 }
286
287 fn sync_engines(
288 &mut self,
289 client_info: &ClientInfo,
290 global_state: &mut GlobalState,
291 clients: Option<&clients_engine::Engine<'_>>,
292 ) -> telemetry::SyncTelemetry {
293 let mut telem_sync = telemetry::SyncTelemetry::new();
294 for engine in self.engines {
295 let name = engine.collection_name();
296 if self
297 .backoff
298 .get_required_wait(self.ignore_soft_backoff)
299 .is_some()
300 {
301 warn!("Got backoff, bailing out of sync early");
302 break;
303 }
304 if global_state.global.declined.iter().any(|e| e == &*name) {
305 info!("The {} engine is declined. Skipping", name);
306 continue;
307 }
308 info!("Syncing {} engine!", name);
309
310 let mut telem_engine = telemetry::Engine::new(&*name);
311 let result = super::sync::synchronize_with_clients_engine(
312 &client_info.client,
313 global_state,
314 self.root_sync_key,
315 clients,
316 *engine,
317 true,
318 &mut telem_engine,
319 self.interruptee,
320 );
321
322 match result {
323 Ok(()) => info!("Sync of {} was successful!", name),
324 Err(ref e) => {
325 warn!("Sync of {} failed! {:?}", name, e);
326 let this_status = ServiceStatus::from_err(e);
327 self.saw_auth_error =
330 self.saw_auth_error || this_status == ServiceStatus::AuthenticationError;
331 telem_engine.failure(e);
332 if this_status != ServiceStatus::OtherError {
335 telem_sync.engine(telem_engine);
336 self.result.engine_results.insert(name.into(), result);
337 self.result.service_status = this_status;
338 break;
339 }
340 }
341 }
342 telem_sync.engine(telem_engine);
343 self.result.engine_results.insert(name.into(), result);
344 if self.was_interrupted() {
345 break;
346 }
347 }
348 telem_sync
349 }
350
351 fn run_state_machine(
352 &mut self,
353 client_info: &ClientInfo,
354 pgs: &mut PersistedGlobalState,
355 ) -> result::Result<GlobalState, Error> {
356 let last_state = self.mem_cached_state.last_global_state.take();
357
358 let mut state_machine = SetupStateMachine::for_full_sync(
359 &client_info.client,
360 self.root_sync_key,
361 pgs,
362 self.engines_to_state_change,
363 self.interruptee,
364 );
365
366 info!("Advancing state machine to ready (full)");
367 let res = state_machine.run_to_ready(last_state);
368 let changes = state_machine.changes_needed.take();
371 *self.persisted_global_state = Some(serde_json::to_string(&pgs)?);
374
375 self.result.declined = Some(pgs.get_declined().to_vec());
378 debug!(
379 "Declined engines list after state machine set to: {:?}",
380 self.result.declined,
381 );
382
383 if let Some(c) = changes {
384 self.wipe_or_reset_engines(c, &client_info.client)?;
385 }
386 let state = match res {
387 Err(e) => {
388 self.result.service_status = ServiceStatus::from_err(&e);
389 return Err(e);
390 }
391 Ok(state) => state,
392 };
393 self.result.telemetry.uid(client_info.client.hashed_uid()?);
394 self.mem_cached_state.last_global_state = None;
396 Ok(state)
397 }
398
399 fn wipe_or_reset_engines(
400 &mut self,
401 changes: EngineChangesNeeded,
402 client: &Sync15StorageClient,
403 ) -> result::Result<(), Error> {
404 if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() {
405 return Ok(());
406 }
407 for e in &changes.remote_wipes {
408 info!("Engine {:?} just got disabled locally, wiping server", e);
409 client.wipe_remote_engine(e)?;
410 }
411
412 for s in self.engines {
413 let name = s.collection_name();
414 if changes.local_resets.contains(&*name) {
415 info!("Resetting engine {}, as it was declined remotely", name);
416 s.reset(&EngineSyncAssociation::Disconnected)?;
417 }
418 }
419
420 Ok(())
421 }
422
423 fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
424 let mut client_info = match self.mem_cached_state.last_client_info.take() {
425 Some(client_info) => {
426 if client_info.client_init != *self.storage_init {
431 info!("Discarding all state as the account might have changed");
432 *self.mem_cached_state = MemoryCachedState::default();
433 ClientInfo::new(self.storage_init)?
434 } else {
435 debug!("Reusing memory-cached client_info");
436 client_info
438 }
439 }
440 None => {
441 debug!("mem_cached_state was stale or missing, need setup");
442 self.mem_cached_state.clear_sensitive_info();
445 ClientInfo::new(self.storage_init)?
446 }
447 };
448 client_info.client.backoff = self.backoff.clone();
451 Ok(client_info)
452 }
453
454 fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
455 match self.persisted_global_state {
459 Some(persisted_string) if !persisted_string.is_empty() => {
460 match serde_json::from_str::<PersistedGlobalState>(persisted_string) {
461 Ok(state) => {
462 trace!("Read persisted state: {:?}", state);
463 state
467 }
468 _ => {
469 error_support::report_error!(
472 "sync15-prepare-persisted-state",
473 "Failed to parse PersistedGlobalState from JSON! Falling back to default"
474 );
475 *self.mem_cached_state = MemoryCachedState::default();
476 PersistedGlobalState::default()
477 }
478 }
479 }
480 _ => {
481 info!(
482 "The application didn't give us persisted state - \
483 this is only expected on the very first run for a given user."
484 );
485 *self.mem_cached_state = MemoryCachedState::default();
486 PersistedGlobalState::default()
487 }
488 }
489 }
490}