1use std::collections::HashSet;
6use std::fmt::Debug;
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use chrono::{DateTime, NaiveDateTime, Utc};
11use once_cell::sync::OnceCell;
12use remote_settings::RemoteSettingsService;
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::defaults::Defaults;
17use crate::enrollment::{
18 EnrolledFeature, EnrollmentChangeEvent, EnrollmentChangeEventType, EnrollmentsEvolver,
19 ExperimentEnrollment, PreviousGeckoPrefState,
20};
21use crate::error::{BehaviorError, info, warn};
22use crate::evaluator::{
23 CalculatedAttributes, ExperimentAvailable, TargetingAttributes, get_calculated_attributes,
24 is_experiment_available,
25};
26use crate::json::{JsonObject, PrefValue};
27use crate::metrics::{
28 DatabaseLoadExtraDef, DatabaseMigrationExtraDef, EnrollmentStatusExtraDef,
29 FeatureExposureExtraDef, MalformedFeatureConfigExtraDef, MetricsHandler,
30};
31use crate::schema::parse_experiments;
32use crate::stateful::behavior::EventStore;
33use crate::stateful::client::{NimbusServerSettings, SettingsClient, create_client};
34use crate::stateful::dbcache::DatabaseCache;
35use crate::stateful::enrollment::{
36 get_experiment_participation, get_rollout_participation, opt_in_with_branch, opt_out,
37 reset_telemetry_identifiers, set_experiment_participation, set_rollout_participation,
38 unenroll_for_pref,
39};
40use crate::stateful::gecko_prefs::{
41 GeckoPref, GeckoPrefHandler, GeckoPrefState, GeckoPrefStore, OriginalGeckoPref, PrefBranch,
42 PrefEnrollmentData, PrefUnenrollReason,
43};
44use crate::stateful::matcher::AppContext;
45use crate::stateful::persistence::{Database, StoreId, Writer};
46use crate::stateful::targeting::{RecordedContext, validate_event_queries};
47use crate::stateful::updating::{read_and_remove_pending_experiments, write_pending_experiments};
48use crate::strings::fmt_with_map;
49#[cfg(test)]
50use crate::tests::helpers::{TestGeckoPrefHandler, TestRecordedContext};
51use crate::{
52 AvailableExperiment, AvailableRandomizationUnits, EnrolledExperiment, EnrollmentStatus,
53};
54use crate::{Experiment, ExperimentBranch, NimbusError, NimbusTargetingHelper, Result};
55
/// Meta-store key under which the client's Nimbus ID is persisted.
const DB_KEY_NIMBUS_ID: &str = "nimbus-id";
/// Meta-store key under which the installation date is looked up.
pub const DB_KEY_INSTALLATION_DATE: &str = "installation-date";
/// Meta-store key holding the date at which the app version was last seen to change.
pub const DB_KEY_UPDATE_DATE: &str = "update-date";
/// Meta-store key holding the app version recorded alongside the update date.
pub const DB_KEY_APP_VERSION: &str = "app-version";
/// Meta-store key holding the flag that enables or disables experiment fetching.
pub const DB_KEY_FETCH_ENABLED: &str = "fetch-enabled";
61
/// Mutable state that `NimbusClient` keeps behind a mutex.
#[derive(Default)]
pub struct InternalMutableState {
    /// Randomization units passed to the enrollments evolver; its `nimbus_id`
    /// is filled in whenever the Nimbus ID is read or created.
    pub(crate) available_randomization_units: AvailableRandomizationUnits,
    /// Installation date, `None` until it has been resolved or set explicitly.
    pub(crate) install_date: Option<DateTime<Utc>>,
    /// Update date, `None` until it has been resolved or set explicitly.
    pub(crate) update_date: Option<DateTime<Utc>>,
    /// Attributes used when evaluating targeting.
    pub(crate) targeting_attributes: TargetingAttributes,
}
74
75impl InternalMutableState {
76 pub(crate) fn update_time_to_now(&mut self, now: DateTime<Utc>) {
77 self.targeting_attributes
78 .update_time_to_now(now, &self.install_date, &self.update_date);
79 }
80}
81
/// Stateful Nimbus client: owns the experiment database, its in-memory cache,
/// and the state needed to evaluate targeting and evolve enrollments.
pub struct NimbusClient {
    /// Client used to fetch experiments from the settings server.
    settings_client: Mutex<Box<dyn SettingsClient + Send>>,
    /// Targeting attributes, randomization units and install/update dates.
    pub(crate) mutable_state: Mutex<InternalMutableState>,
    /// Application context supplied at construction time.
    app_context: AppContext,
    /// Database handle, created on first call to `db()`.
    pub(crate) db: OnceCell<Database>,
    /// In-memory cache refreshed whenever a write is committed via `end_initialize`.
    database_cache: DatabaseCache,
    /// Location passed to `Database::new` when the database is first opened.
    db_path: PathBuf,
    /// IDs of features that are treated as co-enrolling.
    coenrolling_feature_ids: Vec<String>,
    /// Shared store of recorded events, also handed to targeting helpers.
    event_store: Arc<Mutex<EventStore>>,
    /// Optional application-provided recorded context.
    recorded_context: Option<Arc<dyn RecordedContext>>,
    /// Optional Gecko preference store, present when a pref handler was supplied.
    pub(crate) gecko_prefs: Option<Arc<GeckoPrefStore>>,
    /// Sink for telemetry emitted by the client.
    metrics_handler: Arc<dyn MetricsHandler>,
}
100
101impl NimbusClient {
102 #[allow(clippy::too_many_arguments)]
105 pub fn new<P: Into<PathBuf>>(
106 app_context: AppContext,
107 recorded_context: Option<Arc<dyn RecordedContext>>,
108 coenrolling_feature_ids: Vec<String>,
109 db_path: P,
110 metrics_handler: Arc<dyn MetricsHandler>,
111 gecko_pref_handler: Option<Box<dyn GeckoPrefHandler>>,
112 remote_settings_info: Option<NimbusServerSettings>,
113 ) -> Result<Self> {
114 let settings_client = Mutex::new(create_client(remote_settings_info)?);
115
116 let targeting_attributes: TargetingAttributes = app_context.clone().into();
117 let mutable_state = Mutex::new(InternalMutableState {
118 available_randomization_units: Default::default(),
119 targeting_attributes,
120 install_date: Default::default(),
121 update_date: Default::default(),
122 });
123
124 let mut prefs = None;
125 if let Some(handler) = gecko_pref_handler {
126 prefs = Some(Arc::new(GeckoPrefStore::new(Arc::new(handler))));
127 }
128
129 info!(
130 "Initialized NimbusClient with: app_context = {:?}; recorded_context = {:?}",
131 app_context,
132 recorded_context
133 .as_ref()
134 .map(|rc| serde_json::Value::Object(rc.to_json()))
135 .unwrap_or(serde_json::Value::Null)
136 );
137
138 Ok(Self {
139 settings_client,
140 mutable_state,
141 app_context,
142 database_cache: Default::default(),
143 db_path: db_path.into(),
144 coenrolling_feature_ids,
145 db: OnceCell::default(),
146 event_store: Arc::default(),
147 recorded_context,
148 gecko_prefs: prefs,
149 metrics_handler,
150 })
151 }
152
153 pub fn with_targeting_attributes(&mut self, targeting_attributes: TargetingAttributes) {
154 let mut state = self.mutable_state.lock().unwrap();
155 state.targeting_attributes = targeting_attributes;
156 }
157
158 pub fn get_targeting_attributes(&self) -> TargetingAttributes {
159 let mut state = self.mutable_state.lock().unwrap();
160 state.update_time_to_now(Utc::now());
161 state.targeting_attributes.clone()
162 }
163
164 pub fn initialize(&self) -> Result<()> {
165 let db = self.db()?;
166 let mut writer = db.write()?;
168
169 let mut state = self.mutable_state.lock().unwrap();
170 self.begin_initialize(db, &mut writer, &mut state)?;
171 self.end_initialize(db, writer, &mut state)?;
172
173 Ok(())
174 }
175
176 fn begin_initialize(
179 &self,
180 db: &Database,
181 writer: &mut Writer,
182 state: &mut MutexGuard<InternalMutableState>,
183 ) -> Result<()> {
184 self.read_or_create_nimbus_id(db, writer, state)?;
185 self.update_ta_install_dates(db, writer, state)?;
186 self.event_store
187 .lock()
188 .expect("unable to lock event_store mutex")
189 .read_from_db(db)?;
190
191 if let Some(recorded_context) = &self.recorded_context {
192 let targeting_helper = self.create_targeting_helper_with_context(match serde_json::to_value(
193 &state.targeting_attributes,
194 ) {
195 Ok(v) => v,
196 Err(e) => return Err(NimbusError::JSONError("targeting_helper = nimbus::stateful::nimbus_client::NimbusClient::begin_initialize::serde_json::to_value".into(), e.to_string()))
197 });
198 recorded_context.execute_queries(targeting_helper.as_ref())?;
199 state
200 .targeting_attributes
201 .set_recorded_context(recorded_context.to_json());
202 }
203
204 if let Some(gecko_prefs) = &self.gecko_prefs {
205 gecko_prefs.initialize()?;
206 }
207
208 Ok(())
209 }
210
211 fn end_initialize(
214 &self,
215 db: &Database,
216 writer: Writer,
217 state: &mut MutexGuard<InternalMutableState>,
218 ) -> Result<()> {
219 self.update_ta_active_experiments(db, &writer, state)?;
220 let coenrolling_ids = self
221 .coenrolling_feature_ids
222 .iter()
223 .map(|s| s.as_str())
224 .collect();
225 self.database_cache.commit_and_update(
226 db,
227 writer,
228 &coenrolling_ids,
229 self.gecko_prefs.clone(),
230 )?;
231 Ok(())
232 }
233
    /// Looks up the enrollment for `feature_id` in the database cache.
    pub fn get_enrollment_by_feature(&self, feature_id: String) -> Result<Option<EnrolledFeature>> {
        self.database_cache.get_enrollment_by_feature(&feature_id)
    }
237
    /// Looks up the branch recorded for the experiment `slug` in the database cache.
    pub fn get_experiment_branch(&self, slug: String) -> Result<Option<String>> {
        self.database_cache.get_experiment_branch(&slug)
    }
242
243 pub fn get_feature_config_variables(&self, feature_id: String) -> Result<Option<String>> {
244 Ok(
245 if let Some(s) = self
246 .database_cache
247 .get_feature_config_variables(&feature_id)?
248 {
249 self.record_feature_activation_if_needed(&feature_id);
250 Some(s)
251 } else {
252 None
253 },
254 )
255 }
256
257 pub fn get_experiment_branches(&self, slug: String) -> Result<Vec<ExperimentBranch>> {
258 self.get_all_experiments()?
259 .into_iter()
260 .find(|e| e.slug == slug)
261 .map(|e| e.branches.into_iter().map(|b| b.into()).collect())
262 .ok_or(NimbusError::NoSuchExperiment(slug))
263 }
264
    /// Reads the experiment participation flag from the database.
    pub fn get_experiment_participation(&self) -> Result<bool> {
        let db = self.db()?;
        let reader = db.read()?;
        // Delegates to the free function of the same name imported from `stateful::enrollment`.
        get_experiment_participation(db, &reader)
    }
270
    /// Reads the rollout participation flag from the database.
    pub fn get_rollout_participation(&self) -> Result<bool> {
        let db = self.db()?;
        let reader = db.read()?;
        // Delegates to the free function of the same name imported from `stateful::enrollment`.
        get_rollout_participation(db, &reader)
    }
276
    /// Persists the experiment participation flag and re-evolves enrollments
    /// against the experiments already in the store.
    ///
    /// Returns the enrollment change events produced by the evolution.
    pub fn set_experiment_participation(
        &self,
        user_participating: bool,
    ) -> Result<Vec<EnrollmentChangeEvent>> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        set_experiment_participation(db, &mut writer, user_participating)?;

        // Re-evaluate the stored experiments under the new participation setting.
        let existing_experiments: Vec<Experiment> =
            db.get_store(StoreId::Experiments).collect_all(&writer)?;
        let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
        // The commit result is held back so that enrollment-status telemetry is
        // attempted before a commit error (if any) is propagated.
        let res = self.end_initialize(db, writer, &mut state);
        self.record_enrollment_status_telemetry(&mut state)?;
        res?;
        Ok(events)
    }
