1use interrupt_support::Interruptee;
9use rusqlite::{Connection, Row, Transaction};
10use sql_support::ConnExt;
11use sync15::bso::{IncomingContent, IncomingKind};
12use sync_guid::Guid as SyncGuid;
13
14use crate::api::{StorageChanges, StorageValueChange};
15use crate::error::*;
16
17use super::{merge, remove_matching_keys, JsonMap, WebextRecord};
18
/// Whether a JSON payload is available for a record.
///
/// Values of this type are produced by `json_map_from_row`, which reads a
/// nullable text column and tries to parse it as a JSON object.
#[derive(Debug, PartialEq, Eq)]
pub enum DataState {
    /// No usable data: the column was NULL, or its text did not parse as a
    /// JSON object.
    Deleted,
    /// The column held a JSON object, carried here as a map.
    Exists(JsonMap),
}
28
29fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
32 let mut result = StorageChanges::with_capacity(new.len());
33 for (key, val) in new.iter() {
34 result.push(StorageValueChange {
35 key: key.clone(),
36 old_value: None,
37 new_value: Some(val.clone()),
38 });
39 }
40 result
41}
42
43fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
47 let s = row.get::<_, Option<String>>(col)?;
48 Ok(match s {
49 None => DataState::Deleted,
50 Some(s) => match serde_json::from_str(&s) {
51 Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
52 _ => {
53 DataState::Deleted
60 }
61 },
62 })
63}
64
65pub fn stage_incoming(
68 tx: &Transaction<'_>,
69 incoming_records: &[IncomingContent<WebextRecord>],
70 signal: &dyn Interruptee,
71) -> Result<()> {
72 sql_support::each_sized_chunk(
73 incoming_records,
74 sql_support::default_max_variable_number() / 3,
76 |chunk, _| -> Result<()> {
77 let mut params = Vec::with_capacity(chunk.len() * 3);
78 for record in chunk {
79 signal.err_if_interrupted()?;
80 match &record.kind {
81 IncomingKind::Content(r) => {
82 params.push(Some(record.envelope.id.to_string()));
83 params.push(Some(r.ext_id.to_string()));
84 params.push(Some(r.data.clone()));
85 }
86 IncomingKind::Tombstone => {
87 params.push(Some(record.envelope.id.to_string()));
88 params.push(None);
89 params.push(None);
90 }
91 IncomingKind::Malformed => {
92 error!("Ignoring incoming malformed record: {}", record.envelope.id);
93 }
94 }
95 }
96 let actual_len = params.len() / 3;
98 if actual_len != 0 {
99 let sql = format!(
100 "INSERT OR REPLACE INTO temp.storage_sync_staging
101 (guid, ext_id, data)
102 VALUES {}",
103 sql_support::repeat_multi_values(actual_len, 3)
104 );
105 tx.execute(&sql, rusqlite::params_from_iter(params))?;
106 }
107 Ok(())
108 },
109 )?;
110 Ok(())
111}
112
/// Where an incoming (staged) record's extension data was found, as
/// determined by `get_incoming` from which `ext_id` columns of its joined
/// query are non-NULL.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingState {
    /// Neither the local nor the mirror `ext_id` was present, but the staged
    /// row has an `ext_id`. Staged data that is missing or unparseable is
    /// represented as an empty map.
    IncomingOnlyData { ext_id: String, data: JsonMap },

    /// Neither the local nor the mirror `ext_id` was present and the staged
    /// row has no `ext_id` either.
    IncomingOnlyTombstone,

    /// A local `ext_id` was present but no mirror `ext_id`.
    HasLocal {
        ext_id: String,
        incoming: DataState,
        local: DataState,
    },
    /// A mirror `ext_id` was present but no local `ext_id`.
    NotLocal {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
    },
    /// Both a local and a mirror `ext_id` were present; `ext_id` is the local
    /// one.
    Everywhere {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
        local: DataState,
    },
}
157
158pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
161 let sql = "
162 SELECT
163 s.guid as guid,
164 l.ext_id as l_ext_id,
165 m.ext_id as m_ext_id,
166 s.ext_id as s_ext_id,
167 s.data as s_data, m.data as m_data, l.data as l_data,
168 l.sync_change_counter
169 FROM temp.storage_sync_staging s
170 LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
171 LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";
172
173 fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
174 let guid = row.get("guid")?;
175 let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
178 let local_ext_id: Option<String> = row.get("l_ext_id")?;
179 let staged_ext_id: Option<String> = row.get("s_ext_id")?;
180 let incoming = json_map_from_row(row, "s_data")?;
181
182 let state = match (local_ext_id, mirror_ext_id) {
185 (None, None) => {
186 match staged_ext_id {
187 Some(ext_id) => {
188 let data = match incoming {
189 DataState::Deleted => JsonMap::new(),
193 DataState::Exists(data) => data,
194 };
195 IncomingState::IncomingOnlyData { ext_id, data }
196 }
197 None => IncomingState::IncomingOnlyTombstone,
198 }
199 }
200 (Some(ext_id), None) => IncomingState::HasLocal {
201 ext_id,
202 incoming,
203 local: json_map_from_row(row, "l_data")?,
204 },
205 (None, Some(ext_id)) => IncomingState::NotLocal {
206 ext_id,
207 incoming,
208 mirror: json_map_from_row(row, "m_data")?,
209 },
210 (Some(ext_id), Some(_)) => IncomingState::Everywhere {
211 ext_id,
212 incoming,
213 mirror: json_map_from_row(row, "m_data")?,
214 local: json_map_from_row(row, "l_data")?,
215 },
216 };
217 Ok((guid, state))
218 }
219
220 conn.conn().query_rows_and_then(sql, [], from_row)
221}
222
/// What to do locally for one incoming record. Produced by `plan_incoming`
/// and executed by `apply_actions`.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingAction {
    /// Delete the local row for `ext_id` and record `changes` as applied.
    DeleteLocally {
        ext_id: String,
        changes: StorageChanges,
    },
    /// Insert or replace the local row with `data`, with its change counter
    /// set to 0, and record `changes` as applied.
    TakeRemote {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    /// Update the existing local row to `data`, incrementing its change
    /// counter, and record `changes` as applied.
    Merge {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    /// Leave the local data alone and only reset its change counter to 0.
    Same { ext_id: String },
    /// Do nothing at all.
    Nothing,
}
250
251pub fn plan_incoming(s: IncomingState) -> IncomingAction {
253 match s {
254 IncomingState::Everywhere {
255 ext_id,
256 incoming,
257 local,
258 mirror,
259 } => {
260 match (incoming, local, mirror) {
262 (
263 DataState::Exists(incoming_data),
264 DataState::Exists(local_data),
265 DataState::Exists(mirror_data),
266 ) => {
267 merge(ext_id, incoming_data, local_data, Some(mirror_data))
269 }
270 (
271 DataState::Exists(incoming_data),
272 DataState::Exists(local_data),
273 DataState::Deleted,
274 ) => {
275 merge(ext_id, incoming_data, local_data, None)
277 }
278 (DataState::Exists(incoming_data), DataState::Deleted, _) => {
279 IncomingAction::TakeRemote {
281 ext_id,
282 changes: changes_for_new_incoming(&incoming_data),
283 data: incoming_data,
284 }
285 }
286 (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
287 let (result, changes) = remove_matching_keys(local_data, &mirror);
291 if result.is_empty() {
292 IncomingAction::DeleteLocally { ext_id, changes }
295 } else {
296 IncomingAction::Merge {
297 ext_id,
298 data: result,
299 changes,
300 }
301 }
302 }
303 (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
304 IncomingAction::Merge {
311 ext_id,
312 data: local_data,
313 changes: StorageChanges::new(),
314 }
315 }
316 (DataState::Deleted, DataState::Deleted, _) => {
317 IncomingAction::Same { ext_id }
320 }
321 }
322 }
323 IncomingState::HasLocal {
324 ext_id,
325 incoming,
326 local,
327 } => {
328 match (incoming, local) {
332 (DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
333 merge(ext_id, incoming_data, local_data, None)
337 }
338 (DataState::Deleted, DataState::Exists(local_data)) => {
339 IncomingAction::Merge {
344 ext_id,
345 data: local_data,
346 changes: StorageChanges::new(),
347 }
348 }
349 (DataState::Exists(incoming_data), DataState::Deleted) => {
350 IncomingAction::TakeRemote {
352 ext_id,
353 changes: changes_for_new_incoming(&incoming_data),
354 data: incoming_data,
355 }
356 }
357 (DataState::Deleted, DataState::Deleted) => {
358 IncomingAction::Same { ext_id }
360 }
361 }
362 }
363 IncomingState::NotLocal {
364 ext_id, incoming, ..
365 } => {
366 match incoming {
370 DataState::Exists(data) => IncomingAction::TakeRemote {
371 ext_id,
372 changes: changes_for_new_incoming(&data),
373 data,
374 },
375 DataState::Deleted => IncomingAction::Same { ext_id },
376 }
377 }
378 IncomingState::IncomingOnlyData { ext_id, data } => {
379 IncomingAction::TakeRemote {
383 ext_id,
384 changes: changes_for_new_incoming(&data),
385 data,
386 }
387 }
388 IncomingState::IncomingOnlyTombstone => {
389 IncomingAction::Nothing
391 }
392 }
393}
394
395fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
396 tx.execute_cached(
397 "INSERT INTO temp.storage_sync_applied (ext_id, changes)
398 VALUES (:ext_id, :changes)",
399 rusqlite::named_params! {
400 ":ext_id": ext_id,
401 ":changes": &serde_json::to_string(&changes)?,
402 },
403 )?;
404 Ok(())
405}
406
407pub fn apply_actions(
409 tx: &Transaction<'_>,
410 actions: Vec<(SyncGuid, IncomingAction)>,
411 signal: &dyn Interruptee,
412) -> Result<()> {
413 for (item, action) in actions {
414 signal.err_if_interrupted()?;
415
416 trace!("action for '{:?}': {:?}", item, action);
417 match action {
418 IncomingAction::DeleteLocally { ext_id, changes } => {
419 tx.execute_cached(
421 "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
422 &[(":ext_id", &ext_id)],
423 )?;
424 insert_changes(tx, &ext_id, &changes)?;
425 }
426 IncomingAction::TakeRemote {
428 ext_id,
429 data,
430 changes,
431 } => {
432 tx.execute_cached(
433 "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
434 VALUES (:ext_id, :data, 0)",
435 rusqlite::named_params! {
436 ":ext_id": ext_id,
437 ":data": serde_json::Value::Object(data),
438 },
439 )?;
440 insert_changes(tx, &ext_id, &changes)?;
441 }
442
443 IncomingAction::Merge {
446 ext_id,
447 data,
448 changes,
449 } => {
450 tx.execute_cached(
451 "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
452 rusqlite::named_params! {
453 ":ext_id": ext_id,
454 ":data": serde_json::Value::Object(data),
455 },
456 )?;
457 insert_changes(tx, &ext_id, &changes)?;
458 }
459
460 IncomingAction::Same { ext_id } => {
463 tx.execute_cached(
464 "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
465 &[(":ext_id", &ext_id)],
466 )?;
467 }
469 IncomingAction::Nothing => {}
471 }
472 }
473 Ok(())
474}
475
#[cfg(test)]
mod tests {
    use super::super::test::new_syncable_mem_db;
    use super::*;
    use crate::api;
    use interrupt_support::NeverInterrupts;
    use serde_json::{json, Value};
    use sync15::bso::IncomingBso;

