// nimbus/stateful/persistence.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Our storage abstraction, currently backed by Rkv.
6
7use crate::error::{debug, info, warn, NimbusError, Result};
8// This uses the lmdb backend for rkv, which is unstable.
9// We use it for now since glean didn't seem to have trouble with it (although
10// it must be noted that the rkv documentation explicitly says "To use rkv in
11// production/release environments at Mozilla, you may do so with the "SafeMode"
12// backend", so we really should get more guidance here.)
13use crate::enrollment::ExperimentEnrollment;
14use crate::Experiment;
15use core::iter::Iterator;
16use rkv::{StoreError, StoreOptions};
17use std::collections::HashSet;
18use std::fs;
19use std::path::Path;
20
// We use an incrementing integer to manage database migrations.
// If you need to make a backwards-incompatible change to the data schema,
// increment `DB_VERSION` and implement some migration logic in `maybe_upgrade`.
//
// ⚠️ Warning : Altering the type of `DB_VERSION` would itself require a DB migration. ⚠️
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";
pub(crate) const DB_VERSION: u16 = 3;
// Maximum number of named sub-stores one rkv environment may hold; must be at
// least the number of stores opened in `Database::new` (currently five).
const RKV_MAX_DBS: u32 = 6;

// Meta-store keys for the user's explicit participation choices, split per
// surface (experiments vs. rollouts) as of DB v3.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

// Legacy key for migration purposes: the single global opt-in flag used
// before DB v3; `migrate_v2_to_v3` splits it into the two keys above.
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

// Participation defaults applied when the user has never made a choice.
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
// Inspired by Glean - use a feature to choose between the backends.
// Select the LMDB-powered storage backend when the feature is not activated.
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Backend-specific concrete aliases so the rest of this module can stay
    // backend-agnostic; the safe-mode module below exports the same names.
    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    // Marker trait unifying everything rkv can read from (both read-only and
    // read-write transactions), so `get`-style helpers work inside either.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    // Blanket impl: any rkv readable with matching associated types qualifies.
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    /// Creates an LMDB-backed rkv environment at `path` with capacity for
    /// `RKV_MAX_DBS` named stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}
67
// Select the "safe mode" storage backend when the feature is activated.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Backend-specific concrete aliases mirroring the LMDB module above, so
    // callers use the same names regardless of the selected backend.
    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    // Marker trait unifying everything rkv can read from (both read-only and
    // read-write transactions), so `get`-style helpers work inside either.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    // Blanket impl: any rkv readable with matching associated types qualifies.
    impl<
            'r,
            T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>,
        > Readable<'r> for T
    {
    }

    /// Creates a safe-mode rkv environment at `path` with capacity for
    /// `RKV_MAX_DBS` named stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
