1use crate::db::models::address::InternalAddress;
7use crate::db::schema::ADDRESS_COMMON_COLS;
8use crate::error::*;
9use crate::sync::{address::AddressPayload, common::*};
10use crate::sync::{OutgoingBso, ProcessOutgoingRecordImpl};
11use rusqlite::{Row, Transaction};
12use sync_guid::Guid as SyncGuid;
13
// Names of the SQLite tables this module hands to the shared sync helpers.

// Table holding the local address rows.
const DATA_TABLE_NAME: &str = "addresses_data";
// Table holding the mirrored payload for each guid.
const MIRROR_TABLE_NAME: &str = "addresses_mirror";
// Table that outgoing records are saved to before upload.
const STAGING_TABLE_NAME: &str = "addresses_sync_outgoing_staging";
17
/// Stateless type that provides the address implementation of
/// `ProcessOutgoingRecordImpl`.
pub(super) struct OutgoingAddressesImpl {}
19
20impl ProcessOutgoingRecordImpl for OutgoingAddressesImpl {
21 type Record = InternalAddress;
22
23 fn fetch_outgoing_records(&self, tx: &Transaction<'_>) -> anyhow::Result<Vec<OutgoingBso>> {
26 let data_sql = format!(
29 "SELECT
30 l.{common_cols},
31 m.payload,
32 l.sync_change_counter
33 FROM addresses_data l
34 LEFT JOIN addresses_mirror m
35 ON l.guid = m.guid
36 WHERE sync_change_counter > 0
37 OR l.guid NOT IN (
38 SELECT m.guid
39 FROM addresses_mirror m
40 )",
41 common_cols = ADDRESS_COMMON_COLS,
42 );
43 let record_from_data_row: &dyn Fn(&Row<'_>) -> Result<(OutgoingBso, i64)> = &|row| {
44 let mut record = InternalAddress::from_row(row)?.into_payload()?;
45 if let Some(s) = row.get::<_, Option<String>>("payload")? {
48 let mirror_payload: AddressPayload = serde_json::from_str(&s)?;
49 record.entry.unknown_fields = mirror_payload.entry.unknown_fields;
50 };
51
52 Ok((
53 OutgoingBso::from_content_with_id(record)?,
54 row.get::<_, i64>("sync_change_counter")?,
55 ))
56 };
57
58 let tombstones_sql = "SELECT guid FROM addresses_tombstones";
59
60 let staging_records = common_get_outgoing_staging_records(
65 tx,
66 &data_sql,
67 tombstones_sql,
68 record_from_data_row,
69 )?
70 .into_iter()
71 .map(|(bso, change_counter)| (bso.envelope.id, bso.payload, change_counter))
72 .collect::<Vec<_>>();
73 common_save_outgoing_records(tx, STAGING_TABLE_NAME, staging_records)?;
74
75 Ok(
77 common_get_outgoing_records(tx, &data_sql, tombstones_sql, record_from_data_row)?
78 .into_iter()
79 .map(|(bso, _change_counter)| bso)
80 .collect::<Vec<OutgoingBso>>(),
81 )
82 }
83
84 fn finish_synced_items(
85 &self,
86 tx: &Transaction<'_>,
87 records_synced: Vec<SyncGuid>,
88 ) -> anyhow::Result<()> {
89 common_finish_synced_items(
90 tx,
91 DATA_TABLE_NAME,
92 MIRROR_TABLE_NAME,
93 STAGING_TABLE_NAME,
94 records_synced,
95 )?;
96 Ok(())
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103 use crate::db::{addresses::add_internal_address, models::address::InternalAddress};
104 use crate::sync::{common::tests::*, test::new_syncable_mem_db, UnknownFields};
105 use rusqlite::Connection;
106 use serde_json::{json, Map, Value};
107 use types::Timestamp;
108
109 fn test_insert_mirror_record(
110 conn: &Connection,
111 address: InternalAddress,
112 unknown_fields: UnknownFields,
113 ) {
114 let guid = address.guid.clone();
116 let mut addr_payload = address.into_payload().unwrap();
117 addr_payload.entry.unknown_fields = unknown_fields;
118 let payload = serde_json::to_string(&addr_payload).expect("is json");
119 conn.execute(
120 "INSERT OR IGNORE INTO addresses_mirror (guid, payload)
121 VALUES (:guid, :payload)",
122 rusqlite::named_params! {
123 ":guid": guid,
124 ":payload": &payload,
125 },
126 )
127 .expect("should insert");
128 }
129
130 lazy_static::lazy_static! {
131 static ref TEST_JSON_RECORDS: Map<String, Value> = {
132 let val = json! {{
135 "C" : {
136 "id": expand_test_guid('C'),
137 "entry": {
138 "name": "jane doe",
139 "streetAddress": "3050 South La Brea Ave",
140 "addressLevel2": "Los Angeles, CA",
141 "country": "United States",
142 "timeCreated": 0,
143 "timeLastUsed": 0,
144 "timeLastModified": 0,
145 "timesUsed": 0,
146 "version": 1,
147 }
148 },
149 "D" : {
150 "id": expand_test_guid('D'),
151 "entry": {
152 "name": "john doe",
153 "street-address": "85 Pike St",
154 "address-level2": "Seattle, WA",
155 "country": "United States",
156 "timeCreated": 0,
157 "timeLastUsed": 0,
158 "timeLastModified": 0,
159 "timesUsed": 0,
160 "version": 1,
161 "foo": "bar",
163 "baz": "qux",
164 }
165 }
166 }};
167 val.as_object().expect("literal is an object").clone()
168 };
169 }
170
171 fn test_json_record(guid_prefix: char) -> Value {
172 TEST_JSON_RECORDS
173 .get(&guid_prefix.to_string())
174 .expect("should exist")
175 .clone()
176 }
177
178 fn test_record(guid_prefix: char) -> InternalAddress {
179 let json = test_json_record(guid_prefix);
180 let payload = serde_json::from_value(json).unwrap();
181 InternalAddress::from_payload(payload).expect("should be valid")
182 }
183
/// Adds a local address without inserting a mirror record, then runs the
/// shared "never synced" check.
#[test]
fn test_outgoing_never_synced() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // Only the local data table gets a row; nothing is inserted into the mirror.
    let address = test_record('C');
    assert!(add_internal_address(&tx, &address).is_ok());

    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_never_synced(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
202
/// Inserts a tombstone row for the fixture guid and runs the shared
/// tombstone check.
#[test]
fn test_outgoing_tombstone() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");
    let address = test_record('C');

    // Only a tombstone row is written; the address itself is never added.
    let tombstone_sql = "INSERT INTO addresses_tombstones (
                    guid,
                    time_deleted
                ) VALUES (
                    :guid,
                    :time_deleted
                )";
    let inserted = tx.execute(
        tombstone_sql,
        rusqlite::named_params! {
            ":guid": address.guid,
            ":time_deleted": Timestamp::now(),
        },
    );
    assert!(inserted.is_ok());

    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_tombstone(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
235
/// Adds a local address with a change counter of 2 plus a mirror record for
/// it, then runs the shared "synced with local change" check.
#[test]
fn test_outgoing_synced_with_local_change() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // The address is stored locally with a non-zero change counter.
    let starting_counter = 2;
    let mut address = test_record('C');
    address.metadata.sync_change_counter = starting_counter;
    assert!(add_internal_address(&tx, &address).is_ok());

    // It also has a mirror record, with no unknown fields.
    test_insert_mirror_record(&tx, address.clone(), Default::default());
    exists_with_counter_value_in_table(&tx, DATA_TABLE_NAME, &address.guid, starting_counter);

    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_synced_with_local_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
264
/// Adds a local address with the fixture's change counter left untouched
/// plus a mirror record for it, then runs the shared "synced with no change"
/// check.
#[test]
fn test_outgoing_synced_with_no_change() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // The change counter is whatever the fixture produces
    // (presumably 0 - confirm against `InternalAddress::from_payload`).
    let address = test_record('C');
    assert!(add_internal_address(&tx, &address).is_ok());
    test_insert_mirror_record(&tx, address.clone(), Default::default());

    let outgoing_impl = OutgoingAddressesImpl {};
    do_test_outgoing_synced_with_no_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
284
/// Checks that unknown fields stored in the mirror payload are present in
/// the `entry` object of the outgoing BSO payload.
#[test]
fn test_outgoing_roundtrip_unknown() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");
    let outgoing_impl = OutgoingAddressesImpl {};

    // The address is stored locally with a non-zero change counter.
    let starting_counter = 2;
    let mut address = test_record('D');
    address.metadata.sync_change_counter = starting_counter;
    assert!(add_internal_address(&tx, &address).is_ok());

    // Its mirror payload carries two fields the client does not model.
    let mirror_unknowns: UnknownFields =
        serde_json::from_value(json! {{ "foo": "bar", "baz": "qux"}}).unwrap();
    test_insert_mirror_record(&tx, address.clone(), mirror_unknowns);
    exists_with_counter_value_in_table(&tx, DATA_TABLE_NAME, &address.guid, starting_counter);

    // Both unknown fields must appear in the outgoing payload's `entry`.
    let outgoing = outgoing_impl.fetch_outgoing_records(&tx).unwrap();
    let first_payload: Map<String, Value> =
        serde_json::from_str(&outgoing[0].payload).unwrap();
    let entry = first_payload.get("entry").unwrap();
    assert_eq!(entry.get("foo").unwrap(), "bar");
    assert_eq!(entry.get("baz").unwrap(), "qux");

    do_test_outgoing_synced_with_local_change(
        &tx,
        &outgoing_impl,
        &address.guid,
        DATA_TABLE_NAME,
        MIRROR_TABLE_NAME,
        STAGING_TABLE_NAME,
    );
}
322
/// Checks that the outgoing payload for fixture "C" carries the full `name`
/// together with `given-name`, `additional-name` and `family-name` entries.
#[test]
fn test_outgoing_with_migrated_fields() {
    let mut db = new_syncable_mem_db();
    let tx = db.transaction().expect("should get tx");

    // A non-zero change counter on a locally added address.
    let starting_counter = 2;
    let mut address = test_record('C');
    address.metadata.sync_change_counter = starting_counter;
    assert!(add_internal_address(&tx, &address).is_ok());

    let outgoing_impl = OutgoingAddressesImpl {};
    let outgoing = outgoing_impl.fetch_outgoing_records(&tx).unwrap();
    let first_payload: Map<String, Value> =
        serde_json::from_str(&outgoing[0].payload).unwrap();
    let entry = first_payload.get("entry").unwrap();

    // "jane doe" is expected alongside its three name parts.
    assert_eq!(entry.get("name").unwrap(), "jane doe");
    assert_eq!(entry.get("given-name").unwrap(), "jane");
    assert_eq!(entry.get("additional-name").unwrap(), "");
    assert_eq!(entry.get("family-name").unwrap(), "doe");
}
342}