nimbus/stateful/
persistence.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Our storage abstraction, currently backed by Rkv.
6
7use rkv::{StoreError, StoreOptions};
8use std::fmt;
9use std::fs;
10use std::path::Path;
11use std::sync::Arc;
12
13use crate::error::{ErrorCode, NimbusError, Result, debug, info, warn};
14use crate::metrics::{DatabaseLoadExtraDef, DatabaseMigrationExtraDef, MetricsHandler};
15
16// This uses the lmdb backend for rkv, which is unstable.
17// We use it for now since glean didn't seem to have trouble with it (although
18// it must be noted that the rkv documentation explicitly says "To use rkv in
19// production/release environments at Mozilla, you may do so with the "SafeMode"
20// backend", so we really should get more guidance here.)
21
// We use an incrementing integer to manage database migrations.
// If you need to make a backwards-incompatible change to the data schema,
// increment `DB_VERSION` and implement some migration logic in `maybe_upgrade`.
//
// ⚠️ Warning : Altering the type of `DB_VERSION` would itself require a DB migration. ⚠️
/// Meta-store key under which the current schema version is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";

/// The current database version.
pub(crate) const DB_VERSION: u16 = 3;

/// Meta-store key recording that a corrupt database was clobbered and
/// recreated (written in `Database::open_single`, consumed in `Database::open`).
pub(crate) const DB_KEY_DB_WAS_CORRUPT: &str = "db-was-corrupt";

/// The minimum database version that will be migrated.
///
/// If the version is below this threshold, the database will be reset.
pub(crate) const DB_MIN_VERSION: u16 = 2;

/// Maximum number of named databases the rkv environment may hold; must be at
/// least the number of stores opened in `Database::open`.
const RKV_MAX_DBS: u32 = 6;

/// Meta-store key for the user's experiment participation opt-in flag.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
/// Meta-store key for the user's rollout participation opt-in flag.
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

// Legacy key for migration purposes
/// Pre-v3 meta-store key holding the single global participation flag; split
/// into the two keys above by `migrate_v2_to_v3`.
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

// NOTE(review): these two defaults are not referenced in this file; presumably
// they seed the opt-in flags when unset — confirm at call sites.
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
49
50// Inspired by Glean - use a feature to choose between the backends.
51// Select the LMDB-powered storage backend when the feature is not activated.
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Type aliases so the rest of this file can be written against one set of
    // names regardless of which backend feature is selected.
    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    // "Anything readable against the LMDB backend". The blanket impl below
    // means both readers and writers qualify (e.g. `get(&writer, ..)` works).
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    // Create (or open) the LMDB-backed environment at `path`, with capacity
    // for `RKV_MAX_DBS` named databases.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}
78
79// Select the "safe mode" storage backend when the feature is activated.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Type aliases mirroring the LMDB module above, so the rest of the file
    // is backend-agnostic.
    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    // "Anything readable against the safe-mode backend"; the blanket impl
    // below makes both readers and writers qualify.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    impl<'r, T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    // Create (or open) the safe-mode environment at `path`, with capacity for
    // `RKV_MAX_DBS` named databases.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
