1use crate::error::*;
14use error_support::{error, info, trace};
15use interrupt_support::Interruptee;
16use rusqlite::{types::ToSql, Connection, Row};
17use sync15::bso::OutgoingBso;
18use sync15::ServerTimestamp;
19use sync_guid::Guid;
20
21pub(super) fn common_stage_incoming_records(
26 conn: &Connection,
27 table_name: &str,
28 incoming: Vec<(Guid, String, ServerTimestamp)>,
29 signal: &dyn Interruptee,
30) -> Result<()> {
31 info!(
32 "staging {} incoming records into {}",
33 incoming.len(),
34 table_name
35 );
36 let chunk_size = 2;
37 let vals: Vec<(Guid, String)> = incoming
38 .into_iter()
39 .map(|(guid, data, _)| (guid, data))
40 .collect();
41 sql_support::each_sized_chunk(
42 &vals,
43 sql_support::default_max_variable_number() / chunk_size,
44 |chunk, _| -> Result<()> {
45 signal.err_if_interrupted()?;
46 let sql = format!(
47 "INSERT OR REPLACE INTO temp.{table_name} (guid, payload)
48 VALUES {vals}",
49 table_name = table_name,
50 vals = sql_support::repeat_multi_values(chunk.len(), 2)
51 );
52 let mut params = Vec::with_capacity(chunk.len() * chunk_size);
53 for (guid, json) in chunk {
54 params.push(guid as &dyn ToSql);
55 params.push(json);
56 }
57 conn.execute(&sql, rusqlite::params_from_iter(params))?;
58 Ok(())
59 },
60 )?;
61 trace!("staged");
62 Ok(())
63}
64
65pub(super) fn common_remove_record(conn: &Connection, table_name: &str, guid: &Guid) -> Result<()> {
66 conn.execute(
67 &format!(
68 "DELETE FROM {}
69 WHERE guid = :guid",
70 table_name
71 ),
72 rusqlite::named_params! {
73 ":guid": guid,
74 },
75 )?;
76 Ok(())
77}
78
79pub(super) fn common_change_guid(
82 conn: &Connection,
83 table_name: &str,
84 mirror_table_name: &str,
85 old_guid: &Guid,
86 new_guid: &Guid,
87) -> Result<()> {
88 assert_ne!(old_guid, new_guid);
89 let nrows = conn.execute(
90 &format!(
91 "UPDATE {}
92 SET guid = :new_guid,
93 sync_change_counter = sync_change_counter + 1
94 WHERE guid = :old_guid",
95 table_name
96 ),
97 rusqlite::named_params! {
98 ":old_guid": old_guid,
99 ":new_guid": new_guid,
100 },
101 )?;
102 assert_eq!(nrows, 1);
104
105 conn.execute(
107 &format!(
108 "UPDATE {}
109 SET guid = :new_guid
110 WHERE guid = :old_guid",
111 mirror_table_name
112 ),
113 rusqlite::named_params! {
114 ":old_guid": old_guid,
115 ":new_guid": new_guid,
116 },
117 )?;
118
119 Ok(())
120}
121
122pub(super) fn common_mirror_staged_records(
124 conn: &Connection,
125 staging_table_name: &str,
126 mirror_table_name: &str,
127) -> Result<()> {
128 conn.execute(
129 &format!(
130 "INSERT OR REPLACE INTO {} (guid, payload)
131 SELECT guid, payload FROM temp.{}",
132 mirror_table_name, staging_table_name,
133 ),
134 [],
135 )?;
136 Ok(())
137}
138
139#[macro_export]
145macro_rules! sync_merge_field_check {
146 ($field_name:ident,
147 $incoming:ident,
148 $local:ident,
149 $mirror:ident,
150 $merged_record:ident
151 ) => {
152 let incoming_field = &$incoming.$field_name;
153 let local_field = &$local.$field_name;
154 let is_local_same;
155 let is_incoming_same;
156
157 match &$mirror {
158 Some(m) => {
159 let mirror_field = &m.$field_name;
160 is_local_same = mirror_field == local_field;
161 is_incoming_same = mirror_field == incoming_field;
162 }
163 None => {
164 is_local_same = true;
165 is_incoming_same = local_field == incoming_field;
166 }
167 };
168
169 let should_use_local = is_incoming_same || local_field == incoming_field;
170
171 if is_local_same && !is_incoming_same {
172 $merged_record.$field_name = incoming_field.clone();
173 } else if should_use_local {
174 $merged_record.$field_name = local_field.clone();
175 } else {
176 return MergeResult::Forked {
180 forked: get_forked_record($local.clone()),
181 };
182 }
183 };
184}
185
186pub(super) fn common_get_outgoing_staging_records(
187 conn: &Connection,
188 data_sql: &str,
189 tombstones_sql: &str,
190 payload_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
191) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
192 let outgoing_records =
193 common_get_outgoing_records(conn, data_sql, tombstones_sql, payload_from_data_row)?;
194 Ok(outgoing_records.into_iter().collect::<Vec<_>>())
195}
196
197fn get_outgoing_records(
198 conn: &Connection,
199 sql: &str,
200 record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
201) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
202 conn.prepare(sql)?
203 .query_map([], |row| Ok(record_from_data_row(row)))?
204 .map(|r| {
205 r.unwrap().map_err(|e| {
206 error!(
207 "Failed to retrieve a record from a row with the following error: {}",
208 e
209 );
210 e.into()
211 })
212 })
213 .collect::<std::result::Result<Vec<_>, _>>()
214}
215
216pub(super) fn common_get_outgoing_records(
217 conn: &Connection,
218 data_sql: &str,
219 tombstone_sql: &str,
220 record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)>,
221) -> anyhow::Result<Vec<(OutgoingBso, i64)>> {
222 let mut payload = get_outgoing_records(conn, data_sql, record_from_data_row)?;
223
224 payload.append(&mut get_outgoing_records(conn, tombstone_sql, &|row| {
225 Ok((
226 OutgoingBso::new_tombstone(Guid::from_string(row.get("guid")?).into()),
227 0,
228 ))
229 })?);
230
231 Ok(payload)
232}
233
234pub(super) fn common_save_outgoing_records(
235 conn: &Connection,
236 table_name: &str,
237 staging_records: Vec<(Guid, String, i64)>,
238) -> anyhow::Result<()> {
239 let chunk_size = 3;
240 sql_support::each_sized_chunk(
241 &staging_records,
242 sql_support::default_max_variable_number() / chunk_size,
243 |chunk, _| -> anyhow::Result<()> {
244 let sql = format!(
245 "INSERT OR REPLACE INTO temp.{table_name} (guid, payload, sync_change_counter)
246 VALUES {staging_records}",
247 table_name = table_name,
248 staging_records = sql_support::repeat_multi_values(chunk.len(), chunk_size)
249 );
250 let mut params = Vec::with_capacity(chunk.len() * chunk_size);
251 for (guid, json, sync_change_counter) in chunk {
252 params.push(guid as &dyn ToSql);
253 params.push(json);
254 params.push(sync_change_counter);
255 }
256 conn.execute(&sql, rusqlite::params_from_iter(params))?;
257 Ok(())
258 },
259 )?;
260 Ok(())
261}
262
263pub(super) fn common_finish_synced_items(
264 conn: &Connection,
265 data_table_name: &str,
266 mirror_table_name: &str,
267 outgoing_staging_table_name: &str,
268 records_synced: Vec<Guid>,
269) -> anyhow::Result<()> {
270 reset_sync_change_counter(
272 conn,
273 data_table_name,
274 outgoing_staging_table_name,
275 records_synced,
276 )?;
277 let sql = format!(
279 "INSERT OR REPLACE INTO {mirror_table_name}
280 SELECT guid, payload FROM temp.{outgoing_staging_table_name}",
281 mirror_table_name = mirror_table_name,
282 outgoing_staging_table_name = outgoing_staging_table_name,
283 );
284 conn.execute(&sql, [])?;
285 Ok(())
286}
287
288fn reset_sync_change_counter(
294 conn: &Connection,
295 data_table_name: &str,
296 outgoing_table_name: &str,
297 records_synced: Vec<Guid>,
298) -> anyhow::Result<()> {
299 sql_support::each_chunk(&records_synced, |chunk, _| -> anyhow::Result<()> {
300 conn.execute(
301 &format!(
302 "UPDATE {data_table_name} AS data
307 SET sync_change_counter = sync_change_counter -
308 (
309 SELECT outgoing.sync_change_counter
310 FROM temp.{outgoing_table_name} AS outgoing
311 WHERE outgoing.guid = data.guid LIMIT 1
312 )
313 WHERE guid IN ({values})",
314 data_table_name = data_table_name,
315 outgoing_table_name = outgoing_table_name,
316 values = sql_support::repeat_sql_values(chunk.len())
317 ),
318 rusqlite::params_from_iter(chunk),
319 )?;
320 Ok(())
321 })?;
322
323 Ok(())
324}
325
326#[cfg(test)]
328pub(super) mod tests {
329 use super::super::*;
330 use interrupt_support::NeverInterrupts;
331 use serde_json::{json, Value};
332
333 pub(in crate::sync) fn array_to_incoming(vals: Vec<Value>) -> Vec<IncomingBso> {
334 vals.into_iter()
335 .map(IncomingBso::from_test_content)
336 .collect()
337 }
338
339 pub(in crate::sync) fn expand_test_guid(c: char) -> String {
340 c.to_string().repeat(12)
341 }
342
343 pub(in crate::sync) fn test_json_tombstone(guid_prefix: char) -> Value {
344 let t = json! {
345 {
346 "id": expand_test_guid(guid_prefix),
347 "deleted": true,
348 }
349 };
350 t
351 }
352
353 pub(in crate::sync) fn do_test_incoming_same<T: SyncRecord + std::fmt::Debug + Clone>(
355 ri: &dyn ProcessIncomingRecordImpl<Record = T>,
356 tx: &Transaction<'_>,
357 record: T,
358 bso: IncomingBso,
359 ) {
360 ri.insert_local_record(tx, record)
361 .expect("insert should work");
362 ri.stage_incoming(tx, vec![bso], &NeverInterrupts)
363 .expect("stage should work");
364 let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
365 assert_eq!(states.len(), 1, "1 records == 1 state!");
366 let action =
367 crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
368 assert!(matches!(action, crate::sync::IncomingAction::Update { .. }));
371 }
372
373 pub(in crate::sync) fn do_test_incoming_tombstone<T: SyncRecord + std::fmt::Debug + Clone>(
375 ri: &dyn ProcessIncomingRecordImpl<Record = T>,
376 tx: &Transaction<'_>,
377 record: T,
378 ) {
379 let guid = record.id().clone();
380 ri.insert_local_record(tx, record)
381 .expect("insert should work");
382 ri.stage_incoming(
383 tx,
384 vec![IncomingBso::new_test_tombstone(guid)],
385 &NeverInterrupts,
386 )
387 .expect("stage should work");
388 let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
389 assert_eq!(states.len(), 1, "1 records == 1 state!");
390 let action =
391 crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
392 assert!(matches!(
395 action,
396 crate::sync::IncomingAction::DeleteLocalRecord { .. }
397 ));
398 }
399
400 pub(in crate::sync) fn do_test_scrubbed_local_data<T: SyncRecord + std::fmt::Debug + Clone>(
402 ri: &dyn ProcessIncomingRecordImpl<Record = T>,
403 tx: &Transaction<'_>,
404 record: T,
405 bso: IncomingBso,
406 ) {
407 ri.insert_local_record(tx, record)
408 .expect("insert should work");
409 ri.stage_incoming(tx, vec![bso], &NeverInterrupts)
410 .expect("stage should work");
411 let mut states = ri.fetch_incoming_states(tx).expect("fetch should work");
412 assert_eq!(states.len(), 1, "1 records == 1 state!");
413 assert!(
414 matches!(states[0].local, LocalRecordInfo::Scrubbed { .. }),
415 "state should be LocalRecordInfo::Scubbed but it is: {:?}",
416 states[0].local
417 );
418
419 let action =
420 crate::sync::plan_incoming(ri, tx, states.pop().unwrap()).expect("plan should work");
421 assert!(matches!(action, crate::sync::IncomingAction::Update { .. }));
422 }
423
424 pub(in crate::sync) fn do_test_staged_to_mirror<T: SyncRecord + std::fmt::Debug + Clone>(
426 ri: &dyn ProcessIncomingRecordImpl<Record = T>,
427 tx: &Transaction<'_>,
428 record: T,
429 bso1: IncomingBso,
430 mirror_table_name: &str,
431 ) {
432 let guid1 = record.id().clone();
433 let guid2 = Guid::random();
434 let bso2 = IncomingBso::new_test_tombstone(guid2.clone());
435
436 ri.stage_incoming(tx, vec![bso1, bso2], &NeverInterrupts)
437 .expect("stage should work");
438
439 ri.finish_incoming(tx).expect("finish should work");
440
441 let sql = format!(
442 "SELECT COUNT(*) FROM {} where guid = '{}' OR guid = '{}'",
443 mirror_table_name, guid1, guid2
444 );
445 let num_rows = tx
446 .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
447 .unwrap();
448 assert_eq!(num_rows, 2);
449 }
450
451 fn exists_in_table(tx: &Transaction<'_>, table_name: &str, guid: &Guid) {
452 let sql = format!(
453 "SELECT COUNT(*) FROM {} where guid = '{}'",
454 table_name, guid
455 );
456 let num_rows = tx
457 .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
458 .unwrap();
459 assert_eq!(num_rows, 1);
460 }
461
462 pub(in crate::sync) fn exists_with_counter_value_in_table(
463 tx: &Transaction<'_>,
464 table_name: &str,
465 guid: &Guid,
466 expected_counter_value: i64,
467 ) {
468 let sql = format!(
469 "SELECT COUNT(*)
470 FROM {table_name}
471 WHERE sync_change_counter = {expected_counter_value}
472 AND guid = :guid",
473 table_name = table_name,
474 expected_counter_value = expected_counter_value,
475 );
476
477 let num_rows = tx
478 .query_row(&sql, [guid], |row| Ok(row.get::<_, u32>(0).unwrap()))
479 .unwrap();
480 assert_eq!(num_rows, 1);
481 }
482
483 pub(in crate::sync) fn do_test_outgoing_never_synced<
484 T: SyncRecord + std::fmt::Debug + Clone,
485 >(
486 tx: &Transaction<'_>,
487 ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
488 guid: &Guid,
489 data_table_name: &str,
490 mirror_table_name: &str,
491 staging_table_name: &str,
492 ) {
493 assert!(ro.fetch_outgoing_records(tx).is_ok());
495
496 exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);
498
499 assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());
501
502 exists_with_counter_value_in_table(tx, data_table_name, guid, 0);
504
505 exists_in_table(tx, mirror_table_name, guid);
507 }
508
509 pub(in crate::sync) fn do_test_outgoing_tombstone<T: SyncRecord + std::fmt::Debug + Clone>(
510 tx: &Transaction<'_>,
511 ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
512 guid: &Guid,
513 data_table_name: &str,
514 mirror_table_name: &str,
515 staging_table_name: &str,
516 ) {
517 assert!(ro.fetch_outgoing_records(tx).is_ok());
519
520 exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);
522
523 assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());
525
526 let sql = format!(
528 "SELECT COUNT(*) FROM {} where guid = '{}'",
529 data_table_name, guid
530 );
531 let num_rows = tx
532 .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
533 .unwrap();
534 assert_eq!(num_rows, 0);
535
536 exists_in_table(tx, mirror_table_name, guid);
538 }
539
540 pub(in crate::sync) fn do_test_outgoing_synced_with_local_change<
541 T: SyncRecord + std::fmt::Debug + Clone,
542 >(
543 tx: &Transaction<'_>,
544 ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
545 guid: &Guid,
546 data_table_name: &str,
547 mirror_table_name: &str,
548 staging_table_name: &str,
549 ) {
550 assert!(ro.fetch_outgoing_records(tx).is_ok());
552
553 exists_in_table(tx, &format!("temp.{}", staging_table_name), guid);
555
556 assert!(ro.finish_synced_items(tx, vec![guid.clone()]).is_ok());
558
559 exists_with_counter_value_in_table(tx, data_table_name, guid, 0);
561
562 exists_in_table(tx, mirror_table_name, guid);
564 }
565
566 pub(in crate::sync) fn do_test_outgoing_synced_with_no_change<
567 T: SyncRecord + std::fmt::Debug + Clone,
568 >(
569 tx: &Transaction<'_>,
570 ro: &dyn ProcessOutgoingRecordImpl<Record = T>,
571 guid: &Guid,
572 data_table_name: &str,
573 staging_table_name: &str,
574 ) {
575 assert!(ro.fetch_outgoing_records(tx).is_ok());
577
578 let sql = format!(
580 "SELECT COUNT(*) FROM {} where guid = '{}'",
581 &format!("temp.{}", staging_table_name),
582 guid
583 );
584 let num_rows = tx
585 .query_row(&sql, [], |row| Ok(row.get::<_, u32>(0).unwrap()))
586 .unwrap();
587 assert_eq!(num_rows, 0);
588
589 assert!(ro.finish_synced_items(tx, Vec::<Guid>::new()).is_ok());
591
592 exists_with_counter_value_in_table(tx, data_table_name, guid, 0);
594 }
595}