tabs/sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::TabsRecord;
6use crate::schema;
7use crate::storage::{ClientRemoteTabs, TABS_CLIENT_TTL};
8use crate::store::TabsStore;
9use anyhow::Result;
10use error_support::{debug, info, trace, warn};
11
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
15use sync15::engine::{
16    CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine, SyncEngineId,
17};
18use sync15::{telemetry, ClientData, CollectionName, RemoteClient, ServerTimestamp};
19use sync_guid::Guid;
20
21// Our "sync manager" will use whatever is stashed here.
22lazy_static::lazy_static! {
23    // Mutex: just taken long enough to update the inner stuff
24    static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new());
25}
26
27/// Called by the sync manager to get a sync engine via the store previously
28/// registered with the sync manager.
29pub fn get_registered_sync_engine(
30    engine_id: &SyncEngineId,
31) -> Option<Box<dyn sync15::engine::SyncEngine>> {
32    let weak = STORE_FOR_MANAGER.lock().unwrap();
33    match weak.upgrade() {
34        None => None,
35        Some(store) => match engine_id {
36            SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))),
37            // panicking here seems reasonable - it's a static error if this
38            // it hit, not something that runtime conditions can influence.
39            _ => unreachable!("can't provide unknown engine: {}", engine_id),
40        },
41    }
42}
43
44impl ClientRemoteTabs {
45    pub(crate) fn from_record(
46        client_id: String,
47        last_modified: ServerTimestamp,
48        remote_client: &RemoteClient,
49        record: TabsRecord,
50    ) -> Self {
51        Self {
52            client_id,
53            client_name: remote_client.device_name.clone(),
54            device_type: remote_client.device_type,
55            last_modified: last_modified.as_millis(),
56            remote_tabs: record.tabs.into_iter().map(Into::into).collect(),
57            tab_groups: record
58                .tab_groups
59                .into_iter()
60                .map(|(n, v)| (n, v.into()))
61                .collect(),
62            windows: record
63                .windows
64                .into_iter()
65                .map(|(n, v)| (n, v.into()))
66                .collect(),
67        }
68    }
69}
70
71// This is the implementation of syncing, which is used by the 2 different "sync engines"
72// (We hope to get these 2 engines even closer in the future, but for now, we suck this up)
73pub struct TabsEngine {
74    pub(super) store: Arc<TabsStore>,
75    // local_id is made public for use in examples/tabs-sync
76    pub local_id: RwLock<String>,
77}
78
79impl TabsEngine {
80    pub fn new(store: Arc<TabsStore>) -> Self {
81        Self {
82            store,
83            local_id: Default::default(),
84        }
85    }
86
87    pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> {
88        let mut storage = self.store.storage.lock().unwrap();
89        debug!("Updating last sync to {}", last_sync);
90        let last_sync_millis = last_sync.as_millis();
91        Ok(storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)?)
92    }
93
94    pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> {
95        let mut storage = self.store.storage.lock().unwrap();
96        let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
97        Ok(millis.map(ServerTimestamp))
98    }
99}
100
101impl SyncEngine for TabsEngine {
102    fn collection_name(&self) -> CollectionName {
103        "tabs".into()
104    }
105
106    fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
107        let mut storage = self.store.storage.lock().unwrap();
108        // We only know the client list at sync time, but need to return tabs potentially
109        // at any time -- so we store the clients in the meta table to be able to properly
110        // return a ClientRemoteTab struct
111        let client_data = get_client_data();
112        storage.put_meta(
113            schema::REMOTE_CLIENTS_KEY,
114            &serde_json::to_string(&client_data.recent_clients)?,
115        )?;
116        *self.local_id.write().unwrap() = client_data.local_client_id;
117        Ok(())
118    }
119
120    fn stage_incoming(
121        &self,
122        inbound: Vec<IncomingBso>,
123        telem: &mut telemetry::Engine,
124    ) -> Result<()> {
125        // We don't really "stage" records, we just apply them.
126        let local_id = &*self.local_id.read().unwrap();
127        let mut remote_tabs = Vec::with_capacity(inbound.len());
128
129        let mut incoming_telemetry = telemetry::EngineIncoming::new();
130        for incoming in inbound {
131            if incoming.envelope.id == *local_id {
132                // That's our own record, ignore it.
133                continue;
134            }
135            let modified = incoming.envelope.modified;
136            let record = match incoming.into_content::<TabsRecord>().content() {
137                Some(record) => record,
138                None => {
139                    // Invalid record or a "tombstone" which tabs don't have.
140                    warn!("Ignoring incoming invalid tab");
141                    incoming_telemetry.failed(1);
142                    continue;
143                }
144            };
145            incoming_telemetry.applied(1);
146            remote_tabs.push((record, modified));
147        }
148        telem.incoming(incoming_telemetry);
149        let mut storage = self.store.storage.lock().unwrap();
150        // In desktop we might end up here with zero records when doing a quick-write, in
151        // which case we don't want to wipe the DB.
152        if !remote_tabs.is_empty() {
153            storage.replace_remote_tabs(&remote_tabs)?;
154        }
155        storage.remove_stale_clients()?;
156        storage.remove_old_pending_closures(&remote_tabs)?;
157        Ok(())
158    }
159
160    fn apply(
161        &self,
162        timestamp: ServerTimestamp,
163        _telem: &mut telemetry::Engine,
164    ) -> Result<Vec<OutgoingBso>> {
165        // We've already applied them - we just need to fetch outgoing.
166        let local_id = &*self.local_id.read().unwrap();
167        // Timestamp will be zero when used as a "bridged" engine.
168        if timestamp.0 != 0 {
169            self.set_last_sync(timestamp)?;
170        }
171
172        let mut storage = self.store.storage.lock().unwrap();
173        let remote_clients: HashMap<String, RemoteClient> = {
174            match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? {
175                None => HashMap::default(),
176                Some(json) => serde_json::from_str(&json).unwrap(),
177            }
178        };
179
180        let Some(ref tabs_info) = *storage.local_tabs.borrow() else {
181            // It's a less than ideal outcome if at startup (or any time) we are asked to
182            // sync tabs before the app has told us what the tabs are, so make noise, but
183            // don't actually write that we have no tabs.
184            warn!("syncing without local tabs");
185            return Ok(vec![]);
186        };
187
188        let client_name = remote_clients
189            .get(local_id)
190            .map(|client| client.device_name.clone())
191            .unwrap_or_default();
192
193        let mut record = TabsRecord {
194            id: local_id.clone(),
195            client_name,
196            tabs: tabs_info
197                .tabs
198                .iter()
199                .map(Clone::clone)
200                .map(Into::into)
201                .collect(),
202            windows: tabs_info
203                .windows
204                .iter()
205                .map(|(n, v)| (n.clone(), v.clone().into()))
206                .collect(),
207            tab_groups: tabs_info
208                .tab_groups
209                .iter()
210                .map(|(n, v)| (n.clone(), v.clone().into()))
211                .collect(),
212        };
213        super::prepare_for_upload(&mut record);
214
215        trace!("outgoing {:?}", record);
216        let envelope = OutgoingEnvelope {
217            id: local_id.as_str().into(),
218            ttl: Some(TABS_CLIENT_TTL),
219            ..Default::default()
220        };
221        // XXX - outgoing telem?
222        Ok(vec![OutgoingBso::from_content(envelope, record)?])
223    }
224
225    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> Result<()> {
226        info!("sync uploaded {} records", ids.len());
227        self.set_last_sync(new_timestamp)?;
228        Ok(())
229    }
230
231    fn get_collection_request(
232        &self,
233        server_timestamp: ServerTimestamp,
234    ) -> Result<Option<CollectionRequest>> {
235        let since = self.get_last_sync()?.unwrap_or_default();
236        Ok(if since == server_timestamp {
237            None
238        } else {
239            Some(
240                CollectionRequest::new("tabs".into())
241                    .full()
242                    .newer_than(since),
243            )
244        })
245    }
246
247    fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
248        self.set_last_sync(ServerTimestamp(0))?;
249        let mut storage = self.store.storage.lock().unwrap();
250        storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?;
251        storage.wipe_remote_tabs()?;
252        match assoc {
253            EngineSyncAssociation::Disconnected => {
254                storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
255                storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
256            }
257            EngineSyncAssociation::Connected(ids) => {
258                storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?;
259                storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?;
260            }
261        };
262        Ok(())
263    }
264
265    fn wipe(&self) -> Result<()> {
266        self.reset(&EngineSyncAssociation::Disconnected)?;
267        // not clear why we need to wipe the local tabs - the app is just going
268        // to re-add them?
269        self.store.storage.lock().unwrap().wipe_local_tabs();
270        Ok(())
271    }
272
273    fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> {
274        let mut storage = self.store.storage.lock().unwrap();
275        let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?;
276        let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?;
277        Ok(if let (Some(global), Some(coll)) = (global, coll) {
278            EngineSyncAssociation::Connected(CollSyncIds {
279                global: Guid::from_string(global),
280                coll: Guid::from_string(coll),
281            })
282        } else {
283            EngineSyncAssociation::Disconnected
284        })
285    }
286}
287
288impl crate::TabsStore {
289    // This allows the embedding app to say "make this instance available to
290    // the sync manager". The implementation is more like "offer to sync mgr"
291    // (thereby avoiding us needing to link with the sync manager) but
292    // `register_with_sync_manager()` is logically what's happening so that's
293    // the name it gets.
294    pub fn register_with_sync_manager(self: Arc<Self>) {
295        let mut state = STORE_FOR_MANAGER.lock().unwrap();
296        *state = Arc::downgrade(&self);
297    }
298}
299
300#[cfg(test)]
301pub mod test {
302    use super::*;
303    use crate::DeviceType;
304    use serde_json::json;
305    use sync15::bso::IncomingBso;
306
307    #[test]
308    fn test_incoming_tabs() {
309        error_support::init_for_tests();
310
311        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path("test-incoming")));
312
313        let client_data = ClientData {
314            local_client_id: "my-device".to_string(),
315            recent_clients: HashMap::from([
316                (
317                    "my-device".to_string(),
318                    RemoteClient {
319                        fxa_device_id: None,
320                        device_name: "my device".to_string(),
321                        device_type: sync15::DeviceType::Unknown,
322                    },
323                ),
324                (
325                    "device-no-tabs".to_string(),
326                    RemoteClient {
327                        fxa_device_id: None,
328                        device_name: "device with no tabs".to_string(),
329                        device_type: DeviceType::Unknown,
330                    },
331                ),
332                (
333                    "device-with-a-tab".to_string(),
334                    RemoteClient {
335                        fxa_device_id: None,
336                        device_name: "device with an updated tab".to_string(),
337                        device_type: DeviceType::Unknown,
338                    },
339                ),
340            ]),
341        };
342        engine
343            .prepare_for_sync(&|| client_data.clone())
344            .expect("should work");
345
346        let records = vec![
347            json!({
348                "id": "device-no-tabs",
349                "clientName": "device with no tabs",
350                "tabs": [],
351            }),
352            json!({
353                "id": "device-with-a-tab",
354                "clientName": "device with a tab",
355                "tabs": [{
356                    "title": "the title",
357                    "urlHistory": [
358                        "https://mozilla.org/"
359                    ],
360                    "icon": "https://mozilla.org/icon",
361                    "lastUsed": 1643764207
362                }]
363            }),
364            json!({
365                "id": "device-with-a-tab",
366                "clientName": "device with an updated tab",
367                "tabs": [{
368                    "title": "the new title",
369                    "urlHistory": [
370                        "https://mozilla.org/"
371                    ],
372                    "icon": "https://mozilla.org/icon",
373                    "lastUsed": 1643764208
374                }]
375            }),
376            // This has the main payload as OK but the tabs part invalid.
377            json!({
378                "id": "device-with-invalid-tab",
379                "clientName": "device with a tab",
380                "tabs": [{
381                    "foo": "bar",
382                }]
383            }),
384            // We want this to be a valid payload but an invalid tab - so it needs an ID.
385            json!({
386                "id": "invalid-tab",
387                "foo": "bar"
388            }),
389        ];
390
391        let mut telem = telemetry::Engine::new("tabs");
392        let incoming = records
393            .into_iter()
394            .map(IncomingBso::from_test_content)
395            .collect();
396        engine
397            .stage_incoming(incoming, &mut telem)
398            .expect("Should apply incoming and stage outgoing records");
399        let outgoing = engine
400            .apply(ServerTimestamp(0), &mut telem)
401            .expect("should apply");
402
403        assert!(outgoing.is_empty());
404
405        // now check the store has what we think it has.
406        let mut storage = engine.store.storage.lock().unwrap();
407        let mut crts = storage.get_remote_tabs().expect("should work");
408        crts.sort_by(|a, b| a.client_name.partial_cmp(&b.client_name).unwrap());
409        assert_eq!(crts.len(), 2, "we currently include devices with no tabs");
410        let crt = &crts[0];
411        assert_eq!(crt.client_name, "device with an updated tab");
412        assert_eq!(crt.device_type, DeviceType::Unknown);
413        assert_eq!(crt.remote_tabs.len(), 1);
414        assert_eq!(crt.remote_tabs[0].title, "the new title");
415
416        let crt = &crts[1];
417        assert_eq!(crt.client_name, "device with no tabs");
418        assert_eq!(crt.device_type, DeviceType::Unknown);
419        assert_eq!(crt.remote_tabs.len(), 0);
420    }
421
422    #[test]
423    fn test_no_incoming_doesnt_write() {
424        error_support::init_for_tests();
425
426        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
427            "test_no_incoming_doesnt_write",
428        )));
429
430        let client_data = ClientData {
431            local_client_id: "my-device".to_string(),
432            recent_clients: HashMap::from([(
433                "device-with-a-tab".to_string(),
434                RemoteClient {
435                    fxa_device_id: None,
436                    device_name: "device-with-a-tab".to_string(),
437                    device_type: DeviceType::Unknown,
438                },
439            )]),
440        };
441        engine
442            .prepare_for_sync(&|| client_data.clone())
443            .expect("should work");
444
445        let records = vec![json!({
446            "id": "device-with-a-tab",
447            "clientName": "device with a tab",
448            "tabs": [{
449                "title": "the title",
450                "urlHistory": [
451                    "https://mozilla.org/"
452                ],
453                "icon": "https://mozilla.org/icon",
454                "lastUsed": 1643764207
455            }]
456        })];
457
458        let mut telem = telemetry::Engine::new("tabs");
459        let incoming = records
460            .into_iter()
461            .map(IncomingBso::from_test_content)
462            .collect();
463        engine
464            .stage_incoming(incoming, &mut telem)
465            .expect("Should apply incoming and stage outgoing records");
466        engine
467            .apply(ServerTimestamp(0), &mut telem)
468            .expect("should apply");
469
470        // now check the store has what we think it has.
471        {
472            let mut storage = engine.store.storage.lock().unwrap();
473            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
474        }
475
476        // Now another sync with zero incoming records, should still be able to get back
477        // our one client.
478        engine
479            .stage_incoming(vec![], &mut telemetry::Engine::new("tabs"))
480            .expect("Should succeed applying zero records");
481
482        {
483            let mut storage = engine.store.storage.lock().unwrap();
484            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
485        }
486    }
487
488    #[test]
489    fn test_sync_manager_registration() {
490        let store = Arc::new(TabsStore::new_with_mem_path("test-registration"));
491        assert_eq!(Arc::strong_count(&store), 1);
492        assert_eq!(Arc::weak_count(&store), 0);
493        Arc::clone(&store).register_with_sync_manager();
494        assert_eq!(Arc::strong_count(&store), 1);
495        assert_eq!(Arc::weak_count(&store), 1);
496        let registered = STORE_FOR_MANAGER
497            .lock()
498            .unwrap()
499            .upgrade()
500            .expect("should upgrade");
501        assert!(Arc::ptr_eq(&store, &registered));
502        drop(registered);
503        // should be no new references
504        assert_eq!(Arc::strong_count(&store), 1);
505        assert_eq!(Arc::weak_count(&store), 1);
506        // dropping the registered object should drop the registration.
507        drop(store);
508        assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none());
509    }
510
511    #[test]
512    fn test_apply_timestamp() {
513        error_support::init_for_tests();
514
515        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
516            "test-apply-timestamp",
517        )));
518
519        let records = vec![json!({
520            "id": "device-no-tabs",
521            "clientName": "device with no tabs",
522            "tabs": [],
523        })];
524
525        let mut telem = telemetry::Engine::new("tabs");
526        engine
527            .set_last_sync(ServerTimestamp::from_millis(123))
528            .unwrap();
529        let incoming = records
530            .into_iter()
531            .map(IncomingBso::from_test_content)
532            .collect();
533        engine
534            .stage_incoming(incoming, &mut telem)
535            .expect("Should apply incoming and stage outgoing records");
536        engine
537            .apply(ServerTimestamp(0), &mut telem)
538            .expect("should apply");
539
540        assert_eq!(
541            engine
542                .get_last_sync()
543                .expect("should work")
544                .expect("should have a value"),
545            ServerTimestamp::from_millis(123),
546            "didn't set a zero timestamp"
547        )
548    }
549}