1use std::collections::HashSet;
6use std::fmt::Debug;
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use chrono::{DateTime, NaiveDateTime, Utc};
11use once_cell::sync::OnceCell;
12use remote_settings::RemoteSettingsService;
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::defaults::Defaults;
17use crate::enrollment::{
18 EnrolledFeature, EnrollmentChangeEvent, EnrollmentChangeEventType, EnrollmentsEvolver,
19 ExperimentEnrollment, PreviousGeckoPrefState,
20};
21use crate::error::{BehaviorError, info, warn};
22use crate::evaluator::{
23 CalculatedAttributes, ExperimentAvailable, TargetingAttributes, get_calculated_attributes,
24 is_experiment_available,
25};
26use crate::json::{JsonObject, PrefValue};
27use crate::metrics::{
28 DatabaseLoadExtraDef, DatabaseMigrationExtraDef, EnrollmentStatusExtraDef,
29 FeatureExposureExtraDef, MalformedFeatureConfigExtraDef, MetricsHandler,
30};
31use crate::schema::parse_experiments;
32use crate::stateful::behavior::EventStore;
33use crate::stateful::client::{NimbusServerSettings, SettingsClient, create_client};
34use crate::stateful::dbcache::DatabaseCache;
35use crate::stateful::enrollment::{
36 get_experiment_participation, get_rollout_participation, opt_in_with_branch, opt_out,
37 reset_telemetry_identifiers, set_experiment_participation, set_rollout_participation,
38 unenroll_for_pref,
39};
40use crate::stateful::gecko_prefs::{
41 GeckoPref, GeckoPrefHandler, GeckoPrefState, GeckoPrefStore, OriginalGeckoPref, PrefBranch,
42 PrefEnrollmentData, PrefUnenrollReason,
43};
44use crate::stateful::matcher::AppContext;
45use crate::stateful::persistence::{Database, StoreId, Writer};
46use crate::stateful::targeting::{RecordedContext, validate_event_queries};
47use crate::stateful::updating::{read_and_remove_pending_experiments, write_pending_experiments};
48use crate::strings::fmt_with_map;
49#[cfg(test)]
50use crate::tests::helpers::{TestGeckoPrefHandler, TestRecordedContext};
51use crate::{
52 AvailableExperiment, AvailableRandomizationUnits, EnrolledExperiment, EnrollmentStatus,
53};
54use crate::{Experiment, ExperimentBranch, NimbusError, NimbusTargetingHelper, Result};
55
56const DB_KEY_NIMBUS_ID: &str = "nimbus-id";
57pub const DB_KEY_INSTALLATION_DATE: &str = "installation-date";
58pub const DB_KEY_UPDATE_DATE: &str = "update-date";
59pub const DB_KEY_APP_VERSION: &str = "app-version";
60pub const DB_KEY_FETCH_ENABLED: &str = "fetch-enabled";
61
62#[derive(Default)]
67pub struct InternalMutableState {
68 pub(crate) available_randomization_units: AvailableRandomizationUnits,
69 pub(crate) install_date: Option<DateTime<Utc>>,
70 pub(crate) update_date: Option<DateTime<Utc>>,
71 pub(crate) targeting_attributes: TargetingAttributes,
73}
74
75impl InternalMutableState {
76 pub(crate) fn update_time_to_now(&mut self, now: DateTime<Utc>) {
77 self.targeting_attributes
78 .update_time_to_now(now, &self.install_date, &self.update_date);
79 }
80}
81
82pub struct NimbusClient {
86 settings_client: Mutex<Box<dyn SettingsClient + Send>>,
87 pub(crate) mutable_state: Mutex<InternalMutableState>,
88 app_context: AppContext,
89 pub(crate) db: OnceCell<Database>,
90 database_cache: DatabaseCache,
93 db_path: PathBuf,
94 coenrolling_feature_ids: Vec<String>,
95 event_store: Arc<Mutex<EventStore>>,
96 recorded_context: Option<Arc<dyn RecordedContext>>,
97 pub(crate) gecko_prefs: Option<Arc<GeckoPrefStore>>,
98 metrics_handler: Arc<dyn MetricsHandler>,
99}
100
101impl NimbusClient {
102 #[allow(clippy::too_many_arguments)]
105 pub fn new<P: Into<PathBuf>>(
106 app_context: AppContext,
107 recorded_context: Option<Arc<dyn RecordedContext>>,
108 coenrolling_feature_ids: Vec<String>,
109 db_path: P,
110 metrics_handler: Arc<dyn MetricsHandler>,
111 gecko_pref_handler: Option<Box<dyn GeckoPrefHandler>>,
112 remote_settings_info: Option<NimbusServerSettings>,
113 ) -> Result<Self> {
114 let settings_client = Mutex::new(create_client(remote_settings_info)?);
115
116 let targeting_attributes: TargetingAttributes = app_context.clone().into();
117 let mutable_state = Mutex::new(InternalMutableState {
118 available_randomization_units: Default::default(),
119 targeting_attributes,
120 install_date: Default::default(),
121 update_date: Default::default(),
122 });
123
124 let mut prefs = None;
125 if let Some(handler) = gecko_pref_handler {
126 prefs = Some(Arc::new(GeckoPrefStore::new(Arc::new(handler))));
127 }
128
129 info!(
130 "Initialized NimbusClient with: app_context = {:?}; recorded_context = {:?}",
131 app_context,
132 recorded_context
133 .as_ref()
134 .map(|rc| serde_json::Value::Object(rc.to_json()))
135 .unwrap_or(serde_json::Value::Null)
136 );
137
138 Ok(Self {
139 settings_client,
140 mutable_state,
141 app_context,
142 database_cache: Default::default(),
143 db_path: db_path.into(),
144 coenrolling_feature_ids,
145 db: OnceCell::default(),
146 event_store: Arc::default(),
147 recorded_context,
148 gecko_prefs: prefs,
149 metrics_handler,
150 })
151 }
152
153 pub fn with_targeting_attributes(&mut self, targeting_attributes: TargetingAttributes) {
154 let mut state = self.mutable_state.lock().unwrap();
155 state.targeting_attributes = targeting_attributes;
156 }
157
158 pub fn get_targeting_attributes(&self) -> TargetingAttributes {
159 let mut state = self.mutable_state.lock().unwrap();
160 state.update_time_to_now(Utc::now());
161 state.targeting_attributes.clone()
162 }
163
164 pub fn initialize(&self) -> Result<()> {
165 let db = self.db()?;
166 let mut writer = db.write()?;
168
169 let mut state = self.mutable_state.lock().unwrap();
170 self.begin_initialize(db, &mut writer, &mut state)?;
171 self.end_initialize(db, writer, &mut state)?;
172
173 Ok(())
174 }
175
176 fn begin_initialize(
179 &self,
180 db: &Database,
181 writer: &mut Writer,
182 state: &mut MutexGuard<InternalMutableState>,
183 ) -> Result<()> {
184 self.read_or_create_nimbus_id(db, writer, state)?;
185 self.update_ta_install_dates(db, writer, state)?;
186 self.event_store
187 .lock()
188 .expect("unable to lock event_store mutex")
189 .read_from_db(db)?;
190
191 if let Some(recorded_context) = &self.recorded_context {
192 let targeting_helper = self.create_targeting_helper_with_context(match serde_json::to_value(
193 &state.targeting_attributes,
194 ) {
195 Ok(v) => v,
196 Err(e) => return Err(NimbusError::JSONError("targeting_helper = nimbus::stateful::nimbus_client::NimbusClient::begin_initialize::serde_json::to_value".into(), e.to_string()))
197 });
198 recorded_context.execute_queries(targeting_helper.as_ref())?;
199 state
200 .targeting_attributes
201 .set_recorded_context(recorded_context.to_json());
202 }
203
204 if let Some(gecko_prefs) = &self.gecko_prefs {
205 gecko_prefs.initialize()?;
206 }
207
208 Ok(())
209 }
210
211 fn end_initialize(
214 &self,
215 db: &Database,
216 writer: Writer,
217 state: &mut MutexGuard<InternalMutableState>,
218 ) -> Result<()> {
219 self.update_ta_active_experiments(db, &writer, state)?;
220 let coenrolling_ids = self
221 .coenrolling_feature_ids
222 .iter()
223 .map(|s| s.as_str())
224 .collect();
225 self.database_cache.commit_and_update(
226 db,
227 writer,
228 &coenrolling_ids,
229 self.gecko_prefs.clone(),
230 )?;
231 Ok(())
232 }
233
234 pub fn get_enrollment_by_feature(&self, feature_id: String) -> Result<Option<EnrolledFeature>> {
235 self.database_cache.get_enrollment_by_feature(&feature_id)
236 }
237
238 pub fn get_experiment_branch(&self, slug: String) -> Result<Option<String>> {
240 self.database_cache.get_experiment_branch(&slug)
241 }
242
243 pub fn get_feature_config_variables(&self, feature_id: String) -> Result<Option<String>> {
244 Ok(
245 if let Some(s) = self
246 .database_cache
247 .get_feature_config_variables(&feature_id)?
248 {
249 self.record_feature_activation_if_needed(&feature_id);
250 Some(s)
251 } else {
252 None
253 },
254 )
255 }
256
257 pub fn get_experiment_branches(&self, slug: String) -> Result<Vec<ExperimentBranch>> {
258 self.get_all_experiments()?
259 .into_iter()
260 .find(|e| e.slug == slug)
261 .map(|e| e.branches.into_iter().map(|b| b.into()).collect())
262 .ok_or(NimbusError::NoSuchExperiment(slug))
263 }
264
265 pub fn get_experiment_participation(&self) -> Result<bool> {
266 let db = self.db()?;
267 let reader = db.read()?;
268 get_experiment_participation(db, &reader)
269 }
270
271 pub fn get_rollout_participation(&self) -> Result<bool> {
272 let db = self.db()?;
273 let reader = db.read()?;
274 get_rollout_participation(db, &reader)
275 }
276
277 pub fn set_experiment_participation(
278 &self,
279 user_participating: bool,
280 ) -> Result<Vec<EnrollmentChangeEvent>> {
281 let db = self.db()?;
282 let mut writer = db.write()?;
283 let mut state = self.mutable_state.lock().unwrap();
284 set_experiment_participation(db, &mut writer, user_participating)?;
285
286 let existing_experiments: Vec<Experiment> =
287 db.get_store(StoreId::Experiments).collect_all(&writer)?;
288 let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
289 let res = self.end_initialize(db, writer, &mut state);
290 self.record_enrollment_status_telemetry(&mut state)?;
291 res?;
292 Ok(events)
293 }
294
295 pub fn set_rollout_participation(
296 &self,
297 user_participating: bool,
298 ) -> Result<Vec<EnrollmentChangeEvent>> {
299 let db = self.db()?;
300 let mut writer = db.write()?;
301 let mut state = self.mutable_state.lock().unwrap();
302 set_rollout_participation(db, &mut writer, user_participating)?;
303
304 let existing_experiments: Vec<Experiment> =
305 db.get_store(StoreId::Experiments).collect_all(&writer)?;
306 let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
307 let res = self.end_initialize(db, writer, &mut state);
308 self.record_enrollment_status_telemetry(&mut state)?;
309 res?;
310 Ok(events)
311 }
312
313 pub fn get_active_experiments(&self) -> Result<Vec<EnrolledExperiment>> {
314 self.database_cache.get_active_experiments()
315 }
316
317 pub fn get_all_experiments(&self) -> Result<Vec<Experiment>> {
318 let db = self.db()?;
319 let reader = db.read()?;
320 db.get_store(StoreId::Experiments)
321 .collect_all::<Experiment, _>(&reader)
322 }
323
324 pub fn get_available_experiments(&self) -> Result<Vec<AvailableExperiment>> {
325 let th = self.create_targeting_helper(None)?;
326 Ok(self
327 .get_all_experiments()?
328 .into_iter()
329 .filter(|exp| {
330 is_experiment_available(&th, exp, false) == ExperimentAvailable::Available
331 })
332 .map(|exp| exp.into())
333 .collect())
334 }
335
336 pub fn opt_in_with_branch(
337 &self,
338 experiment_slug: String,
339 branch: String,
340 ) -> Result<Vec<EnrollmentChangeEvent>> {
341 let db = self.db()?;
342 let mut writer = db.write()?;
343 let result = opt_in_with_branch(db, &mut writer, &experiment_slug, &branch)?;
344 let mut state = self.mutable_state.lock().unwrap();
345 self.end_initialize(db, writer, &mut state)?;
346 Ok(result)
347 }
348
349 pub fn opt_out(&self, experiment_slug: String) -> Result<Vec<EnrollmentChangeEvent>> {
350 let db = self.db()?;
351 let mut writer = db.write()?;
352 let result = opt_out(
353 db,
354 &mut writer,
355 &experiment_slug,
356 self.gecko_prefs.as_deref(),
357 )?;
358 let mut state = self.mutable_state.lock().unwrap();
359 self.end_initialize(db, writer, &mut state)?;
360 Ok(result)
361 }
362
363 pub fn fetch_experiments(&self) -> Result<()> {
364 if !self.is_fetch_enabled()? {
365 return Ok(());
366 }
367 info!("fetching experiments");
368 let settings_client = self.settings_client.lock().unwrap();
369 let new_experiments = settings_client.fetch_experiments()?;
370 let db = self.db()?;
371 let mut writer = db.write()?;
372 write_pending_experiments(db, &mut writer, new_experiments)?;
373 writer.commit()?;
374 Ok(())
375 }
376
377 pub fn set_fetch_enabled(&self, allow: bool) -> Result<()> {
378 let db = self.db()?;
379 let mut writer = db.write()?;
380 db.get_store(StoreId::Meta)
381 .put(&mut writer, DB_KEY_FETCH_ENABLED, &allow)?;
382 writer.commit()?;
383 Ok(())
384 }
385
386 pub(crate) fn is_fetch_enabled(&self) -> Result<bool> {
387 let db = self.db()?;
388 let reader = db.read()?;
389 let enabled = db
390 .get_store(StoreId::Meta)
391 .get(&reader, DB_KEY_FETCH_ENABLED)?
392 .unwrap_or(true);
393 Ok(enabled)
394 }
395
396 fn update_ta_install_dates(
400 &self,
401 db: &Database,
402 writer: &mut Writer,
403 state: &mut MutexGuard<InternalMutableState>,
404 ) -> Result<()> {
405 if state.install_date.is_none() {
410 let installation_date = self.get_installation_date(db, writer)?;
411 state.install_date = Some(installation_date);
412 }
413 if state.update_date.is_none() {
414 let update_date = self.get_update_date(db, writer)?;
415 state.update_date = Some(update_date);
416 }
417 state.update_time_to_now(Utc::now());
418
419 Ok(())
420 }
421
422 fn update_ta_active_experiments(
426 &self,
427 db: &Database,
428 writer: &Writer,
429 state: &mut MutexGuard<InternalMutableState>,
430 ) -> Result<()> {
431 let enrollments_store = db.get_store(StoreId::Enrollments);
432 let prev_enrollments: Vec<ExperimentEnrollment> = enrollments_store.collect_all(writer)?;
433
434 state
435 .targeting_attributes
436 .update_enrollments(&prev_enrollments);
437
438 Ok(())
439 }
440
441 fn evolve_experiments(
442 &self,
443 db: &Database,
444 writer: &mut Writer,
445 state: &mut InternalMutableState,
446 experiments: &[Experiment],
447 ) -> Result<Vec<EnrollmentChangeEvent>> {
448 let mut targeting_helper = NimbusTargetingHelper::with_targeting_attributes(
449 &state.targeting_attributes,
450 self.event_store.clone(),
451 self.gecko_prefs.clone(),
452 );
453 if let Some(ref recorded_context) = self.recorded_context {
454 recorded_context.record();
455 }
456 let coenrolling_feature_ids = self
457 .coenrolling_feature_ids
458 .iter()
459 .map(|s| s.as_str())
460 .collect();
461 let mut evolver = EnrollmentsEvolver::new(
462 &state.available_randomization_units,
463 &mut targeting_helper,
464 &coenrolling_feature_ids,
465 );
466 evolver.evolve_enrollments_in_db(db, writer, experiments, self.gecko_prefs.as_deref())
467 }
468
469 pub fn apply_pending_experiments(&self) -> Result<Vec<EnrollmentChangeEvent>> {
470 info!("updating experiment list");
471 let db = self.db()?;
472 let mut writer = db.write()?;
473
474 let pending_updates = read_and_remove_pending_experiments(db, &mut writer)?;
477 let mut state = self.mutable_state.lock().unwrap();
478 self.begin_initialize(db, &mut writer, &mut state)?;
479
480 let should_record_enrollment_status = pending_updates.is_some();
481 let res = match pending_updates {
482 Some(new_experiments) => {
483 self.update_ta_active_experiments(db, &writer, &mut state)?;
484 self.evolve_experiments(db, &mut writer, &mut state, &new_experiments)?
486 }
487 None => vec![],
488 };
489
490 let end_init_res = self.end_initialize(db, writer, &mut state);
492 if should_record_enrollment_status {
493 self.record_enrollment_status_telemetry(&mut state)?;
494 }
495 end_init_res?;
496 Ok(res)
497 }
498
499 #[allow(deprecated)] fn get_installation_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
501 if let Some(context_installation_date) = self.app_context.installation_date {
503 let res = DateTime::<Utc>::from_naive_utc_and_offset(
504 NaiveDateTime::from_timestamp_opt(context_installation_date / 1_000, 0).unwrap(),
505 Utc,
506 );
507 info!("[Nimbus] Retrieved date from Context: {}", res);
508 return Ok(res);
509 }
510 let store = db.get_store(StoreId::Meta);
511 let persisted_installation_date: Option<DateTime<Utc>> =
512 store.get(writer, DB_KEY_INSTALLATION_DATE)?;
513 Ok(
514 if let Some(installation_date) = persisted_installation_date {
515 installation_date
516 } else {
517 Utc::now()
518 },
519 )
520 }
521
522 fn get_update_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
523 let store = db.get_store(StoreId::Meta);
524
525 let persisted_app_version: Option<String> = store.get(writer, DB_KEY_APP_VERSION)?;
526 let update_date: Option<DateTime<Utc>> = store.get(writer, DB_KEY_UPDATE_DATE)?;
527 Ok(
528 match (
529 persisted_app_version,
530 &self.app_context.app_version,
531 update_date,
532 ) {
533 (Some(persisted), Some(current), Some(date)) if persisted == *current => date,
535 (Some(persisted), Some(current), _) if persisted != *current => {
537 let now = Utc::now();
538 store.put(writer, DB_KEY_APP_VERSION, current)?;
539 store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
540 now
541 }
542 (None, Some(current), _) => {
544 let now = Utc::now();
545 store.put(writer, DB_KEY_APP_VERSION, current)?;
546 store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
547 now
548 }
549 (_, _, Some(date)) => date,
551 _ => Utc::now(),
553 },
554 )
555 }
556
557 pub fn set_experiments_locally(&self, experiments_json: String) -> Result<()> {
558 let new_experiments = parse_experiments(&experiments_json)?;
559 let db = self.db()?;
560 let mut writer = db.write()?;
561 write_pending_experiments(db, &mut writer, new_experiments)?;
562 writer.commit()?;
563 Ok(())
564 }
565
566 pub fn reset_enrollments(&self) -> Result<()> {
570 let db = self.db()?;
571 let mut writer = db.write()?;
572 let mut state = self.mutable_state.lock().unwrap();
573 db.clear_experiments_and_enrollments(&mut writer)?;
574 self.end_initialize(db, writer, &mut state)?;
575 Ok(())
576 }
577
578 pub fn reset_telemetry_identifiers(&self) -> Result<Vec<EnrollmentChangeEvent>> {
587 let mut events = vec![];
588 let db = self.db()?;
589 let mut writer = db.write()?;
590 let mut state = self.mutable_state.lock().unwrap();
591 let store = db.get_store(StoreId::Meta);
594 if store.get::<String, _>(&writer, DB_KEY_NIMBUS_ID)?.is_some() {
595 events = reset_telemetry_identifiers(db, &mut writer)?;
597
598 db.clear_event_count_data(&mut writer)?;
600
601 store.delete(&mut writer, DB_KEY_NIMBUS_ID)?;
604 self.end_initialize(db, writer, &mut state)?;
605 }
606
607 state.available_randomization_units = Default::default();
609 state.targeting_attributes.nimbus_id = None;
610
611 Ok(events)
612 }
613
614 pub fn nimbus_id(&self) -> Result<Uuid> {
615 let db = self.db()?;
616 let mut writer = db.write()?;
617 let mut state = self.mutable_state.lock().unwrap();
618 let uuid = self.read_or_create_nimbus_id(db, &mut writer, &mut state)?;
619
620 writer.commit()?;
624 Ok(uuid)
625 }
626
627 fn read_or_create_nimbus_id(
632 &self,
633 db: &Database,
634 writer: &mut Writer,
635 state: &mut MutexGuard<'_, InternalMutableState>,
636 ) -> Result<Uuid> {
637 let store = db.get_store(StoreId::Meta);
638 let nimbus_id = match store.get(writer, DB_KEY_NIMBUS_ID)? {
639 Some(nimbus_id) => nimbus_id,
640 None => {
641 let nimbus_id = Uuid::new_v4();
642 store.put(writer, DB_KEY_NIMBUS_ID, &nimbus_id)?;
643 nimbus_id
644 }
645 };
646
647 state.available_randomization_units.nimbus_id = Some(nimbus_id.to_string());
648 state.targeting_attributes.nimbus_id = Some(nimbus_id.to_string());
649
650 Ok(nimbus_id)
651 }
652
653 pub fn set_nimbus_id(&self, uuid: &Uuid) -> Result<()> {
657 let db = self.db()?;
658 let mut writer = db.write()?;
659 db.get_store(StoreId::Meta)
660 .put(&mut writer, DB_KEY_NIMBUS_ID, uuid)?;
661 writer.commit()?;
662 Ok(())
663 }
664
665 pub(crate) fn db(&self) -> Result<&Database> {
666 self.db
667 .get_or_try_init(|| Database::new(&self.db_path, self.metrics_handler.clone()))
668 }
669
670 fn merge_additional_context(&self, context: Option<JsonObject>) -> Result<Value> {
671 let context = context.map(Value::Object);
672 let targeting = match serde_json::to_value(self.get_targeting_attributes()) {
673 Ok(v) => v,
674 Err(e) => return Err(NimbusError::JSONError("targeting = nimbus::stateful::nimbus_client::NimbusClient::merge_additional_context::serde_json::to_value".into(), e.to_string()))
675 };
676 let context = match context {
677 Some(v) => v.defaults(&targeting)?,
678 None => targeting,
679 };
680
681 Ok(context)
682 }
683
684 pub fn create_targeting_helper(
685 &self,
686 additional_context: Option<JsonObject>,
687 ) -> Result<Arc<NimbusTargetingHelper>> {
688 let context = self.merge_additional_context(additional_context)?;
689 let helper =
690 NimbusTargetingHelper::new(context, self.event_store.clone(), self.gecko_prefs.clone());
691 Ok(Arc::new(helper))
692 }
693
694 pub fn create_targeting_helper_with_context(
695 &self,
696 context: Value,
697 ) -> Arc<NimbusTargetingHelper> {
698 Arc::new(NimbusTargetingHelper::new(
699 context,
700 self.event_store.clone(),
701 self.gecko_prefs.clone(),
702 ))
703 }
704
705 pub fn create_string_helper(
706 &self,
707 additional_context: Option<JsonObject>,
708 ) -> Result<Arc<NimbusStringHelper>> {
709 let context = self.merge_additional_context(additional_context)?;
710 let helper = NimbusStringHelper::new(context.as_object().unwrap().to_owned());
711 Ok(Arc::new(helper))
712 }
713
714 pub fn record_event(&self, event_id: String, count: i64) -> Result<()> {
719 let mut event_store = self.event_store.lock().unwrap();
720 event_store.record_event(count as u64, &event_id, None)?;
721 event_store.persist_data(self.db()?)?;
722 Ok(())
723 }
724
725 pub fn record_past_event(&self, event_id: String, seconds_ago: i64, count: i64) -> Result<()> {
730 if seconds_ago < 0 {
731 return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
732 "Time duration in the past must be positive".to_string(),
733 )));
734 }
735 let mut event_store = self.event_store.lock().unwrap();
736 event_store.record_past_event(
737 count as u64,
738 &event_id,
739 None,
740 chrono::Duration::seconds(seconds_ago),
741 )?;
742 event_store.persist_data(self.db()?)?;
743 Ok(())
744 }
745
746 pub fn advance_event_time(&self, by_seconds: i64) -> Result<()> {
750 if by_seconds < 0 {
751 return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
752 "Time duration in the future must be positive".to_string(),
753 )));
754 }
755 let mut event_store = self.event_store.lock().unwrap();
756 event_store.advance_datum(chrono::Duration::seconds(by_seconds));
757 Ok(())
758 }
759
760 pub fn clear_events(&self) -> Result<()> {
764 let mut event_store = self.event_store.lock().unwrap();
765 event_store.clear(self.db()?)?;
766 Ok(())
767 }
768
769 pub fn event_store(&self) -> Arc<Mutex<EventStore>> {
770 self.event_store.clone()
771 }
772
773 pub fn dump_state_to_log(&self) -> Result<()> {
774 let experiments = self.get_active_experiments()?;
775 info!("{0: <65}| {1: <30}| {2}", "Slug", "Features", "Branch");
776 for exp in &experiments {
777 info!(
778 "{0: <65}| {1: <30}| {2}",
779 &exp.slug,
780 &exp.feature_ids.join(", "),
781 &exp.branch_slug
782 );
783 }
784 Ok(())
785 }
786
787 pub fn unenroll_for_gecko_pref(
789 &self,
790 pref_state: GeckoPrefState,
791 pref_unenroll_reason: PrefUnenrollReason,
792 ) -> Result<Vec<EnrollmentChangeEvent>> {
793 if let Some(prefs) = self.gecko_prefs.clone() {
794 {
795 let mut pref_store_state = prefs.get_mutable_pref_state();
796 pref_store_state.update_pref_state(&pref_state);
797 }
798 let enrollments = self
799 .database_cache
800 .get_enrollments_for_pref(&pref_state.gecko_pref.pref)?;
801
802 let db = self.db()?;
803 let mut writer = db.write()?;
804
805 let mut results = Vec::new();
806 if let Some(enrollments) = enrollments {
807 for experiment_slug in enrollments {
808 let result = unenroll_for_pref(
809 db,
810 &mut writer,
811 &experiment_slug,
812 pref_unenroll_reason,
813 &pref_state.gecko_pref.pref,
814 self.gecko_prefs.as_deref(),
815 )?;
816 results.push(result);
817 }
818 } else {
819 warn!(
820 "Could not find enrollment. Could unenrollment already occurred through another preference?"
821 )
822 }
823
824 let mut state = self.mutable_state.lock().unwrap();
825 self.end_initialize(db, writer, &mut state)?;
826 return Ok(results.concat());
827 }
828 Ok(Vec::new())
829 }
830
831 pub fn register_previous_gecko_pref_states(
832 &self,
833 gecko_pref_states: &[GeckoPrefState],
834 ) -> Result<()> {
835 let all_prev_gecko_pref_states =
836 super::gecko_prefs::build_prev_gecko_pref_states(gecko_pref_states);
837
838 let db = self.db()?;
839 let mut writer = db.write()?;
840
841 for (experiment_slug, prev_gecko_pref_states) in all_prev_gecko_pref_states {
842 Self::add_prev_gecko_pref_state_for_experiment(
843 db,
844 &mut writer,
845 &experiment_slug,
846 prev_gecko_pref_states,
847 )?;
848 }
849
850 writer.commit()?;
851 Ok(())
852 }
853
854 pub(crate) fn add_prev_gecko_pref_state_for_experiment(
855 db: &Database,
856 writer: &mut Writer,
857 experiment_slug: &str,
858 prev_gecko_pref_states: Vec<PreviousGeckoPrefState>,
859 ) -> Result<()> {
860 let enrollments = db.get_store(StoreId::Enrollments);
861
862 if let Ok(Some(existing_enrollment)) =
863 enrollments.get::<ExperimentEnrollment, Writer>(writer, experiment_slug)
864 {
865 let updated_states =
867 existing_enrollment.on_add_gecko_pref_states(prev_gecko_pref_states);
868 enrollments.put(writer, experiment_slug, &updated_states)?;
869 }
870 Ok(())
871 }
872
873 pub fn get_previous_gecko_pref_states(
874 &self,
875 experiment_slug: String,
876 ) -> Result<Option<Vec<PreviousGeckoPrefState>>> {
877 let db = self.db()?;
878 let reader = db.read()?;
879
880 Ok(db
881 .get_store(StoreId::Enrollments)
882 .get::<ExperimentEnrollment, _>(&reader, &experiment_slug)?
883 .and_then(|enrollment| {
884 if let EnrollmentStatus::Enrolled {
885 prev_gecko_pref_states: prev_gecko_pref_state,
886 ..
887 } = enrollment.status
888 {
889 prev_gecko_pref_state
890 } else {
891 None
892 }
893 }))
894 }
895
896 #[cfg(test)]
897 pub fn get_recorded_context(&self) -> &&TestRecordedContext {
898 self.recorded_context
899 .clone()
900 .map(|ref recorded_context|
901 unsafe {
906 std::mem::transmute::<&&dyn RecordedContext, &&TestRecordedContext>(
907 &&**recorded_context,
908 )
909 })
910 .expect("failed to unwrap RecordedContext object")
911 }
912
913 #[cfg(test)]
914 pub fn get_gecko_pref_store(&self) -> Arc<Box<TestGeckoPrefHandler>> {
915 self.gecko_prefs.clone()
916 .clone()
917 .map(|ref pref_store|
918 unsafe {
923 std::mem::transmute::<Arc<Box<dyn GeckoPrefHandler>>, Arc<Box<TestGeckoPrefHandler>>>(
924 pref_store.clone().handler.clone(),
925 )
926 })
927 .expect("failed to unwrap GeckoPrefHandler object")
928 }
929}
930
931impl NimbusClient {
932 pub fn set_install_time(&mut self, then: DateTime<Utc>) {
933 let mut state = self.mutable_state.lock().unwrap();
934 state.install_date = Some(then);
935 state.update_time_to_now(Utc::now());
936 }
937
938 pub fn set_update_time(&mut self, then: DateTime<Utc>) {
939 let mut state = self.mutable_state.lock().unwrap();
940 state.update_date = Some(then);
941 state.update_time_to_now(Utc::now());
942 }
943}
944
945impl NimbusClient {
946 fn record_feature_activation_if_needed(&self, feature_id: &str) {
949 if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(feature_id)
950 && f.branch.is_some()
951 && !self.coenrolling_feature_ids.contains(&f.feature_id)
952 {
953 self.metrics_handler.record_feature_activation(f.into());
954 }
955 }
956
957 pub fn record_feature_exposure(&self, feature_id: String, slug: Option<String>) {
958 let event = if let Some(slug) = slug {
959 if let Ok(Some(branch)) = self.database_cache.get_experiment_branch(&slug) {
960 Some(FeatureExposureExtraDef {
961 feature_id,
962 branch: Some(branch),
963 slug,
964 })
965 } else {
966 None
967 }
968 } else if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id) {
969 if f.branch.is_some() {
970 Some(f.into())
971 } else {
972 None
973 }
974 } else {
975 None
976 };
977
978 if let Some(event) = event {
979 self.metrics_handler.record_feature_exposure(event);
980 }
981 }
982
983 pub fn record_malformed_feature_config(&self, feature_id: String, part_id: String) {
984 let event = if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id)
985 {
986 MalformedFeatureConfigExtraDef::from_feature_and_part(f, part_id)
987 } else {
988 MalformedFeatureConfigExtraDef::new(feature_id, part_id)
989 };
990 self.metrics_handler.record_malformed_feature_config(event);
991 }
992
993 fn record_enrollment_status_telemetry(
994 &self,
995 state: &mut MutexGuard<InternalMutableState>,
996 ) -> Result<()> {
997 let targeting_helper = NimbusTargetingHelper::new(
998 state.targeting_attributes.clone(),
999 self.event_store.clone(),
1000 self.gecko_prefs.clone(),
1001 );
1002 let experiments = self.database_cache.get_experiments()?;
1003 let experiments = experiments
1004 .iter()
1005 .filter(|exp| {
1006 is_experiment_available(&targeting_helper, exp, true)
1007 == ExperimentAvailable::Available
1008 })
1009 .map(|exp| &*exp.slug)
1010 .collect::<HashSet<&str>>();
1011 self.metrics_handler.record_enrollment_statuses(
1012 self.database_cache
1013 .get_enrollments()?
1014 .into_iter()
1015 .filter_map(|e| match experiments.contains(&*e.slug) {
1016 true => Some(e.into()),
1017 false => None,
1018 })
1019 .collect(),
1020 );
1021 self.metrics_handler.submit_targeting_context();
1022 Ok(())
1023 }
1024}
1025
1026pub struct NimbusStringHelper {
1027 context: JsonObject,
1028}
1029
1030impl NimbusStringHelper {
1031 fn new(context: JsonObject) -> Self {
1032 Self { context }
1033 }
1034
1035 pub fn get_uuid(&self, template: String) -> Option<String> {
1036 if template.contains("{uuid}") {
1037 let uuid = Uuid::new_v4();
1038 Some(uuid.to_string())
1039 } else {
1040 None
1041 }
1042 }
1043
1044 pub fn string_format(&self, template: String, uuid: Option<String>) -> String {
1045 match uuid {
1046 Some(uuid) => {
1047 let mut map = self.context.clone();
1048 map.insert("uuid".to_string(), Value::String(uuid));
1049 fmt_with_map(&template, &map)
1050 }
1051 _ => fmt_with_map(&template, &self.context),
1052 }
1053 }
1054}
1055
1056#[cfg(feature = "stateful-uniffi-bindings")]
1057uniffi::custom_type!(JsonObject, String, {
1058 remote,
1059 try_lift: |val| {
1060 let json: Value = serde_json::from_str(&val)?;
1061
1062 match json.as_object() {
1063 Some(obj) => Ok(obj.clone()),
1064 _ => Err(uniffi::deps::anyhow::anyhow!(
1065 "Unexpected JSON-non-object in the bagging area"
1066 )),
1067 }
1068 },
1069 lower: |obj| serde_json::Value::Object(obj).to_string(),
1070});
1071
1072#[cfg(feature = "stateful-uniffi-bindings")]
1073uniffi::custom_type!(PrefValue, String, {
1074 remote,
1075 try_lift: |val| {
1076 let json: Value = match serde_json::from_str(&val) {
1079 Ok(json) => json,
1080 Err(_) => Value::String(val),
1081 };
1082 let is_valid_pref_type = json.is_string() || json.is_boolean()
1083 || (json.is_number() && !json.is_f64()) || json.is_null();
1084 if is_valid_pref_type {
1085 Ok(json)
1086 } else {
1087 Err(anyhow::anyhow!(format!("Value {} is not a string, boolean, number, or null, or is a float", json)))
1088 }
1089 },
1090 lower: |val| {
1091 val.to_string()
1092 }
1093});
1094
1095#[cfg(feature = "stateful-uniffi-bindings")]
1096uniffi::include_scaffolding!("nimbus");