1use crate::schema;
6use crate::storage::{ClientRemoteTabs, RemoteTab, TABS_CLIENT_TTL};
7use crate::store::TabsStore;
8use crate::sync::record::{TabsRecord, TabsRecordTab};
9use anyhow::Result;
10use error_support::{debug, info, trace, warn};
11
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex, RwLock, Weak};
14use sync15::bso::{IncomingBso, OutgoingBso, OutgoingEnvelope};
15use sync15::engine::{
16 CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine, SyncEngineId,
17};
18use sync15::{telemetry, ClientData, CollectionName, DeviceType, RemoteClient, ServerTimestamp};
19use sync_guid::Guid;
20
lazy_static::lazy_static! {
    // Process-wide weak handle to a `TabsStore`. It is written by
    // `TabsStore::register_with_sync_manager` and upgraded on demand by
    // `get_registered_sync_engine`. Only a `Weak` is stored, so this static
    // does not keep the store alive by itself.
    static ref STORE_FOR_MANAGER: Mutex<Weak<TabsStore>> = Mutex::new(Weak::new());
}
26
27pub fn get_registered_sync_engine(
30 engine_id: &SyncEngineId,
31) -> Option<Box<dyn sync15::engine::SyncEngine>> {
32 let weak = STORE_FOR_MANAGER.lock().unwrap();
33 match weak.upgrade() {
34 None => None,
35 Some(store) => match engine_id {
36 SyncEngineId::Tabs => Some(Box::new(TabsEngine::new(Arc::clone(&store)))),
37 _ => unreachable!("can't provide unknown engine: {}", engine_id),
40 },
41 }
42}
43
44impl ClientRemoteTabs {
45 pub(crate) fn from_record_with_remote_client(
46 client_id: String,
47 last_modified: ServerTimestamp,
48 remote_client: &RemoteClient,
49 record: TabsRecord,
50 ) -> Self {
51 Self {
52 client_id,
53 client_name: remote_client.device_name.clone(),
54 device_type: remote_client.device_type,
55 last_modified: last_modified.as_millis(),
56 remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
57 }
58 }
59
60 pub(crate) fn from_record(
65 client_id: String,
66 last_modified: ServerTimestamp,
67 record: TabsRecord,
68 ) -> Self {
69 Self {
70 client_id,
71 client_name: record.client_name,
72 device_type: DeviceType::Unknown,
73 last_modified: last_modified.as_millis(),
74 remote_tabs: record.tabs.iter().map(RemoteTab::from_record_tab).collect(),
75 }
76 }
77 fn to_record(&self) -> TabsRecord {
78 TabsRecord {
79 id: self.client_id.clone(),
80 client_name: self.client_name.clone(),
81 tabs: self
82 .remote_tabs
83 .iter()
84 .map(RemoteTab::to_record_tab)
85 .collect(),
86 }
87 }
88}
89
90impl RemoteTab {
91 pub(crate) fn from_record_tab(tab: &TabsRecordTab) -> Self {
92 Self {
93 title: tab.title.clone(),
94 url_history: tab.url_history.clone(),
95 icon: tab.icon.clone(),
96 last_used: tab.last_used.checked_mul(1000).unwrap_or_default(),
97 inactive: tab.inactive,
98 }
99 }
100 pub(super) fn to_record_tab(&self) -> TabsRecordTab {
101 TabsRecordTab {
102 title: self.title.clone(),
103 url_history: self.url_history.clone(),
104 icon: self.icon.clone(),
105 last_used: self.last_used.checked_div(1000).unwrap_or_default(),
106 inactive: self.inactive,
107 }
108 }
109}
110
111pub struct TabsEngine {
114 pub(super) store: Arc<TabsStore>,
115 pub local_id: RwLock<String>,
117}
118
119impl TabsEngine {
120 pub fn new(store: Arc<TabsStore>) -> Self {
121 Self {
122 store,
123 local_id: Default::default(),
124 }
125 }
126
127 pub fn set_last_sync(&self, last_sync: ServerTimestamp) -> Result<()> {
128 let mut storage = self.store.storage.lock().unwrap();
129 debug!("Updating last sync to {}", last_sync);
130 let last_sync_millis = last_sync.as_millis();
131 Ok(storage.put_meta(schema::LAST_SYNC_META_KEY, &last_sync_millis)?)
132 }
133
134 pub fn get_last_sync(&self) -> Result<Option<ServerTimestamp>> {
135 let mut storage = self.store.storage.lock().unwrap();
136 let millis = storage.get_meta::<i64>(schema::LAST_SYNC_META_KEY)?;
137 Ok(millis.map(ServerTimestamp))
138 }
139}
140
141impl SyncEngine for TabsEngine {
142 fn collection_name(&self) -> CollectionName {
143 "tabs".into()
144 }
145
146 fn prepare_for_sync(&self, get_client_data: &dyn Fn() -> ClientData) -> Result<()> {
147 let mut storage = self.store.storage.lock().unwrap();
148 let client_data = get_client_data();
152 storage.put_meta(
153 schema::REMOTE_CLIENTS_KEY,
154 &serde_json::to_string(&client_data.recent_clients)?,
155 )?;
156 *self.local_id.write().unwrap() = client_data.local_client_id;
157 Ok(())
158 }
159
160 fn stage_incoming(
161 &self,
162 inbound: Vec<IncomingBso>,
163 telem: &mut telemetry::Engine,
164 ) -> Result<()> {
165 let local_id = &*self.local_id.read().unwrap();
167 let mut remote_tabs = Vec::with_capacity(inbound.len());
168
169 let mut incoming_telemetry = telemetry::EngineIncoming::new();
170 for incoming in inbound {
171 if incoming.envelope.id == *local_id {
172 continue;
174 }
175 let modified = incoming.envelope.modified;
176 let record = match incoming.into_content::<TabsRecord>().content() {
177 Some(record) => record,
178 None => {
179 warn!("Ignoring incoming invalid tab");
181 incoming_telemetry.failed(1);
182 continue;
183 }
184 };
185 incoming_telemetry.applied(1);
186 remote_tabs.push((record, modified));
187 }
188 telem.incoming(incoming_telemetry);
189 let mut storage = self.store.storage.lock().unwrap();
190 if !remote_tabs.is_empty() {
193 storage.replace_remote_tabs(&remote_tabs)?;
194 }
195 storage.remove_stale_clients()?;
196 storage.remove_old_pending_closures(&remote_tabs)?;
197 Ok(())
198 }
199
200 fn apply(
201 &self,
202 timestamp: ServerTimestamp,
203 _telem: &mut telemetry::Engine,
204 ) -> Result<Vec<OutgoingBso>> {
205 let (local_tabs, remote_clients) = {
207 let mut storage = self.store.storage.lock().unwrap();
208 let local_tabs = storage.prepare_local_tabs_for_upload();
209 let remote_clients: HashMap<String, RemoteClient> = {
210 match storage.get_meta::<String>(schema::REMOTE_CLIENTS_KEY)? {
211 None => HashMap::default(),
212 Some(json) => serde_json::from_str(&json).unwrap(),
213 }
214 };
215 (local_tabs, remote_clients)
216 };
217
218 let local_id = &*self.local_id.read().unwrap();
219 if timestamp.0 != 0 {
221 self.set_last_sync(timestamp)?;
222 }
223 let outgoing = if let Some(local_tabs) = local_tabs {
225 let (client_name, device_type) = remote_clients
226 .get(local_id)
227 .map(|client| (client.device_name.clone(), client.device_type))
228 .unwrap_or_else(|| (String::new(), DeviceType::Unknown));
229 let local_record = ClientRemoteTabs {
230 client_id: local_id.clone(),
231 client_name,
232 device_type,
233 last_modified: 0, remote_tabs: local_tabs.to_vec(),
235 };
236 trace!("outgoing {:?}", local_record);
237 let envelope = OutgoingEnvelope {
238 id: local_id.as_str().into(),
239 ttl: Some(TABS_CLIENT_TTL),
240 ..Default::default()
241 };
242 vec![OutgoingBso::from_content(
243 envelope,
244 local_record.to_record(),
245 )?]
246 } else {
247 vec![]
248 };
249 Ok(outgoing)
250 }
251
252 fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> Result<()> {
253 info!("sync uploaded {} records", ids.len());
254 self.set_last_sync(new_timestamp)?;
255 Ok(())
256 }
257
258 fn get_collection_request(
259 &self,
260 server_timestamp: ServerTimestamp,
261 ) -> Result<Option<CollectionRequest>> {
262 let since = self.get_last_sync()?.unwrap_or_default();
263 Ok(if since == server_timestamp {
264 None
265 } else {
266 Some(
267 CollectionRequest::new("tabs".into())
268 .full()
269 .newer_than(since),
270 )
271 })
272 }
273
274 fn reset(&self, assoc: &EngineSyncAssociation) -> Result<()> {
275 self.set_last_sync(ServerTimestamp(0))?;
276 let mut storage = self.store.storage.lock().unwrap();
277 storage.delete_meta(schema::REMOTE_CLIENTS_KEY)?;
278 storage.wipe_remote_tabs()?;
279 match assoc {
280 EngineSyncAssociation::Disconnected => {
281 storage.delete_meta(schema::GLOBAL_SYNCID_META_KEY)?;
282 storage.delete_meta(schema::COLLECTION_SYNCID_META_KEY)?;
283 }
284 EngineSyncAssociation::Connected(ids) => {
285 storage.put_meta(schema::GLOBAL_SYNCID_META_KEY, &ids.global.to_string())?;
286 storage.put_meta(schema::COLLECTION_SYNCID_META_KEY, &ids.coll.to_string())?;
287 }
288 };
289 Ok(())
290 }
291
292 fn wipe(&self) -> Result<()> {
293 self.reset(&EngineSyncAssociation::Disconnected)?;
294 self.store.storage.lock().unwrap().wipe_local_tabs();
297 Ok(())
298 }
299
300 fn get_sync_assoc(&self) -> Result<EngineSyncAssociation> {
301 let mut storage = self.store.storage.lock().unwrap();
302 let global = storage.get_meta::<String>(schema::GLOBAL_SYNCID_META_KEY)?;
303 let coll = storage.get_meta::<String>(schema::COLLECTION_SYNCID_META_KEY)?;
304 Ok(if let (Some(global), Some(coll)) = (global, coll) {
305 EngineSyncAssociation::Connected(CollSyncIds {
306 global: Guid::from_string(global),
307 coll: Guid::from_string(coll),
308 })
309 } else {
310 EngineSyncAssociation::Disconnected
311 })
312 }
313}
314
315impl crate::TabsStore {
316 pub fn register_with_sync_manager(self: Arc<Self>) {
322 let mut state = STORE_FOR_MANAGER.lock().unwrap();
323 *state = Arc::downgrade(&self);
324 }
325}
326
#[cfg(test)]
pub mod test {
    use super::*;
    use serde_json::json;
    use sync15::bso::IncomingBso;

