1use std::collections::HashSet;
6use std::fmt::Debug;
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use chrono::{DateTime, NaiveDateTime, Utc};
11use once_cell::sync::OnceCell;
12use remote_settings::RemoteSettingsService;
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::defaults::Defaults;
17use crate::enrollment::{
18 EnrolledFeature, EnrollmentChangeEvent, EnrollmentChangeEventType, EnrollmentsEvolver,
19 ExperimentEnrollment, PreviousGeckoPrefState,
20};
21use crate::error::{BehaviorError, info, warn};
22use crate::evaluator::{
23 CalculatedAttributes, ExperimentAvailable, TargetingAttributes, get_calculated_attributes,
24 is_experiment_available,
25};
26use crate::json::{JsonObject, PrefValue};
27use crate::metrics::{
28 DatabaseLoadExtraDef, DatabaseMigrationExtraDef, EnrollmentStatusExtraDef,
29 FeatureExposureExtraDef, MalformedFeatureConfigExtraDef, MetricsHandler,
30};
31use crate::schema::parse_experiments;
32use crate::stateful::behavior::EventStore;
33use crate::stateful::client::{NimbusServerSettings, SettingsClient, create_client};
34use crate::stateful::dbcache::DatabaseCache;
35use crate::stateful::enrollment::{
36 get_experiment_participation, get_rollout_participation, opt_in_with_branch, opt_out,
37 reset_telemetry_identifiers, set_experiment_participation, set_rollout_participation,
38 unenroll_for_pref,
39};
40use crate::stateful::gecko_prefs::{
41 GeckoPref, GeckoPrefHandler, GeckoPrefState, GeckoPrefStore, OriginalGeckoPref, PrefBranch,
42 PrefEnrollmentData, PrefUnenrollReason,
43};
44use crate::stateful::matcher::AppContext;
45use crate::stateful::persistence::{Database, StoreId, Writer};
46use crate::stateful::targeting::{RecordedContext, validate_event_queries};
47use crate::stateful::updating::{read_and_remove_pending_experiments, write_pending_experiments};
48use crate::strings::fmt_with_map;
49#[cfg(test)]
50use crate::tests::helpers::{TestGeckoPrefHandler, TestRecordedContext};
51use crate::{
52 AvailableExperiment, AvailableRandomizationUnits, EnrolledExperiment, EnrollmentStatus,
53};
54use crate::{Experiment, ExperimentBranch, NimbusError, NimbusTargetingHelper, Result};
55
/// Meta-store key under which the client's generated Nimbus UUID is persisted.
const DB_KEY_NIMBUS_ID: &str = "nimbus-id";
/// Meta-store key from which a previously persisted installation date is read.
pub const DB_KEY_INSTALLATION_DATE: &str = "installation-date";
/// Meta-store key holding the date recorded when the app version last changed.
pub const DB_KEY_UPDATE_DATE: &str = "update-date";
/// Meta-store key holding the app version that the stored update date belongs to.
pub const DB_KEY_APP_VERSION: &str = "app-version";
/// Meta-store key for the boolean flag that gates `fetch_experiments`.
pub const DB_KEY_FETCH_ENABLED: &str = "fetch-enabled";
61
/// Mutable state that `NimbusClient` keeps behind a mutex and updates as it
/// initializes and evolves enrollments.
#[derive(Default)]
pub struct InternalMutableState {
    /// Randomization units handed to the enrollment evolver; the Nimbus ID is
    /// stored here once it has been read or created.
    pub(crate) available_randomization_units: AvailableRandomizationUnits,
    /// Installation date; `None` until it is resolved or explicitly set.
    pub(crate) install_date: Option<DateTime<Utc>>,
    /// Last update date; `None` until it is resolved or explicitly set.
    pub(crate) update_date: Option<DateTime<Utc>>,
    /// Attributes that targeting is evaluated against.
    pub(crate) targeting_attributes: TargetingAttributes,
}
74
75impl InternalMutableState {
76 pub(crate) fn update_time_to_now(&mut self, now: DateTime<Utc>) {
77 self.targeting_attributes
78 .update_time_to_now(now, &self.install_date, &self.update_date);
79 }
80}
81
/// Stateful Nimbus client: owns the settings client, the lazily opened
/// database, an in-memory cache of it, and the handlers used for metrics,
/// recorded context and Gecko prefs.
pub struct NimbusClient {
    /// Client used by `fetch_experiments` to retrieve experiment definitions.
    settings_client: Mutex<Box<dyn SettingsClient + Send>>,
    /// Targeting attributes, randomization units and install/update dates.
    pub(crate) mutable_state: Mutex<InternalMutableState>,
    /// Application context supplied at construction time.
    app_context: AppContext,
    /// Database handle, created on first use by `db()`.
    pub(crate) db: OnceCell<Database>,
    /// Cache that is refreshed whenever a write transaction is handed to it.
    database_cache: DatabaseCache,
    /// Location passed to `Database::new` when the database is first opened.
    db_path: PathBuf,
    /// Feature ids that are passed to the evolver and cache as co-enrolling.
    coenrolling_feature_ids: Vec<String>,
    /// Shared store of recorded behavioral events.
    event_store: Arc<Mutex<EventStore>>,
    /// Optional application-provided recorded context.
    recorded_context: Option<Arc<dyn RecordedContext>>,
    /// Optional Gecko pref store, present only when a pref handler was supplied.
    pub(crate) gecko_prefs: Option<Arc<GeckoPrefStore>>,
    /// Sink for telemetry events emitted by the client.
    metrics_handler: Arc<dyn MetricsHandler>,
}
100
101impl NimbusClient {
102 #[allow(clippy::too_many_arguments)]
105 pub fn new<P: Into<PathBuf>>(
106 app_context: AppContext,
107 recorded_context: Option<Arc<dyn RecordedContext>>,
108 coenrolling_feature_ids: Vec<String>,
109 db_path: P,
110 metrics_handler: Arc<dyn MetricsHandler>,
111 gecko_pref_handler: Option<Box<dyn GeckoPrefHandler>>,
112 remote_settings_info: Option<NimbusServerSettings>,
113 ) -> Result<Self> {
114 let settings_client = Mutex::new(create_client(remote_settings_info)?);
115
116 let targeting_attributes: TargetingAttributes = app_context.clone().into();
117 let mutable_state = Mutex::new(InternalMutableState {
118 available_randomization_units: Default::default(),
119 targeting_attributes,
120 install_date: Default::default(),
121 update_date: Default::default(),
122 });
123
124 let mut prefs = None;
125 if let Some(handler) = gecko_pref_handler {
126 prefs = Some(Arc::new(GeckoPrefStore::new(Arc::new(handler))));
127 }
128
129 info!(
130 "Initialized NimbusClient with: app_context = {:?}; recorded_context = {:?}",
131 app_context,
132 recorded_context
133 .as_ref()
134 .map(|rc| serde_json::Value::Object(rc.to_json()))
135 .unwrap_or(serde_json::Value::Null)
136 );
137
138 Ok(Self {
139 settings_client,
140 mutable_state,
141 app_context,
142 database_cache: Default::default(),
143 db_path: db_path.into(),
144 coenrolling_feature_ids,
145 db: OnceCell::default(),
146 event_store: Arc::default(),
147 recorded_context,
148 gecko_prefs: prefs,
149 metrics_handler,
150 })
151 }
152
153 pub fn with_targeting_attributes(&mut self, targeting_attributes: TargetingAttributes) {
154 let mut state = self.mutable_state.lock().unwrap();
155 state.targeting_attributes = targeting_attributes;
156 }
157
158 pub fn get_targeting_attributes(&self) -> TargetingAttributes {
159 let mut state = self.mutable_state.lock().unwrap();
160 state.update_time_to_now(Utc::now());
161 state.targeting_attributes.clone()
162 }
163
164 pub fn initialize(&self) -> Result<()> {
165 let db = self.db()?;
166 let mut writer = db.write()?;
168
169 let mut state = self.mutable_state.lock().unwrap();
170 self.begin_initialize(db, &mut writer, &mut state)?;
171 self.end_initialize(db, writer, &mut state)?;
172
173 Ok(())
174 }
175
176 fn begin_initialize(
179 &self,
180 db: &Database,
181 writer: &mut Writer,
182 state: &mut MutexGuard<InternalMutableState>,
183 ) -> Result<()> {
184 self.read_or_create_nimbus_id(db, writer, state)?;
185 self.update_ta_install_dates(db, writer, state)?;
186 self.event_store
187 .lock()
188 .expect("unable to lock event_store mutex")
189 .read_from_db(db)?;
190
191 if let Some(recorded_context) = &self.recorded_context {
192 let targeting_helper = self.create_targeting_helper_with_context(match serde_json::to_value(
193 &state.targeting_attributes,
194 ) {
195 Ok(v) => v,
196 Err(e) => return Err(NimbusError::JSONError("targeting_helper = nimbus::stateful::nimbus_client::NimbusClient::begin_initialize::serde_json::to_value".into(), e.to_string()))
197 });
198 recorded_context.execute_queries(targeting_helper.as_ref())?;
199 state
200 .targeting_attributes
201 .set_recorded_context(recorded_context.to_json());
202 }
203
204 if let Some(gecko_prefs) = &self.gecko_prefs {
205 gecko_prefs.initialize()?;
206 }
207
208 Ok(())
209 }
210
211 fn end_initialize(
214 &self,
215 db: &Database,
216 writer: Writer,
217 state: &mut MutexGuard<InternalMutableState>,
218 ) -> Result<()> {
219 self.update_ta_active_experiments(db, &writer, state)?;
220 let coenrolling_ids = self
221 .coenrolling_feature_ids
222 .iter()
223 .map(|s| s.as_str())
224 .collect();
225 self.database_cache.commit_and_update(
226 db,
227 writer,
228 &coenrolling_ids,
229 self.gecko_prefs.clone(),
230 true,
231 )?;
232 Ok(())
233 }
234
235 pub fn get_enrollment_by_feature(&self, feature_id: String) -> Result<Option<EnrolledFeature>> {
236 self.database_cache.get_enrollment_by_feature(&feature_id)
237 }
238
    /// Returns the branch recorded in the database cache for the experiment
    /// `slug`, or `None` when the cache has no branch for it.
    pub fn get_experiment_branch(&self, slug: String) -> Result<Option<String>> {
        self.database_cache.get_experiment_branch(&slug)
    }
243
244 pub fn get_feature_config_variables(&self, feature_id: String) -> Result<Option<String>> {
245 Ok(
246 if let Some(s) = self
247 .database_cache
248 .get_feature_config_variables(&feature_id)?
249 {
250 self.record_feature_activation_if_needed(&feature_id);
251 Some(s)
252 } else {
253 None
254 },
255 )
256 }
257
258 pub fn get_experiment_branches(&self, slug: String) -> Result<Vec<ExperimentBranch>> {
259 self.get_all_experiments()?
260 .into_iter()
261 .find(|e| e.slug == slug)
262 .map(|e| e.branches.into_iter().map(|b| b.into()).collect())
263 .ok_or(NimbusError::NoSuchExperiment(slug))
264 }
265
266 pub fn get_experiment_participation(&self) -> Result<bool> {
267 let db = self.db()?;
268 let reader = db.read()?;
269 get_experiment_participation(db, &reader)
270 }
271
272 pub fn get_rollout_participation(&self) -> Result<bool> {
273 let db = self.db()?;
274 let reader = db.read()?;
275 get_rollout_participation(db, &reader)
276 }
277
278 pub fn set_experiment_participation(
279 &self,
280 user_participating: bool,
281 ) -> Result<Vec<EnrollmentChangeEvent>> {
282 let db = self.db()?;
283 let mut writer = db.write()?;
284 let mut state = self.mutable_state.lock().unwrap();
285 set_experiment_participation(db, &mut writer, user_participating)?;
286
287 let existing_experiments: Vec<Experiment> =
288 db.get_store(StoreId::Experiments).collect_all(&writer)?;
289 let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
290 let res = self.end_initialize(db, writer, &mut state);
291 self.record_enrollment_status_telemetry(&mut state)?;
292 res?;
293 Ok(events)
294 }
295
296 pub fn set_rollout_participation(
297 &self,
298 user_participating: bool,
299 ) -> Result<Vec<EnrollmentChangeEvent>> {
300 let db = self.db()?;
301 let mut writer = db.write()?;
302 let mut state = self.mutable_state.lock().unwrap();
303 set_rollout_participation(db, &mut writer, user_participating)?;
304
305 let existing_experiments: Vec<Experiment> =
306 db.get_store(StoreId::Experiments).collect_all(&writer)?;
307 let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
308 let res = self.end_initialize(db, writer, &mut state);
309 self.record_enrollment_status_telemetry(&mut state)?;
310 res?;
311 Ok(events)
312 }
313
    /// Returns the active experiments as reported by the database cache.
    pub fn get_active_experiments(&self) -> Result<Vec<EnrolledExperiment>> {
        self.database_cache.get_active_experiments()
    }
317
318 pub fn get_all_experiments(&self) -> Result<Vec<Experiment>> {
319 let db = self.db()?;
320 let reader = db.read()?;
321 db.get_store(StoreId::Experiments)
322 .collect_all::<Experiment, _>(&reader)
323 }
324
325 pub fn get_available_experiments(&self) -> Result<Vec<AvailableExperiment>> {
326 let th = self.create_targeting_helper(None)?;
327 Ok(self
328 .get_all_experiments()?
329 .into_iter()
330 .filter(|exp| {
331 is_experiment_available(&th, exp, false) == ExperimentAvailable::Available
332 })
333 .map(|exp| exp.into())
334 .collect())
335 }
336
337 pub fn opt_in_with_branch(
338 &self,
339 experiment_slug: String,
340 branch: String,
341 ) -> Result<Vec<EnrollmentChangeEvent>> {
342 let db = self.db()?;
343 let mut writer = db.write()?;
344 let result = opt_in_with_branch(db, &mut writer, &experiment_slug, &branch)?;
345 let mut state = self.mutable_state.lock().unwrap();
346 self.end_initialize(db, writer, &mut state)?;
347 Ok(result)
348 }
349
350 pub fn opt_out(&self, experiment_slug: String) -> Result<Vec<EnrollmentChangeEvent>> {
351 let db = self.db()?;
352 let mut writer = db.write()?;
353 let result = opt_out(
354 db,
355 &mut writer,
356 &experiment_slug,
357 self.gecko_prefs.as_deref(),
358 )?;
359 let mut state = self.mutable_state.lock().unwrap();
360 self.end_initialize(db, writer, &mut state)?;
361 Ok(result)
362 }
363
364 pub fn fetch_experiments(&self) -> Result<()> {
365 if !self.is_fetch_enabled()? {
366 return Ok(());
367 }
368 info!("fetching experiments");
369 let settings_client = self.settings_client.lock().unwrap();
370 let new_experiments = settings_client.fetch_experiments()?;
371 let db = self.db()?;
372 let mut writer = db.write()?;
373 write_pending_experiments(db, &mut writer, new_experiments)?;
374 writer.commit()?;
375 Ok(())
376 }
377
378 pub fn set_fetch_enabled(&self, allow: bool) -> Result<()> {
379 let db = self.db()?;
380 let mut writer = db.write()?;
381 db.get_store(StoreId::Meta)
382 .put(&mut writer, DB_KEY_FETCH_ENABLED, &allow)?;
383 writer.commit()?;
384 Ok(())
385 }
386
387 pub(crate) fn is_fetch_enabled(&self) -> Result<bool> {
388 let db = self.db()?;
389 let reader = db.read()?;
390 let enabled = db
391 .get_store(StoreId::Meta)
392 .get(&reader, DB_KEY_FETCH_ENABLED)?
393 .unwrap_or(true);
394 Ok(enabled)
395 }
396
397 fn update_ta_install_dates(
401 &self,
402 db: &Database,
403 writer: &mut Writer,
404 state: &mut MutexGuard<InternalMutableState>,
405 ) -> Result<()> {
406 if state.install_date.is_none() {
411 let installation_date = self.get_installation_date(db, writer)?;
412 state.install_date = Some(installation_date);
413 }
414 if state.update_date.is_none() {
415 let update_date = self.get_update_date(db, writer)?;
416 state.update_date = Some(update_date);
417 }
418 state.update_time_to_now(Utc::now());
419
420 Ok(())
421 }
422
423 fn update_ta_active_experiments(
427 &self,
428 db: &Database,
429 writer: &Writer,
430 state: &mut MutexGuard<InternalMutableState>,
431 ) -> Result<()> {
432 let enrollments_store = db.get_store(StoreId::Enrollments);
433 let prev_enrollments: Vec<ExperimentEnrollment> = enrollments_store.collect_all(writer)?;
434
435 state
436 .targeting_attributes
437 .update_enrollments(&prev_enrollments);
438
439 Ok(())
440 }
441
442 fn evolve_experiments(
443 &self,
444 db: &Database,
445 writer: &mut Writer,
446 state: &mut InternalMutableState,
447 experiments: &[Experiment],
448 ) -> Result<Vec<EnrollmentChangeEvent>> {
449 let mut targeting_helper = NimbusTargetingHelper::with_targeting_attributes(
450 &state.targeting_attributes,
451 self.event_store.clone(),
452 self.gecko_prefs.clone(),
453 );
454 if let Some(ref recorded_context) = self.recorded_context {
455 recorded_context.record();
456 }
457 let coenrolling_feature_ids = self
458 .coenrolling_feature_ids
459 .iter()
460 .map(|s| s.as_str())
461 .collect();
462 let mut evolver = EnrollmentsEvolver::new(
463 &state.available_randomization_units,
464 &mut targeting_helper,
465 &coenrolling_feature_ids,
466 );
467 evolver.evolve_enrollments_in_db(db, writer, experiments, self.gecko_prefs.as_deref())
468 }
469
470 pub fn apply_pending_experiments(&self) -> Result<Vec<EnrollmentChangeEvent>> {
471 info!("updating experiment list");
472 let db = self.db()?;
473 let mut writer = db.write()?;
474
475 let pending_updates = read_and_remove_pending_experiments(db, &mut writer)?;
478 let mut state = self.mutable_state.lock().unwrap();
479 self.begin_initialize(db, &mut writer, &mut state)?;
480
481 let should_record_enrollment_status = pending_updates.is_some();
482 let res = match pending_updates {
483 Some(new_experiments) => {
484 self.update_ta_active_experiments(db, &writer, &mut state)?;
485 self.evolve_experiments(db, &mut writer, &mut state, &new_experiments)?
487 }
488 None => vec![],
489 };
490
491 let end_init_res = self.end_initialize(db, writer, &mut state);
493 if should_record_enrollment_status {
494 self.record_enrollment_status_telemetry(&mut state)?;
495 }
496 end_init_res?;
497 Ok(res)
498 }
499
500 #[allow(deprecated)] fn get_installation_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
502 if let Some(context_installation_date) = self.app_context.installation_date {
504 let res = DateTime::<Utc>::from_naive_utc_and_offset(
505 NaiveDateTime::from_timestamp_opt(context_installation_date / 1_000, 0).unwrap(),
506 Utc,
507 );
508 info!("[Nimbus] Retrieved date from Context: {}", res);
509 return Ok(res);
510 }
511 let store = db.get_store(StoreId::Meta);
512 let persisted_installation_date: Option<DateTime<Utc>> =
513 store.get(writer, DB_KEY_INSTALLATION_DATE)?;
514 Ok(
515 if let Some(installation_date) = persisted_installation_date {
516 installation_date
517 } else {
518 Utc::now()
519 },
520 )
521 }
522
    /// Determines the date of the last app update, refreshing the persisted
    /// app version and update date when the version has changed.
    ///
    /// The decision is based on the persisted app version, the current app
    /// version from the app context, and the persisted update date.
    fn get_update_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
        let store = db.get_store(StoreId::Meta);

        let persisted_app_version: Option<String> = store.get(writer, DB_KEY_APP_VERSION)?;
        let update_date: Option<DateTime<Utc>> = store.get(writer, DB_KEY_UPDATE_DATE)?;
        Ok(
            match (
                persisted_app_version,
                &self.app_context.app_version,
                update_date,
            ) {
                // Same version as last time and a stored date: reuse the stored date.
                (Some(persisted), Some(current), Some(date)) if persisted == *current => date,
                // The version changed: record the new version and stamp the update as now.
                (Some(persisted), Some(current), _) if persisted != *current => {
                    let now = Utc::now();
                    store.put(writer, DB_KEY_APP_VERSION, current)?;
                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
                    now
                }
                // No version stored yet: record the current one and stamp the update as now.
                (None, Some(current), _) => {
                    let now = Utc::now();
                    store.put(writer, DB_KEY_APP_VERSION, current)?;
                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
                    now
                }
                // No current version to compare against, but a stored date exists: reuse it.
                (_, _, Some(date)) => date,
                // Nothing usable is stored: use the current time without persisting it.
                _ => Utc::now(),
            },
        )
    }