294
    /// Persists the rollout participation flag and re-evolves enrollments
    /// against the experiments already in the store.
    ///
    /// Returns the enrollment change events produced by the evolution.
    pub fn set_rollout_participation(
        &self,
        user_participating: bool,
    ) -> Result<Vec<EnrollmentChangeEvent>> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        set_rollout_participation(db, &mut writer, user_participating)?;

        // Re-evaluate the stored experiments under the new participation setting.
        let existing_experiments: Vec<Experiment> =
            db.get_store(StoreId::Experiments).collect_all(&writer)?;
        let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
        // The commit result is held back so that enrollment-status telemetry is
        // attempted before a commit error (if any) is propagated.
        let res = self.end_initialize(db, writer, &mut state);
        self.record_enrollment_status_telemetry(&mut state)?;
        res?;
        Ok(events)
    }
312
    /// Returns the active experiments as held by the database cache.
    pub fn get_active_experiments(&self) -> Result<Vec<EnrolledExperiment>> {
        self.database_cache.get_active_experiments()
    }
316
317 pub fn get_all_experiments(&self) -> Result<Vec<Experiment>> {
318 let db = self.db()?;
319 let reader = db.read()?;
320 db.get_store(StoreId::Experiments)
321 .collect_all::<Experiment, _>(&reader)
322 }
323
324 pub fn get_available_experiments(&self) -> Result<Vec<AvailableExperiment>> {
325 let th = self.create_targeting_helper(None)?;
326 Ok(self
327 .get_all_experiments()?
328 .into_iter()
329 .filter(|exp| {
330 is_experiment_available(&th, exp, false) == ExperimentAvailable::Available
331 })
332 .map(|exp| exp.into())
333 .collect())
334 }
335
    /// Opts in to `branch` of the experiment `experiment_slug`, then commits
    /// and refreshes the cache.
    ///
    /// Returns the enrollment change events produced by the opt-in.
    pub fn opt_in_with_branch(
        &self,
        experiment_slug: String,
        branch: String,
    ) -> Result<Vec<EnrollmentChangeEvent>> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let result = opt_in_with_branch(db, &mut writer, &experiment_slug, &branch)?;
        let mut state = self.mutable_state.lock().unwrap();
        // Commits the writer and updates targeting attributes and the cache.
        self.end_initialize(db, writer, &mut state)?;
        Ok(result)
    }