107
108use backend::*;
109pub use backend::{Readable, Writer};
110
/// Enumeration of the different stores within our database.
///
/// Our rkv database contains a number of different "stores", and the items
/// in each store correspond to a particular type of object at the Rust level.
pub enum StoreId {
    /// Store containing the set of known experiments, as read from the server.
    ///
    /// Keys in the `Experiments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`Experiment`] struct
    /// representing the last known state of that experiment.
    Experiments,
    /// Store containing the set of known experiment enrollments.
    ///
    /// Keys in the `Enrollments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`ExperimentEnrollment`]
    /// struct representing the current state of this client's enrollment (or not)
    /// in that experiment.
    Enrollments,
    /// Store containing miscellaneous metadata about this client instance.
    ///
    /// Keys in the `Meta` store are string constants, and their corresponding values
    /// are serialized items whose type depends on the constant. Known constants
    /// include:
    ///   * "db_version":   u16, the version number of the most recent migration
    ///     applied to this database.
    ///   * "db-was-corrupt":   boolean, whether or not a corrupt database was
    ///     replaced with a new one in Database::open_single
    ///   * "nimbus-id":    String, the randomly-generated identifier for the
    ///     current client instance.
    ///   * "user-opt-in-experiments":  bool, whether the user has explicitly opted in or out
    ///     of participating in experiments.
    ///   * "user-opt-in-rollouts":  bool, whether the user has explicitly opted in or out
    ///     of participating in rollouts.
    ///   * "user-opt-in":  bool, the legacy global participation flag, split into
    ///     the two keys above (and removed) by the v2 -> v3 migration.
    ///   * "installation-date": a UTC DateTime string, defining the date the consuming app was
    ///     installed
    ///   * "update-date": a UTC DateTime string, defining the date the consuming app was
    ///     last updated
    ///   * "app-version": String, the version of the app last persisted
    Meta,
    /// Store containing pending updates to experiment data.
    ///
    /// The `Updates` store contains a single key "pending-experiment-updates", whose
    /// corresponding value is a serialized `Vec<Experiment>` of new experiment data
    /// that has been received from the server but not yet processed by the application.
    Updates,
    /// Store containing collected counts of behavior events for targeting purposes.
    ///
    /// Keys in the `EventCounts` store are strings representing the identifier for
    /// the event and their corresponding values represent a serialized instance of a
    /// [`MultiIntervalCounter`] struct that contains a set of configurations and data
    /// for the different time periods that the data will be aggregated on.
    EventCounts,
}
164
/// A wrapper for an Rkv store. Implemented to allow any value which supports
/// serde to be used.
pub struct SingleStore {
    // Underlying rkv store; values are persisted as `rkv::Value::Json`.
    store: RkvSingleStore,
}
170
171impl SingleStore {
172    pub fn new(store: RkvSingleStore) -> Self {
173        SingleStore { store }
174    }
175
176    pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
177        &self,
178        writer: &mut Writer,
179        key: &str,
180        persisted_data: &T,
181    ) -> Result<()> {
182        let persisted_json = match serde_json::to_string(persisted_data) {
183            Ok(v) => v,
184            Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
185        };
186        self.store
187            .put(writer, key, &rkv::Value::Json(&persisted_json))?;
188        Ok(())
189    }
190
191    #[allow(dead_code)]
192    pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
193        self.store.delete(writer, key)?;
194        Ok(())
195    }
196
197    pub fn clear(&self, writer: &mut Writer) -> Result<()> {
198        self.store.clear(writer)?;
199        Ok(())
200    }
201
202    // Some "get" functions that cooperate with transactions (ie, so we can
203    // get what we've written to the transaction before it's committed).
204    // It's unfortunate that these are duplicated with the DB itself, but the
205    // traits used by rkv make this tricky.
206    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
207    where
208        R: Readable<'r>,
209        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
210    {
211        let persisted_data = self.store.get(reader, key)?;
212        match persisted_data {
213            Some(data) => {
214                if let rkv::Value::Json(data) = data {
215                    Ok(Some(match serde_json::from_str::<T>(data) {
216                        Ok(v) => v,
217                        Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
218                    }))
219                } else {
220                    Err(NimbusError::InvalidPersistedData)
221                }
222            }
223            None => Ok(None),
224        }
225    }
226
227    /// Fork of collect_all that simply drops records that fail to read
228    /// rather than simply returning an error up the stack.  This likely
229    /// wants to be just a parameter to collect_all, but for now....
230    pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
231    where
232        R: Readable<'r>,
233        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
234    {
235        let mut result = Vec::new();
236        let mut iter = self.store.iter_start(reader)?;
237        while let Some(Ok((_, data))) = iter.next() {
238            if let rkv::Value::Json(data) = data {
239                let unserialized = serde_json::from_str::<T>(data);
240                match unserialized {
241                    Ok(value) => result.push(value),
242                    Err(e) => {
243                        // If there is an error, we won't push this onto the
244                        // result Vec, but we won't blow up the entire
245                        // deserialization either.
246                        warn!(
247                            "try_collect_all: discarded a record while deserializing with: {:?}",
248                            e
249                        );
250                        warn!(
251                            "try_collect_all:   data that failed to deserialize: {:?}",
252                            data
253                        );
254                    }
255                };
256            }
257        }
258        Ok(result)
259    }
260
261    pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
262    where
263        R: Readable<'r>,
264        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
265    {
266        let mut result = Vec::new();
267        let mut iter = self.store.iter_start(reader)?;
268        while let Some(Ok((_, data))) = iter.next() {
269            if let rkv::Value::Json(data) = data {
270                result.push(match serde_json::from_str::<T>(data) {
271                    Ok(v) => v,
272                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
273                });
274            }
275        }
276        Ok(result)
277    }
278}
279
/// An rkv environment paired with a single opened store, as produced by
/// [`Database::open_single`].
pub struct SingleStoreDatabase {
    // The environment owning `store`; kept so transactions can be created.
    rkv: Rkv,
    pub(crate) store: SingleStore,
}
284
285impl SingleStoreDatabase {
286    /// Function used to obtain a "reader" which is used for read-only transactions.
287    pub fn read(&self) -> Result<Reader<'_>> {
288        Ok(self.rkv.read()?)
289    }
290
291    /// Function used to obtain a "writer" which is used for transactions.
292    /// The `writer.commit();` must be called to commit data added via the
293    /// writer.
294    pub fn write(&self) -> Result<Writer<'_>> {
295        Ok(self.rkv.write()?)
296    }
297
298    /// Function used to obtain values from the internal store.
299    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
300    where
301        R: Readable<'r>,
302        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
303    {
304        self.store.get(reader, key)
305    }
306}
307
/// Metadata about opening an RKV database.
#[derive(Default)]
pub struct OpenRkvMetadata {
    /// Was the database corrupt? If so, it has been replaced by a new, blank
    /// database.
    ///
    /// Defaults to `false` via `#[derive(Default)]`; set in `open_rkv`.
    pub corrupt: bool,
}
315
/// Metadata about opening an RKV database and its stores.
///
/// This has more information than [`OpenRkvMetadata`] because the former does
/// not attempt to open any stores. Produced by `Database::open`.
pub struct OpenMetadata {
    /// Was the database corrupt? If so, it has been replaced by a new, blank
    /// database.
    pub corrupt: bool,