557
558 pub fn set_experiments_locally(&self, experiments_json: String) -> Result<()> {
559 let new_experiments = parse_experiments(&experiments_json)?;
560 let db = self.db()?;
561 let mut writer = db.write()?;
562 write_pending_experiments(db, &mut writer, new_experiments)?;
563 writer.commit()?;
564 Ok(())
565 }
566
567 pub fn reset_enrollments(&self) -> Result<()> {
571 let db = self.db()?;
572 let mut writer = db.write()?;
573 let mut state = self.mutable_state.lock().unwrap();
574 db.clear_experiments_and_enrollments(&mut writer)?;
575 self.end_initialize(db, writer, &mut state)?;
576 Ok(())
577 }
578
    /// Resets the identifiers used for telemetry.
    ///
    /// When a Nimbus ID is stored, the telemetry identifiers are reset, the
    /// event count data is cleared, the stored ID is deleted and
    /// `end_initialize` is run. In every case the in-memory randomization
    /// units and the Nimbus ID in the targeting attributes are cleared.
    ///
    /// Returns the enrollment change events produced by the reset (empty when
    /// no Nimbus ID was stored).
    pub fn reset_telemetry_identifiers(&self) -> Result<Vec<EnrollmentChangeEvent>> {
        let mut events = vec![];
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        let store = db.get_store(StoreId::Meta);
        // Only touch the database when an ID was actually persisted.
        if store.get::<String, _>(&writer, DB_KEY_NIMBUS_ID)?.is_some() {
            events = reset_telemetry_identifiers(db, &mut writer)?;

            db.clear_event_count_data(&mut writer)?;

            store.delete(&mut writer, DB_KEY_NIMBUS_ID)?;
            // `end_initialize` consumes the writer, so it must come last here.
            self.end_initialize(db, writer, &mut state)?;
        }

        // The in-memory copies are cleared whether or not an ID was stored.
        state.available_randomization_units = Default::default();
        state.targeting_attributes.nimbus_id = None;

        Ok(events)
    }