348
    /// Opts out of the experiment `experiment_slug`, then commits and refreshes
    /// the cache.
    ///
    /// Returns the enrollment change events produced by the opt-out.
    pub fn opt_out(&self, experiment_slug: String) -> Result<Vec<EnrollmentChangeEvent>> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let result = opt_out(
            db,
            &mut writer,
            &experiment_slug,
            self.gecko_prefs.as_deref(),
        )?;
        let mut state = self.mutable_state.lock().unwrap();
        // Commits the writer and updates targeting attributes and the cache.
        self.end_initialize(db, writer, &mut state)?;
        Ok(result)
    }
362
    /// Fetches experiments through the settings client and stores them as
    /// pending; they are not applied here (see `apply_pending_experiments`).
    ///
    /// Does nothing when fetching has been disabled via `set_fetch_enabled`.
    pub fn fetch_experiments(&self) -> Result<()> {
        if !self.is_fetch_enabled()? {
            return Ok(());
        }
        info!("fetching experiments");
        let settings_client = self.settings_client.lock().unwrap();
        let new_experiments = settings_client.fetch_experiments()?;
        let db = self.db()?;
        let mut writer = db.write()?;
        write_pending_experiments(db, &mut writer, new_experiments)?;
        writer.commit()?;
        Ok(())
    }
