use crate::db::models::address::InternalAddress;
use crate::db::schema::ADDRESS_COMMON_COLS;
use crate::error::*;
use crate::sync::{address::AddressPayload, common::*};
use crate::sync::{OutgoingBso, ProcessOutgoingRecordImpl};
use rusqlite::{Row, Transaction};
use sync_guid::Guid as SyncGuid;
/// Table holding the local address rows; the outgoing query in
/// `fetch_outgoing_records` selects from it.
const DATA_TABLE_NAME: &str = "addresses_data";
/// Table whose `payload` column is joined in when outgoing records are built.
const MIRROR_TABLE_NAME: &str = "addresses_mirror";
/// Table the outgoing records are saved into before they are returned.
const STAGING_TABLE_NAME: &str = "addresses_sync_outgoing_staging";
/// Field-less type that carries the `ProcessOutgoingRecordImpl` implementation
/// for address records.
pub(super) struct OutgoingAddressesImpl {}
impl ProcessOutgoingRecordImpl for OutgoingAddressesImpl {
    type Record = InternalAddress;

    /// Collects the address records that need to go out and stages them.
    ///
    /// A local row qualifies when its `sync_change_counter` is above zero or
    /// when its guid has no row in `addresses_mirror`. The tombstone guid
    /// query is passed to the shared helpers alongside the data query. The
    /// gathered records are first written to the staging table and then
    /// fetched again to build the returned list of BSOs.
    ///
    /// # Errors
    ///
    /// Propagates any error from the shared helpers, from reading a row, or
    /// from (de)serializing a payload.
    fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>> {
        // The SQL text is runtime data, so it is kept character-for-character.
        let outgoing_sql = format!(
            "SELECT
l.{common_cols},
m.payload,
l.sync_change_counter
FROM addresses_data l
LEFT JOIN addresses_mirror m
ON l.guid = m.guid
WHERE sync_change_counter > 0
OR l.guid NOT IN (
SELECT m.guid
FROM addresses_mirror m
)",
            common_cols = ADDRESS_COMMON_COLS,
        );
        let deleted_sql = "SELECT guid FROM addresses_tombstones";

        // Turns one row of the query above into (BSO, change counter).
        let row_to_outgoing: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)> = &|row| {
            let local = InternalAddress::from_row(row)?;
            let mut payload = local.into_payload()?;

            // `payload` is NULL when the LEFT JOIN found no mirror row. When
            // present, its unknown fields are carried over to the outgoing
            // record so they are not lost on the way back out.
            let mirror_json: Option<String> = row.get("payload")?;
            if let Some(json) = mirror_json {
                let mirrored = serde_json::from_str::<AddressPayload>(&json)?;
                payload.entry.unknown_fields = mirrored.entry.unknown_fields;
            }

            let bso = OutgoingBso::from_content_with_id(payload)?;
            let change_counter: i64 = row.get("sync_change_counter")?;
            Ok((bso, change_counter))
        };

        // Step 1: stage (guid, payload, change counter) for every outgoing record.
        let mut staging_rows = Vec::new();
        for (bso, change_counter) in
            common_get_outgoing_staging_records(tx, &outgoing_sql, deleted_sql, row_to_outgoing)?
        {
            staging_rows.push((bso.envelope.id, bso.payload, change_counter));
        }
        common_save_outgoing_records(tx, STAGING_TABLE_NAME, staging_rows)?;