614
615 pub fn nimbus_id(&self) -> Result<Uuid> {
616 let db = self.db()?;
617 let mut writer = db.write()?;
618 let mut state = self.mutable_state.lock().unwrap();
619 let uuid = self.read_or_create_nimbus_id(db, &mut writer, &mut state)?;
620
621 writer.commit()?;
625 Ok(uuid)
626 }
627
628 fn read_or_create_nimbus_id(
633 &self,
634 db: &Database,
635 writer: &mut Writer,
636 state: &mut MutexGuard<'_, InternalMutableState>,
637 ) -> Result<Uuid> {
638 let store = db.get_store(StoreId::Meta);
639 let nimbus_id = match store.get(writer, DB_KEY_NIMBUS_ID)? {
640 Some(nimbus_id) => nimbus_id,
641 None => {
642 let nimbus_id = Uuid::new_v4();
643 store.put(writer, DB_KEY_NIMBUS_ID, &nimbus_id)?;
644 nimbus_id
645 }
646 };
647
648 state.available_randomization_units.nimbus_id = Some(nimbus_id.to_string());
649 state.targeting_attributes.nimbus_id = Some(nimbus_id.to_string());
650
651 Ok(nimbus_id)
652 }
653
654 pub fn set_nimbus_id(&self, uuid: &Uuid) -> Result<()> {
658 let db = self.db()?;
659 let mut writer = db.write()?;
660 db.get_store(StoreId::Meta)
661 .put(&mut writer, DB_KEY_NIMBUS_ID, uuid)?;
662 writer.commit()?;
663 Ok(())
664 }
665
666 pub(crate) fn db(&self) -> Result<&Database> {
667 self.db
668 .get_or_try_init(|| Database::new(&self.db_path, self.metrics_handler.clone()))
669 }
670
671 fn merge_additional_context(&self, context: Option<JsonObject>) -> Result<Value> {
672 let context = context.map(Value::Object);
673 let targeting = match serde_json::to_value(self.get_targeting_attributes()) {
674 Ok(v) => v,
675 Err(e) => return Err(NimbusError::JSONError("targeting = nimbus::stateful::nimbus_client::NimbusClient::merge_additional_context::serde_json::to_value".into(), e.to_string()))
676 };
677 let context = match context {
678 Some(v) => v.defaults(&targeting)?,
679 None => targeting,
680 };
681
682 Ok(context)
683 }
684
685 pub fn create_targeting_helper(
686 &self,
687 additional_context: Option<JsonObject>,
688 ) -> Result<Arc<NimbusTargetingHelper>> {
689 let context = self.merge_additional_context(additional_context)?;
690 let helper =
691 NimbusTargetingHelper::new(context, self.event_store.clone(), self.gecko_prefs.clone());
692 Ok(Arc::new(helper))
693 }
694
695 pub fn create_targeting_helper_with_context(
696 &self,
697 context: Value,
698 ) -> Arc<NimbusTargetingHelper> {
699 Arc::new(NimbusTargetingHelper::new(
700 context,
701 self.event_store.clone(),
702 self.gecko_prefs.clone(),
703 ))
704 }
705
706 pub fn create_string_helper(
707 &self,
708 additional_context: Option<JsonObject>,
709 ) -> Result<Arc<NimbusStringHelper>> {
710 let context = self.merge_additional_context(additional_context)?;
711 let helper = NimbusStringHelper::new(context.as_object().unwrap().to_owned());
712 Ok(Arc::new(helper))
713 }
714
715 pub fn record_event(&self, event_id: String, count: i64) -> Result<()> {
720 let mut event_store = self.event_store.lock().unwrap();
721 event_store.record_event(count as u64, &event_id, None)?;
722 event_store.persist_data(self.db()?)?;
723 Ok(())
724 }
725
726 pub fn record_past_event(&self, event_id: String, seconds_ago: i64, count: i64) -> Result<()> {
731 if seconds_ago < 0 {
732 return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
733 "Time duration in the past must be positive".to_string(),
734 )));
735 }
736 let mut event_store = self.event_store.lock().unwrap();
737 event_store.record_past_event(
738 count as u64,
739 &event_id,
740 None,
741 chrono::Duration::seconds(seconds_ago),
742 )?;
743 event_store.persist_data(self.db()?)?;
744 Ok(())
745 }
746
747 pub fn advance_event_time(&self, by_seconds: i64) -> Result<()> {
751 if by_seconds < 0 {
752 return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
753 "Time duration in the future must be positive".to_string(),
754 )));
755 }
756 let mut event_store = self.event_store.lock().unwrap();
757 event_store.advance_datum(chrono::Duration::seconds(by_seconds));
758 Ok(())
759 }
760
761 pub fn clear_events(&self) -> Result<()> {
765 let mut event_store = self.event_store.lock().unwrap();
766 event_store.clear(self.db()?)?;
767 Ok(())
768 }
769
770 pub fn event_store(&self) -> Arc<Mutex<EventStore>> {
771 self.event_store.clone()
772 }
773
774 pub fn dump_state_to_log(&self) -> Result<()> {
775 let experiments = self.get_active_experiments()?;
776 info!("{0: <65}| {1: <30}| {2}", "Slug", "Features", "Branch");
777 for exp in &experiments {
778 info!(
779 "{0: <65}| {1: <30}| {2}",
780 &exp.slug,
781 &exp.feature_ids.join(", "),
782 &exp.branch_slug
783 );
784 }
785 Ok(())
786 }
787
788 pub fn unenroll_for_gecko_pref(
790 &self,
791 pref_state: GeckoPrefState,
792 pref_unenroll_reason: PrefUnenrollReason,
793 ) -> Result<Vec<EnrollmentChangeEvent>> {
794 let mut events = Vec::new();
795 if let Some(prefs) = self.gecko_prefs.clone() {
796 {
797 let mut pref_store_state = prefs.get_mutable_pref_state();
798 pref_store_state.update_pref_state(&pref_state);
799 }
800 let enrollments = self
801 .database_cache
802 .get_enrollments_for_pref(&pref_state.gecko_pref.pref)?;
803
804 let db = self.db()?;
805 let mut writer = db.write()?;
806
807 if let Some(enrollments) = enrollments {
808 for experiment_slug in enrollments {
809 unenroll_for_pref(
810 db,
811 &mut writer,
812 &experiment_slug,
813 pref_unenroll_reason,
814 &pref_state.gecko_pref.pref,
815 self.gecko_prefs.as_deref(),
816 &mut events,
817 )?;
818 }
819 } else {
820 warn!(
821 "Could not find enrollment. Could unenrollment already occurred through another preference?"
822 )
823 }
824
825 let mut state = self.mutable_state.lock().unwrap();
826 self.end_initialize(db, writer, &mut state)?;
827 }
828 Ok(events)
829 }
830
831 pub fn register_previous_gecko_pref_states(
832 &self,
833 gecko_pref_states: &[GeckoPrefState],
834 ) -> Result<()> {
835 let all_prev_gecko_pref_states =
836 super::gecko_prefs::build_prev_gecko_pref_states(gecko_pref_states);
837
838 let db = self.db()?;
839 let mut writer = db.write()?;
840
841 for (experiment_slug, prev_gecko_pref_states) in all_prev_gecko_pref_states {
842 Self::add_prev_gecko_pref_state_for_experiment(
843 db,
844 &mut writer,
845 &experiment_slug,
846 prev_gecko_pref_states,
847 )?;
848 }
849
850 let coenrolling_ids = self
851 .coenrolling_feature_ids
852 .iter()
853 .map(|s| s.as_str())
854 .collect();
855
856 self.database_cache.commit_and_update(
859 db,
860 writer,
861 &coenrolling_ids,
862 self.gecko_prefs.clone(),
863 false,
864 )?;
865
866 Ok(())
867 }
868
869 pub(crate) fn add_prev_gecko_pref_state_for_experiment(
870 db: &Database,
871 writer: &mut Writer,
872 experiment_slug: &str,
873 prev_gecko_pref_states: Vec<PreviousGeckoPrefState>,
874 ) -> Result<()> {
875 let enrollments = db.get_store(StoreId::Enrollments);
876
877 if let Ok(Some(existing_enrollment)) =
878 enrollments.get::<ExperimentEnrollment, Writer>(writer, experiment_slug)
879 {
880 let updated_states =
882 existing_enrollment.on_add_gecko_pref_states(prev_gecko_pref_states);
883 enrollments.put(writer, experiment_slug, &updated_states)?;
884 }
885 Ok(())
886 }
887
888 pub fn get_previous_gecko_pref_states(
889 &self,
890 experiment_slug: String,
891 ) -> Result<Option<Vec<PreviousGeckoPrefState>>> {
892 let db = self.db()?;
893 let reader = db.read()?;
894
895 Ok(db
896 .get_store(StoreId::Enrollments)
897 .get::<ExperimentEnrollment, _>(&reader, &experiment_slug)?
898 .and_then(|enrollment| {
899 if let EnrollmentStatus::Enrolled {
900 prev_gecko_pref_states: prev_gecko_pref_state,
901 ..
902 } = enrollment.status
903 {
904 prev_gecko_pref_state
905 } else {
906 None
907 }
908 }))
909 }
910
911 #[cfg(test)]
912 pub fn get_recorded_context(&self) -> &&TestRecordedContext {
913 self.recorded_context
914 .clone()
915 .map(|ref recorded_context|
916 unsafe {
921 std::mem::transmute::<&&dyn RecordedContext, &&TestRecordedContext>(
922 &&**recorded_context,
923 )
924 })
925 .expect("failed to unwrap RecordedContext object")
926 }
927
    /// Test-only accessor that returns the Gecko pref handler reinterpreted as
    /// an `Arc<Box<TestGeckoPrefHandler>>`.
    ///
    /// # Panics
    /// Panics if the client was created without a Gecko pref handler.
    #[cfg(test)]
    pub fn get_gecko_pref_store(&self) -> Arc<Box<TestGeckoPrefHandler>> {
        self.gecko_prefs.clone()
            .clone()
            .map(|ref pref_store|
                // SAFETY (NOTE(review)): this reinterprets an
                // `Arc<Box<dyn GeckoPrefHandler>>` as an
                // `Arc<Box<TestGeckoPrefHandler>>`. It presumably relies on the
                // handler always being a `TestGeckoPrefHandler` in tests and on
                // the boxed trait object's data pointer being readable in place
                // of a thin box — neither is established by this file; confirm
                // before relying on it.
                unsafe {
                    std::mem::transmute::<Arc<Box<dyn GeckoPrefHandler>>, Arc<Box<TestGeckoPrefHandler>>>(
                    pref_store.clone().handler.clone(),
                )
            })
            .expect("failed to unwrap GeckoPrefHandler object")
    }
