1use crate::error::{debug, info, warn, NimbusError, Result};
8use crate::enrollment::ExperimentEnrollment;
14use crate::Experiment;
15use core::iter::Iterator;
16use rkv::{StoreError, StoreOptions};
17use std::collections::HashSet;
18use std::fs;
19use std::path::Path;
20
/// Meta-store key under which the schema version number is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";
/// Current schema version; `Database::maybe_upgrade` migrates older databases
/// up to this version (or wipes them when no migration path exists).
pub(crate) const DB_VERSION: u16 = 3;
// Capacity passed to rkv: one slot per named store ("meta", "experiments",
// "enrollments", "updates", "event_counts") plus headroom.
const RKV_MAX_DBS: u32 = 6;

/// Meta-store keys for the per-surface participation flags introduced in v3.
pub(crate) const DB_KEY_EXPERIMENT_PARTICIPATION: &str = "user-opt-in-experiments";
pub(crate) const DB_KEY_ROLLOUT_PARTICIPATION: &str = "user-opt-in-rollouts";

/// Legacy (pre-v3) meta-store key holding a single global opt-in flag; it is
/// split into the two keys above by the v2->v3 migration and then deleted.
pub(crate) const DB_KEY_GLOBAL_USER_PARTICIPATION: &str = "user-opt-in";

/// Participation defaults used when no flag has been persisted (opt-in).
pub(crate) const DEFAULT_EXPERIMENT_PARTICIPATION: bool = true;
pub(crate) const DEFAULT_ROLLOUT_PARTICIPATION: bool = true;
#[cfg(not(feature = "rkv-safe-mode"))]
mod backend {
    //! LMDB-backed storage backend, compiled when the `rkv-safe-mode` feature
    //! is disabled. Exposes the same names (`Rkv`, `RkvSingleStore`, `Reader`,
    //! `Writer`, `Readable`, `rkv_new`) as the safe-mode variant of this
    //! module, so the rest of the file stays backend-agnostic.
    use rkv::backend::{
        Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<LmdbEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
    pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
    // Marker trait letting callers be generic over anything readable (both
    // read and write transactions) without naming LMDB types directly.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
    {
    }
    // Blanket impl: any rkv readable over the LMDB backend is `Readable`.
    impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
        Readable<'r> for T
    {
    }

    /// Open (or create) an LMDB environment at `path` with room for
    /// `RKV_MAX_DBS` named databases.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
    }
}
67
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    //! Safe-mode (pure-Rust) storage backend, compiled when the
    //! `rkv-safe-mode` feature is enabled. Mirrors the LMDB variant of this
    //! module name-for-name (`Rkv`, `RkvSingleStore`, `Reader`, `Writer`,
    //! `Readable`, `rkv_new`) so the rest of the file stays backend-agnostic.
    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };
    use std::path::Path;

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;
    // Marker trait letting callers be generic over anything readable (both
    // read and write transactions) without naming safe-mode types directly.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }
    // Blanket impl: any rkv readable over the safe-mode backend is `Readable`.
    impl<
            'r,
            T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>,
        > Readable<'r> for T
    {
    }

    /// Open (or create) a safe-mode environment at `path` with room for
    /// `RKV_MAX_DBS` named databases.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
