tabs/sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::schema;
6use crate::storage::{ClientRemoteTabs, RemoteTab, TABS_CLIENT_TTL};
7use crate::store::TabsStore;
8use crate::sync::record::{TabsRecord, TabsRecordTab};
9use anyhow::Result;
10use error_support::{debug, info, trace, warn};
11
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
15use sync15::engine::{
16    CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine, SyncEngineId,
17};
18use sync15::{telemetry, ClientData, CollectionName, DeviceType, RemoteClient, ServerTimestamp};
19use sync_guid::Guid;
20
21// Our "sync manager" will use whatever is stashed here.
22lazy_static::lazy_static! {
23    // Mutex: just taken long enough to update the inner stuff
24    static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new());
25}
26
27/// Called by the sync manager to get a sync engine via the store previously
28/// registered with the sync manager.
29pub fn get_registered_sync_engine(
30    engine_id: &SyncEngineId,
31) -> Option<Box<dyn sync15::engine::SyncEngine>> {
32    let weak = STORE_FOR_MANAGER.lock().unwrap();
33    match weak.upgrade() {
34        None => None,
35        Some(store) => match engine_id {
36            SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))),
37            // panicking here seems reasonable - it's a static error if this
38            // it hit, not something that runtime conditions can influence.
39            _ => unreachable!("can't provide unknown engine: {}", engine_id),
40        },
41    }
42}
43
44impl ClientRemoteTabs {
45    pub(crate) fn from_record_with_remote_client(
46        client_id: String,
47        last_modified: ServerTimestamp,
48        remote_client: &RemoteClient,
49        record: TabsRecord,
50    ) -> Self {
51        Self {
52            client_id,
53            client_name: remote_client.device_name.clone(),
54            device_type: remote_client.device_type,
55            last_modified: last_modified.as_millis(),
56            remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
57        }
58    }
59
60    // Note that this should die as part of https://github.com/mozilla/application-services/issues/5199
61    // If we don't have a `RemoteClient` record, then we don't know whether the ID passed here is
62    // the fxa_device_id (which is must be) or the client_id (which it will be if this ends up being
63    // called for desktop records, where client_id != fxa_device_id)
64    pub(crate) fn from_record(
65        client_id: String,
66        last_modified: ServerTimestamp,
67        record: TabsRecord,
68    ) -> Self {
69        Self {
70            client_id,
71            client_name: record.client_name,
72            device_type: DeviceType::Unknown,
73            last_modified: last_modified.as_millis(),
74            remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
75        }
76    }
77    fn to_record(&self) -> TabsRecord {
78        TabsRecord {
79            id: self.client_id.clone(),
80            client_name: self.client_name.clone(),
81            tabs: self
82                .remote_tabs
83                .iter()
84                .map(RemoteTab::to_record_tab)
85                .collect(),
86        }
87    }
88}
89
90impl RemoteTab {
91    pub(crate) fn from_record_tab(tab: &TabsRecordTab) -> Self {
92        Self {
93            title: tab.title.clone(),
94            url_history: tab.url_history.clone(),
95            icon: tab.icon.clone(),
96            last_used: tab.last_used.checked_mul(1000).unwrap_or_default(),
97            inactive: tab.inactive,
98        }
99    }
100    pub(super) fn to_record_tab(&self) -> TabsRecordTab {
101        TabsRecordTab {
102            title: self.title.clone(),
103            url_history: self.url_history.clone(),
104            icon: self.icon.clone(),
105            last_used: self.last_used.checked_div(1000).unwrap_or_default(),
106            inactive: self.inactive,
107        }
108    }
109}
110
111// This is the implementation of syncing, which is used by the 2 different "sync engines"
112// (We hope to get these 2 engines even closer in the future, but for now, we suck this up)
113pub struct TabsEngine {
114    pub(super) store: Arc<TabsStore>,
115    // local_id is made public for use in examples/tabs-sync
116    pub local_id: RwLock<String>,
117}
118
119impl TabsEngine {
120    pub fn new(store: Arc<TabsStore>) -> Self {
121        Self {
122            store,
123            local_id: Default::default(),
124        }
125    }
126
127    pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> {
128        let mut storage = self.store.storage.lock().unwrap();
129        debug!("Updating last sync to {}", last_sync);
130        let last_sync_millis = last_sync.as_millis();
131        Ok(storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)?)
132    }
133
134    pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> {
135        let mut storage = self.store.storage.lock().unwrap();
136        let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
137        Ok(millis.map(ServerTimestamp))
138    }
139}
140
141impl SyncEngine for TabsEngine {
142    fn collection_name(&self) -> CollectionName {
143        "tabs".into()
144    }
145
146    fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
147        let mut storage = self.store.storage.lock().unwrap();
148        // We only know the client list at sync time, but need to return tabs potentially
149        // at any time -- so we store the clients in the meta table to be able to properly
150        // return a ClientRemoteTab struct
151        let client_data = get_client_data();
152        storage.put_meta(
153            schema::REMOTE_CLIENTS_KEY,
154            &serde_json::to_string(&client_data.recent_clients)?,
155        )?;
156        *self.local_id.write().unwrap() = client_data.local_client_id;
157        Ok(())
158    }
159
160    fn stage_incoming(
161        &self,
162        inbound: Vec<IncomingBso>,
163        telem: &mut telemetry::Engine,
164    ) -> Result<()> {
165        // We don't really "stage" records, we just apply them.
166        let local_id = &*self.local_id.read().unwrap();
167        let mut remote_tabs = Vec::with_capacity(inbound.len());
168
169        let mut incoming_telemetry = telemetry::EngineIncoming::new();
170        for incoming in inbound {
171            if incoming.envelope.id == *local_id {
172                // That's our own record, ignore it.
173                continue;
174            }
175            let modified = incoming.envelope.modified;
176            let record = match incoming.into_content::<TabsRecord>().content() {
177                Some(record) => record,
178                None => {
179                    // Invalid record or a "tombstone" which tabs don't have.
180                    warn!("Ignoring incoming invalid tab");
181                    incoming_telemetry.failed(1);
182                    continue;
183                }
184            };
185            incoming_telemetry.applied(1);
186            remote_tabs.push((record, modified));
187        }
188        telem.incoming(incoming_telemetry);
189        let mut storage = self.store.storage.lock().unwrap();
190        // In desktop we might end up here with zero records when doing a quick-write, in
191        // which case we don't want to wipe the DB.
192        if !remote_tabs.is_empty() {
193            storage.replace_remote_tabs(&remote_tabs)?;
194        }
195        storage.remove_stale_clients()?;
196        storage.remove_old_pending_closures(&remote_tabs)?;
197        Ok(())
198    }
199
200    fn apply(
201        &self,
202        timestamp: ServerTimestamp,
203        _telem: &mut telemetry::Engine,
204    ) -> Result<Vec<OutgoingBso>> {
205        // We've already applied them - really we just need to fetch outgoing.
206        let (local_tabs, remote_clients) = {
207            let mut storage = self.store.storage.lock().unwrap();
208            let local_tabs = storage.prepare_local_tabs_for_upload();
209            let remote_clients: HashMap<String, RemoteClient> = {
210                match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? {
211                    None => HashMap::default(),
212                    Some(json) => serde_json::from_str(&json).unwrap(),
213                }
214            };
215            (local_tabs, remote_clients)
216        };
217
218        let local_id = &*self.local_id.read().unwrap();
219        // Timestamp will be zero when used as a "bridged" engine.
220        if timestamp.0 != 0 {
221            self.set_last_sync(timestamp)?;
222        }
223        // XXX - outgoing telem?
224        let outgoing = if let Some(local_tabs) = local_tabs {
225            let (client_name, device_type) = remote_clients
226                .get(local_id)
227                .map(|client| (client.device_name.clone(), client.device_type))
228                .unwrap_or_else(|| (String::new(), DeviceType::Unknown));
229            let local_record = ClientRemoteTabs {
230                client_id: local_id.clone(),
231                client_name,
232                device_type,
233                last_modified: 0, // ignored for outgoing records.
234                remote_tabs: local_tabs.to_vec(),
235            };
236            trace!("outgoing {:?}", local_record);
237            let envelope = OutgoingEnvelope {
238                id: local_id.as_str().into(),
239                ttl: Some(TABS_CLIENT_TTL),
240                ..Default::default()
241            };
242            vec![OutgoingBso::from_content(
243                envelope,
244                local_record.to_record(),
245            )?]
246        } else {
247            vec![]
248        };
249        Ok(outgoing)
250    }
251
252    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> Result<()> {
253        info!("sync uploaded {} records", ids.len());
254        self.set_last_sync(new_timestamp)?;
255        Ok(())
256    }
257
258    fn get_collection_request(
259        &self,
260        server_timestamp: ServerTimestamp,
261    ) -> Result<Option<CollectionRequest>> {
262        let since = self.get_last_sync()?.unwrap_or_default();
263        Ok(if since == server_timestamp {
264            None
265        } else {
266            Some(
267                CollectionRequest::new("tabs".into())
268                    .full()
269                    .newer_than(since),
270            )
271        })
272    }
273
274    fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
275        self.set_last_sync(ServerTimestamp(0))?;
276        let mut storage = self.store.storage.lock().unwrap();
277        storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?;
278        storage.wipe_remote_tabs()?;
279        match assoc {
280            EngineSyncAssociation::Disconnected => {
281                storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
282                storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
283            }
284            EngineSyncAssociation::Connected(ids) => {
285                storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?;
286                storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?;
287            }
288        };
289        Ok(())
290    }
291
292    fn wipe(&self) -> Result<()> {
293        self.reset(&EngineSyncAssociation::Disconnected)?;
294        // not clear why we need to wipe the local tabs - the app is just going
295        // to re-add them?
296        self.store.storage.lock().unwrap().wipe_local_tabs();
297        Ok(())
298    }
299
300    fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> {
301        let mut storage = self.store.storage.lock().unwrap();
302        let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?;
303        let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?;
304        Ok(if let (Some(global), Some(coll)) = (global, coll) {
305            EngineSyncAssociation::Connected(CollSyncIds {
306                global: Guid::from_string(global),
307                coll: Guid::from_string(coll),
308            })
309        } else {
310            EngineSyncAssociation::Disconnected
311        })
312    }
313}
314
315impl crate::TabsStore {
316    // This allows the embedding app to say "make this instance available to
317    // the sync manager". The implementation is more like "offer to sync mgr"
318    // (thereby avoiding us needing to link with the sync manager) but
319    // `register_with_sync_manager()` is logically what's happening so that's
320    // the name it gets.
321    pub fn register_with_sync_manager(self: Arc<Self>) {
322        let mut state = STORE_FOR_MANAGER.lock().unwrap();
323        *state = Arc::downgrade(&self);
324    }
325}
326
#[cfg(test)]
pub mod test {
    use super::*;
    use serde_json::json;
    use sync15::bso::IncomingBso;

