1use super::TabsRecord;
6use crate::schema;
7use crate::storage::{ClientRemoteTabs, TABS_CLIENT_TTL};
8use crate::store::TabsStore;
9use anyhow::Result;
10use error_support::{debug, info, trace, warn};
11
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
15use sync15::engine::{
16 CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine, SyncEngineId,
17};
18use sync15::{telemetry, ClientData, CollectionName, RemoteClient, ServerTimestamp};
19use sync_guid::Guid;
20
lazy_static::lazy_static! {
    // Process-wide slot holding the `TabsStore` most recently passed to
    // `TabsStore::register_with_sync_manager`; `get_registered_sync_engine`
    // upgrades it to build an engine on demand.
    //
    // Only a `Weak` reference is kept, so registering a store does not keep
    // it alive: once every strong `Arc` is dropped, `upgrade()` yields `None`.
    static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new());
}
26
27pub fn get_registered_sync_engine(
30 engine_id: &SyncEngineId,
31) -> Option<Box<dyn sync15::engine::SyncEngine>> {
32 let weak = STORE_FOR_MANAGER.lock().unwrap();
33 match weak.upgrade() {
34 None => None,
35 Some(store) => match engine_id {
36 SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))),
37 _ => unreachable!("can't provide unknown engine: {}", engine_id),
40 },
41 }
42}
43
44impl ClientRemoteTabs {
45 pub(crate) fn from_record(
46 client_id: String,
47 last_modified: ServerTimestamp,
48 remote_client: &RemoteClient,
49 record: TabsRecord,
50 ) -> Self {
51 Self {
52 client_id,
53 client_name: remote_client.device_name.clone(),
54 device_type: remote_client.device_type,
55 last_modified: last_modified.as_millis(),
56 remote_tabs: record.tabs.into_iter().map(Into::into).collect(),
57 tab_groups: record
58 .tab_groups
59 .into_iter()
60 .map(|(n, v)| (n, v.into()))
61 .collect(),
62 windows: record
63 .windows
64 .into_iter()
65 .map(|(n, v)| (n, v.into()))
66 .collect(),
67 }
68 }
69}
70
/// Sync engine for the "tabs" collection, backed by a shared `TabsStore`.
pub struct TabsEngine {
    /// The store whose `storage` is locked by the engine's methods to read
    /// and write metadata and tabs.
    pub(super) store: Arc<TabsStore>,
    /// Id of the local client. Starts out as the default (empty) string and
    /// is overwritten by `prepare_for_sync` with the id reported in the
    /// client data.
    pub local_id: RwLock<String>,
}
78
79impl TabsEngine {
80 pub fn new(store: Arc<TabsStore>) -> Self {
81 Self {
82 store,
83 local_id: Default::default(),
84 }
85 }
86
87 pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> {
88 let mut storage = self.store.storage.lock().unwrap();
89 debug!("Updating last sync to {}", last_sync);
90 let last_sync_millis = last_sync.as_millis();
91 Ok(storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)?)
92 }
93
94 pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> {
95 let mut storage = self.store.storage.lock().unwrap();
96 let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
97 Ok(millis.map(ServerTimestamp))
98 }
99}
100
101impl SyncEngine for TabsEngine {
102 fn collection_name(&self) -> CollectionName {
103 "tabs".into()
104 }
105
106 fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
107 let mut storage = self.store.storage.lock().unwrap();
108 let client_data = get_client_data();
112 storage.put_meta(
113 schema::REMOTE_CLIENTS_KEY,
114 &serde_json::to_string(&client_data.recent_clients)?,
115 )?;
116 *self.local_id.write().unwrap() = client_data.local_client_id;
117 Ok(())
118 }
119
120 fn stage_incoming(
121 &self,
122 inbound: Vec<IncomingBso>,
123 telem: &mut telemetry::Engine,
124 ) -> Result<()> {
125 let local_id = &*self.local_id.read().unwrap();
127 let mut remote_tabs = Vec::with_capacity(inbound.len());
128
129 let mut incoming_telemetry = telemetry::EngineIncoming::new();
130 for incoming in inbound {
131 if incoming.envelope.id == *local_id {
132 continue;
134 }
135 let modified = incoming.envelope.modified;
136 let record = match incoming.into_content::<TabsRecord>().content() {
137 Some(record) => record,
138 None => {
139 warn!("Ignoring incoming invalid tab");
141 incoming_telemetry.failed(1);
142 continue;
143 }
144 };
145 incoming_telemetry.applied(1);
146 remote_tabs.push((record, modified));
147 }
148 telem.incoming(incoming_telemetry);
149 let mut storage = self.store.storage.lock().unwrap();
150 if !remote_tabs.is_empty() {
153 storage.replace_remote_tabs(&remote_tabs)?;
154 }
155 storage.remove_stale_clients()?;
156 storage.remove_old_pending_closures(&remote_tabs)?;
157 Ok(())
158 }
159
160 fn apply(
161 &self,
162 timestamp: ServerTimestamp,
163 _telem: &mut telemetry::Engine,
164 ) -> Result<Vec<OutgoingBso>> {
165 let local_id = &*self.local_id.read().unwrap();
167 if timestamp.0 != 0 {
169 self.set_last_sync(timestamp)?;
170 }
171
172 let mut storage = self.store.storage.lock().unwrap();
173 let remote_clients: HashMap<String, RemoteClient> = {
174 match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? {
175 None => HashMap::default(),
176 Some(json) => serde_json::from_str(&json).unwrap(),
177 }
178 };
179
180 let Some(ref tabs_info) = *storage.local_tabs.borrow() else {
181 warn!("syncing without local tabs");
185 return Ok(vec![]);
186 };
187
188 let client_name = remote_clients
189 .get(local_id)
190 .map(|client| client.device_name.clone())
191 .unwrap_or_default();
192
193 let mut record = TabsRecord {
194 id: local_id.clone(),
195 client_name,
196 tabs: tabs_info
197 .tabs
198 .iter()
199 .map(Clone::clone)
200 .map(Into::into)
201 .collect(),
202 windows: tabs_info
203 .windows
204 .iter()
205 .map(|(n, v)| (n.clone(), v.clone().into()))
206 .collect(),
207 tab_groups: tabs_info
208 .tab_groups
209 .iter()
210 .map(|(n, v)| (n.clone(), v.clone().into()))
211 .collect(),
212 };
213 super::prepare_for_upload(&mut record);
214
215 trace!("outgoing {:?}", record);
216 let envelope = OutgoingEnvelope {
217 id: local_id.as_str().into(),
218 ttl: Some(TABS_CLIENT_TTL),
219 ..Default::default()
220 };
221 Ok(vec![OutgoingBso::from_content(envelope, record)?])
223 }
224
225 fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> Result<()> {
226 info!("sync uploaded {} records", ids.len());
227 self.set_last_sync(new_timestamp)?;
228 Ok(())
229 }
230
231 fn get_collection_request(
232 &self,
233 server_timestamp: ServerTimestamp,
234 ) -> Result<Option<CollectionRequest>> {
235 let since = self.get_last_sync()?.unwrap_or_default();
236 Ok(if since == server_timestamp {
237 None
238 } else {
239 Some(
240 CollectionRequest::new("tabs".into())
241 .full()
242 .newer_than(since),
243 )
244 })
245 }
246
247 fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
248 self.set_last_sync(ServerTimestamp(0))?;
249 let mut storage = self.store.storage.lock().unwrap();
250 storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?;
251 storage.wipe_remote_tabs()?;
252 match assoc {
253 EngineSyncAssociation::Disconnected => {
254 storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
255 storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
256 }
257 EngineSyncAssociation::Connected(ids) => {
258 storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?;
259 storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?;
260 }
261 };
262 Ok(())
263 }
264
265 fn wipe(&self) -> Result<()> {
266 self.reset(&EngineSyncAssociation::Disconnected)?;
267 self.store.storage.lock().unwrap().wipe_local_tabs();
270 Ok(())
271 }
272
273 fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> {
274 let mut storage = self.store.storage.lock().unwrap();
275 let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?;
276 let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?;
277 Ok(if let (Some(global), Some(coll)) = (global, coll) {
278 EngineSyncAssociation::Connected(CollSyncIds {
279 global: Guid::from_string(global),
280 coll: Guid::from_string(coll),
281 })
282 } else {
283 EngineSyncAssociation::Disconnected
284 })
285 }
286}
287
288impl crate::TabsStore {
289 pub fn register_with_sync_manager(self: Arc<Self>) {
295 let mut state = STORE_FOR_MANAGER.lock().unwrap();
296 *state = Arc::downgrade(&self);
297 }
298}
299
#[cfg(test)]
pub mod test {
    use super::*;
    use crate::DeviceType;
    use serde_json::json;
    use sync15::bso::IncomingBso;