98
99use backend::*;
100pub use backend::{Readable, Writer};
101
/// Enumeration of the different stores within our database.
///
/// Our rkv database contains a number of different "stores", and the items
/// in each store correspond to a particular type of object at the Rust level.
pub enum StoreId {
    /// Store containing the set of known experiments, as read from the server.
    ///
    /// Keys in the `Experiments` store are experiment identifier slugs, and their
    /// corresponding values are  serialized instances of the [`Experiment`] struct
    /// representing the last known state of that experiment.
    Experiments,
    /// Store containing the set of known experiment enrollments.
    ///
    /// Keys in the `Enrollments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`ExperimentEnrollment`]
    /// struct representing the current state of this client's enrollment (or not)
    /// in that experiment.
    Enrollments,
    /// Store containing miscellaneous metadata about this client instance.
    ///
    /// Keys in the `Meta` store are string constants, and their corresponding values
    /// are serialized items whose type depends on the constant. Known constants
    /// include:
    ///   * "db_version":   u16, the version number of the most recent migration
    ///     applied to this database.
    ///   * "nimbus-id":    String, the randomly-generated identifier for the
    ///     current client instance.
    ///   * "user-opt-in-experiments":  bool, whether the user has explicitly opted in or out
    ///     of participating in experiments.
    ///   * "user-opt-in-rollouts":  bool, whether the user has explicitly opted in or out
    ///     of participating in rollouts.
    ///   * "installation-date": a UTC DateTime string, defining the date the consuming app was
    ///     installed
    ///   * "update-date": a UTC DateTime string, defining the date the consuming app was
    ///     last updated
    ///   * "app-version": String, the version of the app last persisted
    Meta,
    /// Store containing pending updates to experiment data.
    ///
    /// The `Updates` store contains a single key "pending-experiment-updates", whose
    /// corresponding value is a serialized `Vec<Experiment>` of new experiment data
    /// that has been received from the server but not yet processed by the application.
    Updates,
    /// Store containing collected counts of behavior events for targeting purposes.
    ///
    /// Keys in the `EventCounts` store are strings representing the identifier for
    /// the event and their corresponding values represent a serialized instance of a
    /// [`MultiIntervalCounter`] struct that contains a set of configurations and data
    /// for the different time periods that the data will be aggregated on.
    EventCounts,
}
153
/// A wrapper for an Rkv store. Implemented to allow any value which supports
/// serde to be used.
pub struct SingleStore {
    // Underlying rkv store; values are persisted as JSON strings
    // (see `SingleStore::put`).
    store: RkvSingleStore,
}
159
160impl SingleStore {
161    pub fn new(store: RkvSingleStore) -> Self {
162        SingleStore { store }
163    }
164
165    pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
166        &self,
167        writer: &mut Writer,
168        key: &str,
169        persisted_data: &T,
170    ) -> Result<()> {
171        let persisted_json = match serde_json::to_string(persisted_data) {
172            Ok(v) => v,
173            Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
174        };
175        self.store
176            .put(writer, key, &rkv::Value::Json(&persisted_json))?;
177        Ok(())
178    }
179
180    #[allow(dead_code)]
181    pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
182        self.store.delete(writer, key)?;
183        Ok(())
184    }
185
186    pub fn clear(&self, writer: &mut Writer) -> Result<()> {
187        self.store.clear(writer)?;
188        Ok(())
189    }
190
191    // Some "get" functions that cooperate with transactions (ie, so we can
192    // get what we've written to the transaction before it's committed).
193    // It's unfortunate that these are duplicated with the DB itself, but the
194    // traits used by rkv make this tricky.
195    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
196    where
197        R: Readable<'r>,
198        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
199    {
200        let persisted_data = self.store.get(reader, key)?;
201        match persisted_data {
202            Some(data) => {
203                if let rkv::Value::Json(data) = data {
204                    Ok(Some(match serde_json::from_str::<T>(data) {
205                        Ok(v) => v,
206                        Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
207                    }))
208                } else {
209                    Err(NimbusError::InvalidPersistedData)
210                }
211            }
212            None => Ok(None),
213        }
214    }
215
216    /// Fork of collect_all that simply drops records that fail to read
217    /// rather than simply returning an error up the stack.  This likely
218    /// wants to be just a parameter to collect_all, but for now....
219    pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
220    where
221        R: Readable<'r>,
222        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
223    {
224        let mut result = Vec::new();
225        let mut iter = self.store.iter_start(reader)?;
226        while let Some(Ok((_, data))) = iter.next() {
227            if let rkv::Value::Json(data) = data {
228                let unserialized = serde_json::from_str::<T>(data);
229                match unserialized {
230                    Ok(value) => result.push(value),
231                    Err(e) => {
232                        // If there is an error, we won't push this onto the
233                        // result Vec, but we won't blow up the entire
234                        // deserialization either.
235                        warn!(
236                            "try_collect_all: discarded a record while deserializing with: {:?}",
237                            e
238                        );
239                        warn!(
240                            "try_collect_all:   data that failed to deserialize: {:?}",
241                            data
242                        );
243                    }
244                };
245            }
246        }
247        Ok(result)
248    }
249
250    pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
251    where
252        R: Readable<'r>,
253        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
254    {
255        let mut result = Vec::new();
256        let mut iter = self.store.iter_start(reader)?;
257        while let Some(Ok((_, data))) = iter.next() {
258            if let rkv::Value::Json(data) = data {
259                result.push(match serde_json::from_str::<T>(data) {
260                    Ok(v) => v,
261                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
262                });
263            }
264        }
265        Ok(result)
266    }
267}
268
/// A database handle exposing exactly one of the named stores (see
/// [`Database::open_single`]).
pub struct SingleStoreDatabase {
    // The rkv environment owning the store; kept so we can open transactions.
    rkv: Rkv,
    // The single store selected at open time.
    pub(crate) store: SingleStore,
}
273
274impl SingleStoreDatabase {
275    /// Function used to obtain a "reader" which is used for read-only transactions.
276    pub fn read(&self) -> Result<Reader> {
277        Ok(self.rkv.read()?)
278    }
279
280    /// Function used to obtain a "writer" which is used for transactions.
281    /// The `writer.commit();` must be called to commit data added via the
282    /// writer.
283    pub fn write(&self) -> Result<Writer> {
284        Ok(self.rkv.write()?)
285    }
286
287    /// Function used to obtain values from the internal store.
288    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
289    where
290        R: Readable<'r>,
291        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
292    {
293        self.store.get(reader, key)
294    }
295}
296
/// Database used to access persisted data
/// This an abstraction around an Rkv database
/// An instance on this database is created each time the component is loaded
/// if there is persisted data, the `get` functions should retrieve it
pub struct Database {
    // The rkv environment owning all of the stores below.
    rkv: Rkv,
    // One `SingleStore` per `StoreId` variant; see `StoreId` for the
    // key/value schema of each.
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    updates_store: SingleStore,
    event_count_store: SingleStore,
}
309
310impl Database {
311    /// Main constructor for a database
312    /// Initiates the Rkv database to be used to retrieve persisted data
313    /// # Arguments
314    /// - `path`: A path to the persisted data, this is provided by the consuming application
315    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
316        let rkv = Self::open_rkv(path)?;
317        let meta_store = rkv.open_single("meta", StoreOptions::create())?;
318        let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
319        let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
320        let updates_store = rkv.open_single("updates", StoreOptions::create())?;
321        let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
322        let db = Self {
323            rkv,
324            meta_store: SingleStore::new(meta_store),
325            experiment_store: SingleStore::new(experiment_store),
326            enrollment_store: SingleStore::new(enrollment_store),
327            updates_store: SingleStore::new(updates_store),
328            event_count_store: SingleStore::new(event_count_store),
329        };
330        db.maybe_upgrade()?;
331        Ok(db)
332    }
333
334    pub fn open_single<P: AsRef<Path>>(path: P, store_id: StoreId) -> Result<SingleStoreDatabase> {
335        let rkv = Self::open_rkv(path)?;
336        let store = SingleStore::new(match store_id {
337            StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
338            StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
339            StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
340            StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
341            StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
342        });
343        Ok(SingleStoreDatabase { rkv, store })
344    }
345
346    fn maybe_upgrade(&self) -> Result<()> {
347        debug!("entered maybe upgrade");
348        let mut writer = self.rkv.write()?;
349        let current_version = self
350            .meta_store
351            .get::<u16, _>(&writer, DB_KEY_DB_VERSION)?
352            .unwrap_or(0);
353
354        if current_version == DB_VERSION {
355            info!("Already at version {}, no upgrade needed", DB_VERSION);
356            return Ok(());
357        }
358
359        if current_version == 0 || current_version > DB_VERSION {
360            info!(
361                "maybe_upgrade: current_version: {}, DB_VERSION: {}; wiping most stores",
362                current_version, DB_VERSION
363            );
364            self.clear_experiments_and_enrollments(&mut writer)?;
365            self.updates_store.clear(&mut writer)?;
366            self.meta_store
367                .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
368            writer.commit()?;
369            return Ok(());
370        }
371
372        if current_version == 1 {
373            info!("Migrating database from v1 to v2");
374            if let Err(e) = self.migrate_v1_to_v2(&mut writer) {
375                error_support::report_error!(
376                    "nimbus-database-migration",
377                    "Error migrating database v1 to v2: {:?}. Wiping experiments and enrollments",
378                    e
379                );
380                self.clear_experiments_and_enrollments(&mut writer)?;
381            }
382        }
383
384        if current_version == 2 {
385            info!("Migrating database from v2 to v3");
386            if let Err(e) = self.migrate_v2_to_v3(&mut writer) {
387                error_support::report_error!(
388                    "nimbus-database-migration",
389                    "Error migrating database v2 to v3: {:?}. Wiping experiments and enrollments",
390                    e
391                );
392                self.clear_experiments_and_enrollments(&mut writer)?;
393            }
394        }
395
396        // It is safe to clear the update store (i.e. the pending experiments) on all schema upgrades
397        // as it will be re-filled from the server on the next `fetch_experiments()`.
398        // The current contents of the update store may cause experiments to not load, or worse,
399        // accidentally unenroll.
400        self.updates_store.clear(&mut writer)?;
401        self.meta_store
402            .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
403        writer.commit()?;
404        debug!("maybe_upgrade: transaction committed");
405        Ok(())
406    }
407
408    pub(crate) fn clear_experiments_and_enrollments(
409        &self,
410        writer: &mut Writer,
411    ) -> Result<(), NimbusError> {
412        self.experiment_store.clear(writer)?;
413        self.enrollment_store.clear(writer)?;
414        Ok(())
415    }
416
417    pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
418        self.event_count_store.clear(writer)?;
419        Ok(())
420    }
421
422    /// Migrates a v1 database to v2
423    ///
424    /// Note that any Err returns from this function (including stuff
425    /// propagated up via the ? operator) will cause maybe_update (our caller)
426    /// to assume that this is unrecoverable and wipe the database, removing
427    /// people from any existing enrollments and blowing away their experiment
428    /// history, so that they don't get left in an inconsistent state.
429    fn migrate_v1_to_v2(&self, writer: &mut Writer) -> Result<()> {
430        info!("Upgrading from version 1 to version 2");
431
432        // use try_collect_all to read everything except records that serde
433        // returns deserialization errors on.  Some logging of those errors
434        // happens, but it's not ideal.
435        let reader = self.read()?;
436
437        // XXX write a test to verify that we don't need to gc any
438        // enrollments that don't have experiments because the experiments
439        // were discarded either during try_collect_all (these wouldn't have been
440        // detected during the filtering phase) or during the filtering phase
441        // itself.  The test needs to run evolve_experiments, as that should
442        // correctly drop any orphans, even if the migrators aren't perfect.
443
444        let enrollments: Vec<ExperimentEnrollment> =
445            self.enrollment_store.try_collect_all(&reader)?;
446        let experiments: Vec<Experiment> = self.experiment_store.try_collect_all(&reader)?;
447
448        // figure out which experiments have records that need to be dropped
449        // and log that we're going to drop them and why
450        let empty_string = "".to_string();
451        let slugs_with_experiment_issues: HashSet<String> = experiments
452            .iter()
453            .filter_map(
454                    |e| {
455                let branch_with_empty_feature_ids =
456                    e.branches.iter().find(|b| b.feature.is_none() || b.feature.as_ref().unwrap().feature_id.is_empty());
457                if branch_with_empty_feature_ids.is_some() {
458                    warn!("{:?} experiment has branch missing a feature prop; experiment & enrollment will be discarded", &e.slug);
459                    Some(e.slug.to_owned())
460                } else if e.feature_ids.is_empty() || e.feature_ids.contains(&empty_string) {
461                    warn!("{:?} experiment has invalid feature_ids array; experiment & enrollment will be discarded", &e.slug);
462                    Some(e.slug.to_owned())
463                } else {
464                    None
465                }
466            })
467            .collect();
468        let slugs_to_discard: HashSet<_> = slugs_with_experiment_issues;
469
470        // filter out experiments to be dropped
471        let updated_experiments: Vec<Experiment> = experiments
472            .into_iter()
473            .filter(|e| !slugs_to_discard.contains(&e.slug))
474            .collect();
475        debug!("updated experiments = {:?}", updated_experiments);
476
477        // filter out enrollments to be dropped
478        let updated_enrollments: Vec<ExperimentEnrollment> = enrollments
479            .into_iter()
480            .filter(|e| !slugs_to_discard.contains(&e.slug))
481            .collect();
482        debug!("updated enrollments = {:?}", updated_enrollments);
483
484        // rewrite both stores
485        self.experiment_store.clear(writer)?;
486        for experiment in updated_experiments {
487            self.experiment_store
488                .put(writer, &experiment.slug, &experiment)?;
489        }
490
491        self.enrollment_store.clear(writer)?;
492        for enrollment in updated_enrollments {
493            self.enrollment_store
494                .put(writer, &enrollment.slug, &enrollment)?;
495        }
496        debug!("exiting migrate_v1_to_v2");
497
498        Ok(())
499    }
500
501    /// Migrates a v2 database to v3
502    ///
503    /// Separates global user participation into experiments and rollouts participation.
504    /// For privacy: if user opted out globally, they remain opted out of experiments.
505    fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
506        info!("Upgrading from version 2 to version 3");
507
508        let meta_store = &self.meta_store;
509
510        // Get the old global participation flag
511        let old_global_participation = meta_store
512            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
513            .unwrap_or(true); // Default was true
514
515        // Set new separate flags based on privacy requirements:
516        // - If user opted out globally, they stay opted out of experiments
517        // - If user opted out globally, they stay opted out of rollouts (per requirement #3)
518        meta_store.put(
519            writer,
520            DB_KEY_EXPERIMENT_PARTICIPATION,
521            &old_global_participation,
522        )?;
523        meta_store.put(
524            writer,
525            DB_KEY_ROLLOUT_PARTICIPATION,
526            &old_global_participation,
527        )?;
528
529        // Remove the old global participation key if it exists
530        if meta_store
531            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
532            .is_some()
533        {
534            meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
535        }
536
537        info!(
538            "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
539            old_global_participation, old_global_participation
540        );
541
542        Ok(())
543    }
544
545    /// Gets a Store object, which used with the writer returned by
546    /// `self.write()` to update the database in a transaction.
547    pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
548        match store_id {
549            StoreId::Meta => &self.meta_store,
550            StoreId::Experiments => &self.experiment_store,
551            StoreId::Enrollments => &self.enrollment_store,
552            StoreId::Updates => &self.updates_store,
553            StoreId::EventCounts => &self.event_count_store,
554        }
555    }
556
557    pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<Rkv> {
558        let path = std::path::Path::new(path.as_ref()).join("db");
559        debug!("open_rkv: path =  {:?}", path.display());
560        fs::create_dir_all(&path)?;
561        let rkv = match rkv_new(&path) {
562            Ok(rkv) => Ok(rkv),
563            Err(rkv_error) => {
564                match rkv_error {
565                    // For some errors we just delete the DB and start again.
566                    StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
567                        // On one hand this seems a little dangerous, but on
568                        // the other hand avoids us knowing about the
569                        // underlying implementation (ie, how do we know what
570                        // files might exist in all cases?)
571                        warn!(
572                            "Database at '{}' appears corrupt - removing and recreating",
573                            path.display()
574                        );
575                        fs::remove_dir_all(&path)?;
576                        fs::create_dir_all(&path)?;
577                        // TODO: Once we have glean integration we want to
578                        // record telemetry here.
579                        rkv_new(&path)
580                    }
581                    // All other errors are fatal.
582                    _ => Err(rkv_error),
583                }
584            }
585        }?;
586        debug!("Database initialized");
587        Ok(rkv)
588    }
589
590    /// Function used to obtain a "reader" which is used for read-only transactions.
591    pub fn read(&self) -> Result<Reader> {
592        Ok(self.rkv.read()?)
593    }
594
595    /// Function used to obtain a "writer" which is used for transactions.
596    /// The `writer.commit();` must be called to commit data added via the
597    /// writer.
598    pub fn write(&self) -> Result<Writer> {
599        Ok(self.rkv.write()?)
600    }
601
602    /// Function used to retrieve persisted data outside of a transaction.
603    /// It allows retrieval of any serializable and deserializable data
604    /// Currently only supports JSON data
605    // Only available for tests; product code should always be using transactions.
606    ///
607    /// # Arguments
608    /// - `key`: A key for the data stored in the underlying database
609    #[cfg(test)]
610    pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
611        &self,
612        store_id: StoreId,
613        key: &str,
614    ) -> Result<Option<T>> {
615        let reader = self.rkv.read()?;
616        let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
617        match persisted_data {
618            Some(data) => {
619                if let rkv::Value::Json(data) = data {
620                    Ok(Some(match serde_json::from_str::<T>(data) {
621                        Ok(v) => v,
622                        Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
623                    }))
624                } else {
625                    Err(NimbusError::InvalidPersistedData)
626                }
627            }
628            None => Ok(None),
629        }
630    }
631
632    // Function for collecting all items in a store outside of a transaction.
633    // Only available for tests; product code should always be using transactions.
634    // Iters are a bit tricky - would be nice to make them generic, but this will
635    // do for our use-case.
636    #[cfg(test)]
637    pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
638        &self,
639        store_id: StoreId,
640    ) -> Result<Vec<T>> {
641        let mut result = Vec::new();
642        let reader = self.rkv.read()?;
643        let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
644        while let Some(Ok((_, data))) = iter.next() {
645            if let rkv::Value::Json(data) = data {
646                result.push(match serde_json::from_str::<T>(data) {
647                    Ok(v) => v,
648                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
649                });
650            }
651        }
652        Ok(result)
653    }
654}