1use rkv::{StoreError, StoreOptions};
8use std::fmt;
9use std::fs;
10use std::path::Path;
11use std::sync::Arc;
12
13use crate::enrollment::ExperimentEnrollment;
14use crate::error::{ErrorCode, NimbusError, Result, debug, info, warn};
15use crate::metrics::{DatabaseLoadExtraDef, DatabaseMigrationExtraDef, MetricsHandler};
16use crate::stateful::enrollment::v3;
17
/// Key in the meta store under which the database schema version is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";

/// Schema version written to the meta store once `Database::maybe_upgrade` has run.
pub(crate) const DB_VERSION: u16 = 4;

/// Key in the meta store used to remember that the database was found corrupt.
/// `Database::open_single` writes it; `Database::open` reads and then deletes it.
pub(crate) const DB_KEY_DB_WAS_CORRUPT: &str = "db-was-corrupt";

/// Lowest schema version that step-wise migrations start from. A stored version
/// outside `DB_MIN_VERSION..=DB_VERSION` is first reset to version 2.
pub(crate) const DB_MIN_VERSION: u16 = 2;

/// Capacity handed to `Rkv::with_capacity` when the environment is created.
const RKV_MAX_DBS: u32 = 6;

/// Meta-store keys written by the v2 -> v3 migration.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

/// Legacy meta-store key; the v2 -> v3 migration reads it and then deletes it.
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

// NOTE(review): not referenced in this part of the file; presumably the values
// assumed when no participation flag has been stored - confirm against callers.
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
51
/// Backend type aliases used when the `rkv-safe-mode` feature is disabled:
/// everything is expressed in terms of rkv's LMDB backend types.
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    /// Shorthand bound for "an `rkv::Readable` over the LMDB database and
    /// read-only cursor types". It has no methods of its own.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    // Blanket impl: every type satisfying the supertrait bound is `Readable`.
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    /// Creates the `Rkv` environment at `path` with room for `RKV_MAX_DBS` stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}
80
/// Backend type aliases used when the `rkv-safe-mode` feature is enabled:
/// everything is expressed in terms of rkv's `SafeMode` backend types.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    /// Shorthand bound for "an `rkv::Readable` over the safe-mode database and
    /// read-only cursor types". It has no methods of its own.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    // Blanket impl: every type satisfying the supertrait bound is `Readable`.
    impl<'r, T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    /// Creates the `Rkv` environment at `path` with room for `RKV_MAX_DBS` stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
