1use crate::error::*;
6use rusqlite::{Connection, Transaction};
7use serde::{ser::SerializeMap, Serialize, Serializer};
8
9use serde_json::{Map, Value as JsonValue};
10use sql_support::{self, ConnExt};
11
/// Upper bound, in bytes, on the serialized JSON stored for one extension.
/// Enforced by `save_to_db` for `StorageChangeOp::Set`.
// NOTE(review): the three limits below look like they mirror the quotas
// documented for `chrome.storage.sync` - confirm against that documentation.
pub const SYNC_QUOTA_BYTES: usize = 102_400;
/// Upper bound on the quota size of a single item (as computed by
/// `get_quota_size_of`). Checked by `set`.
pub const SYNC_QUOTA_BYTES_PER_ITEM: usize = 8_192;
/// Upper bound on the number of keys one extension may store. Checked by `set`.
pub const SYNC_MAX_ITEMS: usize = 512;
/// A JSON object: string keys mapped to arbitrary JSON values.
type JsonMap = Map<String, JsonValue>;
25
/// The write operations understood by `save_to_db`.
enum StorageChangeOp {
    /// Remove everything stored for the extension.
    Clear,
    /// Store the given value, failing with a quota error when its serialized
    /// form is larger than `SYNC_QUOTA_BYTES`.
    Set(JsonValue),
    /// Store the given value without comparing its size to `SYNC_QUOTA_BYTES`.
    SetWithoutQuota(JsonValue),
}
31
32fn get_from_db(conn: &Connection, ext_id: &str) -> Result<Option<JsonMap>> {
33 Ok(
34 match conn.try_query_one::<String, _>(
35 "SELECT data FROM storage_sync_data
36 WHERE ext_id = :ext_id",
37 &[(":ext_id", &ext_id)],
38 true,
39 )? {
40 Some(s) => match serde_json::from_str(&s)? {
41 JsonValue::Object(m) => Some(m),
42 _ => None,
45 },
46 None => None,
47 },
48 )
49}
50
51fn save_to_db(tx: &Transaction<'_>, ext_id: &str, val: &StorageChangeOp) -> Result<()> {
52 let is_delete = match val {
56 StorageChangeOp::Clear => true,
57 StorageChangeOp::Set(JsonValue::Object(v)) => v.is_empty(),
58 StorageChangeOp::SetWithoutQuota(JsonValue::Object(v)) => v.is_empty(),
59 _ => false,
60 };
61 if is_delete {
62 let in_mirror = tx
63 .try_query_one(
64 "SELECT EXISTS(SELECT 1 FROM storage_sync_mirror WHERE ext_id = :ext_id);",
65 rusqlite::named_params! {
66 ":ext_id": ext_id,
67 },
68 true,
69 )?
70 .unwrap_or_default();
71 if in_mirror {
72 trace!("saving data for '{}': leaving a tombstone", ext_id);
73 tx.execute_cached(
74 "
75 INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
76 VALUES (:ext_id, NULL, 1)
77 ON CONFLICT (ext_id) DO UPDATE
78 SET data = NULL, sync_change_counter = sync_change_counter + 1",
79 rusqlite::named_params! {
80 ":ext_id": ext_id,
81 },
82 )?;
83 } else {
84 trace!("saving data for '{}': removing the row", ext_id);
85 tx.execute_cached(
86 "
87 DELETE FROM storage_sync_data WHERE ext_id = :ext_id",
88 rusqlite::named_params! {
89 ":ext_id": ext_id,
90 },
91 )?;
92 }
93 } else {
94 let sval = match val {
96 StorageChangeOp::Set(v) => {
97 let sv = v.to_string();
98 if sv.len() > SYNC_QUOTA_BYTES {
99 return Err(Error::QuotaError(QuotaReason::TotalBytes));
100 }
101 sv
102 }
103 StorageChangeOp::SetWithoutQuota(v) => v.to_string(),
104 StorageChangeOp::Clear => unreachable!(),
105 };
106
107 trace!("saving data for '{}': writing", ext_id);
108 tx.execute_cached(
109 "INSERT INTO storage_sync_data(ext_id, data, sync_change_counter)
110 VALUES (:ext_id, :data, 1)
111 ON CONFLICT (ext_id) DO UPDATE
112 set data=:data, sync_change_counter = sync_change_counter + 1",
113 rusqlite::named_params! {
114 ":ext_id": ext_id,
115 ":data": &sval,
116 },
117 )?;
118 }
119 Ok(())
120}
121
/// Removes everything stored for `ext_id` by delegating to `save_to_db` with
/// `StorageChangeOp::Clear`.
fn remove_from_db(tx: &Transaction<'_>, ext_id: &str) -> Result<()> {
    save_to_db(tx, ext_id, &StorageChangeOp::Clear)
}
125
/// The change made to a single key: its value before and after an operation.
///
/// When serialized, field names are camelCased (`oldValue`, `newValue`),
/// `None` values are omitted and `key` is never emitted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageValueChange {
    /// The key that changed. Not serialized as a field; the `Serialize` impl
    /// for `StorageChanges` emits it as the map key instead.
    #[serde(skip_serializing)]
    pub key: String,
    /// The value before the change, if there was one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_value: Option<JsonValue>,
    /// The value after the change, if there is one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_value: Option<JsonValue>,
}
139
/// The list of per-key changes produced by one storage operation.
///
/// `Serialize` is implemented by hand rather than derived, so that the list is
/// emitted as a map keyed by each change's `key`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageChanges {
    /// The individual changes, in the order they were recorded.
    pub changes: Vec<StorageValueChange>,
}
147
148impl StorageChanges {
149 pub fn new() -> Self {
150 Self::default()
151 }
152
153 pub fn with_capacity(n: usize) -> Self {
154 Self {
155 changes: Vec::with_capacity(n),
156 }
157 }
158
159 pub fn is_empty(&self) -> bool {
160 self.changes.is_empty()
161 }
162
163 pub fn push(&mut self, change: StorageValueChange) {
164 self.changes.push(change)
165 }
166}
167
168impl Serialize for StorageChanges {
170 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
171 where
172 S: Serializer,
173 {
174 let mut map = serializer.serialize_map(Some(self.changes.len()))?;
175 for change in &self.changes {
176 map.serialize_entry(&change.key, change)?;
177 }
178 map.end()
179 }
180}
181
182pub fn get_quota_size_of(key: &str, v: &JsonValue) -> usize {
185 key.len() + v.to_string().len()
188}
189
190pub fn set(tx: &Transaction<'_>, ext_id: &str, val: JsonValue) -> Result<StorageChanges> {
194 let val_map = match val {
195 JsonValue::Object(m) => m,
196 _ => Map::new(),
198 };
199
200 let mut current = get_from_db(tx, ext_id)?.unwrap_or_default();
201
202 let mut changes = StorageChanges::with_capacity(val_map.len());
203
204 for (k, v) in val_map.into_iter() {
206 let old_value = current.remove(&k);
207 if current.len() >= SYNC_MAX_ITEMS {
208 return Err(Error::QuotaError(QuotaReason::MaxItems));
209 }
210 if get_quota_size_of(&k, &v) > SYNC_QUOTA_BYTES_PER_ITEM {
213 return Err(Error::QuotaError(QuotaReason::ItemBytes));
214 }
215 let change = StorageValueChange {
216 key: k.clone(),
217 old_value,
218 new_value: Some(v.clone()),
219 };
220 changes.push(change);
221 current.insert(k, v);
222 }
223
224 save_to_db(
225 tx,
226 ext_id,
227 &StorageChangeOp::Set(JsonValue::Object(current)),
228 )?;
229 Ok(changes)
230}
231
232fn get_keys(keys: JsonValue) -> Vec<(String, Option<JsonValue>)> {
236 match keys {
237 JsonValue::String(s) => vec![(s, None)],
238 JsonValue::Array(keys) => {
239 keys.iter()
242 .filter_map(|v| v.as_str().map(|s| (s.to_string(), None)))
243 .collect()
244 }
245 JsonValue::Object(m) => m.into_iter().map(|(k, d)| (k, Some(d))).collect(),
246 _ => vec![],
247 }
248}
249
250pub fn get(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<JsonValue> {
253 let maybe_existing = get_from_db(conn, ext_id)?;
255 let mut existing = match (maybe_existing, keys.is_object()) {
256 (None, true) => return Ok(keys),
257 (None, false) => return Ok(JsonValue::Object(Map::new())),
258 (Some(v), _) => v,
259 };
260 if keys.is_null() {
262 return Ok(JsonValue::Object(existing));
263 }
264 let keys_and_defaults = get_keys(keys);
266 let mut result = Map::with_capacity(keys_and_defaults.len());
267 for (key, maybe_default) in keys_and_defaults {
268 if let Some(v) = existing.remove(&key) {
269 result.insert(key, v);
270 } else if let Some(def) = maybe_default {
271 result.insert(key, def);
272 }
273 }
276 Ok(JsonValue::Object(result))
277}
278
279pub fn remove(tx: &Transaction<'_>, ext_id: &str, keys: JsonValue) -> Result<StorageChanges> {
283 let mut existing = match get_from_db(tx, ext_id)? {
284 None => return Ok(StorageChanges::new()),
285 Some(v) => v,
286 };
287
288 let keys_and_defs = get_keys(keys);
291
292 let mut result = StorageChanges::with_capacity(keys_and_defs.len());
293 for (key, _) in keys_and_defs {
294 if let Some(v) = existing.remove(&key) {
295 result.push(StorageValueChange {
296 key,
297 old_value: Some(v),
298 new_value: None,
299 });
300 }
301 }
302 if !result.is_empty() {
303 save_to_db(
304 tx,
305 ext_id,
306 &StorageChangeOp::SetWithoutQuota(JsonValue::Object(existing)),
307 )?;
308 }
309 Ok(result)
310}
311
312pub fn clear(tx: &Transaction<'_>, ext_id: &str) -> Result<StorageChanges> {
316 let existing = match get_from_db(tx, ext_id)? {
317 None => return Ok(StorageChanges::new()),
318 Some(v) => v,
319 };
320 let mut result = StorageChanges::with_capacity(existing.len());
321 for (key, val) in existing.into_iter() {
322 result.push(StorageValueChange {
323 key: key.to_string(),
324 new_value: None,
325 old_value: Some(val),
326 });
327 }
328 remove_from_db(tx, ext_id)?;
329 Ok(result)
330}
331
332pub fn get_bytes_in_use(conn: &Connection, ext_id: &str, keys: JsonValue) -> Result<usize> {
334 let maybe_existing = get_from_db(conn, ext_id)?;
335 let existing = match maybe_existing {
336 None => return Ok(0),
337 Some(v) => v,
338 };
339 let keys: Vec<&str> = match &keys {
341 JsonValue::Null => existing.keys().map(|v| v.as_str()).collect(),
342 JsonValue::String(name) => vec![name.as_str()],
343 JsonValue::Array(names) => names.iter().filter_map(|v| v.as_str()).collect(),
344 _ => return Ok(0),
346 };
347 let mut size = 0;
349 for key in keys.into_iter() {
350 if let Some(v) = existing.get(key) {
351 size += get_quota_size_of(key, v);
352 }
353 }
354 Ok(size)
355}
356
/// Storage usage for one extension, as reported by `usage`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageInfo {
    /// The extension the figures belong to.
    pub ext_id: String,
    /// Number of top-level keys in the extension's stored JSON object.
    pub num_keys: usize,
    /// Length, in bytes, of the extension's stored JSON text.
    pub num_bytes: usize,
}
371
372pub fn usage(db: &Connection) -> Result<Vec<UsageInfo>> {
375 type JsonObject = Map<String, JsonValue>;
376 let sql = "
377 SELECT ext_id, data
378 FROM storage_sync_data
379 WHERE data IS NOT NULL
380 -- for tests and determinism
381 ORDER BY ext_id
382 ";
383 db.query_rows_into(sql, [], |row| {
384 let ext_id: String = row.get("ext_id")?;
385 let data: String = row.get("data")?;
386 let num_bytes = data.len();
387 let num_keys = serde_json::from_str::<JsonObject>(&data)?.len();
388 Ok(UsageInfo {
389 ext_id,
390 num_keys,
391 num_bytes,
392 })
393 })
394}
395
396#[cfg(test)]
397mod tests {
398 use super::*;
399 use crate::db::test::new_mem_db;
400 use serde_json::json;
401
402 #[test]
403 fn test_serialize_storage_changes() -> Result<()> {
404 let c = StorageChanges {
405 changes: vec![StorageValueChange {
406 key: "key".to_string(),
407 old_value: Some(json!("old")),
408 new_value: None,
409 }],
410 };
411 assert_eq!(serde_json::to_string(&c)?, r#"{"key":{"oldValue":"old"}}"#);
412 let c = StorageChanges {
413 changes: vec![StorageValueChange {
414 key: "key".to_string(),
415 old_value: None,
416 new_value: Some(json!({"foo": "bar"})),
417 }],
418 };
419 assert_eq!(
420 serde_json::to_string(&c)?,
421 r#"{"key":{"newValue":{"foo":"bar"}}}"#
422 );
423 Ok(())
424 }
425
426 fn make_changes(changes: &[(&str, Option<JsonValue>, Option<JsonValue>)]) -> StorageChanges {
427 let mut r = StorageChanges::with_capacity(changes.len());
428 for (name, old_value, new_value) in changes {
429 r.push(StorageValueChange {
430 key: (*name).to_string(),
431 old_value: old_value.clone(),
432 new_value: new_value.clone(),
433 });
434 }
435 r
436 }
437
438 #[test]
439 fn test_simple() -> Result<()> {
440 let ext_id = "x";
441 let db = new_mem_db();
442 let conn = db.get_connection().expect("should retrieve connection");
443 let tx = conn.unchecked_transaction()?;
444
445 for q in vec![JsonValue::Null, json!("foo"), json!(["foo"])].into_iter() {
447 assert_eq!(get(&tx, ext_id, q)?, json!({}));
448 }
449
450 for q in vec![json!({ "foo": null }), json!({"foo": "default"})].into_iter() {
452 assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
453 }
454
455 set(&tx, ext_id, json!({"foo": "bar" }))?;
457 for q in vec![
458 JsonValue::Null,
459 json!("foo"),
460 json!(["foo"]),
461 json!({ "foo": null }),
462 json!({"foo": "default"}),
463 ]
464 .into_iter()
465 {
466 assert_eq!(get(&tx, ext_id, q)?, json!({"foo": "bar" }));
467 }
468
469 for q in vec![
471 json!({ "non_existing_key": null }),
472 json!({"non_existing_key": 0}),
473 json!({"non_existing_key": false}),
474 json!({"non_existing_key": "default"}),
475 json!({"non_existing_key": ["array"]}),
476 json!({"non_existing_key": {"objectkey": "value"}}),
477 ]
478 .into_iter()
479 {
480 assert_eq!(get(&tx, ext_id, q.clone())?, q.clone());
481 }
482
483 assert_eq!(
485 set(&tx, ext_id, json!({"foo": "new", "other": "also new" }))?,
486 make_changes(&[
487 ("foo", Some(json!("bar")), Some(json!("new"))),
488 ("other", None, Some(json!("also new")))
489 ])
490 );
491 assert_eq!(
492 get(&tx, ext_id, JsonValue::Null)?,
493 json!({"foo": "new", "other": "also new"})
494 );
495 assert_eq!(get(&tx, ext_id, json!("foo"))?, json!({"foo": "new"}));
496 assert_eq!(
497 get(&tx, ext_id, json!(["foo", "other"]))?,
498 json!({"foo": "new", "other": "also new"})
499 );
500 assert_eq!(
501 get(&tx, ext_id, json!({"foo": null, "default": "yo"}))?,
502 json!({"foo": "new", "default": "yo"})
503 );
504
505 assert_eq!(
506 remove(&tx, ext_id, json!("foo"))?,
507 make_changes(&[("foo", Some(json!("new")), None)]),
508 );
509
510 assert_eq!(
511 set(&tx, ext_id, json!({"foo": {"sub-object": "sub-value"}}))?,
512 make_changes(&[("foo", None, Some(json!({"sub-object": "sub-value"}))),])
513 );
514
515 assert_eq!(
518 clear(&tx, ext_id)?,
519 make_changes(&[
520 ("other", Some(json!("also new")), None),
521 ("foo", Some(json!({"sub-object": "sub-value"})), None),
522 ]),
523 );
524 assert_eq!(get(&tx, ext_id, JsonValue::Null)?, json!({}));
525
526 Ok(())
527 }
528
529 #[test]
530 fn test_check_get_impl() -> Result<()> {
531 let ext_id = "x";
533 let db = new_mem_db();
534 let conn = db.get_connection().expect("should retrieve connection");
535 let tx = conn.unchecked_transaction()?;
536
537 let prop = "test-prop";
538 let value = "test-value";
539
540 set(&tx, ext_id, json!({ prop: value }))?;
541
542 let mut data = get(&tx, ext_id, json!(null))?;
544 assert_eq!(value, json!(data[prop]), "null getter worked for {}", prop);
545
546 data = get(&tx, ext_id, json!(prop))?;
547 assert_eq!(
548 value,
549 json!(data[prop]),
550 "string getter worked for {}",
551 prop
552 );
553 assert_eq!(
554 data.as_object().unwrap().len(),
555 1,
556 "string getter should return an object with a single property"
557 );
558
559 data = get(&tx, ext_id, json!([prop]))?;
560 assert_eq!(value, json!(data[prop]), "array getter worked for {}", prop);
561 assert_eq!(
562 data.as_object().unwrap().len(),
563 1,
564 "array getter with a single key should return an object with a single property"
565 );
566
567 data = get(&tx, ext_id, json!({ prop: null }))?;
570 assert_eq!(
571 value,
572 json!(data[prop]),
573 "object getter worked for {}",
574 prop
575 );
576 assert_eq!(
577 data.as_object().unwrap().len(),
578 1,
579 "object getter with a single key should return an object with a single property"
580 );
581
582 Ok(())
583 }
584
585 #[test]
586 fn test_bug_1621162() -> Result<()> {
587 let db = new_mem_db();
590 let conn = db.get_connection().expect("should retrieve connection");
591 let tx = conn.unchecked_transaction()?;
592 let ext_id = "xyz";
593
594 set(&tx, ext_id, json!({"foo": "bar" }))?;
595
596 assert_eq!(
597 set(&tx, ext_id, json!({"foo": "bar" }))?,
598 make_changes(&[("foo", Some(json!("bar")), Some(json!("bar")))]),
599 );
600 Ok(())
601 }
602
603 #[test]
604 fn test_quota_maxitems() -> Result<()> {
605 let db = new_mem_db();
606 let conn = db.get_connection().expect("should retrieve connection");
607 let tx = conn.unchecked_transaction()?;
608 let ext_id = "xyz";
609 for i in 1..SYNC_MAX_ITEMS + 1 {
610 set(
611 &tx,
612 ext_id,
613 json!({ format!("key-{}", i): format!("value-{}", i) }),
614 )?;
615 }
616 let e = set(&tx, ext_id, json!({"another": "another"})).unwrap_err();
617 match e {
618 Error::QuotaError(QuotaReason::MaxItems) => {}
619 _ => panic!("unexpected error type"),
620 };
621 Ok(())
622 }
623
624 #[test]
625 fn test_quota_bytesperitem() -> Result<()> {
626 let db = new_mem_db();
627 let conn = db.get_connection().expect("should retrieve connection");
628 let tx = conn.unchecked_transaction()?;
629 let ext_id = "xyz";
630 let val = "x".repeat(SYNC_QUOTA_BYTES_PER_ITEM - 5);
634
635 set(&tx, ext_id, json!({ "x": val }))?;
637 assert_eq!(
638 get_bytes_in_use(&tx, ext_id, json!("x"))?,
639 SYNC_QUOTA_BYTES_PER_ITEM - 2
640 );
641
642 let e = set(&tx, ext_id, json!({ "xxxx": val })).unwrap_err();
644 match e {
645 Error::QuotaError(QuotaReason::ItemBytes) => {}
646 _ => panic!("unexpected error type"),
647 };
648 Ok(())
649 }
650
651 #[test]
652 fn test_quota_bytes() -> Result<()> {
653 let db = new_mem_db();
654 let conn = db.get_connection().expect("should retrieve connection");
655 let tx = conn.unchecked_transaction()?;
656 let ext_id = "xyz";
657 let val = "x".repeat(SYNC_QUOTA_BYTES + 1);
658
659 save_to_db(
661 &tx,
662 ext_id,
663 &StorageChangeOp::SetWithoutQuota(json!({ "x": val })),
664 )?;
665
666 let e = set(&tx, ext_id, json!({ "y": "newvalue" })).unwrap_err();
668 match e {
669 Error::QuotaError(QuotaReason::TotalBytes) => {}
670 _ => panic!("unexpected error type"),
671 };
672
673 remove(&tx, ext_id, json!["x"])?;
675
676 save_to_db(
678 &tx,
679 ext_id,
680 &StorageChangeOp::SetWithoutQuota(json!({ "y": val })),
681 )?;
682
683 set(&tx, ext_id, json!({ "y": "lessdata" }))?;
685
686 Ok(())
687 }
688
689 #[test]
690 fn test_get_bytes_in_use() -> Result<()> {
691 let db = new_mem_db();
692 let conn = db.get_connection().expect("should retrieve connection");
693 let tx = conn.unchecked_transaction()?;
694 let ext_id = "xyz";
695
696 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 0);
697
698 set(&tx, ext_id, json!({ "a": "a" }))?; set(&tx, ext_id, json!({ "b": "bb" }))?; set(&tx, ext_id, json!({ "c": "ccc" }))?; set(&tx, ext_id, json!({ "n": 999_999 }))?; assert_eq!(get_bytes_in_use(&tx, ext_id, json!("x"))?, 0);
704 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("a"))?, 4);
705 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("b"))?, 5);
706 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("c"))?, 6);
707 assert_eq!(get_bytes_in_use(&tx, ext_id, json!("n"))?, 7);
708
709 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a"]))?, 4);
710 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "x"]))?, 4);
711 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "b"]))?, 9);
712 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(["a", "c"]))?, 10);
713
714 assert_eq!(
715 get_bytes_in_use(&tx, ext_id, json!(["a", "b", "c", "n"]))?,
716 22
717 );
718 assert_eq!(get_bytes_in_use(&tx, ext_id, json!(null))?, 22);
719 Ok(())
720 }
721
722 #[test]
723 fn test_usage() {
724 let db = new_mem_db();
725 let conn = db.get_connection().expect("should retrieve connection");
726 let tx = conn.unchecked_transaction().unwrap();
727 set(&tx, "xyz", json!({ "a": "a" })).unwrap();
729 set(&tx, "xyz", json!({ "b": "bb" })).unwrap();
730 set(&tx, "xyz", json!({ "c": "ccc" })).unwrap();
731 set(&tx, "xyz", json!({ "n": 999_999 })).unwrap();
732
733 set(&tx, "abc", json!({ "a": "a" })).unwrap();
735
736 tx.commit().unwrap();
737
738 let usage = usage(conn).unwrap();
739 let expect = [
740 UsageInfo {
741 ext_id: "abc".to_string(),
742 num_keys: 1,
743 num_bytes: 9,
744 },
745 UsageInfo {
746 ext_id: "xyz".to_string(),
747 num_keys: 4,
748 num_bytes: 39,
749 },
750 ];
751 assert_eq!(&usage, &expect);
752 }
753}