nimbus/stateful/
persistence.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5//! Our storage abstraction, currently backed by Rkv.
6
7use rkv::{StoreError, StoreOptions};
8use std::fmt;
9use std::fs;
10use std::path::Path;
11use std::sync::Arc;
12
13use crate::enrollment::ExperimentEnrollment;
14use crate::error::{ErrorCode, NimbusError, Result, debug, info, warn};
15use crate::metrics::{DatabaseLoadExtraDef, DatabaseMigrationExtraDef, MetricsHandler};
16use crate::stateful::enrollment::v3;
17
18// This uses the lmdb backend for rkv, which is unstable.
19// We use it for now since glean didn't seem to have trouble with it (although
20// it must be noted that the rkv documentation explicitly says "To use rkv in
21// production/release environments at Mozilla, you may do so with the "SafeMode"
22// backend", so we really should get more guidance here.)
23
// We use an incrementing integer to manage database migrations.
// If you need to make a backwards-incompatible change to the data schema,
// increment `DB_VERSION` and implement some migration logic in `maybe_upgrade`.
//
// ⚠️ Warning : Altering the type of `DB_VERSION` would itself require a DB migration. ⚠️

/// Key in the meta store under which the database schema version is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";

/// The current database version.
pub(crate) const DB_VERSION: u16 = 4;

/// Key in the meta store used to remember that a corrupt database was replaced.
///
/// It is written by `Database::open_single` and read (then deleted) by
/// `Database::open`.
pub(crate) const DB_KEY_DB_WAS_CORRUPT: &str = "db-was-corrupt";

/// The minimum database version that will be migrated.
///
/// If the version is below this threshold, the database will be reset.
pub(crate) const DB_MIN_VERSION: u16 = 2;

/// Capacity handed to `Rkv::with_capacity` when the environment is created.
// NOTE(review): presumably the maximum number of named stores; `Database::open`
// opens five, leaving one spare - confirm against the rkv documentation.
const RKV_MAX_DBS: u32 = 6;

/// Meta store key holding the user's experiment participation choice.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
/// Meta store key holding the user's rollout participation choice.
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

// Legacy key for migration purposes
/// Meta store key of the old, global participation flag (read and removed by
/// the v2 -> v3 migration).
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

/// Experiment participation assumed when nothing has been persisted.
// NOTE(review): not referenced in this file; usage assumed from the name.
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
/// Rollout participation assumed when nothing has been persisted.
// NOTE(review): not referenced in this file; usage assumed from the name.
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
51
52// Inspired by Glean - use a feature to choose between the backends.
53// Select the LMDB-powered storage backend when the feature is not activated.
54#[cfg(not(feature = "rkv-safe-mode"))]
55mod backend {
56    use rkv::backend::{
57        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
58    };
59    use std::path::Path;
60
61    use super::RKV_MAX_DBS;
62
63    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
64    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
65    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
66    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
67    pub trait Readable<'r>:
68        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
69    {
70    }
71    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
72        Readable<'r> for T
73    {
74    }
75
76    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
77        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
78    }
79}
80
// Select the "safe mode" storage backend when the feature is activated.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use std::path::Path;

    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };

    use super::RKV_MAX_DBS;

    /// The rkv environment type, specialised for the safe-mode backend.
    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    /// A single rkv store, specialised for the safe-mode backend.
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    /// A read-only transaction on the safe-mode backend.
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    /// A read-write transaction on the safe-mode backend.
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;

    /// Shorthand for "anything rkv can read from" on the safe-mode backend, so
    /// the rest of the file does not need to spell out the associated types.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }

    // Blanket impl: every matching `rkv::Readable` is automatically `Readable`.
    impl<'r, T> Readable<'r> for T where
        T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }

    /// Builds the safe-mode rkv environment for `path`, passing
    /// `RKV_MAX_DBS` as its capacity.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