    /// Creates an engine over a fresh in-memory store with the given name.
    fn engine_for(db_name: &str) -> TabsEngine {
        TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(db_name)))
    }

    /// Turns each JSON payload into an incoming BSO.
    fn incoming_from(payloads: Vec<serde_json::Value>) -> Vec<IncomingBso> {
        payloads
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect()
    }

    /// Number of remote clients currently held in the engine's storage.
    fn remote_client_count(engine: &TabsEngine) -> usize {
        let mut storage = engine.store.storage.lock().unwrap();
        storage.get_remote_tabs().expect("should work").len()
    }

    // Incoming records are applied, later records replace earlier ones for the
    // same client, and invalid payloads are dropped.
    #[test]
    fn test_incoming_tabs() {
        error_support::init_for_tests();

        let engine = engine_for("test-incoming");

        let no_tabs = json!({
            "id": "device-no-tabs",
            "clientName": "device with no tabs",
            "tabs": [],
        });
        let one_tab = json!({
            "id": "device-with-a-tab",
            "clientName": "device with a tab",
            "tabs": [{
                "title": "the title",
                "urlHistory": [
                    "https://mozilla.org/"
                ],
                "icon": "https://mozilla.org/icon",
                "lastUsed": 1643764207
            }]
        });
        // test an updated payload will replace the previous record
        let one_tab_updated = json!({
            "id": "device-with-a-tab",
            "clientName": "device with an updated tab",
            "tabs": [{
                "title": "the new title",
                "urlHistory": [
                    "https://mozilla.org/"
                ],
                "icon": "https://mozilla.org/icon",
                "lastUsed": 1643764208
            }]
        });
        // This has the main payload as OK but the tabs part invalid.
        let invalid_tabs = json!({
            "id": "device-with-invalid-tab",
            "clientName": "device with a tab",
            "tabs": [{
                "foo": "bar",
            }]
        });
        // We want this to be a valid payload but an invalid tab - so it needs an ID.
        let invalid_record = json!({
            "id": "invalid-tab",
            "foo": "bar"
        });

        let mut telem = telemetry::Engine::new("tabs");
        let incoming = incoming_from(vec![
            no_tabs,
            one_tab,
            one_tab_updated,
            invalid_tabs,
            invalid_record,
        ]);
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        let outgoing = engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        assert!(outgoing.is_empty());

        // now check the store has what we think it has.
        let mut storage = engine.store.storage.lock().unwrap();
        let mut crts = storage.get_remote_tabs().expect("should work");
        crts.sort_by(|a, b| a.client_name.cmp(&b.client_name));
        assert_eq!(crts.len(), 2, "we currently include devices with no tabs");

        let updated = &crts[0];
        assert_eq!(updated.client_name, "device with an updated tab");
        assert_eq!(updated.device_type, DeviceType::Unknown);
        assert_eq!(updated.remote_tabs.len(), 1);
        assert_eq!(updated.remote_tabs[0].title, "the new title");

        let empty = &crts[1];
        assert_eq!(empty.client_name, "device with no tabs");
        assert_eq!(empty.device_type, DeviceType::Unknown);
        assert_eq!(empty.remote_tabs.len(), 0);
    }

    // A sync that delivers zero records must not wipe what is already stored.
    #[test]
    fn test_no_incoming_doesnt_write() {
        error_support::init_for_tests();

        let engine = engine_for("test_no_incoming_doesnt_write");

        let one_tab = json!({
            "id": "device-with-a-tab",
            "clientName": "device with a tab",
            "tabs": [{
                "title": "the title",
                "urlHistory": [
                    "https://mozilla.org/"
                ],
                "icon": "https://mozilla.org/icon",
                "lastUsed": 1643764207
            }]
        });

        let mut telem = telemetry::Engine::new("tabs");
        engine
            .stage_incoming(incoming_from(vec![one_tab]), &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        // now check the store has what we think it has.
        assert_eq!(remote_client_count(&engine), 1);

        // Now another sync with zero incoming records, should still be able to get back
        // our one client.
        engine
            .stage_incoming(vec![], &mut telemetry::Engine::new("tabs"))
            .expect("Should succeed applying zero records");

        assert_eq!(remote_client_count(&engine), 1);
    }

    // Registration stores only a weak pointer, and goes away with the store.
    #[test]
    fn test_sync_manager_registration() {
        let store = Arc::new(TabsStore::new_with_mem_path("test-registration"));
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 0);

        Arc::clone(&store).register_with_sync_manager();
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);

        let registered = STORE_FOR_MANAGER
            .lock()
            .unwrap()
            .upgrade()
            .expect("should upgrade");
        assert!(Arc::ptr_eq(&store, &registered));
        drop(registered);

        // should be no new references
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);

        // dropping the registered object should drop the registration.
        drop(store);
        assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none());
    }

    // Applying with a zero timestamp must leave the stored last-sync untouched.
    #[test]
    fn test_apply_timestamp() {
        error_support::init_for_tests();

        let engine = engine_for("test-apply-timestamp");

        let no_tabs = json!({
            "id": "device-no-tabs",
            "clientName": "device with no tabs",
            "tabs": [],
        });

        let mut telem = telemetry::Engine::new("tabs");
        engine
            .set_last_sync(ServerTimestamp::from_millis(123))
            .unwrap();
        engine
            .stage_incoming(incoming_from(vec![no_tabs]), &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        let last_sync = engine
            .get_last_sync()
            .expect("should work")
            .expect("should have a value");
        assert_eq!(
            last_sync,
            ServerTimestamp::from_millis(123),
            "didn't set a zero timestamp"
        )
    }
}