376
    /// Persists whether `fetch_experiments` is allowed to fetch.
    pub fn set_fetch_enabled(&self, allow: bool) -> Result<()> {
        let db = self.db()?;
        let mut writer = db.write()?;
        db.get_store(StoreId::Meta)
            .put(&mut writer, DB_KEY_FETCH_ENABLED, &allow)?;
        writer.commit()?;
        Ok(())
    }
385
386 pub(crate) fn is_fetch_enabled(&self) -> Result<bool> {
387 let db = self.db()?;
388 let reader = db.read()?;
389 let enabled = db
390 .get_store(StoreId::Meta)
391 .get(&reader, DB_KEY_FETCH_ENABLED)?
392 .unwrap_or(true);
393 Ok(enabled)
394 }
395
396 fn update_ta_install_dates(
400 &self,
401 db: &Database,
402 writer: &mut Writer,
403 state: &mut MutexGuard<InternalMutableState>,
404 ) -> Result<()> {
405 if state.install_date.is_none() {
410 let installation_date = self.get_installation_date(db, writer)?;
411 state.install_date = Some(installation_date);
412 }
413 if state.update_date.is_none() {
414 let update_date = self.get_update_date(db, writer)?;
415 state.update_date = Some(update_date);
416 }
417 state.update_time_to_now(Utc::now());
418
419 Ok(())
420 }
421
422 fn update_ta_active_experiments(
426 &self,
427 db: &Database,
428 writer: &Writer,
429 state: &mut MutexGuard<InternalMutableState>,
430 ) -> Result<()> {
431 let enrollments_store = db.get_store(StoreId::Enrollments);
432 let prev_enrollments: Vec<ExperimentEnrollment> = enrollments_store.collect_all(writer)?;
433
434 state
435 .targeting_attributes
436 .update_enrollments(&prev_enrollments);
437
438 Ok(())
439 }
440
441 fn evolve_experiments(
442 &self,
443 db: &Database,
444 writer: &mut Writer,
445 state: &mut InternalMutableState,
446 experiments: &[Experiment],
447 ) -> Result<Vec<EnrollmentChangeEvent>> {
448 let mut targeting_helper = NimbusTargetingHelper::with_targeting_attributes(
449 &state.targeting_attributes,
450 self.event_store.clone(),
451 self.gecko_prefs.clone(),
452 );
453 if let Some(ref recorded_context) = self.recorded_context {
454 recorded_context.record();
455 }
456 let coenrolling_feature_ids = self
457 .coenrolling_feature_ids
458 .iter()
459 .map(|s| s.as_str())
460 .collect();
461 let mut evolver = EnrollmentsEvolver::new(
462 &state.available_randomization_units,
463 &mut targeting_helper,
464 &coenrolling_feature_ids,
465 );
466 evolver.evolve_enrollments_in_db(db, writer, experiments, self.gecko_prefs.as_deref())
467 }
468
    /// Applies any pending experiments (written by `fetch_experiments` or
    /// `set_experiments_locally`) and returns the resulting enrollment changes.
    ///
    /// When nothing is pending, initialization still runs and an empty list is
    /// returned.
    pub fn apply_pending_experiments(&self) -> Result<Vec<EnrollmentChangeEvent>> {
        info!("updating experiment list");
        let db = self.db()?;
        let mut writer = db.write()?;

        // Pending experiments are taken out of the store within this transaction.
        let pending_updates = read_and_remove_pending_experiments(db, &mut writer)?;
        let mut state = self.mutable_state.lock().unwrap();
        self.begin_initialize(db, &mut writer, &mut state)?;

        // Telemetry is only recorded when there was something to apply.
        let should_record_enrollment_status = pending_updates.is_some();
        let res = match pending_updates {
            Some(new_experiments) => {
                // Targeting attributes must reflect current enrollments before evolving.
                self.update_ta_active_experiments(db, &writer, &mut state)?;
                self.evolve_experiments(db, &mut writer, &mut state, &new_experiments)?
            }
            None => vec![],
        };

        // The commit result is held back so that telemetry is attempted before
        // a commit error (if any) is propagated.
        let end_init_res = self.end_initialize(db, writer, &mut state);
        if should_record_enrollment_status {
            self.record_enrollment_status_telemetry(&mut state)?;
        }
        end_init_res?;
        Ok(res)
    }
