glean_core/event_database/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
/// Represents the recorded data for a single event.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// The timestamp of when the event was recorded.
    ///
    /// This allows to order events from a single process run.
    pub timestamp: u64,

    /// The event's category.
    ///
    /// This is defined by users in the metrics file.
    pub category: String,

    /// The event's name.
    ///
    /// This is defined by users in the metrics file.
    pub name: String,

    /// A map of all extra data values.
    ///
    /// The set of allowed extra keys is defined by users in the metrics file.
    /// When `None`, the field is omitted entirely from the serialized JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
57
/// Represents the stored data for a single event.
///
/// This is the on-disk representation: a `RecordedEvent` plus bookkeeping
/// (`execution_counter`) that is stripped before submission.
#[derive(
    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
)]
struct StoredEvent {
    // `flatten` means the event's fields appear at the top level of the JSON
    // object, so a `StoredEvent` line parses as (and serializes like) a
    // `RecordedEvent` with an optional extra field.
    #[serde(flatten)]
    event: RecordedEvent,

    /// The monotonically-increasing execution counter.
    ///
    /// Included to allow sending of events across Glean restarts (bug 1716725).
    /// Is i32 because it is stored in a CounterMetric.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_counter: Option<i32>,
}
74
/// This struct handles the in-memory and on-disk storage logic for events.
///
/// So that the data survives shutting down of the application, events are stored
/// in an append-only file on disk, in addition to the store in memory. Each line
/// of this file records a single event in JSON, exactly as it will be sent in the
/// ping. There is one file per store.
///
/// When restarting the application, these on-disk files are checked, and if any are
/// found, they are loaded, and a `glean.restarted` event is added before any
/// further events are collected. This is because the timestamps for these events
/// may have come from a previous boot of the device, and therefore will not be
/// compatible with any newly-collected events.
///
/// Normalizing all these timestamps happens on serialization for submission (see
/// `serialize_as_json`) where the client time between restarts is calculated using
/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
/// the `execution_counter` stored in events on disk.
///
/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
/// The `glean.restarted` event is, though.
/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
#[derive(Debug)]
pub struct EventDatabase {
    /// Path to directory of on-disk event files
    pub path: PathBuf,
    /// The in-memory list of events
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Lazily-opened, cached append handles for the per-store files on disk.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// A lock to be held when doing operations on the filesystem
    file_lock: Mutex<()>,
}
106
107impl MallocSizeOf for EventDatabase {
108    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
109        let mut n = 0;
110        n += self.event_stores.read().unwrap().size_of(ops);
111
112        let map = self.event_store_files.read().unwrap();
113        for store_name in map.keys() {
114            n += store_name.size_of(ops);
115            // `File` doesn't allocate, but `Arc` puts it on the heap.
116            n += mem::size_of::<File>();
117        }
118        n
119    }
120}
121
impl EventDatabase {
    /// Creates a new event database.
    ///
    /// # Arguments
    ///
    /// * `data_path` - The directory to store events in. A new directory
    ///   `events` will be created inside of this directory.
    pub fn new(data_path: &Path) -> Result<Self> {
        let path = data_path.join("events");
        create_dir_all(&path)?;

        Ok(Self {
            path,
            event_stores: RwLock::new(HashMap::new()),
            event_store_files: RwLock::new(HashMap::new()),
            file_lock: Mutex::new(()),
        })
    }

