use std::collections::{HashMap, HashSet};
use crate::bso::{IncomingBso, IncomingKind, OutgoingBso, OutgoingEnvelope};
use crate::client::{
CollState, CollectionKeys, CollectionUpdate, GlobalState, InfoConfiguration,
Sync15StorageClient,
};
use crate::client_types::{ClientData, RemoteClient};
use crate::engine::CollectionRequest;
use crate::{error::Result, Guid, KeyBundle};
use interrupt_support::Interruptee;
use super::{
record::{ClientRecord, CommandRecord},
ser::shrink_to_fit,
Command, CommandProcessor, CommandStatus, CLIENTS_TTL,
};
const COLLECTION_NAME: &str = "clients";
struct Driver<'a> {
command_processor: &'a dyn CommandProcessor,
interruptee: &'a dyn Interruptee,
config: &'a InfoConfiguration,
recent_clients: HashMap<String, RemoteClient>,
}
impl<'a> Driver<'a> {
fn new(
command_processor: &'a dyn CommandProcessor,
interruptee: &'a dyn Interruptee,
config: &'a InfoConfiguration,
) -> Driver<'a> {
Driver {
command_processor,
interruptee,
config,
recent_clients: HashMap::new(),
}
}
fn note_recent_client(&mut self, client: &ClientRecord) {
self.recent_clients.insert(client.id.clone(), client.into());
}
fn sync(
&mut self,
inbound: Vec<IncomingBso>,
should_refresh_client: bool,
) -> Result<Vec<OutgoingBso>> {
self.interruptee.err_if_interrupted()?;
let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
let mut has_own_client_record = false;
let mut changes = Vec::new();
for bso in inbound {
self.interruptee.err_if_interrupted()?;
let content = bso.into_content();
let client: ClientRecord = match content.kind {
IncomingKind::Malformed => {
log::debug!("Error unpacking record");
continue;
}
IncomingKind::Tombstone => {
log::debug!("Record has been deleted; skipping...");
continue;
}
IncomingKind::Content(client) => client,
};
if client.id == self.command_processor.settings().fxa_device_id {
log::debug!("Found my record on the server");
has_own_client_record = true;
let mut current_client_record = self.current_client_record();
for c in &client.commands {
let status = match c.as_command() {
Some(command) => self.command_processor.apply_incoming_command(command)?,
None => CommandStatus::Unsupported,
};
match status {
CommandStatus::Applied => {}
CommandStatus::Ignored => {
log::debug!("Ignored command {:?}", c);
}
CommandStatus::Unsupported => {
log::warn!("Don't know how to apply command {:?}", c);
current_client_record.commands.push(c.clone());
}
}
}
shrink_to_fit(
&mut current_client_record.commands,
self.memcache_max_record_payload_size(),
)?;
self.note_recent_client(¤t_client_record);
if should_refresh_client || client != current_client_record {
log::debug!("Will update our client record on the server");
let envelope = OutgoingEnvelope {
id: content.envelope.id,
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
}
} else {
self.note_recent_client(&client);
if outgoing_commands.is_empty() {
continue;
}
let current_commands: HashSet<Command> = client
.commands
.iter()
.filter_map(|c| c.as_command())
.collect();
let mut new_outgoing_commands = outgoing_commands
.difference(¤t_commands)
.cloned()
.collect::<Vec<_>>();
new_outgoing_commands.sort();
let mut new_client = client.clone();
new_client
.commands
.extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
if new_client.commands.len() == client.commands.len() {
continue;
}
shrink_to_fit(
&mut new_client.commands,
self.memcache_max_record_payload_size(),
)?;
let envelope = OutgoingEnvelope {
id: content.envelope.id,
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, new_client)?);
}
}
if !has_own_client_record {
let current_client_record = self.current_client_record();
self.note_recent_client(¤t_client_record);
let envelope = OutgoingEnvelope {
id: Guid::new(¤t_client_record.id),
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
}
Ok(changes)
}
fn current_client_record(&self) -> ClientRecord {
let settings = self.command_processor.settings();
ClientRecord {
id: settings.fxa_device_id.clone(),
name: settings.device_name.clone(),
typ: settings.device_type,
commands: Vec::new(),
fxa_device_id: Some(settings.fxa_device_id.clone()),
version: None,
protocols: vec!["1.5".into()],
form_factor: None,
os: None,
app_package: None,
application: None,
device: None,
}
}
fn max_record_payload_size(&self) -> usize {
let payload_max = self.config.max_record_payload_bytes;
if payload_max <= self.config.max_post_bytes {
self.config.max_post_bytes.saturating_sub(4096)
} else {
payload_max
}
}
fn memcache_max_record_payload_size(&self) -> usize {
self.max_record_payload_size().min(512 * 1024)
}
}
pub struct Engine<'a> {
pub command_processor: &'a dyn CommandProcessor,
pub interruptee: &'a dyn Interruptee,
pub recent_clients: HashMap<String, RemoteClient>,
}
impl Engine<'_> {
pub fn new<'b>(
command_processor: &'b dyn CommandProcessor,
interruptee: &'b dyn Interruptee,
) -> Engine<'b> {
Engine {
command_processor,
interruptee,
recent_clients: HashMap::new(),
}
}
pub fn sync(
&mut self,
storage_client: &Sync15StorageClient,
global_state: &GlobalState,
root_sync_key: &KeyBundle,
should_refresh_client: bool,
) -> Result<()> {
log::info!("Syncing collection clients");
let coll_keys = CollectionKeys::from_encrypted_payload(
global_state.keys.clone(),
global_state.keys_timestamp,
root_sync_key,
)?;
let coll_state = CollState {
config: global_state.config.clone(),
last_modified: global_state
.collections
.get(COLLECTION_NAME)
.cloned()
.unwrap_or_default(),
key: coll_keys.key_for_collection(COLLECTION_NAME).clone(),
};
let inbound = self.fetch_incoming(storage_client, &coll_state)?;
let mut driver = Driver::new(
self.command_processor,
self.interruptee,
&global_state.config,
);
let outgoing = driver.sync(inbound, should_refresh_client)?;
self.recent_clients = driver.recent_clients;
self.interruptee.err_if_interrupted()?;
let upload_info = CollectionUpdate::new_from_changeset(
storage_client,
&coll_state,
COLLECTION_NAME.into(),
outgoing,
true,
)?
.upload()?;
log::info!(
"Upload success ({} records success, {} records failed)",
upload_info.successful_ids.len(),
upload_info.failed_ids.len()
);
log::info!("Finished syncing clients");
Ok(())
}
fn fetch_incoming(
&self,
storage_client: &Sync15StorageClient,
coll_state: &CollState,
) -> Result<Vec<IncomingBso>> {
let coll_request = CollectionRequest::new(COLLECTION_NAME.into()).full();
self.interruptee.err_if_interrupted()?;
let inbound = crate::client::fetch_incoming(storage_client, coll_state, coll_request)?;
Ok(inbound)
}
pub fn local_client_id(&self) -> String {
self.command_processor.settings().fxa_device_id.clone()
}
pub fn get_client_data(&self) -> ClientData {
ClientData {
local_client_id: self.local_client_id(),
recent_clients: self.recent_clients.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::super::{CommandStatus, DeviceType, Settings};
use super::*;
use crate::bso::IncomingBso;
use anyhow::Result;
use interrupt_support::NeverInterrupts;
use serde_json::{json, Value};
use std::iter::zip;
struct TestProcessor {
settings: Settings,
outgoing_commands: HashSet<Command>,
}
impl CommandProcessor for TestProcessor {
fn settings(&self) -> &Settings {
&self.settings
}
fn apply_incoming_command(&self, command: Command) -> Result<CommandStatus> {
Ok(if let Command::Reset(name) = command {
if name == "forms" {
CommandStatus::Unsupported
} else {
CommandStatus::Applied
}
} else {
CommandStatus::Ignored
})
}
fn fetch_outgoing_commands(&self) -> Result<HashSet<Command>> {
Ok(self.outgoing_commands.clone())
}
}
fn inbound_from_clients(clients: Value) -> Vec<IncomingBso> {
if let Value::Array(clients) = clients {
clients
.into_iter()
.map(IncomingBso::from_test_content)
.collect()
} else {
unreachable!("`clients` must be an array of client records")
}
}
#[test]
fn test_clients_sync() {
let processor = TestProcessor {
settings: Settings {
fxa_device_id: "deviceAAAAAA".into(),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
outgoing_commands: [
Command::Wipe("bookmarks".into()),
Command::Reset("history".into()),
]
.iter()
.cloned()
.collect(),
};
let config = InfoConfiguration::default();
let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
let inbound = inbound_from_clients(json!([{
"id": "deviceBBBBBB",
"name": "iPhone",
"type": "mobile",
"commands": [{
"command": "resetEngine",
"args": ["history"],
}],
"fxaDeviceId": "iPhooooooone",
"protocols": ["1.5"],
"device": "iPhone",
}, {
"id": "deviceCCCCCC",
"name": "Fenix",
"type": "mobile",
"commands": [],
"fxaDeviceId": "deviceCCCCCC",
}, {
"id": "deviceAAAAAA",
"name": "Laptop with a different name",
"type": "desktop",
"commands": [{
"command": "wipeEngine",
"args": ["logins"]
}, {
"command": "displayURI",
"args": ["http://example.com", "Fennec", "Example page"],
"flowID": "flooooooooow",
}, {
"command": "resetEngine",
"args": ["forms"],
}, {
"command": "logout",
"args": [],
}],
"fxaDeviceId": "deviceAAAAAA",
}]));
let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
let expected_ids = &["deviceAAAAAA", "deviceBBBBBB", "deviceCCCCCC"];
let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
actual_ids.sort();
assert_eq!(actual_ids, expected_ids);
let expected_remote_clients = &[
RemoteClient {
fxa_device_id: Some("deviceAAAAAA".to_string()),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
RemoteClient {
fxa_device_id: Some("iPhooooooone".to_string()),
device_name: "iPhone".into(),
device_type: DeviceType::Mobile,
},
RemoteClient {
fxa_device_id: Some("deviceCCCCCC".to_string()),
device_name: "Fenix".into(),
device_type: DeviceType::Mobile,
},
];
let actual_remote_clients = expected_ids
.iter()
.filter_map(|&id| driver.recent_clients.get(id))
.cloned()
.collect::<Vec<RemoteClient>>();
assert_eq!(actual_remote_clients, expected_remote_clients);
let expected = json!([{
"id": "deviceAAAAAA",
"name": "Laptop",
"type": "desktop",
"commands": [{
"command": "displayURI",
"args": ["http://example.com", "Fennec", "Example page"],
"flowID": "flooooooooow",
}, {
"command": "resetEngine",
"args": ["forms"],
}, {
"command": "logout",
"args": [],
}],
"fxaDeviceId": "deviceAAAAAA",
"protocols": ["1.5"],
}, {
"id": "deviceBBBBBB",
"name": "iPhone",
"type": "mobile",
"commands": [{
"command": "resetEngine",
"args": ["history"],
}, {
"command": "wipeEngine",
"args": ["bookmarks"],
}],
"fxaDeviceId": "iPhooooooone",
"protocols": ["1.5"],
"device": "iPhone",
}, {
"id": "deviceCCCCCC",
"name": "Fenix",
"type": "mobile",
"commands": [{
"command": "wipeEngine",
"args": ["bookmarks"],
}, {
"command": "resetEngine",
"args": ["history"],
}],
"fxaDeviceId": "deviceCCCCCC",
}]);
let incoming = outgoing
.into_iter()
.map(|c| OutgoingBso::to_test_incoming(&c))
.collect::<Vec<IncomingBso>>();
if let Value::Array(expected) = expected {
for (incoming_cleartext, exp_client) in zip(incoming, expected) {
let incoming_client: ClientRecord =
incoming_cleartext.into_content().content().unwrap();
assert_eq!(incoming_client, serde_json::from_value(exp_client).unwrap());
}
} else {
unreachable!("`expected_clients` must be an array of client records")
}
}
#[test]
fn test_clients_sync_bad_incoming_record_skipped() {
let processor = TestProcessor {
settings: Settings {
fxa_device_id: "deviceAAAAAA".into(),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
outgoing_commands: [].iter().cloned().collect(),
};
let config = InfoConfiguration::default();
let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
let inbound = inbound_from_clients(json!([{
"id": "deviceBBBBBB",
"name": "iPhone",
"type": "mobile",
"commands": [{
"command": "resetEngine",
"args": ["history"],
}],
"fxaDeviceId": "iPhooooooone",
"protocols": ["1.5"],
"device": "iPhone",
}, {
"id": "garbage",
"garbage": "value",
}, {
"id": "deviceCCCCCC",
"deleted": true,
"name": "Fenix",
"type": "mobile",
"commands": [],
"fxaDeviceId": "deviceCCCCCC",
}]));
driver.sync(inbound, false).expect("Should sync clients");
let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
actual_ids.sort();
assert_eq!(actual_ids, expected_ids);
let expected_remote_clients = &[
RemoteClient {
fxa_device_id: Some("deviceAAAAAA".to_string()),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
RemoteClient {
fxa_device_id: Some("iPhooooooone".to_string()),
device_name: "iPhone".into(),
device_type: DeviceType::Mobile,
},
];
let actual_remote_clients = expected_ids
.iter()
.filter_map(|&id| driver.recent_clients.get(id))
.cloned()
.collect::<Vec<RemoteClient>>();
assert_eq!(actual_remote_clients, expected_remote_clients);
}
#[test]
fn test_clients_sync_explicit_refresh() {
let processor = TestProcessor {
settings: Settings {
fxa_device_id: "deviceAAAAAA".into(),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
outgoing_commands: [].iter().cloned().collect(),
};
let config = InfoConfiguration::default();
let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
let test_clients = json!([{
"id": "deviceBBBBBB",
"name": "iPhone",
"type": "mobile",
"commands": [{
"command": "resetEngine",
"args": ["history"],
}],
"fxaDeviceId": "iPhooooooone",
"protocols": ["1.5"],
"device": "iPhone",
}, {
"id": "deviceAAAAAA",
"name": "Laptop",
"type": "desktop",
"commands": [],
"fxaDeviceId": "deviceAAAAAA",
"protocols": ["1.5"],
}]);
let outgoing = driver
.sync(inbound_from_clients(test_clients.clone()), false)
.expect("Should sync clients");
assert_eq!(outgoing.len(), 0);
let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
actual_ids.sort();
assert_eq!(actual_ids, expected_ids);
let outgoing = driver
.sync(inbound_from_clients(test_clients), true)
.expect("Should sync clients");
assert_eq!(outgoing.len(), 1);
let inbound = inbound_from_clients(json!([{
"id": "deviceAAAAAA",
"name": "Laptop with New Name",
"type": "desktop",
"commands": [],
"fxaDeviceId": "deviceAAAAAA",
"protocols": ["1.5"],
}]));
let outgoing = driver.sync(inbound, false).expect("Should sync clients");
assert_eq!(outgoing.len(), 1);
}
#[test]
fn test_fresh_client_record() {
let processor = TestProcessor {
settings: Settings {
fxa_device_id: "deviceAAAAAA".into(),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
outgoing_commands: HashSet::new(),
};
let config = InfoConfiguration::default();
let mut driver = Driver::new(&processor, &NeverInterrupts, &config);
let clients = json!([{
"id": "deviceBBBBBB",
"name": "iPhone",
"type": "mobile",
"commands": [{
"command": "resetEngine",
"args": ["history"],
}],
"fxaDeviceId": "iPhooooooone",
"protocols": ["1.5"],
"device": "iPhone",
}]);
let inbound = if let Value::Array(clients) = clients {
clients
.into_iter()
.map(IncomingBso::from_test_content)
.collect()
} else {
unreachable!("`clients` must be an array of client records")
};
let mut outgoing = driver.sync(inbound, false).expect("Should sync clients");
outgoing.sort_by(|a, b| a.envelope.id.cmp(&b.envelope.id));
let expected_ids = &["deviceAAAAAA", "deviceBBBBBB"];
let mut actual_ids = driver.recent_clients.keys().collect::<Vec<&String>>();
actual_ids.sort();
assert_eq!(actual_ids, expected_ids);
let expected_remote_clients = &[
RemoteClient {
fxa_device_id: Some("deviceAAAAAA".to_string()),
device_name: "Laptop".into(),
device_type: DeviceType::Desktop,
},
RemoteClient {
fxa_device_id: Some("iPhooooooone".to_string()),
device_name: "iPhone".into(),
device_type: DeviceType::Mobile,
},
];
let actual_remote_clients = expected_ids
.iter()
.filter_map(|&id| driver.recent_clients.get(id))
.cloned()
.collect::<Vec<RemoteClient>>();
assert_eq!(actual_remote_clients, expected_remote_clients);
let expected = json!([{
"id": "deviceAAAAAA",
"name": "Laptop",
"type": "desktop",
"fxaDeviceId": "deviceAAAAAA",
"protocols": ["1.5"],
"ttl": CLIENTS_TTL,
}]);
if let Value::Array(expected) = expected {
let incoming = outgoing
.into_iter()
.map(|c| OutgoingBso::to_test_incoming(&c))
.collect::<Vec<IncomingBso>>();
for (incoming_cleartext, record) in zip(incoming, expected) {
let incoming_client: ClientRecord =
incoming_cleartext.into_content().content().unwrap();
assert_eq!(incoming_client, serde_json::from_value(record).unwrap());
}
} else {
unreachable!("`expected_clients` must be an array of client records")
}
}
}