    /// Incoming records are staged as remote tabs: a later record for the same
    /// client wins, and records for clients that are not in the prepared
    /// client list, or that are malformed, do not show up in the result.
    #[test]
    fn test_incoming_tabs() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path("test-incoming")));

        let client_data = ClientData {
            local_client_id: "my-device".to_string(),
            recent_clients: HashMap::from([
                (
                    "my-device".to_string(),
                    RemoteClient {
                        fxa_device_id: None,
                        device_name: "my device".to_string(),
                        device_type: sync15::DeviceType::Unknown,
                    },
                ),
                (
                    "device-no-tabs".to_string(),
                    RemoteClient {
                        fxa_device_id: None,
                        device_name: "device with no tabs".to_string(),
                        device_type: DeviceType::Unknown,
                    },
                ),
                (
                    "device-with-a-tab".to_string(),
                    RemoteClient {
                        fxa_device_id: None,
                        device_name: "device with an updated tab".to_string(),
                        device_type: DeviceType::Unknown,
                    },
                ),
            ]),
        };
        engine
            .prepare_for_sync(&|| client_data.clone())
            .expect("should work");

        let records = vec![
            json!({
                "id": "device-no-tabs",
                "clientName": "device with no tabs",
                "tabs": [],
            }),
            json!({
                "id": "device-with-a-tab",
                "clientName": "device with a tab",
                "tabs": [{
                    "title": "the title",
                    "urlHistory": [
                        "https://mozilla.org/"
                    ],
                    "icon": "https://mozilla.org/icon",
                    "lastUsed": 1643764207
                }]
            }),
            // Same client id as the previous record, with a newer tab.
            json!({
                "id": "device-with-a-tab",
                "clientName": "device with an updated tab",
                "tabs": [{
                    "title": "the new title",
                    "urlHistory": [
                        "https://mozilla.org/"
                    ],
                    "icon": "https://mozilla.org/icon",
                    "lastUsed": 1643764208
                }]
            }),
            // A record whose tab has none of the expected fields.
            json!({
                "id": "device-with-invalid-tab",
                "clientName": "device with a tab",
                "tabs": [{
                    "foo": "bar",
                }]
            }),
            // A record that is not shaped like a tabs record at all.
            json!({
                "id": "invalid-tab",
                "foo": "bar"
            }),
        ];

        let mut telem = telemetry::Engine::new("tabs");
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        let outgoing = engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        // No local tabs were set, so there is nothing to upload.
        assert!(outgoing.is_empty());

        let mut storage = engine.store.storage.lock().unwrap();
        let mut crts = storage.get_remote_tabs().expect("should work");
        // Sort by name so the assertions below do not depend on storage order.
        crts.sort_by(|a, b| a.client_name.partial_cmp(&b.client_name).unwrap());
        assert_eq!(crts.len(), 2, "we currently include devices with no tabs");
        let crt = &crts[0];
        assert_eq!(crt.client_name, "device with an updated tab");
        assert_eq!(crt.device_type, DeviceType::Unknown);
        assert_eq!(crt.remote_tabs.len(), 1);
        assert_eq!(crt.remote_tabs[0].title, "the new title");

        let crt = &crts[1];
        assert_eq!(crt.client_name, "device with no tabs");
        assert_eq!(crt.device_type, DeviceType::Unknown);
        assert_eq!(crt.remote_tabs.len(), 0);
    }

    /// Staging an empty batch must leave previously stored remote tabs alone.
    #[test]
    fn test_no_incoming_doesnt_write() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
            "test_no_incoming_doesnt_write",
        )));

        let client_data = ClientData {
            local_client_id: "my-device".to_string(),
            recent_clients: HashMap::from([(
                "device-with-a-tab".to_string(),
                RemoteClient {
                    fxa_device_id: None,
                    device_name: "device-with-a-tab".to_string(),
                    device_type: DeviceType::Unknown,
                },
            )]),
        };
        engine
            .prepare_for_sync(&|| client_data.clone())
            .expect("should work");

        let records = vec![json!({
            "id": "device-with-a-tab",
            "clientName": "device with a tab",
            "tabs": [{
                "title": "the title",
                "urlHistory": [
                    "https://mozilla.org/"
                ],
                "icon": "https://mozilla.org/icon",
                "lastUsed": 1643764207
            }]
        })];

        let mut telem = telemetry::Engine::new("tabs");
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        {
            // Scoped so the storage lock is released before the next call
            // into the engine, which locks storage itself.
            let mut storage = engine.store.storage.lock().unwrap();
            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
        }

        // A second round with no incoming records.
        engine
            .stage_incoming(vec![], &mut telemetry::Engine::new("tabs"))
            .expect("Should succeed applying zero records");

        {
            let mut storage = engine.store.storage.lock().unwrap();
            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
        }
    }

    /// Registration stores only a weak reference, which dies with the store.
    #[test]
    fn test_sync_manager_registration() {
        let store = Arc::new(TabsStore::new_with_mem_path("test-registration"));
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 0);
        Arc::clone(&store).register_with_sync_manager();
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);
        let registered = STORE_FOR_MANAGER
            .lock()
            .unwrap()
            .upgrade()
            .expect("should upgrade");
        assert!(Arc::ptr_eq(&store, &registered));
        drop(registered);
        // Dropping the upgraded handle leaves only the original strong ref.
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);
        drop(store);
        // With the last strong ref gone the registration can't be upgraded.
        assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none());
    }

    /// `apply` with a zero timestamp must not clobber the stored last-sync.
    #[test]
    fn test_apply_timestamp() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
            "test-apply-timestamp",
        )));

        let records = vec![json!({
            "id": "device-no-tabs",
            "clientName": "device with no tabs",
            "tabs": [],
        })];

        let mut telem = telemetry::Engine::new("tabs");
        engine
            .set_last_sync(ServerTimestamp::from_millis(123))
            .unwrap();
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        assert_eq!(
            engine
                .get_last_sync()
                .expect("should work")
                .expect("should have a value"),
            ServerTimestamp::from_millis(123),
            "didn't set a zero timestamp"
        )
    }
}