1use rkv::{StoreError, StoreOptions};
8use std::fmt;
9use std::fs;
10use std::path::Path;
11use std::sync::Arc;
12
13use crate::error::{ErrorCode, NimbusError, Result, debug, info, warn};
14use crate::metrics::{DatabaseLoadExtraDef, DatabaseMigrationExtraDef, MetricsHandler};
15
/// Meta-store key under which the current schema version is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";

/// Current schema version; `Database::maybe_upgrade` migrates up to this.
pub(crate) const DB_VERSION: u16 = 3;

/// Meta-store marker written by `Database::open_single` when a corrupt
/// database had to be recreated; read and deleted by `Database::open`.
pub(crate) const DB_KEY_DB_WAS_CORRUPT: &str = "db-was-corrupt";

/// Oldest schema version that can be migrated forward; versions outside
/// `DB_MIN_VERSION..=DB_VERSION` are force-reset to v2 first.
pub(crate) const DB_MIN_VERSION: u16 = 2;

/// Capacity passed to `Rkv::with_capacity` — sized for the five named
/// stores this file opens (meta, experiments, enrollments, updates,
/// event_counts) with one slot spare.
const RKV_MAX_DBS: u32 = 6;

/// Meta-store keys for the per-kind participation flags introduced in v3.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

/// Meta-store key for the pre-v3 global participation flag; superseded by
/// the two keys above during the v2 -> v3 migration.
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

/// Default values for the participation flags; not referenced in this file
/// (presumably used by callers elsewhere — confirm before removing).
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
// LMDB backend: selected when the "rkv-safe-mode" feature is disabled.
// Exposes the same aliases and `rkv_new` as the safe-mode module below so
// the rest of this file is backend-agnostic.
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Backend-agnostic aliases used throughout this file.
    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    // Marker trait with a blanket impl so functions can accept any rkv
    // readable view — e.g. a `Writer` is passed where reads are needed
    // elsewhere in this file.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    // Opens an LMDB environment with room for this file's named stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}
