Skip to main content

glean_core/event_database/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::session::{EventSessionContext, SessionMetadata};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33/// Represents the recorded data for a single event.
34#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37    /// The timestamp of when the event was recorded.
38    ///
39    /// This allows to order events from a single process run.
40    pub timestamp: u64,
41
42    /// The event's category.
43    ///
44    /// This is defined by users in the metrics file.
45    pub category: String,
46
47    /// The event's name.
48    ///
49    /// This is defined by users in the metrics file.
50    pub name: String,
51
52    /// A map of all extra data values.
53    ///
54    /// The set of allowed extra keys is defined by users in the metrics file.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub extra: Option<HashMap<String, String>>,
57
58    /// Session metadata attached to this event.
59    ///
60    /// `None` for out-of-session events and events recorded before
61    /// sessions were introduced (backwards compatibility).
62    #[serde(skip_serializing_if = "Option::is_none")]
63    #[serde(default)]
64    pub session: Option<SessionMetadata>,
65}
66
67/// Represents the stored data for a single event.
68#[derive(
69    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
70)]
71struct StoredEvent {
72    #[serde(flatten)]
73    event: RecordedEvent,
74
75    /// The monotonically-increasing execution counter.
76    ///
77    /// Included to allow sending of events across Glean restarts (bug 1716725).
78    /// Is i32 because it is stored in a CounterMetric.
79    #[serde(default)]
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub execution_counter: Option<i32>,
82}
83
84/// This struct handles the in-memory and on-disk storage logic for events.
85///
86/// So that the data survives shutting down of the application, events are stored
87/// in an append-only file on disk, in addition to the store in memory. Each line
88/// of this file records a single event in JSON, exactly as it will be sent in the
89/// ping. There is one file per store.
90///
91/// When restarting the application, these on-disk files are checked, and if any are
92/// found, they are loaded, and a `glean.restarted` event is added before any
93/// further events are collected. This is because the timestamps for these events
94/// may have come from a previous boot of the device, and therefore will not be
95/// compatible with any newly-collected events.
96///
97/// Normalizing all these timestamps happens on serialization for submission (see
98/// `serialize_as_json`) where the client time between restarts is calculated using
99/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
100/// the `execution_counter` stored in events on disk.
101///
102/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
103/// The `glean.restarted` event is, though.
104/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
105#[derive(Debug)]
106pub struct EventDatabase {
107    /// Path to directory of on-disk event files
108    pub path: PathBuf,
109    /// The in-memory list of events
110    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
111    event_store_files: RwLock<HashMap<String, Arc<File>>>,
112    /// A lock to be held when doing operations on the filesystem
113    file_lock: Mutex<()>,
114}
115
116impl MallocSizeOf for EventDatabase {
117    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
118        let mut n = 0;
119        n += self.event_stores.read().unwrap().size_of(ops);
120
121        let map = self.event_store_files.read().unwrap();
122        for store_name in map.keys() {
123            n += store_name.size_of(ops);
124            // `File` doesn't allocate, but `Arc` puts it on the heap.
125            n += mem::size_of::<File>();
126        }
127        n
128    }
129}
130
131impl EventDatabase {
132    /// Creates a new event database.
133    ///
134    /// # Arguments
135    ///
136    /// * `data_path` - The directory to store events in. A new directory
137    /// * `events` - will be created inside of this directory.
138    pub fn new(data_path: &Path) -> Result<Self> {
139        let path = data_path.join("events");
140        create_dir_all(&path)?;
141
142        Ok(Self {
143            path,
144            event_stores: RwLock::new(HashMap::new()),
145            event_store_files: RwLock::new(HashMap::new()),
146            file_lock: Mutex::new(()),
147        })
148    }
149
150    /// Initializes events storage after Glean is fully initialized and ready to send pings.
151    ///
152    /// This must be called once on application startup, e.g. from
153    /// [Glean.initialize], but after we are ready to send pings, since this
154    /// could potentially collect and send the "events" ping.
155    ///
156    /// If there are any events queued on disk, it loads them into memory so
157    /// that the memory and disk representations are in sync.
158    ///
159    /// If event records for the "events" ping are present, they are assembled into
160    /// an "events" ping which is submitted immediately with reason "startup".
161    ///
162    /// If event records for custom pings are present, we increment the custom pings'
163    /// stores' `execution_counter` and record a `glean.restarted`
164    /// event with the current client clock in its `glean.startup.date` extra.
165    ///
166    /// # Arguments
167    ///
168    /// * `glean` - The Glean instance.
169    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
170    ///   any events not belonging to pings previously registered via `register_ping_type`.
171    ///
172    /// # Returns
173    ///
174    /// Whether the "events" ping was submitted.
175    pub fn flush_pending_events_on_startup(
176        &self,
177        glean: &Glean,
178        trim_data_to_registered_pings: bool,
179    ) -> bool {
180        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
181            Ok(_) => {
182                let stores_with_events: Vec<String> = {
183                    self.event_stores
184                        .read()
185                        .unwrap()
186                        .keys()
187                        .map(|x| x.to_owned())
188                        .collect() // safe unwrap, only error case is poisoning
189                };
190                // We do not want to be holding the event stores lock when
191                // submitting a ping or recording new events.
192                let has_events_events = stores_with_events.contains(&"events".to_owned());
193                let glean_restarted_stores = if has_events_events {
194                    stores_with_events
195                        .into_iter()
196                        .filter(|store| store != "events")
197                        .collect()
198                } else {
199                    stores_with_events
200                };
201                if !glean_restarted_stores.is_empty() {
202                    for store_name in glean_restarted_stores.iter() {
203                        CounterMetric::new(CommonMetricData {
204                            name: "execution_counter".into(),
205                            category: store_name.into(),
206                            send_in_pings: vec![INTERNAL_STORAGE.into()],
207                            lifetime: Lifetime::Ping,
208                            ..Default::default()
209                        })
210                        .add_sync(glean, 1);
211                    }
212                    let glean_restarted = CommonMetricData {
213                        name: "restarted".into(),
214                        category: "glean".into(),
215                        send_in_pings: glean_restarted_stores,
216                        lifetime: Lifetime::Ping,
217                        ..Default::default()
218                    };
219                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
220                    let mut extra: HashMap<String, String> =
221                        [("glean.startup.date".into(), startup)].into();
222                    if glean.with_timestamps() {
223                        let now = Utc::now();
224                        let precise_timestamp = now.timestamp_millis() as u64;
225                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
226                    }
227                    self.record(
228                        glean,
229                        &glean_restarted.into(),
230                        crate::get_timestamp_ms(),
231                        Some(extra),
232                        EventSessionContext::OutOfSession,
233                    );
234                }
235                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
236            }
237            Err(err) => {
238                log::warn!("Error loading events from disk: {}", err);
239                false
240            }
241        }
242    }
243
244    fn load_events_from_disk(
245        &self,
246        glean: &Glean,
247        trim_data_to_registered_pings: bool,
248    ) -> Result<()> {
249        // NOTE: The order of locks here is important.
250        // In other code parts we might acquire the `file_lock` when we already have acquired
251        // a lock on `event_stores`.
252        // This is a potential lock-order-inversion.
253        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
254        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
255
256        for entry in fs::read_dir(&self.path)? {
257            let entry = entry?;
258            if entry.file_type()?.is_file() {
259                let store_name = entry.file_name().into_string()?;
260                log::info!("Loading events for {}", store_name);
261                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
262                    log::warn!("Trimming {}'s events", store_name);
263                    if let Err(err) = fs::remove_file(entry.path()) {
264                        match err.kind() {
265                            std::io::ErrorKind::NotFound => {
266                                // silently drop this error, the file was already non-existing
267                            }
268                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
269                        }
270                    }
271                    continue;
272                }
273                let file = BufReader::new(File::open(entry.path())?);
274                db.insert(
275                    store_name,
276                    file.lines()
277                        .map_while(Result::ok)
278                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
279                        .collect(),
280                );
281            }
282        }
283        Ok(())
284    }
285
286    /// Records an event in the desired stores.
287    ///
288    /// # Arguments
289    ///
290    /// * `glean` - The Glean instance.
291    /// * `meta` - The metadata about the event metric. Used to get the category,
292    ///   name and stores for the metric.
293    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
294    ///   monotonically increasing timer (this value is obtained on the
295    ///   platform-specific side).
296    /// * `extra` - Extra data values, mapping strings to strings.
297    /// * `ctx` - The event's session context, conveying both whether session
298    ///   metadata should be attached and what that metadata is.
299    ///
300    /// ## Returns
301    ///
302    /// `true` if a ping was submitted and should be uploaded.
303    /// `false` otherwise.
304    pub fn record(
305        &self,
306        glean: &Glean,
307        meta: &CommonMetricDataInternal,
308        timestamp: u64,
309        extra: Option<HashMap<String, String>>,
310        ctx: EventSessionContext,
311    ) -> bool {
312        // If upload is disabled we don't want to record.
313        if !glean.is_upload_enabled() {
314            return false;
315        }
316
317        // Convert the session context to the optional metadata stored on the event.
318        let session = match ctx {
319            EventSessionContext::OutOfSession => None,
320            EventSessionContext::InSession(session_meta) => Some(session_meta),
321        };
322
323        let mut submit_max_capacity_event_ping = false;
324        {
325            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
326            for store_name in meta.inner.send_in_pings.iter() {
327                if !glean.is_ping_enabled(store_name) {
328                    continue;
329                }
330
331                let store = db.entry(store_name.to_string()).or_default();
332                let execution_counter = CounterMetric::new(CommonMetricData {
333                    name: "execution_counter".into(),
334                    category: store_name.into(),
335                    send_in_pings: vec![INTERNAL_STORAGE.into()],
336                    lifetime: Lifetime::Ping,
337                    ..Default::default()
338                })
339                .get_value(glean, INTERNAL_STORAGE);
340                // Create StoredEvent object, and its JSON form for serialization on disk.
341                let event = StoredEvent {
342                    event: RecordedEvent {
343                        timestamp,
344                        category: meta.inner.category.to_string(),
345                        name: meta.inner.name.to_string(),
346                        extra: extra.clone(),
347                        session: session.clone(),
348                    },
349                    execution_counter,
350                };
351                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
352                store.push(event);
353                self.write_event_to_disk(store_name, &event_json);
354                if store_name == "events" && store.len() == glean.get_max_events() {
355                    submit_max_capacity_event_ping = true;
356                }
357            }
358        }
359        if submit_max_capacity_event_ping {
360            glean.submit_ping_by_name("events", Some("max_capacity"));
361            true
362        } else {
363            false
364        }
365    }
366
367    fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
368        // safe unwrap, only error case is poisoning
369        let mut map = self.event_store_files.write().unwrap();
370        let entry = map.entry(store_name.to_string());
371
372        match entry {
373            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
374            Entry::Vacant(entry) => {
375                let file = OpenOptions::new()
376                    .create(true)
377                    .append(true)
378                    .open(self.path.join(store_name))?;
379                let file = Arc::new(file);
380                let entry = entry.insert(file);
381                Ok(Arc::clone(entry))
382            }
383        }
384    }
385
386    /// Writes an event to a single store on disk.
387    ///
388    /// # Arguments
389    ///
390    /// * `store_name` - The name of the store.
391    /// * `event_json` - The event content, as a single-line JSON-encoded string.
392    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
393        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
394
395        let write_res = (|| {
396            let mut file = self.get_event_store(store_name)?;
397            file.write_all(event_json.as_bytes())?;
398            file.write_all(b"\n")?;
399            file.flush()?;
400            Ok::<(), std::io::Error>(())
401        })();
402
403        if let Err(err) = write_res {
404            log::warn!("IO error writing event to store '{}': {}", store_name, err);
405        }
406    }
407
408    /// Normalizes the store in-place.
409    ///
410    /// A store may be in any order and contain any number of `glean.restarted` events,
411    /// whose values must be taken into account, along with `execution_counter` values,
412    /// to come up with the correct events with correct `timestamp` values,
413    /// on which we then sort.
414    ///
415    /// 1. Sort by `execution_counter` and `timestamp`,
416    ///    breaking ties so that `glean.restarted` comes first.
417    /// 2. Remove all initial and final `glean.restarted` events
418    /// 3. For each group of events that share a `execution_counter`,
419    ///    i. calculate the initial `glean.restarted` event's `timestamp`s to be
420    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
421    ///    ii. normalize each non-`glean-restarted` event's `timestamp`
422    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
423    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
424    /// 5. Sort by `timestamp`
425    ///
426    /// In the event that something goes awry, this will record an invalid_state on
427    /// glean.restarted if it is due to internal inconsistencies, or invalid_value
428    /// on client clock weirdness.
429    ///
430    /// # Arguments
431    ///
432    /// * `glean` - Used to report errors
433    /// * `store_name` - The name of the store we're normalizing.
434    /// * `store` - The store we're to normalize.
435    /// * `glean_start_time` - Used if the glean.startup.date or ping_info.start_time aren't available. Passed as a parameter to ease unit-testing.
436    fn normalize_store(
437        &self,
438        glean: &Glean,
439        store_name: &str,
440        store: &mut Vec<StoredEvent>,
441        glean_start_time: DateTime<FixedOffset>,
442    ) {
443        let is_glean_restarted =
444            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
445        let glean_restarted_meta = |store_name: &str| CommonMetricData {
446            name: "restarted".into(),
447            category: "glean".into(),
448            send_in_pings: vec![store_name.into()],
449            lifetime: Lifetime::Ping,
450            ..Default::default()
451        };
452        // Step 1
453        store.sort_by(|a, b| {
454            a.execution_counter
455                .cmp(&b.execution_counter)
456                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
457                .then_with(|| {
458                    if is_glean_restarted(&a.event) {
459                        Ordering::Less
460                    } else {
461                        Ordering::Greater
462                    }
463                })
464        });
465        // Step 2
466        // Find the index of the first and final non-`glean.restarted` events.
467        // Remove events before the first and after the final.
468        let final_event = match store
469            .iter()
470            .rposition(|event| !is_glean_restarted(&event.event))
471        {
472            Some(idx) => idx + 1,
473            _ => 0,
474        };
475        store.drain(final_event..);
476        let first_event = store
477            .iter()
478            .position(|event| !is_glean_restarted(&event.event))
479            .unwrap_or(store.len());
480        store.drain(..first_event);
481        if store.is_empty() {
482            // There was nothing but `glean.restarted` events. Job's done!
483            return;
484        }
485        // Step 3
486        // It is allowed that there might not be any `glean.restarted` event, nor
487        // `execution_counter` extra values. (This should always be the case for the
488        // "events" ping, for instance).
489        // Other inconsistencies are evidence of errors, and so are logged.
490        let mut cur_ec = 0;
491        // The offset within a group of events with the same `execution_counter`.
492        let mut intra_group_offset = store[0].event.timestamp;
493        // The offset between this group and ping_info.start_date.
494        let mut inter_group_offset = 0;
495        let mut highest_ts = 0;
496        for event in store.iter_mut() {
497            let execution_counter = event.execution_counter.take().unwrap_or(0);
498            if is_glean_restarted(&event.event) {
499                // We've entered the next "event group".
500                // We need a new epoch based on glean.startup.date - ping_info.start_date
501                cur_ec = execution_counter;
502                let glean_startup_date = event
503                    .event
504                    .extra
505                    .as_mut()
506                    .and_then(|extra| {
507                        extra.remove("glean.startup.date").and_then(|date_str| {
508                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
509                                .map_err(|_| {
510                                    record_error(
511                                        glean,
512                                        &glean_restarted_meta(store_name).into(),
513                                        ErrorType::InvalidState,
514                                        format!("Unparseable glean.startup.date '{}'", date_str),
515                                        None,
516                                    );
517                                })
518                                .ok()
519                        })
520                    })
521                    .unwrap_or(glean_start_time);
522                if event
523                    .event
524                    .extra
525                    .as_ref()
526                    .is_some_and(|extra| extra.is_empty())
527                {
528                    // Small optimization to save us sending empty dicts.
529                    event.event.extra = None;
530                }
531                let ping_start = DatetimeMetric::new(
532                    CommonMetricData {
533                        name: format!("{}#start", store_name),
534                        category: "".into(),
535                        send_in_pings: vec![INTERNAL_STORAGE.into()],
536                        lifetime: Lifetime::User,
537                        ..Default::default()
538                    },
539                    TimeUnit::Minute,
540                );
541                let ping_start = ping_start
542                    .get_value(glean, INTERNAL_STORAGE)
543                    .unwrap_or(glean_start_time);
544                let time_from_ping_start_to_glean_restarted =
545                    (glean_startup_date - ping_start).num_milliseconds();
546                intra_group_offset = event.event.timestamp;
547                inter_group_offset =
548                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
549                if inter_group_offset < highest_ts {
550                    record_error(
551                        glean,
552                        &glean_restarted_meta(store_name).into(),
553                        ErrorType::InvalidValue,
554                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
555                        None,
556                    );
557                    // The client's clock went backwards enough that this event group's
558                    // glean.restarted looks like it happened _before_ the final event of the previous group.
559                    // Or, it went ahead enough to overflow u64.
560                    // Adjust things so this group starts 1ms after the previous one.
561                    inter_group_offset = highest_ts + 1;
562                }
563            } else if cur_ec == 0 {
564                // bug 1811872 - cur_ec might need initialization.
565                cur_ec = execution_counter;
566            }
567            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
568            if execution_counter != cur_ec {
569                record_error(
570                    glean,
571                    &glean_restarted_meta(store_name).into(),
572                    ErrorType::InvalidState,
573                    format!(
574                        "Inconsistent execution counter {} (expected {})",
575                        execution_counter, cur_ec
576                    ),
577                    None,
578                );
579                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
580                cur_ec = execution_counter;
581            }
582
583            // event timestamp is a `u64`, but BigQuery uses `i64` (signed!) everywhere. Let's clamp the value to make
584            // sure we stay within bounds.
585            if event.event.timestamp > i64::MAX as u64 {
586                glean
587                    .additional_metrics
588                    .event_timestamp_clamped
589                    .add_sync(glean, 1);
590                log::warn!(
591                    "Calculated event timestamp was too high. Got: {}, max: {}",
592                    event.event.timestamp,
593                    i64::MAX,
594                );
595                event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
596            }
597
598            if highest_ts > event.event.timestamp {
599                // Even though we sorted everything, something in the
600                // execution_counter or glean.startup.date math went awry.
601                record_error(
602                    glean,
603                    &glean_restarted_meta(store_name).into(),
604                    ErrorType::InvalidState,
605                    format!(
606                        "Inconsistent previous highest timestamp {} (expected <= {})",
607                        highest_ts, event.event.timestamp
608                    ),
609                    None,
610                );
611                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
612            }
613            highest_ts = event.event.timestamp
614        }
615    }
616
617    /// Gets a snapshot of the stored event data as a JsonValue.
618    ///
619    /// # Arguments
620    ///
621    /// * `glean` - the Glean instance.
622    /// * `store_name` - The name of the desired store.
623    /// * `clear_store` - Whether to clear the store after snapshotting.
624    ///
625    /// # Returns
626    ///
627    /// A array of events, JSON encoded, if any. Otherwise `None`.
628    pub fn snapshot_as_json(
629        &self,
630        glean: &Glean,
631        store_name: &str,
632        clear_store: bool,
633    ) -> Option<JsonValue> {
634        let result = {
635            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
636            db.get_mut(&store_name.to_string()).and_then(|store| {
637                if !store.is_empty() {
638                    // Normalization happens in-place, so if we're not clearing,
639                    // operate on a clone.
640                    let mut clone;
641                    let store = if clear_store {
642                        store
643                    } else {
644                        clone = store.clone();
645                        &mut clone
646                    };
647                    // We may need to normalize event timestamps across multiple restarts.
648                    self.normalize_store(glean, store_name, store, glean.start_time());
649                    Some(json!(store))
650                } else {
651                    log::warn!("Unexpectly got empty event store for '{}'", store_name);
652                    None
653                }
654            })
655        };
656
657        if clear_store {
658            self.event_stores
659                .write()
660                .unwrap() // safe unwrap, only error case is poisoning
661                .remove(&store_name.to_string());
662            self.event_store_files
663                .write()
664                .unwrap() // safe unwrap, only error case is poisoning
665                .remove(&store_name.to_string());
666
667            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
668            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
669                match err.kind() {
670                    std::io::ErrorKind::NotFound => {
671                        // silently drop this error, the file was already non-existing
672                    }
673                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
674                }
675            }
676        }
677
678        result
679    }
680
681    /// Clears all stored events, both in memory and on-disk.
682    pub fn clear_all(&self) -> Result<()> {
683        // safe unwrap, only error case is poisoning
684        self.event_stores.write().unwrap().clear();
685        self.event_store_files.write().unwrap().clear();
686
687        // safe unwrap, only error case is poisoning
688        let _lock = self.file_lock.lock().unwrap();
689        std::fs::remove_dir_all(&self.path)?;
690        create_dir_all(&self.path)?;
691
692        Ok(())
693    }
694
695    /// **Test-only API (exported for FFI purposes).**
696    ///
697    /// Gets the vector of currently stored events for the given event metric in
698    /// the given store.
699    ///
700    /// This doesn't clear the stored value.
701    pub fn test_get_value<'a>(
702        &'a self,
703        meta: &'a CommonMetricDataInternal,
704        store_name: &str,
705    ) -> Option<Vec<RecordedEvent>> {
706        let value: Vec<RecordedEvent> = self
707            .event_stores
708            .read()
709            .unwrap() // safe unwrap, only error case is poisoning
710            .get(&store_name.to_string())
711            .into_iter()
712            .flatten()
713            .map(|stored_event| stored_event.event.clone())
714            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
715            .collect();
716        if !value.is_empty() {
717            Some(value)
718        } else {
719            None
720        }
721    }
722}
723
724#[cfg(test)]
725mod test {
726    use super::*;
727    use crate::metrics::RemoteSettingsConfig;
728    use crate::test_get_num_recorded_errors;
729    use crate::tests::new_glean;
730    use chrono::{TimeZone, Timelike};
731
732    #[test]
733    fn handle_truncated_events_on_disk() {
734        let (glean, t) = new_glean(None);
735
736        {
737            let db = EventDatabase::new(t.path()).unwrap();
738            db.write_event_to_disk("events", "{\"timestamp\": 500");
739            db.write_event_to_disk("events", "{\"timestamp\"");
740            db.write_event_to_disk(
741                "events",
742                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
743            );
744        }
745
746        {
747            let db = EventDatabase::new(t.path()).unwrap();
748            db.load_events_from_disk(&glean, false).unwrap();
749            let events = &db.event_stores.read().unwrap()["events"];
750            assert_eq!(1, events.len());
751        }
752    }
753
754    #[test]
755    fn stable_serialization() {
756        let event_empty = RecordedEvent {
757            timestamp: 2,
758            category: "cat".to_string(),
759            name: "name".to_string(),
760            extra: None,
761            session: None,
762        };
763
764        let mut data = HashMap::new();
765        data.insert("a key".to_string(), "a value".to_string());
766        let event_data = RecordedEvent {
767            timestamp: 2,
768            category: "cat".to_string(),
769            name: "name".to_string(),
770            extra: Some(data),
771            session: None,
772        };
773
774        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
775        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
776
777        assert_eq!(
778            StoredEvent {
779                event: event_empty,
780                execution_counter: None
781            },
782            serde_json::from_str(&event_empty_json).unwrap()
783        );
784        assert_eq!(
785            StoredEvent {
786                event: event_data,
787                execution_counter: None
788            },
789            serde_json::from_str(&event_data_json).unwrap()
790        );
791    }
792
793    #[test]
794    fn deserialize_existing_data() {
795        let event_empty_json = r#"
796{
797  "timestamp": 2,
798  "category": "cat",
799  "name": "name"
800}
801            "#;
802
803        let event_data_json = r#"
804{
805  "timestamp": 2,
806  "category": "cat",
807  "name": "name",
808  "extra": {
809    "a key": "a value"
810  }
811}
812        "#;
813
814        let event_empty = RecordedEvent {
815            timestamp: 2,
816            category: "cat".to_string(),
817            name: "name".to_string(),
818            extra: None,
819            session: None,
820        };
821
822        let mut data = HashMap::new();
823        data.insert("a key".to_string(), "a value".to_string());
824        let event_data = RecordedEvent {
825            timestamp: 2,
826            category: "cat".to_string(),
827            name: "name".to_string(),
828            extra: Some(data),
829            session: None,
830        };
831
832        assert_eq!(
833            StoredEvent {
834                event: event_empty,
835                execution_counter: None
836            },
837            serde_json::from_str(event_empty_json).unwrap()
838        );
839        assert_eq!(
840            StoredEvent {
841                event: event_data,
842                execution_counter: None
843            },
844            serde_json::from_str(event_data_json).unwrap()
845        );
846    }
847
848    #[test]
849    fn doesnt_record_when_upload_is_disabled() {
850        let (mut glean, dir) = new_glean(None);
851        let db = EventDatabase::new(dir.path()).unwrap();
852
853        let test_storage = "store1";
854        let test_category = "category";
855        let test_name = "name";
856        let test_timestamp = 2;
857        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
858        let event_data = RecordedEvent {
859            timestamp: test_timestamp,
860            category: test_category.to_string(),
861            name: test_name.to_string(),
862            extra: None,
863            session: None,
864        };
865
866        // Upload is not yet disabled,
867        // so let's check that everything is getting recorded as expected.
868        db.record(
869            &glean,
870            &test_meta,
871            2,
872            None,
873            EventSessionContext::OutOfSession,
874        );
875        {
876            let event_stores = db.event_stores.read().unwrap();
877            assert_eq!(
878                &StoredEvent {
879                    event: event_data,
880                    execution_counter: None
881                },
882                &event_stores.get(test_storage).unwrap()[0]
883            );
884            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
885        }
886
887        glean.set_upload_enabled(false);
888
889        // Now that upload is disabled, let's check nothing is recorded.
890        db.record(
891            &glean,
892            &test_meta,
893            2,
894            None,
895            EventSessionContext::OutOfSession,
896        );
897        {
898            let event_stores = db.event_stores.read().unwrap();
899            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
900        }
901    }
902
903    #[test]
904    fn normalize_store_of_glean_restarted() {
905        // Make sure stores empty of anything but glean.restarted events normalize without issue.
906        let (glean, _dir) = new_glean(None);
907
908        let store_name = "store-name";
909        let glean_restarted = StoredEvent {
910            event: RecordedEvent {
911                timestamp: 2,
912                category: "glean".into(),
913                name: "restarted".into(),
914                extra: None,
915                session: None,
916            },
917            execution_counter: None,
918        };
919        let mut store = vec![glean_restarted.clone()];
920        let glean_start_time = glean.start_time();
921
922        glean
923            .event_storage()
924            .normalize_store(&glean, store_name, &mut store, glean_start_time);
925        assert!(store.is_empty());
926
927        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
928        glean
929            .event_storage()
930            .normalize_store(&glean, store_name, &mut store, glean_start_time);
931        assert!(store.is_empty());
932
933        let mut store = vec![
934            glean_restarted.clone(),
935            glean_restarted.clone(),
936            glean_restarted,
937        ];
938        glean
939            .event_storage()
940            .normalize_store(&glean, store_name, &mut store, glean_start_time);
941        assert!(store.is_empty());
942    }
943
944    #[test]
945    fn normalize_store_of_glean_restarted_on_both_ends() {
946        // Make sure stores with non-glean.restarted events don't get drained too far.
947        let (glean, _dir) = new_glean(None);
948
949        let store_name = "store-name";
950        let glean_restarted = StoredEvent {
951            event: RecordedEvent {
952                timestamp: 2,
953                category: "glean".into(),
954                name: "restarted".into(),
955                extra: None,
956                session: None,
957            },
958            execution_counter: None,
959        };
960        let not_glean_restarted = StoredEvent {
961            event: RecordedEvent {
962                timestamp: 20,
963                category: "category".into(),
964                name: "name".into(),
965                extra: None,
966                session: None,
967            },
968            execution_counter: None,
969        };
970        let mut store = vec![
971            glean_restarted.clone(),
972            not_glean_restarted.clone(),
973            glean_restarted,
974        ];
975        let glean_start_time = glean.start_time();
976
977        glean
978            .event_storage()
979            .normalize_store(&glean, store_name, &mut store, glean_start_time);
980        assert_eq!(1, store.len());
981        assert_eq!(
982            StoredEvent {
983                event: RecordedEvent {
984                    timestamp: 0,
985                    ..not_glean_restarted.event
986                },
987                execution_counter: None
988            },
989            store[0]
990        );
991    }
992
993    #[test]
994    fn normalize_store_single_run_timestamp_math() {
995        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
996        // ensure the timestamp math works.
997        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
998        let (glean, _dir) = new_glean(None);
999
1000        let store_name = "store-name";
1001        let glean_restarted = StoredEvent {
1002            event: RecordedEvent {
1003                timestamp: 2,
1004                category: "glean".into(),
1005                name: "restarted".into(),
1006                extra: None,
1007                session: None,
1008            },
1009            execution_counter: None,
1010        };
1011        let timestamps = [20, 40, 200];
1012        let not_glean_restarted = StoredEvent {
1013            event: RecordedEvent {
1014                timestamp: timestamps[0],
1015                category: "category".into(),
1016                name: "name".into(),
1017                extra: None,
1018                session: None,
1019            },
1020            execution_counter: None,
1021        };
1022        let mut store = vec![
1023            glean_restarted.clone(),
1024            not_glean_restarted.clone(),
1025            StoredEvent {
1026                event: RecordedEvent {
1027                    timestamp: timestamps[1],
1028                    ..not_glean_restarted.event.clone()
1029                },
1030                execution_counter: None,
1031            },
1032            StoredEvent {
1033                event: RecordedEvent {
1034                    timestamp: timestamps[2],
1035                    ..not_glean_restarted.event.clone()
1036                },
1037                execution_counter: None,
1038            },
1039            glean_restarted,
1040        ];
1041
1042        glean
1043            .event_storage()
1044            .normalize_store(&glean, store_name, &mut store, glean.start_time());
1045        assert_eq!(3, store.len());
1046        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
1047            assert_eq!(
1048                &StoredEvent {
1049                    event: RecordedEvent {
1050                        timestamp: timestamp - timestamps[0],
1051                        ..not_glean_restarted.clone().event
1052                    },
1053                    execution_counter: None
1054                },
1055                event
1056            );
1057        }
1058    }
1059
1060    #[test]
1061    fn normalize_store_multi_run_timestamp_math() {
1062        // With multiple runs of events (separated by `glean.restarted`),
1063        // ensure the timestamp math works.
1064        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
1065        //            Subsequent runs figure it out via glean.restarted.date and ping_info.start_time ))
1066        let (glean, _dir) = new_glean(None);
1067
1068        let store_name = "store-name";
1069        let glean_restarted = StoredEvent {
1070            event: RecordedEvent {
1071                category: "glean".into(),
1072                name: "restarted".into(),
1073                ..Default::default()
1074            },
1075            execution_counter: None,
1076        };
1077        let not_glean_restarted = StoredEvent {
1078            event: RecordedEvent {
1079                category: "category".into(),
1080                name: "name".into(),
1081                ..Default::default()
1082            },
1083            execution_counter: None,
1084        };
1085
1086        // This scenario represents a run of three events followed by an hour between runs,
1087        // followed by one final event.
1088        let timestamps = [20, 40, 200, 12];
1089        let ecs = [0, 1];
1090        let some_hour = 16;
1091        let startup_date = FixedOffset::east_opt(0)
1092            .unwrap()
1093            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) // TimeUnit::Minute -- don't put seconds
1094            .unwrap();
1095        let glean_start_time = startup_date.with_hour(some_hour - 1);
1096        let restarted_ts = 2;
1097        let mut store = vec![
1098            StoredEvent {
1099                event: RecordedEvent {
1100                    timestamp: timestamps[0],
1101                    ..not_glean_restarted.event.clone()
1102                },
1103                execution_counter: Some(ecs[0]),
1104            },
1105            StoredEvent {
1106                event: RecordedEvent {
1107                    timestamp: timestamps[1],
1108                    ..not_glean_restarted.event.clone()
1109                },
1110                execution_counter: Some(ecs[0]),
1111            },
1112            StoredEvent {
1113                event: RecordedEvent {
1114                    timestamp: timestamps[2],
1115                    ..not_glean_restarted.event.clone()
1116                },
1117                execution_counter: Some(ecs[0]),
1118            },
1119            StoredEvent {
1120                event: RecordedEvent {
1121                    extra: Some(
1122                        [(
1123                            "glean.startup.date".into(),
1124                            get_iso_time_string(startup_date, TimeUnit::Minute),
1125                        )]
1126                        .into(),
1127                    ),
1128                    timestamp: restarted_ts,
1129                    ..glean_restarted.event.clone()
1130                },
1131                execution_counter: Some(ecs[1]),
1132            },
1133            StoredEvent {
1134                event: RecordedEvent {
1135                    timestamp: timestamps[3],
1136                    ..not_glean_restarted.event.clone()
1137                },
1138                execution_counter: Some(ecs[1]),
1139            },
1140        ];
1141
1142        glean.event_storage().normalize_store(
1143            &glean,
1144            store_name,
1145            &mut store,
1146            glean_start_time.unwrap(),
1147        );
1148        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1149
1150        // Let's check the first three.
1151        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1152            assert_eq!(
1153                StoredEvent {
1154                    event: RecordedEvent {
1155                        timestamp: timestamp - timestamps[0],
1156                        ..not_glean_restarted.event.clone()
1157                    },
1158                    execution_counter: None,
1159                },
1160                event
1161            );
1162        }
1163        // The fourth should be a glean.restarted and have a realtime-based timestamp.
1164        let hour_in_millis = 3600000;
1165        assert_eq!(
1166            store[3],
1167            StoredEvent {
1168                event: RecordedEvent {
1169                    timestamp: hour_in_millis,
1170                    ..glean_restarted.event
1171                },
1172                execution_counter: None,
1173            }
1174        );
1175        // The fifth should have a timestamp based on the new origin.
1176        assert_eq!(
1177            store[4],
1178            StoredEvent {
1179                event: RecordedEvent {
1180                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1181                    ..not_glean_restarted.event
1182                },
1183                execution_counter: None,
1184            }
1185        );
1186    }
1187
1188    #[test]
1189    fn normalize_store_multi_run_client_clocks() {
1190        // With multiple runs of events (separated by `glean.restarted`),
1191        // ensure the timestamp math works. Even when the client clock goes backwards.
1192        let (glean, _dir) = new_glean(None);
1193
1194        let store_name = "store-name";
1195        let glean_restarted = StoredEvent {
1196            event: RecordedEvent {
1197                category: "glean".into(),
1198                name: "restarted".into(),
1199                ..Default::default()
1200            },
1201            execution_counter: None,
1202        };
1203        let not_glean_restarted = StoredEvent {
1204            event: RecordedEvent {
1205                category: "category".into(),
1206                name: "name".into(),
1207                ..Default::default()
1208            },
1209            execution_counter: None,
1210        };
1211
1212        // This scenario represents a run of two events followed by negative one hours between runs,
1213        // followed by two more events.
1214        let timestamps = [20, 40, 12, 200];
1215        let ecs = [0, 1];
1216        let some_hour = 10;
1217        let startup_date = FixedOffset::east_opt(0)
1218            .unwrap()
1219            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) // TimeUnit::Minute -- don't put seconds
1220            .unwrap();
1221        let glean_start_time = startup_date.with_hour(some_hour + 1);
1222        let restarted_ts = 2;
1223        let mut store = vec![
1224            StoredEvent {
1225                event: RecordedEvent {
1226                    timestamp: timestamps[0],
1227                    ..not_glean_restarted.event.clone()
1228                },
1229                execution_counter: Some(ecs[0]),
1230            },
1231            StoredEvent {
1232                event: RecordedEvent {
1233                    timestamp: timestamps[1],
1234                    ..not_glean_restarted.event.clone()
1235                },
1236                execution_counter: Some(ecs[0]),
1237            },
1238            StoredEvent {
1239                event: RecordedEvent {
1240                    extra: Some(
1241                        [(
1242                            "glean.startup.date".into(),
1243                            get_iso_time_string(startup_date, TimeUnit::Minute),
1244                        )]
1245                        .into(),
1246                    ),
1247                    timestamp: restarted_ts,
1248                    ..glean_restarted.event.clone()
1249                },
1250                execution_counter: Some(ecs[1]),
1251            },
1252            StoredEvent {
1253                event: RecordedEvent {
1254                    timestamp: timestamps[2],
1255                    ..not_glean_restarted.event.clone()
1256                },
1257                execution_counter: Some(ecs[1]),
1258            },
1259            StoredEvent {
1260                event: RecordedEvent {
1261                    timestamp: timestamps[3],
1262                    ..not_glean_restarted.event.clone()
1263                },
1264                execution_counter: Some(ecs[1]),
1265            },
1266        ];
1267
1268        glean.event_storage().normalize_store(
1269            &glean,
1270            store_name,
1271            &mut store,
1272            glean_start_time.unwrap(),
1273        );
1274        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1275
1276        // Let's check the first two.
1277        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1278            assert_eq!(
1279                StoredEvent {
1280                    event: RecordedEvent {
1281                        timestamp: timestamp - timestamps[0],
1282                        ..not_glean_restarted.event.clone()
1283                    },
1284                    execution_counter: None,
1285                },
1286                event
1287            );
1288        }
1289        // The third should be a glean.restarted. Its timestamp should be
1290        // one larger than the largest timestamp seen so far (because that's
1291        // how we ensure monotonic timestamps when client clocks go backwards).
1292        assert_eq!(
1293            store[2],
1294            StoredEvent {
1295                event: RecordedEvent {
1296                    timestamp: store[1].event.timestamp + 1,
1297                    ..glean_restarted.event
1298                },
1299                execution_counter: None,
1300            }
1301        );
1302        // The fifth should have a timestamp based on the new origin.
1303        assert_eq!(
1304            store[3],
1305            StoredEvent {
1306                event: RecordedEvent {
1307                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1308                    ..not_glean_restarted.event
1309                },
1310                execution_counter: None,
1311            }
1312        );
1313        // And we should have an InvalidValue on glean.restarted to show for it.
1314        assert_eq!(
1315            Ok(1),
1316            test_get_num_recorded_errors(
1317                &glean,
1318                &CommonMetricData {
1319                    name: "restarted".into(),
1320                    category: "glean".into(),
1321                    send_in_pings: vec![store_name.into()],
1322                    lifetime: Lifetime::Ping,
1323                    ..Default::default()
1324                }
1325                .into(),
1326                ErrorType::InvalidValue
1327            )
1328        );
1329    }
1330
1331    #[test]
1332    fn normalize_store_non_zero_ec() {
1333        // After the first run, execution_counter will likely be non-zero.
1334        // Ensure normalizing a store that begins with non-zero ec works.
1335        let (glean, _dir) = new_glean(None);
1336
1337        let store_name = "store-name";
1338        let glean_restarted = StoredEvent {
1339            event: RecordedEvent {
1340                timestamp: 2,
1341                category: "glean".into(),
1342                name: "restarted".into(),
1343                extra: None,
1344                session: None,
1345            },
1346            execution_counter: Some(2),
1347        };
1348        let not_glean_restarted = StoredEvent {
1349            event: RecordedEvent {
1350                timestamp: 20,
1351                category: "category".into(),
1352                name: "name".into(),
1353                extra: None,
1354                session: None,
1355            },
1356            execution_counter: Some(2),
1357        };
1358        let glean_restarted_2 = StoredEvent {
1359            event: RecordedEvent {
1360                timestamp: 2,
1361                category: "glean".into(),
1362                name: "restarted".into(),
1363                extra: None,
1364                session: None,
1365            },
1366            execution_counter: Some(3),
1367        };
1368        let mut store = vec![
1369            glean_restarted,
1370            not_glean_restarted.clone(),
1371            glean_restarted_2,
1372        ];
1373        let glean_start_time = glean.start_time();
1374
1375        glean
1376            .event_storage()
1377            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1378
1379        assert_eq!(1, store.len());
1380        assert_eq!(
1381            StoredEvent {
1382                event: RecordedEvent {
1383                    timestamp: 0,
1384                    ..not_glean_restarted.event
1385                },
1386                execution_counter: None
1387            },
1388            store[0]
1389        );
1390        // And we should have no InvalidState errors on glean.restarted.
1391        assert!(test_get_num_recorded_errors(
1392            &glean,
1393            &CommonMetricData {
1394                name: "restarted".into(),
1395                category: "glean".into(),
1396                send_in_pings: vec![store_name.into()],
1397                lifetime: Lifetime::Ping,
1398                ..Default::default()
1399            }
1400            .into(),
1401            ErrorType::InvalidState
1402        )
1403        .is_err());
1404        // (and, just because we're here, double-check there are no InvalidValue either).
1405        assert!(test_get_num_recorded_errors(
1406            &glean,
1407            &CommonMetricData {
1408                name: "restarted".into(),
1409                category: "glean".into(),
1410                send_in_pings: vec![store_name.into()],
1411                lifetime: Lifetime::Ping,
1412                ..Default::default()
1413            }
1414            .into(),
1415            ErrorType::InvalidValue
1416        )
1417        .is_err());
1418    }
1419
1420    #[test]
1421    fn normalize_store_clamps_timestamp() {
1422        let (glean, _dir) = new_glean(None);
1423
1424        let store_name = "store-name";
1425        let event = RecordedEvent {
1426            category: "category".into(),
1427            name: "name".into(),
1428            ..Default::default()
1429        };
1430
1431        let timestamps = [
1432            0,
1433            (i64::MAX / 2) as u64,
1434            i64::MAX as _,
1435            (i64::MAX as u64) + 1,
1436        ];
1437        let mut store = timestamps
1438            .into_iter()
1439            .map(|timestamp| StoredEvent {
1440                event: RecordedEvent {
1441                    timestamp,
1442                    ..event.clone()
1443                },
1444                execution_counter: None,
1445            })
1446            .collect();
1447
1448        let glean_start_time = glean.start_time();
1449        glean
1450            .event_storage()
1451            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1452        assert_eq!(4, store.len());
1453
1454        assert_eq!(0, store[0].event.timestamp);
1455        assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
1456        assert_eq!((i64::MAX as u64), store[2].event.timestamp);
1457        assert_eq!((i64::MAX as u64), store[3].event.timestamp);
1458    }
1459
1460    #[test]
1461    fn normalize_store_clamps_timestamp_metric_enabled() {
1462        let (glean, _dir) = new_glean(None);
1463
1464        let mut cfg = RemoteSettingsConfig::default();
1465        cfg.metrics_enabled
1466            .insert("glean.error.event_timestamp_clamped".to_string(), true);
1467        glean.apply_server_knobs_config(cfg);
1468
1469        let store_name = "store-name";
1470        let event = RecordedEvent {
1471            category: "category".into(),
1472            name: "name".into(),
1473            ..Default::default()
1474        };
1475
1476        let timestamps = [0, (i64::MAX as u64) + 1];
1477        let mut store = timestamps
1478            .into_iter()
1479            .map(|timestamp| StoredEvent {
1480                event: RecordedEvent {
1481                    timestamp,
1482                    ..event.clone()
1483                },
1484                execution_counter: None,
1485            })
1486            .collect();
1487
1488        let glean_start_time = glean.start_time();
1489        glean
1490            .event_storage()
1491            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1492        assert_eq!(2, store.len());
1493
1494        assert_eq!(0, store[0].event.timestamp);
1495        assert_eq!((i64::MAX as u64), store[1].event.timestamp);
1496
1497        let error_count = glean
1498            .additional_metrics
1499            .event_timestamp_clamped
1500            .get_value(&glean, "health");
1501        assert_eq!(Some(1), error_count);
1502    }
1503}