109
110use backend::*;
111pub use backend::{Readable, Writer};
112
/// Identifies one of the named stores inside the database. The mapping to
/// store names and to `Database` fields lives in `Database::open_single` and
/// `Database::get_store`.
pub enum StoreId {
    /// The store opened under the name `"experiments"`.
    Experiments,
    /// The store opened under the name `"enrollments"`.
    Enrollments,
    /// The store opened under the name `"meta"`; holds the schema version and
    /// the participation / corruption flags written in this file.
    Meta,
    /// The store opened under the name `"updates"`; cleared on every schema upgrade.
    Updates,
    /// The store opened under the name `"event_counts"`.
    EventCounts,
}
166
/// Thin wrapper around one rkv single store that reads and writes values as
/// JSON (`rkv::Value::Json`) via serde.
pub struct SingleStore {
    // The wrapped backend-specific rkv store.
    store: RkvSingleStore,
}
172
173impl SingleStore {
174 pub fn new(store: RkvSingleStore) -> Self {
175 SingleStore { store }
176 }
177
178 pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
179 &self,
180 writer: &mut Writer,
181 key: &str,
182 persisted_data: &T,
183 ) -> Result<()> {
184 let persisted_json = match serde_json::to_string(persisted_data) {
185 Ok(v) => v,
186 Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
187 };
188 self.store
189 .put(writer, key, &rkv::Value::Json(&persisted_json))?;
190 Ok(())
191 }
192
193 #[allow(dead_code)]
194 pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
195 self.store.delete(writer, key)?;
196 Ok(())
197 }
198
199 pub fn clear(&self, writer: &mut Writer) -> Result<()> {
200 self.store.clear(writer)?;
201 Ok(())
202 }
203
204 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
209 where
210 R: Readable<'r>,
211 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
212 {
213 let persisted_data = self.store.get(reader, key)?;
214 match persisted_data {
215 Some(data) => {
216 if let rkv::Value::Json(data) = data {
217 Ok(Some(match serde_json::from_str::<T>(data) {
218 Ok(v) => v,
219 Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
220 }))
221 } else {
222 Err(NimbusError::InvalidPersistedData)
223 }
224 }
225 None => Ok(None),
226 }
227 }
228
229 pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
233 where
234 R: Readable<'r>,
235 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
236 {
237 let mut result = Vec::new();
238 let mut iter = self.store.iter_start(reader)?;
239 while let Some(Ok((_, data))) = iter.next() {
240 if let rkv::Value::Json(data) = data {
241 let unserialized = serde_json::from_str::<T>(data);
242 match unserialized {
243 Ok(value) => result.push(value),
244 Err(e) => {
245 warn!(
249 "try_collect_all: discarded a record while deserializing with: {:?}",
250 e
251 );
252 warn!(
253 "try_collect_all: data that failed to deserialize: {:?}",
254 data
255 );
256 }
257 };
258 }
259 }
260 Ok(result)
261 }
262
263 pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
264 where
265 R: Readable<'r>,
266 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
267 {
268 let mut result = Vec::new();
269 let mut iter = self.store.iter_start(reader)?;
270 while let Some(Ok((_, data))) = iter.next() {
271 if let rkv::Value::Json(data) = data {
272 result.push(match serde_json::from_str::<T>(data) {
273 Ok(v) => v,
274 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
275 });
276 }
277 }
278 Ok(result)
279 }
280}
281
/// An rkv environment together with exactly one of its stores, as produced by
/// `Database::open_single`.
pub struct SingleStoreDatabase {
    // Environment that owns `store`; used to open read/write transactions.
    rkv: Rkv,
    // The single store selected by the `StoreId` passed to `open_single`.
    pub(crate) store: SingleStore,
}
286
287impl SingleStoreDatabase {
288 pub fn read(&self) -> Result<Reader<'_>> {
290 Ok(self.rkv.read()?)
291 }
292
293 pub fn write(&self) -> Result<Writer<'_>> {
297 Ok(self.rkv.write()?)
298 }
299
300 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
302 where
303 R: Readable<'r>,
304 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
305 {
306 self.store.get(reader, key)
307 }
308}
309
/// Information gathered by `Database::open_rkv` while opening the environment.
#[derive(Default)]
pub struct OpenRkvMetadata {
    /// Set to `true` when the on-disk database was reported as corrupt or
    /// invalid and was removed and recreated.
    pub corrupt: bool,
}
317
/// Information gathered by `Database::open`.
pub struct OpenMetadata {
    /// `true` if `open_rkv` had to recreate the database, or if the
    /// `DB_KEY_DB_WAS_CORRUPT` flag was found in the meta store.
    pub corrupt: bool,

    /// Schema version read from the meta store (`DB_KEY_DB_VERSION`), or `0`
    /// when no version was stored.
    pub initial_version: u16,
}
333
/// Summary of a migration run. Not referenced in this part of the file.
#[derive(Default)]
pub struct MigrationMetadata {
    /// NOTE(review): presumably the version found before migrating - confirm with users of this type.
    pub initial_version: Option<u16>,
    /// NOTE(review): presumably the version reached after migrating - confirm with users of this type.
    pub migrated_version: Option<u16>,
    /// NOTE(review): the field name is misspelt ("mirgation"); it is public, so
    /// renaming it would break any code that names it.
    pub mirgation_error: Option<String>,
}
340
/// Why a migration step is being applied; reported in migration metrics via
/// its `Display` form.
#[derive(Clone, Copy)]
pub enum DatabaseMigrationReason {
    /// A regular schema upgrade. Rendered as `"upgrade"`.
    Upgrade,
    /// The stored version was not one this code can migrate from. Rendered as
    /// `"invalid_version"`.
    InvalidVersion,
}

