nimbus/stateful/
nimbus_client.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
4
5use std::collections::HashSet;
6use std::fmt::Debug;
7use std::path::PathBuf;
8use std::sync::{Arc, Mutex, MutexGuard};
9
10use chrono::{DateTime, NaiveDateTime, Utc};
11use once_cell::sync::OnceCell;
12use remote_settings::RemoteSettingsService;
13use serde_json::Value;
14use uuid::Uuid;
15
16use crate::defaults::Defaults;
17use crate::enrollment::{
18    EnrolledFeature, EnrollmentChangeEvent, EnrollmentChangeEventType, EnrollmentsEvolver,
19    ExperimentEnrollment, PreviousGeckoPrefState,
20};
21use crate::error::{BehaviorError, info};
22use crate::evaluator::{
23    CalculatedAttributes, ExperimentAvailable, TargetingAttributes, get_calculated_attributes,
24    is_experiment_available,
25};
26use crate::json::{JsonObject, PrefValue};
27use crate::metrics::{
28    DatabaseLoadExtraDef, DatabaseMigrationExtraDef, EnrollmentStatusExtraDef,
29    FeatureExposureExtraDef, MalformedFeatureConfigExtraDef, MetricsHandler,
30};
31use crate::schema::parse_experiments;
32use crate::stateful::behavior::EventStore;
33use crate::stateful::client::{NimbusServerSettings, SettingsClient, create_client};
34use crate::stateful::dbcache::DatabaseCache;
35use crate::stateful::enrollment::{
36    get_experiment_participation, get_rollout_participation, opt_in_with_branch, opt_out,
37    reset_telemetry_identifiers, set_experiment_participation, set_rollout_participation,
38    unenroll_for_pref,
39};
40use crate::stateful::gecko_prefs::{
41    GeckoPref, GeckoPrefHandler, GeckoPrefState, GeckoPrefStore, OriginalGeckoPref, PrefBranch,
42    PrefEnrollmentData, PrefUnenrollReason,
43};
44use crate::stateful::matcher::AppContext;
45use crate::stateful::persistence::{Database, StoreId, Writer};
46use crate::stateful::targeting::{RecordedContext, validate_event_queries};
47use crate::stateful::updating::{read_and_remove_pending_experiments, write_pending_experiments};
48use crate::strings::fmt_with_map;
49#[cfg(test)]
50use crate::tests::helpers::{TestGeckoPrefHandler, TestRecordedContext};
51use crate::{
52    AvailableExperiment, AvailableRandomizationUnits, EnrolledExperiment, EnrollmentStatus,
53};
54use crate::{Experiment, ExperimentBranch, NimbusError, NimbusTargetingHelper, Result};
55
56const DB_KEY_NIMBUS_ID: &str = "nimbus-id";
57pub const DB_KEY_INSTALLATION_DATE: &str = "installation-date";
58pub const DB_KEY_UPDATE_DATE: &str = "update-date";
59pub const DB_KEY_APP_VERSION: &str = "app-version";
60pub const DB_KEY_FETCH_ENABLED: &str = "fetch-enabled";
61
62// The main `NimbusClient` struct must not expose any methods that make an `&mut self`,
63// in order to be compatible with the uniffi's requirements on objects. This is a helper
64// struct to contain the bits that do actually need to be mutable, so they can be
65// protected by a Mutex.
66#[derive(Default)]
67pub struct InternalMutableState {
68    pub(crate) available_randomization_units: AvailableRandomizationUnits,
69    pub(crate) install_date: Option<DateTime<Utc>>,
70    pub(crate) update_date: Option<DateTime<Utc>>,
71    // Application level targeting attributes
72    pub(crate) targeting_attributes: TargetingAttributes,
73}
74
75impl InternalMutableState {
76    pub(crate) fn update_time_to_now(&mut self, now: DateTime<Utc>) {
77        self.targeting_attributes
78            .update_time_to_now(now, &self.install_date, &self.update_date);
79    }
80}
81
82/// Nimbus is the main struct representing the experiments state
83/// It should hold all the information needed to communicate a specific user's
84/// experimentation status
85pub struct NimbusClient {
86    settings_client: Mutex<Box<dyn SettingsClient + Send>>,
87    pub(crate) mutable_state: Mutex<InternalMutableState>,
88    app_context: AppContext,
89    pub(crate) db: OnceCell<Database>,
90    // Manages an in-memory cache so that we can answer certain requests
91    // without doing (or waiting for) IO.
92    database_cache: DatabaseCache,
93    db_path: PathBuf,
94    coenrolling_feature_ids: Vec<String>,
95    event_store: Arc<Mutex<EventStore>>,
96    recorded_context: Option<Arc<dyn RecordedContext>>,
97    pub(crate) gecko_prefs: Option<Arc<GeckoPrefStore>>,
98    metrics_handler: Arc<dyn MetricsHandler>,
99}
100
101impl NimbusClient {
102    // This constructor *must* not do any kind of I/O since it might be called on the main
103    // thread in the gecko Javascript stack, hence the use of OnceCell for the db.
104    #[allow(clippy::too_many_arguments)]
105    pub fn new<P: Into<PathBuf>>(
106        app_context: AppContext,
107        recorded_context: Option<Arc<dyn RecordedContext>>,
108        coenrolling_feature_ids: Vec<String>,
109        db_path: P,
110        metrics_handler: Arc<dyn MetricsHandler>,
111        gecko_pref_handler: Option<Box<dyn GeckoPrefHandler>>,
112        remote_settings_info: Option<NimbusServerSettings>,
113    ) -> Result<Self> {
114        let settings_client = Mutex::new(create_client(remote_settings_info)?);
115
116        let targeting_attributes: TargetingAttributes = app_context.clone().into();
117        let mutable_state = Mutex::new(InternalMutableState {
118            available_randomization_units: Default::default(),
119            targeting_attributes,
120            install_date: Default::default(),
121            update_date: Default::default(),
122        });
123
124        let mut prefs = None;
125        if let Some(handler) = gecko_pref_handler {
126            prefs = Some(Arc::new(GeckoPrefStore::new(Arc::new(handler))));
127        }
128
129        info!(
130            "Initialized NimbusClient with: app_context = {:?}; recorded_context = {:?}",
131            app_context,
132            recorded_context
133                .as_ref()
134                .map(|rc| serde_json::Value::Object(rc.to_json()))
135                .unwrap_or(serde_json::Value::Null)
136        );
137
138        Ok(Self {
139            settings_client,
140            mutable_state,
141            app_context,
142            database_cache: Default::default(),
143            db_path: db_path.into(),
144            coenrolling_feature_ids,
145            db: OnceCell::default(),
146            event_store: Arc::default(),
147            recorded_context,
148            gecko_prefs: prefs,
149            metrics_handler,
150        })
151    }
152
153    pub fn with_targeting_attributes(&mut self, targeting_attributes: TargetingAttributes) {
154        let mut state = self.mutable_state.lock().unwrap();
155        state.targeting_attributes = targeting_attributes;
156    }
157
158    pub fn get_targeting_attributes(&self) -> TargetingAttributes {
159        let mut state = self.mutable_state.lock().unwrap();
160        state.update_time_to_now(Utc::now());
161        state.targeting_attributes.clone()
162    }
163
164    pub fn initialize(&self) -> Result<()> {
165        let db = self.db()?;
166        // We're not actually going to write, we just want to exclude concurrent writers.
167        let mut writer = db.write()?;
168
169        let mut state = self.mutable_state.lock().unwrap();
170        self.begin_initialize(db, &mut writer, &mut state)?;
171        self.end_initialize(db, writer, &mut state)?;
172
173        Ok(())
174    }
175
176    // These are tasks which should be in the initialize and apply_pending_experiments
177    // but should happen before the enrollment calculations are done.
178    fn begin_initialize(
179        &self,
180        db: &Database,
181        writer: &mut Writer,
182        state: &mut MutexGuard<InternalMutableState>,
183    ) -> Result<()> {
184        self.read_or_create_nimbus_id(db, writer, state)?;
185        self.update_ta_install_dates(db, writer, state)?;
186        self.event_store
187            .lock()
188            .expect("unable to lock event_store mutex")
189            .read_from_db(db)?;
190
191        if let Some(recorded_context) = &self.recorded_context {
192            let targeting_helper = self.create_targeting_helper_with_context(match serde_json::to_value(
193                &state.targeting_attributes,
194            ) {
195                Ok(v) => v,
196                Err(e) => return Err(NimbusError::JSONError("targeting_helper = nimbus::stateful::nimbus_client::NimbusClient::begin_initialize::serde_json::to_value".into(), e.to_string()))
197            });
198            recorded_context.execute_queries(targeting_helper.as_ref())?;
199            state
200                .targeting_attributes
201                .set_recorded_context(recorded_context.to_json());
202        }
203
204        if let Some(gecko_prefs) = &self.gecko_prefs {
205            gecko_prefs.initialize()?;
206        }
207
208        Ok(())
209    }
210
211    // These are tasks which should be in the initialize and apply_pending_experiments
212    // but should happen after the enrollment calculations are done.
213    fn end_initialize(
214        &self,
215        db: &Database,
216        writer: Writer,
217        state: &mut MutexGuard<InternalMutableState>,
218    ) -> Result<()> {
219        self.update_ta_active_experiments(db, &writer, state)?;
220        let coenrolling_ids = self
221            .coenrolling_feature_ids
222            .iter()
223            .map(|s| s.as_str())
224            .collect();
225        self.database_cache.commit_and_update(
226            db,
227            writer,
228            &coenrolling_ids,
229            self.gecko_prefs.clone(),
230        )?;
231        Ok(())
232    }
233
234    pub fn get_enrollment_by_feature(&self, feature_id: String) -> Result<Option<EnrolledFeature>> {
235        self.database_cache.get_enrollment_by_feature(&feature_id)
236    }
237
238    // Note: the contract for this function is that it never blocks on IO.
239    pub fn get_experiment_branch(&self, slug: String) -> Result<Option<String>> {
240        self.database_cache.get_experiment_branch(&slug)
241    }
242
243    pub fn get_feature_config_variables(&self, feature_id: String) -> Result<Option<String>> {
244        Ok(
245            if let Some(s) = self
246                .database_cache
247                .get_feature_config_variables(&feature_id)?
248            {
249                self.record_feature_activation_if_needed(&feature_id);
250                Some(s)
251            } else {
252                None
253            },
254        )
255    }
256
257    pub fn get_experiment_branches(&self, slug: String) -> Result<Vec<ExperimentBranch>> {
258        self.get_all_experiments()?
259            .into_iter()
260            .find(|e| e.slug == slug)
261            .map(|e| e.branches.into_iter().map(|b| b.into()).collect())
262            .ok_or(NimbusError::NoSuchExperiment(slug))
263    }
264
265    pub fn get_experiment_participation(&self) -> Result<bool> {
266        let db = self.db()?;
267        let reader = db.read()?;
268        get_experiment_participation(db, &reader)
269    }
270
271    pub fn get_rollout_participation(&self) -> Result<bool> {
272        let db = self.db()?;
273        let reader = db.read()?;
274        get_rollout_participation(db, &reader)
275    }
276
277    pub fn set_experiment_participation(
278        &self,
279        user_participating: bool,
280    ) -> Result<Vec<EnrollmentChangeEvent>> {
281        let db = self.db()?;
282        let mut writer = db.write()?;
283        let mut state = self.mutable_state.lock().unwrap();
284        set_experiment_participation(db, &mut writer, user_participating)?;
285
286        let existing_experiments: Vec<Experiment> =
287            db.get_store(StoreId::Experiments).collect_all(&writer)?;
288        let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
289        let res = self.end_initialize(db, writer, &mut state);
290        self.record_enrollment_status_telemetry(&mut state)?;
291        res?;
292        Ok(events)
293    }
294
295    pub fn set_rollout_participation(
296        &self,
297        user_participating: bool,
298    ) -> Result<Vec<EnrollmentChangeEvent>> {
299        let db = self.db()?;
300        let mut writer = db.write()?;
301        let mut state = self.mutable_state.lock().unwrap();
302        set_rollout_participation(db, &mut writer, user_participating)?;
303
304        let existing_experiments: Vec<Experiment> =
305            db.get_store(StoreId::Experiments).collect_all(&writer)?;
306        let events = self.evolve_experiments(db, &mut writer, &mut state, &existing_experiments)?;
307        let res = self.end_initialize(db, writer, &mut state);
308        self.record_enrollment_status_telemetry(&mut state)?;
309        res?;
310        Ok(events)
311    }
312
313    pub fn get_active_experiments(&self) -> Result<Vec<EnrolledExperiment>> {
314        self.database_cache.get_active_experiments()
315    }
316
317    pub fn get_all_experiments(&self) -> Result<Vec<Experiment>> {
318        let db = self.db()?;
319        let reader = db.read()?;
320        db.get_store(StoreId::Experiments)
321            .collect_all::<Experiment, _>(&reader)
322    }
323
324    pub fn get_available_experiments(&self) -> Result<Vec<AvailableExperiment>> {
325        let th = self.create_targeting_helper(None)?;
326        Ok(self
327            .get_all_experiments()?
328            .into_iter()
329            .filter(|exp| {
330                is_experiment_available(&th, exp, false) == ExperimentAvailable::Available
331            })
332            .map(|exp| exp.into())
333            .collect())
334    }
335
336    pub fn opt_in_with_branch(
337        &self,
338        experiment_slug: String,
339        branch: String,
340    ) -> Result<Vec<EnrollmentChangeEvent>> {
341        let db = self.db()?;
342        let mut writer = db.write()?;
343        let result = opt_in_with_branch(db, &mut writer, &experiment_slug, &branch)?;
344        let mut state = self.mutable_state.lock().unwrap();
345        self.end_initialize(db, writer, &mut state)?;
346        Ok(result)
347    }
348
349    pub fn opt_out(&self, experiment_slug: String) -> Result<Vec<EnrollmentChangeEvent>> {
350        let db = self.db()?;
351        let mut writer = db.write()?;
352        let result = opt_out(
353            db,
354            &mut writer,
355            &experiment_slug,
356            self.gecko_prefs.as_deref(),
357        )?;
358        let mut state = self.mutable_state.lock().unwrap();
359        self.end_initialize(db, writer, &mut state)?;
360        Ok(result)
361    }
362
363    pub fn fetch_experiments(&self) -> Result<()> {
364        if !self.is_fetch_enabled()? {
365            return Ok(());
366        }
367        info!("fetching experiments");
368        let settings_client = self.settings_client.lock().unwrap();
369        let new_experiments = settings_client.fetch_experiments()?;
370        let db = self.db()?;
371        let mut writer = db.write()?;
372        write_pending_experiments(db, &mut writer, new_experiments)?;
373        writer.commit()?;
374        Ok(())
375    }
376
377    pub fn set_fetch_enabled(&self, allow: bool) -> Result<()> {
378        let db = self.db()?;
379        let mut writer = db.write()?;
380        db.get_store(StoreId::Meta)
381            .put(&mut writer, DB_KEY_FETCH_ENABLED, &allow)?;
382        writer.commit()?;
383        Ok(())
384    }
385
386    pub(crate) fn is_fetch_enabled(&self) -> Result<bool> {
387        let db = self.db()?;
388        let reader = db.read()?;
389        let enabled = db
390            .get_store(StoreId::Meta)
391            .get(&reader, DB_KEY_FETCH_ENABLED)?
392            .unwrap_or(true);
393        Ok(enabled)
394    }
395
396    /**
397     * Calculate the days since install and days since update on the targeting_attributes.
398     */
399    fn update_ta_install_dates(
400        &self,
401        db: &Database,
402        writer: &mut Writer,
403        state: &mut MutexGuard<InternalMutableState>,
404    ) -> Result<()> {
405        // Only set install_date and update_date with this method if it hasn't been set already.
406        // This cuts down on deriving the dates at runtime, but also allows us to use
407        // the test methods set_install_date() and set_update_date() to set up
408        // scenarios for test.
409        if state.install_date.is_none() {
410            let installation_date = self.get_installation_date(db, writer)?;
411            state.install_date = Some(installation_date);
412        }
413        if state.update_date.is_none() {
414            let update_date = self.get_update_date(db, writer)?;
415            state.update_date = Some(update_date);
416        }
417        state.update_time_to_now(Utc::now());
418
419        Ok(())
420    }
421
422    /**
423     * Calculates the active_experiments based on current enrollments for the targeting attributes.
424     */
425    fn update_ta_active_experiments(
426        &self,
427        db: &Database,
428        writer: &Writer,
429        state: &mut MutexGuard<InternalMutableState>,
430    ) -> Result<()> {
431        let enrollments_store = db.get_store(StoreId::Enrollments);
432        let prev_enrollments: Vec<ExperimentEnrollment> = enrollments_store.collect_all(writer)?;
433
434        state
435            .targeting_attributes
436            .update_enrollments(&prev_enrollments);
437
438        Ok(())
439    }
440
441    fn evolve_experiments(
442        &self,
443        db: &Database,
444        writer: &mut Writer,
445        state: &mut InternalMutableState,
446        experiments: &[Experiment],
447    ) -> Result<Vec<EnrollmentChangeEvent>> {
448        let mut targeting_helper = NimbusTargetingHelper::with_targeting_attributes(
449            &state.targeting_attributes,
450            self.event_store.clone(),
451            self.gecko_prefs.clone(),
452        );
453        if let Some(ref recorded_context) = self.recorded_context {
454            recorded_context.record();
455        }
456        let coenrolling_feature_ids = self
457            .coenrolling_feature_ids
458            .iter()
459            .map(|s| s.as_str())
460            .collect();
461        let mut evolver = EnrollmentsEvolver::new(
462            &state.available_randomization_units,
463            &mut targeting_helper,
464            &coenrolling_feature_ids,
465        );
466        evolver.evolve_enrollments_in_db(db, writer, experiments, self.gecko_prefs.as_deref())
467    }
468
469    pub fn apply_pending_experiments(&self) -> Result<Vec<EnrollmentChangeEvent>> {
470        info!("updating experiment list");
471        let db = self.db()?;
472        let mut writer = db.write()?;
473
474        // We'll get the pending experiments which were stored for us, either by fetch_experiments
475        // or by set_experiments_locally.
476        let pending_updates = read_and_remove_pending_experiments(db, &mut writer)?;
477        let mut state = self.mutable_state.lock().unwrap();
478        self.begin_initialize(db, &mut writer, &mut state)?;
479
480        let should_record_enrollment_status = pending_updates.is_some();
481        let res = match pending_updates {
482            Some(new_experiments) => {
483                self.update_ta_active_experiments(db, &writer, &mut state)?;
484                // Perform the enrollment calculations if there are pending experiments.
485                self.evolve_experiments(db, &mut writer, &mut state, &new_experiments)?
486            }
487            None => vec![],
488        };
489
490        // Finish up any cleanup, e.g. copying from database in to memory.
491        let end_init_res = self.end_initialize(db, writer, &mut state);
492        if should_record_enrollment_status {
493            self.record_enrollment_status_telemetry(&mut state)?;
494        }
495        end_init_res?;
496        Ok(res)
497    }
498
499    #[allow(deprecated)] // Bug 1960256 - use of deprecated chrono functions.
500    fn get_installation_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
501        // we first check our context
502        if let Some(context_installation_date) = self.app_context.installation_date {
503            let res = DateTime::<Utc>::from_naive_utc_and_offset(
504                NaiveDateTime::from_timestamp_opt(context_installation_date / 1_000, 0).unwrap(),
505                Utc,
506            );
507            info!("[Nimbus] Retrieved date from Context: {}", res);
508            return Ok(res);
509        }
510        let store = db.get_store(StoreId::Meta);
511        let persisted_installation_date: Option<DateTime<Utc>> =
512            store.get(writer, DB_KEY_INSTALLATION_DATE)?;
513        Ok(
514            if let Some(installation_date) = persisted_installation_date {
515                installation_date
516            } else {
517                Utc::now()
518            },
519        )
520    }
521
522    fn get_update_date(&self, db: &Database, writer: &mut Writer) -> Result<DateTime<Utc>> {
523        let store = db.get_store(StoreId::Meta);
524
525        let persisted_app_version: Option<String> = store.get(writer, DB_KEY_APP_VERSION)?;
526        let update_date: Option<DateTime<Utc>> = store.get(writer, DB_KEY_UPDATE_DATE)?;
527        Ok(
528            match (
529                persisted_app_version,
530                &self.app_context.app_version,
531                update_date,
532            ) {
533                // The app been run before, but has not just been updated.
534                (Some(persisted), Some(current), Some(date)) if persisted == *current => date,
535                // The app has been run before, and just been updated.
536                (Some(persisted), Some(current), _) if persisted != *current => {
537                    let now = Utc::now();
538                    store.put(writer, DB_KEY_APP_VERSION, current)?;
539                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
540                    now
541                }
542                // The app has just been installed
543                (None, Some(current), _) => {
544                    let now = Utc::now();
545                    store.put(writer, DB_KEY_APP_VERSION, current)?;
546                    store.put(writer, DB_KEY_UPDATE_DATE, &now)?;
547                    now
548                }
549                // The current version is not available, or the persisted date is not available.
550                (_, _, Some(date)) => date,
551                // Either way, this doesn't appear to be a good production environment.
552                _ => Utc::now(),
553            },
554        )
555    }
556
557    pub fn set_experiments_locally(&self, experiments_json: String) -> Result<()> {
558        let new_experiments = parse_experiments(&experiments_json)?;
559        let db = self.db()?;
560        let mut writer = db.write()?;
561        write_pending_experiments(db, &mut writer, new_experiments)?;
562        writer.commit()?;
563        Ok(())
564    }
565
566    /// Reset all enrollments and experiments in the database.
567    ///
568    /// This should only be used in testing.
569    pub fn reset_enrollments(&self) -> Result<()> {
570        let db = self.db()?;
571        let mut writer = db.write()?;
572        let mut state = self.mutable_state.lock().unwrap();
573        db.clear_experiments_and_enrollments(&mut writer)?;
574        self.end_initialize(db, writer, &mut state)?;
575        Ok(())
576    }
577
578    /// Reset internal state in response to application-level telemetry reset.
579    ///
580    /// When the user resets their telemetry state in the consuming application, we need learn
581    /// the new values of any external randomization units, and we need to reset any unique
582    /// identifiers used internally by the SDK. If we don't then we risk accidentally tracking
583    /// across the telemetry reset, since we could use Nimbus metrics to link their pings from
584    /// before and after the reset.
585    ///
586    pub fn reset_telemetry_identifiers(&self) -> Result<Vec<EnrollmentChangeEvent>> {
587        let mut events = vec![];
588        let db = self.db()?;
589        let mut writer = db.write()?;
590        let mut state = self.mutable_state.lock().unwrap();
591        // If we have no `nimbus_id` when we can safely assume that there's
592        // no other experiment state that needs to be reset.
593        let store = db.get_store(StoreId::Meta);
594        if store.get::<String, _>(&writer, DB_KEY_NIMBUS_ID)?.is_some() {
595            // Each enrollment state now opts out because we don't want to leak information between resets.
596            events = reset_telemetry_identifiers(db, &mut writer)?;
597
598            // Remove any stored event counts
599            db.clear_event_count_data(&mut writer)?;
600
601            // The `nimbus_id` itself is a unique identifier.
602            // N.B. we do this last, as a signal that all data has been reset.
603            store.delete(&mut writer, DB_KEY_NIMBUS_ID)?;
604            self.end_initialize(db, writer, &mut state)?;
605        }
606
607        // (No need to commit `writer` if the above check was false, since we didn't change anything)
608        state.available_randomization_units = Default::default();
609        state.targeting_attributes.nimbus_id = None;
610
611        Ok(events)
612    }
613
614    pub fn nimbus_id(&self) -> Result<Uuid> {
615        let db = self.db()?;
616        let mut writer = db.write()?;
617        let mut state = self.mutable_state.lock().unwrap();
618        let uuid = self.read_or_create_nimbus_id(db, &mut writer, &mut state)?;
619
620        // We don't know whether we needed to generate and save the uuid, so
621        // we commit just in case - this is hopefully close to a noop in that
622        // case!
623        writer.commit()?;
624        Ok(uuid)
625    }
626
627    /// Return the nimbus ID from the database, or create a new one and write it
628    /// to the database.
629    ///
630    /// The internal state will be updated with the nimbus ID.
631    fn read_or_create_nimbus_id(
632        &self,
633        db: &Database,
634        writer: &mut Writer,
635        state: &mut MutexGuard<'_, InternalMutableState>,
636    ) -> Result<Uuid> {
637        let store = db.get_store(StoreId::Meta);
638        let nimbus_id = match store.get(writer, DB_KEY_NIMBUS_ID)? {
639            Some(nimbus_id) => nimbus_id,
640            None => {
641                let nimbus_id = Uuid::new_v4();
642                store.put(writer, DB_KEY_NIMBUS_ID, &nimbus_id)?;
643                nimbus_id
644            }
645        };
646
647        state.available_randomization_units.nimbus_id = Some(nimbus_id.to_string());
648        state.targeting_attributes.nimbus_id = Some(nimbus_id.to_string());
649
650        Ok(nimbus_id)
651    }
652
653    // Sets the nimbus ID - TEST ONLY - should not be exposed to real clients.
654    // (Useful for testing so you can have some control over what experiments
655    // are enrolled)
656    pub fn set_nimbus_id(&self, uuid: &Uuid) -> Result<()> {
657        let db = self.db()?;
658        let mut writer = db.write()?;
659        db.get_store(StoreId::Meta)
660            .put(&mut writer, DB_KEY_NIMBUS_ID, uuid)?;
661        writer.commit()?;
662        Ok(())
663    }
664
665    pub(crate) fn db(&self) -> Result<&Database> {
666        self.db
667            .get_or_try_init(|| Database::new(&self.db_path, self.metrics_handler.clone()))
668    }
669
670    fn merge_additional_context(&self, context: Option<JsonObject>) -> Result<Value> {
671        let context = context.map(Value::Object);
672        let targeting = match serde_json::to_value(self.get_targeting_attributes()) {
673            Ok(v) => v,
674            Err(e) => return Err(NimbusError::JSONError("targeting = nimbus::stateful::nimbus_client::NimbusClient::merge_additional_context::serde_json::to_value".into(), e.to_string()))
675        };
676        let context = match context {
677            Some(v) => v.defaults(&targeting)?,
678            None => targeting,
679        };
680
681        Ok(context)
682    }
683
684    pub fn create_targeting_helper(
685        &self,
686        additional_context: Option<JsonObject>,
687    ) -> Result<Arc<NimbusTargetingHelper>> {
688        let context = self.merge_additional_context(additional_context)?;
689        let helper =
690            NimbusTargetingHelper::new(context, self.event_store.clone(), self.gecko_prefs.clone());
691        Ok(Arc::new(helper))
692    }
693
694    pub fn create_targeting_helper_with_context(
695        &self,
696        context: Value,
697    ) -> Arc<NimbusTargetingHelper> {
698        Arc::new(NimbusTargetingHelper::new(
699            context,
700            self.event_store.clone(),
701            self.gecko_prefs.clone(),
702        ))
703    }
704
705    pub fn create_string_helper(
706        &self,
707        additional_context: Option<JsonObject>,
708    ) -> Result<Arc<NimbusStringHelper>> {
709        let context = self.merge_additional_context(additional_context)?;
710        let helper = NimbusStringHelper::new(context.as_object().unwrap().to_owned());
711        Ok(Arc::new(helper))
712    }
713
714    /// Records an event for the purposes of behavioral targeting.
715    ///
716    /// This function is used to record and persist data used for the behavioral
717    /// targeting such as "core-active" user targeting.
718    pub fn record_event(&self, event_id: String, count: i64) -> Result<()> {
719        let mut event_store = self.event_store.lock().unwrap();
720        event_store.record_event(count as u64, &event_id, None)?;
721        event_store.persist_data(self.db()?)?;
722        Ok(())
723    }
724
725    /// Records an event for the purposes of behavioral targeting.
726    ///
727    /// This differs from the `record_event` method in that the event is recorded as if it were
728    /// recorded `seconds_ago` in the past. This makes it very useful for testing.
729    pub fn record_past_event(&self, event_id: String, seconds_ago: i64, count: i64) -> Result<()> {
730        if seconds_ago < 0 {
731            return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
732                "Time duration in the past must be positive".to_string(),
733            )));
734        }
735        let mut event_store = self.event_store.lock().unwrap();
736        event_store.record_past_event(
737            count as u64,
738            &event_id,
739            None,
740            chrono::Duration::seconds(seconds_ago),
741        )?;
742        event_store.persist_data(self.db()?)?;
743        Ok(())
744    }
745
746    /// Advances the event store's concept of `now` artificially.
747    ///
748    /// This works alongside `record_event` and `record_past_event` for testing purposes.
749    pub fn advance_event_time(&self, by_seconds: i64) -> Result<()> {
750        if by_seconds < 0 {
751            return Err(NimbusError::BehaviorError(BehaviorError::InvalidDuration(
752                "Time duration in the future must be positive".to_string(),
753            )));
754        }
755        let mut event_store = self.event_store.lock().unwrap();
756        event_store.advance_datum(chrono::Duration::seconds(by_seconds));
757        Ok(())
758    }
759
760    /// Clear all events in the Nimbus event store.
761    ///
762    /// This should only be used in testing or cases where the previous event store is no longer viable.
763    pub fn clear_events(&self) -> Result<()> {
764        let mut event_store = self.event_store.lock().unwrap();
765        event_store.clear(self.db()?)?;
766        Ok(())
767    }
768
769    pub fn event_store(&self) -> Arc<Mutex<EventStore>> {
770        self.event_store.clone()
771    }
772
773    pub fn dump_state_to_log(&self) -> Result<()> {
774        let experiments = self.get_active_experiments()?;
775        info!("{0: <65}| {1: <30}| {2}", "Slug", "Features", "Branch");
776        for exp in &experiments {
777            info!(
778                "{0: <65}| {1: <30}| {2}",
779                &exp.slug,
780                &exp.feature_ids.join(", "),
781                &exp.branch_slug
782            );
783        }
784        Ok(())
785    }
786
787    /// Given a Gecko pref state and a pref unenroll reason, unenroll from an experiment
788    pub fn unenroll_for_gecko_pref(
789        &self,
790        pref_state: GeckoPrefState,
791        pref_unenroll_reason: PrefUnenrollReason,
792    ) -> Result<Vec<EnrollmentChangeEvent>> {
793        if let Some(prefs) = self.gecko_prefs.clone() {
794            {
795                let mut pref_store_state = prefs.get_mutable_pref_state();
796                pref_store_state.update_pref_state(&pref_state);
797            }
798            let enrollments = self
799                .database_cache
800                .get_enrollments_for_pref(&pref_state.gecko_pref.pref)?;
801
802            let db = self.db()?;
803            let mut writer = db.write()?;
804
805            let mut results = Vec::new();
806            for experiment_slug in enrollments.unwrap() {
807                let result = unenroll_for_pref(
808                    db,
809                    &mut writer,
810                    &experiment_slug,
811                    pref_unenroll_reason,
812                    &pref_state.gecko_pref.pref,
813                    self.gecko_prefs.as_deref(),
814                )?;
815                results.push(result);
816            }
817
818            let mut state = self.mutable_state.lock().unwrap();
819            self.end_initialize(db, writer, &mut state)?;
820            return Ok(results.concat());
821        }
822        Ok(Vec::new())
823    }
824
825    pub fn register_previous_gecko_pref_states(
826        &self,
827        gecko_pref_states: &[GeckoPrefState],
828    ) -> Result<()> {
829        let all_prev_gecko_pref_states =
830            super::gecko_prefs::build_prev_gecko_pref_states(gecko_pref_states);
831
832        let db = self.db()?;
833        let mut writer = db.write()?;
834
835        for (experiment_slug, prev_gecko_pref_states) in all_prev_gecko_pref_states {
836            Self::add_prev_gecko_pref_state_for_experiment(
837                db,
838                &mut writer,
839                &experiment_slug,
840                prev_gecko_pref_states,
841            )?;
842        }
843
844        let mut state = self.mutable_state.lock().unwrap();
845        self.end_initialize(db, writer, &mut state)?;
846        Ok(())
847    }
848
849    pub(crate) fn add_prev_gecko_pref_state_for_experiment(
850        db: &Database,
851        writer: &mut Writer,
852        experiment_slug: &str,
853        prev_gecko_pref_states: Vec<PreviousGeckoPrefState>,
854    ) -> Result<()> {
855        let enrollments = db.get_store(StoreId::Enrollments);
856
857        if let Ok(Some(existing_enrollment)) =
858            enrollments.get::<ExperimentEnrollment, Writer>(writer, experiment_slug)
859        {
860            // Previous states are only valid on Enrolled experiments
861            let updated_states =
862                existing_enrollment.on_add_gecko_pref_states(prev_gecko_pref_states);
863            enrollments.put(writer, experiment_slug, &updated_states)?;
864        }
865        Ok(())
866    }
867
868    pub fn get_previous_gecko_pref_states(
869        &self,
870        experiment_slug: String,
871    ) -> Result<Option<Vec<PreviousGeckoPrefState>>> {
872        let db = self.db()?;
873        let reader = db.read()?;
874
875        Ok(db
876            .get_store(StoreId::Enrollments)
877            .get::<ExperimentEnrollment, _>(&reader, &experiment_slug)?
878            .and_then(|enrollment| {
879                if let EnrollmentStatus::Enrolled {
880                    prev_gecko_pref_states: prev_gecko_pref_state,
881                    ..
882                } = enrollment.status
883                {
884                    prev_gecko_pref_state
885                } else {
886                    None
887                }
888            }))
889    }
890
891    #[cfg(test)]
892    pub fn get_recorded_context(&self) -> &&TestRecordedContext {
893        self.recorded_context
894            .clone()
895            .map(|ref recorded_context|
896                // SAFETY: The cast to TestRecordedContext is safe because the Rust instance is
897                // guaranteed to be a TestRecordedContext instance. TestRecordedContext is the only
898                // Rust-implemented version of RecordedContext, and, like this method,  is only
899                // used in tests.
900                unsafe {
901                    std::mem::transmute::<&&dyn RecordedContext, &&TestRecordedContext>(
902                        &&**recorded_context,
903                    )
904                })
905            .expect("failed to unwrap RecordedContext object")
906    }
907
908    #[cfg(test)]
909    pub fn get_gecko_pref_store(&self) -> Arc<Box<TestGeckoPrefHandler>> {
910        self.gecko_prefs.clone()
911            .clone()
912            .map(|ref pref_store|
913                // SAFETY: The cast to TestGeckoPrefHandler is safe because the Rust instance is
914                // guaranteed to be a TestGeckoPrefHandler instance. TestGeckoPrefHandler is the only
915                // Rust-implemented version of GeckoPrefHandler, and, like this method,  is only
916                // used in tests.
917                unsafe {
918                    std::mem::transmute::<Arc<Box<dyn GeckoPrefHandler>>, Arc<Box<TestGeckoPrefHandler>>>(
919                        pref_store.clone().handler.clone(),
920                    )
921                })
922            .expect("failed to unwrap GeckoPrefHandler object")
923    }
924}
925
926impl NimbusClient {
927    pub fn set_install_time(&mut self, then: DateTime<Utc>) {
928        let mut state = self.mutable_state.lock().unwrap();
929        state.install_date = Some(then);
930        state.update_time_to_now(Utc::now());
931    }
932
933    pub fn set_update_time(&mut self, then: DateTime<Utc>) {
934        let mut state = self.mutable_state.lock().unwrap();
935        state.update_date = Some(then);
936        state.update_time_to_now(Utc::now());
937    }
938}
939
940impl NimbusClient {
941    /// This is only called from `get_feature_config_variables` which is itself is cached with
942    /// thread safety in the FeatureHolder.kt and FeatureHolder.swift
943    fn record_feature_activation_if_needed(&self, feature_id: &str) {
944        if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(feature_id)
945            && f.branch.is_some()
946            && !self.coenrolling_feature_ids.contains(&f.feature_id)
947        {
948            self.metrics_handler.record_feature_activation(f.into());
949        }
950    }
951
952    pub fn record_feature_exposure(&self, feature_id: String, slug: Option<String>) {
953        let event = if let Some(slug) = slug {
954            if let Ok(Some(branch)) = self.database_cache.get_experiment_branch(&slug) {
955                Some(FeatureExposureExtraDef {
956                    feature_id,
957                    branch: Some(branch),
958                    slug,
959                })
960            } else {
961                None
962            }
963        } else if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id) {
964            if f.branch.is_some() {
965                Some(f.into())
966            } else {
967                None
968            }
969        } else {
970            None
971        };
972
973        if let Some(event) = event {
974            self.metrics_handler.record_feature_exposure(event);
975        }
976    }
977
978    pub fn record_malformed_feature_config(&self, feature_id: String, part_id: String) {
979        let event = if let Ok(Some(f)) = self.database_cache.get_enrollment_by_feature(&feature_id)
980        {
981            MalformedFeatureConfigExtraDef::from_feature_and_part(f, part_id)
982        } else {
983            MalformedFeatureConfigExtraDef::new(feature_id, part_id)
984        };
985        self.metrics_handler.record_malformed_feature_config(event);
986    }
987
988    fn record_enrollment_status_telemetry(
989        &self,
990        state: &mut MutexGuard<InternalMutableState>,
991    ) -> Result<()> {
992        let targeting_helper = NimbusTargetingHelper::new(
993            state.targeting_attributes.clone(),
994            self.event_store.clone(),
995            self.gecko_prefs.clone(),
996        );
997        let experiments = self.database_cache.get_experiments()?;
998        let experiments = experiments
999            .iter()
1000            .filter(|exp| {
1001                is_experiment_available(&targeting_helper, exp, true)
1002                    == ExperimentAvailable::Available
1003            })
1004            .map(|exp| &*exp.slug)
1005            .collect::<HashSet<&str>>();
1006        self.metrics_handler.record_enrollment_statuses(
1007            self.database_cache
1008                .get_enrollments()?
1009                .into_iter()
1010                .filter_map(|e| match experiments.contains(&*e.slug) {
1011                    true => Some(e.into()),
1012                    false => None,
1013                })
1014                .collect(),
1015        );
1016        self.metrics_handler.submit_targeting_context();
1017        Ok(())
1018    }
1019}
1020
1021pub struct NimbusStringHelper {
1022    context: JsonObject,
1023}
1024
1025impl NimbusStringHelper {
1026    fn new(context: JsonObject) -> Self {
1027        Self { context }
1028    }
1029
1030    pub fn get_uuid(&self, template: String) -> Option<String> {
1031        if template.contains("{uuid}") {
1032            let uuid = Uuid::new_v4();
1033            Some(uuid.to_string())
1034        } else {
1035            None
1036        }
1037    }
1038
1039    pub fn string_format(&self, template: String, uuid: Option<String>) -> String {
1040        match uuid {
1041            Some(uuid) => {
1042                let mut map = self.context.clone();
1043                map.insert("uuid".to_string(), Value::String(uuid));
1044                fmt_with_map(&template, &map)
1045            }
1046            _ => fmt_with_map(&template, &self.context),
1047        }
1048    }
1049}
1050
1051#[cfg(feature = "stateful-uniffi-bindings")]
1052uniffi::custom_type!(JsonObject, String, {
1053    remote,
1054    try_lift: |val| {
1055        let json: Value = serde_json::from_str(&val)?;
1056
1057        match json.as_object() {
1058            Some(obj) => Ok(obj.clone()),
1059            _ => Err(uniffi::deps::anyhow::anyhow!(
1060                "Unexpected JSON-non-object in the bagging area"
1061            )),
1062        }
1063    },
1064    lower: |obj| serde_json::Value::Object(obj).to_string(),
1065});
1066
1067#[cfg(feature = "stateful-uniffi-bindings")]
1068uniffi::custom_type!(PrefValue, String, {
1069    remote,
1070    try_lift: |val| {
1071        let json: Value = serde_json::from_str(&val)?;
1072        if json.is_string() || json.is_boolean() || (json.is_number() && !json.is_f64()) || json.is_null() {
1073            Ok(json)
1074        } else {
1075            Err(anyhow::anyhow!(format!("Value {} is not a string, boolean, number, or null, or is a float", json)))
1076        }
1077    },
1078    lower: |val| {
1079        val.to_string()
1080    }
1081});
1082
1083#[cfg(feature = "stateful-uniffi-bindings")]
1084uniffi::include_scaffolding!("nimbus");