    /// The database version recorded at load time.
    ///
    /// A value of `0` may indicate that no version was recorded, as there was
    /// never a v0 database.
    pub initial_version: u16,
}
331
// NOTE(review): this struct is not referenced anywhere in this file; the field
// descriptions below are inferred from the names — confirm against callers.
#[derive(Default)]
pub struct MigrationMetadata {
    // Presumably the schema version found before migrating — TODO confirm.
    pub initial_version: Option<u16>,
    // Presumably the schema version reached after migrating — TODO confirm.
    pub migrated_version: Option<u16>,
    // NOTE(review): the name looks like a typo of `migration_error`; renaming
    // a `pub` field would break external users, so it is only flagged here.
    pub mirgation_error: Option<String>,
}
338
/// Why a migration (or forced reset) is being applied; rendered into the
/// telemetry event's `reason` field via the `Display` impl below.
#[derive(Clone, Copy)]
pub enum DatabaseMigrationReason {
    /// Normal upgrade from an older, supported schema version.
    Upgrade,
    /// The on-disk version was outside the supported range (in practice,
    /// newer than [`DB_VERSION`] — see `apply_migrations`).
    InvalidVersion,
}
344
345impl fmt::Display for DatabaseMigrationReason {
346    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
347        f.write_str(match self {
348            Self::Upgrade => "upgrade",
349            Self::InvalidVersion => "invalid_version",
350        })
351    }
352}
353
/// Database used to access persisted data
/// This is an abstraction around an Rkv database
/// An instance of this database is created each time the component is loaded
/// if there is persisted data, the `get` functions should retrieve it
pub struct Database {
    // The underlying rkv environment; all stores below live inside it.
    rkv: Rkv,
    // One `SingleStore` wrapper per named store; see [`StoreId`] for what
    // each one holds.
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    updates_store: SingleStore,
    event_count_store: SingleStore,

