sync15/clients_engine/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::collections::{HashMap, HashSet};
6
7use crate::bso::{IncomingBso, IncomingKind, OutgoingBso, OutgoingEnvelope};
8use crate::client::{
9    CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration,
10    Sync15StorageClient,
11};
12use crate::client_types::{ClientData, RemoteClient};
13use crate::engine::CollectionRequest;
14use crate::error::{debug, info, warn, Result};
15use crate::{Guid, KeyBundle};
16use interrupt_support::Interruptee;
17
18use super::{
19    record::{ClientRecord, CommandRecord},
20    ser::shrink_to_fit,
21    Command, CommandProcessor, CommandStatus, CLIENTS_TTL,
22};
23
24const COLLECTION_NAME: &str = "clients";
25
26/// The driver for the clients engine. Internal; split out from the `Engine`
27/// struct to make testing easier.
28struct Driver<'a> {
29    command_processor: &'a dyn CommandProcessor,
30    interruptee: &'a dyn Interruptee,
31    config: &'a InfoConfiguration,
32    recent_clients: HashMap<String, RemoteClient>,
33}
34
35impl<'a> Driver<'a> {
36    fn new(
37        command_processor: &'a dyn CommandProcessor,
38        interruptee: &'a dyn Interruptee,
39        config: &'a InfoConfiguration,
40    ) -> Driver<'a> {
41        Driver {
42            command_processor,
43            interruptee,
44            config,
45            recent_clients: HashMap::new(),
46        }
47    }
48
49    fn note_recent_client(&mut self, client: &ClientRecord) {
50        self.recent_clients.insert(client.id.clone(), client.into());
51    }
52
53    fn sync(
54        &mut self,
55        inbound: Vec<IncomingBso>,
56        should_refresh_client: bool,
57    ) -> Result<Vec<OutgoingBso>> {
58        self.interruptee.err_if_interrupted()?;
59        let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
60
61        let mut has_own_client_record = false;
62        let mut changes = Vec::new();
63
64        for bso in inbound {
65            self.interruptee.err_if_interrupted()?;
66
67            let content = bso.into_content();
68
69            let client: ClientRecord = match content.kind {
70                IncomingKind::Malformed => {
71                    debug!("Error unpacking record");
72                    continue;
73                }
74                IncomingKind::Tombstone => {
75                    debug!("Record has been deleted; skipping...");
76                    continue;
77                }
78                IncomingKind::Content(client) => client,
79            };
80
81            if client.id == self.command_processor.settings().fxa_device_id {
82                debug!("Found my record on the server");
83                // If we see our own client record, apply any incoming commands,
84                // remove them from the list, and reupload the record. Any
85                // commands that we don't understand also go back in the list.
86                // https://github.com/mozilla/application-services/issues/1800
87                // tracks if that's the right thing to do.
88                has_own_client_record = true;
89                let mut current_client_record = self.current_client_record();
90                for c in &client.commands {
91                    let status = match c.as_command() {
92                        Some(command) => self.command_processor.apply_incoming_command(command)?,
93                        None => CommandStatus::Unsupported,
94                    };
95                    match status {
96                        CommandStatus::Applied => {}
97                        CommandStatus::Ignored => {
98                            debug!("Ignored command {:?}", c);
99                        }
100                        CommandStatus::Unsupported => {
101                            warn!("Don't know how to apply command {:?}", c);
102                            current_client_record.commands.push(c.clone());
103                        }
104                    }
105                }
106
107                // The clients collection has a hard limit on the payload size,
108                // after which the server starts rejecting our records. Large
109                // command lists can cause us to exceed this, so we truncate
110                // the list.
111                shrink_to_fit(
112                    &mut current_client_record.commands,
113                    self.memcache_max_record_payload_size(),
114                )?;
115
116                // Add the new client record to our map of recently synced
117                // clients, so that downstream consumers like synced tabs can
118                // access them.
119                self.note_recent_client(&current_client_record);
120
121                // We periodically upload our own client record, even if it
122                // doesn't change, to keep it fresh.
123                if should_refresh_client || client != current_client_record {
124                    debug!("Will update our client record on the server");
125                    let envelope = OutgoingEnvelope {
126                        id: content.envelope.id,
127                        ttl: Some(CLIENTS_TTL),
128                        ..Default::default()
129                    };
130                    changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
131                }
132            } else {
133                // Add the other client to our map of recently synced clients.
134                self.note_recent_client(&client);
135
136                // Bail if we don't have any outgoing commands to write into
137                // the other client's record.
138                if outgoing_commands.is_empty() {
139                    continue;
140                }
141
142                // Determine if we have new commands, that aren't already in the
143                // client's command list.
144                let current_commands: HashSet<Command> = client
145                    .commands
146                    .iter()
147                    .filter_map(|c| c.as_command())
148                    .collect();
149                let mut new_outgoing_commands = outgoing_commands
150                    .difference(&current_commands)
151                    .cloned()
152                    .collect::<Vec<_>>();
153                // Sort, to ensure deterministic ordering for tests.
154                new_outgoing_commands.sort();
155                let mut new_client = client.clone();
156                new_client
157                    .commands
158                    .extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
159                if new_client.commands.len() == client.commands.len() {
160                    continue;
161                }
162
163                // Hooray, we added new commands! Make sure the record still
164                // fits in the maximum record size, or the server will reject
165                // our upload.
166                shrink_to_fit(
167                    &mut new_client.commands,
168                    self.memcache_max_record_payload_size(),
169                )?;
170
171                let envelope = OutgoingEnvelope {
172                    id: content.envelope.id,
173                    ttl: Some(CLIENTS_TTL),
174                    ..Default::default()
175                };
176                changes.push(OutgoingBso::from_content(envelope, new_client)?);
177            }
178        }
179
180        // Upload a record for our own client, if we didn't replace it already.
181        if !has_own_client_record {
182            let current_client_record = self.current_client_record();
183            self.note_recent_client(&current_client_record);
184            let envelope = OutgoingEnvelope {
185                id: Guid::new(&current_client_record.id),
186                ttl: Some(CLIENTS_TTL),
187                ..Default::default()
188            };
189            changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
190        }
191
192        Ok(changes)
193    }
194
195    /// Builds a fresh client record for this device.
196    fn current_client_record(&self) -> ClientRecord {
197        let settings = self.command_processor.settings();
198        ClientRecord {
199            id: settings.fxa_device_id.clone(),
200            name: settings.device_name.clone(),
201            typ: settings.device_type,
202            commands: Vec::new(),
203            fxa_device_id: Some(settings.fxa_device_id.clone()),
204            version: None,
205            protocols: vec!["1.5".into()],
206            form_factor: None,
207            os: None,
208            app_package: None,
209            application: None,
210            device: None,
211        }
212    }
213
214    fn max_record_payload_size(&self) -> usize {
215        let payload_max = self.config.max_record_payload_bytes;
216        if payload_max <= self.config.max_post_bytes {
217            self.config.max_post_bytes.saturating_sub(4096)
218        } else {
219            payload_max
220        }
221    }
222
223    /// Collections stored in memcached ("tabs", "clients" or "meta") have a
224    /// different max size than ones stored in the normal storage server db.
225    /// In practice, the real limit here is 1M (bug 1300451 comment 40), but
226    /// there's overhead involved that is hard to calculate on the client, so we
227    /// use 512k to be safe (at the recommendation of the server team). Note
228    /// that if the server reports a lower limit (via info/configuration), we
229    /// respect that limit instead. See also bug 1403052.
230    /// XXX - the above comment is stale and refers to the world before the
231    /// move to spanner and the rust sync server.
232    fn memcache_max_record_payload_size(&self) -> usize {
233        self.max_record_payload_size().min(512 * 1024)
234    }
235}
236
237pub struct Engine<'a> {
238    pub command_processor: &'a dyn CommandProcessor,
239    pub interruptee: &'a dyn Interruptee,
240    pub recent_clients: HashMap<String, RemoteClient>,
241}
242
243impl Engine<'_> {
244    /// Creates a new clients engine that delegates to the given command
245    /// processor to apply incoming commands.
246    pub fn new<'b>(
247        command_processor: &'b dyn CommandProcessor,
248        interruptee: &'b dyn Interruptee,
249    ) -> Engine<'b> {
250        Engine {
251            command_processor,
252            interruptee,
253            recent_clients: HashMap::new(),
254        }
255    }
256
257    /// Syncs the clients collection. This works a little differently than
258    /// other collections:
259    ///
260    ///   1. It can't be disabled or declined.
261    ///   2. The sync ID and last sync time aren't meaningful, since we always
262    ///      fetch all client records on every sync. As such, the
263    ///      `LocalCollStateMachine` that we use for other engines doesn't
264    ///      apply to it.
265    ///   3. It doesn't persist state directly, but relies on the sync manager
266    ///      to persist device settings, and process commands.
267    ///   4. Failing to sync the clients collection is fatal, and aborts the
268    ///      sync.
269    ///
270    /// For these reasons, we implement this engine directly in the `sync15`
271    /// crate, and provide a specialized `sync` method instead of implementing
272    /// `sync15::Store`.
273    pub fn sync(
274        &mut self,
275        storage_client: &Sync15StorageClient,
276        global_state: &GlobalState,
277        root_sync_key: &KeyBundle,
278        should_refresh_client: bool,
279    ) -> Result<()> {
280        info!("Syncing collection clients");
281
282        let coll_keys = CollectionKeys::from_encrypted_payload(
283            global_state.keys.clone(),
284            global_state.keys_timestamp,
285            root_sync_key,
286        )?;
287        let coll_state = CollState {
288            config: global_state.config.clone(),
289            last_modified: global_state
290                .collections
291                .get(COLLECTION_NAME)
292                .cloned()
293                .unwrap_or_default(),
294            key: coll_keys.key_for_collection(COLLECTION_NAME).clone(),
295        };
296
297        let inbound = self.fetch_incoming(storage_client, &coll_state)?;
298
299        let mut driver = Driver::new(
300            self.command_processor,
301            self.interruptee,
302            &global_state.config,
303        );
304
305        let outgoing = driver.sync(inbound, should_refresh_client)?;
306        self.recent_clients = driver.recent_clients;
307
308        self.interruptee.err_if_interrupted()?;
309        let upload_info = CollectionUpdate::new_from_changeset(
310            storage_client,
311            &coll_state,
312            COLLECTION_NAME.into(),
313            outgoing,
314            true,
315        )?
316        .upload()?;
317
318        info!(
319            "Upload success ({} records success, {} records failed)",
320            upload_info.successful_ids.len(),
321            upload_info.failed_ids.len()
322        );
323
324        info!("Finished syncing clients");
325        Ok(())
326    }
327
328    fn fetch_incoming(
329        &self,
330        storage_client: &Sync15StorageClient,
331        coll_state: &CollState,
332    ) -> Result<Vec<IncomingBso>> {
333        // Note that, unlike other stores, we always fetch the full collection
334        // on every sync, so `inbound` will return all clients, not just the
335        // ones that changed since the last sync.
336        let coll_request = CollectionRequest::new(COLLECTION_NAME.into()).full();
337
338        self.interruptee.err_if_interrupted()?;
339        let inbound = crate::client::fetch_incoming(storage_client, coll_state, coll_request)?;
340
341        Ok(inbound)
342    }
343
344    pub fn local_client_id(&self) -> String {
345        // Bit dirty but it's the easiest way to reach to our own
346        // device ID without refactoring the whole sync manager crate.
347        self.command_processor.settings().fxa_device_id.clone()
348    }
349
350    pub fn get_client_data(&self) -> ClientData {
351        ClientData {
352            local_client_id: self.local_client_id(),
353            recent_clients: self.recent_clients.clone(),
354        }
355    }
356}
357
358#[cfg(test)]
359mod tests {
360    use super::super::{CommandStatus, DeviceType, Settings};
361    use super::*;
362    use crate::bso::IncomingBso;
363    use anyhow::Result;
364    use interrupt_support::NeverInterrupts;
365    use serde_json::{json, Value};
366    use std::iter::zip;
367
368    struct TestProcessor {
369        settings: Settings,
370        outgoing_commands: HashSet<Command>,
371    }
372
373    impl CommandProcessor for TestProcessor {
374        fn settings(&self) -> &Settings {
375            &self.settings
376        }
377
378        fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> {
379            Ok(if let Command::Reset(name) = command {
380                if name == "forms" {
381                    CommandStatus::Unsupported
382                } else {
383                    CommandStatus::Applied
384                }
385            } else {
386                CommandStatus::Ignored
387            })
388        }
389
390        fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> {
391            Ok(self.outgoing_commands.clone())
392        }
393    }
394
395    fn inbound_from_clients(clients: Value) -> Vec<IncomingBso> {
396        if let Value::Array(clients) = clients {
397            clients
398                .into_iter()
399                .map(IncomingBso::from_test_content)
400                .collect()
401        } else {
402            unreachable!("`clients` must be an array of client records")
403        }
404    }
405
406    #[test]
407    fn test_clients_sync() {
408        let processor = TestProcessor {
409            settings: Settings {
410                fxa_device_id: "deviceAAAAAA".into(),
411                device_name: "Laptop".into(),
412                device_type: DeviceType::Desktop,
413            },
414            outgoing_commands: [
415                Command::Wipe("bookmarks".into()),
416                Command::Reset("history".into()),
417            ]
418            .iter()
419            .cloned()
420            .collect(),
421        };
422
423        let config = InfoConfiguration::default();
424
425        let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
426
427        let inbound = inbound_from_clients(json!([{
428            "id": "deviceBBBBBB",
429            "name": "iPhone",
430            "type": "mobile",
431            "commands": [{
432                "command": "resetEngine",
433                "args": ["history"],
434            }],
435            "fxaDeviceId": "iPhooooooone",
436            "protocols": ["1.5"],
437            "device": "iPhone",
438        }, {
439            "id": "deviceCCCCCC",
440            "name": "Fenix",
441            "type": "mobile",
442            "commands": [],
443            "fxaDeviceId": "deviceCCCCCC",
444        }, {
445            "id": "deviceAAAAAA",
446            "name": "Laptop with a different name",
447            "type": "desktop",
448            "commands": [{
449                "command": "wipeEngine",
450                "args": ["logins"]
451            }, {
452                "command": "displayURI",
453                "args": ["http://example.com", "Fennec", "Example page"],
454                "flowID": "flooooooooow",
455            }, {
456                "command": "resetEngine",
457                "args": ["forms"],
458            }, {
459                "command": "logout",
460                "args": [],
461            }],
462            "fxaDeviceId": "deviceAAAAAA",
463        }]));
464
465        // Passing false for `should_refresh_client` - it should be ignored
466        // because we've changed the commands.
467        let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
468        outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
469
470        // Make sure the list of recently synced remote clients is correct.
471        let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"];
472        let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
473        actual_ids.sort();
474        assert_eq!(actual_ids, expected_ids);
475
476        let expected_remote_clients = &[
477            RemoteClient {
478                fxa_device_id: Some("deviceAAAAAA".to_string()),
479                device_name: "Laptop".into(),
480                device_type: DeviceType::Desktop,
481            },
482            RemoteClient {
483                fxa_device_id: Some("iPhooooooone".to_string()),
484                device_name: "iPhone".into(),
485                device_type: DeviceType::Mobile,
486            },
487            RemoteClient {
488                fxa_device_id: Some("deviceCCCCCC".to_string()),
489                device_name: "Fenix".into(),
490                device_type: DeviceType::Mobile,
491            },
492        ];
493        let actual_remote_clients = expected_ids
494            .iter()
495            .filter_map(|&id| driver.recent_clients.get(id))
496            .cloned()
497            .collect::<Vec<RemoteClient>>();
498        assert_eq!(actual_remote_clients, expected_remote_clients);
499
500        let expected = json!([{
501            "id": "deviceAAAAAA",
502            "name": "Laptop",
503            "type": "desktop",
504            "commands": [{
505                "command": "displayURI",
506                "args": ["http://example.com", "Fennec", "Example page"],
507                "flowID": "flooooooooow",
508            }, {
509                "command": "resetEngine",
510                "args": ["forms"],
511            }, {
512                "command": "logout",
513                "args": [],
514            }],
515            "fxaDeviceId": "deviceAAAAAA",
516            "protocols": ["1.5"],
517        }, {
518            "id": "deviceBBBBBB",
519            "name": "iPhone",
520            "type": "mobile",
521            "commands": [{
522                "command": "resetEngine",
523                "args": ["history"],
524            }, {
525                "command": "wipeEngine",
526                "args": ["bookmarks"],
527            }],
528            "fxaDeviceId": "iPhooooooone",
529            "protocols": ["1.5"],
530            "device": "iPhone",
531        }, {
532            "id": "deviceCCCCCC",
533            "name": "Fenix",
534            "type": "mobile",
535            "commands": [{
536                "command": "wipeEngine",
537                "args": ["bookmarks"],
538            }, {
539                "command": "resetEngine",
540                "args": ["history"],
541            }],
542            "fxaDeviceId": "deviceCCCCCC",
543        }]);
544        // turn outgoing into an incoming payload.
545        let incoming = outgoing
546            .into_iter()
547            .map(|c| OutgoingBso::to_test_incoming(&c))
548            .collect::<Vec<IncomingBso>>();
549        if let Value::Array(expected) = expected {
550            for (incoming_cleartext, exp_client) in zip(incoming, expected) {
551                let incoming_client: ClientRecord =
552                    incoming_cleartext.into_content().content().unwrap();
553                assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap());
554            }
555        } else {
556            unreachable!("`expected_clients` must be an array of client records")
557        }
558    }
559
560    #[test]
561    fn test_clients_sync_bad_incoming_record_skipped() {
562        let processor = TestProcessor {
563            settings: Settings {
564                fxa_device_id: "deviceAAAAAA".into(),
565                device_name: "Laptop".into(),
566                device_type: DeviceType::Desktop,
567            },
568            outgoing_commands: [].iter().cloned().collect(),
569        };
570
571        let config = InfoConfiguration::default();
572
573        let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
574
575        let inbound = inbound_from_clients(json!([{
576            "id": "deviceBBBBBB",
577            "name": "iPhone",
578            "type": "mobile",
579            "commands": [{
580                "command": "resetEngine",
581                "args": ["history"],
582            }],
583            "fxaDeviceId": "iPhooooooone",
584            "protocols": ["1.5"],
585            "device": "iPhone",
586        }, {
587            "id": "garbage",
588            "garbage": "value",
589        }, {
590            "id": "deviceCCCCCC",
591            "deleted": true,
592            "name": "Fenix",
593            "type": "mobile",
594            "commands": [],
595            "fxaDeviceId": "deviceCCCCCC",
596        }]));
597
598        driver.sync(inbound, false).expect("Should sync clients");
599
600        // Make sure the list of recently synced remote clients is correct.
601        let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
602        let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
603        actual_ids.sort();
604        assert_eq!(actual_ids, expected_ids);
605
606        let expected_remote_clients = &[
607            RemoteClient {
608                fxa_device_id: Some("deviceAAAAAA".to_string()),
609                device_name: "Laptop".into(),
610                device_type: DeviceType::Desktop,
611            },
612            RemoteClient {
613                fxa_device_id: Some("iPhooooooone".to_string()),
614                device_name: "iPhone".into(),
615                device_type: DeviceType::Mobile,
616            },
617        ];
618        let actual_remote_clients = expected_ids
619            .iter()
620            .filter_map(|&id| driver.recent_clients.get(id))
621            .cloned()
622            .collect::<Vec<RemoteClient>>();
623        assert_eq!(actual_remote_clients, expected_remote_clients);
624    }
625
626    #[test]
627    fn test_clients_sync_explicit_refresh() {
628        let processor = TestProcessor {
629            settings: Settings {
630                fxa_device_id: "deviceAAAAAA".into(),
631                device_name: "Laptop".into(),
632                device_type: DeviceType::Desktop,
633            },
634            outgoing_commands: [].iter().cloned().collect(),
635        };
636
637        let config = InfoConfiguration::default();
638
639        let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
640
641        let test_clients = json!([{
642            "id": "deviceBBBBBB",
643            "name": "iPhone",
644            "type": "mobile",
645            "commands": [{
646                "command": "resetEngine",
647                "args": ["history"],
648            }],
649            "fxaDeviceId": "iPhooooooone",
650            "protocols": ["1.5"],
651            "device": "iPhone",
652        }, {
653            "id": "deviceAAAAAA",
654            "name": "Laptop",
655            "type": "desktop",
656            "commands": [],
657            "fxaDeviceId": "deviceAAAAAA",
658            "protocols": ["1.5"],
659        }]);
660
661        let outgoing = driver
662            .sync(inbound_from_clients(test_clients.clone()), false)
663            .expect("Should sync clients");
664        // should be no outgoing changes.
665        assert_eq!(outgoing.len(), 0);
666
667        // Make sure the list of recently synced remote clients is correct and
668        // still includes our record we didn't update.
669        let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
670        let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
671        actual_ids.sort();
672        assert_eq!(actual_ids, expected_ids);
673
674        // Do it again - still no changes, but force a refresh.
675        let outgoing = driver
676            .sync(inbound_from_clients(test_clients), true)
677            .expect("Should sync clients");
678        assert_eq!(outgoing.len(), 1);
679
680        // Do it again - but this time with our own client record needing
681        // some change.
682        let inbound = inbound_from_clients(json!([{
683            "id": "deviceAAAAAA",
684            "name": "Laptop with New Name",
685            "type": "desktop",
686            "commands": [],
687            "fxaDeviceId": "deviceAAAAAA",
688            "protocols": ["1.5"],
689        }]));
690        let outgoing = driver.sync(inbound, false).expect("Should sync clients");
691        // should still be outgoing because the name changed.
692        assert_eq!(outgoing.len(), 1);
693    }
694
695    #[test]
696    fn test_fresh_client_record() {
697        let processor = TestProcessor {
698            settings: Settings {
699                fxa_device_id: "deviceAAAAAA".into(),
700                device_name: "Laptop".into(),
701                device_type: DeviceType::Desktop,
702            },
703            outgoing_commands: HashSet::new(),
704        };
705
706        let config = InfoConfiguration::default();
707
708        let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
709
710        let clients = json!([{
711            "id": "deviceBBBBBB",
712            "name": "iPhone",
713            "type": "mobile",
714            "commands": [{
715                "command": "resetEngine",
716                "args": ["history"],
717            }],
718            "fxaDeviceId": "iPhooooooone",
719            "protocols": ["1.5"],
720            "device": "iPhone",
721        }]);
722
723        let inbound = if let Value::Array(clients) = clients {
724            clients
725                .into_iter()
726                .map(IncomingBso::from_test_content)
727                .collect()
728        } else {
729            unreachable!("`clients` must be an array of client records")
730        };
731
732        // Passing false here for should_refresh_client, but it should be
733        // ignored as we don't have an existing record yet.
734        let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
735        outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
736
737        // Make sure the list of recently synced remote clients is correct.
738        let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
739        let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
740        actual_ids.sort();
741        assert_eq!(actual_ids, expected_ids);
742
743        let expected_remote_clients = &[
744            RemoteClient {
745                fxa_device_id: Some("deviceAAAAAA".to_string()),
746                device_name: "Laptop".into(),
747                device_type: DeviceType::Desktop,
748            },
749            RemoteClient {
750                fxa_device_id: Some("iPhooooooone".to_string()),
751                device_name: "iPhone".into(),
752                device_type: DeviceType::Mobile,
753            },
754        ];
755        let actual_remote_clients = expected_ids
756            .iter()
757            .filter_map(|&id| driver.recent_clients.get(id))
758            .cloned()
759            .collect::<Vec<RemoteClient>>();
760        assert_eq!(actual_remote_clients, expected_remote_clients);
761
762        let expected = json!([{
763            "id": "deviceAAAAAA",
764            "name": "Laptop",
765            "type": "desktop",
766            "fxaDeviceId": "deviceAAAAAA",
767            "protocols": ["1.5"],
768            "ttl": CLIENTS_TTL,
769        }]);
770        if let Value::Array(expected) = expected {
771            // turn outgoing into an incoming payload.
772            let incoming = outgoing
773                .into_iter()
774                .map(|c| OutgoingBso::to_test_incoming(&c))
775                .collect::<Vec<IncomingBso>>();
776            for (incoming_cleartext, record) in zip(incoming, expected) {
777                let incoming_client: ClientRecord =
778                    incoming_cleartext.into_content().content().unwrap();
779                assert_eq!(incoming_client, serde_json::from_value(record).unwrap());
780            }
781        } else {
782            unreachable!("`expected_clients` must be an array of client records")
783        }
784    }
785}