498
499 #[allow(deprecated)] fn get_installation_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
501 if let Some(context_installation_date) = self.app_context.installation_date {
503 let res = DateTime::<Utc>::from_naive_utc_and_offset(
504 NaiveDateTime::from_timestamp_opt(context_installation_date / 1_000, 0).unwrap(),
505 Utc,
506 );
507 info!("[Nimbus] Retrieved date from Context: {}", res);
508 return Ok(res);
509 }
510 let store = db.get_store(StoreId::Meta);
511 let persisted_installation_date: Option<DateTime<Utc>> =
512 store.get(writer, DB_KEY_INSTALLATION_DATE)?;
513 Ok(
514 if let Some(installation_date) = persisted_installation_date {
515 installation_date
516 } else {
517 Utc::now()
518 },
519 )
520 }
521
    /// Determines the date the app was last updated, persisting a new date
    /// (and the current app version) when the version has changed or was never
    /// recorded.
    fn get_update_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
        let store = db.get_store(StoreId::Meta);

        let persisted_app_version: Option<String> = store.get(writer, DB_KEY_APP_VERSION)?;
        let update_date: Option<DateTime<Utc>> = store.get(writer, DB_KEY_UPDATE_DATE)?;
        Ok(
            match (
                persisted_app_version,
                &self.app_context.app_version,
                update_date,
            ) {
                // Same version as last time and a date is stored: reuse it.
                (Some(persisted), Some(current), Some(date)) if persisted == *current => date,
                // The version differs from the persisted one: record the new
                // version and stamp the update as happening now.
                (Some(persisted), Some(current), _) if persisted != *current => {
                    let now = Utc::now();
                    store.put(writer, DB_KEY_APP_VERSION, current)?;
                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
                    now
                }
                // No version was persisted yet: record the current one and stamp now.
                (None, Some(current), _) => {
                    let now = Utc::now();
                    store.put(writer, DB_KEY_APP_VERSION, current)?;
                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
                    now
                }
                // No current version available, but a date is stored: use it.
                (_, _, Some(date)) => date,
                // Nothing usable (including: versions match but no date is
                // stored); fall back to now without persisting anything.
                _ => Utc::now(),
            },
        )
    }
556
    /// Parses `experiments_json` and stores the result as pending experiments;
    /// they are not applied here (see `apply_pending_experiments`).
    pub fn set_experiments_locally(&self, experiments_json: String) -> Result<()> {
        let new_experiments = parse_experiments(&experiments_json)?;
        let db = self.db()?;
        let mut writer = db.write()?;
        write_pending_experiments(db, &mut writer, new_experiments)?;
        writer.commit()?;
        Ok(())
    }
565
    /// Clears all stored experiments and enrollments, then commits and
    /// refreshes the cache.
    pub fn reset_enrollments(&self) -> Result<()> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        db.clear_experiments_and_enrollments(&mut writer)?;
        self.end_initialize(db, writer, &mut state)?;
        Ok(())
    }
577
    /// Resets telemetry identifiers: enrollments are reset, stored event counts
    /// are cleared and the persisted Nimbus ID is deleted.
    ///
    /// Returns the enrollment change events produced by the reset; the list is
    /// empty when no Nimbus ID was stored.
    pub fn reset_telemetry_identifiers(&self) -> Result<Vec<EnrollmentChangeEvent>> {
        let mut events = vec![];
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        let store = db.get_store(StoreId::Meta);
        // Database state is only touched when a Nimbus ID is actually stored.
        if store.get::<String, _>(&writer, DB_KEY_NIMBUS_ID)?.is_some() {
            events = reset_telemetry_identifiers(db, &mut writer)?;

            db.clear_event_count_data(&mut writer)?;

            // The ID is deleted after the other data has been reset.
            store.delete(&mut writer, DB_KEY_NIMBUS_ID)?;
            self.end_initialize(db, writer, &mut state)?;
        }

        // When the branch above was skipped the writer is dropped without an
        // explicit commit; nothing was written through it.
        state.available_randomization_units = Default::default();
        state.targeting_attributes.nimbus_id = None;

        Ok(events)
    }
613
    /// Returns the client's Nimbus ID, creating and persisting one if none is stored.
    pub fn nimbus_id(&self) -> Result<Uuid> {
        let db = self.db()?;
        let mut writer = db.write()?;
        let mut state = self.mutable_state.lock().unwrap();
        let uuid = self.read_or_create_nimbus_id(db, &mut writer, &mut state)?;

        // Commit so that a freshly created ID is persisted.
        writer.commit()?;
        Ok(uuid)
    }
626
627 fn read_or_create_nimbus_id(
632 &self,
633 db: &Database,
634 writer: &mut Writer,
635 state: &mut MutexGuard<'_, InternalMutableState>,
636 ) -> Result<Uuid> {
637 let store = db.get_store(StoreId::Meta);
638 let nimbus_id = match store.get(writer, DB_KEY_NIMBUS_ID)? {
639 Some(nimbus_id) => nimbus_id,
640 None => {
641 let nimbus_id = Uuid::new_v4();
642 store.put(writer, DB_KEY_NIMBUS_ID, &nimbus_id)?;
643 nimbus_id
644 }
645 };
646
647 state.available_randomization_units.nimbus_id = Some(nimbus_id.to_string());
648 state.targeting_attributes.nimbus_id = Some(nimbus_id.to_string());
649
650 Ok(nimbus_id)
651 }
652
    /// Persists `uuid` as the Nimbus ID.
    ///
    /// Only the database is written here; the in-memory state is not updated
    /// by this method.
    pub fn set_nimbus_id(&self, uuid: &Uuid) -> Result<()> {
        let db = self.db()?;
        let mut writer = db.write()?;
        db.get_store(StoreId::Meta)
            .put(&mut writer, DB_KEY_NIMBUS_ID, uuid)?;
        writer.commit()?;
        Ok(())
    }
664
    /// Returns the database, creating it from `db_path` on first use.
    ///
    /// If creation fails the error is returned and nothing is cached.
    pub(crate) fn db(&self) -> Result<&Database> {
        self.db
            .get_or_try_init(|| Database::new(&self.db_path, self.metrics_handler.clone()))
    }