109
110use backend::*;
111pub use backend::{Readable, Writer};
112
/// Enumeration of the different stores within our database.
///
/// Our rkv database contains a number of different "stores", and the items
/// in each store correspond to a particular type of object at the Rust level.
pub enum StoreId {
    /// Store containing the set of known experiments, as read from the server.
    ///
    /// Keys in the `Experiments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`Experiment`] struct
    /// representing the last known state of that experiment.
    Experiments,
    /// Store containing the set of known experiment enrollments.
    ///
    /// Keys in the `Enrollments` store are experiment identifier slugs, and their
    /// corresponding values are serialized instances of the [`ExperimentEnrollment`]
    /// struct representing the current state of this client's enrollment (or not)
    /// in that experiment.
    Enrollments,
    /// Store containing miscellaneous metadata about this client instance.
    ///
    /// Keys in the `Meta` store are string constants, and their corresponding values
    /// are serialized items whose type depends on the constant. Known constants
    /// include:
    ///   * "db_version":   u16, the version number of the most recent migration
    ///     applied to this database.
    ///   * "db-was-corrupt":   boolean, whether or not a corrupt database was
    ///     replaced with a new one in Database::open_single
    ///   * "nimbus-id":    String, the randomly-generated identifier for the
    ///     current client instance.
    ///   * "user-opt-in-experiments":  bool, whether the user has explicitly opted in or out
    ///     of participating in experiments.
    ///   * "user-opt-in-rollouts":  bool, whether the user has explicitly opted in or out
    ///     of participating in rollouts.
    ///   * "installation-date": a UTC DateTime string, defining the date the consuming app was
    ///     installed
    ///   * "update-date": a UTC DateTime string, defining the date the consuming app was
    ///     last updated
    ///   * "app-version": String, the version of the app last persisted
    Meta,
    /// Store containing pending updates to experiment data.
    ///
    /// The `Updates` store contains a single key "pending-experiment-updates", whose
    /// corresponding value is a serialized `Vec<Experiment>` of new experiment data
    /// that has been received from the server but not yet processed by the application.
    Updates,
    /// Store containing collected counts of behavior events for targeting purposes.
    ///
    /// Keys in the `EventCounts` store are strings representing the identifier for
    /// the event and their corresponding values represent a serialized instance of a
    /// [`MultiIntervalCounter`] struct that contains a set of configurations and data
    /// for the different time periods that the data will be aggregated on.
    EventCounts,
}
166
/// A wrapper for an Rkv store. Implemented to allow any value which supports
/// serde to be used.
///
/// Values are written as JSON strings (`rkv::Value::Json`) and parsed back
/// with `serde_json` when read.
pub struct SingleStore {
    /// The wrapped backend-specific rkv store.
    store: RkvSingleStore,
}
172
173impl SingleStore {
174    pub fn new(store: RkvSingleStore) -> Self {
175        SingleStore { store }
176    }
177
178    pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
179        &self,
180        writer: &mut Writer,
181        key: &str,
182        persisted_data: &T,
183    ) -> Result<()> {
184        let persisted_json = match serde_json::to_string(persisted_data) {
185            Ok(v) => v,
186            Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
187        };
188        self.store
189            .put(writer, key, &rkv::Value::Json(&persisted_json))?;
190        Ok(())
191    }
192
193    #[allow(dead_code)]
194    pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
195        self.store.delete(writer, key)?;
196        Ok(())
197    }
198
199    pub fn clear(&self, writer: &mut Writer) -> Result<()> {
200        self.store.clear(writer)?;
201        Ok(())
202    }
203
204    // Some "get" functions that cooperate with transactions (ie, so we can
205    // get what we've written to the transaction before it's committed).
206    // It's unfortunate that these are duplicated with the DB itself, but the
207    // traits used by rkv make this tricky.
208    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
209    where
210        R: Readable<'r>,
211        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
212    {
213        let persisted_data = self.store.get(reader, key)?;
214        match persisted_data {
215            Some(data) => {
216                if let rkv::Value::Json(data) = data {
217                    Ok(Some(match serde_json::from_str::<T>(data) {
218                        Ok(v) => v,
219                        Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
220                    }))
221                } else {
222                    Err(NimbusError::InvalidPersistedData)
223                }
224            }
225            None => Ok(None),
226        }
227    }
228
229    /// Fork of collect_all that simply drops records that fail to read
230    /// rather than simply returning an error up the stack.  This likely
231    /// wants to be just a parameter to collect_all, but for now....
232    pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
233    where
234        R: Readable<'r>,
235        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
236    {
237        let mut result = Vec::new();
238        let mut iter = self.store.iter_start(reader)?;
239        while let Some(Ok((_, data))) = iter.next() {
240            if let rkv::Value::Json(data) = data {
241                let unserialized = serde_json::from_str::<T>(data);
242                match unserialized {
243                    Ok(value) => result.push(value),
244                    Err(e) => {
245                        // If there is an error, we won't push this onto the
246                        // result Vec, but we won't blow up the entire
247                        // deserialization either.
248                        warn!(
249                            "try_collect_all: discarded a record while deserializing with: {:?}",
250                            e
251                        );
252                        warn!(
253                            "try_collect_all:   data that failed to deserialize: {:?}",
254                            data
255                        );
256                    }
257                };
258            }
259        }
260        Ok(result)
261    }
262
263    pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
264    where
265        R: Readable<'r>,
266        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
267    {
268        let mut result = Vec::new();
269        let mut iter = self.store.iter_start(reader)?;
270        while let Some(Ok((_, data))) = iter.next() {
271            if let rkv::Value::Json(data) = data {
272                result.push(match serde_json::from_str::<T>(data) {
273                    Ok(v) => v,
274                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
275                });
276            }
277        }
278        Ok(result)
279    }
280}
281
/// An rkv environment opened together with exactly one of its stores.
///
/// Produced by `Database::open_single`.
pub struct SingleStoreDatabase {
    /// The rkv environment that transactions are created from.
    rkv: Rkv,
    /// The one store that was opened.
    pub(crate) store: SingleStore,
}
286
287impl SingleStoreDatabase {
288    /// Function used to obtain a "reader" which is used for read-only transactions.
289    pub fn read(&self) -> Result<Reader<'_>> {
290        Ok(self.rkv.read()?)
291    }
292
293    /// Function used to obtain a "writer" which is used for transactions.
294    /// The `writer.commit();` must be called to commit data added via the
295    /// writer.
296    pub fn write(&self) -> Result<Writer<'_>> {
297        Ok(self.rkv.write()?)
298    }
299
300    /// Function used to obtain values from the internal store.
301    pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
302    where
303        R: Readable<'r>,
304        T: serde::Serialize + for<'de> serde::Deserialize<'de>,
305    {
306        self.store.get(reader, key)
307    }
308}
309
/// Metadata about opening an RKV database.
///
/// Returned by `Database::open_rkv`; the default value reports no corruption.
#[derive(Default)]
pub struct OpenRkvMetadata {
    /// Was the database corrupt? If so, it has been replaced by a new, blank
    /// database.
    pub corrupt: bool,
}
317
/// Metadata about opening an RKV database and its stores.
///
/// This has more information than [`OpenRkvMetadata`] because the former does
/// not attempt to open any stores.
pub struct OpenMetadata {
    /// Was the database corrupt? If so, it has been replaced by a new, blank
    /// database.
    ///
    /// This is also `true` when an earlier `Database::open_single` call
    /// replaced the database and left the "db-was-corrupt" marker behind.
    pub corrupt: bool,

