use super::AddressPayload;
use crate::db::addresses::{add_internal_address, update_internal_address};
use crate::db::models::address::InternalAddress;
use crate::db::schema::ADDRESS_COMMON_COLS;
use crate::error::*;
use crate::sync::address::name_utils::{join_name_parts, split_name, NameParts};
use crate::sync::common::*;
use crate::sync::{
IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, IncomingState, LocalRecordInfo,
ProcessIncomingRecordImpl, ServerTimestamp, SyncRecord,
};
use interrupt_support::Interruptee;
use rusqlite::{named_params, Transaction};
use sql_support::ConnExt;
use sync_guid::Guid as SyncGuid;
fn update_name(payload_content: &mut IncomingContent<AddressPayload>, local_name: String) {
let internal_address =
if let IncomingKind::Content(internal_address) = &mut payload_content.kind {
internal_address
} else {
return;
};
let entry = &mut internal_address.entry;
if !entry.name.is_empty()
|| (entry.given_name.is_empty()
&& entry.additional_name.is_empty()
&& entry.family_name.is_empty())
{
return;
}
let NameParts {
given,
middle,
family,
} = split_name(&local_name);
let is_local_name_matching =
entry.given_name == given && entry.additional_name == middle && entry.family_name == family;
entry.name = if is_local_name_matching {
local_name
} else {
join_name_parts(&NameParts {
given: entry.given_name.clone(),
middle: entry.additional_name.clone(),
family: entry.family_name.clone(),
})
};
}
fn create_incoming_bso(id: SyncGuid, raw: String) -> IncomingContent<AddressPayload> {
let bso = IncomingBso {
envelope: IncomingEnvelope {
id,
modified: ServerTimestamp::default(),
sortindex: None,
ttl: None,
},
payload: raw,
};
bso.into_content::<AddressPayload>()
}
fn bso_to_incoming(
payload_content: IncomingContent<AddressPayload>,
) -> Result<IncomingContent<InternalAddress>> {
Ok(match payload_content.kind {
IncomingKind::Content(content) => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
},
IncomingKind::Tombstone => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Tombstone,
},
IncomingKind::Malformed => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Malformed,
},
})
}
fn raw_payload_to_incoming(id: SyncGuid, raw: String) -> Result<IncomingContent<InternalAddress>> {
let payload_content = create_incoming_bso(id, raw);
Ok(match payload_content.kind {
IncomingKind::Content(content) => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
},
IncomingKind::Tombstone => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Tombstone,
},
IncomingKind::Malformed => IncomingContent {
envelope: payload_content.envelope,
kind: IncomingKind::Malformed,
},
})
}
pub(super) struct IncomingAddressesImpl {}
impl ProcessIncomingRecordImpl for IncomingAddressesImpl {
type Record = InternalAddress;
fn stage_incoming(
&self,
tx: &Transaction<'_>,
incoming: Vec<IncomingBso>,
signal: &dyn Interruptee,
) -> Result<()> {
let to_stage = incoming
.into_iter()
.map(|bso| (bso.envelope.id, bso.payload, bso.envelope.modified))
.collect();
common_stage_incoming_records(tx, "addresses_sync_staging", to_stage, signal)
}
fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()> {
common_mirror_staged_records(tx, "addresses_sync_staging", "addresses_mirror")
}
fn fetch_incoming_states(
&self,
tx: &Transaction<'_>,
) -> Result<Vec<IncomingState<Self::Record>>> {
let sql = "
SELECT
s.guid as guid,
l.guid as l_guid,
t.guid as t_guid,
s.payload as s_payload,
m.payload as m_payload,
l.name,
l.organization,
l.street_address,
l.address_level3,
l.address_level2,
l.address_level1,
l.postal_code,
l.country,
l.tel,
l.email,
l.time_created,
l.time_last_used,
l.time_last_modified,
l.times_used,
l.sync_change_counter
FROM temp.addresses_sync_staging s
LEFT JOIN addresses_mirror m ON s.guid = m.guid
LEFT JOIN addresses_data l ON s.guid = l.guid
LEFT JOIN addresses_tombstones t ON s.guid = t.guid";
tx.query_rows_and_then(sql, [], |row| -> Result<IncomingState<Self::Record>> {
let guid: SyncGuid = row.get("guid")?;
let mut payload_content = create_incoming_bso(guid.clone(), row.get("s_payload")?);
update_name(
&mut payload_content,
row.get("name").unwrap_or("".to_string()),
);
let incoming = bso_to_incoming(payload_content)?;
Ok(IncomingState {
incoming,
local: match row.get_unwrap::<_, Option<String>>("l_guid") {
Some(l_guid) => {
assert_eq!(l_guid, guid);
let record = InternalAddress::from_row(row)?;
let has_changes = record.metadata().sync_change_counter != 0;
if has_changes {
LocalRecordInfo::Modified { record }
} else {
LocalRecordInfo::Unmodified { record }
}
}
None => {
match row.get::<_, Option<String>>("t_guid")? {
Some(t_guid) => {
assert_eq!(guid, t_guid);
LocalRecordInfo::Tombstone { guid: guid.clone() }
}
None => LocalRecordInfo::Missing,
}
}
},
mirror: {
match row.get::<_, Option<String>>("m_payload")? {
Some(m_payload) => {
raw_payload_to_incoming(guid, m_payload)?.content()
}
None => None,
}
},
})
})
}
fn get_local_dupe(
&self,
tx: &Transaction<'_>,
incoming: &Self::Record,
) -> Result<Option<Self::Record>> {
let sql = format!("
SELECT
{common_cols},
sync_change_counter
FROM addresses_data
WHERE
-- `guid <> :guid` is a pre-condition for this being called, but...
guid <> :guid
-- only non-synced records are candidates, which means can't already be in the mirror.
AND guid NOT IN (
SELECT guid
FROM addresses_mirror
)
-- and sql can check the field values.
AND name == :name
AND organization == :organization
AND street_address == :street_address
AND address_level3 == :address_level3
AND address_level2 == :address_level2
AND address_level1 == :address_level1
AND postal_code == :postal_code
AND country == :country
AND tel == :tel
AND email == :email", common_cols = ADDRESS_COMMON_COLS);
let params = named_params! {
":guid": incoming.guid,
":name": incoming.name,
":organization": incoming.organization,
":street_address": incoming.street_address,
":address_level3": incoming.address_level3,
":address_level2": incoming.address_level2,
":address_level1": incoming.address_level1,
":postal_code": incoming.postal_code,
":country": incoming.country,
":tel": incoming.tel,
":email": incoming.email,
};
let result = tx.query_row(&sql, params, |row| {
Ok(Self::Record::from_row(row).expect("wtf? '?' doesn't work :("))
});
match result {
Ok(r) => Ok(Some(r)),
Err(e) => match e {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
_ => Err(Error::SqlError(e)),
},
}
}
fn update_local_record(
&self,
tx: &Transaction<'_>,
new_record: Self::Record,
flag_as_changed: bool,
) -> Result<()> {
update_internal_address(tx, &new_record, flag_as_changed)?;
Ok(())
}
fn insert_local_record(&self, tx: &Transaction<'_>, new_record: Self::Record) -> Result<()> {
add_internal_address(tx, &new_record)?;
Ok(())
}
fn change_record_guid(
&self,
tx: &Transaction<'_>,
old_guid: &SyncGuid,
new_guid: &SyncGuid,
) -> Result<()> {
common_change_guid(tx, "addresses_data", "addresses_mirror", old_guid, new_guid)
}
fn remove_record(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
common_remove_record(tx, "addresses_data", guid)
}
fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
common_remove_record(tx, "addresses_tombstones", guid)
}
}
#[cfg(test)]
mod tests {
use super::super::super::test::new_syncable_mem_db;
use super::*;
use crate::db::addresses::get_address;
use crate::sync::common::tests::*;
use interrupt_support::NeverInterrupts;
use serde_json::{json, Map, Value};
use sql_support::ConnExt;
impl InternalAddress {
fn into_test_incoming_bso(self) -> IncomingBso {
IncomingBso::from_test_content(self.into_payload().expect("is json"))
}
}
lazy_static::lazy_static! {
static ref TEST_JSON_RECORDS: Map<String, Value> = {
let val = json! {{
"A" : {
"id": expand_test_guid('A'),
"entry": {
"name": "john doe",
"given-name": "john",
"family-name": "doe",
"street-address": "1300 Broadway",
"address-level2": "New York, NY",
"country": "United States",
"version": 1,
}
},
"C" : {
"id": expand_test_guid('C'),
"entry": {
"name": "jane doe",
"given-name": "jane",
"family-name": "doe",
"street-address": "3050 South La Brea Ave",
"address-level2": "Los Angeles, CA",
"country": "United States",
"timeCreated": 0,
"timeLastUsed": 0,
"timeLastModified": 0,
"timesUsed": 0,
"version": 1,
}
},
"D" : {
"id": expand_test_guid('D'),
"entry": {
"name": "test1 test2",
"given-name": "test1",
"family-name": "test2",
"street-address": "85 Pike St",
"address-level2": "Seattle, WA",
"country": "United States",
"foo": "bar",
"baz": "qux",
"version": 1,
}
}
}};
val.as_object().expect("literal is an object").clone()
};
}
fn test_json_record(guid_prefix: char) -> Value {
TEST_JSON_RECORDS
.get(&guid_prefix.to_string())
.expect("should exist")
.clone()
}
fn test_record(guid_prefix: char) -> InternalAddress {
let json = test_json_record(guid_prefix);
let address_payload = serde_json::from_value(json).unwrap();
InternalAddress::from_payload(address_payload).expect("should be valid")
}
#[test]
fn test_stage_incoming() -> Result<()> {
let _ = env_logger::try_init();
let mut db = new_syncable_mem_db();
struct TestCase {
incoming_records: Vec<Value>,
mirror_records: Vec<Value>,
expected_record_count: usize,
expected_tombstone_count: usize,
}
let test_cases = vec![
TestCase {
incoming_records: vec![test_json_record('A')],
mirror_records: vec![],
expected_record_count: 1,
expected_tombstone_count: 0,
},
TestCase {
incoming_records: vec![test_json_tombstone('A')],
mirror_records: vec![],
expected_record_count: 0,
expected_tombstone_count: 1,
},
TestCase {
incoming_records: vec![
test_json_record('A'),
test_json_record('C'),
test_json_tombstone('B'),
],
mirror_records: vec![],
expected_record_count: 2,
expected_tombstone_count: 1,
},
TestCase {
incoming_records: vec![test_json_tombstone('B')],
mirror_records: vec![test_json_tombstone('B')],
expected_record_count: 0,
expected_tombstone_count: 1,
},
];
for tc in test_cases {
log::info!("starting new testcase");
let tx = db.transaction()?;
let mirror_sql = "INSERT OR REPLACE INTO addresses_mirror (guid, payload)
VALUES (:guid, :payload)";
for payload in tc.mirror_records {
tx.execute(
mirror_sql,
rusqlite::named_params! {
":guid": payload["id"].as_str().unwrap(),
":payload": payload.to_string(),
},
)
.expect("should insert mirror record");
}
let ri = IncomingAddressesImpl {};
ri.stage_incoming(
&tx,
array_to_incoming(tc.incoming_records),
&NeverInterrupts,
)?;
let records = tx.conn().query_rows_and_then(
"SELECT * FROM temp.addresses_sync_staging;",
[],
|row| -> Result<IncomingContent<InternalAddress>> {
let guid: SyncGuid = row.get_unwrap("guid");
let payload: String = row.get_unwrap("payload");
raw_payload_to_incoming(guid, payload)
},
)?;
let record_count = records
.iter()
.filter(|p| !matches!(p.kind, IncomingKind::Tombstone))
.count();
let tombstone_count = records.len() - record_count;
assert_eq!(record_count, tc.expected_record_count);
assert_eq!(tombstone_count, tc.expected_tombstone_count);
ri.fetch_incoming_states(&tx)?;
tx.execute("DELETE FROM temp.addresses_sync_staging;", [])?;
}
Ok(())
}
#[test]
fn test_change_record_guid() -> Result<()> {
let mut db = new_syncable_mem_db();
let tx = db.transaction()?;
let ri = IncomingAddressesImpl {};
ri.insert_local_record(&tx, test_record('C'))?;
ri.change_record_guid(
&tx,
&SyncGuid::new(&expand_test_guid('C')),
&SyncGuid::new(&expand_test_guid('B')),
)?;
tx.commit()?;
assert!(get_address(&db.writer, &expand_test_guid('C').into()).is_err());
assert!(get_address(&db.writer, &expand_test_guid('B').into()).is_ok());
Ok(())
}
#[test]
fn test_get_incoming() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = IncomingAddressesImpl {};
let record = test_record('C');
let bso = record.clone().into_test_incoming_bso();
do_test_incoming_same(&ai, &tx, record, bso);
}
#[test]
fn test_get_incoming_unknown_fields() {
let json = test_json_record('D');
let address_payload = serde_json::from_value::<AddressPayload>(json).unwrap();
assert_eq!(address_payload.entry.unknown_fields.len(), 2);
assert_eq!(
address_payload
.entry
.unknown_fields
.get("foo")
.unwrap()
.as_str()
.unwrap(),
"bar"
);
}
#[test]
fn test_incoming_tombstone() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = IncomingAddressesImpl {};
do_test_incoming_tombstone(&ai, &tx, test_record('C'));
}
#[test]
fn test_staged_to_mirror() {
let mut db = new_syncable_mem_db();
let tx = db.transaction().expect("should get tx");
let ai = IncomingAddressesImpl {};
let record = test_record('C');
let bso = record.clone().into_test_incoming_bso();
do_test_staged_to_mirror(&ai, &tx, record, bso, "addresses_mirror");
}
}