webext_storage/sync/
incoming.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5// The "incoming" part of syncing - handling the incoming rows, staging them,
6// working out a plan for them, updating the local data and mirror, etc.
7
8use interrupt_support::Interruptee;
9use rusqlite::{Connection, Row, Transaction};
10use sql_support::ConnExt;
11use sync15::bso::{IncomingContent, IncomingKind};
12use sync_guid::Guid as SyncGuid;
13
14use crate::api::{StorageChanges, StorageValueChange};
15use crate::error::*;
16
17use super::{merge, remove_matching_keys, JsonMap, WebextRecord};
18
/// The state data can be in. Could be represented as `Option<JsonMap>`, but
/// this is clearer and independent of how the data is stored.
///
/// Note that `json_map_from_row` reports a NULL column, invalid JSON and JSON
/// which isn't an object all as `Deleted`.
#[derive(Debug, PartialEq, Eq)]
pub enum DataState {
    /// The data was deleted.
    Deleted,
    /// Data exists, as stored in the map.
    Exists(JsonMap),
}
28
29// A little helper to create a StorageChanges object when we are creating
30// a new value with multiple keys that doesn't exist locally.
31fn changes_for_new_incoming(new: &JsonMap) -> StorageChanges {
32    let mut result = StorageChanges::with_capacity(new.len());
33    for (key, val) in new.iter() {
34        result.push(StorageValueChange {
35            key: key.clone(),
36            old_value: None,
37            new_value: Some(val.clone()),
38        });
39    }
40    result
41}
42
43// This module deals exclusively with the Map inside a JsonValue::Object().
44// This helper reads such a Map from a SQL row, ignoring anything which is
45// either invalid JSON or a different JSON type.
46fn json_map_from_row(row: &Row<'_>, col: &str) -> Result<DataState> {
47    let s = row.get::<_, Option<String>>(col)?;
48    Ok(match s {
49        None => DataState::Deleted,
50        Some(s) => match serde_json::from_str(&s) {
51            Ok(serde_json::Value::Object(m)) => DataState::Exists(m),
52            _ => {
53                // We don't want invalid json or wrong types to kill syncing.
54                // It should be impossible as we never write anything which
55                // could cause it, but we can't really log the bad data as there
56                // might be PII. Logging just a message without any additional
57                // clues is going to be unhelpfully noisy, so, silently None.
58                // XXX - Maybe record telemetry?
59                DataState::Deleted
60            }
61        },
62    })
63}
64
65/// The first thing we do with incoming items is to "stage" them in a temp table.
66/// The actual processing is done via this table.
67pub fn stage_incoming(
68    tx: &Transaction<'_>,
69    incoming_records: &[IncomingContent<WebextRecord>],
70    signal: &dyn Interruptee,
71) -> Result<()> {
72    sql_support::each_sized_chunk(
73        incoming_records,
74        // We bind 3 params per chunk.
75        sql_support::default_max_variable_number() / 3,
76        |chunk, _| -> Result<()> {
77            let mut params = Vec::with_capacity(chunk.len() * 3);
78            for record in chunk {
79                signal.err_if_interrupted()?;
80                match &record.kind {
81                    IncomingKind::Content(r) => {
82                        params.push(Some(record.envelope.id.to_string()));
83                        params.push(Some(r.ext_id.to_string()));
84                        params.push(Some(r.data.clone()));
85                    }
86                    IncomingKind::Tombstone => {
87                        params.push(Some(record.envelope.id.to_string()));
88                        params.push(None);
89                        params.push(None);
90                    }
91                    IncomingKind::Malformed => {
92                        error!("Ignoring incoming malformed record: {}", record.envelope.id);
93                    }
94                }
95            }
96            // we might have skipped records
97            let actual_len = params.len() / 3;
98            if actual_len != 0 {
99                let sql = format!(
100                    "INSERT OR REPLACE INTO temp.storage_sync_staging
101                    (guid, ext_id, data)
102                    VALUES {}",
103                    sql_support::repeat_multi_values(actual_len, 3)
104                );
105                tx.execute(&sql, rusqlite::params_from_iter(params))?;
106            }
107            Ok(())
108        },
109    )?;
110    Ok(())
111}
112
/// The "state" we find ourselves in when considering an incoming/staging
/// record. This "state" is the input to calculating the IncomingAction and
/// carries all the data we need to make the required local changes.
///
/// `get_incoming` picks the variant by checking which of the local, mirror
/// and staging rows supplied an ext-id for the record.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingState {
    /// There's an incoming item, but data for that extension doesn't exist
    /// either in our local data store or in the local mirror. IOW, this is the
    /// very first time we've seen this extension.
    IncomingOnlyData { ext_id: String, data: JsonMap },

    /// An incoming tombstone that doesn't exist locally. Because tombstones
    /// don't carry the ext-id, it means it's not in our mirror. We are just
    /// going to ignore it, but we track the state for consistency.
    IncomingOnlyTombstone,

    /// There's an incoming item and we have data for the same extension in
    /// our local store - but not in our mirror. This should be relatively
    /// uncommon as it means:
    /// * Some other profile has recently installed an extension and synced.
    /// * This profile has recently installed the same extension.
    /// * This is the first sync for this profile since both those events
    ///   happened.
    HasLocal {
        ext_id: String,
        incoming: DataState,
        local: DataState,
    },
    /// There's an incoming item and there's an item for the same extension in
    /// the mirror, but no row in the local store. The addon probably doesn't
    /// exist locally, or if it does, the last time we synced we synced the
    /// deletion of all data.
    NotLocal {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
    },
    /// This will be the most common "incoming" case - there's data incoming,
    /// in the mirror and in the local store for an addon.
    Everywhere {
        ext_id: String,
        incoming: DataState,
        mirror: DataState,
        local: DataState,
    },
}
157
158/// Get the items we need to process from the staging table. Return details about
159/// the item and the state of that item, ready for processing.
160pub fn get_incoming(conn: &Connection) -> Result<Vec<(SyncGuid, IncomingState)>> {
161    let sql = "
162        SELECT
163            s.guid as guid,
164            l.ext_id as l_ext_id,
165            m.ext_id as m_ext_id,
166            s.ext_id as s_ext_id,
167            s.data as s_data, m.data as m_data, l.data as l_data,
168            l.sync_change_counter
169        FROM temp.storage_sync_staging s
170        LEFT JOIN storage_sync_mirror m ON m.guid = s.guid
171        LEFT JOIN storage_sync_data l on l.ext_id IN (m.ext_id, s.ext_id);";
172
173    fn from_row(row: &Row<'_>) -> Result<(SyncGuid, IncomingState)> {
174        let guid = row.get("guid")?;
175        // This is complicated because the staging row doesn't hold the ext_id.
176        // However, both the local table and the mirror do.
177        let mirror_ext_id: Option<String> = row.get("m_ext_id")?;
178        let local_ext_id: Option<String> = row.get("l_ext_id")?;
179        let staged_ext_id: Option<String> = row.get("s_ext_id")?;
180        let incoming = json_map_from_row(row, "s_data")?;
181
182        // We find the state by examining which tables the ext-id exists in,
183        // using whether that column is null as a proxy for that.
184        let state = match (local_ext_id, mirror_ext_id) {
185            (None, None) => {
186                match staged_ext_id {
187                    Some(ext_id) => {
188                        let data = match incoming {
189                            // incoming record with missing data that's not a
190                            // tombstone shouldn't happen, but we can cope by
191                            // pretending it was an empty json map.
192                            DataState::Deleted => JsonMap::new(),
193                            DataState::Exists(data) => data,
194                        };
195                        IncomingState::IncomingOnlyData { ext_id, data }
196                    }
197                    None => IncomingState::IncomingOnlyTombstone,
198                }
199            }
200            (Some(ext_id), None) => IncomingState::HasLocal {
201                ext_id,
202                incoming,
203                local: json_map_from_row(row, "l_data")?,
204            },
205            (None, Some(ext_id)) => IncomingState::NotLocal {
206                ext_id,
207                incoming,
208                mirror: json_map_from_row(row, "m_data")?,
209            },
210            (Some(ext_id), Some(_)) => IncomingState::Everywhere {
211                ext_id,
212                incoming,
213                mirror: json_map_from_row(row, "m_data")?,
214                local: json_map_from_row(row, "l_data")?,
215            },
216        };
217        Ok((guid, state))
218    }
219
220    conn.conn().query_rows_and_then(sql, [], from_row)
221}
222
/// This is the set of actions we know how to take *locally* for incoming
/// records. Which one depends on the IncomingState.
/// Every action which updates local data also carries the set of changes
/// that update makes, which `apply_actions` records alongside the update.
#[derive(Debug, PartialEq, Eq)]
pub enum IncomingAction {
    /// We should locally delete the data for this record
    DeleteLocally {
        ext_id: String,
        changes: StorageChanges,
    },
    /// We will take the remote.
    TakeRemote {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    /// We merged this data - this is what we came up with.
    Merge {
        ext_id: String,
        data: JsonMap,
        changes: StorageChanges,
    },
    /// Entry exists locally and it's the same as the incoming record.
    Same { ext_id: String },
    /// Incoming tombstone for an item we've never seen.
    Nothing,
}
250
251/// Takes the state of an item and returns the action we should take for it.
252pub fn plan_incoming(s: IncomingState) -> IncomingAction {
253    match s {
254        IncomingState::Everywhere {
255            ext_id,
256            incoming,
257            local,
258            mirror,
259        } => {
260            // All records exist - but do they all have data?
261            match (incoming, local, mirror) {
262                (
263                    DataState::Exists(incoming_data),
264                    DataState::Exists(local_data),
265                    DataState::Exists(mirror_data),
266                ) => {
267                    // all records have data - 3-way merge.
268                    merge(ext_id, incoming_data, local_data, Some(mirror_data))
269                }
270                (
271                    DataState::Exists(incoming_data),
272                    DataState::Exists(local_data),
273                    DataState::Deleted,
274                ) => {
275                    // No parent, so first time seeing this remotely - 2-way merge
276                    merge(ext_id, incoming_data, local_data, None)
277                }
278                (DataState::Exists(incoming_data), DataState::Deleted, _) => {
279                    // Incoming data, removed locally. Server wins.
280                    IncomingAction::TakeRemote {
281                        ext_id,
282                        changes: changes_for_new_incoming(&incoming_data),
283                        data: incoming_data,
284                    }
285                }
286                (DataState::Deleted, DataState::Exists(local_data), DataState::Exists(mirror)) => {
287                    // Deleted remotely.
288                    // Treat this as a delete of every key that we
289                    // know was present at the time.
290                    let (result, changes) = remove_matching_keys(local_data, &mirror);
291                    if result.is_empty() {
292                        // If there were no more keys left, we can
293                        // delete our version too.
294                        IncomingAction::DeleteLocally { ext_id, changes }
295                    } else {
296                        IncomingAction::Merge {
297                            ext_id,
298                            data: result,
299                            changes,
300                        }
301                    }
302                }
303                (DataState::Deleted, DataState::Exists(local_data), DataState::Deleted) => {
304                    // Perhaps another client created and then deleted
305                    // the whole object for this extension since the
306                    // last time we synced.
307                    // Treat this as a delete of every key that we
308                    // knew was present. Unfortunately, we don't know
309                    // any keys that were present, so we delete no keys.
310                    IncomingAction::Merge {
311                        ext_id,
312                        data: local_data,
313                        changes: StorageChanges::new(),
314                    }
315                }
316                (DataState::Deleted, DataState::Deleted, _) => {
317                    // We agree with the remote (regardless of what we
318                    // have mirrored).
319                    IncomingAction::Same { ext_id }
320                }
321            }
322        }
323        IncomingState::HasLocal {
324            ext_id,
325            incoming,
326            local,
327        } => {
328            // So we have a local record and an incoming/staging record, but *not* a
329            // mirror record. This means some other device has synced this for
330            // the first time and we are yet to do the same.
331            match (incoming, local) {
332                (DataState::Exists(incoming_data), DataState::Exists(local_data)) => {
333                    // This means the extension exists locally and remotely
334                    // but this is the first time we've synced it. That's no problem, it's
335                    // just a 2-way merge...
336                    merge(ext_id, incoming_data, local_data, None)
337                }
338                (DataState::Deleted, DataState::Exists(local_data)) => {
339                    // We've data locally, but there's an incoming deletion.
340                    // We would normally remove keys that we knew were
341                    // present on the server, but we don't know what
342                    // was on the server, so we don't remove anything.
343                    IncomingAction::Merge {
344                        ext_id,
345                        data: local_data,
346                        changes: StorageChanges::new(),
347                    }
348                }
349                (DataState::Exists(incoming_data), DataState::Deleted) => {
350                    // No data locally, but some is incoming - take it.
351                    IncomingAction::TakeRemote {
352                        ext_id,
353                        changes: changes_for_new_incoming(&incoming_data),
354                        data: incoming_data,
355                    }
356                }
357                (DataState::Deleted, DataState::Deleted) => {
358                    // Nothing anywhere - odd, but OK.
359                    IncomingAction::Same { ext_id }
360                }
361            }
362        }
363        IncomingState::NotLocal {
364            ext_id, incoming, ..
365        } => {
366            // No local data but there's mirror and an incoming record.
367            // This means a local deletion is being replaced by, or just re-doing
368            // the incoming record.
369            match incoming {
370                DataState::Exists(data) => IncomingAction::TakeRemote {
371                    ext_id,
372                    changes: changes_for_new_incoming(&data),
373                    data,
374                },
375                DataState::Deleted => IncomingAction::Same { ext_id },
376            }
377        }
378        IncomingState::IncomingOnlyData { ext_id, data } => {
379            // Only the staging record exists and it's not a tombstone.
380            // This means it's the first time we've ever seen it. No
381            // conflict possible, just take the remote.
382            IncomingAction::TakeRemote {
383                ext_id,
384                changes: changes_for_new_incoming(&data),
385                data,
386            }
387        }
388        IncomingState::IncomingOnlyTombstone => {
389            // Only the staging record exists and it is a tombstone - nothing to do.
390            IncomingAction::Nothing
391        }
392    }
393}
394
395fn insert_changes(tx: &Transaction<'_>, ext_id: &str, changes: &StorageChanges) -> Result<()> {
396    tx.execute_cached(
397        "INSERT INTO temp.storage_sync_applied (ext_id, changes)
398            VALUES (:ext_id, :changes)",
399        rusqlite::named_params! {
400            ":ext_id": ext_id,
401            ":changes": &serde_json::to_string(&changes)?,
402        },
403    )?;
404    Ok(())
405}
406
407// Apply the actions necessary to fully process the incoming items.
408pub fn apply_actions(
409    tx: &Transaction<'_>,
410    actions: Vec<(SyncGuid, IncomingAction)>,
411    signal: &dyn Interruptee,
412) -> Result<()> {
413    for (item, action) in actions {
414        signal.err_if_interrupted()?;
415
416        trace!("action for '{:?}': {:?}", item, action);
417        match action {
418            IncomingAction::DeleteLocally { ext_id, changes } => {
419                // Can just nuke it entirely.
420                tx.execute_cached(
421                    "DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
422                    &[(":ext_id", &ext_id)],
423                )?;
424                insert_changes(tx, &ext_id, &changes)?;
425            }
426            // We want to update the local record with 'data' and after this update the item no longer is considered dirty.
427            IncomingAction::TakeRemote {
428                ext_id,
429                data,
430                changes,
431            } => {
432                tx.execute_cached(
433                    "INSERT OR REPLACE INTO storage_sync_data(ext_id, data, sync_change_counter)
434                        VALUES (:ext_id, :data, 0)",
435                    rusqlite::named_params! {
436                        ":ext_id": ext_id,
437                        ":data": serde_json::Value::Object(data),
438                    },
439                )?;
440                insert_changes(tx, &ext_id, &changes)?;
441            }
442
443            // We merged this data, so need to update locally but still consider
444            // it dirty because the merged data must be uploaded.
445            IncomingAction::Merge {
446                ext_id,
447                data,
448                changes,
449            } => {
450                tx.execute_cached(
451                    "UPDATE storage_sync_data SET data = :data, sync_change_counter = sync_change_counter + 1 WHERE ext_id = :ext_id",
452                    rusqlite::named_params! {
453                        ":ext_id": ext_id,
454                        ":data": serde_json::Value::Object(data),
455                    },
456                )?;
457                insert_changes(tx, &ext_id, &changes)?;
458            }
459
460            // Both local and remote ended up the same - only need to nuke the
461            // change counter.
462            IncomingAction::Same { ext_id } => {
463                tx.execute_cached(
464                    "UPDATE storage_sync_data SET sync_change_counter = 0 WHERE ext_id = :ext_id",
465                    &[(":ext_id", &ext_id)],
466                )?;
467                // no changes to write
468            }
469            // Literally nothing to do!
470            IncomingAction::Nothing => {}
471        }
472    }
473    Ok(())
474}
475
476#[cfg(test)]
477mod tests {
478    use super::super::test::new_syncable_mem_db;
479    use super::*;
480    use crate::api;
481    use interrupt_support::NeverInterrupts;
482    use serde_json::{json, Value};
483    use sync15::bso::IncomingBso;
484
485    // select simple int
486    fn ssi(conn: &Connection, stmt: &str) -> u32 {
487        conn.try_query_one(stmt, [], true)
488            .expect("must work")
489            .unwrap_or_default()
490    }
491
492    fn array_to_incoming(mut array: Value) -> Vec<IncomingContent<WebextRecord>> {
493        let jv = array.as_array_mut().expect("you must pass a json array");
494        let mut result = Vec::with_capacity(jv.len());
495        for elt in jv {
496            result.push(IncomingBso::from_test_content(elt.take()).into_content());
497        }
498        result
499    }
500
501    // Can't find a way to import these from crate::sync::tests...
502    macro_rules! map {
503        ($($map:tt)+) => {
504            json!($($map)+).as_object().unwrap().clone()
505        };
506    }
507    macro_rules! change {
508        ($key:literal, None, None) => {
509            StorageValueChange {
510                key: $key.to_string(),
511                old_value: None,
512                new_value: None,
513            };
514        };
515        ($key:literal, $old:tt, None) => {
516            StorageValueChange {
517                key: $key.to_string(),
518                old_value: Some(json!($old)),
519                new_value: None,
520            }
521        };
522        ($key:literal, None, $new:tt) => {
523            StorageValueChange {
524                key: $key.to_string(),
525                old_value: None,
526                new_value: Some(json!($new)),
527            };
528        };
529        ($key:literal, $old:tt, $new:tt) => {
530            StorageValueChange {
531                key: $key.to_string(),
532                old_value: Some(json!($old)),
533                new_value: Some(json!($new)),
534            }
535        };
536    }
537    macro_rules! changes {
538        ( $( $change:expr ),* ) => {
539            {
540                let mut changes = StorageChanges::new();
541                $(
542                    changes.push($change);
543                )*
544                changes
545            }
546        };
547    }
548
549    #[test]
550    fn test_incoming_populates_staging() -> Result<()> {
551        let db = new_syncable_mem_db();
552        let conn = db.get_connection()?;
553        let tx = conn.unchecked_transaction()?;
554
555        let incoming = json! {[
556            {
557                "id": "guidAAAAAAAA",
558                "extId": "ext1@example.com",
559                "data": json!({"foo": "bar"}).to_string(),
560            }
561        ]};
562
563        stage_incoming(&tx, &array_to_incoming(incoming), &NeverInterrupts)?;
564        // check staging table
565        assert_eq!(
566            ssi(&tx, "SELECT count(*) FROM temp.storage_sync_staging"),
567            1
568        );
569        Ok(())
570    }
571
572    #[test]
573    fn test_fetch_incoming_state() -> Result<()> {
574        let db = new_syncable_mem_db();
575        let conn = db.get_connection()?;
576        let tx = conn.unchecked_transaction()?;
577
578        // Start with an item just in staging.
579        tx.execute(
580            r#"
581            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
582            VALUES ('guid', 'ext_id', '{"foo":"bar"}')
583        "#,
584            [],
585        )?;
586
587        let incoming = get_incoming(&tx)?;
588        assert_eq!(incoming.len(), 1);
589        assert_eq!(incoming[0].0, SyncGuid::new("guid"),);
590        assert_eq!(
591            incoming[0].1,
592            IncomingState::IncomingOnlyData {
593                ext_id: "ext_id".to_string(),
594                data: map!({"foo": "bar"}),
595            }
596        );
597
598        // Add the same item to the mirror.
599        tx.execute(
600            r#"
601            INSERT INTO storage_sync_mirror (guid, ext_id, data)
602            VALUES ('guid', 'ext_id', '{"foo":"new"}')
603        "#,
604            [],
605        )?;
606        let incoming = get_incoming(&tx)?;
607        assert_eq!(incoming.len(), 1);
608        assert_eq!(
609            incoming[0].1,
610            IncomingState::NotLocal {
611                ext_id: "ext_id".to_string(),
612                incoming: DataState::Exists(map!({"foo": "bar"})),
613                mirror: DataState::Exists(map!({"foo": "new"})),
614            }
615        );
616
617        // and finally the data itself - might as use the API here!
618        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
619        let incoming = get_incoming(&tx)?;
620        assert_eq!(incoming.len(), 1);
621        assert_eq!(
622            incoming[0].1,
623            IncomingState::Everywhere {
624                ext_id: "ext_id".to_string(),
625                incoming: DataState::Exists(map!({"foo": "bar"})),
626                local: DataState::Exists(map!({"foo": "local"})),
627                mirror: DataState::Exists(map!({"foo": "new"})),
628            }
629        );
630        Ok(())
631    }
632
633    // Like test_fetch_incoming_state, but check NULLs are handled correctly.
634    #[test]
635    fn test_fetch_incoming_state_nulls() -> Result<()> {
636        let db = new_syncable_mem_db();
637        let conn = db.get_connection()?;
638        let tx = conn.unchecked_transaction()?;
639
640        // Start with a tombstone just in staging.
641        tx.execute(
642            r#"
643            INSERT INTO temp.storage_sync_staging (guid, ext_id, data)
644            VALUES ('guid', NULL, NULL)
645        "#,
646            [],
647        )?;
648
649        let incoming = get_incoming(&tx)?;
650        assert_eq!(incoming.len(), 1);
651        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone,);
652
653        // Add the same item to the mirror (can't store an ext_id for a
654        // tombstone in the mirror as incoming tombstones never have it)
655        tx.execute(
656            r#"
657            INSERT INTO storage_sync_mirror (guid, ext_id, data)
658            VALUES ('guid', NULL, NULL)
659        "#,
660            [],
661        )?;
662        let incoming = get_incoming(&tx)?;
663        assert_eq!(incoming.len(), 1);
664        assert_eq!(incoming[0].1, IncomingState::IncomingOnlyTombstone);
665
666        tx.execute(
667            r#"
668            INSERT INTO storage_sync_data (ext_id, data)
669            VALUES ('ext_id', NULL)
670        "#,
671            [],
672        )?;
673        let incoming = get_incoming(&tx)?;
674        assert_eq!(incoming.len(), 1);
675        assert_eq!(
676            incoming[0].1,
677            // IncomingOnly* seems a little odd, but it is because we can't
678            // tie the tombstones together due to the lack of any ext-id/guid
679            // mapping in this case.
680            IncomingState::IncomingOnlyTombstone
681        );
682        Ok(())
683    }
684
    // apply_action tests.