669
670 fn merge_additional_context(&self, context: Option<JsonObject>) -> Result<Value> {
671 let context = context.map(Value::Object);
672 let targeting = match serde_json::to_value(self.get_targeting_attributes()) {
673 Ok(v) => v,
674 Err(e) => return Err(NimbusError::JSONError("targeting = nimbus::stateful::nimbus_client::NimbusClient::merge_additional_context::serde_json::to_value".into(), e.to_string()))
675 };
676 let context = match context {
677 Some(v) => v.defaults(&targeting)?,
678 None => targeting,
679 };
680
681 Ok(context)
682 }
683
684 pub fn create_targeting_helper(
685 &self,
686 additional_context: Option<JsonObject>,
687 ) -> Result<Arc<NimbusTargetingHelper>> {
688 let context = self.merge_additional_context(additional_context)?;
689 let helper =
690 NimbusTargetingHelper::new(context, self.event_store.clone(), self.gecko_prefs.clone());
691 Ok(Arc::new(helper))
692 }
693
694 pub fn create_targeting_helper_with_context(
695 &self,
696 context: Value,
697 ) -> Arc<NimbusTargetingHelper> {
698 Arc::new(NimbusTargetingHelper::new(
699 context,
700 self.event_store.clone(),
701 self.gecko_prefs.clone(),
702 ))
703 }
704
    /// Creates a string helper whose context is the current targeting
    /// attributes merged with `additional_context`.
    ///
    /// # Panics
    /// Panics if the merged context is not a JSON object.
    pub fn create_string_helper(
        &self,
        additional_context: Option<JsonObject>,
    ) -> Result<Arc<NimbusStringHelper>> {
        let context = self.merge_additional_context(additional_context)?;
        // NOTE(review): presumably the merged context is always an object
        // because targeting attributes serialize to one — confirm.
        let helper = NimbusStringHelper::new(context.as_object().unwrap().to_owned());
        Ok(Arc::new(helper))
    }
713
    /// Records `count` occurrences of `event_id` in the event store and
    /// persists the store to the database.
    pub fn record_event(&self, event_id: String, count: i64) -> Result<()> {
        let mut event_store = self.event_store.lock().unwrap();
        // NOTE(review): `count as u64` wraps for negative values, so a negative
        // `count` becomes a very large number — confirm callers never pass one.
        event_store.record_event(count as u64, &event_id, None)?;
        event_store.persist_data(self.db()?)?;
        Ok(())
    }
724
    /// Records `count` occurrences of `event_id` as having happened
    /// `seconds_ago` seconds in the past, then persists the event store.
    ///
    /// # Errors
    /// `BehaviorError::InvalidDuration` when `seconds_ago` is negative.
    pub fn record_past_event(&self, event_id: String, seconds_ago: i64, count: i64) -> Result<()> {
        if seconds_ago < 0 {
            return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
                "Time duration in the past must be positive".to_string(),
            )));
        }
        let mut event_store = self.event_store.lock().unwrap();
        // NOTE(review): `count as u64` wraps for negative values — confirm
        // callers never pass a negative count.
        event_store.record_past_event(
            count as u64,
            &event_id,
            None,
            chrono::Duration::seconds(seconds_ago),
        )?;
        event_store.persist_data(self.db()?)?;
        Ok(())
    }
745
    /// Advances the event store's datum by `by_seconds` seconds.
    ///
    /// The change is made in memory only; this method does not persist the
    /// event store.
    ///
    /// # Errors
    /// `BehaviorError::InvalidDuration` when `by_seconds` is negative.
    pub fn advance_event_time(&self, by_seconds: i64) -> Result<()> {
        if by_seconds < 0 {
            return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
                "Time duration in the future must be positive".to_string(),
            )));
        }
        let mut event_store = self.event_store.lock().unwrap();
        event_store.advance_datum(chrono::Duration::seconds(by_seconds));
        Ok(())
    }
759
    /// Clears the event store, passing the database to `EventStore::clear`.
    pub fn clear_events(&self) -> Result<()> {
        let mut event_store = self.event_store.lock().unwrap();
        event_store.clear(self.db()?)?;
        Ok(())
    }
768
769 pub fn event_store(&self) -> Arc<Mutex<EventStore>> {
770 self.event_store.clone()
771 }
772
773 pub fn dump_state_to_log(&self) -> Result<()> {
774 let experiments = self.get_active_experiments()?;
775 info!("{0: <65}| {1: <30}| {2}", "Slug", "Features", "Branch");
776 for exp in &experiments {
777 info!(
778 "{0: <65}| {1: <30}| {2}",
779 &exp.slug,
780 &exp.feature_ids.join(", "),
781 &exp.branch_slug
782 );
783 }
784 Ok(())
785 }
786
    /// Unenrolls from every experiment the cache associates with the Gecko
    /// pref in `pref_state`, for the given reason.
    ///
    /// Returns the enrollment change events produced. When the client has no
    /// Gecko pref store this is a no-op that returns an empty list.
    pub fn unenroll_for_gecko_pref(
        &self,
        pref_state: GeckoPrefState,
        pref_unenroll_reason: PrefUnenrollReason,
    ) -> Result<Vec<EnrollmentChangeEvent>> {
        let mut events = Vec::new();
        if let Some(prefs) = self.gecko_prefs.clone() {
            // Scoped so the mutable pref state is released before the database
            // work below.
            {
                let mut pref_store_state = prefs.get_mutable_pref_state();
                pref_store_state.update_pref_state(&pref_state);
            }
            let enrollments = self
                .database_cache
                .get_enrollments_for_pref(&pref_state.gecko_pref.pref)?;

            let db = self.db()?;
            let mut writer = db.write()?;

            if let Some(enrollments) = enrollments {
                for experiment_slug in enrollments {
                    unenroll_for_pref(
                        db,
                        &mut writer,
                        &experiment_slug,
                        pref_unenroll_reason,
                        &pref_state.gecko_pref.pref,
                        self.gecko_prefs.as_deref(),
                        &mut events,
                    )?;
                }
            } else {
                warn!(
                    "Could not find enrollment. Could unenrollment already occurred through another preference?"
                )
            }

            // Commit and refresh the cache whether or not anything was unenrolled.
            let mut state = self.mutable_state.lock().unwrap();
            self.end_initialize(db, writer, &mut state)?;
        }
        Ok(events)
    }