98
99use backend::*;
100pub use backend::{Readable, Writer};
101
/// Identifies one of the named rkv stores inside the database; used by
/// `Database::get_store` and `Database::open_single` to select a store.
pub enum StoreId {
    /// The "experiments" store; records are written keyed by experiment slug.
    Experiments,
    /// The "enrollments" store; records are written keyed by enrollment slug.
    Enrollments,
    /// The "meta" store: schema version and user-participation flags.
    Meta,
    /// The "updates" store; cleared by `maybe_upgrade` on schema changes.
    Updates,
    /// The "event_counts" store (see `Database::clear_event_count_data`).
    EventCounts,
}
153
/// A thin wrapper around one named rkv store, adding JSON (de)serialization
/// of values via serde on top of the raw key/value interface.
pub struct SingleStore {
    // The underlying rkv store handle; all reads and writes go through it.
    store: RkvSingleStore,
}
159
160impl SingleStore {
161 pub fn new(store: RkvSingleStore) -> Self {
162 SingleStore { store }
163 }
164
165 pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
166 &self,
167 writer: &mut Writer,
168 key: &str,
169 persisted_data: &T,
170 ) -> Result<()> {
171 let persisted_json = match serde_json::to_string(persisted_data) {
172 Ok(v) => v,
173 Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
174 };
175 self.store
176 .put(writer, key, &rkv::Value::Json(&persisted_json))?;
177 Ok(())
178 }
179
180 #[allow(dead_code)]
181 pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
182 self.store.delete(writer, key)?;
183 Ok(())
184 }
185
186 pub fn clear(&self, writer: &mut Writer) -> Result<()> {
187 self.store.clear(writer)?;
188 Ok(())
189 }
190
191 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
196 where
197 R: Readable<'r>,
198 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
199 {
200 let persisted_data = self.store.get(reader, key)?;
201 match persisted_data {
202 Some(data) => {
203 if let rkv::Value::Json(data) = data {
204 Ok(Some(match serde_json::from_str::<T>(data) {
205 Ok(v) => v,
206 Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
207 }))
208 } else {
209 Err(NimbusError::InvalidPersistedData)
210 }
211 }
212 None => Ok(None),
213 }
214 }
215
216 pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
220 where
221 R: Readable<'r>,
222 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
223 {
224 let mut result = Vec::new();
225 let mut iter = self.store.iter_start(reader)?;
226 while let Some(Ok((_, data))) = iter.next() {
227 if let rkv::Value::Json(data) = data {
228 let unserialized = serde_json::from_str::<T>(data);
229 match unserialized {
230 Ok(value) => result.push(value),
231 Err(e) => {
232 warn!(
236 "try_collect_all: discarded a record while deserializing with: {:?}",
237 e
238 );
239 warn!(
240 "try_collect_all: data that failed to deserialize: {:?}",
241 data
242 );
243 }
244 };
245 }
246 }
247 Ok(result)
248 }
249
250 pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
251 where
252 R: Readable<'r>,
253 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
254 {
255 let mut result = Vec::new();
256 let mut iter = self.store.iter_start(reader)?;
257 while let Some(Ok((_, data))) = iter.next() {
258 if let rkv::Value::Json(data) = data {
259 result.push(match serde_json::from_str::<T>(data) {
260 Ok(v) => v,
261 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
262 });
263 }
264 }
265 Ok(result)
266 }
267}
268
/// A handle owning the rkv environment together with exactly one opened
/// store; produced by `Database::open_single`.
pub struct SingleStoreDatabase {
    // Keeps the environment alive; transactions are created from it.
    rkv: Rkv,
    // The single store opened in this environment.
    pub(crate) store: SingleStore,
}
273
274impl SingleStoreDatabase {
275 pub fn read(&self) -> Result<Reader> {
277 Ok(self.rkv.read()?)
278 }
279
280 pub fn write(&self) -> Result<Writer> {
284 Ok(self.rkv.write()?)
285 }
286
287 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
289 where
290 R: Readable<'r>,
291 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
292 {
293 self.store.get(reader, key)
294 }
295}
296
/// The main Nimbus database: owns the rkv environment plus one `SingleStore`
/// wrapper per named store opened in `Database::new`.
pub struct Database {
    // The environment; all transactions are created from it.
    rkv: Rkv,
    // "meta": schema version and user-participation flags.
    meta_store: SingleStore,
    // "experiments": experiment records keyed by slug.
    experiment_store: SingleStore,
    // "enrollments": enrollment records keyed by slug.
    enrollment_store: SingleStore,
    // "updates": cleared by `maybe_upgrade` on schema changes.
    updates_store: SingleStore,
    // "event_counts": event count data (see `clear_event_count_data`).
    event_count_store: SingleStore,
}
309
310impl Database {
311 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
316 let rkv = Self::open_rkv(path)?;
317 let meta_store = rkv.open_single("meta", StoreOptions::create())?;
318 let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
319 let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
320 let updates_store = rkv.open_single("updates", StoreOptions::create())?;
321 let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
322 let db = Self {
323 rkv,
324 meta_store: SingleStore::new(meta_store),
325 experiment_store: SingleStore::new(experiment_store),
326 enrollment_store: SingleStore::new(enrollment_store),
327 updates_store: SingleStore::new(updates_store),
328 event_count_store: SingleStore::new(event_count_store),
329 };
330 db.maybe_upgrade()?;
331 Ok(db)
332 }
333
334 pub fn open_single<P: AsRef<Path>>(path: P, store_id: StoreId) -> Result<SingleStoreDatabase> {
335 let rkv = Self::open_rkv(path)?;
336 let store = SingleStore::new(match store_id {
337 StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
338 StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
339 StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
340 StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
341 StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
342 });
343 Ok(SingleStoreDatabase { rkv, store })
344 }
345
346 fn maybe_upgrade(&self) -> Result<()> {
347 debug!("entered maybe upgrade");
348 let mut writer = self.rkv.write()?;
349 let current_version = self
350 .meta_store
351 .get::<u16, _>(&writer, DB_KEY_DB_VERSION)?
352 .unwrap_or(0);
353
354 if current_version == DB_VERSION {
355 info!("Already at version {}, no upgrade needed", DB_VERSION);
356 return Ok(());
357 }
358
359 if current_version == 0 || current_version > DB_VERSION {
360 info!(
361 "maybe_upgrade: current_version: {}, DB_VERSION: {}; wiping most stores",
362 current_version, DB_VERSION
363 );
364 self.clear_experiments_and_enrollments(&mut writer)?;
365 self.updates_store.clear(&mut writer)?;
366 self.meta_store
367 .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
368 writer.commit()?;
369 return Ok(());
370 }
371
372 if current_version == 1 {
373 info!("Migrating database from v1 to v2");
374 if let Err(e) = self.migrate_v1_to_v2(&mut writer) {
375 error_support::report_error!(
376 "nimbus-database-migration",
377 "Error migrating database v1 to v2: {:?}. Wiping experiments and enrollments",
378 e
379 );
380 self.clear_experiments_and_enrollments(&mut writer)?;
381 }
382 }
383
384 if current_version == 2 {
385 info!("Migrating database from v2 to v3");
386 if let Err(e) = self.migrate_v2_to_v3(&mut writer) {
387 error_support::report_error!(
388 "nimbus-database-migration",
389 "Error migrating database v2 to v3: {:?}. Wiping experiments and enrollments",
390 e
391 );
392 self.clear_experiments_and_enrollments(&mut writer)?;
393 }
394 }
395
396 self.updates_store.clear(&mut writer)?;
401 self.meta_store
402 .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
403 writer.commit()?;
404 debug!("maybe_upgrade: transaction committed");
405 Ok(())
406 }
407
408 pub(crate) fn clear_experiments_and_enrollments(
409 &self,
410 writer: &mut Writer,
411 ) -> Result<(), NimbusError> {
412 self.experiment_store.clear(writer)?;
413 self.enrollment_store.clear(writer)?;
414 Ok(())
415 }
416
417 pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
418 self.event_count_store.clear(writer)?;
419 Ok(())
420 }
421
422 fn migrate_v1_to_v2(&self, writer: &mut Writer) -> Result<()> {
430 info!("Upgrading from version 1 to version 2");
431
432 let reader = self.read()?;
436
437 let enrollments: Vec<ExperimentEnrollment> =
445 self.enrollment_store.try_collect_all(&reader)?;
446 let experiments: Vec<Experiment> = self.experiment_store.try_collect_all(&reader)?;
447
448 let empty_string = "".to_string();
451 let slugs_with_experiment_issues: HashSet<String> = experiments
452 .iter()
453 .filter_map(
454 |e| {
455 let branch_with_empty_feature_ids =
456 e.branches.iter().find(|b| b.feature.is_none() || b.feature.as_ref().unwrap().feature_id.is_empty());
457 if branch_with_empty_feature_ids.is_some() {
458 warn!("{:?} experiment has branch missing a feature prop; experiment & enrollment will be discarded", &e.slug);
459 Some(e.slug.to_owned())
460 } else if e.feature_ids.is_empty() || e.feature_ids.contains(&empty_string) {
461 warn!("{:?} experiment has invalid feature_ids array; experiment & enrollment will be discarded", &e.slug);
462 Some(e.slug.to_owned())
463 } else {
464 None
465 }
466 })
467 .collect();
468 let slugs_to_discard: HashSet<_> = slugs_with_experiment_issues;
469
470 let updated_experiments: Vec<Experiment> = experiments
472 .into_iter()
473 .filter(|e| !slugs_to_discard.contains(&e.slug))
474 .collect();
475 debug!("updated experiments = {:?}", updated_experiments);
476
477 let updated_enrollments: Vec<ExperimentEnrollment> = enrollments
479 .into_iter()
480 .filter(|e| !slugs_to_discard.contains(&e.slug))
481 .collect();
482 debug!("updated enrollments = {:?}", updated_enrollments);
483
484 self.experiment_store.clear(writer)?;
486 for experiment in updated_experiments {
487 self.experiment_store
488 .put(writer, &experiment.slug, &experiment)?;
489 }
490
491 self.enrollment_store.clear(writer)?;
492 for enrollment in updated_enrollments {
493 self.enrollment_store
494 .put(writer, &enrollment.slug, &enrollment)?;
495 }
496 debug!("exiting migrate_v1_to_v2");
497
498 Ok(())
499 }
500
501 fn migrate_v2_to_v3(&self, writer: &mut Writer) -> Result<()> {
506 info!("Upgrading from version 2 to version 3");
507
508 let meta_store = &self.meta_store;
509
510 let old_global_participation = meta_store
512 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
513 .unwrap_or(true); meta_store.put(
519 writer,
520 DB_KEY_EXPERIMENT_PARTICIPATION,
521 &old_global_participation,
522 )?;
523 meta_store.put(
524 writer,
525 DB_KEY_ROLLOUT_PARTICIPATION,
526 &old_global_participation,
527 )?;
528
529 if meta_store
531 .get::<bool, _>(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?
532 .is_some()
533 {
534 meta_store.delete(writer, DB_KEY_GLOBAL_USER_PARTICIPATION)?;
535 }
536
537 info!(
538 "Migration v2->v3: experiments_participation={}, rollouts_participation={}",
539 old_global_participation, old_global_participation
540 );
541
542 Ok(())
543 }
544
545 pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
548 match store_id {
549 StoreId::Meta => &self.meta_store,
550 StoreId::Experiments => &self.experiment_store,
551 StoreId::Enrollments => &self.enrollment_store,
552 StoreId::Updates => &self.updates_store,
553 StoreId::EventCounts => &self.event_count_store,
554 }
555 }
556
557 pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<Rkv> {
558 let path = std::path::Path::new(path.as_ref()).join("db");
559 debug!("open_rkv: path = {:?}", path.display());
560 fs::create_dir_all(&path)?;
561 let rkv = match rkv_new(&path) {
562 Ok(rkv) => Ok(rkv),
563 Err(rkv_error) => {
564 match rkv_error {
565 StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
567 warn!(
572 "Database at '{}' appears corrupt - removing and recreating",
573 path.display()
574 );
575 fs::remove_dir_all(&path)?;
576 fs::create_dir_all(&path)?;
577 rkv_new(&path)
580 }
581 _ => Err(rkv_error),
583 }
584 }
585 }?;
586 debug!("Database initialized");
587 Ok(rkv)
588 }
589
590 pub fn read(&self) -> Result<Reader> {
592 Ok(self.rkv.read()?)
593 }
594
595 pub fn write(&self) -> Result<Writer> {
599 Ok(self.rkv.write()?)
600 }
601
602 #[cfg(test)]
610 pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
611 &self,
612 store_id: StoreId,
613 key: &str,
614 ) -> Result<Option<T>> {
615 let reader = self.rkv.read()?;
616 let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
617 match persisted_data {
618 Some(data) => {
619 if let rkv::Value::Json(data) = data {
620 Ok(Some(match serde_json::from_str::<T>(data) {
621 Ok(v) => v,
622 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
623 }))
624 } else {
625 Err(NimbusError::InvalidPersistedData)
626 }
627 }
628 None => Ok(None),
629 }
630 }
631
632 #[cfg(test)]
637 pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
638 &self,
639 store_id: StoreId,
640 ) -> Result<Vec<T>> {
641 let mut result = Vec::new();
642 let reader = self.rkv.read()?;
643 let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
644 while let Some(Ok((_, data))) = iter.next() {
645 if let rkv::Value::Json(data) = data {
646 result.push(match serde_json::from_str::<T>(data) {
647 Ok(v) => v,
648 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
649 });
650 }
651 }
652 Ok(result)
653 }
654}