944}
945
946impl NimbusClient {
947 pub fn set_install_time(&mut self, then: DateTime<Utc>) {
948 let mut state = self.mutable_state.lock().unwrap();
949 state.install_date = Some(then);
950 state.update_time_to_now(Utc::now());
951 }
952
953 pub fn set_update_time(&mut self, then: DateTime<Utc>) {
954 let mut state = self.mutable_state.lock().unwrap();
955 state.update_date = Some(then);
956 state.update_time_to_now(Utc::now());
957 }
958}
959
960impl NimbusClient {
961 fn record_feature_activation_if_needed(&self, feature_id: &str) {
964 if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(feature_id)
965 && f.branch.is_some()
966 && !self.coenrolling_feature_ids.contains(&f.feature_id)
967 {
968 self.metrics_handler.record_feature_activation(f.into());
969 }
970 }
971
972 pub fn record_feature_exposure(&self, feature_id: String, slug: Option<String>) {
973 let event = if let Some(slug) = slug {
974 if let Ok(Some(branch)) = self.database_cache.get_experiment_branch(&slug) {
975 Some(FeatureExposureExtraDef {
976 feature_id,
977 branch: Some(branch),
978 slug,
979 })
980 } else {
981 None
982 }
983 } else if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id) {
984 if f.branch.is_some() {
985 Some(f.into())
986 } else {
987 None
988 }
989 } else {
990 None
991 };
992
993 if let Some(event) = event {
994 self.metrics_handler.record_feature_exposure(event);
995 }
996 }
997
998 pub fn record_malformed_feature_config(&self, feature_id: String, part_id: String) {
999 let event = if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id)
1000 {
1001 MalformedFeatureConfigExtraDef::from_feature_and_part(f, part_id)
1002 } else {
1003 MalformedFeatureConfigExtraDef::new(feature_id, part_id)
1004 };
1005 self.metrics_handler.record_malformed_feature_config(event);
1006 }
1007
1008 fn record_enrollment_status_telemetry(
1009 &self,
1010 state: &mut MutexGuard<InternalMutableState>,
1011 ) -> Result<()> {
1012 let targeting_helper = NimbusTargetingHelper::new(
1013 state.targeting_attributes.clone(),
1014 self.event_store.clone(),
1015 self.gecko_prefs.clone(),
1016 );
1017 let experiments = self.database_cache.get_experiments()?;
1018 let experiments = experiments
1019 .iter()
1020 .filter(|exp| {
1021 is_experiment_available(&targeting_helper, exp, true)
1022 == ExperimentAvailable::Available
1023 })
1024 .map(|exp| &*exp.slug)
1025 .collect::<HashSet<&str>>();
1026 self.metrics_handler.record_enrollment_statuses(
1027 self.database_cache
1028 .get_enrollments()?
1029 .into_iter()
1030 .filter_map(|e| match experiments.contains(&*e.slug) {
1031 true => Some(e.into()),
1032 false => None,
1033 })
1034 .collect(),
1035 );
1036 self.metrics_handler.submit_targeting_context();
1037 Ok(())
1038 }
1039}
1040
/// Helper that formats string templates using a JSON context map.
pub struct NimbusStringHelper {
    /// Values made available to templates passed to `string_format`.
    context: JsonObject,
}
1044
1045impl NimbusStringHelper {
1046 fn new(context: JsonObject) -> Self {
1047 Self { context }
1048 }
1049
1050 pub fn get_uuid(&self, template: String) -> Option<String> {
1051 if template.contains("{uuid}") {
1052 let uuid = Uuid::new_v4();
1053 Some(uuid.to_string())
1054 } else {
1055 None
1056 }
1057 }
1058
1059 pub fn string_format(&self, template: String, uuid: Option<String>) -> String {
1060 match uuid {
1061 Some(uuid) => {
1062 let mut map = self.context.clone();
1063 map.insert("uuid".to_string(), Value::String(uuid));
1064 fmt_with_map(&template, &map)
1065 }
1066 _ => fmt_with_map(&template, &self.context),
1067 }
1068 }
1069}
1070
// Passes `JsonObject` across the uniffi boundary as a JSON string.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::custom_type!(JsonObject, String, {
    remote,
    // Lifting parses the string and accepts it only when the top-level value
    // is a JSON object.
    try_lift: |val| {
        match serde_json::from_str::<Value>(&val)? {
            Value::Object(obj) => Ok(obj),
            _ => Err(uniffi::deps::anyhow::anyhow!(
                "Unexpected JSON-non-object in the bagging area"
            )),
        }
    },
    // Lowering serializes the map back into its JSON text.
    lower: |obj| serde_json::Value::Object(obj).to_string(),
});
1086
// Passes `PrefValue` across the uniffi boundary as a string.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::custom_type!(PrefValue, String, {
    remote,
    try_lift: |val| {
        // Text that is not valid JSON is kept verbatim as a string value.
        let json = serde_json::from_str::<Value>(&val).unwrap_or_else(|_| Value::String(val));
        // Strings, booleans, null and non-float numbers are accepted;
        // floats, arrays and objects are rejected.
        let is_valid_pref_type = match &json {
            Value::String(_) | Value::Bool(_) | Value::Null => true,
            Value::Number(number) => !number.is_f64(),
            _ => false,
        };
        if is_valid_pref_type {
            Ok(json)
        } else {
            Err(anyhow::anyhow!(
                "Value {} is not a string, boolean, number, or null, or is a float",
                json
            ))
        }
    },
    lower: |val| {
        val.to_string()
    }
});
1109
// Pulls in the uniffi scaffolding for the "nimbus" interface; compiled only
// when the `stateful-uniffi-bindings` feature is enabled.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::include_scaffolding!("nimbus");