78
// Safe-mode backend: a pure-Rust store used when the "rkv-safe-mode"
// feature is enabled. Mirrors the LMDB module above item-for-item.
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    // Backend-agnostic aliases used throughout this file.
    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    // Marker trait with a blanket impl so functions can accept any rkv
    // readable view (Reader or Writer alike).
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    impl<'r, T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    // Opens a safe-mode environment with room for this file's named stores.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
107
108use backend::*;
109pub use backend::{Readable, Writer};
110
/// Names the individual key/value stores inside the database; see
/// `Database::get_store` and `Database::open_single` for the mapping to
/// the underlying rkv store names.
pub enum StoreId {
    /// The "experiments" store.
    Experiments,
    /// The "enrollments" store.
    Enrollments,
    /// The "meta" store: schema version (`DB_KEY_DB_VERSION`), corruption
    /// marker, and user-participation flags.
    Meta,
    /// The "updates" store; cleared whenever the schema version changes
    /// (see `Database::maybe_upgrade`).
    Updates,
    /// The "event_counts" store.
    EventCounts,
}
164
/// A single named rkv store wrapped with JSON (de)serialization helpers.
pub struct SingleStore {
    // Raw rkv store handle; all values are written as `rkv::Value::Json`.
    store: RkvSingleStore,
}
170
171impl SingleStore {
172 pub fn new(store: RkvSingleStore) -> Self {
173 SingleStore { store }
174 }
175
176 pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
177 &self,
178 writer: &mut Writer,
179 key: &str,
180 persisted_data: &T,
181 ) -> Result<()> {
182 let persisted_json = match serde_json::to_string(persisted_data) {
183 Ok(v) => v,
184 Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
185 };
186 self.store
187 .put(writer, key, &rkv::Value::Json(&persisted_json))?;
188 Ok(())
189 }
190
191 #[allow(dead_code)]
192 pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
193 self.store.delete(writer, key)?;
194 Ok(())
195 }
196
197 pub fn clear(&self, writer: &mut Writer) -> Result<()> {
198 self.store.clear(writer)?;
199 Ok(())
200 }
201
202 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
207 where
208 R: Readable<'r>,
209 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
210 {
211 let persisted_data = self.store.get(reader, key)?;
212 match persisted_data {
213 Some(data) => {
214 if let rkv::Value::Json(data) = data {
215 Ok(Some(match serde_json::from_str::<T>(data) {
216 Ok(v) => v,
217 Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
218 }))
219 } else {
220 Err(NimbusError::InvalidPersistedData)
221 }
222 }
223 None => Ok(None),
224 }
225 }
226
227 pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
231 where
232 R: Readable<'r>,
233 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
234 {
235 let mut result = Vec::new();
236 let mut iter = self.store.iter_start(reader)?;
237 while let Some(Ok((_, data))) = iter.next() {
238 if let rkv::Value::Json(data) = data {
239 let unserialized = serde_json::from_str::<T>(data);
240 match unserialized {
241 Ok(value) => result.push(value),
242 Err(e) => {
243 warn!(
247 "try_collect_all: discarded a record while deserializing with: {:?}",
248 e
249 );
250 warn!(
251 "try_collect_all: data that failed to deserialize: {:?}",
252 data
253 );
254 }
255 };
256 }
257 }
258 Ok(result)
259 }
260
261 pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
262 where
263 R: Readable<'r>,
264 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
265 {
266 let mut result = Vec::new();
267 let mut iter = self.store.iter_start(reader)?;
268 while let Some(Ok((_, data))) = iter.next() {
269 if let rkv::Value::Json(data) = data {
270 result.push(match serde_json::from_str::<T>(data) {
271 Ok(v) => v,
272 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
273 });
274 }
275 }
276 Ok(result)
277 }
278}
279
/// A database handle scoped to a single store, as produced by
/// `Database::open_single`.
pub struct SingleStoreDatabase {
    // The rkv environment must be kept alive alongside the store handle.
    rkv: Rkv,
    pub(crate) store: SingleStore,
}
284
285impl SingleStoreDatabase {
286 pub fn read(&self) -> Result<Reader<'_>> {
288 Ok(self.rkv.read()?)
289 }
290
291 pub fn write(&self) -> Result<Writer<'_>> {
295 Ok(self.rkv.write()?)
296 }
297
298 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
300 where
301 R: Readable<'r>,
302 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
303 {
304 self.store.get(reader, key)
305 }
306}
307
/// Facts gathered while opening the raw rkv environment.
#[derive(Default)]
pub struct OpenRkvMetadata {
    /// True when the on-disk database was found corrupt and was deleted and
    /// recreated (see `Database::open_rkv`).
    pub corrupt: bool,
}
315
/// Facts gathered while opening the full `Database`.
pub struct OpenMetadata {
    /// True when the database was recreated after corruption, either during
    /// this open or flagged by an earlier single-store open via
    /// `DB_KEY_DB_WAS_CORRUPT`.
    pub corrupt: bool,

    /// Schema version read from the meta store; 0 when none was recorded.
    pub initial_version: u16,
}
331
/// Summary of a migration run.
// NOTE(review): this struct is not referenced anywhere in this file —
// confirm it is still used by callers before relying on it.
#[derive(Default)]
pub struct MigrationMetadata {
    pub initial_version: Option<u16>,
    pub migrated_version: Option<u16>,
    // NOTE(review): field name is a typo for `migration_error`; it is `pub`,
    // so renaming would break external users — flagged only.
    pub mirgation_error: Option<String>,
}
338
/// Why a migration (or forced reset) was applied; rendered into the
/// `reason` field of the migration metrics event.
#[derive(Clone, Copy)]
pub enum DatabaseMigrationReason {
    Upgrade,
    InvalidVersion,
}

