sync15/client/
sync_multiple.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5// This helps you perform a sync of multiple engines and helps you manage
6// global and local state between syncs.
7
8use super::state::{EngineChangesNeeded, GlobalState, PersistedGlobalState, SetupStateMachine};
9use super::status::{ServiceStatus, SyncResult};
10use super::storage_client::{BackoffListener, Sync15StorageClient, Sync15StorageClientInit};
11use crate::clients_engine::{self, CommandProcessor, CLIENTS_TTL_REFRESH};
12use crate::engine::{EngineSyncAssociation, SyncEngine};
13use crate::error::{debug, info, trace, warn, Error};
14use crate::telemetry;
15use crate::KeyBundle;
16use interrupt_support::Interruptee;
17use std::collections::HashMap;
18use std::result;
19use std::time::{Duration, SystemTime};
20
21/// Info about the client to use. We reuse the client unless
22/// we discover the client_init has changed, in which case we re-create one.
23#[derive(Debug)]
24struct ClientInfo {
25    // the client_init used to create `client`.
26    client_init: Sync15StorageClientInit,
27    // the client (our tokenserver state machine state, and our http library's state)
28    client: Sync15StorageClient,
29}
30
31impl ClientInfo {
32    fn new(ci: &Sync15StorageClientInit) -> Result<Self, Error> {
33        Ok(Self {
34            client_init: ci.clone(),
35            client: Sync15StorageClient::new(ci.clone())?,
36        })
37    }
38}
39
40/// Info we want callers to engine *in memory* for us so that subsequent
41/// syncs are faster. This should never be persisted to storage as it holds
42/// sensitive information, such as the sync decryption keys.
43#[derive(Debug, Default)]
44pub struct MemoryCachedState {
45    last_client_info: Option<ClientInfo>,
46    last_global_state: Option<GlobalState>,
47    // These are just engined in memory, as persisting an invalid value far in the
48    // future has the potential to break sync for good.
49    next_sync_after: Option<SystemTime>,
50    next_client_refresh_after: Option<SystemTime>,
51}
52
53impl MemoryCachedState {
54    // Called we notice the cached state is stale.
55    pub fn clear_sensitive_info(&mut self) {
56        self.last_client_info = None;
57        self.last_global_state = None;
58        // Leave the backoff time, as there's no reason to think it's not still
59        // true.
60    }
61    pub fn get_next_sync_after(&self) -> Option<SystemTime> {
62        self.next_sync_after
63    }
64    pub fn should_refresh_client(&self) -> bool {
65        match self.next_client_refresh_after {
66            Some(t) => SystemTime::now() > t,
67            None => true,
68        }
69    }
70    pub fn note_client_refresh(&mut self) {
71        self.next_client_refresh_after =
72            Some(SystemTime::now() + Duration::from_secs(CLIENTS_TTL_REFRESH));
73    }
74}
75
76/// Sync multiple engines
77/// * `engines` - The engines to sync
78/// * `persisted_global_state` - The global state to use, or None if never
79///   before provided. At the end of the sync, and even when the sync fails,
80///   the value in this cell should be persisted to permanent storage and
81///   provided next time the sync is called.
82/// * `last_client_info` - The client state to use, or None if never before
83///   provided. At the end of the sync, the value should be persisted
84///   *in memory only* - it should not be persisted to disk.
85/// * `storage_init` - Information about how the sync http client should be
86///   configured.
87/// * `root_sync_key` - The KeyBundle used for encryption.
88///
89/// Returns a map, keyed by name and holding an error value - if any engine
90/// fails, the sync will continue on to other engines, but the error will be
91/// places in this map. The absence of a name in the map implies the engine
92/// succeeded.
93pub fn sync_multiple(
94    engines: &[&dyn SyncEngine],
95    persisted_global_state: &mut Option<String>,
96    mem_cached_state: &mut MemoryCachedState,
97    storage_init: &Sync15StorageClientInit,
98    root_sync_key: &KeyBundle,
99    interruptee: &dyn Interruptee,
100    req_info: Option<SyncRequestInfo<'_>>,
101) -> SyncResult {
102    sync_multiple_with_command_processor(
103        None,
104        engines,
105        persisted_global_state,
106        mem_cached_state,
107        storage_init,
108        root_sync_key,
109        interruptee,
110        req_info,
111    )
112}
113
114/// Like `sync_multiple`, but specifies an optional command processor to handle
115/// commands from the clients collection. This function is called by the sync
116/// manager, which provides its own processor.
117#[allow(clippy::too_many_arguments)]
118pub fn sync_multiple_with_command_processor(
119    command_processor: Option<&dyn CommandProcessor>,
120    engines: &[&dyn SyncEngine],
121    persisted_global_state: &mut Option<String>,
122    mem_cached_state: &mut MemoryCachedState,
123    storage_init: &Sync15StorageClientInit,
124    root_sync_key: &KeyBundle,
125    interruptee: &dyn Interruptee,
126    req_info: Option<SyncRequestInfo<'_>>,
127) -> SyncResult {
128    info!("Syncing {} engines", engines.len());
129    let mut sync_result = SyncResult {
130        service_status: ServiceStatus::OtherError,
131        result: Ok(()),
132        declined: None,
133        next_sync_after: None,
134        engine_results: HashMap::with_capacity(engines.len()),
135        telemetry: telemetry::SyncTelemetryPing::new(),
136    };
137    let backoff = super::storage_client::new_backoff_listener();
138    let req_info = req_info.unwrap_or_default();
139    let driver = SyncMultipleDriver {
140        command_processor,
141        engines,
142        storage_init,
143        interruptee,
144        engines_to_state_change: req_info.engines_to_state_change,
145        backoff: backoff.clone(),
146        root_sync_key,
147        result: &mut sync_result,
148        persisted_global_state,
149        mem_cached_state,
150        saw_auth_error: false,
151        ignore_soft_backoff: req_info.is_user_action,
152    };
153    match driver.sync() {
154        Ok(()) => {
155            debug!(
156                "sync was successful, final status={:?}",
157                sync_result.service_status
158            );
159        }
160        Err(e) => {
161            warn!(
162                "sync failed: {}, final status={:?}",
163                e, sync_result.service_status,
164            );
165            sync_result.result = Err(e);
166        }
167    }
168    // Respect `backoff` value when computing the next sync time even if we were
169    // ignoring it during the sync
170    sync_result.set_sync_after(backoff.get_required_wait(false).unwrap_or_default());
171    mem_cached_state.next_sync_after = sync_result.next_sync_after;
172    trace!("Sync result: {:?}", sync_result);
173    sync_result
174}
175
/// A grab-bag of facts about a sync request that the sync manager knows but
/// this module otherwise would not. If it grows many more fields it should
/// probably be redesigned.
#[derive(Debug, Default)]
pub struct SyncRequestInfo<'a> {
    /// Forwarded unchanged to the setup state machine.
    /// NOTE(review): presumably maps an engine name to its desired enabled
    /// state - confirm against the state machine.
    pub engines_to_state_change: Option<&'a HashMap<String, bool>>,
    /// Used as the driver's `ignore_soft_backoff` flag.
    pub is_user_action: bool,
}
184
185// The sync multiple driver
186struct SyncMultipleDriver<'info, 'res, 'pgs, 'mcs> {
187    command_processor: Option<&'info dyn CommandProcessor>,
188    engines: &'info [&'info dyn SyncEngine],
189    storage_init: &'info Sync15StorageClientInit,
190    root_sync_key: &'info KeyBundle,
191    interruptee: &'info dyn Interruptee,
192    backoff: BackoffListener,
193    engines_to_state_change: Option<&'info HashMap<String, bool>>,
194    result: &'res mut SyncResult,
195    persisted_global_state: &'pgs mut Option<String>,
196    mem_cached_state: &'mcs mut MemoryCachedState,
197    ignore_soft_backoff: bool,
198    saw_auth_error: bool,
199}
200
201impl SyncMultipleDriver<'_, '_, '_, '_> {
202    /// The actual worker for sync_multiple.
203    fn sync(mut self) -> result::Result<(), Error> {
204        info!("Loading/initializing persisted state");
205        let mut pgs = self.prepare_persisted_state();
206
207        info!("Preparing client info");
208        let client_info = self.prepare_client_info()?;
209
210        if self.was_interrupted() {
211            return Ok(());
212        }
213
214        info!("Entering sync state machine");
215        // Advance the state machine to the point where it can perform a full
216        // sync. This may involve uploading meta/global, crypto/keys etc.
217        let mut global_state = self.run_state_machine(&client_info, &mut pgs)?;
218
219        if self.was_interrupted() {
220            return Ok(());
221        }
222
223        // Set the service status to OK here - we may adjust it based on an individual
224        // engine failing.
225        self.result.service_status = ServiceStatus::Ok;
226
227        let clients_engine = if let Some(command_processor) = self.command_processor {
228            info!("Synchronizing clients engine");
229            let should_refresh = self.mem_cached_state.should_refresh_client();
230            let mut engine = clients_engine::Engine::new(command_processor, self.interruptee);
231            if let Err(e) = engine.sync(
232                &client_info.client,
233                &global_state,
234                self.root_sync_key,
235                should_refresh,
236            ) {
237                // Record telemetry with the error just in case...
238                let mut telem_sync = telemetry::SyncTelemetry::new();
239                let mut telem_engine = telemetry::Engine::new("clients");
240                telem_engine.failure(&e);
241                telem_sync.engine(telem_engine);
242                self.result.service_status = ServiceStatus::from_err(&e);
243
244                // ...And bail, because a clients engine sync failure is fatal.
245                return Err(e);
246            }
247            // We don't record telemetry for successful clients engine
248            // syncs, since we only keep client records in memory, we
249            // expect the counts to be the same most times, and a
250            // failure aborts the entire sync.
251            if self.was_interrupted() {
252                return Ok(());
253            }
254            self.mem_cached_state.note_client_refresh();
255            Some(engine)
256        } else {
257            None
258        };
259
260        info!("Synchronizing engines");
261
262        let telem_sync =
263            self.sync_engines(&client_info, &mut global_state, clients_engine.as_ref());
264        self.result.telemetry.sync(telem_sync);
265
266        info!("Finished syncing engines.");
267
268        if !self.saw_auth_error {
269            trace!("Updating persisted global state");
270            self.mem_cached_state.last_client_info = Some(client_info);
271            self.mem_cached_state.last_global_state = Some(global_state);
272        }
273
274        Ok(())
275    }
276
277    fn was_interrupted(&mut self) -> bool {
278        if self.interruptee.was_interrupted() {
279            info!("Interrupted, bailing out");
280            self.result.service_status = ServiceStatus::Interrupted;
281            true
282        } else {
283            false
284        }
285    }
286
287    fn sync_engines(
288        &mut self,
289        client_info: &ClientInfo,
290        global_state: &mut GlobalState,
291        clients: Option<&clients_engine::Engine<'_>>,
292    ) -> telemetry::SyncTelemetry {
293        let mut telem_sync = telemetry::SyncTelemetry::new();
294        for engine in self.engines {
295            let name = engine.collection_name();
296            if self
297                .backoff
298                .get_required_wait(self.ignore_soft_backoff)
299                .is_some()
300            {
301                warn!("Got backoff, bailing out of sync early");
302                break;
303            }
304            if global_state.global.declined.iter().any(|e| e == &*name) {
305                info!("The {} engine is declined. Skipping", name);
306                continue;
307            }
308            info!("Syncing {} engine!", name);
309
310            let mut telem_engine = telemetry::Engine::new(&*name);
311            let result = super::sync::synchronize_with_clients_engine(
312                &client_info.client,
313                global_state,
314                self.root_sync_key,
315                clients,
316                *engine,
317                true,
318                &mut telem_engine,
319                self.interruptee,
320            );
321
322            match result {
323                Ok(()) => info!("Sync of {} was successful!", name),
324                Err(ref e) => {
325                    warn!("Sync of {} failed! {:?}", name, e);
326                    let this_status = ServiceStatus::from_err(e);
327                    // The only error which forces us to discard our state is an
328                    // auth error.
329                    self.saw_auth_error =
330                        self.saw_auth_error || this_status == ServiceStatus::AuthenticationError;
331                    telem_engine.failure(e);
332                    // If the failure from the engine looks like anything other than
333                    // a "engine error" we don't bother trying the others.
334                    if this_status != ServiceStatus::OtherError {
335                        telem_sync.engine(telem_engine);
336                        self.result.engine_results.insert(name.into(), result);
337                        self.result.service_status = this_status;
338                        break;
339                    }
340                }
341            }
342            telem_sync.engine(telem_engine);
343            self.result.engine_results.insert(name.into(), result);
344            if self.was_interrupted() {
345                break;
346            }
347        }
348        telem_sync
349    }
350
351    fn run_state_machine(
352        &mut self,
353        client_info: &ClientInfo,
354        pgs: &mut PersistedGlobalState,
355    ) -> result::Result<GlobalState, Error> {
356        let last_state = self.mem_cached_state.last_global_state.take();
357
358        let mut state_machine = SetupStateMachine::for_full_sync(
359            &client_info.client,
360            self.root_sync_key,
361            pgs,
362            self.engines_to_state_change,
363            self.interruptee,
364        );
365
366        info!("Advancing state machine to ready (full)");
367        let res = state_machine.run_to_ready(last_state);
368        // Grab this now even though we don't need it until later to avoid a
369        // lifetime issue
370        let changes = state_machine.changes_needed.take();
371        // The state machine might have updated our persisted_global_state, so
372        // update the caller's repr of it.
373        *self.persisted_global_state = Some(serde_json::to_string(&pgs)?);
374
375        // Now that we've gone through the state machine, engine the declined list in
376        // the sync_result
377        self.result.declined = Some(pgs.get_declined().to_vec());
378        debug!(
379            "Declined engines list after state machine set to: {:?}",
380            self.result.declined,
381        );
382
383        if let Some(c) = changes {
384            self.wipe_or_reset_engines(c, &client_info.client)?;
385        }
386        let state = match res {
387            Err(e) => {
388                self.result.service_status = ServiceStatus::from_err(&e);
389                return Err(e);
390            }
391            Ok(state) => state,
392        };
393        self.result.telemetry.uid(client_info.client.hashed_uid()?);
394        // As for client_info, put None back now so we start from scratch on error.
395        self.mem_cached_state.last_global_state = None;
396        Ok(state)
397    }
398
399    fn wipe_or_reset_engines(
400        &mut self,
401        changes: EngineChangesNeeded,
402        client: &Sync15StorageClient,
403    ) -> result::Result<(), Error> {
404        if changes.local_resets.is_empty() && changes.remote_wipes.is_empty() {
405            return Ok(());
406        }
407        for e in &changes.remote_wipes {
408            info!("Engine {:?} just got disabled locally, wiping server", e);
409            client.wipe_remote_engine(e)?;
410        }
411
412        for s in self.engines {
413            let name = s.collection_name();
414            if changes.local_resets.contains(&*name) {
415                info!("Resetting engine {}, as it was declined remotely", name);
416                s.reset(&EngineSyncAssociation::Disconnected)?;
417            }
418        }
419
420        Ok(())
421    }
422
423    fn prepare_client_info(&mut self) -> result::Result<ClientInfo, Error> {
424        let mut client_info = match self.mem_cached_state.last_client_info.take() {
425            Some(client_info) => {
426                // if our storage_init has changed it probably means the user has
427                // changed, courtesy of the 'kid' in the structure. Thus, we can't
428                // reuse the client or the memory cached state. We do keep the disk
429                // state as currently that's only the declined list.
430                if client_info.client_init != *self.storage_init {
431                    info!("Discarding all state as the account might have changed");
432                    *self.mem_cached_state = MemoryCachedState::default();
433                    ClientInfo::new(self.storage_init)?
434                } else {
435                    debug!("Reusing memory-cached client_info");
436                    // we can reuse it (which should be the common path)
437                    client_info
438                }
439            }
440            None => {
441                debug!("mem_cached_state was stale or missing, need setup");
442                // We almost certainly have no other state here, but to be safe, we
443                // throw away any memory state we do have.
444                self.mem_cached_state.clear_sensitive_info();
445                ClientInfo::new(self.storage_init)?
446            }
447        };
448        // Ensure we use the correct listener here rather than on all the branches
449        // above, since it seems less error prone.
450        client_info.client.backoff = self.backoff.clone();
451        Ok(client_info)
452    }
453
454    fn prepare_persisted_state(&mut self) -> PersistedGlobalState {
455        // Note that any failure to use a persisted state means we also decline
456        // to use our memory cached state, so that we fully rebuild that
457        // persisted state for next time.
458        match self.persisted_global_state {
459            Some(persisted_string) if !persisted_string.is_empty() => {
460                match serde_json::from_str::<PersistedGlobalState>(persisted_string) {
461                    Ok(state) => {
462                        trace!("Read persisted state: {:?}", state);
463                        // Note that we don't set `result.declined` from the
464                        // data in state - it remains None, which explicitly
465                        // indicates "we don't have updated info".
466                        state
467                    }
468                    _ => {
469                        // Don't log the error since it might contain sensitive
470                        // info (although currently it only contains the declined engines list)
471                        error_support::report_error!(
472                            "sync15-prepare-persisted-state",
473                            "Failed to parse PersistedGlobalState from JSON! Falling back to default"
474                        );
475                        *self.mem_cached_state = MemoryCachedState::default();
476                        PersistedGlobalState::default()
477                    }
478                }
479            }
480            _ => {
481                info!(
482                    "The application didn't give us persisted state - \
483                     this is only expected on the very first run for a given user."
484                );
485                *self.mem_cached_state = MemoryCachedState::default();
486                PersistedGlobalState::default()
487            }
488        }
489    }
490}