829
    /// Stores previous Gecko pref states on the enrollments they belong to.
    ///
    /// The states are grouped per experiment slug by
    /// `build_prev_gecko_pref_states` and written in a single transaction.
    pub fn register_previous_gecko_pref_states(
        &self,
        gecko_pref_states: &[GeckoPrefState],
    ) -> Result<()> {
        let all_prev_gecko_pref_states =
            super::gecko_prefs::build_prev_gecko_pref_states(gecko_pref_states);

        let db = self.db()?;
        let mut writer = db.write()?;

        for (experiment_slug, prev_gecko_pref_states) in all_prev_gecko_pref_states {
            Self::add_prev_gecko_pref_state_for_experiment(
                db,
                &mut writer,
                &experiment_slug,
                prev_gecko_pref_states,
            )?;
        }

        writer.commit()?;
        Ok(())
    }
852
    /// Adds `prev_gecko_pref_states` to the stored enrollment for
    /// `experiment_slug`, if such an enrollment exists.
    ///
    /// The caller owns the write transaction and must commit it.
    pub(crate) fn add_prev_gecko_pref_state_for_experiment(
        db: &Database,
        writer: &mut Writer,
        experiment_slug: &str,
        prev_gecko_pref_states: Vec<PreviousGeckoPrefState>,
    ) -> Result<()> {
        let enrollments = db.get_store(StoreId::Enrollments);

        // NOTE(review): a read error is treated the same as "no enrollment"
        // and silently skipped — confirm this best-effort behavior is intended.
        if let Ok(Some(existing_enrollment)) =
            enrollments.get::<ExperimentEnrollment, Writer>(writer, experiment_slug)
        {
            let updated_states =
                existing_enrollment.on_add_gecko_pref_states(prev_gecko_pref_states);
            enrollments.put(writer, experiment_slug, &updated_states)?;
        }
        Ok(())
    }
871
872 pub fn get_previous_gecko_pref_states(
873 &self,
874 experiment_slug: String,
875 ) -> Result<Option<Vec<PreviousGeckoPrefState>>> {
876 let db = self.db()?;
877 let reader = db.read()?;
878
879 Ok(db
880 .get_store(StoreId::Enrollments)
881 .get::<ExperimentEnrollment, _>(&reader, &experiment_slug)?
882 .and_then(|enrollment| {
883 if let EnrollmentStatus::Enrolled {
884 prev_gecko_pref_states: prev_gecko_pref_state,
885 ..
886 } = enrollment.status
887 {
888 prev_gecko_pref_state
889 } else {
890 None
891 }
892 }))
893 }
894
    /// Test-only accessor that reinterprets the recorded context as a
    /// `TestRecordedContext`.
    ///
    /// # Panics
    /// Panics if the client has no recorded context.
    #[cfg(test)]
    pub fn get_recorded_context(&self) -> &&TestRecordedContext {
        self.recorded_context
            .clone()
            .map(|ref recorded_context|
                // NOTE(review): this transmute assumes the trait object really
                // is a `TestRecordedContext`, and the returned reference is
                // derived from a clone local to this closure — confirm both
                // are sound for every test that calls this.
                unsafe {
                    std::mem::transmute::<&&dyn RecordedContext, &&TestRecordedContext>(
                        &&**recorded_context,
                    )
                })
            .expect("failed to unwrap RecordedContext object")
    }
911
    /// Test-only accessor that reinterprets the Gecko pref handler as a
    /// `TestGeckoPrefHandler`.
    ///
    /// # Panics
    /// Panics if the client has no Gecko pref store.
    #[cfg(test)]
    pub fn get_gecko_pref_store(&self) -> Arc<Box<TestGeckoPrefHandler>> {
        self.gecko_prefs.clone()
            .clone()
            .map(|ref pref_store|
                // NOTE(review): this transmute assumes the boxed handler really
                // is a `TestGeckoPrefHandler` and that the two `Arc<Box<..>>`
                // types are layout-compatible — confirm.
                unsafe {
                    std::mem::transmute::<Arc<Box<dyn GeckoPrefHandler>>, Arc<Box<TestGeckoPrefHandler>>>(
                        pref_store.clone().handler.clone(),
                    )
                })
            .expect("failed to unwrap GeckoPrefHandler object")
    }
