1use crate::error::{debug, info, warn, NimbusError, Result};
8use crate::enrollment::ExperimentEnrollment;
14use crate::Experiment;
15use core::iter::Iterator;
16use rkv::{StoreError, StoreOptions};
17use std::collections::HashSet;
18use std::fs;
19use std::path::Path;
20
/// Meta-store key under which the database schema version is persisted.
pub(crate) const DB_KEY_DB_VERSION: &str = "db_version";
/// Current schema version; `maybe_upgrade` migrates or wipes stores whose
/// persisted version differs from this.
pub(crate) const DB_VERSION: u16 = 2;
// Capacity handed to rkv when opening the environment — presumably one slot
// per named store plus headroom; confirm if a sixth store is ever added.
const RKV_MAX_DBS: u32 = 6;
29
30#[cfg(not(feature = "rkv-safe-mode"))]
33mod backend {
34 use rkv::backend::{
35 Lmdb, LmdbDatabase, LmdbEnvironment, LmdbRoCursor, LmdbRoTransaction, LmdbRwTransaction,
36 };
37 use std::path::Path;
38
39 use super::RKV_MAX_DBS;
40
41 pub type Rkv = rkv::Rkv<LmdbEnvironment>;
42 pub type RkvSingleStore = rkv::SingleStore<LmdbDatabase>;
43 pub type Reader<'t> = rkv::Reader<LmdbRoTransaction<'t>>;
44 pub type Writer<'t> = rkv::Writer<LmdbRwTransaction<'t>>;
45 pub trait Readable<'r>:
46 rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>
47 {
48 }
49 impl<'r, T: rkv::Readable<'r, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'r>>>
50 Readable<'r> for T
51 {
52 }
53
54 pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
55 Rkv::with_capacity::<Lmdb>(path, RKV_MAX_DBS)
56 }
57}
58
#[cfg(feature = "rkv-safe-mode")]
mod backend {
    //! Safe-mode-backed rkv types, selected when "rkv-safe-mode" is enabled.
    use std::path::Path;

    use rkv::backend::{
        SafeMode, SafeModeDatabase, SafeModeEnvironment, SafeModeRoCursor, SafeModeRoTransaction,
        SafeModeRwTransaction,
    };

    use super::RKV_MAX_DBS;

    pub type Rkv = rkv::Rkv<SafeModeEnvironment>;
    pub type RkvSingleStore = rkv::SingleStore<SafeModeDatabase>;
    pub type Reader<'t> = rkv::Reader<SafeModeRoTransaction<'t>>;
    pub type Writer<'t> = rkv::Writer<SafeModeRwTransaction<'t>>;

