1use super::AddressPayload;
7use crate::db::addresses::{add_internal_address, update_internal_address};
8use crate::db::models::address::InternalAddress;
9use crate::db::schema::ADDRESS_COMMON_COLS;
10use crate::error::*;
11use crate::sync::address::name_utils::{join_name_parts, split_name, NameParts};
12use crate::sync::common::*;
13use crate::sync::{
14 IncomingBso, IncomingContent, IncomingEnvelope, IncomingKind, IncomingState, LocalRecordInfo,
15 ProcessIncomingRecordImpl, ServerTimestamp, SyncRecord,
16};
17use interrupt_support::Interruptee;
18use rusqlite::{named_params, Transaction};
19use sql_support::ConnExt;
20use sync_guid::Guid as SyncGuid;
21
22fn update_name(payload_content: &mut IncomingContent<AddressPayload>, local_name: String) {
39 let internal_address =
41 if let IncomingKind::Content(internal_address) = &mut payload_content.kind {
42 internal_address
43 } else {
44 return;
45 };
46
47 let entry = &mut internal_address.entry;
48
49 if !entry.name.is_empty()
51 || (entry.given_name.is_empty()
52 && entry.additional_name.is_empty()
53 && entry.family_name.is_empty())
54 {
55 return;
56 }
57
58 let NameParts {
60 given,
61 middle,
62 family,
63 } = split_name(&local_name);
64
65 let is_local_name_matching =
67 entry.given_name == given && entry.additional_name == middle && entry.family_name == family;
68
69 entry.name = if is_local_name_matching {
71 local_name
72 } else {
73 join_name_parts(&NameParts {
74 given: entry.given_name.clone(),
75 middle: entry.additional_name.clone(),
76 family: entry.family_name.clone(),
77 })
78 };
79}
80
81fn create_incoming_bso(id: SyncGuid, raw: String) -> IncomingContent<AddressPayload> {
82 let bso = IncomingBso {
83 envelope: IncomingEnvelope {
84 id,
85 modified: ServerTimestamp::default(),
86 sortindex: None,
87 ttl: None,
88 },
89 payload: raw,
90 };
91 bso.into_content::<AddressPayload>()
92}
93
94fn bso_to_incoming(
95 payload_content: IncomingContent<AddressPayload>,
96) -> Result<IncomingContent<InternalAddress>> {
97 Ok(match payload_content.kind {
98 IncomingKind::Content(content) => IncomingContent {
99 envelope: payload_content.envelope,
100 kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
101 },
102 IncomingKind::Tombstone => IncomingContent {
103 envelope: payload_content.envelope,
104 kind: IncomingKind::Tombstone,
105 },
106 IncomingKind::Malformed => IncomingContent {
107 envelope: payload_content.envelope,
108 kind: IncomingKind::Malformed,
109 },
110 })
111}
112
113fn raw_payload_to_incoming(id: SyncGuid, raw: String) -> Result<IncomingContent<InternalAddress>> {
116 let payload_content = create_incoming_bso(id, raw);
117
118 Ok(match payload_content.kind {
119 IncomingKind::Content(content) => IncomingContent {
120 envelope: payload_content.envelope,
121 kind: IncomingKind::Content(InternalAddress::from_payload(content)?),
122 },
123 IncomingKind::Tombstone => IncomingContent {
124 envelope: payload_content.envelope,
125 kind: IncomingKind::Tombstone,
126 },
127 IncomingKind::Malformed => IncomingContent {
128 envelope: payload_content.envelope,
129 kind: IncomingKind::Malformed,
130 },
131 })
132}
133
134pub(super) struct IncomingAddressesImpl {}
135
136impl ProcessIncomingRecordImpl for IncomingAddressesImpl {
137 type Record = InternalAddress;
138
139 fn stage_incoming(
141 &self,
142 tx: &Transaction<'_>,
143 incoming: Vec<IncomingBso>,
144 signal: &dyn Interruptee,
145 ) -> Result<()> {
146 let to_stage = incoming
147 .into_iter()
148 .map(|bso| (bso.envelope.id, bso.payload, bso.envelope.modified))
150 .collect();
151 common_stage_incoming_records(tx, "addresses_sync_staging", to_stage, signal)
152 }
153
154 fn finish_incoming(&self, tx: &Transaction<'_>) -> Result<()> {
155 common_mirror_staged_records(tx, "addresses_sync_staging", "addresses_mirror")
156 }
157
158 fn fetch_incoming_states(
162 &self,
163 tx: &Transaction<'_>,
164 ) -> Result<Vec<IncomingState<Self::Record>>> {
165 let sql = "
166 SELECT
167 s.guid as guid,
168 l.guid as l_guid,
169 t.guid as t_guid,
170 s.payload as s_payload,
171 m.payload as m_payload,
172 l.name,
173 l.organization,
174 l.street_address,
175 l.address_level3,
176 l.address_level2,
177 l.address_level1,
178 l.postal_code,
179 l.country,
180 l.tel,
181 l.email,
182 l.time_created,
183 l.time_last_used,
184 l.time_last_modified,
185 l.times_used,
186 l.sync_change_counter
187 FROM temp.addresses_sync_staging s
188 LEFT JOIN addresses_mirror m ON s.guid = m.guid
189 LEFT JOIN addresses_data l ON s.guid = l.guid
190 LEFT JOIN addresses_tombstones t ON s.guid = t.guid";
191
192 tx.query_rows_and_then(sql, [], |row| -> Result<IncomingState<Self::Record>> {
193 let guid: SyncGuid = row.get("guid")?;
195
196 let mut payload_content = create_incoming_bso(guid.clone(), row.get("s_payload")?);
202 update_name(
203 &mut payload_content,
204 row.get("name").unwrap_or("".to_string()),
205 );
206 let incoming = bso_to_incoming(payload_content)?;
207
208 Ok(IncomingState {
209 incoming,
210 local: match row.get_unwrap::<_, Option<String>>("l_guid") {
211 Some(l_guid) => {
212 assert_eq!(l_guid, guid);
213 let record = InternalAddress::from_row(row)?;
215 let has_changes = record.metadata().sync_change_counter != 0;
216 if has_changes {
217 LocalRecordInfo::Modified { record }
218 } else {
219 LocalRecordInfo::Unmodified { record }
220 }
221 }
222 None => {
223 match row.get::<_, Option<String>>("t_guid")? {
225 Some(t_guid) => {
226 assert_eq!(guid, t_guid);
227 LocalRecordInfo::Tombstone { guid: guid.clone() }
228 }
229 None => LocalRecordInfo::Missing,
230 }
231 }
232 },
233 mirror: {
234 match row.get::<_, Option<String>>("m_payload")? {
235 Some(m_payload) => {
236 raw_payload_to_incoming(guid, m_payload)?.content()
238 }
239 None => None,
240 }
241 },
242 })
243 })
244 }
245
246 fn get_local_dupe(
250 &self,
251 tx: &Transaction<'_>,
252 incoming: &Self::Record,
253 ) -> Result<Option<Self::Record>> {
254 let sql = format!("
255 SELECT
256 {common_cols},
257 sync_change_counter
258 FROM addresses_data
259 WHERE
260 -- `guid <> :guid` is a pre-condition for this being called, but...
261 guid <> :guid
262 -- only non-synced records are candidates, which means can't already be in the mirror.
263 AND guid NOT IN (
264 SELECT guid
265 FROM addresses_mirror
266 )
267 -- and sql can check the field values.
268 AND name == :name
269 AND organization == :organization
270 AND street_address == :street_address
271 AND address_level3 == :address_level3
272 AND address_level2 == :address_level2
273 AND address_level1 == :address_level1
274 AND postal_code == :postal_code
275 AND country == :country
276 AND tel == :tel
277 AND email == :email", common_cols = ADDRESS_COMMON_COLS);
278
279 let params = named_params! {
280 ":guid": incoming.guid,
281 ":name": incoming.name,
282 ":organization": incoming.organization,
283 ":street_address": incoming.street_address,
284 ":address_level3": incoming.address_level3,
285 ":address_level2": incoming.address_level2,
286 ":address_level1": incoming.address_level1,
287 ":postal_code": incoming.postal_code,
288 ":country": incoming.country,
289 ":tel": incoming.tel,
290 ":email": incoming.email,
291 };
292
293 let result = tx.query_row(&sql, params, |row| {
294 Ok(Self::Record::from_row(row).expect("wtf? '?' doesn't work :("))
295 });
296
297 match result {
298 Ok(r) => Ok(Some(r)),
299 Err(e) => match e {
300 rusqlite::Error::QueryReturnedNoRows => Ok(None),
301 _ => Err(Error::SqlError(e)),
302 },
303 }
304 }
305
306 fn update_local_record(
307 &self,
308 tx: &Transaction<'_>,
309 new_record: Self::Record,
310 flag_as_changed: bool,
311 ) -> Result<()> {
312 update_internal_address(tx, &new_record, flag_as_changed)?;
313 Ok(())
314 }
315
316 fn insert_local_record(&self, tx: &Transaction<'_>, new_record: Self::Record) -> Result<()> {
317 add_internal_address(tx, &new_record)?;
318 Ok(())
319 }
320
321 fn change_record_guid(
325 &self,
326 tx: &Transaction<'_>,
327 old_guid: &SyncGuid,
328 new_guid: &SyncGuid,
329 ) -> Result<()> {
330 common_change_guid(tx, "addresses_data", "addresses_mirror", old_guid, new_guid)
331 }
332
333 fn remove_record(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
334 common_remove_record(tx, "addresses_data", guid)
335 }
336
337 fn remove_tombstone(&self, tx: &Transaction<'_>, guid: &SyncGuid) -> Result<()> {
338 common_remove_record(tx, "addresses_tombstones", guid)
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::super::super::test::new_syncable_mem_db;
    use super::*;
    use crate::db::addresses::get_address;
    use crate::sync::common::tests::*;

    use error_support::info;
    use interrupt_support::NeverInterrupts;
    use serde_json::{json, Map, Value};
    use sql_support::ConnExt;

    impl InternalAddress {
        /// Test helper: serializes this record to its payload and wraps it in
        /// a test `IncomingBso`.
        fn into_test_incoming_bso(self) -> IncomingBso {
            let payload = self.into_payload().expect("is json");
            IncomingBso::from_test_content(payload)
        }
    }

    lazy_static::lazy_static! {
        // JSON fixtures keyed by the single-character guid prefix.
        static ref TEST_JSON_RECORDS: Map<String, Value> = {
            let fixtures = json! {{
                "A" : {
                    "id": expand_test_guid('A'),
                    "entry": {
                        "name": "john doe",
                        "given-name": "john",
                        "family-name": "doe",
                        "street-address": "1300 Broadway",
                        "address-level2": "New York, NY",
                        "country": "United States",
                        "version": 1,
                    }
                },
                "C" : {
                    "id": expand_test_guid('C'),
                    "entry": {
                        "name": "jane doe",
                        "given-name": "jane",
                        "family-name": "doe",
                        "street-address": "3050 South La Brea Ave",
                        "address-level2": "Los Angeles, CA",
                        "country": "United States",
                        "timeCreated": 0,
                        "timeLastUsed": 0,
                        "timeLastModified": 0,
                        "timesUsed": 0,
                        "version": 1,
                    }
                },
                "D" : {
                    "id": expand_test_guid('D'),
                    "entry": {
                        "name": "test1 test2",
                        "given-name": "test1",
                        "family-name": "test2",
                        "street-address": "85 Pike St",
                        "address-level2": "Seattle, WA",
                        "country": "United States",
                        "foo": "bar",
                        "baz": "qux",
                        "version": 1,
                    }
                }
            }};
            fixtures.as_object().expect("literal is an object").clone()
        };
    }

    /// Returns a copy of the JSON fixture registered under `guid_prefix`.
    fn test_json_record(guid_prefix: char) -> Value {
        let key = guid_prefix.to_string();
        TEST_JSON_RECORDS.get(&key).cloned().expect("should exist")
    }

    /// Returns the fixture for `guid_prefix` converted to an `InternalAddress`.
    fn test_record(guid_prefix: char) -> InternalAddress {
        let fixture = test_json_record(guid_prefix);
        let payload = serde_json::from_value(fixture).unwrap();
        InternalAddress::from_payload(payload).expect("should be valid")
    }

    #[test]
    fn test_stage_incoming() -> Result<()> {
        error_support::init_for_tests();
        let mut db = new_syncable_mem_db();
        struct TestCase {
            incoming_records: Vec<Value>,
            mirror_records: Vec<Value>,
            expected_record_count: usize,
            expected_tombstone_count: usize,
        }

        let test_cases = vec![
            // A single incoming record.
            TestCase {
                incoming_records: vec![test_json_record('A')],
                mirror_records: vec![],
                expected_record_count: 1,
                expected_tombstone_count: 0,
            },
            // A single incoming tombstone.
            TestCase {
                incoming_records: vec![test_json_tombstone('A')],
                mirror_records: vec![],
                expected_record_count: 0,
                expected_tombstone_count: 1,
            },
            // A mix of records and a tombstone.
            TestCase {
                incoming_records: vec![
                    test_json_record('A'),
                    test_json_record('C'),
                    test_json_tombstone('B'),
                ],
                mirror_records: vec![],
                expected_record_count: 2,
                expected_tombstone_count: 1,
            },
            // An incoming tombstone whose guid already has a mirror tombstone.
            TestCase {
                incoming_records: vec![test_json_tombstone('B')],
                mirror_records: vec![test_json_tombstone('B')],
                expected_record_count: 0,
                expected_tombstone_count: 1,
            },
        ];

        for case in test_cases {
            info!("starting new testcase");
            let tx = db.transaction()?;

            let mirror_sql = "INSERT OR REPLACE INTO addresses_mirror (guid, payload)
                              VALUES (:guid, :payload)";
            for mirror_payload in case.mirror_records {
                tx.execute(
                    mirror_sql,
                    rusqlite::named_params! {
                        ":guid": mirror_payload["id"].as_str().unwrap(),
                        ":payload": mirror_payload.to_string(),
                    },
                )
                .expect("should insert mirror record");
            }

            let processor = IncomingAddressesImpl {};
            let incoming = array_to_incoming(case.incoming_records);
            processor.stage_incoming(&tx, incoming, &NeverInterrupts)?;

            let staged = tx.conn().query_rows_and_then(
                "SELECT * FROM temp.addresses_sync_staging;",
                [],
                |row| -> Result<IncomingContent<InternalAddress>> {
                    let guid: SyncGuid = row.get_unwrap("guid");
                    let payload: String = row.get_unwrap("payload");
                    raw_payload_to_incoming(guid, payload)
                },
            )?;

            // Everything that is not a tombstone counts as a record.
            let tombstone_count = staged
                .iter()
                .filter(|content| matches!(content.kind, IncomingKind::Tombstone))
                .count();
            let record_count = staged.len() - tombstone_count;

            assert_eq!(record_count, case.expected_record_count);
            assert_eq!(tombstone_count, case.expected_tombstone_count);

            processor.fetch_incoming_states(&tx)?;

            tx.execute("DELETE FROM temp.addresses_sync_staging;", [])?;
        }
        Ok(())
    }

    #[test]
    fn test_change_record_guid() -> Result<()> {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction()?;
        let processor = IncomingAddressesImpl {};

        processor.insert_local_record(&tx, test_record('C'))?;

        let old_guid = SyncGuid::new(&expand_test_guid('C'));
        let new_guid = SyncGuid::new(&expand_test_guid('B'));
        processor.change_record_guid(&tx, &old_guid, &new_guid)?;
        tx.commit()?;

        // The record must only be reachable under its new guid.
        assert!(get_address(&db.writer, &expand_test_guid('C').into()).is_err());
        assert!(get_address(&db.writer, &expand_test_guid('B').into()).is_ok());
        Ok(())
    }

    #[test]
    fn test_get_incoming() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let processor = IncomingAddressesImpl {};
        let record = test_record('C');
        let bso = record.clone().into_test_incoming_bso();
        do_test_incoming_same(&processor, &tx, record, bso);
    }

    #[test]
    fn test_get_incoming_unknown_fields() {
        let fixture = test_json_record('D');
        let address_payload = serde_json::from_value::<AddressPayload>(fixture).unwrap();
        // Fixture "D" carries two keys outside the known schema: "foo" and "baz".
        assert_eq!(address_payload.entry.unknown_fields.len(), 2);
        let foo = address_payload.entry.unknown_fields.get("foo").unwrap();
        assert_eq!(foo.as_str().unwrap(), "bar");
    }

    #[test]
    fn test_incoming_tombstone() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let processor = IncomingAddressesImpl {};
        do_test_incoming_tombstone(&processor, &tx, test_record('C'));
    }

    #[test]
    fn test_staged_to_mirror() {
        let mut db = new_syncable_mem_db();
        let tx = db.transaction().expect("should get tx");
        let processor = IncomingAddressesImpl {};
        let record = test_record('C');
        let bso = record.clone().into_test_incoming_bso();
        do_test_staged_to_mirror(&processor, &tx, record, bso, "addresses_mirror");
    }
}