        // Step 2: hand back the BSOs themselves; the counters are not needed here.
        let outgoing =
            common_get_outgoing_records(tx, &outgoing_sql, deleted_sql, row_to_outgoing)?;
        Ok(outgoing.into_iter().map(|(bso, _)| bso).collect())
    }

    /// Passes the synced guids, together with the data, mirror and staging
    /// table names for addresses, to `common_finish_synced_items`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the shared helper reports.
    fn finish_synced_items(
        &self,
        tx: &Transaction<'_>,
        records_synced: Vec<SyncGuid>,
    ) -> anyhow::Result<()> {
        common_finish_synced_items(
            tx,
            DATA_TABLE_NAME,
            MIRROR_TABLE_NAME,
            STAGING_TABLE_NAME,
            records_synced,
        )?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::{addresses::add_internal_address, models::address::InternalAddress};
use crate::sync::{common::tests::*, test::new_syncable_mem_db, UnknownFields};
use rusqlite::Connection;
use serde_json::{json, Map, Value};
use types::Timestamp;
// Test helper: serializes `address` as a payload carrying `unknown_fields`
// and inserts it into `addresses_mirror` under the address guid. Panics if
// the conversion, the serialization or the insert fails.
fn test_insert_mirror_record(
    conn: &Connection,
    address: InternalAddress,
    unknown_fields: UnknownFields,
) {
    // Copy the guid out before `address` is handed to `into_payload`.
    let record_guid = address.guid.clone();
    let payload_json = {
        let mut mirrored = address.into_payload().unwrap();
        mirrored.entry.unknown_fields = unknown_fields;
        serde_json::to_string(&mirrored).expect("is json")
    };

    // SQL text kept character-for-character.
    let insert_sql = "INSERT OR IGNORE INTO addresses_mirror (guid, payload)
VALUES (:guid, :payload)";
    let outcome = conn.execute(
        insert_sql,
        rusqlite::named_params! {
            ":guid": record_guid,
            ":payload": &payload_json,
        },
    );
    outcome.expect("should insert");
}
lazy_static::lazy_static! {
    // JSON fixtures for the tests in this module, keyed by the single
    // character that `test_json_record` / `test_record` look up.
    //
    // Every address key inside `entry` is kebab-case, which is the spelling
    // the assertions in this module use for payload fields (`given-name`,
    // `family-name`, ...). Record "C" previously spelled two keys in
    // camelCase (`streetAddress`, `addressLevel2`), unlike record "D"; they
    // now match.
    static ref TEST_JSON_RECORDS: Map<String, Value> = {
        let val = json! {{
            "C" : {
                "id": expand_test_guid('C'),
                "entry": {
                    "name": "jane doe",
                    "street-address": "3050 South La Brea Ave",
                    "address-level2": "Los Angeles, CA",
                    "country": "United States",
                    "timeCreated": 0,
                    "timeLastUsed": 0,
                    "timeLastModified": 0,
                    "timesUsed": 0,
                    "version": 1,
                }
            },
            "D" : {
                "id": expand_test_guid('D'),
                "entry": {
                    "name": "john doe",
                    "street-address": "85 Pike St",
                    "address-level2": "Seattle, WA",
                    "country": "United States",
                    "timeCreated": 0,
                    "timeLastUsed": 0,
                    "timeLastModified": 0,
                    "timesUsed": 0,
                    "version": 1,
                    // Extra keys used by `test_outgoing_roundtrip_unknown`
                    // as the "unknown fields" of this record.
                    "foo": "bar",
                    "baz": "qux",
                }
            }
        }};
        val.as_object().expect("literal is an object").clone()
    };
}
// Returns a copy of the JSON fixture stored under `guid_prefix` in
// `TEST_JSON_RECORDS`; panics with "should exist" when there is none.
fn test_json_record(guid_prefix: char) -> Value {
    let key = guid_prefix.to_string();
    let fixture = TEST_JSON_RECORDS.get(&key).expect("should exist");
    fixture.clone()
}
// Builds an `InternalAddress` from the JSON fixture for `guid_prefix`.
// Panics if the fixture does not deserialize or is rejected by `from_payload`.
fn test_record(guid_prefix: char) -> InternalAddress {
    let parsed = serde_json::from_value(test_json_record(guid_prefix)).unwrap();
    InternalAddress::from_payload(parsed).expect("should be valid")
}
#[test]
fn test_outgoing_never_synced() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // A local record with no mirror row.
    let address = test_record('C');
    assert!(add_internal_address(&tx, &address).is_ok());

    // The shared helper performs the actual checks.
    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_never_synced(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
#[test]
fn test_outgoing_tombstone() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");
    let address = test_record('C');

    // Only a tombstone row is written; the address itself is never added.
    // SQL text kept character-for-character.
    let inserted = tx.execute(
        "INSERT INTO addresses_tombstones (
guid,
time_deleted
) VALUES (
:guid,
:time_deleted
)",
        rusqlite::named_params! {
            ":guid": address.guid,
            ":time_deleted": Timestamp::now(),
        },
    );
    assert!(inserted.is_ok());

    // The shared helper performs the actual checks.
    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_tombstone(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
#[test]
fn test_outgoing_synced_with_local_change() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // Local record with a non-zero change counter, plus a mirror row for
    // the same guid.
    let counter_before_sync = 2;
    let mut address = test_record('C');
    address.metadata.sync_change_counter = counter_before_sync;
    assert!(add_internal_address(&tx, &address).is_ok());
    test_insert_mirror_record(&tx, address.clone(), UnknownFields::default());

    // Sanity check: the counter was stored as given.
    exists_with_counter_value_in_table(
        &tx,
        DATA_TABLE_NAME,
        &address.guid,
        counter_before_sync,
    );

    // The shared helper performs the actual checks.
    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_synced_with_local_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
#[test]
fn test_outgoing_synced_with_no_change() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // Local record left with the fixture's change counter, plus a mirror
    // row for the same guid.
    let address = test_record('C');
    assert!(add_internal_address(&tx, &address).is_ok());
    test_insert_mirror_record(&tx, address.clone(), UnknownFields::default());

    // The shared helper performs the actual checks.
    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_synced_with_no_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
#[test]
fn test_outgoing_roundtrip_unknown() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");
    let outgoing_impl = OutgoingAddressesImpl {};

    // Local record with a non-zero change counter.
    let counter_before_sync = 2;
    let mut address = test_record('D');
    address.metadata.sync_change_counter = counter_before_sync;
    assert!(add_internal_address(&tx, &address).is_ok());

    // Mirror row for the same guid, carrying two extra fields.
    let extra_fields: UnknownFields =
        serde_json::from_value(json! {{ "foo": "bar", "baz": "qux"}}).unwrap();
    test_insert_mirror_record(&tx, address.clone(), extra_fields);
    exists_with_counter_value_in_table(
        &tx,
        DATA_TABLE_NAME,
        &address.guid,
        counter_before_sync,
    );

    // The extra fields from the mirror row must show up in the outgoing payload.
    let outgoing = outgoing_impl.fetch_outgoing_records(&tx).unwrap();
    let first = &outgoing[0];
    let bso_payload: Map<String, Value> = serde_json::from_str(&first.payload).unwrap();
    let entry = bso_payload.get("entry").unwrap();
    assert_eq!(entry.get("foo").unwrap(), "bar");
    assert_eq!(entry.get("baz").unwrap(), "qux");

    // Then run the shared checks for a synced record with a local change.
    do_test_outgoing_synced_with_local_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
#[test]
fn test_outgoing_with_migrated_fields() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");
    let outgoing_impl = OutgoingAddressesImpl {};

    // Local record with a non-zero change counter and no mirror row.
    let counter_before_sync = 2;
    let mut address = test_record('C');
    address.metadata.sync_change_counter = counter_before_sync;
    assert!(add_internal_address(&tx, &address).is_ok());

    let outgoing = outgoing_impl.fetch_outgoing_records(&tx).unwrap();
    let bso_payload: Map<String, Value> = serde_json::from_str(&outgoing[0].payload).unwrap();
    let entry = bso_payload.get("entry").unwrap();

    // The outgoing entry carries `name` as well as its given / additional /
    // family parts.
    for (field, expected) in [
        ("name", "jane doe"),
        ("given-name", "jane"),
        ("additional-name", ""),
        ("family-name", "doe"),
    ] {
        assert_eq!(entry.get(field).unwrap(), expected);
    }
}
}