    /// The database version recorded at load time.
    ///
    /// A value of `0` may indicate that no version was recorded, as there was
    /// never a v0 database.
    pub initial_version: u16,
}
333
/// Summary of a database migration attempt.
// NOTE(review): not referenced anywhere in this file; field meanings below are
// inferred from their names - confirm against the callers.
#[derive(Default)]
pub struct MigrationMetadata {
    /// Presumably the version found before migrating.
    pub initial_version: Option<u16>,
    /// Presumably the version reached after migrating.
    pub migrated_version: Option<u16>,
    /// Presumably the error raised while migrating.
    // NOTE(review): the field name is misspelled ("mirgation"); renaming it
    // would change the public interface, so it is left as-is.
    pub mirgation_error: Option<String>,
}
340
/// Why a database migration is being applied.
#[derive(Clone, Copy)]
pub enum DatabaseMigrationReason {
    /// The migration is part of upgrading the database to a newer version.
    Upgrade,
    /// The recorded database version was not one we know how to handle.
    InvalidVersion,
}

impl fmt::Display for DatabaseMigrationReason {
    /// Writes the reason as a fixed snake_case label.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match *self {
            Self::InvalidVersion => "invalid_version",
            Self::Upgrade => "upgrade",
        };
        f.write_str(label)
    }
}
355
/// Database used to access persisted data.
///
/// This is an abstraction around an Rkv database.
/// An instance of this database is created each time the component is loaded;
/// if there is persisted data, the `get` functions should retrieve it.
pub struct Database {
    /// The rkv environment that transactions are created from.
    rkv: Rkv,
    /// Store opened under the name "meta" (see [`StoreId::Meta`]).
    meta_store: SingleStore,
    /// Store opened under the name "experiments" (see [`StoreId::Experiments`]).
    experiment_store: SingleStore,
    /// Store opened under the name "enrollments" (see [`StoreId::Enrollments`]).
    enrollment_store: SingleStore,
    /// Store opened under the name "updates" (see [`StoreId::Updates`]).
    updates_store: SingleStore,
    /// Store opened under the name "event_counts" (see [`StoreId::EventCounts`]).
    event_count_store: SingleStore,

