1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
45// The "outgoing" part of syncing - building the payloads to upload and
6// managing the sync state of the local DB.
78use interrupt_support::Interruptee;
9use rusqlite::{Connection, Row, Transaction};
10use sql_support::ConnExt;
11use sync15::bso::OutgoingBso;
12use sync_guid::Guid as SyncGuid;
1314use crate::error::*;
1516use super::WebextRecord;
1718fn outgoing_from_row(row: &Row<'_>) -> Result<OutgoingBso> {
19let guid: SyncGuid = row.get("guid")?;
20let ext_id: String = row.get("ext_id")?;
21let raw_data: Option<String> = row.get("data")?;
22Ok(match raw_data {
23Some(raw_data) => {
24let record = WebextRecord {
25 guid,
26 ext_id,
27 data: raw_data,
28 };
29 OutgoingBso::from_content_with_id(record)?
30}
31None => OutgoingBso::new_tombstone(guid.into()),
32 })
33}
3435/// Stages info about what should be uploaded in a temp table. This should be
36/// called in the same transaction as `apply_actions`. record_uploaded() can be
37/// called after the upload is complete and the data in the temp table will be
38/// used to update the local store.
39pub fn stage_outgoing(tx: &Transaction<'_>) -> Result<()> {
40let sql = "
41 -- Stage outgoing items. The item may not yet have a GUID (ie, it might
42 -- not already be in either the mirror nor the incoming staging table),
43 -- so we generate one if it doesn't exist.
44 INSERT INTO storage_sync_outgoing_staging
45 (guid, ext_id, data, sync_change_counter)
46 SELECT coalesce(m.guid, s.guid, generate_guid()),
47 l.ext_id, l.data, l.sync_change_counter
48 FROM storage_sync_data l
49 -- left joins as one or both may not exist.
50 LEFT JOIN storage_sync_mirror m ON m.ext_id = l.ext_id
51 LEFT JOIN storage_sync_staging s ON s.ext_id = l.ext_id
52 WHERE sync_change_counter > 0;
5354 -- At this point, we've merged in all new records, so copy incoming
55 -- staging into the mirror so that it matches what's on the server.
56 INSERT OR REPLACE INTO storage_sync_mirror (guid, ext_id, data)
57 SELECT guid, ext_id, data FROM temp.storage_sync_staging;
5859 -- And copy any incoming records that we aren't reuploading into the
60 -- local table. We'll copy the outgoing ones into the mirror and local
61 -- after we upload them.
62 INSERT OR REPLACE INTO storage_sync_data (ext_id, data, sync_change_counter)
63 SELECT ext_id, data, 0
64 FROM storage_sync_staging s
65 WHERE ext_id IS NOT NULL
66 AND NOT EXISTS(SELECT 1 FROM storage_sync_outgoing_staging o
67 WHERE o.guid = s.guid);";
68 tx.execute_batch(sql)?;
69Ok(())
70}
7172/// Returns a vec of the outgoing records which should be uploaded.
73pub fn get_outgoing(conn: &Connection, signal: &dyn Interruptee) -> Result<Vec<OutgoingBso>> {
74let sql = "SELECT guid, ext_id, data FROM storage_sync_outgoing_staging";
75let elts = conn
76 .conn()
77 .query_rows_and_then(sql, [], |row| -> Result<_> {
78 signal.err_if_interrupted()?;
79 outgoing_from_row(row)
80 })?;
8182debug!("get_outgoing found {} items", elts.len());
83Ok(elts.into_iter().collect())
84}
8586/// Record the fact that items were uploaded. This updates the state of the
87/// local DB to reflect the state of the server we just updated.
88/// Note that this call is almost certainly going to be made in a *different*
89/// transaction than the transaction used in `stage_outgoing()`, and it will
90/// be called once per batch upload.
91pub fn record_uploaded(
92 tx: &Transaction<'_>,
93 items: &[SyncGuid],
94 signal: &dyn Interruptee,
95) -> Result<()> {
96debug!(
97"record_uploaded recording that {} items were uploaded",
98 items.len()
99 );
100101// Updating the `was_uploaded` column fires the `record_uploaded` trigger,
102 // which updates the local change counter and writes the uploaded record
103 // data back to the mirror.
104sql_support::each_chunk(items, |chunk, _| -> Result<()> {
105 signal.err_if_interrupted()?;
106let sql = format!(
107"UPDATE storage_sync_outgoing_staging SET
108 was_uploaded = 1
109 WHERE guid IN ({})",
110 sql_support::repeat_sql_vars(chunk.len()),
111 );
112 tx.execute(&sql, rusqlite::params_from_iter(chunk))?;
113Ok(())
114 })?;
115116Ok(())
117}
#[cfg(test)]
mod tests {
    use super::super::test::new_syncable_mem_db;
    use super::*;
    use interrupt_support::NeverInterrupts;

    /// Only the extension with a non-zero change counter is staged for
    /// upload, and recording the upload resets that counter to zero.
    #[test]
    fn test_simple() -> Result<()> {
        let db = new_syncable_mem_db();
        let conn = db.get_connection()?;
        let tx = conn.unchecked_transaction()?;

        tx.execute_batch(
            r#"
            INSERT INTO storage_sync_data (ext_id, data, sync_change_counter)
            VALUES
                ('ext_no_changes', '{"foo":"bar"}', 0),
                ('ext_with_changes', '{"foo":"bar"}', 1);
        "#,
        )?;

        stage_outgoing(&tx)?;
        let outgoing = get_outgoing(&tx, &NeverInterrupts)?;
        assert_eq!(outgoing.len(), 1);

        let payload: serde_json::Value = serde_json::from_str(&outgoing[0].payload).unwrap();
        let ext_id = payload.get("extId").unwrap().as_str().unwrap();
        assert_eq!(ext_id, "ext_with_changes");

        // Pretend the upload succeeded for everything we staged.
        let uploaded: Vec<SyncGuid> = outgoing.into_iter().map(|bso| bso.envelope.id).collect();
        record_uploaded(&tx, uploaded.as_slice(), &NeverInterrupts)?;

        let counter: i32 = tx.conn().query_one(
            "SELECT sync_change_counter FROM storage_sync_data WHERE ext_id = 'ext_with_changes'",
        )?;
        assert_eq!(counter, 0);
        Ok(())
    }

    /// A serialized record carries exactly the `id`, `data` and `extId` keys,
    /// and the envelope takes the record's guid as its ID.
    #[test]
    fn test_payload_serialization() {
        let outgoing = OutgoingBso::from_content_with_id(WebextRecord {
            guid: SyncGuid::new("guid"),
            ext_id: "ext-id".to_string(),
            data: "{}".to_string(),
        })
        .unwrap();

        // The envelope should have our ID.
        assert_eq!(outgoing.envelope.id, "guid");

        let parsed = serde_json::from_str::<serde_json::Value>(&outgoing.payload).unwrap();
        let fields = parsed.as_object().unwrap();

        for key in ["id", "data", "extId"] {
            assert!(fields.contains_key(key));
        }
        assert_eq!(fields.len(), 3);
    }
}