    /// Runs a query expected to yield a single integer, falling back to the
    /// default (0) when it yields no value.
    fn ssi(conn: &Connection, stmt: &str) -> u32 {
        conn.try_query_one(stmt, [], true)
            .expect("must work")
            .unwrap_or_default()
    }

    /// Converts a JSON array of test payloads into incoming records.
    fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> {
        let jv = array.as_array_mut().expect("you must pass a json array");
        let mut result = Vec::with_capacity(jv.len());
        for elt in jv {
            result.push(IncomingBso::from_test_content(elt.take()).into_content());
        }
        result
    }

    // Builds a `JsonMap` from a JSON object literal.
    macro_rules! map {
        ($($map:tt)+) => {
            json!($($map)+).as_object().unwrap().clone()
        };
    }
    // Builds a `StorageValueChange`. A literal `None` in the old or new
    // position leaves that side unset; anything else is wrapped with `json!`.
    // Every arm must expand to a bare expression (no trailing `;`) so the
    // macro can be used wherever a value is expected.
    macro_rules! change {
        ($key:literal, None, None) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: None,
                new_value: None,
            }
        };
        ($key:literal, $old:tt, None) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: Some(json!($old)),
                new_value: None,
            }
        };
        ($key:literal, None, $new:tt) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: None,
                new_value: Some(json!($new)),
            }
        };
        ($key:literal, $old:tt, $new:tt) => {
            StorageValueChange {
                key: $key.to_string(),
                old_value: Some(json!($old)),
                new_value: Some(json!($new)),
            }
        };
    }
    // Collects zero or more `StorageValueChange` expressions into a
    // `StorageChanges`.
    macro_rules! changes {
        ( $( $change:expr ),* ) => {
            {
                let mut changes = StorageChanges::new();
                $(
                    changes.push($change);
                )*
                changes
            }
        };
    }

    #[test]
    fn test_incoming_populates_staging() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection()?;
        let tx = conn.unchecked_transaction()?;

        let incoming = json! {[
            {
                "id": "guidAAAAAAAA",
                "extId": "ext1@example.com",
                "data": json!({"foo": "bar"}).to_string(),
            }
        ]};

        stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?;
        // The single incoming record must have landed in the staging table.
        assert_eq!(
            ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
            1
        );
        Ok(())
    }

    #[test]
    fn test_fetch_incoming_state() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection()?;
        let tx = conn.unchecked_transaction()?;

        // Start with an item only in staging.
        tx.execute(
            r#"
            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
            VALUES ('guid', 'ext_id', '{"foo":"bar"}')
        "#,
            [],
        )?;

        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
        assert_eq!(
            incoming[0].1,
            IncomingState::IncomingOnlyData {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "bar"}),
            }
        );

        // Add the same item to the mirror.
        tx.execute(
            r#"
            INSERT INTO storage_sync_mirror (guid, ext_id, data)
            VALUES ('guid', 'ext_id', '{"foo":"new"}')
        "#,
            [],
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            IncomingState::NotLocal {
                ext_id: "ext_id".to_string(),
                incoming: DataState::Exists(map!({"foo": "bar"})),
                mirror: DataState::Exists(map!({"foo": "new"})),
            }
        );

        // And finally add local data for the same extension.
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            IncomingState::Everywhere {
                ext_id: "ext_id".to_string(),
                incoming: DataState::Exists(map!({"foo": "bar"})),
                local: DataState::Exists(map!({"foo": "local"})),
                mirror: DataState::Exists(map!({"foo": "new"})),
            }
        );
        Ok(())
    }

    #[test]
    fn test_fetch_incoming_state_nulls() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection()?;
        let tx = conn.unchecked_transaction()?;

        // Start with a tombstone only in staging.
        tx.execute(
            r#"
            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
            VALUES ('guid', NULL, NULL)
        "#,
            [],
        )?;

        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);

        // Add a mirror row for the same guid, also with a NULL ext_id.
        tx.execute(
            r#"
            INSERT INTO storage_sync_mirror (guid, ext_id, data)
            VALUES ('guid', NULL, NULL)
        "#,
            [],
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);

        // A local row can't be joined because neither the staged nor the
        // mirror row carries an ext_id to match it against.
        tx.execute(
            r#"
            INSERT INTO storage_sync_data (ext_id, data)
            VALUES ('ext_id', NULL)
        "#,
            [],
        )?;
        let incoming = get_incoming(&tx)?;
        assert_eq!(incoming.len(), 1);
        assert_eq!(
            incoming[0].1,
            IncomingState::IncomingOnlyTombstone
        );
        Ok(())
    }

    // The local row for 'ext_id', as read back by `get_local_item`.
    #[derive(Debug, PartialEq)]
    struct LocalItem {
        data: DataState,
        sync_change_counter: i32,
    }

    /// Fetches the local row for 'ext_id', or `None` when there is no row.
    fn get_local_item(conn: &Connection) -> Option<LocalItem> {
        conn.try_query_row::<_, Error, _, _>(
            "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
            [],
            |row| {
                let data = json_map_from_row(row, "data")?;
                let sync_change_counter = row.get::<_, i32>(1)?;
                Ok(LocalItem {
                    data,
                    sync_change_counter,
                })
            },
            true,
        )
        .expect("query should work")
    }

    /// Fetches the changes recorded as applied for 'ext_id', decoding the
    /// stored JSON object back into a `StorageChanges`.
    fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
        conn.try_query_row::<_, Error, _, _>(
            "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
            [],
            |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
            true,
        )
        .expect("query should work")
        .map(|val: serde_json::Value| {
            let ob = val.as_object().expect("should be an object of items");
            let mut result = StorageChanges::with_capacity(ob.len());
            for (key, val) in ob.into_iter() {
                let details = val.as_object().expect("elts should be objects");
                result.push(StorageValueChange {
                    key: key.to_string(),
                    old_value: details.get("oldValue").cloned(),
                    new_value: details.get("newValue").cloned(),
                });
            }
            result
        })
    }

    /// Applies a single action under a fixed guid, panicking on failure.
    fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
        let guid = SyncGuid::new("guid");
        apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
    }

    #[test]
    fn test_apply_actions() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection().expect("connection should be retrieved");

        // DeleteLocally - the row should be entirely missing afterwards.
        let tx = conn
            .unchecked_transaction()
            .expect("transaction should begin");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        let changes = changes![change!("foo", "local", None)];
        do_apply_action(
            &tx,
            IncomingAction::DeleteLocally {
                ext_id: "ext_id".to_string(),
                changes: changes.clone(),
            },
        );
        assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
        assert!(get_local_item(&tx).is_none());
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;

        // TakeRemote - replaces local data and leaves the change counter at 0.
        let tx = conn
            .unchecked_transaction()
            .expect("transaction should begin");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        let changes = changes![change!("foo", "local", "remote")];
        do_apply_action(
            &tx,
            IncomingAction::TakeRemote {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "remote"}),
                changes: changes.clone(),
            },
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "remote"})),
                sync_change_counter: 0
            })
        );
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;

        // Merge - updates local data and bumps the change counter.
        let tx = conn
            .unchecked_transaction()
            .expect("transaction should begin");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        let changes = changes![change!("foo", "local", "remote")];
        do_apply_action(
            &tx,
            IncomingAction::Merge {
                ext_id: "ext_id".to_string(),
                data: map!({"foo": "remote"}),
                changes: changes.clone(),
            },
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "remote"})),
                sync_change_counter: 2
            })
        );
        assert_eq!(get_applied_item_changes(&tx), Some(changes));
        tx.rollback()?;

        // Same - data is untouched, the change counter is reset, and no
        // changes are recorded.
        let tx = conn
            .unchecked_transaction()
            .expect("transaction should begin");
        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
        assert_eq!(
            api::get(&tx, "ext_id", json!(null))?,
            json!({"foo": "local"})
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 1
            })
        );
        do_apply_action(
            &tx,
            IncomingAction::Same {
                ext_id: "ext_id".to_string(),
            },
        );
        assert_eq!(
            get_local_item(&tx),
            Some(LocalItem {
                data: DataState::Exists(map!({"foo": "local"})),
                sync_change_counter: 0
            })
        );
        assert_eq!(get_applied_item_changes(&tx), None);
        tx.rollback()?;

        Ok(())
    }
}