    // The parts of a local storage_sync_data row which these tests check.
    #[derive(Debug, PartialEq)]
    struct LocalItem {
        // The row's data column, as read by json_map_from_row.
        data: DataState,
        // The row's sync_change_counter column.
        sync_change_counter: i32,
    }
691
692    fn get_local_item(conn: &Connection) -> Option<LocalItem> {
693        conn.try_query_row::<_, Error, _, _>(
694            "SELECT data, sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_id'",
695            [],
696            |row| {
697                let data = json_map_from_row(row, "data")?;
698                let sync_change_counter = row.get::<_, i32>(1)?;
699                Ok(LocalItem {
700                    data,
701                    sync_change_counter,
702                })
703            },
704            true,
705        )
706        .expect("query should work")
707    }
708
709    fn get_applied_item_changes(conn: &Connection) -> Option<StorageChanges> {
710        // no custom deserialize for storagechanges and we only need it for
711        // tests, so do it manually.
712        conn.try_query_row::<_, Error, _, _>(
713            "SELECT changes FROM temp.storage_sync_applied WHERE ext_id = 'ext_id'",
714            [],
715            |row| Ok(serde_json::from_str(&row.get::<_, String>("changes")?)?),
716            true,
717        )
718        .expect("query should work")
719        .map(|val: serde_json::Value| {
720            let ob = val.as_object().expect("should be an object of items");
721            let mut result = StorageChanges::with_capacity(ob.len());
722            for (key, val) in ob.into_iter() {
723                let details = val.as_object().expect("elts should be objects");
724                result.push(StorageValueChange {
725                    key: key.to_string(),
726                    old_value: details.get("oldValue").cloned(),
727                    new_value: details.get("newValue").cloned(),
728                });
729            }
730            result
731        })
732    }
733
734    fn do_apply_action(tx: &Transaction<'_>, action: IncomingAction) {
735        let guid = SyncGuid::new("guid");
736        apply_actions(tx, vec![(guid, action)], &NeverInterrupts).expect("should apply");
737    }
738
739    #[test]
740    fn test_apply_actions() -> Result<()> {
741        let db = new_syncable_mem_db();
742        let conn = db.get_connection().expect("connection should be retrieved");
743
744        // DeleteLocally - row should be entirely removed.
745        let tx = conn
746            .unchecked_transaction()
747            .expect("transaction should begin");
748        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
749        assert_eq!(
750            api::get(&tx, "ext_id", json!(null))?,
751            json!({"foo": "local"})
752        );
753        let changes = changes![change!("foo", "local", None)];
754        do_apply_action(
755            &tx,
756            IncomingAction::DeleteLocally {
757                ext_id: "ext_id".to_string(),
758                changes: changes.clone(),
759            },
760        );
761        assert_eq!(api::get(&tx, "ext_id", json!(null))?, json!({}));
762        // and there should not be a local record at all.
763        assert!(get_local_item(&tx).is_none());
764        assert_eq!(get_applied_item_changes(&tx), Some(changes));
765        tx.rollback()?;
766
767        // TakeRemote - replace local data with remote and marked as not dirty.
768        let tx = conn
769            .unchecked_transaction()
770            .expect("transaction should begin");
771        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
772        assert_eq!(
773            api::get(&tx, "ext_id", json!(null))?,
774            json!({"foo": "local"})
775        );
776        // data should exist locally with a change recorded.
777        assert_eq!(
778            get_local_item(&tx),
779            Some(LocalItem {
780                data: DataState::Exists(map!({"foo": "local"})),
781                sync_change_counter: 1
782            })
783        );
784        let changes = changes![change!("foo", "local", "remote")];
785        do_apply_action(
786            &tx,
787            IncomingAction::TakeRemote {
788                ext_id: "ext_id".to_string(),
789                data: map!({"foo": "remote"}),
790                changes: changes.clone(),
791            },
792        );
793        // data should exist locally with the remote data and not be dirty.
794        assert_eq!(
795            get_local_item(&tx),
796            Some(LocalItem {
797                data: DataState::Exists(map!({"foo": "remote"})),
798                sync_change_counter: 0
799            })
800        );
801        assert_eq!(get_applied_item_changes(&tx), Some(changes));
802        tx.rollback()?;
803
804        // Merge - like ::TakeRemote, but data remains dirty.
805        let tx = conn
806            .unchecked_transaction()
807            .expect("transaction should begin");
808        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
809        assert_eq!(
810            api::get(&tx, "ext_id", json!(null))?,
811            json!({"foo": "local"})
812        );
813        // data should exist locally with a change recorded.
814        assert_eq!(
815            get_local_item(&tx),
816            Some(LocalItem {
817                data: DataState::Exists(map!({"foo": "local"})),
818                sync_change_counter: 1
819            })
820        );
821        let changes = changes![change!("foo", "local", "remote")];
822        do_apply_action(
823            &tx,
824            IncomingAction::Merge {
825                ext_id: "ext_id".to_string(),
826                data: map!({"foo": "remote"}),
827                changes: changes.clone(),
828            },
829        );
830        assert_eq!(
831            get_local_item(&tx),
832            Some(LocalItem {
833                data: DataState::Exists(map!({"foo": "remote"})),
834                sync_change_counter: 2
835            })
836        );
837        assert_eq!(get_applied_item_changes(&tx), Some(changes));
838        tx.rollback()?;
839
840        // Same - data stays the same but is marked not dirty.
841        let tx = conn
842            .unchecked_transaction()
843            .expect("transaction should begin");
844        api::set(&tx, "ext_id", json!({"foo": "local"}))?;
845        assert_eq!(
846            api::get(&tx, "ext_id", json!(null))?,
847            json!({"foo": "local"})
848        );
849        // data should exist locally with a change recorded.
850        assert_eq!(
851            get_local_item(&tx),
852            Some(LocalItem {
853                data: DataState::Exists(map!({"foo": "local"})),
854                sync_change_counter: 1
855            })
856        );
857        do_apply_action(
858            &tx,
859            IncomingAction::Same {
860                ext_id: "ext_id".to_string(),
861            },
862        );
863        assert_eq!(
864            get_local_item(&tx),
865            Some(LocalItem {
866                data: DataState::Exists(map!({"foo": "local"})),
867                sync_change_counter: 0
868            })
869        );
870        assert_eq!(get_applied_item_changes(&tx), None);
871        tx.rollback()?;
872
873        Ok(())
874    }
875}