928}
929
930impl NimbusClient {
931 pub fn set_install_time(&mut self, then: DateTime<Utc>) {
932 let mut state = self.mutable_state.lock().unwrap();
933 state.install_date = Some(then);
934 state.update_time_to_now(Utc::now());
935 }
936
937 pub fn set_update_time(&mut self, then: DateTime<Utc>) {
938 let mut state = self.mutable_state.lock().unwrap();
939 state.update_date = Some(then);
940 state.update_time_to_now(Utc::now());
941 }
942}
943
944impl NimbusClient {
945 fn record_feature_activation_if_needed(&self, feature_id: &str) {
948 if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(feature_id)
949 && f.branch.is_some()
950 && !self.coenrolling_feature_ids.contains(&f.feature_id)
951 {
952 self.metrics_handler.record_feature_activation(f.into());
953 }
954 }
955
956 pub fn record_feature_exposure(&self, feature_id: String, slug: Option<String>) {
957 let event = if let Some(slug) = slug {
958 if let Ok(Some(branch)) = self.database_cache.get_experiment_branch(&slug) {
959 Some(FeatureExposureExtraDef {
960 feature_id,
961 branch: Some(branch),
962 slug,
963 })
964 } else {
965 None
966 }
967 } else if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id) {
968 if f.branch.is_some() {
969 Some(f.into())
970 } else {
971 None
972 }
973 } else {
974 None
975 };
976
977 if let Some(event) = event {
978 self.metrics_handler.record_feature_exposure(event);
979 }
980 }
981
982 pub fn record_malformed_feature_config(&self, feature_id: String, part_id: String) {
983 let event = if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id)
984 {
985 MalformedFeatureConfigExtraDef::from_feature_and_part(f, part_id)
986 } else {
987 MalformedFeatureConfigExtraDef::new(feature_id, part_id)
988 };
989 self.metrics_handler.record_malformed_feature_config(event);
990 }
991
992 fn record_enrollment_status_telemetry(
993 &self,
994 state: &mut MutexGuard<InternalMutableState>,
995 ) -> Result<()> {
996 let targeting_helper = NimbusTargetingHelper::new(
997 state.targeting_attributes.clone(),
998 self.event_store.clone(),
999 self.gecko_prefs.clone(),
1000 );
1001 let experiments = self.database_cache.get_experiments()?;
1002 let experiments = experiments
1003 .iter()
1004 .filter(|exp| {
1005 is_experiment_available(&targeting_helper, exp, true)
1006 == ExperimentAvailable::Available
1007 })
1008 .map(|exp| &*exp.slug)
1009 .collect::<HashSet<&str>>();
1010 self.metrics_handler.record_enrollment_statuses(
1011 self.database_cache
1012 .get_enrollments()?
1013 .into_iter()
1014 .filter_map(|e| match experiments.contains(&*e.slug) {
1015 true => Some(e.into()),
1016 false => None,
1017 })
1018 .collect(),
1019 );
1020 self.metrics_handler.submit_targeting_context();
1021 Ok(())
1022 }
1023}
1024
/// Formats string templates against a fixed JSON context.
pub struct NimbusStringHelper {
    /// Key/value context that templates are formatted with.
    context: JsonObject,
}
1028
1029impl NimbusStringHelper {
1030 fn new(context: JsonObject) -> Self {
1031 Self { context }
1032 }
1033
1034 pub fn get_uuid(&self, template: String) -> Option<String> {
1035 if template.contains("{uuid}") {
1036 let uuid = Uuid::new_v4();
1037 Some(uuid.to_string())
1038 } else {
1039 None
1040 }
1041 }
1042
1043 pub fn string_format(&self, template: String, uuid: Option<String>) -> String {
1044 match uuid {
1045 Some(uuid) => {
1046 let mut map = self.context.clone();
1047 map.insert("uuid".to_string(), Value::String(uuid));
1048 fmt_with_map(&template, &map)
1049 }
1050 _ => fmt_with_map(&template, &self.context),
1051 }
1052 }
1053}
1054
// Registers `JsonObject` as a UniFFI custom type carried as a `String`: the
// string is parsed as JSON on the way in and serialized again on the way out.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::custom_type!(JsonObject, String, {
    remote,
    try_lift: |val| {
        let json: Value = serde_json::from_str(&val)?;

        // Only JSON objects are accepted; any other JSON value is an error.
        match json.as_object() {
            Some(obj) => Ok(obj.clone()),
            _ => Err(uniffi::deps::anyhow::anyhow!(
                "Unexpected JSON-non-object in the bagging area"
            )),
        }
    },
    lower: |obj| serde_json::Value::Object(obj).to_string(),
});
1070
// Registers `PrefValue` as a UniFFI custom type carried as a `String`.
// Accepted values are strings, booleans, non-float numbers and null.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::custom_type!(PrefValue, String, {
    remote,
    try_lift: |val| {
        // Input that does not parse as JSON is taken verbatim as a string value.
        let json = serde_json::from_str::<Value>(&val).unwrap_or_else(|_| Value::String(val));
        let is_valid_pref_type = match &json {
            Value::String(_) | Value::Bool(_) | Value::Null => true,
            Value::Number(number) => !number.is_f64(),
            _ => false,
        };
        if !is_valid_pref_type {
            return Err(anyhow::anyhow!(
                "Value {} is not a string, boolean, number, or null, or is a float",
                json
            ));
        }
        Ok(json)
    },
    lower: |val| val.to_string(),
});
1093
// Pulls in the UniFFI scaffolding for the `nimbus` interface when the
// bindings feature is enabled.
#[cfg(feature = "stateful-uniffi-bindings")]
uniffi::include_scaffolding!("nimbus");