    /// Receives the database load and migration events recorded by this type.
    metrics_handler: Arc<dyn MetricsHandler>,
}
370
371impl Database {
372    /// Main constructor for a database
373    /// Initiates the Rkv database to be used to retrieve persisted data
374    /// # Arguments
375    /// - `path`: A path to the persisted data, this is provided by the consuming application
376    pub fn new<P: AsRef<Path>>(path: P, metrics_handler: Arc<dyn MetricsHandler>) -> Result<Self> {
377        let mut event = DatabaseLoadExtraDef::default();
378
379        let (db, open_metadata) = match Self::open(path, metrics_handler.clone()) {
380            Ok(db) => db,
381            Err(e) => {
382                event.error = Some(e.error_code().to_string());
383                metrics_handler.record_database_load(event);
384                return Err(e);
385            }
386        };
387
388        event.initial_version = Some(open_metadata.initial_version);
389        event.corrupt = Some(open_metadata.corrupt);
390
391        let migrate_result = db.maybe_upgrade(open_metadata.initial_version);
392        match migrate_result {
393            Ok(migrated_version) => event.migrated_version = migrated_version,
394            Err(ref e) => event.migration_error = Some(e.error_code().to_string()),
395        }
396
397        metrics_handler.record_database_load(event);
398
399        migrate_result?;
400
401        Ok(db)
402    }
403
404    /// Open a database, creating it if it does not exist.
405    fn open<P: AsRef<Path>>(
406        path: P,
407        metrics_handler: Arc<dyn MetricsHandler>,
408    ) -> Result<(Self, OpenMetadata)> {
409        let (rkv, open_metadata) = Self::open_rkv(path)?;
410
411        let meta_store = rkv.open_single("meta", StoreOptions::create())?;
412        let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
413        let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
414        let updates_store = rkv.open_single("updates", StoreOptions::create())?;
415        let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
416        let db = Self {
417            rkv,
418            meta_store: SingleStore::new(meta_store),
419            experiment_store: SingleStore::new(experiment_store),
420            enrollment_store: SingleStore::new(enrollment_store),
421            updates_store: SingleStore::new(updates_store),
422            event_count_store: SingleStore::new(event_count_store),
423            metrics_handler,
424        };
425
426        let mut writer = db.rkv.write()?;
427
428        let mut open_metadata = OpenMetadata {
429            corrupt: open_metadata.corrupt,
430            initial_version: db.meta_store.get(&writer, DB_KEY_DB_VERSION)?.unwrap_or(0),
431        };
432
433        if !open_metadata.corrupt {
434            open_metadata.corrupt = db
435                .meta_store
436                .get(&writer, DB_KEY_DB_WAS_CORRUPT)?
437                .unwrap_or(false);
438
439            if open_metadata.corrupt {
440                db.meta_store.delete(&mut writer, DB_KEY_DB_WAS_CORRUPT)?;
441                writer.commit()?;
442            }
443        }
444
445        Ok((db, open_metadata))
446    }
447
448    pub(crate) fn open_single<P: AsRef<Path>>(
449        path: P,
450        store_id: StoreId,
451    ) -> Result<SingleStoreDatabase> {
452        // `Database::open_single` is used by `get_calculated_attributes` to
453        // compute `days_since_update` before the Nimbus SDK has loaded the
454        // database. The following `open_rkv` call *will* wipe the database upon
455        // encountering corruption, which means when we call `Database::new()`
456        // from the client we will not be able to determine that corruption occured.
457        //
458        // Record the corrupted state into the meta store so that the client
459        // will be able to report whether or not the database was actually
460        // corrupted at startup, regardless if we already clobbered it.
461        let (rkv, open_metadata) = Self::open_rkv(path)?;
462
463        if open_metadata.corrupt {
464            let meta = rkv.open_single("meta", StoreOptions::create())?;
465
466            let mut writer = rkv.write()?;
467            meta.put(
468                &mut writer,
469                DB_KEY_DB_WAS_CORRUPT,
470                &rkv::Value::Json("true"),
471            )?;
472            writer.commit()?;
473        }
474
475        let store = SingleStore::new(match store_id {
476            StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
477            StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
478            StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
479            StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
480            StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
481        });
482        Ok(SingleStoreDatabase { rkv, store })
483    }
484
485    /// Attempt to upgrade the database.
486    ///
487    /// If the database is already up-to-date, no operations will be performed.
488    /// Otherwise migrations will be applied in order until the database is at
489    /// [`DB_VERSION`].
490    ///
491    /// If an error occurs during migration, the experiments, enrollments, and
492    /// meta stores will be cleared.
493    fn maybe_upgrade(&self, current_version: u16) -> Result<Option<u16>> {
494        debug!("entered maybe upgrade");
495
496        println!("maybe_upgrade from {current_version}");
497
498        if current_version == DB_VERSION {
499            return Ok(None);
500        }
501
502        let mut writer = self.write()?;
503
504        // An `Err` here means either:
505        //
506        // - an individual migration failed, in which case the machinery in
507        //   [`force_apply_migration`] will have wiped the database in an attempt
508        //   to recover; or
509        //
510        // - the database wipe resulting from a failed migration *also* failed,
511        //   in which case there is not really anything we can do.
512        let _ = self.apply_migrations(&mut writer, current_version);
513
514        // It is safe to clear the update store (i.e. the pending experiments)
515        // on all schema upgrades as it will be re-filled from the server on the
516        // next `fetch_experiments()`. The current contents of the update store
517        // may cause experiments to not load, or worse, accidentally unenroll.
518        self.updates_store.clear(&mut writer)?;
519        self.meta_store
520            .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
521        writer.commit()?;
522        debug!("maybe_upgrade: transaction committed");
523
524        Ok(Some(DB_VERSION))
525    }
526
527    /// Apply all pending migrations.
528    ///
529    /// If all migrations apply successfully, the database will have version
530    /// [`DB_VERSION`].
531    fn apply_migrations(&self, writer: &mut Writer, initial_version: u16) -> Result<()> {
532        let mut current_version = initial_version;
533
534        if !(DB_MIN_VERSION..=DB_VERSION).contains(&current_version) {
535            let reason = if current_version < DB_MIN_VERSION {
536                DatabaseMigrationReason::Upgrade
537            } else {
538                DatabaseMigrationReason::InvalidVersion
539            };
540
541            // We need to force-apply this migration because current_version may be > 2.
542            self.force_apply_migration(
543                writer,
544                |writer| self.migrate_reset_to_v2(writer),
545                &mut current_version,
546                2,
547                reason,
548            )?;
549        };
550
551        self.apply_migration(
552            writer,
553            |writer| self.migrate_v2_to_v3(writer),
554            &mut current_version,
555            3,
556            DatabaseMigrationReason::Upgrade,
557        )?;
558
559        self.apply_migration(
560            writer,
561            |writer| self.migrate_v3_to_v4(writer),
562            &mut current_version,
563            4,
564            DatabaseMigrationReason::Upgrade,
565        )?;
566
567        Ok(())
568    }
569
570    /// Apply a single migration, if it is applicable.
571    ///
572    /// The result of the migration will be reported via telemetry.
573    fn apply_migration(
574        &self,
575        writer: &mut Writer,
576        migration: impl FnOnce(&mut Writer) -> Result<()>,
577        from_version: &mut u16,
578        to_version: u16,
579        reason: DatabaseMigrationReason,
580    ) -> Result<()> {
581        if *from_version >= to_version {
582            return Ok(());
583        }
584
585        self.force_apply_migration(writer, migration, from_version, to_version, reason)
586    }
587
588    /// Forcibly apply a migration, without taking version constraints into
589    /// account.
590    fn force_apply_migration(
591        &self,
592        writer: &mut Writer,
593        migration: impl FnOnce(&mut Writer) -> Result<()>,
594        from_version: &mut u16,
595        to_version: u16,
596        reason: DatabaseMigrationReason,
597    ) -> Result<()> {
598        let mut event = DatabaseMigrationExtraDef {
599            from_version: *from_version,
600            to_version,
601            reason: reason.to_string(),
602            error: None,
603        };
604
605        if let Err(e) = migration(writer) {
606            event.error = Some(e.error_code().to_string());
607            self.metrics_handler.record_database_migration(event);
608
609            error_support::report_error!(
610                "nimbus-database-migration",
611                "Error migrating database from v{} to v{}: {:?}. Wiping experiments and enrollments",
612                from_version,
613                to_version,
614                e
615            );
616
617            self.clear_experiments_and_enrollments(writer)?;
618            return Err(e);
619        }
620
621        self.metrics_handler.record_database_migration(event);
622        *from_version = to_version;
623        Ok(())
624    }
625
626    pub(crate) fn clear_experiments_and_enrollments(
627        &self,
628        writer: &mut Writer,
629    ) -> Result<(), NimbusError> {
630        self.experiment_store.clear(writer)?;
631        self.enrollment_store.clear(writer)?;
632        Ok(())
633    }
634
635    pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
636        self.event_count_store.clear(writer)?;
637        Ok(())
638    }
639
640    pub fn migrate_reset_to_v2(&self, writer: &mut Writer) -> Result<()> {
641        self.clear_experiments_and_enrollments(writer)?;
642
643        Ok(())
644    }
645
646    /// Migrates a v2 database to v3
647    ///
648    /// Separates global user participation into experiments and rollouts participation.
649    /// For privacy: if user opted out globally, they remain opted out of experiments.
650    fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
651        info!("Upgrading from version 2 to version 3");
652
653        let meta_store = &self.meta_store;
654
655        // Get the old global participation flag
656        let old_global_participation = meta_store
657            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
658            .unwrap_or(true); // Default was true
659
660        // Set new separate flags based on privacy requirements:
661        // - If user opted out globally, they stay opted out of experiments
662        // - If user opted out globally, they stay opted out of rollouts (per requirement #3)
663        meta_store.put(
664            writer,
665            DB_KEY_EXPERIMENT_PARTICIPATION,
666            &old_global_participation,
667        )?;
668        meta_store.put(
669            writer,
670            DB_KEY_ROLLOUT_PARTICIPATION,
671            &old_global_participation,
672        )?;
673
674        // Remove the old global participation key if it exists
675        if meta_store
676            .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
677            .is_some()
678        {
679            meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
680        }
681
682        info!(
683            "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
684            old_global_participation, old_global_participation
685        );
686
687        Ok(())
688    }
689
690    fn migrate_v3_to_v4(&self, writer: &mut Writer) -> Result<()> {
691        info!("Upgrading from version 3 to version 4");
692
693        let enrollment_store = self.get_store(StoreId::Enrollments);
694        let enrollments: Vec<v3::LegacyExperimentEnrollment> =
695            enrollment_store.try_collect_all(writer)?;
696
697        let enrollments: Vec<ExperimentEnrollment> =
698            enrollments.into_iter().map(Into::into).collect();
699
700        for enrollment in enrollments {
701            enrollment_store.put(writer, &enrollment.slug, &enrollment)?;
702        }
703
704        Ok(())
705    }
706
707    /// Gets a Store object, which used with the writer returned by
708    /// `self.write()` to update the database in a transaction.
709    pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
710        match store_id {
711            StoreId::Meta => &self.meta_store,
712            StoreId::Experiments => &self.experiment_store,
713            StoreId::Enrollments => &self.enrollment_store,
714            StoreId::Updates => &self.updates_store,
715            StoreId::EventCounts => &self.event_count_store,
716        }
717    }
718
719    pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<(Rkv, OpenRkvMetadata)> {
720        let mut metadata = OpenRkvMetadata::default();
721
722        let path = std::path::Path::new(path.as_ref()).join("db");
723        debug!("open_rkv: path =  {:?}", path.display());
724        fs::create_dir_all(&path)?;
725        let rkv = match rkv_new(&path) {
726            Ok(rkv) => Ok(rkv),
727            Err(rkv_error) => {
728                match rkv_error {
729                    // For some errors we just delete the DB and start again.
730                    StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
731                        // On one hand this seems a little dangerous, but on
732                        // the other hand avoids us knowing about the
733                        // underlying implementation (ie, how do we know what
734                        // files might exist in all cases?)
735                        warn!(
736                            "Database at '{}' appears corrupt - removing and recreating",
737                            path.display()
738                        );
739                        fs::remove_dir_all(&path)?;
740                        fs::create_dir_all(&path)?;
741
742                        metadata.corrupt = true;
743
744                        rkv_new(&path)
745                    }
746                    // All other errors are fatal.
747                    _ => Err(rkv_error),
748                }
749            }
750        }?;
751        debug!("Database initialized");
752        Ok((rkv, metadata))
753    }
754
755    /// Function used to obtain a "reader" which is used for read-only transactions.
756    pub fn read(&self) -> Result<Reader<'_>> {
757        Ok(self.rkv.read()?)
758    }
759
760    /// Function used to obtain a "writer" which is used for transactions.
761    /// The `writer.commit();` must be called to commit data added via the
762    /// writer.
763    pub fn write(&self) -> Result<Writer<'_>> {
764        Ok(self.rkv.write()?)
765    }
766
767    /// Function used to retrieve persisted data outside of a transaction.
768    /// It allows retrieval of any serializable and deserializable data
769    /// Currently only supports JSON data
770    // Only available for tests; product code should always be using transactions.
771    ///
772    /// # Arguments
773    /// - `key`: A key for the data stored in the underlying database
774    #[cfg(test)]
775    pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
776        &self,
777        store_id: StoreId,
778        key: &str,
779    ) -> Result<Option<T>> {
780        let reader = self.rkv.read()?;
781        let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
782        match persisted_data {
783            Some(data) => {
784                if let rkv::Value::Json(data) = data {
785                    Ok(Some(match serde_json::from_str::<T>(data) {
786                        Ok(v) => v,
787                        Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
788                    }))
789                } else {
790                    Err(NimbusError::InvalidPersistedData)
791                }
792            }
793            None => Ok(None),
794        }
795    }
796
797    // Function for collecting all items in a store outside of a transaction.
798    // Only available for tests; product code should always be using transactions.
799    // Iters are a bit tricky - would be nice to make them generic, but this will
800    // do for our use-case.
801    #[cfg(test)]
802    pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
803        &self,
804        store_id: StoreId,
805    ) -> Result<Vec<T>> {
806        let mut result = Vec::new();
807        let reader = self.rkv.read()?;
808        let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
809        while let Some(Ok((_, data))) = iter.next() {
810            if let rkv::Value::Json(data) = data {
811                result.push(match serde_json::from_str::<T>(data) {
812                    Ok(v) => v,
813                    Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
814                });
815            }
816        }
817        Ok(result)
818    }
819}