impl fmt::Display for DatabaseMigrationReason {
    /// Writes the lowercase label used for this reason.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match *self {
            DatabaseMigrationReason::Upgrade => "upgrade",
            DatabaseMigrationReason::InvalidVersion => "invalid_version",
        };
        f.write_str(label)
    }
}
355
/// The full database: one rkv environment plus a `SingleStore` wrapper for
/// each named store, and a handle for reporting load/migration metrics.
pub struct Database {
    // Environment that owns all of the stores below.
    rkv: Rkv,
    // One wrapper per named store; see `get_store` for the `StoreId` mapping.
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    updates_store: SingleStore,
    event_count_store: SingleStore,

    // Receives `record_database_load` and `record_database_migration` events.
    metrics_handler: Arc<dyn MetricsHandler>,
}
370
371impl Database {
372 pub fn new<P: AsRef<Path>>(path: P, metrics_handler: Arc<dyn MetricsHandler>) -> Result<Self> {
377 let mut event = DatabaseLoadExtraDef::default();
378
379 let (db, open_metadata) = match Self::open(path, metrics_handler.clone()) {
380 Ok(db) => db,
381 Err(e) => {
382 event.error = Some(e.error_code().to_string());
383 metrics_handler.record_database_load(event);
384 return Err(e);
385 }
386 };
387
388 event.initial_version = Some(open_metadata.initial_version);
389 event.corrupt = Some(open_metadata.corrupt);
390
391 let migrate_result = db.maybe_upgrade(open_metadata.initial_version);
392 match migrate_result {
393 Ok(migrated_version) => event.migrated_version = migrated_version,
394 Err(ref e) => event.migration_error = Some(e.error_code().to_string()),
395 }
396
397 metrics_handler.record_database_load(event);
398
399 migrate_result?;
400
401 Ok(db)
402 }
403
404 fn open<P: AsRef<Path>>(
406 path: P,
407 metrics_handler: Arc<dyn MetricsHandler>,
408 ) -> Result<(Self, OpenMetadata)> {
409 let (rkv, open_metadata) = Self::open_rkv(path)?;
410
411 let meta_store = rkv.open_single("meta", StoreOptions::create())?;
412 let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
413 let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
414 let updates_store = rkv.open_single("updates", StoreOptions::create())?;
415 let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
416 let db = Self {
417 rkv,
418 meta_store: SingleStore::new(meta_store),
419 experiment_store: SingleStore::new(experiment_store),
420 enrollment_store: SingleStore::new(enrollment_store),
421 updates_store: SingleStore::new(updates_store),
422 event_count_store: SingleStore::new(event_count_store),
423 metrics_handler,
424 };
425
426 let mut writer = db.rkv.write()?;
427
428 let mut open_metadata = OpenMetadata {
429 corrupt: open_metadata.corrupt,
430 initial_version: db.meta_store.get(&writer, DB_KEY_DB_VERSION)?.unwrap_or(0),
431 };
432
433 if !open_metadata.corrupt {
434 open_metadata.corrupt = db
435 .meta_store
436 .get(&writer, DB_KEY_DB_WAS_CORRUPT)?
437 .unwrap_or(false);
438
439 if open_metadata.corrupt {
440 db.meta_store.delete(&mut writer, DB_KEY_DB_WAS_CORRUPT)?;
441 writer.commit()?;
442 }
443 }
444
445 Ok((db, open_metadata))
446 }
447
448 pub(crate) fn open_single<P: AsRef<Path>>(
449 path: P,
450 store_id: StoreId,
451 ) -> Result<SingleStoreDatabase> {
452 let (rkv, open_metadata) = Self::open_rkv(path)?;
462
463 if open_metadata.corrupt {
464 let meta = rkv.open_single("meta", StoreOptions::create())?;
465
466 let mut writer = rkv.write()?;
467 meta.put(
468 &mut writer,
469 DB_KEY_DB_WAS_CORRUPT,
470 &rkv::Value::Json("true"),
471 )?;
472 writer.commit()?;
473 }
474
475 let store = SingleStore::new(match store_id {
476 StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
477 StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
478 StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
479 StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
480 StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
481 });
482 Ok(SingleStoreDatabase { rkv, store })
483 }
484
485 fn maybe_upgrade(&self, current_version: u16) -> Result<Option<u16>> {
494 debug!("entered maybe upgrade");
495
496 println!("maybe_upgrade from {current_version}");
497
498 if current_version == DB_VERSION {
499 return Ok(None);
500 }
501
502 let mut writer = self.write()?;
503
504 let _ = self.apply_migrations(&mut writer, current_version);
513
514 self.updates_store.clear(&mut writer)?;
519 self.meta_store
520 .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
521 writer.commit()?;
522 debug!("maybe_upgrade: transaction committed");
523
524 Ok(Some(DB_VERSION))
525 }
526
527 fn apply_migrations(&self, writer: &mut Writer, initial_version: u16) -> Result<()> {
532 let mut current_version = initial_version;
533
534 if !(DB_MIN_VERSION..=DB_VERSION).contains(¤t_version) {
535 let reason = if current_version < DB_MIN_VERSION {
536 DatabaseMigrationReason::Upgrade
537 } else {
538 DatabaseMigrationReason::InvalidVersion
539 };
540
541 self.force_apply_migration(
543 writer,
544 |writer| self.migrate_reset_to_v2(writer),
545 &mut current_version,
546 2,
547 reason,
548 )?;
549 };
550
551 self.apply_migration(
552 writer,
553 |writer| self.migrate_v2_to_v3(writer),
554 &mut current_version,
555 3,
556 DatabaseMigrationReason::Upgrade,
557 )?;
558
559 self.apply_migration(
560 writer,
561 |writer| self.migrate_v3_to_v4(writer),
562 &mut current_version,
563 4,
564 DatabaseMigrationReason::Upgrade,
565 )?;
566
567 Ok(())
568 }
569
570 fn apply_migration(
574 &self,
575 writer: &mut Writer,
576 migration: impl FnOnce(&mut Writer) -> Result<()>,
577 from_version: &mut u16,
578 to_version: u16,
579 reason: DatabaseMigrationReason,
580 ) -> Result<()> {
581 if *from_version >= to_version {
582 return Ok(());
583 }
584
585 self.force_apply_migration(writer, migration, from_version, to_version, reason)
586 }
587
588 fn force_apply_migration(
591 &self,
592 writer: &mut Writer,
593 migration: impl FnOnce(&mut Writer) -> Result<()>,
594 from_version: &mut u16,
595 to_version: u16,
596 reason: DatabaseMigrationReason,
597 ) -> Result<()> {
598 let mut event = DatabaseMigrationExtraDef {
599 from_version: *from_version,
600 to_version,
601 reason: reason.to_string(),
602 error: None,
603 };
604
605 if let Err(e) = migration(writer) {
606 event.error = Some(e.error_code().to_string());
607 self.metrics_handler.record_database_migration(event);
608
609 error_support::report_error!(
610 "nimbus-database-migration",
611 "Error migrating database from v{} to v{}: {:?}. Wiping experiments and enrollments",
612 from_version,
613 to_version,
614 e
615 );
616
617 self.clear_experiments_and_enrollments(writer)?;
618 return Err(e);
619 }
620
621 self.metrics_handler.record_database_migration(event);
622 *from_version = to_version;
623 Ok(())
624 }
625
626 pub(crate) fn clear_experiments_and_enrollments(
627 &self,
628 writer: &mut Writer,
629 ) -> Result<(), NimbusError> {
630 self.experiment_store.clear(writer)?;
631 self.enrollment_store.clear(writer)?;
632 Ok(())
633 }
634
635 pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
636 self.event_count_store.clear(writer)?;
637 Ok(())
638 }
639
640 pub fn migrate_reset_to_v2(&self, writer: &mut Writer) -> Result<()> {
641 self.clear_experiments_and_enrollments(writer)?;
642
643 Ok(())
644 }
645
646 fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
651 info!("Upgrading from version 2 to version 3");
652
653 let meta_store = &self.meta_store;
654
655 let old_global_participation = meta_store
657 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
658 .unwrap_or(true); meta_store.put(
664 writer,
665 DB_KEY_EXPERIMENT_PARTICIPATION,
666 &old_global_participation,
667 )?;
668 meta_store.put(
669 writer,
670 DB_KEY_ROLLOUT_PARTICIPATION,
671 &old_global_participation,
672 )?;
673
674 if meta_store
676 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
677 .is_some()
678 {
679 meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
680 }
681
682 info!(
683 "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
684 old_global_participation, old_global_participation
685 );
686
687 Ok(())
688 }
689
690 fn migrate_v3_to_v4(&self, writer: &mut Writer) -> Result<()> {
691 info!("Upgrading from version 3 to version 4");
692
693 let enrollment_store = self.get_store(StoreId::Enrollments);
694 let enrollments: Vec<v3::LegacyExperimentEnrollment> =
695 enrollment_store.try_collect_all(writer)?;
696
697 let enrollments: Vec<ExperimentEnrollment> =
698 enrollments.into_iter().map(Into::into).collect();
699
700 for enrollment in enrollments {
701 enrollment_store.put(writer, &enrollment.slug, &enrollment)?;
702 }
703
704 Ok(())
705 }
706
707 pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
710 match store_id {
711 StoreId::Meta => &self.meta_store,
712 StoreId::Experiments => &self.experiment_store,
713 StoreId::Enrollments => &self.enrollment_store,
714 StoreId::Updates => &self.updates_store,
715 StoreId::EventCounts => &self.event_count_store,
716 }
717 }
718
719 pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<(Rkv, OpenRkvMetadata)> {
720 let mut metadata = OpenRkvMetadata::default();
721
722 let path = std::path::Path::new(path.as_ref()).join("db");
723 debug!("open_rkv: path = {:?}", path.display());
724 fs::create_dir_all(&path)?;
725 let rkv = match rkv_new(&path) {
726 Ok(rkv) => Ok(rkv),
727 Err(rkv_error) => {
728 match rkv_error {
729 StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
731 warn!(
736 "Database at '{}' appears corrupt - removing and recreating",
737 path.display()
738 );
739 fs::remove_dir_all(&path)?;
740 fs::create_dir_all(&path)?;
741
742 metadata.corrupt = true;
743
744 rkv_new(&path)
745 }
746 _ => Err(rkv_error),
748 }
749 }
750 }?;
751 debug!("Database initialized");
752 Ok((rkv, metadata))
753 }
754
755 pub fn read(&self) -> Result<Reader<'_>> {
757 Ok(self.rkv.read()?)
758 }
759
760 pub fn write(&self) -> Result<Writer<'_>> {
764 Ok(self.rkv.write()?)
765 }
766
767 #[cfg(test)]
775 pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
776 &self,
777 store_id: StoreId,
778 key: &str,
779 ) -> Result<Option<T>> {
780 let reader = self.rkv.read()?;
781 let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
782 match persisted_data {
783 Some(data) => {
784 if let rkv::Value::Json(data) = data {
785 Ok(Some(match serde_json::from_str::<T>(data) {
786 Ok(v) => v,
787 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
788 }))
789 } else {
790 Err(NimbusError::InvalidPersistedData)
791 }
792 }
793 None => Ok(None),
794 }
795 }
796
797 #[cfg(test)]
802 pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
803 &self,
804 store_id: StoreId,
805 ) -> Result<Vec<T>> {
806 let mut result = Vec::new();
807 let reader = self.rkv.read()?;
808 let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
809 while let Some(Ok((_, data))) = iter.next() {
810 if let rkv::Value::Json(data) = data {
811 result.push(match serde_json::from_str::<T>(data) {
812 Ok(v) => v,
813 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
814 });
815 }
816 }
817 Ok(result)
818 }
819}