1use crate::error::*;
6use rusqlite::{Connection, Transaction};
7use serde::{ser::SerializeMap, Serialize, Serializer};
8
9use serde_json::{Map, Value as JsonValue};
10use sql_support::{self, ConnExt};
11
12pub const SYNC_QUOTA_BYTES: usize = 102_400;
19pub const SYNC_QUOTA_BYTES_PER_ITEM: usize = 8_192;
20pub const SYNC_MAX_ITEMS: usize = 512;
21type JsonMap = Map<String, JsonValue>;
25
26enum StorageChangeOp {
27 Clear,
28 Set(JsonValue),
29 SetWithoutQuota(JsonValue),
30}
31
32fn get_from_db(conn: &Connection, ext_id: &str) -> Result<Option<JsonMap>> {
33 Ok(
34 match conn.try_query_one::<String, _>(
35 "SELECT data FROM storage_sync_data
36 WHERE ext_id = :ext_id",
37 &[(":ext_id", &ext_id)],
38 true,
39 )? {
40 Some(s) => match serde_json::from_str(&s)? {
41 JsonValue::Object(m) => Some(m),
42 _ => None,
45 },
46 None => None,
47 },
48 )
49}
50
51fn save_to_db(tx: &Transaction<'_>, ext_id: &str, val: &StorageChangeOp) -> Result<()> {
52 let is_delete = match val {
56 StorageChangeOp::Clear => true,
57 StorageChangeOp::Set(JsonValue::Object(v)) => v.is_empty(),
58 StorageChangeOp::SetWithoutQuota(JsonValue::Object(v)) => v.is_empty(),
59 _ => false,
60 };
61 if is_delete {
62 let in_mirror = tx
63 .try_query_one(
64 "SELECT EXISTS(SELECT 1 FROM storage_sync_mirror WHERE ext_id = :ext_id);",
65 rusqlite::named_params! {
66 ":ext_id": ext_id,
67 },
68 true,
69 )?
70 .unwrap_or_default();
71 if in_mirror {
72 trace!("saving data for '{}': leaving a tombstone", ext_id);
73 tx.execute_cached(
74 "
75 INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
76 VALUES (:ext_id, NULL, 1)
77 ON CONFLICT (ext_id) DO UPDATE
78 SET data = NULL, sync_change_counter = sync_change_counter + 1",
79 rusqlite::named_params! {
80 ":ext_id": ext_id,
81 },
82 )?;
83 } else {
84 trace!("saving data for '{}': removing the row", ext_id);
85 tx.execute_cached(
86 "
87 DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
88 rusqlite::named_params! {
89 ":ext_id": ext_id,
90 },
91 )?;
92 }
93 } else {
94 let sval = match val {
96 StorageChangeOp::Set(v) => {
97 let sv = v.to_string();
98 if sv.len() > SYNC_QUOTA_BYTES {
99 return Err(Error::QuotaError(QuotaReason::TotalBytes));
100 }
101 sv
102 }
103 StorageChangeOp::SetWithoutQuota(v) => v.to_string(),
104 StorageChangeOp::Clear => unreachable!(),
105 };
106
107 trace!("saving data for '{}': writing", ext_id);
108 tx.execute_cached(
109 "INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
110 VALUES (:ext_id, :data, 1)
111 ON CONFLICT (ext_id) DO UPDATE
112 set data=:data, sync_change_counter = sync_change_counter + 1",
113 rusqlite::named_params! {
114 ":ext_id": ext_id,
115 ":data": &sval,
116 },
117 )?;
118 }
119 Ok(())
120}
121
122fn remove_from_db(tx: &Transaction<'_>, ext_id: &str) -> Result<()> {
123 save_to_db(tx, ext_id, &StorageChangeOp::Clear)
124}
125
126#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
130#[serde(rename_all = "camelCase")]
131pub struct StorageValueChange {
132 #[serde(skip_serializing)]
133 pub key: String,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 pub old_value: Option<JsonValue>,
136 #[serde(skip_serializing_if = "Option::is_none")]
137 pub new_value: Option<JsonValue>,
138}
139
140#[derive(Debug, Default, Clone, PartialEq, Eq)]
144pub struct StorageChanges {
145 pub changes: Vec<StorageValueChange>,
146}
147
148impl StorageChanges {
149 pub fn new() -> Self {
150 Self::default()
151 }
152
153 pub fn with_capacity(n: usize) -> Self {
154 Self {
155 changes: Vec::with_capacity(n),
156 }
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.changes.is_empty()
161 }
162
163 pub fn push(&mut self, change: StorageValueChange) {
164 self.changes.push(change)
165 }
166}
167
168impl Serialize for StorageChanges {
170 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
171 where
172 S: Serializer,
173 {
174 let mut map = serializer.serialize_map(Some(self.changes.len()))?;
175 for change in &self.changes {
176 map.serialize_entry(&change.key, change)?;
177 }
178 map.end()
179 }
180}
181
182pub fn get_quota_size_of(key: &str, v: &JsonValue) -> usize {
185 key.len() + v.to_string().len()
188}
189
190pub fn set(tx: &Transaction<'_>, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
194 let val_map = match val {
195 JsonValue::Object(m) => m,
196 _ => Map::new(),
198 };
199
200 let mut current = get_from_db(tx, ext_id)?.unwrap_or_default();
201
202 let mut changes = StorageChanges::with_capacity(val_map.len());
203
204 for (k, v) in val_map.into_iter() {
206 let old_value = current.remove(&k);
207 if current.len() >= SYNC_MAX_ITEMS {
208 return Err(Error::QuotaError(QuotaReason::MaxItems));
209 }
210 if get_quota_size_of(&k, &v) > SYNC_QUOTA_BYTES_PER_ITEM {
213 return Err(Error::QuotaError(QuotaReason::ItemBytes));
214 }
215 let change = StorageValueChange {
216 key: k.clone(),
217 old_value,
218 new_value: Some(v.clone()),
219 };
220 changes.push(change);
221 current.insert(k, v);
222 }
223
224 save_to_db(
225 tx,
226 ext_id,
227 &StorageChangeOp::Set(JsonValue::Object(current)),
228 )?;
229 Ok(changes)
230}
231
232fn get_keys_helper(keys: JsonValue) -> Vec<(String, Option<JsonValue>)> {
236 match keys {
237 JsonValue::String(s) => vec![(s, None)],
238 JsonValue::Array(keys) => {
239 keys.iter()
242 .filter_map(|v| v.as_str().map(|s| (s.to_string(), None)))
243 .collect()
244 }
245 JsonValue::Object(m) => m.into_iter().map(|(k, d)| (k, Some(d))).collect(),
246 _ => vec![],
247 }
248}
249
250pub fn get(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
253 let maybe_existing = get_from_db(conn, ext_id)?;
255 let mut existing = match (maybe_existing, keys.is_object()) {
256 (None, true) => return Ok(keys),
257 (None, false) => return Ok(JsonValue::Object(Map::new())),
258 (Some(v), _) => v,
259 };
260 if keys.is_null() {
262 return Ok(JsonValue::Object(existing));
263 }
264 let keys_and_defaults = get_keys_helper(keys);
266 let mut result = Map::with_capacity(keys_and_defaults.len());
267 for (key, maybe_default) in keys_and_defaults {
268 if let Some(v) = existing.remove(&key) {
269 result.insert(key, v);
270 } else if let Some(def) = maybe_default {
271 result.insert(key, def);
272 }
273 }
276 Ok(JsonValue::Object(result))
277}
278
279pub fn get_keys(conn: &Connection, ext_id: &str) -> Result<JsonValue> {
282 let maybe_existing = get_from_db(conn, ext_id)?;
283 let existing = match maybe_existing {
284 None => return Ok(JsonValue::Array(vec![])),
285 Some(v) => v,
286 };
287 Ok(JsonValue::Array(
288 existing.keys().map(|k| k.to_string().into()).collect(),
289 ))
290}
291
292pub fn remove(tx: &Transaction<'_>, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
296 let mut existing = match get_from_db(tx, ext_id)? {
297 None => return Ok(StorageChanges::new()),
298 Some(v) => v,
299 };
300
301 let keys_and_defs = get_keys_helper(keys);
304
305 let mut result = StorageChanges::with_capacity(keys_and_defs.len());
306 for (key, _) in keys_and_defs {
307 if let Some(v) = existing.remove(&key) {
308 result.push(StorageValueChange {
309 key,
310 old_value: Some(v),
311 new_value: None,
312 });
313 }
314 }
315 if !result.is_empty() {
316 save_to_db(
317 tx,
318 ext_id,
319 &StorageChangeOp::SetWithoutQuota(JsonValue::Object(existing)),
320 )?;
321 }
322 Ok(result)
323}
324
325pub fn clear(tx: &Transaction<'_>, ext_id: &str) -> Result<StorageChanges> {
329 let existing = match get_from_db(tx, ext_id)? {
330 None => return Ok(StorageChanges::new()),
331 Some(v) => v,
332 };
333 let mut result = StorageChanges::with_capacity(existing.len());
334 for (key, val) in existing.into_iter() {
335 result.push(StorageValueChange {
336 key: key.to_string(),
337 new_value: None,
338 old_value: Some(val),
339 });
340 }
341 remove_from_db(tx, ext_id)?;
342 Ok(result)
343}
344
345pub fn get_bytes_in_use(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<usize> {
347 let maybe_existing = get_from_db(conn, ext_id)?;
348 let existing = match maybe_existing {
349 None => return Ok(0),
350 Some(v) => v,
351 };
352 let keys: Vec<&str> = match &keys {
354 JsonValue::Null => existing.keys().map(|v| v.as_str()).collect(),
355 JsonValue::String(name) => vec![name.as_str()],
356 JsonValue::Array(names) => names.iter().filter_map(|v| v.as_str()).collect(),
357 _ => return Ok(0),
359 };
360 let mut size = 0;
362 for key in keys.into_iter() {
363 if let Some(v) = existing.get(key) {
364 size += get_quota_size_of(key, v);
365 }
366 }
367 Ok(size)
368}
369
370#[derive(Debug, Clone, PartialEq, Eq)]
372pub struct UsageInfo {
373 pub ext_id: String,
375 pub num_keys: usize,
377 pub num_bytes: usize,
383}
384
385pub fn usage(db: &Connection) -> Result<Vec<UsageInfo>> {
388 type JsonObject = Map<String, JsonValue>;
389 let sql = "
390 SELECT ext_id, data
391 FROM storage_sync_data
392 WHERE data IS NOT NULL
393 -- for tests and determinism
394 ORDER BY ext_id
395 ";
396 db.query_rows_into(sql, [], |row| {
397 let ext_id: String = row.get("ext_id")?;
398 let data: String = row.get("data")?;
399 let num_bytes = data.len();
400 let num_keys = serde_json::from_str::<JsonObject>(&data)?.len();
401 Ok(UsageInfo {
402 ext_id,
403 num_keys,
404 num_bytes,
405 })
406 })
407}
408
409#[cfg(test)]
410mod tests {
411 use super::*;
412 use crate::db::test::new_mem_db;
413 use serde_json::json;
414
415 #[test]
416 fn test_serialize_storage_changes() -> Result<()> {
417 let c = StorageChanges {
418 changes: vec![StorageValueChange {
419 key: "key".to_string(),
420 old_value: Some(json!("old")),
421 new_value: None,
422 }],
423 };
424 assert_eq!(serde_json::to_string(&c)?, r#"{"key":{"oldValue":"old"}}"#);
425 let c = StorageChanges {
426 changes: vec![StorageValueChange {
427 key: "key".to_string(),
428 old_value: None,
429 new_value: Some(json!({"foo": "bar"})),
430 }],
431 };
432 assert_eq!(
433 serde_json::to_string(&c)?,
434 r#"{"key":{"newValue":{"foo":"bar"}}}"#
435 );
436 Ok(())
437 }
438
439 fn make_changes(changes: &[(&str, Option<JsonValue>, Option<JsonValue>)]) -> StorageChanges {
440 let mut r = StorageChanges::with_capacity(changes.len());
441 for (name, old_value, new_value) in changes {
442 r.push(StorageValueChange {
443 key: (*name).to_string(),
444 old_value: old_value.clone(),
445 new_value: new_value.clone(),
446 });
447 }
448 r
449 }
450
451 #[test]
452 fn test_simple() -> Result<()> {
453 let ext_id = "x";
454 let db = new_mem_db();
455 let conn = db.get_connection().expect("should retrieve connection");
456 let tx = conn.unchecked_transaction()?;
457
458 for q in vec![JsonValue::Null, json!("foo"), json!(["foo"])].into_iter() {
460 assert_eq!(get(&tx, ext_id, q)?, json!({}));
461 }
462
463 for q in vec![json!({ "foo": null }), json!({"foo": "default"})].into_iter() {
465 assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
466 }
467
468 set(&tx, ext_id, json!({"foo": "bar" }))?;
470 for q in vec![
471 JsonValue::Null,
472 json!("foo"),
473 json!(["foo"]),
474 json!({ "foo": null }),
475 json!({"foo": "default"}),
476 ]
477 .into_iter()
478 {
479 assert_eq!(get(&tx, ext_id, q)?, json!({"foo": "bar" }));
480 }
481
482 for q in vec![
484 json!({ "non_existing_key": null }),
485 json!({"non_existing_key": 0}),
486 json!({"non_existing_key": false}),
487 json!({"non_existing_key": "default"}),
488 json!({"non_existing_key": ["array"]}),
489 json!({"non_existing_key": {"objectkey": "value"}}),
490 ]
491 .into_iter()
492 {
493 assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
494 }
495
496 assert_eq!(
498 set(&tx, ext_id, json!({"foo": "new", "other": "also new" }))?,
499 make_changes(&[
500 ("foo", Some(json!("bar")), Some(json!("new"))),
501 ("other", None, Some(json!("also new")))
502 ])
503 );
504 assert_eq!(
505 get(&tx, ext_id, JsonValue::Null)?,
506 json!({"foo": "new", "other": "also new"})
507 );
508 assert_eq!(get(&tx, ext_id, json!("foo"))?, json!({"foo": "new"}));
509 assert_eq!(
510 get(&tx, ext_id, json!(["foo", "other"]))?,
511 json!({"foo": "new", "other": "also new"})
512 );
513 assert_eq!(
514 get(&tx, ext_id, json!({"foo": null, "default": "yo"}))?,
515 json!({"foo": "new", "default": "yo"})
516 );
517
518 assert_eq!(
519 remove(&tx, ext_id, json!("foo"))?,
520 make_changes(&[("foo", Some(json!("new")), None)]),
521 );
522
523 assert_eq!(
524 set(&tx, ext_id, json!({"foo": {"sub-object": "sub-value"}}))?,
525 make_changes(&[("foo", None, Some(json!({"sub-object": "sub-value"}))),])
526 );
527
528 assert_eq!(
531 clear(&tx, ext_id)?,
532 make_changes(&[
533 ("other", Some(json!("also new")), None),
534 ("foo", Some(json!({"sub-object": "sub-value"})), None),
535 ]),
536 );
537 assert_eq!(get(&tx, ext_id, JsonValue::Null)?, json!({}));
538
539 Ok(())
540 }
541
542 #[test]
543 fn test_check_get_impl() -> Result<()> {
544 let ext_id = "x";
546 let db = new_mem_db();
547 let conn = db.get_connection().expect("should retrieve connection");
548 let tx = conn.unchecked_transaction()?;
549
550 let prop = "test-prop";
551 let value = "test-value";
552
553 set(&tx, ext_id, json!({ prop: value }))?;
554
555 let mut data = get(&tx, ext_id, json!(null))?;
557 assert_eq!(value, json!(data[prop]), "null getter worked for {}", prop);
558
559 data = get(&tx, ext_id, json!(prop))?;
560 assert_eq!(
561 value,
562 json!(data[prop]),
563 "string getter worked for {}",
564 prop
565 );
566 assert_eq!(
567 data.as_object().unwrap().len(),
568 1,
569 "string getter should return an object with a single property"
570 );
571
572 data = get(&tx, ext_id, json!([prop]))?;
573 assert_eq!(value, json!(data[prop]), "array getter worked for {}", prop);
574 assert_eq!(
575 data.as_object().unwrap().len(),
576 1,
577 "array getter with a single key should return an object with a single property"
578 );
579
580 data = get(&tx, ext_id, json!({ prop: null }))?;
583 assert_eq!(
584 value,
585 json!(data[prop]),
586 "object getter worked for {}",
587 prop
588 );
589 assert_eq!(
590 data.as_object().unwrap().len(),
591 1,
592 "object getter with a single key should return an object with a single property"
593 );
594
595 Ok(())
596 }
597
598 #[test]
599 fn test_check_get_keys_impl() -> Result<()> {
600 let ext_id = "getKeysExtId";
601 let db = new_mem_db();
602 let conn = db.get_connection().expect("should retrieve connection");
603 let tx = conn.unchecked_transaction()?;
604 assert_eq!(
605 get_keys(&tx, ext_id)?,
606 json!([]),
607 "get_keys should return an empty array when storage is uninitialized"
608 );
609 set(&tx, ext_id, json!({"foo": "bar", "baz": "qux" }))?;
610 let data = get_keys(&tx, ext_id)?;
611 assert_eq!(
612 data,
613 json!(vec!["foo", "baz"]),
614 "get_keys should return all keys"
615 );
616 Ok(())
617 }
618
619 #[test]
620 fn test_bug_1621162() -> Result<()> {
621 let db = new_mem_db();
624 let conn = db.get_connection().expect("should retrieve connection");
625 let tx = conn.unchecked_transaction()?;
626 let ext_id = "xyz";
627
628 set(&tx, ext_id, json!({"foo": "bar" }))?;
629
630 assert_eq!(
631 set(&tx, ext_id, json!({"foo": "bar" }))?,
632 make_changes(&[("foo", Some(json!("bar")), Some(json!("bar")))]),
633 );
634 Ok(())
635 }
636
637 #[test]
638 fn test_quota_maxitems() -> Result<()> {
639 let db = new_mem_db();
640 let conn = db.get_connection().expect("should retrieve connection");
641 let tx = conn.unchecked_transaction()?;
642 let ext_id = "xyz";
643 for i in 1..SYNC_MAX_ITEMS + 1 {
644 set(
645 &tx,
646 ext_id,
647 json!({ format!("key-{}", i): format!("value-{}", i) }),
648 )?;
649 }
650 let e = set(&tx, ext_id, json!({"another": "another"})).unwrap_err();
651 match e {
652 Error::QuotaError(QuotaReason::MaxItems) => {}
653 _ => panic!("unexpected error type"),
654 };
655 Ok(())
656 }
657
658 #[test]
659 fn test_quota_bytesperitem() -> Result<()> {
660 let db = new_mem_db();
661 let conn = db.get_connection().expect("should retrieve connection");
662 let tx = conn.unchecked_transaction()?;
663 let ext_id = "xyz";
664 let val = "x".repeat(SYNC_QUOTA_BYTES_PER_ITEM - 5);
668
669 set(&tx, ext_id, json!({ "x": val }))?;
671 assert_eq!(
672 get_bytes_in_use(&tx, ext_id, json!("x"))?,
673 SYNC_QUOTA_BYTES_PER_ITEM - 2
674 );
675
676 let e = set(&tx, ext_id, json!({ "xxxx": val })).unwrap_err();
678 match e {
679 Error::QuotaError(QuotaReason::ItemBytes) => {}
680 _ => panic!("unexpected error type"),
681 };
682 Ok(())
683 }
684
685 #[test]
686 fn test_quota_bytes() -> Result<()> {
687 let db = new_mem_db();
688 let conn = db.get_connection().expect("should retrieve connection");
689 let tx = conn.unchecked_transaction()?;
690 let ext_id = "xyz";
691 let val = "x".repeat(SYNC_QUOTA_BYTES + 1);
692
693 save_to_db(
695 &tx,
696 ext_id,
697 &StorageChangeOp::SetWithoutQuota(json!({ "x": val })),
698 )?;
699
700 let e = set(&tx, ext_id, json!({ "y": "newvalue" })).unwrap_err();
702 match e {
703 Error::QuotaError(QuotaReason::TotalBytes) => {}
704 _ => panic!("unexpected error type"),
705 };
706
707 remove(&tx, ext_id, json!["x"])?;
709
710 save_to_db(
712 &tx,
713 ext_id,
714 &StorageChangeOp::SetWithoutQuota(json!({ "y": val })),
715 )?;
716
717 set(&tx, ext_id, json!({ "y": "lessdata" }))?;
719
720 Ok(())
721 }
722
723 #[test]
724 fn test_get_bytes_in_use() -> Result<()> {
725 let db = new_mem_db();
726 let conn = db.get_connection().expect("should retrieve connection");
727 let tx = conn.unchecked_transaction()?;
728 let ext_id = "xyz";
729
730 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 0);
731
732 set(&tx, ext_id, json!({ "a": "a" }))?; set(&tx, ext_id, json!({ "b": "bb" }))?; set(&tx, ext_id, json!({ "c": "ccc" }))?; set(&tx, ext_id, json!({ "n": 999_999 }))?; assert_eq!(get_bytes_in_use(&tx, ext_id, json!("x"))?, 0);
738 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("a"))?, 4);
739 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("b"))?, 5);
740 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("c"))?, 6);
741 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("n"))?, 7);
742
743 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a"]))?, 4);
744 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "x"]))?, 4);
745 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "b"]))?, 9);
746 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "c"]))?, 10);
747
748 assert_eq!(
749 get_bytes_in_use(&tx, ext_id, json!(["a", "b", "c", "n"]))?,
750 22
751 );
752 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 22);
753 Ok(())
754 }
755
756 #[test]
757 fn test_usage() {
758 let db = new_mem_db();
759 let conn = db.get_connection().expect("should retrieve connection");
760 let tx = conn.unchecked_transaction().unwrap();
761 set(&tx, "xyz", json!({ "a": "a" })).unwrap();
763 set(&tx, "xyz", json!({ "b": "bb" })).unwrap();
764 set(&tx, "xyz", json!({ "c": "ccc" })).unwrap();
765 set(&tx, "xyz", json!({ "n": 999_999 })).unwrap();
766
767 set(&tx, "abc", json!({ "a": "a" })).unwrap();
769
770 tx.commit().unwrap();
771
772 let usage = usage(conn).unwrap();
773 let expect = [
774 UsageInfo {
775 ext_id: "abc".to_string(),
776 num_keys: 1,
777 num_bytes: 9,
778 },
779 UsageInfo {
780 ext_id: "xyz".to_string(),
781 num_keys: 4,
782 num_bytes: 39,
783 },
784 ];
785 assert_eq!(&usage, &expect);
786 }
787}