sync_manager/
manager.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::error::*;
6use crate::types::{ServiceStatus, SyncEngineSelection, SyncParams, SyncReason, SyncResult};
7use crate::{reset, reset_all, wipe};
8use error_support::{breadcrumb, debug, info, warn};
9use parking_lot::Mutex;
10use std::collections::{BTreeMap, HashMap, HashSet};
11use std::convert::TryFrom;
12use std::time::SystemTime;
13use sync15::client::{
14    sync_multiple_with_command_processor, MemoryCachedState, Sync15StorageClientInit,
15    SyncRequestInfo,
16};
17use sync15::clients_engine::{Command, CommandProcessor, CommandStatus, Settings};
18use sync15::engine::{EngineSyncAssociation, SyncEngine, SyncEngineId};
19
/// Entry point for syncing, resetting and wiping the registered sync engines.
///
/// The only state kept here is the in-memory sync state that `sync` carries
/// over from one call to the next.
#[derive(Default)]
pub struct SyncManager {
    /// In-memory sync state reused across `sync` calls; `None` until a sync
    /// has stored one. The mutex is held for the whole duration of `sync`.
    mem_cached_state: Mutex<Option<MemoryCachedState>>,
}
24
25impl SyncManager {
26    pub fn new() -> Self {
27        Self::default()
28    }
29
30    fn get_engine_id(engine_name: &str) -> Result<SyncEngineId> {
31        SyncEngineId::try_from(engine_name).map_err(SyncManagerError::UnknownEngine)
32    }
33
34    fn get_engine(engine_id: &SyncEngineId) -> Option<Box<dyn SyncEngine>> {
35        match engine_id {
36            SyncEngineId::History => places::get_registered_sync_engine(engine_id),
37            SyncEngineId::Bookmarks => places::get_registered_sync_engine(engine_id),
38            SyncEngineId::Addresses => autofill::get_registered_sync_engine(engine_id),
39            SyncEngineId::CreditCards => autofill::get_registered_sync_engine(engine_id),
40            SyncEngineId::Passwords => logins::get_registered_sync_engine(engine_id),
41            SyncEngineId::Tabs => tabs::get_registered_sync_engine(engine_id),
42        }
43    }
44
45    pub fn wipe(&self, engine_name: &str) -> Result<()> {
46        if let Some(engine) = Self::get_engine(&Self::get_engine_id(engine_name)?) {
47            engine.wipe()?;
48        }
49        Ok(())
50    }
51
52    pub fn reset(&self, engine_name: &str) -> Result<()> {
53        if let Some(engine) = Self::get_engine(&Self::get_engine_id(engine_name)?) {
54            engine.reset(&EngineSyncAssociation::Disconnected)?;
55        }
56        Ok(())
57    }
58
59    pub fn reset_all(&self) -> Result<()> {
60        for (_, engine) in self.iter_registered_engines() {
61            engine.reset(&EngineSyncAssociation::Disconnected)?;
62        }
63        Ok(())
64    }
65
66    /// Disconnect engines from sync, deleting/resetting the sync-related data
67    pub fn disconnect(&self) {
68        breadcrumb!("SyncManager disconnect()");
69        for engine_id in SyncEngineId::iter() {
70            if let Some(engine) = Self::get_engine(&engine_id) {
71                if let Err(e) = engine.reset(&EngineSyncAssociation::Disconnected) {
72                    error_support::report_error!(
73                        "sync-manager-reset",
74                        "Failed to reset {}: {}",
75                        engine_id,
76                        e
77                    );
78                }
79            } else {
80                warn!("Unable to reset {}, be sure to call register_with_sync_manager before disconnect if this is surprising", engine_id);
81            }
82        }
83    }
84
85    /// Perform a sync.  See [SyncParams] and [SyncResult] for details on how this works
86    pub fn sync(&self, params: SyncParams) -> Result<SyncResult> {
87        breadcrumb!("SyncManager::sync started");
88        let mut state = self.mem_cached_state.lock();
89        let engines = self.calc_engines_to_sync(&params.engines)?;
90        let next_sync_after = state.as_ref().and_then(|mcs| mcs.get_next_sync_after());
91        let result = if !backoff_in_effect(next_sync_after, &params) {
92            info!("No backoff in effect (or we decided to ignore it), starting sync");
93            self.do_sync(params, &mut state, engines)
94        } else {
95            breadcrumb!(
96                "Backoff still in effect (until {:?}), bailing out early",
97                next_sync_after
98            );
99            Ok(SyncResult {
100                status: ServiceStatus::BackedOff,
101                successful: Default::default(),
102                failures: Default::default(),
103                declined: None,
104                next_sync_allowed_at: next_sync_after,
105                persisted_state: params.persisted_state.unwrap_or_default(),
106                // It would be nice to record telemetry here.
107                telemetry_json: None,
108            })
109        };
110        breadcrumb!("SyncManager sync ended");
111        result
112    }
113
114    fn do_sync(
115        &self,
116        mut params: SyncParams,
117        state: &mut Option<MemoryCachedState>,
118        mut engines: Vec<Box<dyn SyncEngine>>,
119    ) -> Result<SyncResult> {
120        let key_bundle = sync15::KeyBundle::from_ksync_base64(&params.auth_info.sync_key)?;
121        let tokenserver_url = url::Url::parse(&params.auth_info.tokenserver_url)?;
122        let interruptee = interrupt_support::ShutdownInterruptee;
123        let mut mem_cached_state = state.take().unwrap_or_default();
124        let mut disk_cached_state = params.persisted_state.take();
125
126        // tell engines about the local encryption key.
127        for engine in engines.iter_mut() {
128            if let Some(key) = params.local_encryption_keys.get(&*engine.collection_name()) {
129                engine.set_local_encryption_key(key)?
130            }
131        }
132
133        let engine_refs: Vec<&dyn SyncEngine> = engines.iter().map(|s| &**s).collect();
134
135        let client_init = Sync15StorageClientInit {
136            key_id: params.auth_info.kid.clone(),
137            access_token: params.auth_info.fxa_access_token.clone(),
138            tokenserver_url,
139        };
140        let engines_to_change = if params.enabled_changes.is_empty() {
141            None
142        } else {
143            Some(&params.enabled_changes)
144        };
145
146        let settings = Settings {
147            fxa_device_id: params.device_settings.fxa_device_id,
148            device_name: params.device_settings.name,
149            device_type: params.device_settings.kind,
150        };
151        let c = SyncClient::new(settings);
152        let result = sync_multiple_with_command_processor(
153            Some(&c),
154            &engine_refs,
155            &mut disk_cached_state,
156            &mut mem_cached_state,
157            &client_init,
158            &key_bundle,
159            &interruptee,
160            Some(SyncRequestInfo {
161                engines_to_state_change: engines_to_change,
162                is_user_action: matches!(params.reason, SyncReason::User),
163            }),
164        );
165        *state = Some(mem_cached_state);
166
167        info!("Sync finished with status {:?}", result.service_status);
168        let status = ServiceStatus::from(result.service_status);
169        for (engine, result) in result.engine_results.iter() {
170            info!("engine {:?} status: {:?}", engine, result);
171        }
172        let mut successful: Vec<String> = Vec::new();
173        let mut failures: HashMap<String, String> = HashMap::new();
174        for (engine, result) in result.engine_results.into_iter() {
175            match result {
176                Ok(_) => {
177                    successful.push(engine);
178                }
179                Err(err) => {
180                    failures.insert(engine, err.to_string());
181                }
182            }
183        }
184        let telemetry_json = serde_json::to_string(&result.telemetry).unwrap();
185
186        Ok(SyncResult {
187            status,
188            successful,
189            failures,
190            declined: result.declined,
191            next_sync_allowed_at: result.next_sync_after,
192            persisted_state: disk_cached_state.unwrap_or_default(),
193            telemetry_json: Some(telemetry_json),
194        })
195    }
196
197    fn iter_registered_engines(&self) -> impl Iterator<Item = (SyncEngineId, Box<dyn SyncEngine>)> {
198        SyncEngineId::iter().filter_map(|id| Self::get_engine(&id).map(|engine| (id, engine)))
199    }
200
201    pub fn get_available_engines(&self) -> Vec<String> {
202        self.iter_registered_engines()
203            .map(|(name, _)| name.to_string())
204            .collect()
205    }
206
207    fn calc_engines_to_sync(
208        &self,
209        selection: &SyncEngineSelection,
210    ) -> Result<Vec<Box<dyn SyncEngine>>> {
211        // BTreeMap to ensure we sync the engines in priority order.
212        let mut engine_map: BTreeMap<_, _> = self.iter_registered_engines().collect();
213        breadcrumb!(
214            "Checking engines requested ({:?}) vs local engines ({:?})",
215            selection,
216            engine_map
217                .keys()
218                .map(|engine_id| engine_id.name())
219                .collect::<Vec<_>>(),
220        );
221        if let SyncEngineSelection::Some {
222            engines: engine_names,
223        } = selection
224        {
225            // Validate selection and convert to SyncEngineId
226            let mut selected_engine_ids: HashSet<SyncEngineId> = HashSet::new();
227            for name in engine_names {
228                let engine_id = Self::get_engine_id(name)?;
229                if !engine_map.contains_key(&engine_id) {
230                    return Err(SyncManagerError::UnsupportedFeature(name.to_string()));
231                }
232                selected_engine_ids.insert(engine_id);
233            }
234            // Filter engines based on the selection
235            engine_map.retain(|engine_id, _| selected_engine_ids.contains(engine_id))
236        }
237        Ok(engine_map.into_values().collect())
238    }
239}
240
241fn backoff_in_effect(next_sync_after: Option<SystemTime>, p: &SyncParams) -> bool {
242    let now = SystemTime::now();
243    if let Some(nsa) = next_sync_after {
244        if nsa > now {
245            return if matches!(p.reason, SyncReason::User | SyncReason::EnabledChange) {
246                info!(
247                    "Still under backoff, but syncing anyway because reason is {:?}",
248                    p.reason
249                );
250                false
251            } else if !p.enabled_changes.is_empty() {
252                info!("Still under backoff, but syncing because we have enabled state changes.");
253                false
254            } else {
255                info!("Still under backoff, and there's no compelling reason for us to ignore it");
256                true
257            };
258        }
259    }
260    debug!("Not under backoff");
261    false
262}
263
264impl From<sync15::client::ServiceStatus> for ServiceStatus {
265    fn from(s15s: sync15::client::ServiceStatus) -> Self {
266        use sync15::client::ServiceStatus::*;
267        match s15s {
268            Ok => ServiceStatus::Ok,
269            NetworkError => ServiceStatus::NetworkError,
270            ServiceError => ServiceStatus::ServiceError,
271            AuthenticationError => ServiceStatus::AuthError,
272            BackedOff => ServiceStatus::BackedOff,
273            Interrupted => ServiceStatus::OtherError, // Eh...
274            OtherError => ServiceStatus::OtherError,
275        }
276    }
277}
278
279struct SyncClient(Settings);
280
281impl SyncClient {
282    pub fn new(settings: Settings) -> SyncClient {
283        SyncClient(settings)
284    }
285}
286
287impl CommandProcessor for SyncClient {
288    fn settings(&self) -> &Settings {
289        &self.0
290    }
291
292    fn apply_incoming_command(&self, command: Command) -> anyhow::Result<CommandStatus> {
293        let result = match command {
294            Command::Wipe(engine) => wipe(&engine),
295            Command::Reset(engine) => reset(&engine),
296            Command::ResetAll => reset_all(),
297        };
298        match result {
299            Ok(()) => Ok(CommandStatus::Applied),
300            Err(err) => match err {
301                SyncManagerError::UnknownEngine(_) => Ok(CommandStatus::Unsupported),
302                _ => Err(err.into()),
303            },
304        }
305    }
306
307    fn fetch_outgoing_commands(&self) -> anyhow::Result<HashSet<Command>> {
308        Ok(HashSet::new())
309    }
310}
311
#[cfg(test)]
mod test {
    use super::*;

    /// Every engine id must survive a round trip through its string name.
    #[test]
    fn test_engine_id_sanity() {
        for engine_id in SyncEngineId::iter() {
            let round_tripped = SyncEngineId::try_from(engine_id.name()).unwrap();
            assert_eq!(engine_id, round_tripped);
        }
    }
}
322}