webext_storage/sync/
outgoing.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5// The "outgoing" part of syncing - building the payloads to upload and
6// managing the sync state of the local DB.
7
8use interrupt_support::Interruptee;
9use rusqlite::{Connection, Row, Transaction};
10use sql_support::ConnExt;
11use sync15::bso::OutgoingBso;
12use sync_guid::Guid as SyncGuid;
13
14use crate::error::*;
15
16use super::WebextRecord;
17
18fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> {
19    let guid: SyncGuid = row.get("guid")?;
20    let ext_id: String = row.get("ext_id")?;
21    let raw_data: Option<String> = row.get("data")?;
22    Ok(match raw_data {
23        Some(raw_data) => {
24            let record = WebextRecord {
25                guid,
26                ext_id,
27                data: raw_data,
28            };
29            OutgoingBso::from_content_with_id(record)?
30        }
31        None => OutgoingBso::new_tombstone(guid.into()),
32    })
33}
34
35/// Stages info about what should be uploaded in a temp table. This should be
36/// called in the same transaction as `apply_actions`. record_uploaded() can be
37/// called after the upload is complete and the data in the temp table will be
38/// used to update the local store.
39pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> {
40    let sql = "
41        -- Stage outgoing items. The item may not yet have a GUID (ie, it might
42        -- not already be in either the mirror nor the incoming staging table),
43        -- so we generate one if it doesn't exist.
44        INSERT INTO storage_sync_outgoing_staging
45        (guid, ext_id, data, sync_change_counter)
46        SELECT coalesce(m.guid, s.guid, generate_guid()),
47        l.ext_id, l.data, l.sync_change_counter
48        FROM storage_sync_data l
49        -- left joins as one or both may not exist.
50        LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id
51        LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id
52        WHERE sync_change_counter > 0;
53
54        -- At this point, we've merged in all new records, so copy incoming
55        -- staging into the mirror so that it matches what's on the server.
56        INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data)
57        SELECT guid, ext_id, data FROM temp.storage_sync_staging;
58
59        -- And copy any incoming records that we aren't reuploading into the
60        -- local table. We'll copy the outgoing ones into the mirror and local
61        -- after we upload them.
62        INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter)
63        SELECT ext_id, data, 0
64        FROM storage_sync_staging s
65        WHERE ext_id IS NOT NULL
66        AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o
67                       WHERE o.guid = s.guid);";
68    tx.execute_batch(sql)?;
69    Ok(())
70}
71
72/// Returns a vec of the outgoing records which should be uploaded.
73pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> {
74    let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging";
75    let elts = conn
76        .conn()
77        .query_rows_and_then(sql, [], |row| -> Result<_> {
78            signal.err_if_interrupted()?;
79            outgoing_from_row(row)
80        })?;
81
82    debug!("get_outgoing found {} items", elts.len());
83    Ok(elts.into_iter().collect())
84}
85
86/// Record the fact that items were uploaded. This updates the state of the
87/// local DB to reflect the state of the server we just updated.
88/// Note that this call is almost certainly going to be made in a *different*
89/// transaction than the transaction used in `stage_outgoing()`, and it will
90/// be called once per batch upload.
91pub fn record_uploaded(
92    tx: &Transaction<'_>,
93    items: &[SyncGuid],
94    signal: &dyn Interruptee,
95) -> Result<()> {
96    debug!(
97        "record_uploaded recording that {} items were uploaded",
98        items.len()
99    );
100
101    // Updating the `was_uploaded` column fires the `record_uploaded` trigger,
102    // which updates the local change counter and writes the uploaded record
103    // data back to the mirror.
104    sql_support::each_chunk(items, |chunk, _| -> Result<()> {
105        signal.err_if_interrupted()?;
106        let sql = format!(
107            "UPDATE storage_sync_outgoing_staging SET
108                 was_uploaded = 1
109             WHERE guid IN ({})",
110            sql_support::repeat_sql_vars(chunk.len()),
111        );
112        tx.execute(&sql, rusqlite::params_from_iter(chunk))?;
113        Ok(())
114    })?;
115
116    Ok(())
117}
118
#[cfg(test)]
mod tests {
    use super::super::test::new_syncable_mem_db;
    use super::*;
    use interrupt_support::NeverInterrupts;

    /// Only the row with a non-zero change counter is staged for upload, and
    /// recording the upload leaves that row's change counter at zero.
    #[test]
    fn test_simple() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection()?;
        let tx = conn.unchecked_transaction()?;

        // One extension without pending changes, one with.
        let seed_sql = r#"
            INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
            VALUES
                ('ext_no_changes', '{"foo":"bar"}', 0),
                ('ext_with_changes', '{"foo":"bar"}', 1);
        "#;
        tx.execute_batch(seed_sql)?;

        stage_outgoing(&tx)?;
        let changes = get_outgoing(&tx, &NeverInterrupts)?;
        assert_eq!(changes.len(), 1);

        let staged = &changes[0];
        let payload = serde_json::from_str::<serde_json::Value>(&staged.payload).unwrap();
        let ext_id = payload
            .get("extId")
            .and_then(serde_json::Value::as_str)
            .unwrap();
        assert_eq!(ext_id, "ext_with_changes");

        // Pretend everything we staged made it to the server.
        let uploaded: Vec<SyncGuid> = changes.into_iter().map(|bso| bso.envelope.id).collect();
        record_uploaded(&tx, &uploaded, &NeverInterrupts)?;

        let remaining_changes: i32 = tx.conn().query_one(
            "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'",
        )?;
        assert_eq!(remaining_changes, 0);
        Ok(())
    }

    /// A record serializes to a payload with exactly the `id`, `data` and
    /// `extId` keys, and the envelope carries the record's guid.
    #[test]
    fn test_payload_serialization() {
        let outgoing = OutgoingBso::from_content_with_id(WebextRecord {
            guid: SyncGuid::new("guid"),
            ext_id: "ext-id".to_string(),
            data: "{}".to_string(),
        })
        .unwrap();

        // The envelope should have our ID.
        assert_eq!(outgoing.envelope.id, "guid");

        let parsed: serde_json::Value = serde_json::from_str(&outgoing.payload).unwrap();
        let fields = parsed.as_object().unwrap();

        // Exactly these three keys and nothing else.
        assert_eq!(fields.len(), 3);
        assert!(fields.contains_key("id"));
        assert!(fields.contains_key("data"));
        assert!(fields.contains_key("extId"));
    }
}