    /// Valid records are stored (a later record for the same client id
    /// replaces the earlier one) and invalid records are ignored.
    #[test]
    fn test_incoming_tabs() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path("test-incoming")));

        let records = vec![
            json!({
                "id": "device-no-tabs",
                "clientName": "device with no tabs",
                "tabs": [],
            }),
            json!({
                "id": "device-with-a-tab",
                "clientName": "device with a tab",
                "tabs": [{
                    "title": "the title",
                    "urlHistory": [
                        "https://mozilla.org/"
                    ],
                    "icon": "https://mozilla.org/icon",
                    "lastUsed": 1643764207
                }]
            }),
            // Same id as the previous record, with updated content.
            json!({
                "id": "device-with-a-tab",
                "clientName": "device with an updated tab",
                "tabs": [{
                    "title": "the new title",
                    "urlHistory": [
                        "https://mozilla.org/"
                    ],
                    "icon": "https://mozilla.org/icon",
                    "lastUsed": 1643764208
                }]
            }),
            // A record whose only tab is malformed.
            json!({
                "id": "device-with-invalid-tab",
                "clientName": "device with a tab",
                "tabs": [{
                    "foo": "bar",
                }]
            }),
            // A record that is not a tabs record at all.
            json!({
                "id": "invalid-tab",
                "foo": "bar"
            }),
        ];

        let mut telem = telemetry::Engine::new("tabs");
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        let outgoing = engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        assert!(outgoing.is_empty());

        let mut storage = engine.store.storage.lock().unwrap();
        let mut crts = storage.get_remote_tabs().expect("should work");
        // Sort by name so the assertions below do not depend on storage order.
        crts.sort_by(|a, b| a.client_name.cmp(&b.client_name));
        assert_eq!(crts.len(), 2, "we currently include devices with no tabs");
        let crt = &crts[0];
        assert_eq!(crt.client_name, "device with an updated tab");
        assert_eq!(crt.device_type, DeviceType::Unknown);
        assert_eq!(crt.remote_tabs.len(), 1);
        assert_eq!(crt.remote_tabs[0].title, "the new title");

        let crt = &crts[1];
        assert_eq!(crt.client_name, "device with no tabs");
        assert_eq!(crt.device_type, DeviceType::Unknown);
        assert_eq!(crt.remote_tabs.len(), 0);
    }

    /// Staging zero incoming records leaves previously stored remote tabs
    /// in place.
    #[test]
    fn test_no_incoming_doesnt_write() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
            "test_no_incoming_doesnt_write",
        )));

        let records = vec![json!({
            "id": "device-with-a-tab",
            "clientName": "device with a tab",
            "tabs": [{
                "title": "the title",
                "urlHistory": [
                    "https://mozilla.org/"
                ],
                "icon": "https://mozilla.org/icon",
                "lastUsed": 1643764207
            }]
        })];

        let mut telem = telemetry::Engine::new("tabs");
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        {
            // Scope the guard so the lock is released before the engine
            // needs it again.
            let mut storage = engine.store.storage.lock().unwrap();
            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
        }

        engine
            .stage_incoming(vec![], &mut telemetry::Engine::new("tabs"))
            .expect("Should succeed applying zero records");

        {
            let mut storage = engine.store.storage.lock().unwrap();
            assert_eq!(storage.get_remote_tabs().expect("should work").len(), 1);
        }
    }

    /// Registration stores only a weak reference, which stops upgrading
    /// once the last strong reference is dropped.
    #[test]
    fn test_sync_manager_registration() {
        let store = Arc::new(TabsStore::new_with_mem_path("test-registration"));
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 0);
        Arc::clone(&store).register_with_sync_manager();
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);
        let registered = STORE_FOR_MANAGER
            .lock()
            .unwrap()
            .upgrade()
            .expect("should upgrade");
        assert!(Arc::ptr_eq(&store, &registered));
        drop(registered);
        assert_eq!(Arc::strong_count(&store), 1);
        assert_eq!(Arc::weak_count(&store), 1);
        drop(store);
        // With no strong references left, the registration is dead.
        assert!(STORE_FOR_MANAGER.lock().unwrap().upgrade().is_none());
    }

    /// `apply` with a zero timestamp leaves the stored last-sync value
    /// untouched.
    #[test]
    fn test_apply_timestamp() {
        error_support::init_for_tests();

        let engine = TabsEngine::new(Arc::new(TabsStore::new_with_mem_path(
            "test-apply-timestamp",
        )));

        let records = vec![json!({
            "id": "device-no-tabs",
            "clientName": "device with no tabs",
            "tabs": [],
        })];

        let mut telem = telemetry::Engine::new("tabs");
        engine
            .set_last_sync(ServerTimestamp::from_millis(123))
            .unwrap();
        let incoming = records
            .into_iter()
            .map(IncomingBso::from_test_content)
            .collect();
        engine
            .stage_incoming(incoming, &mut telem)
            .expect("Should apply incoming and stage outgoing records");
        engine
            .apply(ServerTimestamp(0), &mut telem)
            .expect("should apply");

        assert_eq!(
            engine
                .get_last_sync()
                .expect("should work")
                .expect("should have a value"),
            ServerTimestamp::from_millis(123),
            "didn't set a zero timestamp"
        )
    }
}