nimbus/stateful/persistence.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

//! Our storage abstraction, currently backed by Rkv.

use crate::error::{debug, info, warn, NimbusError, Result};
// This uses the lmdb backend for rkv, which is unstable.
// We use it for now since glean didn't seem to have trouble with it (although
// it must be noted that the rkv documentation explicitly says "To use rkv in
// production/release environments at Mozilla, you may do so with the 'SafeMode'
// backend", so we really should get more guidance here.)
use crate::enrollment::ExperimentEnrollment;
use crate::Experiment;
use core::iter::Iterator;
use rkv::{StoreError, StoreOptions};
use std::collections::HashSet;
use std::fs;
use std::path::Path;

// We use an incrementing integer to manage database migrations.
// If you need to make a backwards-incompatible change to the data schema,
// increment `DB_VERSION` and implement some migration logic in `maybe_upgrade`.
//
// ⚠️ Warning: Altering the type of `DB_VERSION` would itself require a DB migration. ⚠️
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";
pub(crate) const DB_VERSION: u16 = 2;
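// For example, a hypothetical future v3 schema would bump the constant to
// `DB_VERSION: u16 = 3` and add a migration arm in `maybe_upgrade`, roughly:
//
//     Some(2) => self.migrate_v2_to_v3(&mut writer)?,
//
// (`migrate_v2_to_v3` is illustrative only; no such migration exists yet.)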
const RKV_MAX_DBS: u32 = 6;

// Inspired by Glean - use a feature to choose between the backends.
// Select the LMDB-powered storage backend when the feature is not activated.
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}

// Select the "safe mode" storage backend when the feature is activated.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    impl<
            'r,
            T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>,
        > Readable<'r> for T
    {
    }

    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
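
// A minimal sketch of how a consumer opts into the safe-mode backend; the
// crate name and version below are illustrative and depend on how this
// component is vendored:
//
//     [dependencies]
//     nimbus-sdk = { version = "...", features = ["rkv-safe-mode"] }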

use backend::*;
pub use backend::{Readable, Writer};

/// Enumeration of the different stores within our database.
///
/// Our rkv database contains a number of different "stores", and the items
/// in each store correspond to a particular type of object at the Rust level.
pub enum StoreId {
    /// Store containing the set of known experiments, as read from the server.
    ///
    /// Keys in the `Experiments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`Experiment`] struct
    /// representing the last known state of that experiment.
    Experiments,
    /// Store containing the set of known experiment enrollments.
    ///
    /// Keys in the `Enrollments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`ExperimentEnrollment`]
    /// struct representing the current state of this client's enrollment (or not)
    /// in that experiment.
    Enrollments,
    /// Store containing miscellaneous metadata about this client instance.
    ///
    /// Keys in the `Meta` store are string constants, and their corresponding values
    /// are serialized items whose type depends on the constant. Known constants
    /// include:
    ///   * "db_version":   u16, the version number of the most recent migration
    ///     applied to this database.
    ///   * "nimbus-id":    String, the randomly-generated identifier for the
    ///     current client instance.
    ///   * "user-opt-in":  bool, whether the user has explicitly opted in or out
    ///     of participating in experiments.
    ///   * "installation-date": a UTC DateTime string, defining the date the consuming app was
    ///     installed
    ///   * "update-date": a UTC DateTime string, defining the date the consuming app was
    ///     last updated
    ///   * "app-version": String, the version of the app last persisted
    Meta,
    /// Store containing pending updates to experiment data.
    ///
    /// The `Updates` store contains a single key "pending-experiment-updates", whose
    /// corresponding value is a serialized `Vec<Experiment>` of new experiment data
    /// that has been received from the server but not yet processed by the application.
    Updates,
    /// Store containing collected counts of behavior events for targeting purposes.
    ///
    /// Keys in the `EventCounts` store are strings representing the identifier for
    /// the event, and their corresponding values are a serialized instance of a
    /// [`MultiIntervalCounter`] struct that contains a set of configurations and data
    /// for the different time periods that the data will be aggregated on.
    EventCounts,
}

/// A wrapper for an Rkv store. Implemented to allow any value which supports
/// serde to be used.
pub struct SingleStore {
    store: RkvSingleStore,
}

impl SingleStore {
    pub fn new(store: RkvSingleStore) -> Self {
        SingleStore { store }
    }

    pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
        &self,
        writer: &mut Writer,
        key: &str,
        persisted_data: &T,
    ) -> Result<()> {
        let persisted_json = serde_json::to_string(persisted_data).map_err(|e| {
            NimbusError::JSONError(
                "persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(),
                e.to_string(),
            )
        })?;
        self.store
            .put(writer, key, &rkv::Value::Json(&persisted_json))?;
        Ok(())
    }

    #[allow(dead_code)]
    pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
        self.store.delete(writer, key)?;
        Ok(())
    }

    pub fn clear(&self, writer: &mut Writer) -> Result<()> {
        self.store.clear(writer)?;
        Ok(())
    }

    // Some "get" functions that cooperate with transactions (ie, so we can
    // get what we've written to the transaction before it's committed).
    // It's unfortunate that these are duplicated with the DB itself, but the
    // traits used by rkv make this tricky.
    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
    where
        R: Readable<'r>,
        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        let persisted_data = self.store.get(reader, key)?;
        match persisted_data {
            Some(data) => {
                if let rkv::Value::Json(data) = data {
                    Ok(Some(serde_json::from_str::<T>(data).map_err(|e| {
                        NimbusError::JSONError(
                            "match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(),
                            e.to_string(),
                        )
                    })?))
                } else {
                    Err(NimbusError::InvalidPersistedData)
                }
            }
            None => Ok(None),
        }
    }
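
    // A sketch of the transaction cooperation described above (hypothetical
    // caller; `MyType` is an assumed serde-compatible type). Because `get`
    // accepts any `Readable` and rkv's `Writer` is readable, a value written
    // through an open transaction is visible before `commit()`:
    //
    //     store.put(&mut writer, "some-key", &value)?;
    //     let seen: Option<MyType> = store.get(&writer, "some-key")?;
    //     writer.commit()?;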

    /// Fork of collect_all that drops records that fail to deserialize
    /// rather than returning an error up the stack.  This likely
    /// wants to be just a parameter to collect_all, but for now....
    pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
    where
        R: Readable<'r>,
        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        let mut result = Vec::new();
        let mut iter = self.store.iter_start(reader)?;
        while let Some(Ok((_, data))) = iter.next() {
            if let rkv::Value::Json(data) = data {
                match serde_json::from_str::<T>(data) {
                    Ok(value) => result.push(value),
                    Err(e) => {
                        // If there is an error, we won't push this onto the
                        // result Vec, but we won't blow up the entire
                        // deserialization either.
                        warn!(
                            "try_collect_all: discarded a record while deserializing with: {:?}",
                            e
                        );
                        warn!(
                            "try_collect_all:   data that failed to deserialize: {:?}",
                            data
                        );
                    }
                };
            }
        }
        Ok(result)
    }

    pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
    where
        R: Readable<'r>,
        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        let mut result = Vec::new();
        let mut iter = self.store.iter_start(reader)?;
        while let Some(Ok((_, data))) = iter.next() {
            if let rkv::Value::Json(data) = data {
                result.push(serde_json::from_str::<T>(data).map_err(|e| {
                    NimbusError::JSONError(
                        "rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(),
                        e.to_string(),
                    )
                })?);
            }
        }
        Ok(result)
    }
}

pub struct SingleStoreDatabase {
    rkv: Rkv,
    pub(crate) store: SingleStore,
}

impl SingleStoreDatabase {
    /// Function used to obtain a "reader" which is used for read-only transactions.
    pub fn read(&self) -> Result<Reader> {
        Ok(self.rkv.read()?)
    }

    /// Function used to obtain a "writer" which is used for transactions.
    /// `writer.commit()` must be called to commit data added via the writer.
    pub fn write(&self) -> Result<Writer> {
        Ok(self.rkv.write()?)
    }

    /// Function used to obtain values from the internal store.
    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
    where
        R: Readable<'r>,
        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
    {
        self.store.get(reader, key)
    }
}
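
// A sketch of single-store access (hypothetical caller; `MultiIntervalCounter`
// lives elsewhere in this crate):
//
//     let db = Database::open_single(path, StoreId::EventCounts)?;
//     let reader = db.read()?;
//     let counter: Option<MultiIntervalCounter> = db.get(&reader, "event-id")?;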

/// Database used to access persisted data.
/// This is an abstraction around an Rkv database; an instance of this database
/// is created each time the component is loaded. If there is persisted data,
/// the `get` functions should retrieve it.
pub struct Database {
    rkv: Rkv,
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    updates_store: SingleStore,
    event_count_store: SingleStore,
}

impl Database {
    /// Main constructor for a database.
    /// Initializes the Rkv database used to retrieve persisted data.
    ///
    /// # Arguments
    /// - `path`: A path to the persisted data; this is provided by the consuming application
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let rkv = Self::open_rkv(path)?;
        let meta_store = rkv.open_single("meta", StoreOptions::create())?;
        let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
        let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
        let updates_store = rkv.open_single("updates", StoreOptions::create())?;
        let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
        let db = Self {
            rkv,
            meta_store: SingleStore::new(meta_store),
            experiment_store: SingleStore::new(experiment_store),
            enrollment_store: SingleStore::new(enrollment_store),
            updates_store: SingleStore::new(updates_store),
            event_count_store: SingleStore::new(event_count_store),
        };
        db.maybe_upgrade()?;
        Ok(db)
    }

    pub fn open_single<P: AsRef<Path>>(path: P, store_id: StoreId) -> Result<SingleStoreDatabase> {
        let rkv = Self::open_rkv(path)?;
        let store = SingleStore::new(match store_id {
            StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
            StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
            StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
            StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
            StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
        });
        Ok(SingleStoreDatabase { rkv, store })
    }

    fn maybe_upgrade(&self) -> Result<()> {
        debug!("entered maybe_upgrade");
        let mut writer = self.rkv.write()?;
        let db_version = self.meta_store.get::<u16, _>(&writer, DB_KEY_DB_VERSION)?;
        match db_version {
            Some(DB_VERSION) => {
                // Already at the current version, no migration required.
                info!("Already at version {}, no upgrade needed", DB_VERSION);
                return Ok(());
            }
            Some(1) => {
                info!("Migrating database from v1 to v2");
                match self.migrate_v1_to_v2(&mut writer) {
                    Ok(_) => (),
                    Err(e) => {
                        // The idea here is that it's better to leave an
                        // individual install with a clean empty database
                        // than in an unknown inconsistent state, because it
                        // allows them to start participating in experiments
                        // again, rather than potentially repeating the upgrade
                        // over and over at each embedding client restart.
                        error_support::report_error!(
                            "nimbus-database-migration",
                            "Error migrating database v1 to v2: {:?}. Wiping experiments and enrollments",
                            e
                        );
                        self.clear_experiments_and_enrollments(&mut writer)?;
                    }
                };
            }
            None => {
                info!("maybe_upgrade: no version number; wiping most stores");
                // The "first" version of the database (= no version number) had un-migratable
                // data for experiments and enrollments, so we start anew.
                // XXX: We can most likely remove this behaviour once enough time has passed,
                // since nimbus wasn't really shipped to production at the time anyway.
                self.clear_experiments_and_enrollments(&mut writer)?;
            }
            _ => {
                error_support::report_error!(
                    "nimbus-unknown-database-version",
                    "Unknown database version. Wiping all stores."
                );
                self.clear_experiments_and_enrollments(&mut writer)?;
                self.meta_store.clear(&mut writer)?;
            }
        }
        // It is safe to clear the updates store (i.e. the pending experiments) on all schema
        // upgrades, as it will be re-filled from the server on the next `fetch_experiments()`.
        // The current contents of the updates store may cause experiments to not load, or worse,
        // accidentally unenroll.
        self.updates_store.clear(&mut writer)?;
        self.meta_store
            .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
        writer.commit()?;
        debug!("maybe_upgrade: transaction committed");
        Ok(())
    }

    pub(crate) fn clear_experiments_and_enrollments(
        &self,
        writer: &mut Writer,
    ) -> Result<(), NimbusError> {
        self.experiment_store.clear(writer)?;
        self.enrollment_store.clear(writer)?;
        Ok(())
    }

    pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
        self.event_count_store.clear(writer)?;
        Ok(())
    }

    /// Migrates a v1 database to v2
    ///
    /// Note that any Err returns from this function (including stuff
    /// propagated up via the ? operator) will cause maybe_upgrade (our caller)
    /// to assume that this is unrecoverable and wipe the database, removing
    /// people from any existing enrollments and blowing away their experiment
    /// history, so that they don't get left in an inconsistent state.
    fn migrate_v1_to_v2(&self, writer: &mut Writer) -> Result<()> {
        info!("Upgrading from version 1 to version 2");

        // Use try_collect_all to read everything except records that serde
        // returns deserialization errors on.  Some logging of those errors
        // happens, but it's not ideal.
        let reader = self.read()?;

        // XXX write a test to verify that we don't need to gc any
        // enrollments that don't have experiments because the experiments
        // were discarded either during try_collect_all (these wouldn't have been
        // detected during the filtering phase) or during the filtering phase
        // itself.  The test needs to run evolve_experiments, as that should
        // correctly drop any orphans, even if the migrators aren't perfect.

        let enrollments: Vec<ExperimentEnrollment> =
            self.enrollment_store.try_collect_all(&reader)?;
        let experiments: Vec<Experiment> = self.experiment_store.try_collect_all(&reader)?;

        // Figure out which experiments have records that need to be dropped,
        // and log that we're going to drop them and why.
        let empty_string = "".to_string();
        let slugs_with_experiment_issues: HashSet<String> = experiments
            .iter()
            .filter_map(|e| {
                let branch_with_empty_feature_ids = e
                    .branches
                    .iter()
                    .find(|b| b.feature.is_none() || b.feature.as_ref().unwrap().feature_id.is_empty());
                if branch_with_empty_feature_ids.is_some() {
                    warn!("{:?} experiment has branch missing a feature prop; experiment & enrollment will be discarded", &e.slug);
                    Some(e.slug.to_owned())
                } else if e.feature_ids.is_empty() || e.feature_ids.contains(&empty_string) {
                    warn!("{:?} experiment has invalid feature_ids array; experiment & enrollment will be discarded", &e.slug);
                    Some(e.slug.to_owned())
                } else {
                    None
                }
            })
            .collect();
        let slugs_to_discard: HashSet<_> = slugs_with_experiment_issues;

        // Filter out experiments to be dropped.
        let updated_experiments: Vec<Experiment> = experiments
            .into_iter()
            .filter(|e| !slugs_to_discard.contains(&e.slug))
            .collect();
        debug!("updated experiments = {:?}", updated_experiments);

        // Filter out enrollments to be dropped.
        let updated_enrollments: Vec<ExperimentEnrollment> = enrollments
            .into_iter()
            .filter(|e| !slugs_to_discard.contains(&e.slug))
            .collect();
        debug!("updated enrollments = {:?}", updated_enrollments);

        // Rewrite both stores.
        self.experiment_store.clear(writer)?;
        for experiment in updated_experiments {
            self.experiment_store
                .put(writer, &experiment.slug, &experiment)?;
        }

        self.enrollment_store.clear(writer)?;
        for enrollment in updated_enrollments {
            self.enrollment_store
                .put(writer, &enrollment.slug, &enrollment)?;
        }
        debug!("exiting migrate_v1_to_v2");

        Ok(())
    }

    /// Gets a Store object which, used with the writer returned by
    /// `self.write()`, can update the database in a transaction.
    pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
        match store_id {
            StoreId::Meta => &self.meta_store,
            StoreId::Experiments => &self.experiment_store,
            StoreId::Enrollments => &self.enrollment_store,
            StoreId::Updates => &self.updates_store,
            StoreId::EventCounts => &self.event_count_store,
        }
    }
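
    // A minimal usage sketch (hypothetical caller; `enrollment` is assumed to
    // be an `ExperimentEnrollment` built elsewhere):
    //
    //     let db = Database::new(path)?;
    //     let mut writer = db.write()?;
    //     db.get_store(StoreId::Enrollments)
    //         .put(&mut writer, "my-experiment-slug", &enrollment)?;
    //     writer.commit()?;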

    pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<Rkv> {
        let path = path.as_ref().join("db");
        debug!("open_rkv: path = {}", path.display());
        fs::create_dir_all(&path)?;
        let rkv = match rkv_new(&path) {
            Ok(rkv) => Ok(rkv),
            Err(rkv_error) => {
                match rkv_error {
                    // For some errors we just delete the DB and start again.
                    StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
                        // On one hand this seems a little dangerous, but on
                        // the other hand it avoids us knowing about the
                        // underlying implementation (ie, how do we know what
                        // files might exist in all cases?)
                        warn!(
                            "Database at '{}' appears corrupt - removing and recreating",
                            path.display()
                        );
                        fs::remove_dir_all(&path)?;
                        fs::create_dir_all(&path)?;
                        // TODO: Once we have glean integration we want to
                        // record telemetry here.
                        rkv_new(&path)
                    }
                    // All other errors are fatal.
                    _ => Err(rkv_error),
                }
            }
        }?;
        debug!("Database initialized");
        Ok(rkv)
    }

    /// Function used to obtain a "reader" which is used for read-only transactions.
    pub fn read(&self) -> Result<Reader> {
        Ok(self.rkv.read()?)
    }

    /// Function used to obtain a "writer" which is used for transactions.
    /// `writer.commit()` must be called to commit data added via the writer.
    pub fn write(&self) -> Result<Writer> {
        Ok(self.rkv.write()?)
    }

    /// Function used to retrieve persisted data outside of a transaction.
    /// It allows retrieval of any serializable and deserializable data;
    /// currently only JSON data is supported.
    ///
    /// Only available for tests; product code should always use transactions.
    ///
    /// # Arguments
    /// - `key`: A key for the data stored in the underlying database
    #[cfg(test)]
    pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
        &self,
        store_id: StoreId,
        key: &str,
    ) -> Result<Option<T>> {
        let reader = self.rkv.read()?;
        let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
        match persisted_data {
            Some(data) => {
                if let rkv::Value::Json(data) = data {
                    Ok(Some(serde_json::from_str::<T>(data).map_err(|e| {
                        NimbusError::JSONError(
                            "rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(),
                            e.to_string(),
                        )
                    })?))
                } else {
                    Err(NimbusError::InvalidPersistedData)
                }
            }
            None => Ok(None),
        }
    }

    // Function for collecting all items in a store outside of a transaction.
    // Only available for tests; product code should always be using transactions.
    // Iters are a bit tricky - would be nice to make them generic, but this will
    // do for our use-case.
    #[cfg(test)]
    pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
        &self,
        store_id: StoreId,
    ) -> Result<Vec<T>> {
        let mut result = Vec::new();
        let reader = self.rkv.read()?;
        let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
        while let Some(Ok((_, data))) = iter.next() {
            if let rkv::Value::Json(data) = data {
                result.push(serde_json::from_str::<T>(data).map_err(|e| {
                    NimbusError::JSONError(
                        "rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(),
                        e.to_string(),
                    )
                })?);
            }
        }
        Ok(result)
    }
}
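
// An illustrative test sketch (hypothetical; the crate's real tests live in
// its test suite and may differ). It exercises the write-commit round-trip and
// the test-only `collect_all` helper, assuming `tempfile` as a dev-dependency:
//
//     #[test]
//     fn test_put_then_collect_all() -> Result<()> {
//         let tmp = tempfile::tempdir()?;
//         let db = Database::new(tmp.path())?;
//         let mut writer = db.write()?;
//         db.get_store(StoreId::Updates)
//             .put(&mut writer, "some-key", &"hello".to_string())?;
//         writer.commit()?;
//         let values: Vec<String> = db.collect_all(StoreId::Updates)?;
//         assert_eq!(values, vec!["hello".to_string()]);
//         Ok(())
//     }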