    /// Initializes events storage after Glean is fully initialized and ready to send pings.
    ///
    /// This must be called once on application startup, e.g. from
    /// [Glean.initialize], but after we are ready to send pings, since this
    /// could potentially collect and send the "events" ping.
    ///
    /// If there are any events queued on disk, it loads them into memory so
    /// that the memory and disk representations are in sync.
    ///
    /// If event records for the "events" ping are present, they are assembled into
    /// an "events" ping which is submitted immediately with reason "startup".
    ///
    /// If event records for custom pings are present, we increment the custom pings'
    /// stores' `execution_counter` and record a `glean.restarted`
    /// event with the current client clock in its `glean.startup.date` extra.
    ///
    /// # Arguments
    ///
    /// * `glean` - The Glean instance.
    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
    ///   any events not belonging to pings previously registered via `register_ping_type`.
    ///
    /// # Returns
    ///
    /// Whether the "events" ping was submitted.
    pub fn flush_pending_events_on_startup(
        &self,
        glean: &Glean,
        trim_data_to_registered_pings: bool,
    ) -> bool {
        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
            Ok(_) => {
                let stores_with_events: Vec<String> = {
                    self.event_stores
                        .read()
                        .unwrap() // safe unwrap, only error case is poisoning
                        .keys()
                        .map(|x| x.to_owned())
                        .collect()
                };
                // We do not want to be holding the event stores lock when
                // submitting a ping or recording new events.
                let has_events_events = stores_with_events.contains(&"events".to_owned());
                // The "events" ping is submitted directly below, so it doesn't
                // get a `glean.restarted` event; every other store does.
                let glean_restarted_stores = if has_events_events {
                    stores_with_events
                        .into_iter()
                        .filter(|store| store != "events")
                        .collect()
                } else {
                    stores_with_events
                };
                if !glean_restarted_stores.is_empty() {
                    // Bump each store's execution counter so events recorded in
                    // this run are distinguishable from earlier runs' events.
                    for store_name in glean_restarted_stores.iter() {
                        CounterMetric::new(CommonMetricData {
                            name: "execution_counter".into(),
                            category: store_name.into(),
                            send_in_pings: vec![INTERNAL_STORAGE.into()],
                            lifetime: Lifetime::Ping,
                            ..Default::default()
                        })
                        .add_sync(glean, 1);
                    }
                    let glean_restarted = CommonMetricData {
                        name: "restarted".into(),
                        category: "glean".into(),
                        send_in_pings: glean_restarted_stores,
                        lifetime: Lifetime::Ping,
                        ..Default::default()
                    };
                    // Stash the client's startup clock in the event's extras so
                    // `normalize_store` can reconcile timestamps across this restart.
                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
                    let mut extra: HashMap<String, String> =
                        [("glean.startup.date".into(), startup)].into();
                    if glean.with_timestamps() {
                        let now = Utc::now();
                        let precise_timestamp = now.timestamp_millis() as u64;
                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
                    }
                    self.record(
                        glean,
                        &glean_restarted.into(),
                        crate::get_timestamp_ms(),
                        Some(extra),
                    );
                }
                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
            }
            Err(err) => {
                log::warn!("Error loading events from disk: {}", err);
                false
            }
        }
    }

    /// Loads any persisted events from disk into the in-memory stores.
    ///
    /// Each file in the events directory is named after its store; each line
    /// holds one JSON-encoded event. Lines that can't be read or parsed are
    /// silently skipped. If `trim_data_to_registered_pings` is set, files for
    /// pings unknown to `glean` are deleted instead of loaded.
    fn load_events_from_disk(
        &self,
        glean: &Glean,
        trim_data_to_registered_pings: bool,
    ) -> Result<()> {
        // NOTE: The order of locks here is important.
        // In other code parts we might acquire the `file_lock` when we already have acquired
        // a lock on `event_stores`.
        // This is a potential lock-order-inversion.
        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning

        for entry in fs::read_dir(&self.path)? {
            let entry = entry?;
            if entry.file_type()?.is_file() {
                let store_name = entry.file_name().into_string()?;
                log::info!("Loading events for {}", store_name);
                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
                    log::warn!("Trimming {}'s events", store_name);
                    if let Err(err) = fs::remove_file(entry.path()) {
                        match err.kind() {
                            std::io::ErrorKind::NotFound => {
                                // silently drop this error, the file was already non-existing
                            }
                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
                        }
                    }
                    continue;
                }
                let file = BufReader::new(File::open(entry.path())?);
                db.insert(
                    store_name,
                    file.lines()
                        // Truncated or otherwise malformed lines are dropped here.
                        .map_while(Result::ok)
                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
                        .collect(),
                );
            }
        }
        Ok(())
    }

    /// Records an event in the desired stores.
    ///
    /// # Arguments
    ///
    /// * `glean` - The Glean instance.
    /// * `meta` - The metadata about the event metric. Used to get the category,
    ///   name and stores for the metric.
    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
    ///   monotonically increasing timer (this value is obtained on the
    ///   platform-specific side).
    /// * `extra` - Extra data values, mapping strings to strings.
    ///
    /// ## Returns
    ///
    /// `true` if a ping was submitted and should be uploaded.
    /// `false` otherwise.
    pub fn record(
        &self,
        glean: &Glean,
        meta: &CommonMetricDataInternal,
        timestamp: u64,
        extra: Option<HashMap<String, String>>,
    ) -> bool {
        // If upload is disabled we don't want to record.
        if !glean.is_upload_enabled() {
            return false;
        }

        let mut submit_max_capacity_event_ping = false;
        {
            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
            for store_name in meta.inner.send_in_pings.iter() {
                // Disabled pings don't record events.
                if !glean.is_ping_enabled(store_name) {
                    continue;
                }

                let store = db.entry(store_name.to_string()).or_default();
                // Tag the event with the current run's execution counter so it
                // can be grouped by run when normalized after a restart.
                let execution_counter = CounterMetric::new(CommonMetricData {
                    name: "execution_counter".into(),
                    category: store_name.into(),
                    send_in_pings: vec![INTERNAL_STORAGE.into()],
                    lifetime: Lifetime::Ping,
                    ..Default::default()
                })
                .get_value(glean, INTERNAL_STORAGE);
                // Create StoredEvent object, and its JSON form for serialization on disk.
                let event = StoredEvent {
                    event: RecordedEvent {
                        timestamp,
                        category: meta.inner.category.to_string(),
                        name: meta.inner.name.to_string(),
                        extra: extra.clone(),
                    },
                    execution_counter,
                };
                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
                store.push(event);
                self.write_event_to_disk(store_name, &event_json);
                if store_name == "events" && store.len() == glean.get_max_events() {
                    submit_max_capacity_event_ping = true;
                }
            }
        }
        if submit_max_capacity_event_ping {
            glean.submit_ping_by_name("events", Some("max_capacity"));
            true
        } else {
            false
        }
    }

    /// Returns the append-mode file handle for the given store, opening it
    /// (with `create`) and caching it on first use.
    ///
    /// Current callers serialize writes to the returned file by holding
    /// `file_lock` (see `write_event_to_disk`).
    fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
        // safe unwrap, only error case is poisoning
        let mut map = self.event_store_files.write().unwrap();
        let entry = map.entry(store_name.to_string());

        match entry {
            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
            Entry::Vacant(entry) => {
                let file = OpenOptions::new()
                    .create(true)
                    .append(true)
                    .open(self.path.join(store_name))?;
                let file = Arc::new(file);
                let entry = entry.insert(file);
                Ok(Arc::clone(entry))
            }
        }
    }

    /// Writes an event to a single store on disk.
    ///
    /// # Arguments
    ///
    /// * `store_name` - The name of the store.
    /// * `event_json` - The event content, as a single-line JSON-encoded string.
    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning

        // An IIFE so `?` can collapse the individual I/O failures into one result.
        let write_res = (|| {
            let mut file = self.get_event_store(store_name)?;
            file.write_all(event_json.as_bytes())?;
            file.write_all(b"\n")?;
            file.flush()?;
            Ok::<(), std::io::Error>(())
        })();

        // A write failure only loses this one event; log and carry on.
        if let Err(err) = write_res {
            log::warn!("IO error writing event to store '{}': {}", store_name, err);
        }
    }

    /// Normalizes the store in-place.
    ///
    /// A store may be in any order and contain any number of `glean.restarted` events,
    /// whose values must be taken into account, along with `execution_counter` values,
    /// to come up with the correct events with correct `timestamp` values,
    /// on which we then sort.
    ///
    /// 1. Sort by `execution_counter` and `timestamp`,
    ///    breaking ties so that `glean.restarted` comes first.
    /// 2. Remove all initial and final `glean.restarted` events
    /// 3. For each group of events that share a `execution_counter`,
    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
    /// 5. Sort by `timestamp`
    ///
    /// In the event that something goes awry, this will record an invalid_state on
    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
    /// on client clock weirdness.
    ///
    /// # Arguments
    ///
    /// * `glean` - Used to report errors
    /// * `store_name` - The name of the store we're normalizing.
    /// * `store` - The store we're to normalize.
    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
    fn normalize_store(
        &self,
        glean: &Glean,
        store_name: &str,
        store: &mut Vec<StoredEvent>,
        glean_start_time: DateTime<FixedOffset>,
    ) {
        let is_glean_restarted =
            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
        // Metric metadata used only for error reporting against `glean.restarted`.
        let glean_restarted_meta = |store_name: &str| CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        // Step 1
        store.sort_by(|a, b| {
            a.execution_counter
                .cmp(&b.execution_counter)
                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
                .then_with(|| {
                    if is_glean_restarted(&a.event) {
                        Ordering::Less
                    } else {
                        Ordering::Greater
                    }
                })
        });
        // Step 2
        // Find the index of the first and final non-`glean.restarted` events.
        // Remove events before the first and after the final.
        let final_event = match store
            .iter()
            .rposition(|event| !is_glean_restarted(&event.event))
        {
            Some(idx) => idx + 1,
            _ => 0,
        };
        store.drain(final_event..);
        let first_event = store
            .iter()
            .position(|event| !is_glean_restarted(&event.event))
            .unwrap_or(store.len());
        store.drain(..first_event);
        if store.is_empty() {
            // There was nothing but `glean.restarted` events. Job's done!
            return;
        }
        // Step 3
        // It is allowed that there might not be any `glean.restarted` event, nor
        // `execution_counter` extra values. (This should always be the case for the
        // "events" ping, for instance).
        // Other inconsistencies are evidence of errors, and so are logged.
        let mut cur_ec = 0;
        // The offset within a group of events with the same `execution_counter`.
        let mut intra_group_offset = store[0].event.timestamp;
        // The offset between this group and ping_info.start_date.
        let mut inter_group_offset = 0;
        // Highest timestamp assigned so far, used for monotonicity checks.
        let mut highest_ts = 0;
        for event in store.iter_mut() {
            // `take()` implements Step 4 for `execution_counter` as we go.
            let execution_counter = event.execution_counter.take().unwrap_or(0);
            if is_glean_restarted(&event.event) {
                // We've entered the next "event group".
                // We need a new epoch based on glean.startup.date - ping_info.start_date
                cur_ec = execution_counter;
                // Removing the extra here implements Step 4 for `glean.startup.date`.
                let glean_startup_date = event
                    .event
                    .extra
                    .as_mut()
                    .and_then(|extra| {
                        extra.remove("glean.startup.date").and_then(|date_str| {
                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
                                .map_err(|_| {
                                    record_error(
                                        glean,
                                        &glean_restarted_meta(store_name).into(),
                                        ErrorType::InvalidState,
                                        format!("Unparseable glean.startup.date '{}'", date_str),
                                        None,
                                    );
                                })
                                .ok()
                        })
                    })
                    .unwrap_or(glean_start_time);
                if event
                    .event
                    .extra
                    .as_ref()
                    .is_some_and(|extra| extra.is_empty())
                {
                    // Small optimization to save us sending empty dicts.
                    event.event.extra = None;
                }
                let ping_start = DatetimeMetric::new(
                    CommonMetricData {
                        name: format!("{}#start", store_name),
                        category: "".into(),
                        send_in_pings: vec![INTERNAL_STORAGE.into()],
                        lifetime: Lifetime::User,
                        ..Default::default()
                    },
                    TimeUnit::Minute,
                );
                let ping_start = ping_start
                    .get_value(glean, INTERNAL_STORAGE)
                    .unwrap_or(glean_start_time);
                let time_from_ping_start_to_glean_restarted =
                    (glean_startup_date - ping_start).num_milliseconds();
                intra_group_offset = event.event.timestamp;
                inter_group_offset =
                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
                if inter_group_offset < highest_ts {
                    record_error(
                        glean,
                        &glean_restarted_meta(store_name).into(),
                        ErrorType::InvalidValue,
                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
                        None,
                    );
                    // The client's clock went backwards enough that this event group's
                    // glean.restarted looks like it happened _before_ the final event of the previous group.
                    // Or, it went ahead enough to overflow u64.
                    // Adjust things so this group starts 1ms after the previous one.
                    inter_group_offset = highest_ts + 1;
                }
            } else if cur_ec == 0 {
                // bug 1811872 - cur_ec might need initialization.
                cur_ec = execution_counter;
            }
            // Re-base this event's timestamp onto its group's epoch.
            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
            if execution_counter != cur_ec {
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent execution counter {} (expected {})",
                        execution_counter, cur_ec
                    ),
                    None,
                );
                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
                cur_ec = execution_counter;
            }

            // event timestamp is a `u64`, but BigQuery uses `i64` (signed!) everywhere. Let's clamp the value to make
            // sure we stay within bounds.
            if event.event.timestamp > i64::MAX as u64 {
                glean
                    .additional_metrics
                    .event_timestamp_clamped
                    .add_sync(glean, 1);
                log::warn!(
                    "Calculated event timestamp was too high. Got: {}, max: {}",
                    event.event.timestamp,
                    i64::MAX,
                );
                event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
            }

            if highest_ts > event.event.timestamp {
                // Even though we sorted everything, something in the
                // execution_counter or glean.startup.date math went awry.
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent previous highest timestamp {} (expected <= {})",
                        highest_ts, event.event.timestamp
                    ),
                    None,
                );
                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
            }
            highest_ts = event.event.timestamp
        }
    }

    /// Gets a snapshot of the stored event data as a JsonValue.
    ///
    /// # Arguments
    ///
    /// * `glean` - the Glean instance.
    /// * `store_name` - The name of the desired store.
    /// * `clear_store` - Whether to clear the store after snapshotting.
    ///
    /// # Returns
    ///
    /// A array of events, JSON encoded, if any. Otherwise `None`.
    pub fn snapshot_as_json(
        &self,
        glean: &Glean,
        store_name: &str,
        clear_store: bool,
    ) -> Option<JsonValue> {
        let result = {
            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
            db.get_mut(&store_name.to_string()).and_then(|store| {
                if !store.is_empty() {
                    // Normalization happens in-place, so if we're not clearing,
                    // operate on a clone.
                    let mut clone;
                    let store = if clear_store {
                        store
                    } else {
                        clone = store.clone();
                        &mut clone
                    };
                    // We may need to normalize event timestamps across multiple restarts.
                    self.normalize_store(glean, store_name, store, glean.start_time());
                    Some(json!(store))
                } else {
                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
                    None
                }
            })
        };

        if clear_store {
            // Drop the in-memory store and the cached file handle before
            // removing the on-disk file.
            self.event_stores
                .write()
                .unwrap() // safe unwrap, only error case is poisoning
                .remove(&store_name.to_string());
            self.event_store_files
                .write()
                .unwrap() // safe unwrap, only error case is poisoning
                .remove(&store_name.to_string());

            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
                match err.kind() {
                    std::io::ErrorKind::NotFound => {
                        // silently drop this error, the file was already non-existing
                    }
                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
                }
            }
        }

        result
    }

    /// Clears all stored events, both in memory and on-disk.
    pub fn clear_all(&self) -> Result<()> {
        // safe unwrap, only error case is poisoning
        self.event_stores.write().unwrap().clear();
        self.event_store_files.write().unwrap().clear();

        // safe unwrap, only error case is poisoning
        let _lock = self.file_lock.lock().unwrap();
        // Recreate the (now empty) events directory so later writes succeed.
        std::fs::remove_dir_all(&self.path)?;
        create_dir_all(&self.path)?;

        Ok(())
    }

    /// **Test-only API (exported for FFI purposes).**
    ///
    /// Gets the vector of currently stored events for the given event metric in
    /// the given store.
    ///
    /// This doesn't clear the stored value.
    pub fn test_get_value<'a>(
        &'a self,
        meta: &'a CommonMetricDataInternal,
        store_name: &str,
    ) -> Option<Vec<RecordedEvent>> {
        let value: Vec<RecordedEvent> = self
            .event_stores
            .read()
            .unwrap() // safe unwrap, only error case is poisoning
            .get(&store_name.to_string())
            .into_iter()
            .flatten()
            .map(|stored_event| stored_event.event.clone())
            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
            .collect();
        if !value.is_empty() {
            Some(value)
        } else {
            None
        }
    }
}
703
704#[cfg(test)]
705mod test {
706    use super::*;
707    use crate::test_get_num_recorded_errors;
708    use crate::tests::new_glean;
709    use chrono::{TimeZone, Timelike};
710
711    #[test]
712    fn handle_truncated_events_on_disk() {
713        let (glean, t) = new_glean(None);
714
715        {
716            let db = EventDatabase::new(t.path()).unwrap();
717            db.write_event_to_disk("events", "{\"timestamp\": 500");
718            db.write_event_to_disk("events", "{\"timestamp\"");
719            db.write_event_to_disk(
720                "events",
721                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
722            );
723        }
724
725        {
726            let db = EventDatabase::new(t.path()).unwrap();
727            db.load_events_from_disk(&glean, false).unwrap();
728            let events = &db.event_stores.read().unwrap()["events"];
729            assert_eq!(1, events.len());
730        }
731    }
732
733    #[test]
734    fn stable_serialization() {
735        let event_empty = RecordedEvent {
736            timestamp: 2,
737            category: "cat".to_string(),
738            name: "name".to_string(),
739            extra: None,
740        };
741
742        let mut data = HashMap::new();
743        data.insert("a key".to_string(), "a value".to_string());
744        let event_data = RecordedEvent {
745            timestamp: 2,
746            category: "cat".to_string(),
747            name: "name".to_string(),
748            extra: Some(data),
749        };
750
751        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
752        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
753
754        assert_eq!(
755            StoredEvent {
756                event: event_empty,
757                execution_counter: None
758            },
759            serde_json::from_str(&event_empty_json).unwrap()
760        );
761        assert_eq!(
762            StoredEvent {
763                event: event_data,
764                execution_counter: None
765            },
766            serde_json::from_str(&event_data_json).unwrap()
767        );
768    }
769
770    #[test]
771    fn deserialize_existing_data() {
772        let event_empty_json = r#"
773{
774  "timestamp": 2,
775  "category": "cat",
776  "name": "name"
777}
778            "#;
779
780        let event_data_json = r#"
781{
782  "timestamp": 2,
783  "category": "cat",
784  "name": "name",
785  "extra": {
786    "a key": "a value"
787  }
788}
789        "#;
790
791        let event_empty = RecordedEvent {
792            timestamp: 2,
793            category: "cat".to_string(),
794            name: "name".to_string(),
795            extra: None,
796        };
797
798        let mut data = HashMap::new();
799        data.insert("a key".to_string(), "a value".to_string());
800        let event_data = RecordedEvent {
801            timestamp: 2,
802            category: "cat".to_string(),
803            name: "name".to_string(),
804            extra: Some(data),
805        };
806
807        assert_eq!(
808            StoredEvent {
809                event: event_empty,
810                execution_counter: None
811            },
812            serde_json::from_str(event_empty_json).unwrap()
813        );
814        assert_eq!(
815            StoredEvent {
816                event: event_data,
817                execution_counter: None
818            },
819            serde_json::from_str(event_data_json).unwrap()
820        );
821    }
822
823    #[test]
824    fn doesnt_record_when_upload_is_disabled() {
825        let (mut glean, dir) = new_glean(None);
826        let db = EventDatabase::new(dir.path()).unwrap();
827
828        let test_storage = "store1";
829        let test_category = "category";
830        let test_name = "name";
831        let test_timestamp = 2;
832        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
833        let event_data = RecordedEvent {
834            timestamp: test_timestamp,
835            category: test_category.to_string(),
836            name: test_name.to_string(),
837            extra: None,
838        };
839
840        // Upload is not yet disabled,
841        // so let's check that everything is getting recorded as expected.
842        db.record(&glean, &test_meta, 2, None);
843        {
844            let event_stores = db.event_stores.read().unwrap();
845            assert_eq!(
846                &StoredEvent {
847                    event: event_data,
848                    execution_counter: None
849                },
850                &event_stores.get(test_storage).unwrap()[0]
851            );
852            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
853        }
854
855        glean.set_upload_enabled(false);
856
857        // Now that upload is disabled, let's check nothing is recorded.
858        db.record(&glean, &test_meta, 2, None);
859        {
860            let event_stores = db.event_stores.read().unwrap();
861            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
862        }
863    }
864
865    #[test]
866    fn normalize_store_of_glean_restarted() {
867        // Make sure stores empty of anything but glean.restarted events normalize without issue.
868        let (glean, _dir) = new_glean(None);
869
870        let store_name = "store-name";
871        let glean_restarted = StoredEvent {
872            event: RecordedEvent {
873                timestamp: 2,
874                category: "glean".into(),
875                name: "restarted".into(),
876                extra: None,
877            },
878            execution_counter: None,
879        };
880        let mut store = vec![glean_restarted.clone()];
881        let glean_start_time = glean.start_time();
882
883        glean
884            .event_storage()
885            .normalize_store(&glean, store_name, &mut store, glean_start_time);
886        assert!(store.is_empty());
887
888        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
889        glean
890            .event_storage()
891            .normalize_store(&glean, store_name, &mut store, glean_start_time);
892        assert!(store.is_empty());
893
894        let mut store = vec![
895            glean_restarted.clone(),
896            glean_restarted.clone(),
897            glean_restarted,
898        ];
899        glean
900            .event_storage()
901            .normalize_store(&glean, store_name, &mut store, glean_start_time);
902        assert!(store.is_empty());
903    }
904
905    #[test]
906    fn normalize_store_of_glean_restarted_on_both_ends() {
907        // Make sure stores with non-glean.restarted events don't get drained too far.
908        let (glean, _dir) = new_glean(None);
909
910        let store_name = "store-name";
911        let glean_restarted = StoredEvent {
912            event: RecordedEvent {
913                timestamp: 2,
914                category: "glean".into(),
915                name: "restarted".into(),
916                extra: None,
917            },
918            execution_counter: None,
919        };
920        let not_glean_restarted = StoredEvent {
921            event: RecordedEvent {
922                timestamp: 20,
923                category: "category".into(),
924                name: "name".into(),
925                extra: None,
926            },
927            execution_counter: None,
928        };
929        let mut store = vec![
930            glean_restarted.clone(),
931            not_glean_restarted.clone(),
932            glean_restarted,
933        ];
934        let glean_start_time = glean.start_time();
935
936        glean
937            .event_storage()
938            .normalize_store(&glean, store_name, &mut store, glean_start_time);
939        assert_eq!(1, store.len());
940        assert_eq!(
941            StoredEvent {
942                event: RecordedEvent {
943                    timestamp: 0,
944                    ..not_glean_restarted.event
945                },
946                execution_counter: None
947            },
948            store[0]
949        );
950    }
951
952    #[test]
953    fn normalize_store_single_run_timestamp_math() {
954        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
955        // ensure the timestamp math works.
956        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
957        let (glean, _dir) = new_glean(None);
958
959        let store_name = "store-name";
960        let glean_restarted = StoredEvent {
961            event: RecordedEvent {
962                timestamp: 2,
963                category: "glean".into(),
964                name: "restarted".into(),
965                extra: None,
966            },
967            execution_counter: None,
968        };
969        let timestamps = [20, 40, 200];
970        let not_glean_restarted = StoredEvent {
971            event: RecordedEvent {
972                timestamp: timestamps[0],
973                category: "category".into(),
974                name: "name".into(),
975                extra: None,
976            },
977            execution_counter: None,
978        };
979        let mut store = vec![
980            glean_restarted.clone(),
981            not_glean_restarted.clone(),
982            StoredEvent {
983                event: RecordedEvent {
984                    timestamp: timestamps[1],
985                    ..not_glean_restarted.event.clone()
986                },
987                execution_counter: None,
988            },
989            StoredEvent {
990                event: RecordedEvent {
991                    timestamp: timestamps[2],
992                    ..not_glean_restarted.event.clone()
993                },
994                execution_counter: None,
995            },
996            glean_restarted,
997        ];
998
999        glean
1000            .event_storage()
1001            .normalize_store(&glean, store_name, &mut store, glean.start_time());
1002        assert_eq!(3, store.len());
1003        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
1004            assert_eq!(
1005                &StoredEvent {
1006                    event: RecordedEvent {
1007                        timestamp: timestamp - timestamps[0],
1008                        ..not_glean_restarted.clone().event
1009                    },
1010                    execution_counter: None
1011                },
1012                event
1013            );
1014        }
1015    }
1016
    #[test]
    fn normalize_store_multi_run_timestamp_math() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works.
        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        // Template for the restart marker event.
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        // Template for an ordinary ("real") event.
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of three events followed by an hour between runs,
        // followed by one final event.
        let timestamps = [20, 40, 200, 12];
        // One execution counter value per run.
        let ecs = [0, 1];
        let some_hour = 16;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        // Glean's start time is one hour *before* the second run's startup date,
        // so the gap between runs is one hour of wall-clock time.
        let glean_start_time = startup_date.with_hour(some_hour - 1);
        let restarted_ts = 2;
        let mut store = vec![
            // First run: three events, all with execution counter `ecs[0]`.
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            // Restart marker: carries the second run's startup wall-clock date
            // in the `glean.startup.date` extra, used to compute the new origin.
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            // Second run: one final event with execution counter `ecs[1]`.
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first three.
        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The fourth should be a glean.restarted and have a realtime-based timestamp.
        let hour_in_millis = 3600000;
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fifth should have a timestamp based on the new origin.
        assert_eq!(
            store[4],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
    }
1144
    #[test]
    fn normalize_store_multi_run_client_clocks() {
        // With multiple runs of events (separated by `glean.restarted`),
        // ensure the timestamp math works. Even when the client clock goes backwards.
        let (glean, _dir) = new_glean(None);

        let store_name = "store-name";
        // Template for the restart marker event.
        let glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "glean".into(),
                name: "restarted".into(),
                ..Default::default()
            },
            execution_counter: None,
        };
        // Template for an ordinary ("real") event.
        let not_glean_restarted = StoredEvent {
            event: RecordedEvent {
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        };

        // This scenario represents a run of two events followed by negative one hours between runs,
        // followed by two more events.
        let timestamps = [20, 40, 12, 200];
        // One execution counter value per run.
        let ecs = [0, 1];
        let some_hour = 10;
        let startup_date = FixedOffset::east_opt(0)
            .unwrap()
            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) // TimeUnit::Minute -- don't put seconds
            .unwrap();
        // Glean's start time is one hour *after* the second run's startup date,
        // i.e. the client clock appears to have gone backwards between runs.
        let glean_start_time = startup_date.with_hour(some_hour + 1);
        let restarted_ts = 2;
        let mut store = vec![
            // First run: two events with execution counter `ecs[0]`.
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[0],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[1],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[0]),
            },
            // Restart marker carrying the (earlier) startup wall-clock date.
            StoredEvent {
                event: RecordedEvent {
                    extra: Some(
                        [(
                            "glean.startup.date".into(),
                            get_iso_time_string(startup_date, TimeUnit::Minute),
                        )]
                        .into(),
                    ),
                    timestamp: restarted_ts,
                    ..glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            // Second run: two events with execution counter `ecs[1]`.
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[3],
                    ..not_glean_restarted.event.clone()
                },
                execution_counter: Some(ecs[1]),
            },
        ];

        glean.event_storage().normalize_store(
            &glean,
            store_name,
            &mut store,
            glean_start_time.unwrap(),
        );
        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`

        // Let's check the first two.
        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
            assert_eq!(
                StoredEvent {
                    event: RecordedEvent {
                        timestamp: timestamp - timestamps[0],
                        ..not_glean_restarted.event.clone()
                    },
                    execution_counter: None,
                },
                event
            );
        }
        // The third should be a glean.restarted. Its timestamp should be
        // one larger than the largest timestamp seen so far (because that's
        // how we ensure monotonic timestamps when client clocks go backwards).
        assert_eq!(
            store[2],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: store[1].event.timestamp + 1,
                    ..glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // The fifth should have a timestamp based on the new origin.
        assert_eq!(
            store[3],
            StoredEvent {
                event: RecordedEvent {
                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
                    ..not_glean_restarted.event
                },
                execution_counter: None,
            }
        );
        // And we should have an InvalidValue on glean.restarted to show for it.
        assert_eq!(
            Ok(1),
            test_get_num_recorded_errors(
                &glean,
                &CommonMetricData {
                    name: "restarted".into(),
                    category: "glean".into(),
                    send_in_pings: vec![store_name.into()],
                    lifetime: Lifetime::Ping,
                    ..Default::default()
                }
                .into(),
                ErrorType::InvalidValue
            )
        );
    }
1287
1288    #[test]
1289    fn normalize_store_non_zero_ec() {
1290        // After the first run, execution_counter will likely be non-zero.
1291        // Ensure normalizing a store that begins with non-zero ec works.
1292        let (glean, _dir) = new_glean(None);
1293
1294        let store_name = "store-name";
1295        let glean_restarted = StoredEvent {
1296            event: RecordedEvent {
1297                timestamp: 2,
1298                category: "glean".into(),
1299                name: "restarted".into(),
1300                extra: None,
1301            },
1302            execution_counter: Some(2),
1303        };
1304        let not_glean_restarted = StoredEvent {
1305            event: RecordedEvent {
1306                timestamp: 20,
1307                category: "category".into(),
1308                name: "name".into(),
1309                extra: None,
1310            },
1311            execution_counter: Some(2),
1312        };
1313        let glean_restarted_2 = StoredEvent {
1314            event: RecordedEvent {
1315                timestamp: 2,
1316                category: "glean".into(),
1317                name: "restarted".into(),
1318                extra: None,
1319            },
1320            execution_counter: Some(3),
1321        };
1322        let mut store = vec![
1323            glean_restarted,
1324            not_glean_restarted.clone(),
1325            glean_restarted_2,
1326        ];
1327        let glean_start_time = glean.start_time();
1328
1329        glean
1330            .event_storage()
1331            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1332
1333        assert_eq!(1, store.len());
1334        assert_eq!(
1335            StoredEvent {
1336                event: RecordedEvent {
1337                    timestamp: 0,
1338                    ..not_glean_restarted.event
1339                },
1340                execution_counter: None
1341            },
1342            store[0]
1343        );
1344        // And we should have no InvalidState errors on glean.restarted.
1345        assert!(test_get_num_recorded_errors(
1346            &glean,
1347            &CommonMetricData {
1348                name: "restarted".into(),
1349                category: "glean".into(),
1350                send_in_pings: vec![store_name.into()],
1351                lifetime: Lifetime::Ping,
1352                ..Default::default()
1353            }
1354            .into(),
1355            ErrorType::InvalidState
1356        )
1357        .is_err());
1358        // (and, just because we're here, double-check there are no InvalidValue either).
1359        assert!(test_get_num_recorded_errors(
1360            &glean,
1361            &CommonMetricData {
1362                name: "restarted".into(),
1363                category: "glean".into(),
1364                send_in_pings: vec![store_name.into()],
1365                lifetime: Lifetime::Ping,
1366                ..Default::default()
1367            }
1368            .into(),
1369            ErrorType::InvalidValue
1370        )
1371        .is_err());
1372    }
1373
1374    #[test]
1375    fn normalize_store_clamps_timestamp() {
1376        let (glean, _dir) = new_glean(None);
1377
1378        let store_name = "store-name";
1379        let event = RecordedEvent {
1380            category: "category".into(),
1381            name: "name".into(),
1382            ..Default::default()
1383        };
1384
1385        let timestamps = [
1386            0,
1387            (i64::MAX / 2) as u64,
1388            i64::MAX as _,
1389            (i64::MAX as u64) + 1,
1390        ];
1391        let mut store = timestamps
1392            .into_iter()
1393            .map(|timestamp| StoredEvent {
1394                event: RecordedEvent {
1395                    timestamp,
1396                    ..event.clone()
1397                },
1398                execution_counter: None,
1399            })
1400            .collect();
1401
1402        let glean_start_time = glean.start_time();
1403        glean
1404            .event_storage()
1405            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1406        assert_eq!(4, store.len());
1407
1408        assert_eq!(0, store[0].event.timestamp);
1409        assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
1410        assert_eq!((i64::MAX as u64), store[2].event.timestamp);
1411        assert_eq!((i64::MAX as u64), store[3].event.timestamp);
1412
1413        let error_count = glean
1414            .additional_metrics
1415            .event_timestamp_clamped
1416            .get_value(&glean, "health");
1417        assert_eq!(Some(1), error_count);
1418    }
1419}