1use std::collections::{HashMap, HashSet};
6
7use crate::bso::{IncomingBso, IncomingKind, OutgoingBso, OutgoingEnvelope};
8use crate::client::{
9 CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration,
10 Sync15StorageClient,
11};
12use crate::client_types::{ClientData, RemoteClient};
13use crate::engine::CollectionRequest;
14use crate::error::{debug, info, warn, Result};
15use crate::{Guid, KeyBundle};
16use interrupt_support::Interruptee;
17
18use super::{
19 record::{ClientRecord, CommandRecord},
20 ser::shrink_to_fit,
21 Command, CommandProcessor, CommandStatus, CLIENTS_TTL,
22};
23
/// Name of the server collection this engine fetches from and uploads to.
const COLLECTION_NAME: &str = "clients";
25
/// Worker that reconciles a batch of incoming client records against the
/// local device and produces the records that need to be uploaded.
///
/// It borrows everything it needs for the duration of one sync and collects
/// every client it notes in `recent_clients`.
struct Driver<'a> {
    /// Supplies the local device settings, applies incoming commands and
    /// provides the commands queued for other clients.
    command_processor: &'a dyn CommandProcessor,
    /// Checked between steps so the sync can bail out with an error.
    interruptee: &'a dyn Interruptee,
    /// Server configuration; its size limits bound the uploaded records.
    config: &'a InfoConfiguration,
    /// Clients noted during the sync, keyed by client record ID.
    recent_clients: HashMap<String, RemoteClient>,
}
34
35impl<'a> Driver<'a> {
36 fn new(
37 command_processor: &'a dyn CommandProcessor,
38 interruptee: &'a dyn Interruptee,
39 config: &'a InfoConfiguration,
40 ) -> Driver<'a> {
41 Driver {
42 command_processor,
43 interruptee,
44 config,
45 recent_clients: HashMap::new(),
46 }
47 }
48
49 fn note_recent_client(&mut self, client: &ClientRecord) {
50 self.recent_clients.insert(client.id.clone(), client.into());
51 }
52
53 fn sync(
54 &mut self,
55 inbound: Vec<IncomingBso>,
56 should_refresh_client: bool,
57 ) -> Result<Vec<OutgoingBso>> {
58 self.interruptee.err_if_interrupted()?;
59 let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
60
61 let mut has_own_client_record = false;
62 let mut changes = Vec::new();
63
64 for bso in inbound {
65 self.interruptee.err_if_interrupted()?;
66
67 let content = bso.into_content();
68
69 let client: ClientRecord = match content.kind {
70 IncomingKind::Malformed => {
71 debug!("Error unpacking record");
72 continue;
73 }
74 IncomingKind::Tombstone => {
75 debug!("Record has been deleted; skipping...");
76 continue;
77 }
78 IncomingKind::Content(client) => client,
79 };
80
81 if client.id == self.command_processor.settings().fxa_device_id {
82 debug!("Found my record on the server");
83 has_own_client_record = true;
89 let mut current_client_record = self.current_client_record();
90 for c in &client.commands {
91 let status = match c.as_command() {
92 Some(command) => self.command_processor.apply_incoming_command(command)?,
93 None => CommandStatus::Unsupported,
94 };
95 match status {
96 CommandStatus::Applied => {}
97 CommandStatus::Ignored => {
98 debug!("Ignored command {:?}", c);
99 }
100 CommandStatus::Unsupported => {
101 warn!("Don't know how to apply command {:?}", c);
102 current_client_record.commands.push(c.clone());
103 }
104 }
105 }
106
107 shrink_to_fit(
112 &mut current_client_record.commands,
113 self.memcache_max_record_payload_size(),
114 )?;
115
116 self.note_recent_client(¤t_client_record);
120
121 if should_refresh_client || client != current_client_record {
124 debug!("Will update our client record on the server");
125 let envelope = OutgoingEnvelope {
126 id: content.envelope.id,
127 ttl: Some(CLIENTS_TTL),
128 ..Default::default()
129 };
130 changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
131 }
132 } else {
133 self.note_recent_client(&client);
135
136 if outgoing_commands.is_empty() {
139 continue;
140 }
141
142 let current_commands: HashSet<Command> = client
145 .commands
146 .iter()
147 .filter_map(|c| c.as_command())
148 .collect();
149 let mut new_outgoing_commands = outgoing_commands
150 .difference(¤t_commands)
151 .cloned()
152 .collect::<Vec<_>>();
153 new_outgoing_commands.sort();
155 let mut new_client = client.clone();
156 new_client
157 .commands
158 .extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
159 if new_client.commands.len() == client.commands.len() {
160 continue;
161 }
162
163 shrink_to_fit(
167 &mut new_client.commands,
168 self.memcache_max_record_payload_size(),
169 )?;
170
171 let envelope = OutgoingEnvelope {
172 id: content.envelope.id,
173 ttl: Some(CLIENTS_TTL),
174 ..Default::default()
175 };
176 changes.push(OutgoingBso::from_content(envelope, new_client)?);
177 }
178 }
179
180 if !has_own_client_record {
182 let current_client_record = self.current_client_record();
183 self.note_recent_client(¤t_client_record);
184 let envelope = OutgoingEnvelope {
185 id: Guid::new(¤t_client_record.id),
186 ttl: Some(CLIENTS_TTL),
187 ..Default::default()
188 };
189 changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
190 }
191
192 Ok(changes)
193 }
194
195 fn current_client_record(&self) -> ClientRecord {
197 let settings = self.command_processor.settings();
198 ClientRecord {
199 id: settings.fxa_device_id.clone(),
200 name: settings.device_name.clone(),
201 typ: settings.device_type,
202 commands: Vec::new(),
203 fxa_device_id: Some(settings.fxa_device_id.clone()),
204 version: None,
205 protocols: vec!["1.5".into()],
206 form_factor: None,
207 os: None,
208 app_package: None,
209 application: None,
210 device: None,
211 }
212 }
213
214 fn max_record_payload_size(&self) -> usize {
215 let payload_max = self.config.max_record_payload_bytes;
216 if payload_max <= self.config.max_post_bytes {
217 self.config.max_post_bytes.saturating_sub(4096)
218 } else {
219 payload_max
220 }
221 }
222
223 fn memcache_max_record_payload_size(&self) -> usize {
233 self.max_record_payload_size().min(512 * 1024)
234 }
235}
236
/// Public entry point for syncing the clients collection.
pub struct Engine<'a> {
    /// Supplies the local device settings and handles commands.
    pub command_processor: &'a dyn CommandProcessor,
    /// Checked during a sync so it can bail out with an error.
    pub interruptee: &'a dyn Interruptee,
    /// Clients noted by the most recent successful driver run, keyed by
    /// client record ID. Empty until `sync` has run.
    pub recent_clients: HashMap<String, RemoteClient>,
}
242
243impl Engine<'_> {
244 pub fn new<'b>(
247 command_processor: &'b dyn CommandProcessor,
248 interruptee: &'b dyn Interruptee,
249 ) -> Engine<'b> {
250 Engine {
251 command_processor,
252 interruptee,
253 recent_clients: HashMap::new(),
254 }
255 }
256
257 pub fn sync(
274 &mut self,
275 storage_client: &Sync15StorageClient,
276 global_state: &GlobalState,
277 root_sync_key: &KeyBundle,
278 should_refresh_client: bool,
279 ) -> Result<()> {
280 info!("Syncing collection clients");
281
282 let coll_keys = CollectionKeys::from_encrypted_payload(
283 global_state.keys.clone(),
284 global_state.keys_timestamp,
285 root_sync_key,
286 )?;
287 let coll_state = CollState {
288 config: global_state.config.clone(),
289 last_modified: global_state
290 .collections
291 .get(COLLECTION_NAME)
292 .cloned()
293 .unwrap_or_default(),
294 key: coll_keys.key_for_collection(COLLECTION_NAME).clone(),
295 };
296
297 let inbound = self.fetch_incoming(storage_client, &coll_state)?;
298
299 let mut driver = Driver::new(
300 self.command_processor,
301 self.interruptee,
302 &global_state.config,
303 );
304
305 let outgoing = driver.sync(inbound, should_refresh_client)?;
306 self.recent_clients = driver.recent_clients;
307
308 self.interruptee.err_if_interrupted()?;
309 let upload_info = CollectionUpdate::new_from_changeset(
310 storage_client,
311 &coll_state,
312 COLLECTION_NAME.into(),
313 outgoing,
314 true,
315 )?
316 .upload()?;
317
318 info!(
319 "Upload success ({} records success, {} records failed)",
320 upload_info.successful_ids.len(),
321 upload_info.failed_ids.len()
322 );
323
324 info!("Finished syncing clients");
325 Ok(())
326 }
327
328 fn fetch_incoming(
329 &self,
330 storage_client: &Sync15StorageClient,
331 coll_state: &CollState,
332 ) -> Result<Vec<IncomingBso>> {
333 let coll_request = CollectionRequest::new(COLLECTION_NAME.into()).full();
337
338 self.interruptee.err_if_interrupted()?;
339 let inbound = crate::client::fetch_incoming(storage_client, coll_state, coll_request)?;
340
341 Ok(inbound)
342 }
343
344 pub fn local_client_id(&self) -> String {
345 self.command_processor.settings().fxa_device_id.clone()
348 }
349
350 pub fn get_client_data(&self) -> ClientData {
351 ClientData {
352 local_client_id: self.local_client_id(),
353 recent_clients: self.recent_clients.clone(),
354 }
355 }
356}
357
358#[cfg(test)]
359mod tests {
360 use super::super::{CommandStatus, DeviceType, Settings};
361 use super::*;
362 use crate::bso::IncomingBso;
363 use anyhow::Result;
364 use interrupt_support::NeverInterrupts;
365 use serde_json::{json, Value};
366 use std::iter::zip;
367
    /// Test double for `CommandProcessor` with fixed settings and a fixed set
    /// of outgoing commands.
    struct TestProcessor {
        /// Settings describing the local device under test.
        settings: Settings,
        /// Commands returned verbatim by `fetch_outgoing_commands`.
        outgoing_commands: HashSet<Command>,
    }
372
373 impl CommandProcessor for TestProcessor {
374 fn settings(&self) -> &Settings {
375 &self.settings
376 }
377
378 fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> {
379 Ok(if let Command::Reset(name) = command {
380 if name == "forms" {
381 CommandStatus::Unsupported
382 } else {
383 CommandStatus::Applied
384 }
385 } else {
386 CommandStatus::Ignored
387 })
388 }
389
390 fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> {
391 Ok(self.outgoing_commands.clone())
392 }
393 }
394
395 fn inbound_from_clients(clients: Value) -> Vec<IncomingBso> {
396 if let Value::Array(clients) = clients {
397 clients
398 .into_iter()
399 .map(IncomingBso::from_test_content)
400 .collect()
401 } else {
402 unreachable!("`clients` must be an array of client records")
403 }
404 }
405
406 #[test]
407 fn test_clients_sync() {
408 let processor = TestProcessor {
409 settings: Settings {
410 fxa_device_id: "deviceAAAAAA".into(),
411 device_name: "Laptop".into(),
412 device_type: DeviceType::Desktop,
413 },
414 outgoing_commands: [
415 Command::Wipe("bookmarks".into()),
416 Command::Reset("history".into()),
417 ]
418 .iter()
419 .cloned()
420 .collect(),
421 };
422
423 let config = InfoConfiguration::default();
424
425 let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
426
427 let inbound = inbound_from_clients(json!([{
428 "id": "deviceBBBBBB",
429 "name": "iPhone",
430 "type": "mobile",
431 "commands": [{
432 "command": "resetEngine",
433 "args": ["history"],
434 }],
435 "fxaDeviceId": "iPhooooooone",
436 "protocols": ["1.5"],
437 "device": "iPhone",
438 }, {
439 "id": "deviceCCCCCC",
440 "name": "Fenix",
441 "type": "mobile",
442 "commands": [],
443 "fxaDeviceId": "deviceCCCCCC",
444 }, {
445 "id": "deviceAAAAAA",
446 "name": "Laptop with a different name",
447 "type": "desktop",
448 "commands": [{
449 "command": "wipeEngine",
450 "args": ["logins"]
451 }, {
452 "command": "displayURI",
453 "args": ["http://example.com", "Fennec", "Example page"],
454 "flowID": "flooooooooow",
455 }, {
456 "command": "resetEngine",
457 "args": ["forms"],
458 }, {
459 "command": "logout",
460 "args": [],
461 }],
462 "fxaDeviceId": "deviceAAAAAA",
463 }]));
464
465 let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
468 outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
469
470 let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"];
472 let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
473 actual_ids.sort();
474 assert_eq!(actual_ids, expected_ids);
475
476 let expected_remote_clients = &[
477 RemoteClient {
478 fxa_device_id: Some("deviceAAAAAA".to_string()),
479 device_name: "Laptop".into(),
480 device_type: DeviceType::Desktop,
481 },
482 RemoteClient {
483 fxa_device_id: Some("iPhooooooone".to_string()),
484 device_name: "iPhone".into(),
485 device_type: DeviceType::Mobile,
486 },
487 RemoteClient {
488 fxa_device_id: Some("deviceCCCCCC".to_string()),
489 device_name: "Fenix".into(),
490 device_type: DeviceType::Mobile,
491 },
492 ];
493 let actual_remote_clients = expected_ids
494 .iter()
495 .filter_map(|&id| driver.recent_clients.get(id))
496 .cloned()
497 .collect::<Vec<RemoteClient>>();
498 assert_eq!(actual_remote_clients, expected_remote_clients);
499
500 let expected = json!([{
501 "id": "deviceAAAAAA",
502 "name": "Laptop",
503 "type": "desktop",
504 "commands": [{
505 "command": "displayURI",
506 "args": ["http://example.com", "Fennec", "Example page"],
507 "flowID": "flooooooooow",
508 }, {
509 "command": "resetEngine",
510 "args": ["forms"],
511 }, {
512 "command": "logout",
513 "args": [],
514 }],
515 "fxaDeviceId": "deviceAAAAAA",
516 "protocols": ["1.5"],
517 }, {
518 "id": "deviceBBBBBB",
519 "name": "iPhone",
520 "type": "mobile",
521 "commands": [{
522 "command": "resetEngine",
523 "args": ["history"],
524 }, {
525 "command": "wipeEngine",
526 "args": ["bookmarks"],
527 }],
528 "fxaDeviceId": "iPhooooooone",
529 "protocols": ["1.5"],
530 "device": "iPhone",
531 }, {
532 "id": "deviceCCCCCC",
533 "name": "Fenix",
534 "type": "mobile",
535 "commands": [{
536 "command": "wipeEngine",
537 "args": ["bookmarks"],
538 }, {
539 "command": "resetEngine",
540 "args": ["history"],
541 }],
542 "fxaDeviceId": "deviceCCCCCC",
543 }]);
544 let incoming = outgoing
546 .into_iter()
547 .map(|c| OutgoingBso::to_test_incoming(&c))
548 .collect::<Vec<IncomingBso>>();
549 if let Value::Array(expected) = expected {
550 for (incoming_cleartext, exp_client) in zip(incoming, expected) {
551 let incoming_client: ClientRecord =
552 incoming_cleartext.into_content().content().unwrap();
553 assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap());
554 }
555 } else {
556 unreachable!("`expected_clients` must be an array of client records")
557 }
558 }
559
560 #[test]
561 fn test_clients_sync_bad_incoming_record_skipped() {
562 let processor = TestProcessor {
563 settings: Settings {
564 fxa_device_id: "deviceAAAAAA".into(),
565 device_name: "Laptop".into(),
566 device_type: DeviceType::Desktop,
567 },
568 outgoing_commands: [].iter().cloned().collect(),
569 };
570
571 let config = InfoConfiguration::default();
572
573 let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
574
575 let inbound = inbound_from_clients(json!([{
576 "id": "deviceBBBBBB",
577 "name": "iPhone",
578 "type": "mobile",
579 "commands": [{
580 "command": "resetEngine",
581 "args": ["history"],
582 }],
583 "fxaDeviceId": "iPhooooooone",
584 "protocols": ["1.5"],
585 "device": "iPhone",
586 }, {
587 "id": "garbage",
588 "garbage": "value",
589 }, {
590 "id": "deviceCCCCCC",
591 "deleted": true,
592 "name": "Fenix",
593 "type": "mobile",
594 "commands": [],
595 "fxaDeviceId": "deviceCCCCCC",
596 }]));
597
598 driver.sync(inbound, false).expect("Should sync clients");
599
600 let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
602 let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
603 actual_ids.sort();
604 assert_eq!(actual_ids, expected_ids);
605
606 let expected_remote_clients = &[
607 RemoteClient {
608 fxa_device_id: Some("deviceAAAAAA".to_string()),
609 device_name: "Laptop".into(),
610 device_type: DeviceType::Desktop,
611 },
612 RemoteClient {
613 fxa_device_id: Some("iPhooooooone".to_string()),
614 device_name: "iPhone".into(),
615 device_type: DeviceType::Mobile,
616 },
617 ];
618 let actual_remote_clients = expected_ids
619 .iter()
620 .filter_map(|&id| driver.recent_clients.get(id))
621 .cloned()
622 .collect::<Vec<RemoteClient>>();
623 assert_eq!(actual_remote_clients, expected_remote_clients);
624 }
625
626 #[test]
627 fn test_clients_sync_explicit_refresh() {
628 let processor = TestProcessor {
629 settings: Settings {
630 fxa_device_id: "deviceAAAAAA".into(),
631 device_name: "Laptop".into(),
632 device_type: DeviceType::Desktop,
633 },
634 outgoing_commands: [].iter().cloned().collect(),
635 };
636
637 let config = InfoConfiguration::default();
638
639 let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
640
641 let test_clients = json!([{
642 "id": "deviceBBBBBB",
643 "name": "iPhone",
644 "type": "mobile",
645 "commands": [{
646 "command": "resetEngine",
647 "args": ["history"],
648 }],
649 "fxaDeviceId": "iPhooooooone",
650 "protocols": ["1.5"],
651 "device": "iPhone",
652 }, {
653 "id": "deviceAAAAAA",
654 "name": "Laptop",
655 "type": "desktop",
656 "commands": [],
657 "fxaDeviceId": "deviceAAAAAA",
658 "protocols": ["1.5"],
659 }]);
660
661 let outgoing = driver
662 .sync(inbound_from_clients(test_clients.clone()), false)
663 .expect("Should sync clients");
664 assert_eq!(outgoing.len(), 0);
666
667 let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
670 let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
671 actual_ids.sort();
672 assert_eq!(actual_ids, expected_ids);
673
674 let outgoing = driver
676 .sync(inbound_from_clients(test_clients), true)
677 .expect("Should sync clients");
678 assert_eq!(outgoing.len(), 1);
679
680 let inbound = inbound_from_clients(json!([{
683 "id": "deviceAAAAAA",
684 "name": "Laptop with New Name",
685 "type": "desktop",
686 "commands": [],
687 "fxaDeviceId": "deviceAAAAAA",
688 "protocols": ["1.5"],
689 }]));
690 let outgoing = driver.sync(inbound, false).expect("Should sync clients");
691 assert_eq!(outgoing.len(), 1);
693 }
694
695 #[test]
696 fn test_fresh_client_record() {
697 let processor = TestProcessor {
698 settings: Settings {
699 fxa_device_id: "deviceAAAAAA".into(),
700 device_name: "Laptop".into(),
701 device_type: DeviceType::Desktop,
702 },
703 outgoing_commands: HashSet::new(),
704 };
705
706 let config = InfoConfiguration::default();
707
708 let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
709
710 let clients = json!([{
711 "id": "deviceBBBBBB",
712 "name": "iPhone",
713 "type": "mobile",
714 "commands": [{
715 "command": "resetEngine",
716 "args": ["history"],
717 }],
718 "fxaDeviceId": "iPhooooooone",
719 "protocols": ["1.5"],
720 "device": "iPhone",
721 }]);
722
723 let inbound = if let Value::Array(clients) = clients {
724 clients
725 .into_iter()
726 .map(IncomingBso::from_test_content)
727 .collect()
728 } else {
729 unreachable!("`clients` must be an array of client records")
730 };
731
732 let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
735 outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
736
737 let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
739 let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
740 actual_ids.sort();
741 assert_eq!(actual_ids, expected_ids);
742
743 let expected_remote_clients = &[
744 RemoteClient {
745 fxa_device_id: Some("deviceAAAAAA".to_string()),
746 device_name: "Laptop".into(),
747 device_type: DeviceType::Desktop,
748 },
749 RemoteClient {
750 fxa_device_id: Some("iPhooooooone".to_string()),
751 device_name: "iPhone".into(),
752 device_type: DeviceType::Mobile,
753 },
754 ];
755 let actual_remote_clients = expected_ids
756 .iter()
757 .filter_map(|&id| driver.recent_clients.get(id))
758 .cloned()
759 .collect::<Vec<RemoteClient>>();
760 assert_eq!(actual_remote_clients, expected_remote_clients);
761
762 let expected = json!([{
763 "id": "deviceAAAAAA",
764 "name": "Laptop",
765 "type": "desktop",
766 "fxaDeviceId": "deviceAAAAAA",
767 "protocols": ["1.5"],
768 "ttl": CLIENTS_TTL,
769 }]);
770 if let Value::Array(expected) = expected {
771 let incoming = outgoing
773 .into_iter()
774 .map(|c| OutgoingBso::to_test_incoming(&c))
775 .collect::<Vec<IncomingBso>>();
776 for (incoming_cleartext, record) in zip(incoming, expected) {
777 let incoming_client: ClientRecord =
778 incoming_cleartext.into_content().content().unwrap();
779 assert_eq!(incoming_client, serde_json::from_value(record).unwrap());
780 }
781 } else {
782 unreachable!("`expected_clients` must be an array of client records")
783 }
784 }
785}