    /// Marker trait letting store reads accept either kind of transaction.
    pub trait Readable<'r>:
        rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }

    // Blanket impl: anything rkv considers readable for this backend
    // automatically satisfies our marker trait.
    impl<'r, T> Readable<'r> for T where
        T: rkv::Readable<'r, Database = SafeModeDatabase, RoCursor = SafeModeRoCursor<'r>>
    {
    }

    /// Open (or create) a safe-mode-backed rkv environment at `path`.
    pub fn rkv_new(path: &Path) -> Result<Rkv, rkv::StoreError> {
        Rkv::with_capacity::<SafeMode>(path, RKV_MAX_DBS)
    }
}
89
90use backend::*;
91pub use backend::{Readable, Writer};
92
/// Identifies one of the named rkv single stores that make up the database
/// (see `Database::new` / `Database::open_single` for the name mapping).
pub enum StoreId {
    /// The "experiments" store: experiment definitions, keyed by slug.
    Experiments,
    /// The "enrollments" store: `ExperimentEnrollment` records, keyed by slug.
    Enrollments,
    /// The "meta" store: bookkeeping such as the `db_version` entry.
    Meta,
    /// The "updates" store; cleared on every upgrade pass in `maybe_upgrade`.
    /// Presumably holds pending experiment updates — confirm against callers.
    Updates,
    /// The "event_counts" store (see `clear_event_count_data`); presumably
    /// behavioral event counters — confirm against callers.
    EventCounts,
}
142
/// A thin wrapper around one named rkv store, adding JSON (de)serialization
/// for the values it reads and writes.
pub struct SingleStore {
    // The underlying rkv store handle.
    store: RkvSingleStore,
}
148
149impl SingleStore {
150 pub fn new(store: RkvSingleStore) -> Self {
151 SingleStore { store }
152 }
153
154 pub fn put<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
155 &self,
156 writer: &mut Writer,
157 key: &str,
158 persisted_data: &T,
159 ) -> Result<()> {
160 let persisted_json = match serde_json::to_string(persisted_data) {
161 Ok(v) => v,
162 Err(e) => return Err(NimbusError::JSONError("persisted_json = nimbus::stateful::persistence::SingleStore::put::serde_json::to_string".into(), e.to_string()))
163 };
164 self.store
165 .put(writer, key, &rkv::Value::Json(&persisted_json))?;
166 Ok(())
167 }
168
169 #[allow(dead_code)]
170 pub fn delete(&self, writer: &mut Writer, key: &str) -> Result<()> {
171 self.store.delete(writer, key)?;
172 Ok(())
173 }
174
175 pub fn clear(&self, writer: &mut Writer) -> Result<()> {
176 self.store.clear(writer)?;
177 Ok(())
178 }
179
180 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
185 where
186 R: Readable<'r>,
187 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
188 {
189 let persisted_data = self.store.get(reader, key)?;
190 match persisted_data {
191 Some(data) => {
192 if let rkv::Value::Json(data) = data {
193 Ok(Some(match serde_json::from_str::<T>(data) {
194 Ok(v) => v,
195 Err(e) => return Err(NimbusError::JSONError("match persisted_data nimbus::stateful::persistence::SingleStore::get::serde_json::from_str".into(), e.to_string()))
196 }))
197 } else {
198 Err(NimbusError::InvalidPersistedData)
199 }
200 }
201 None => Ok(None),
202 }
203 }
204
205 pub fn try_collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
209 where
210 R: Readable<'r>,
211 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
212 {
213 let mut result = Vec::new();
214 let mut iter = self.store.iter_start(reader)?;
215 while let Some(Ok((_, data))) = iter.next() {
216 if let rkv::Value::Json(data) = data {
217 let unserialized = serde_json::from_str::<T>(data);
218 match unserialized {
219 Ok(value) => result.push(value),
220 Err(e) => {
221 warn!(
225 "try_collect_all: discarded a record while deserializing with: {:?}",
226 e
227 );
228 warn!(
229 "try_collect_all: data that failed to deserialize: {:?}",
230 data
231 );
232 }
233 };
234 }
235 }
236 Ok(result)
237 }
238
239 pub fn collect_all<'r, T, R>(&self, reader: &'r R) -> Result<Vec<T>>
240 where
241 R: Readable<'r>,
242 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
243 {
244 let mut result = Vec::new();
245 let mut iter = self.store.iter_start(reader)?;
246 while let Some(Ok((_, data))) = iter.next() {
247 if let rkv::Value::Json(data) = data {
248 result.push(match serde_json::from_str::<T>(data) {
249 Ok(v) => v,
250 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::SingleStore::collect_all::serde_json::from_str".into(), e.to_string()))
251 });
252 }
253 }
254 Ok(result)
255 }
256}
257
/// A database handle scoped to a single named store, as produced by
/// `Database::open_single`.
pub struct SingleStoreDatabase {
    // Keeps the rkv environment alive for the store handle below.
    rkv: Rkv,
    // The one store this handle exposes.
    pub(crate) store: SingleStore,
}
262
263impl SingleStoreDatabase {
264 pub fn read(&self) -> Result<Reader> {
266 Ok(self.rkv.read()?)
267 }
268
269 pub fn write(&self) -> Result<Writer> {
273 Ok(self.rkv.write()?)
274 }
275
276 pub fn get<'r, T, R>(&self, reader: &'r R, key: &str) -> Result<Option<T>>
278 where
279 R: Readable<'r>,
280 T: serde::Serialize + for<'de> serde::Deserialize<'de>,
281 {
282 self.store.get(reader, key)
283 }
284}
285
/// The full Nimbus database: one rkv environment holding all five named
/// stores (one per `StoreId` variant).
pub struct Database {
    // The shared rkv environment backing every store below.
    rkv: Rkv,
    meta_store: SingleStore,
    experiment_store: SingleStore,
    enrollment_store: SingleStore,
    updates_store: SingleStore,
    event_count_store: SingleStore,
}
298
299impl Database {
300 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
305 let rkv = Self::open_rkv(path)?;
306 let meta_store = rkv.open_single("meta", StoreOptions::create())?;
307 let experiment_store = rkv.open_single("experiments", StoreOptions::create())?;
308 let enrollment_store = rkv.open_single("enrollments", StoreOptions::create())?;
309 let updates_store = rkv.open_single("updates", StoreOptions::create())?;
310 let event_count_store = rkv.open_single("event_counts", StoreOptions::create())?;
311 let db = Self {
312 rkv,
313 meta_store: SingleStore::new(meta_store),
314 experiment_store: SingleStore::new(experiment_store),
315 enrollment_store: SingleStore::new(enrollment_store),
316 updates_store: SingleStore::new(updates_store),
317 event_count_store: SingleStore::new(event_count_store),
318 };
319 db.maybe_upgrade()?;
320 Ok(db)
321 }
322
323 pub fn open_single<P: AsRef<Path>>(path: P, store_id: StoreId) -> Result<SingleStoreDatabase> {
324 let rkv = Self::open_rkv(path)?;
325 let store = SingleStore::new(match store_id {
326 StoreId::Experiments => rkv.open_single("experiments", StoreOptions::create())?,
327 StoreId::Enrollments => rkv.open_single("enrollments", StoreOptions::create())?,
328 StoreId::Meta => rkv.open_single("meta", StoreOptions::create())?,
329 StoreId::Updates => rkv.open_single("updates", StoreOptions::create())?,
330 StoreId::EventCounts => rkv.open_single("event_counts", StoreOptions::create())?,
331 });
332 Ok(SingleStoreDatabase { rkv, store })
333 }
334
335 fn maybe_upgrade(&self) -> Result<()> {
336 debug!("entered maybe upgrade");
337 let mut writer = self.rkv.write()?;
338 let db_version = self.meta_store.get::<u16, _>(&writer, DB_KEY_DB_VERSION)?;
339 match db_version {
340 Some(DB_VERSION) => {
341 info!("Already at version {}, no upgrade needed", DB_VERSION);
343 return Ok(());
344 }
345 Some(1) => {
346 info!("Migrating database from v1 to v2");
347 match self.migrate_v1_to_v2(&mut writer) {
348 Ok(_) => (),
349 Err(e) => {
350 error_support::report_error!(
357 "nimbus-database-migration",
358 "Error migrating database v1 to v2: {:?}. Wiping experiments and enrollments",
359 e
360 );
361 self.clear_experiments_and_enrollments(&mut writer)?;
362 }
363 };
364 }
365 None => {
366 info!("maybe_upgrade: no version number; wiping most stores");
367 self.clear_experiments_and_enrollments(&mut writer)?;
372 }
373 _ => {
374 error_support::report_error!(
375 "nimbus-unknown-database-version",
376 "Unknown database version. Wiping all stores."
377 );
378 self.clear_experiments_and_enrollments(&mut writer)?;
379 self.meta_store.clear(&mut writer)?;
380 }
381 }
382 self.updates_store.clear(&mut writer)?;
387 self.meta_store
388 .put(&mut writer, DB_KEY_DB_VERSION, &DB_VERSION)?;
389 writer.commit()?;
390 debug!("maybe_upgrade: transaction committed");
391 Ok(())
392 }
393
394 pub(crate) fn clear_experiments_and_enrollments(
395 &self,
396 writer: &mut Writer,
397 ) -> Result<(), NimbusError> {
398 self.experiment_store.clear(writer)?;
399 self.enrollment_store.clear(writer)?;
400 Ok(())
401 }
402
403 pub(crate) fn clear_event_count_data(&self, writer: &mut Writer) -> Result<(), NimbusError> {
404 self.event_count_store.clear(writer)?;
405 Ok(())
406 }
407
408 fn migrate_v1_to_v2(&self, writer: &mut Writer) -> Result<()> {
416 info!("Upgrading from version 1 to version 2");
417
418 let reader = self.read()?;
422
423 let enrollments: Vec<ExperimentEnrollment> =
431 self.enrollment_store.try_collect_all(&reader)?;
432 let experiments: Vec<Experiment> = self.experiment_store.try_collect_all(&reader)?;
433
434 let empty_string = "".to_string();
437 let slugs_with_experiment_issues: HashSet<String> = experiments
438 .iter()
439 .filter_map(
440 |e| {
441 let branch_with_empty_feature_ids =
442 e.branches.iter().find(|b| b.feature.is_none() || b.feature.as_ref().unwrap().feature_id.is_empty());
443 if branch_with_empty_feature_ids.is_some() {
444 warn!("{:?} experiment has branch missing a feature prop; experiment & enrollment will be discarded", &e.slug);
445 Some(e.slug.to_owned())
446 } else if e.feature_ids.is_empty() || e.feature_ids.contains(&empty_string) {
447 warn!("{:?} experiment has invalid feature_ids array; experiment & enrollment will be discarded", &e.slug);
448 Some(e.slug.to_owned())
449 } else {
450 None
451 }
452 })
453 .collect();
454 let slugs_to_discard: HashSet<_> = slugs_with_experiment_issues;
455
456 let updated_experiments: Vec<Experiment> = experiments
458 .into_iter()
459 .filter(|e| !slugs_to_discard.contains(&e.slug))
460 .collect();
461 debug!("updated experiments = {:?}", updated_experiments);
462
463 let updated_enrollments: Vec<ExperimentEnrollment> = enrollments
465 .into_iter()
466 .filter(|e| !slugs_to_discard.contains(&e.slug))
467 .collect();
468 debug!("updated enrollments = {:?}", updated_enrollments);
469
470 self.experiment_store.clear(writer)?;
472 for experiment in updated_experiments {
473 self.experiment_store
474 .put(writer, &experiment.slug, &experiment)?;
475 }
476
477 self.enrollment_store.clear(writer)?;
478 for enrollment in updated_enrollments {
479 self.enrollment_store
480 .put(writer, &enrollment.slug, &enrollment)?;
481 }
482 debug!("exiting migrate_v1_to_v2");
483
484 Ok(())
485 }
486
487 pub fn get_store(&self, store_id: StoreId) -> &SingleStore {
490 match store_id {
491 StoreId::Meta => &self.meta_store,
492 StoreId::Experiments => &self.experiment_store,
493 StoreId::Enrollments => &self.enrollment_store,
494 StoreId::Updates => &self.updates_store,
495 StoreId::EventCounts => &self.event_count_store,
496 }
497 }
498
499 pub fn open_rkv<P: AsRef<Path>>(path: P) -> Result<Rkv> {
500 let path = std::path::Path::new(path.as_ref()).join("db");
501 debug!("open_rkv: path = {:?}", path.display());
502 fs::create_dir_all(&path)?;
503 let rkv = match rkv_new(&path) {
504 Ok(rkv) => Ok(rkv),
505 Err(rkv_error) => {
506 match rkv_error {
507 StoreError::DatabaseCorrupted | StoreError::FileInvalid => {
509 warn!(
514 "Database at '{}' appears corrupt - removing and recreating",
515 path.display()
516 );
517 fs::remove_dir_all(&path)?;
518 fs::create_dir_all(&path)?;
519 rkv_new(&path)
522 }
523 _ => Err(rkv_error),
525 }
526 }
527 }?;
528 debug!("Database initialized");
529 Ok(rkv)
530 }
531
532 pub fn read(&self) -> Result<Reader> {
534 Ok(self.rkv.read()?)
535 }
536
537 pub fn write(&self) -> Result<Writer> {
541 Ok(self.rkv.write()?)
542 }
543
544 #[cfg(test)]
552 pub fn get<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
553 &self,
554 store_id: StoreId,
555 key: &str,
556 ) -> Result<Option<T>> {
557 let reader = self.rkv.read()?;
558 let persisted_data = self.get_store(store_id).store.get(&reader, key)?;
559 match persisted_data {
560 Some(data) => {
561 if let rkv::Value::Json(data) = data {
562 Ok(Some(match serde_json::from_str::<T>(data) {
563 Ok(v) => v,
564 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::get::serde_json::from_str".into(), e.to_string()))
565 }))
566 } else {
567 Err(NimbusError::InvalidPersistedData)
568 }
569 }
570 None => Ok(None),
571 }
572 }
573
574 #[cfg(test)]
579 pub fn collect_all<T: serde::Serialize + for<'de> serde::Deserialize<'de>>(
580 &self,
581 store_id: StoreId,
582 ) -> Result<Vec<T>> {
583 let mut result = Vec::new();
584 let reader = self.rkv.read()?;
585 let mut iter = self.get_store(store_id).store.iter_start(&reader)?;
586 while let Some(Ok((_, data))) = iter.next() {
587 if let rkv::Value::Json(data) = data {
588 result.push(match serde_json::from_str::<T>(data) {
589 Ok(v) => v,
590 Err(e) => return Err(NimbusError::JSONError("rkv::Value::Json(data) nimbus::stateful::persistence::Database::collect_all::serde_json::from_str".into(), e.to_string()))
591 });
592 }
593 }
594 Ok(result)
595 }
596}