    // Sink for database-load and database-migration telemetry events.
    metrics_handler: Arc<dyn MetricsHandler>,
}
368
369impl Database {
370    /// Main constructor for a database
371    /// Initiates the Rkv database to be used to retrieve persisted data
372    /// # Arguments
373    /// - `path`: A path to the persisted data, this is provided by the consuming application
374    pub fn new<P: AsRef<Path>>(path: P, metrics_handler: Arc<dyn MetricsHandler>) -> Result<Self> {
375        let mut event = DatabaseLoadExtraDef::default();
376
377        let (db, open_metadata) = match Self::open(path, metrics_handler.clone()) {
378            Ok(db) => db,
379            Err(e) => {
380                event.error = Some(e.error_code().to_string());
381                metrics_handler.record_database_load(event);
382                return Err(e);
383            }
384        };
385
386        event.initial_version = Some(open_metadata.initial_version);
387        event.corrupt = Some(open_metadata.corrupt);
388
389        let migrate_result = db.maybe_upgrade(open_metadata.initial_version);
390        match migrate_result {
391            Ok(migrated_version) => event.migrated_version = migrated_version,
392            Err(ref e) => event.migration_error = Some(e.error_code().to_string()),
393        }
394
395        metrics_handler.record_database_load(event);
396
397        migrate_result?;
398
399        Ok(db)
400    }
401
    /// Open a database, creating it if it does not exist.
    ///
    /// Opens (creating if necessary) all five named stores, then reads the
    /// persisted schema version and corruption marker into an [`OpenMetadata`].
    /// No migrations are run here; see [`Database::new`] / `maybe_upgrade`.
    fn open<P: AsRef<Path>>(
        path: P,
        metrics_handler: Arc<dyn MetricsHandler>,
    ) -> Result<(Self, OpenMetadata)> {
        let (rkv, open_metadata) = Self::open_rkv(path)?;

        // Store names here must match those used in `open_single`.
        let meta_store = rkv.open_single("meta", StoreOptions::create())?;
        let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
        let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
        let updates_store = rkv.open_single("updates", StoreOptions::create())?;
        let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
        let db = Self {
            rkv,
            meta_store: SingleStore::new(meta_store),
            experiment_store: SingleStore::new(experiment_store),
            enrollment_store: SingleStore::new(enrollment_store),
            updates_store: SingleStore::new(updates_store),
            event_count_store: SingleStore::new(event_count_store),
            metrics_handler,
        };

        // A write transaction doubles as a reader below; it is only committed
        // on the branch that actually mutates the meta store, and otherwise
        // dropped (which is fine: only reads happened).
        let mut writer = db.rkv.write()?;

        let mut open_metadata = OpenMetadata {
            corrupt: open_metadata.corrupt,
            // 0 means "no version recorded" — there was never a v0 database.
            initial_version: db.meta_store.get(&writer, DB_KEY_DB_VERSION)?.unwrap_or(0),
        };

        // If open_rkv didn't itself detect corruption, check for the marker
        // that `open_single` may have persisted on an earlier, pre-SDK open.
        if !open_metadata.corrupt {
            open_metadata.corrupt = db
                .meta_store
                .get(&writer, DB_KEY_DB_WAS_CORRUPT)?
                .unwrap_or(false);

            if open_metadata.corrupt {
                // Consume the marker so corruption is reported at most once.
                db.meta_store.delete(&mut writer, DB_KEY_DB_WAS_CORRUPT)?;
                writer.commit()?;
            }
        }

        Ok((db, open_metadata))
    }
445
    /// Open the rkv environment at `path` plus a single store within it,
    /// without constructing a full [`Database`] or running migrations.
    pub(crate) fn open_single<P: AsRef<Path>>(
        path: P,
        store_id: StoreId,
    ) -> Result<SingleStoreDatabase> {
        // `Database::open_single` is used by `get_calculated_attributes` to
        // compute `days_since_update` before the Nimbus SDK has loaded the
        // database. The following `open_rkv` call *will* wipe the database upon
        // encountering corruption, which means when we call `Database::new()`
        // from the client we will not be able to determine that corruption occurred.
        //
        // Record the corrupted state into the meta store so that the client
        // will be able to report whether or not the database was actually
        // corrupted at startup, regardless if we already clobbered it.
        let (rkv, open_metadata) = Self::open_rkv(path)?;

        if open_metadata.corrupt {
            let meta = rkv.open_single("meta", StoreOptions::create())?;

            let mut writer = rkv.write()?;
            // Stored as JSON "true" so the bool-typed `get` in
            // `Database::open` can read it back.
            meta.put(
                &mut writer,
                DB_KEY_DB_WAS_CORRUPT,
                &rkv::Value::Json("true"),
            )?;
            writer.commit()?;
        }

        // Open only the requested store; names must match `Database::open`.
        let store = SingleStore::new(match store_id {
            StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
            StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
            StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
            StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
            StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
        });
        Ok(SingleStoreDatabase { rkv, store })
    }
482
483    /// Attempt to upgrade the database.
484    ///
485    /// If the database is already up-to-date, no operations will be performed.
486    /// Otherwise migrations will be applied in order until the database is at
487    /// [`DB_VERSION`].
488    ///
489    /// If an error occurs during migration, the experiments, enrollments, and
490    /// meta stores will be cleared.
491    fn maybe_upgrade(&self, current_version: u16) -> Result<Option<u16>> {
492        debug!("entered maybe upgrade");
493
494        println!("maybe_upgrade from {current_version}");
495
496        if current_version == DB_VERSION {
497            return Ok(None);
498        }
499
500        let mut writer = self.write()?;
501
502        // An `Err` here means either:
503        //
504        // - an individual migration failed, in which case the machinery in
505        //   [`force_apply_migration`] will have wiped the database in an attempt
506        //   to recover; or
507        //
508        // - the database wipe resulting from a failed migration *also* failed,
509        //   in which case there is not really anything we can do.
510        let _ = self.apply_migrations(&mut writer, current_version);
511
512        // It is safe to clear the update store (i.e. the pending experiments)
513        // on all schema upgrades as it will be re-filled from the server on the
514        // next `fetch_experiments()`. The current contents of the update store
515        // may cause experiments to not load, or worse, accidentally unenroll.
516        self.updates_store.clear(&mut writer)?;
517        self.meta_store
518            .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
519        writer.commit()?;
520        debug!("maybe_upgrade: transaction committed");
521
522        Ok(Some(DB_VERSION))
523    }
524
    /// Apply all pending migrations.
    ///
    /// If all migrations apply successfully, the database will have version
    /// [`DB_VERSION`].
    fn apply_migrations(&self, writer: &mut Writer, initial_version: u16) -> Result<()> {
        let mut current_version = initial_version;

        // Versions outside [DB_MIN_VERSION, DB_VERSION] — too old to migrate
        // incrementally, or written by some future schema — are reset to a
        // blank v2 before the incremental migrations below run.
        if !(DB_MIN_VERSION..=DB_VERSION).contains(&current_version) {
            let reason = if current_version < DB_MIN_VERSION {
                DatabaseMigrationReason::Upgrade
            } else {
                // current_version > DB_VERSION: schema from a newer build.
                DatabaseMigrationReason::InvalidVersion
            };

            // We need to force-apply this migration because current_version may be > 2.
            self.force_apply_migration(
                writer,
                |writer| self.migrate_reset_to_v2(writer),
                &mut current_version,
                2,
                reason,
            )?;
        };

        // Incremental migrations, in order; `apply_migration` skips any whose
        // target version has already been reached.
        self.apply_migration(
            writer,
            |writer| self.migrate_v2_to_v3(writer),
            &mut current_version,
            3,
            DatabaseMigrationReason::Upgrade,
        )?;

        Ok(())
    }
559
560    /// Apply a single migration, if it is applicable.
561    ///
562    /// The result of the migration will be reported via telemetry.
563    fn apply_migration(
564        &self,
565        writer: &mut Writer,
566        migration: impl FnOnce(&mut Writer) -> Result<()>,
567        from_version: &mut u16,
568        to_version: u16,
569        reason: DatabaseMigrationReason,
570    ) -> Result<()> {
571        if *from_version >= to_version {
572            return Ok(());
573        }
574
575        self.force_apply_migration(writer, migration, from_version, to_version, reason)
576    }
577
    /// Forcibly apply a migration, without taking version constraints into
    /// account.
    ///
    /// A database-migration telemetry event is recorded whether the migration
    /// succeeds or fails. On failure, the experiment and enrollment stores
    /// are wiped before the error is propagated.
    fn force_apply_migration(
        &self,
        writer: &mut Writer,
        migration: impl FnOnce(&mut Writer) -> Result<()>,
        from_version: &mut u16,
        to_version: u16,
        reason: DatabaseMigrationReason,
    ) -> Result<()> {
        let mut event = DatabaseMigrationExtraDef {
            from_version: *from_version,
            to_version,
            reason: reason.to_string(),
            error: None,
        };

        if let Err(e) = migration(writer) {
            event.error = Some(e.error_code().to_string());
            self.metrics_handler.record_database_migration(event);

            error_support::report_error!(
                "nimbus-database-migration",
                "Error migrating database from v{} to v{}: {:?}. Wiping experiments and enrollments",
                from_version,
                to_version,
                e
            );

            // Best-effort recovery: drop whatever the failed migration may
            // have left half-written.
            self.clear_experiments_and_enrollments(writer)?;
            return Err(e);
        }

        self.metrics_handler.record_database_migration(event);
        // Only advance the caller's version counter after success.
        *from_version = to_version;
        Ok(())
    }
615
616    pub(crate) fn clear_experiments_and_enrollments(
617        &self,
618        writer: &mut Writer,
619    ) -> Result<(), NimbusError> {
620        self.experiment_store.clear(writer)?;
621        self.enrollment_store.clear(writer)?;
622        Ok(())
623    }
624
625    pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
626        self.event_count_store.clear(writer)?;
627        Ok(())
628    }
629
630    pub fn migrate_reset_to_v2(&self, writer: &mut Writer) -> Result<()> {
631        self.clear_experiments_and_enrollments(writer)?;
632
633        Ok(())
634    }
635
636    /// Migrates a v2 database to v3
637    ///
638    /// Separates global user participation into experiments and rollouts participation.
639    /// For privacy: if user opted out globally, they remain opted out of experiments.
640    fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
641        info!("Upgrading from version 2 to version 3");
642
643        let meta_store = &self.meta_store;
644
645        // Get the old global participation flag
646        let old_global_participation = meta_store
647            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
648            .unwrap_or(true); // Default was true
649
650        // Set new separate flags based on privacy requirements:
651        // - If user opted out globally, they stay opted out of experiments
652        // - If user opted out globally, they stay opted out of rollouts (per requirement #3)
653        meta_store.put(
654            writer,
655            DB_KEY_EXPERIMENT_PARTICIPATION,
656            &old_global_participation,
657        )?;
658        meta_store.put(
659            writer,
660            DB_KEY_ROLLOUT_PARTICIPATION,
661            &old_global_participation,
662        )?;
663
664        // Remove the old global participation key if it exists
665        if meta_store
666            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
667            .is_some()
668        {
669            meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
670        }
671
672        info!(
673            "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
674            old_global_participation, old_global_participation
675        );
676
677        Ok(())
678    }
679
680    /// Gets a Store object, which used with the writer returned by
681    /// `self.write()` to update the database in a transaction.
682    pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
683        match store_id {
684            StoreId::Meta => &self.meta_store,
685            StoreId::Experiments => &self.experiment_store,
686            StoreId::Enrollments => &self.enrollment_store,
687            StoreId::Updates => &self.updates_store,
688            StoreId::EventCounts => &self.event_count_store,
689        }
690    }
691
692    pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<(Rkv, OpenRkvMetadata)> {
693        let mut metadata = OpenRkvMetadata::default();
694
695        let path = std::path::Path::new(path.as_ref()).join("db");
696        debug!("open_rkv: path =  {:?}", path.display());
697        fs::create_dir_all(&path)?;
698        let rkv = match rkv_new(&path) {
699            Ok(rkv) => Ok(rkv),
700            Err(rkv_error) => {
701                match rkv_error {
702                    // For some errors we just delete the DB and start again.
703                    StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
704                        // On one hand this seems a little dangerous, but on
705                        // the other hand avoids us knowing about the
706                        // underlying implementation (ie, how do we know what
707                        // files might exist in all cases?)
708                        warn!(
709                            "Database at '{}' appears corrupt - removing and recreating",
710                            path.display()
711                        );
712                        fs::remove_dir_all(&path)?;
713                        fs::create_dir_all(&path)?;
714
715                        metadata.corrupt = true;
716
717                        rkv_new(&path)
718                    }
719                    // All other errors are fatal.
720                    _ => Err(rkv_error),
721                }
722            }
723        }?;
724        debug!("Database initialized");
725        Ok((rkv, metadata))
726    }
727
728    /// Function used to obtain a "reader" which is used for read-only transactions.
729    pub fn read(&self) -> Result<Reader<'_>> {
730        Ok(self.rkv.read()?)
731    }
732
733    /// Function used to obtain a "writer" which is used for transactions.
734    /// The `writer.commit();` must be called to commit data added via the
735    /// writer.
736    pub fn write(&self) -> Result<Writer<'_>> {
737        Ok(self.rkv.write()?)
738    }
739
740    /// Function used to retrieve persisted data outside of a transaction.
741    /// It allows retrieval of any serializable and deserializable data
742    /// Currently only supports JSON data
743    // Only available for tests; product code should always be using transactions.
744    ///
745    /// # Arguments
746    /// - `key`: A key for the data stored in the underlying database
747    #[cfg(test)]
748    pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
749        &self,
750        store_id: StoreId,
751        key: &str,
752    ) -> Result<Option<T>> {
753        let reader = self.rkv.read()?;
754        let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
755        match persisted_data {
756            Some(data) => {
757                if let rkv::Value::Json(data) = data {
758                    Ok(Some(match serde_json::from_str::<T>(data) {
759                        Ok(v) => v,
760                        Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
761                    }))
762                } else {
763                    Err(NimbusError::InvalidPersistedData)
764                }
765            }
766            None => Ok(None),
767        }
768    }
769
770    // Function for collecting all items in a store outside of a transaction.
771    // Only available for tests; product code should always be using transactions.
772    // Iters are a bit tricky - would be nice to make them generic, but this will
773    // do for our use-case.
774    #[cfg(test)]
775    pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
776        &self,
777        store_id: StoreId,
778    ) -> Result<Vec<T>> {
779        let mut result = Vec::new();
780        let reader = self.rkv.read()?;
781        let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
782        while let Some(Ok((_, data))) = iter.next() {
783            if let rkv::Value::Json(data) = data {
784                result.push(match serde_json::from_str::<T>(data) {
785                    Ok(v) => v,
786                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
787                });
788            }
789        }
790        Ok(result)
791    }
792}