impl fmt::Display for DatabaseMigrationReason {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let label = match self {
            DatabaseMigrationReason::Upgrade => "upgrade",
            DatabaseMigrationReason::InvalidVersion => "invalid_version",
        };
        f.write_str(label)
    }
}
353
/// Owning handle to the rkv environment and the five named stores this
/// file persists into.
pub struct Database {
    rkv: Rkv,
    /// Bookkeeping: schema version, corruption marker, participation flags.
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    /// Cleared on every schema version change (see `maybe_upgrade`).
    updates_store: SingleStore,
    event_count_store: SingleStore,

    /// Sink for database-load and database-migration metrics events.
    metrics_handler: Arc<dyn MetricsHandler>,
}
368
369impl Database {
370 pub fn new<P: AsRef<Path>>(path: P, metrics_handler: Arc<dyn MetricsHandler>) -> Result<Self> {
375 let mut event = DatabaseLoadExtraDef::default();
376
377 let (db, open_metadata) = match Self::open(path, metrics_handler.clone()) {
378 Ok(db) => db,
379 Err(e) => {
380 event.error = Some(e.error_code().to_string());
381 metrics_handler.record_database_load(event);
382 return Err(e);
383 }
384 };
385
386 event.initial_version = Some(open_metadata.initial_version);
387 event.corrupt = Some(open_metadata.corrupt);
388
389 let migrate_result = db.maybe_upgrade(open_metadata.initial_version);
390 match migrate_result {
391 Ok(migrated_version) => event.migrated_version = migrated_version,
392 Err(ref e) => event.migration_error = Some(e.error_code().to_string()),
393 }
394
395 metrics_handler.record_database_load(event);
396
397 migrate_result?;
398
399 Ok(db)
400 }
401
    /// Opens the rkv environment and all five named stores, then reads the
    /// persisted schema version and corruption marker into an `OpenMetadata`.
    fn open<P: AsRef<Path>>(
        path: P,
        metrics_handler: Arc<dyn MetricsHandler>,
    ) -> Result<(Self, OpenMetadata)> {
        let (rkv, open_metadata) = Self::open_rkv(path)?;

        let meta_store = rkv.open_single("meta", StoreOptions::create())?;
        let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
        let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
        let updates_store = rkv.open_single("updates", StoreOptions::create())?;
        let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
        let db = Self {
            rkv,
            meta_store: SingleStore::new(meta_store),
            experiment_store: SingleStore::new(experiment_store),
            enrollment_store: SingleStore::new(enrollment_store),
            updates_store: SingleStore::new(updates_store),
            event_count_store: SingleStore::new(event_count_store),
            metrics_handler,
        };

        // A write transaction doubles as a read view below (`Writer`
        // satisfies `Readable`); it is only committed on the corrupt-marker
        // path, reads alone need no commit.
        let mut writer = db.rkv.write()?;

        let mut open_metadata = OpenMetadata {
            corrupt: open_metadata.corrupt,
            // 0 means "no schema version recorded yet".
            initial_version: db.meta_store.get(&writer, DB_KEY_DB_VERSION)?.unwrap_or(0),
        };

        if !open_metadata.corrupt {
            // A previous `open_single` may have recreated a corrupt database
            // and left this marker behind for us to report.
            open_metadata.corrupt = db
                .meta_store
                .get(&writer, DB_KEY_DB_WAS_CORRUPT)?
                .unwrap_or(false);

            if open_metadata.corrupt {
                // Consume the marker so corruption is reported only once.
                db.meta_store.delete(&mut writer, DB_KEY_DB_WAS_CORRUPT)?;
                writer.commit()?;
            }
        }

        Ok((db, open_metadata))
    }
445
    /// Opens the database and returns a handle scoped to just `store_id`.
    ///
    /// If the environment had to be recreated because of corruption, a
    /// `DB_KEY_DB_WAS_CORRUPT` marker is written into the meta store so the
    /// next full `Database::open` can report (and clear) it.
    pub(crate) fn open_single<P: AsRef<Path>>(
        path: P,
        store_id: StoreId,
    ) -> Result<SingleStoreDatabase> {
        let (rkv, open_metadata) = Self::open_rkv(path)?;

        if open_metadata.corrupt {
            let meta = rkv.open_single("meta", StoreOptions::create())?;

            let mut writer = rkv.write()?;
            // Written as raw JSON (`"true"`) so `SingleStore::get::<bool, _>`
            // can deserialize it later.
            meta.put(
                &mut writer,
                DB_KEY_DB_WAS_CORRUPT,
                &rkv::Value::Json("true"),
            )?;
            writer.commit()?;
        }

        let store = SingleStore::new(match store_id {
            StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
            StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
            StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
            StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
            StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
        });
        Ok(SingleStoreDatabase { rkv, store })
    }
482
483 fn maybe_upgrade(&self, current_version: u16) -> Result<Option<u16>> {
492 debug!("entered maybe upgrade");
493
494 println!("maybe_upgrade from {current_version}");
495
496 if current_version == DB_VERSION {
497 return Ok(None);
498 }
499
500 let mut writer = self.write()?;
501
502 let _ = self.apply_migrations(&mut writer, current_version);
511
512 self.updates_store.clear(&mut writer)?;
517 self.meta_store
518 .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
519 writer.commit()?;
520 debug!("maybe_upgrade: transaction committed");
521
522 Ok(Some(DB_VERSION))
523 }
524
525 fn apply_migrations(&self, writer: &mut Writer, initial_version: u16) -> Result<()> {
530 let mut current_version = initial_version;
531
532 if !(DB_MIN_VERSION..=DB_VERSION).contains(¤t_version) {
533 let reason = if current_version < DB_MIN_VERSION {
534 DatabaseMigrationReason::Upgrade
535 } else {
536 DatabaseMigrationReason::InvalidVersion
537 };
538
539 self.force_apply_migration(
541 writer,
542 |writer| self.migrate_reset_to_v2(writer),
543 &mut current_version,
544 2,
545 reason,
546 )?;
547 };
548
549 self.apply_migration(
550 writer,
551 |writer| self.migrate_v2_to_v3(writer),
552 &mut current_version,
553 3,
554 DatabaseMigrationReason::Upgrade,
555 )?;
556
557 Ok(())
558 }
559
560 fn apply_migration(
564 &self,
565 writer: &mut Writer,
566 migration: impl FnOnce(&mut Writer) -> Result<()>,
567 from_version: &mut u16,
568 to_version: u16,
569 reason: DatabaseMigrationReason,
570 ) -> Result<()> {
571 if *from_version >= to_version {
572 return Ok(());
573 }
574
575 self.force_apply_migration(writer, migration, from_version, to_version, reason)
576 }
577
578 fn force_apply_migration(
581 &self,
582 writer: &mut Writer,
583 migration: impl FnOnce(&mut Writer) -> Result<()>,
584 from_version: &mut u16,
585 to_version: u16,
586 reason: DatabaseMigrationReason,
587 ) -> Result<()> {
588 let mut event = DatabaseMigrationExtraDef {
589 from_version: *from_version,
590 to_version,
591 reason: reason.to_string(),
592 error: None,
593 };
594
595 if let Err(e) = migration(writer) {
596 event.error = Some(e.error_code().to_string());
597 self.metrics_handler.record_database_migration(event);
598
599 error_support::report_error!(
600 "nimbus-database-migration",
601 "Error migrating database from v{} to v{}: {:?}. Wiping experiments and enrollments",
602 from_version,
603 to_version,
604 e
605 );
606
607 self.clear_experiments_and_enrollments(writer)?;
608 return Err(e);
609 }
610
611 self.metrics_handler.record_database_migration(event);
612 *from_version = to_version;
613 Ok(())
614 }
615
616 pub(crate) fn clear_experiments_and_enrollments(
617 &self,
618 writer: &mut Writer,
619 ) -> Result<(), NimbusError> {
620 self.experiment_store.clear(writer)?;
621 self.enrollment_store.clear(writer)?;
622 Ok(())
623 }
624
625 pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
626 self.event_count_store.clear(writer)?;
627 Ok(())
628 }
629
630 pub fn migrate_reset_to_v2(&self, writer: &mut Writer) -> Result<()> {
631 self.clear_experiments_and_enrollments(writer)?;
632
633 Ok(())
634 }
635
636 fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
641 info!("Upgrading from version 2 to version 3");
642
643 let meta_store = &self.meta_store;
644
645 let old_global_participation = meta_store
647 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
648 .unwrap_or(true); meta_store.put(
654 writer,
655 DB_KEY_EXPERIMENT_PARTICIPATION,
656 &old_global_participation,
657 )?;
658 meta_store.put(
659 writer,
660 DB_KEY_ROLLOUT_PARTICIPATION,
661 &old_global_participation,
662 )?;
663
664 if meta_store
666 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
667 .is_some()
668 {
669 meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
670 }
671
672 info!(
673 "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
674 old_global_participation, old_global_participation
675 );
676
677 Ok(())
678 }
679
680 pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
683 match store_id {
684 StoreId::Meta => &self.meta_store,
685 StoreId::Experiments => &self.experiment_store,
686 StoreId::Enrollments => &self.enrollment_store,
687 StoreId::Updates => &self.updates_store,
688 StoreId::EventCounts => &self.event_count_store,
689 }
690 }
691
692 pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<(Rkv, OpenRkvMetadata)> {
693 let mut metadata = OpenRkvMetadata::default();
694
695 let path = std::path::Path::new(path.as_ref()).join("db");
696 debug!("open_rkv: path = {:?}", path.display());
697 fs::create_dir_all(&path)?;
698 let rkv = match rkv_new(&path) {
699 Ok(rkv) => Ok(rkv),
700 Err(rkv_error) => {
701 match rkv_error {
702 StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
704 warn!(
709 "Database at '{}' appears corrupt - removing and recreating",
710 path.display()
711 );
712 fs::remove_dir_all(&path)?;
713 fs::create_dir_all(&path)?;
714
715 metadata.corrupt = true;
716
717 rkv_new(&path)
718 }
719 _ => Err(rkv_error),
721 }
722 }
723 }?;
724 debug!("Database initialized");
725 Ok((rkv, metadata))
726 }
727
728 pub fn read(&self) -> Result<Reader<'_>> {
730 Ok(self.rkv.read()?)
731 }
732
733 pub fn write(&self) -> Result<Writer<'_>> {
737 Ok(self.rkv.write()?)
738 }
739
740 #[cfg(test)]
748 pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
749 &self,
750 store_id: StoreId,
751 key: &str,
752 ) -> Result<Option<T>> {
753 let reader = self.rkv.read()?;
754 let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
755 match persisted_data {
756 Some(data) => {
757 if let rkv::Value::Json(data) = data {
758 Ok(Some(match serde_json::from_str::<T>(data) {
759 Ok(v) => v,
760 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
761 }))
762 } else {
763 Err(NimbusError::InvalidPersistedData)
764 }
765 }
766 None => Ok(None),
767 }
768 }
769
770 #[cfg(test)]
775 pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
776 &self,
777 store_id: StoreId,
778 ) -> Result<Vec<T>> {
779 let mut result = Vec::new();
780 let reader = self.rkv.read()?;
781 let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
782 while let Some(Ok((_, data))) = iter.next() {
783 if let rkv::Value::Json(data) = data {
784 result.push(match serde_json::from_str::<T>(data) {
785 Ok(v) => v,
786 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
787 });
788 }
789 }
790 Ok(result)
791 }
792}