glean_core/event_database/mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::coverage::record_coverage;
25use crate::error_recording::{record_error, ErrorType};
26use crate::metrics::{DatetimeMetric, TimeUnit};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33/// Represents the recorded data for a single event.
34#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37    /// The timestamp of when the event was recorded.
38    ///
    /// This allows ordering events from a single process run.
40    pub timestamp: u64,
41
42    /// The event's category.
43    ///
44    /// This is defined by users in the metrics file.
45    pub category: String,
46
47    /// The event's name.
48    ///
49    /// This is defined by users in the metrics file.
50    pub name: String,
51
52    /// A map of all extra data values.
53    ///
54    /// The set of allowed extra keys is defined by users in the metrics file.
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub extra: Option<HashMap<String, String>>,
57}
58
59/// Represents the stored data for a single event.
60#[derive(
61    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
62)]
63struct StoredEvent {
64    #[serde(flatten)]
65    event: RecordedEvent,
66
67    /// The monotonically-increasing execution counter.
68    ///
69    /// Included to allow sending of events across Glean restarts (bug 1716725).
    /// It is an i32 because it is stored in a CounterMetric.
71    #[serde(default)]
72    #[serde(skip_serializing_if = "Option::is_none")]
73    pub execution_counter: Option<i32>,
74}
75
76/// This struct handles the in-memory and on-disk storage logic for events.
77///
/// So that data survives an application shutdown, events are stored in an
/// append-only file on disk, in addition to the in-memory store. Each line of
/// this file records a single event as JSON, exactly as it will be sent in the
/// ping. There is one file per store.
82///
83/// When restarting the application, these on-disk files are checked, and if any are
84/// found, they are loaded, and a `glean.restarted` event is added before any
85/// further events are collected. This is because the timestamps for these events
86/// may have come from a previous boot of the device, and therefore will not be
87/// compatible with any newly-collected events.
88///
/// Normalizing all these timestamps happens on serialization for submission (see
/// `snapshot_as_json`), where the client time between restarts is calculated using
91/// data stored in the `glean.startup.date` extra of the `glean.restarted` event, plus
92/// the `execution_counter` stored in events on disk.
93///
94/// Neither `execution_counter` nor `glean.startup.date` is submitted in pings.
95/// The `glean.restarted` event is, though.
96/// (See [bug 1716725](https://bugzilla.mozilla.org/show_bug.cgi?id=1716725).)
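///
/// For illustration, each line of a store's file is the JSON form of a
/// `StoredEvent` (a `RecordedEvent` flattened together with its optional
/// `execution_counter`). The field values here are made up:
///
/// ```text
/// {"timestamp":2,"category":"ui","name":"click","extra":{"source":"menu"},"execution_counter":3}
/// ```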
97#[derive(Debug)]
98pub struct EventDatabase {
99    /// Path to directory of on-disk event files
100    pub path: PathBuf,
101    /// The in-memory list of events
102    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
103    event_store_files: RwLock<HashMap<String, Arc<File>>>,
104    /// A lock to be held when doing operations on the filesystem
105    file_lock: Mutex<()>,
106}
107
108impl MallocSizeOf for EventDatabase {
109    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
110        let mut n = 0;
111        n += self.event_stores.read().unwrap().size_of(ops);
112
113        let map = self.event_store_files.read().unwrap();
114        for store_name in map.keys() {
115            n += store_name.size_of(ops);
116            // `File` doesn't allocate, but `Arc` puts it on the heap.
117            n += mem::size_of::<File>();
118        }
119        n
120    }
121}
122
123impl EventDatabase {
124    /// Creates a new event database.
125    ///
126    /// # Arguments
127    ///
    /// * `data_path` - The directory to store events in. A new directory
    ///   `events` will be created inside of this directory.
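    ///
    /// A minimal usage sketch (the path is illustrative):
    ///
    /// ```ignore
    /// let db = EventDatabase::new(Path::new("/tmp/glean-data"))?;
    /// ```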
130    pub fn new(data_path: &Path) -> Result<Self> {
131        let path = data_path.join("events");
132        create_dir_all(&path)?;
133
134        Ok(Self {
135            path,
136            event_stores: RwLock::new(HashMap::new()),
137            event_store_files: RwLock::new(HashMap::new()),
138            file_lock: Mutex::new(()),
139        })
140    }
141
142    /// Initializes events storage after Glean is fully initialized and ready to send pings.
143    ///
144    /// This must be called once on application startup, e.g. from
145    /// [Glean.initialize], but after we are ready to send pings, since this
146    /// could potentially collect and send the "events" ping.
147    ///
148    /// If there are any events queued on disk, it loads them into memory so
149    /// that the memory and disk representations are in sync.
150    ///
151    /// If event records for the "events" ping are present, they are assembled into
152    /// an "events" ping which is submitted immediately with reason "startup".
153    ///
154    /// If event records for custom pings are present, we increment the custom pings'
155    /// stores' `execution_counter` and record a `glean.restarted`
156    /// event with the current client clock in its `glean.startup.date` extra.
157    ///
158    /// # Arguments
159    ///
160    /// * `glean` - The Glean instance.
161    /// * `trim_data_to_registered_pings` - Whether we should trim the event storage of
162    ///   any events not belonging to pings previously registered via `register_ping_type`.
163    ///
164    /// # Returns
165    ///
166    /// Whether the "events" ping was submitted.
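    ///
    /// A usage sketch, assuming an initialized `glean` instance (its
    /// `event_storage()` accessor is how the rest of the crate reaches this
    /// database):
    ///
    /// ```ignore
    /// let submitted = glean
    ///     .event_storage()
    ///     .flush_pending_events_on_startup(&glean, true);
    /// if submitted {
    ///     // An "events" ping with reason "startup" was submitted.
    /// }
    /// ```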
167    pub fn flush_pending_events_on_startup(
168        &self,
169        glean: &Glean,
170        trim_data_to_registered_pings: bool,
171    ) -> bool {
172        match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
173            Ok(_) => {
174                let stores_with_events: Vec<String> = {
175                    self.event_stores
176                        .read()
                        .unwrap() // safe unwrap, only error case is poisoning
                        .keys()
                        .map(|x| x.to_owned())
                        .collect()
181                };
182                // We do not want to be holding the event stores lock when
183                // submitting a ping or recording new events.
184                let has_events_events = stores_with_events.contains(&"events".to_owned());
185                let glean_restarted_stores = if has_events_events {
186                    stores_with_events
187                        .into_iter()
188                        .filter(|store| store != "events")
189                        .collect()
190                } else {
191                    stores_with_events
192                };
193                if !glean_restarted_stores.is_empty() {
194                    for store_name in glean_restarted_stores.iter() {
195                        CounterMetric::new(CommonMetricData {
196                            name: "execution_counter".into(),
197                            category: store_name.into(),
198                            send_in_pings: vec![INTERNAL_STORAGE.into()],
199                            lifetime: Lifetime::Ping,
200                            ..Default::default()
201                        })
202                        .add_sync(glean, 1);
203                    }
204                    let glean_restarted = CommonMetricData {
205                        name: "restarted".into(),
206                        category: "glean".into(),
207                        send_in_pings: glean_restarted_stores,
208                        lifetime: Lifetime::Ping,
209                        ..Default::default()
210                    };
211                    let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
212                    let mut extra: HashMap<String, String> =
213                        [("glean.startup.date".into(), startup)].into();
214                    if glean.with_timestamps() {
215                        let now = Utc::now();
216                        let precise_timestamp = now.timestamp_millis() as u64;
217                        extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
218                    }
219                    self.record(
220                        glean,
221                        &glean_restarted.into(),
222                        crate::get_timestamp_ms(),
223                        Some(extra),
224                    );
225                }
226                has_events_events && glean.submit_ping_by_name("events", Some("startup"))
227            }
228            Err(err) => {
229                log::warn!("Error loading events from disk: {}", err);
230                false
231            }
232        }
233    }
234
235    fn load_events_from_disk(
236        &self,
237        glean: &Glean,
238        trim_data_to_registered_pings: bool,
239    ) -> Result<()> {
        // NOTE: The order of locks here is important.
        // In other parts of the code we might acquire the `file_lock` while already
        // holding a lock on `event_stores`, so we must take `event_stores` first here
        // as well to avoid a potential lock-order inversion.
244        let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
245        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
246
247        for entry in fs::read_dir(&self.path)? {
248            let entry = entry?;
249            if entry.file_type()?.is_file() {
250                let store_name = entry.file_name().into_string()?;
251                log::info!("Loading events for {}", store_name);
252                if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
253                    log::warn!("Trimming {}'s events", store_name);
254                    if let Err(err) = fs::remove_file(entry.path()) {
255                        match err.kind() {
256                            std::io::ErrorKind::NotFound => {
257                                // silently drop this error, the file was already non-existing
258                            }
259                            _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
260                        }
261                    }
262                    continue;
263                }
264                let file = BufReader::new(File::open(entry.path())?);
265                db.insert(
266                    store_name,
267                    file.lines()
268                        .map_while(Result::ok)
269                        .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
270                        .collect(),
271                );
272            }
273        }
274        Ok(())
275    }
276
277    /// Records an event in the desired stores.
278    ///
279    /// # Arguments
280    ///
281    /// * `glean` - The Glean instance.
282    /// * `meta` - The metadata about the event metric. Used to get the category,
283    ///   name and stores for the metric.
284    /// * `timestamp` - The timestamp of the event, in milliseconds. Must use a
285    ///   monotonically increasing timer (this value is obtained on the
286    ///   platform-specific side).
287    /// * `extra` - Extra data values, mapping strings to strings.
288    ///
289    /// ## Returns
290    ///
291    /// `true` if a ping was submitted and should be uploaded.
292    /// `false` otherwise.
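    ///
    /// A usage sketch (`click_meta` and the extra key are made up for illustration):
    ///
    /// ```ignore
    /// let mut extra = HashMap::new();
    /// extra.insert("source".to_string(), "menu".to_string());
    /// let submitted = glean
    ///     .event_storage()
    ///     .record(&glean, &click_meta, crate::get_timestamp_ms(), Some(extra));
    /// ```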
293    pub fn record(
294        &self,
295        glean: &Glean,
296        meta: &CommonMetricDataInternal,
297        timestamp: u64,
298        extra: Option<HashMap<String, String>>,
299    ) -> bool {
300        // If upload is disabled we don't want to record.
301        if !glean.is_upload_enabled() {
302            return false;
303        }
304
305        let mut submit_max_capacity_event_ping = false;
306        {
307            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
308            for store_name in meta.inner.send_in_pings.iter() {
309                if !glean.is_ping_enabled(store_name) {
310                    continue;
311                }
312
313                let store = db.entry(store_name.to_string()).or_default();
314                let execution_counter = CounterMetric::new(CommonMetricData {
315                    name: "execution_counter".into(),
316                    category: store_name.into(),
317                    send_in_pings: vec![INTERNAL_STORAGE.into()],
318                    lifetime: Lifetime::Ping,
319                    ..Default::default()
320                })
321                .get_value(glean, INTERNAL_STORAGE);
322                // Create StoredEvent object, and its JSON form for serialization on disk.
323                let event = StoredEvent {
324                    event: RecordedEvent {
325                        timestamp,
326                        category: meta.inner.category.to_string(),
327                        name: meta.inner.name.to_string(),
328                        extra: extra.clone(),
329                    },
330                    execution_counter,
331                };
332                let event_json = serde_json::to_string(&event).unwrap(); // safe unwrap, event can always be serialized
333                store.push(event);
334                self.write_event_to_disk(store_name, &event_json);
335                if store_name == "events" && store.len() == glean.get_max_events() {
336                    submit_max_capacity_event_ping = true;
337                }
338            }
339        }
340        if submit_max_capacity_event_ping {
341            glean.submit_ping_by_name("events", Some("max_capacity"));
342            true
343        } else {
344            false
345        }
346    }
347
348    fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
349        // safe unwrap, only error case is poisoning
350        let mut map = self.event_store_files.write().unwrap();
351        let entry = map.entry(store_name.to_string());
352
353        match entry {
354            Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
355            Entry::Vacant(entry) => {
356                let file = OpenOptions::new()
357                    .create(true)
358                    .append(true)
359                    .open(self.path.join(store_name))?;
360                let file = Arc::new(file);
361                let entry = entry.insert(file);
362                Ok(Arc::clone(entry))
363            }
364        }
365    }
366
367    /// Writes an event to a single store on disk.
368    ///
369    /// # Arguments
370    ///
371    /// * `store_name` - The name of the store.
372    /// * `event_json` - The event content, as a single-line JSON-encoded string.
373    fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
374        let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
375
376        let write_res = (|| {
377            let mut file = self.get_event_store(store_name)?;
378            file.write_all(event_json.as_bytes())?;
379            file.write_all(b"\n")?;
380            file.flush()?;
381            Ok::<(), std::io::Error>(())
382        })();
383
384        if let Err(err) = write_res {
385            log::warn!("IO error writing event to store '{}': {}", store_name, err);
386        }
387    }
388
389    /// Normalizes the store in-place.
390    ///
    /// A store may be in any order and contain any number of `glean.restarted` events.
    /// Their values, along with `execution_counter` values, must be taken into account
    /// to compute the correct `timestamp` values, on which we then sort.
395    ///
396    /// 1. Sort by `execution_counter` and `timestamp`,
397    ///    breaking ties so that `glean.restarted` comes first.
398    /// 2. Remove all initial and final `glean.restarted` events
    /// 3. For each group of events that share an `execution_counter`,
    ///    i. calculate the initial `glean.restarted` event's `timestamp` to be
    ///       clamp(glean.startup.date - ping_info.start_time, biggest_timestamp_of_previous_group + 1)
    ///    ii. normalize each non-`glean.restarted` event's `timestamp`
    ///        relative to the `glean.restarted` event's uncalculated `timestamp`
404    /// 4. Remove `execution_counter` and `glean.startup.date` extra keys
405    /// 5. Sort by `timestamp`
406    ///
    /// If something goes awry, this records an `invalid_state` error on
    /// `glean.restarted` for internal inconsistencies, or an `invalid_value`
    /// error for client clock weirdness.
410    ///
411    /// # Arguments
412    ///
413    /// * `glean` - Used to report errors
414    /// * `store_name` - The name of the store we're normalizing.
415    /// * `store` - The store we're to normalize.
    /// * `glean_start_time` - Used if `glean.startup.date` or `ping_info.start_time`
    ///   aren't available. Passed as a parameter to ease unit-testing.
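    ///
    /// As a worked sketch (mirroring the `normalize_store_multi_run_timestamp_math`
    /// test in this module): events at timestamps [20, 40, 200] with
    /// `execution_counter` 0, then a `glean.restarted` at timestamp 2 whose
    /// `glean.startup.date` is one hour after the ping start, then an event at
    /// timestamp 12 with `execution_counter` 1, normalize to timestamps
    /// [0, 20, 180, 3600000, 3600000 + 12 - 2].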
417    fn normalize_store(
418        &self,
419        glean: &Glean,
420        store_name: &str,
421        store: &mut Vec<StoredEvent>,
422        glean_start_time: DateTime<FixedOffset>,
423    ) {
424        let is_glean_restarted =
425            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
426        let glean_restarted_meta = |store_name: &str| CommonMetricData {
427            name: "restarted".into(),
428            category: "glean".into(),
429            send_in_pings: vec![store_name.into()],
430            lifetime: Lifetime::Ping,
431            ..Default::default()
432        };
433        // Step 1
434        store.sort_by(|a, b| {
435            a.execution_counter
436                .cmp(&b.execution_counter)
437                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
438                .then_with(|| {
439                    if is_glean_restarted(&a.event) {
440                        Ordering::Less
441                    } else {
442                        Ordering::Greater
443                    }
444                })
445        });
446        // Step 2
447        // Find the index of the first and final non-`glean.restarted` events.
448        // Remove events before the first and after the final.
449        let final_event = match store
450            .iter()
451            .rposition(|event| !is_glean_restarted(&event.event))
452        {
453            Some(idx) => idx + 1,
454            _ => 0,
455        };
456        store.drain(final_event..);
457        let first_event = store
458            .iter()
459            .position(|event| !is_glean_restarted(&event.event))
460            .unwrap_or(store.len());
461        store.drain(..first_event);
462        if store.is_empty() {
463            // There was nothing but `glean.restarted` events. Job's done!
464            return;
465        }
466        // Step 3
        // There may legitimately be no `glean.restarted` events and no
        // `execution_counter` extra values. (This should always be the case for the
        // "events" ping, for instance.)
470        // Other inconsistencies are evidence of errors, and so are logged.
471        let mut cur_ec = 0;
472        // The offset within a group of events with the same `execution_counter`.
473        let mut intra_group_offset = store[0].event.timestamp;
        // The offset between this group and ping_info.start_time.
475        let mut inter_group_offset = 0;
476        let mut highest_ts = 0;
477        for event in store.iter_mut() {
478            let execution_counter = event.execution_counter.take().unwrap_or(0);
479            if is_glean_restarted(&event.event) {
480                // We've entered the next "event group".
                // We need a new epoch based on glean.startup.date - ping_info.start_time
482                cur_ec = execution_counter;
483                let glean_startup_date = event
484                    .event
485                    .extra
486                    .as_mut()
487                    .and_then(|extra| {
488                        extra.remove("glean.startup.date").and_then(|date_str| {
489                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
490                                .map_err(|_| {
491                                    record_error(
492                                        glean,
493                                        &glean_restarted_meta(store_name).into(),
494                                        ErrorType::InvalidState,
495                                        format!("Unparseable glean.startup.date '{}'", date_str),
496                                        None,
497                                    );
498                                })
499                                .ok()
500                        })
501                    })
502                    .unwrap_or(glean_start_time);
503                if event
504                    .event
505                    .extra
506                    .as_ref()
507                    .is_some_and(|extra| extra.is_empty())
508                {
509                    // Small optimization to save us sending empty dicts.
510                    event.event.extra = None;
511                }
512                let ping_start = DatetimeMetric::new(
513                    CommonMetricData {
514                        name: format!("{}#start", store_name),
515                        category: "".into(),
516                        send_in_pings: vec![INTERNAL_STORAGE.into()],
517                        lifetime: Lifetime::User,
518                        ..Default::default()
519                    },
520                    TimeUnit::Minute,
521                );
522                let ping_start = ping_start
523                    .get_value(glean, INTERNAL_STORAGE)
524                    .unwrap_or(glean_start_time);
525                let time_from_ping_start_to_glean_restarted =
526                    (glean_startup_date - ping_start).num_milliseconds();
527                intra_group_offset = event.event.timestamp;
528                inter_group_offset =
529                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
530                if inter_group_offset < highest_ts {
531                    record_error(
532                        glean,
533                        &glean_restarted_meta(store_name).into(),
534                        ErrorType::InvalidValue,
535                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
536                        None,
537                    );
538                    // The client's clock went backwards enough that this event group's
539                    // glean.restarted looks like it happened _before_ the final event of the previous group.
540                    // Or, it went ahead enough to overflow u64.
541                    // Adjust things so this group starts 1ms after the previous one.
542                    inter_group_offset = highest_ts + 1;
543                }
544            } else if cur_ec == 0 {
545                // bug 1811872 - cur_ec might need initialization.
546                cur_ec = execution_counter;
547            }
548            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
549            if execution_counter != cur_ec {
550                record_error(
551                    glean,
552                    &glean_restarted_meta(store_name).into(),
553                    ErrorType::InvalidState,
554                    format!(
555                        "Inconsistent execution counter {} (expected {})",
556                        execution_counter, cur_ec
557                    ),
558                    None,
559                );
560                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
561                cur_ec = execution_counter;
562            }
563
564            // event timestamp is a `u64`, but BigQuery uses `i64` (signed!) everywhere. Let's clamp the value to make
565            // sure we stay within bounds.
566            if event.event.timestamp > i64::MAX as u64 {
567                glean
568                    .additional_metrics
569                    .event_timestamp_clamped
570                    .add_sync(glean, 1);
571                log::warn!(
572                    "Calculated event timestamp was too high. Got: {}, max: {}",
573                    event.event.timestamp,
574                    i64::MAX,
575                );
576                event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
577            }
578
579            if highest_ts > event.event.timestamp {
580                // Even though we sorted everything, something in the
581                // execution_counter or glean.startup.date math went awry.
582                record_error(
583                    glean,
584                    &glean_restarted_meta(store_name).into(),
585                    ErrorType::InvalidState,
586                    format!(
587                        "Inconsistent previous highest timestamp {} (expected <= {})",
588                        highest_ts, event.event.timestamp
589                    ),
590                    None,
591                );
                // Let highest_ts regress to event.timestamp in the hope that this minimizes the weirdness.
593            }
594            highest_ts = event.event.timestamp
595        }
596    }
597
598    /// Gets a snapshot of the stored event data as a JsonValue.
599    ///
600    /// # Arguments
601    ///
602    /// * `glean` - the Glean instance.
603    /// * `store_name` - The name of the desired store.
604    /// * `clear_store` - Whether to clear the store after snapshotting.
605    ///
606    /// # Returns
607    ///
    /// An array of events, JSON encoded, if any. Otherwise `None`.
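    ///
    /// A minimal usage sketch (the store name is illustrative):
    ///
    /// ```ignore
    /// if let Some(events) = glean.event_storage().snapshot_as_json(&glean, "events", false) {
    ///     // `events` is a JSON array with one object per recorded event.
    /// }
    /// ```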
609    pub fn snapshot_as_json(
610        &self,
611        glean: &Glean,
612        store_name: &str,
613        clear_store: bool,
614    ) -> Option<JsonValue> {
615        let result = {
616            let mut db = self.event_stores.write().unwrap(); // safe unwrap, only error case is poisoning
617            db.get_mut(&store_name.to_string()).and_then(|store| {
618                if !store.is_empty() {
619                    // Normalization happens in-place, so if we're not clearing,
620                    // operate on a clone.
621                    let mut clone;
622                    let store = if clear_store {
623                        store
624                    } else {
625                        clone = store.clone();
626                        &mut clone
627                    };
628                    // We may need to normalize event timestamps across multiple restarts.
629                    self.normalize_store(glean, store_name, store, glean.start_time());
630                    Some(json!(store))
631                } else {
                    log::warn!("Unexpectedly got empty event store for '{}'", store_name);
633                    None
634                }
635            })
636        };
637
638        if clear_store {
639            self.event_stores
640                .write()
641                .unwrap() // safe unwrap, only error case is poisoning
642                .remove(&store_name.to_string());
643            self.event_store_files
644                .write()
645                .unwrap() // safe unwrap, only error case is poisoning
646                .remove(&store_name.to_string());
647
648            let _lock = self.file_lock.lock().unwrap(); // safe unwrap, only error case is poisoning
649            if let Err(err) = fs::remove_file(self.path.join(store_name)) {
650                match err.kind() {
651                    std::io::ErrorKind::NotFound => {
652                        // silently drop this error, the file was already non-existing
653                    }
654                    _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
655                }
656            }
657        }
658
659        result
660    }
661
662    /// Clears all stored events, both in memory and on-disk.
663    pub fn clear_all(&self) -> Result<()> {
664        // safe unwrap, only error case is poisoning
665        self.event_stores.write().unwrap().clear();
666        self.event_store_files.write().unwrap().clear();
667
668        // safe unwrap, only error case is poisoning
669        let _lock = self.file_lock.lock().unwrap();
670        std::fs::remove_dir_all(&self.path)?;
671        create_dir_all(&self.path)?;
672
673        Ok(())
674    }
675
676    /// **Test-only API (exported for FFI purposes).**
677    ///
678    /// Gets the vector of currently stored events for the given event metric in
679    /// the given store.
680    ///
681    /// This doesn't clear the stored value.
682    pub fn test_get_value<'a>(
683        &'a self,
684        meta: &'a CommonMetricDataInternal,
685        store_name: &str,
686    ) -> Option<Vec<RecordedEvent>> {
687        record_coverage(&meta.base_identifier());
688
689        let value: Vec<RecordedEvent> = self
690            .event_stores
691            .read()
692            .unwrap() // safe unwrap, only error case is poisoning
693            .get(&store_name.to_string())
694            .into_iter()
695            .flatten()
696            .map(|stored_event| stored_event.event.clone())
697            .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
698            .collect();
699        if !value.is_empty() {
700            Some(value)
701        } else {
702            None
703        }
704    }
705}
706
707#[cfg(test)]
708mod test {
709    use super::*;
710    use crate::test_get_num_recorded_errors;
711    use crate::tests::new_glean;
712    use chrono::{TimeZone, Timelike};
713
714    #[test]
715    fn handle_truncated_events_on_disk() {
716        let (glean, t) = new_glean(None);
717
718        {
719            let db = EventDatabase::new(t.path()).unwrap();
720            db.write_event_to_disk("events", "{\"timestamp\": 500");
721            db.write_event_to_disk("events", "{\"timestamp\"");
722            db.write_event_to_disk(
723                "events",
724                "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
725            );
726        }
727
728        {
729            let db = EventDatabase::new(t.path()).unwrap();
730            db.load_events_from_disk(&glean, false).unwrap();
731            let events = &db.event_stores.read().unwrap()["events"];
732            assert_eq!(1, events.len());
733        }
734    }
735
736    #[test]
737    fn stable_serialization() {
738        let event_empty = RecordedEvent {
739            timestamp: 2,
740            category: "cat".to_string(),
741            name: "name".to_string(),
742            extra: None,
743        };
744
745        let mut data = HashMap::new();
746        data.insert("a key".to_string(), "a value".to_string());
747        let event_data = RecordedEvent {
748            timestamp: 2,
749            category: "cat".to_string(),
750            name: "name".to_string(),
751            extra: Some(data),
752        };
753
754        let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
755        let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
756
757        assert_eq!(
758            StoredEvent {
759                event: event_empty,
760                execution_counter: None
761            },
762            serde_json::from_str(&event_empty_json).unwrap()
763        );
764        assert_eq!(
765            StoredEvent {
766                event: event_data,
767                execution_counter: None
768            },
769            serde_json::from_str(&event_data_json).unwrap()
770        );
771    }
772
773    #[test]
774    fn deserialize_existing_data() {
775        let event_empty_json = r#"
776{
777  "timestamp": 2,
778  "category": "cat",
779  "name": "name"
780}
781            "#;
782
783        let event_data_json = r#"
784{
785  "timestamp": 2,
786  "category": "cat",
787  "name": "name",
788  "extra": {
789    "a key": "a value"
790  }
791}
792        "#;
793
794        let event_empty = RecordedEvent {
795            timestamp: 2,
796            category: "cat".to_string(),
797            name: "name".to_string(),
798            extra: None,
799        };
800
801        let mut data = HashMap::new();
802        data.insert("a key".to_string(), "a value".to_string());
803        let event_data = RecordedEvent {
804            timestamp: 2,
805            category: "cat".to_string(),
806            name: "name".to_string(),
807            extra: Some(data),
808        };
809
810        assert_eq!(
811            StoredEvent {
812                event: event_empty,
813                execution_counter: None
814            },
815            serde_json::from_str(event_empty_json).unwrap()
816        );
817        assert_eq!(
818            StoredEvent {
819                event: event_data,
820                execution_counter: None
821            },
822            serde_json::from_str(event_data_json).unwrap()
823        );
824    }
825
826    #[test]
827    fn doesnt_record_when_upload_is_disabled() {
828        let (mut glean, dir) = new_glean(None);
829        let db = EventDatabase::new(dir.path()).unwrap();
830
831        let test_storage = "store1";
832        let test_category = "category";
833        let test_name = "name";
834        let test_timestamp = 2;
835        let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
836        let event_data = RecordedEvent {
837            timestamp: test_timestamp,
838            category: test_category.to_string(),
839            name: test_name.to_string(),
840            extra: None,
841        };
842
843        // Upload is not yet disabled,
844        // so let's check that everything is getting recorded as expected.
845        db.record(&glean, &test_meta, 2, None);
846        {
847            let event_stores = db.event_stores.read().unwrap();
848            assert_eq!(
849                &StoredEvent {
850                    event: event_data,
851                    execution_counter: None
852                },
853                &event_stores.get(test_storage).unwrap()[0]
854            );
855            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
856        }
857
858        glean.set_upload_enabled(false);
859
860        // Now that upload is disabled, let's check nothing is recorded.
861        db.record(&glean, &test_meta, 2, None);
862        {
863            let event_stores = db.event_stores.read().unwrap();
864            assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
865        }
866    }
867
868    #[test]
869    fn normalize_store_of_glean_restarted() {
870        // Make sure stores empty of anything but glean.restarted events normalize without issue.
871        let (glean, _dir) = new_glean(None);
872
873        let store_name = "store-name";
874        let glean_restarted = StoredEvent {
875            event: RecordedEvent {
876                timestamp: 2,
877                category: "glean".into(),
878                name: "restarted".into(),
879                extra: None,
880            },
881            execution_counter: None,
882        };
883        let mut store = vec![glean_restarted.clone()];
884        let glean_start_time = glean.start_time();
885
886        glean
887            .event_storage()
888            .normalize_store(&glean, store_name, &mut store, glean_start_time);
889        assert!(store.is_empty());
890
891        let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
892        glean
893            .event_storage()
894            .normalize_store(&glean, store_name, &mut store, glean_start_time);
895        assert!(store.is_empty());
896
897        let mut store = vec![
898            glean_restarted.clone(),
899            glean_restarted.clone(),
900            glean_restarted,
901        ];
902        glean
903            .event_storage()
904            .normalize_store(&glean, store_name, &mut store, glean_start_time);
905        assert!(store.is_empty());
906    }
907
908    #[test]
909    fn normalize_store_of_glean_restarted_on_both_ends() {
910        // Make sure stores with non-glean.restarted events don't get drained too far.
911        let (glean, _dir) = new_glean(None);
912
913        let store_name = "store-name";
914        let glean_restarted = StoredEvent {
915            event: RecordedEvent {
916                timestamp: 2,
917                category: "glean".into(),
918                name: "restarted".into(),
919                extra: None,
920            },
921            execution_counter: None,
922        };
923        let not_glean_restarted = StoredEvent {
924            event: RecordedEvent {
925                timestamp: 20,
926                category: "category".into(),
927                name: "name".into(),
928                extra: None,
929            },
930            execution_counter: None,
931        };
932        let mut store = vec![
933            glean_restarted.clone(),
934            not_glean_restarted.clone(),
935            glean_restarted,
936        ];
937        let glean_start_time = glean.start_time();
938
939        glean
940            .event_storage()
941            .normalize_store(&glean, store_name, &mut store, glean_start_time);
942        assert_eq!(1, store.len());
943        assert_eq!(
944            StoredEvent {
945                event: RecordedEvent {
946                    timestamp: 0,
947                    ..not_glean_restarted.event
948                },
949                execution_counter: None
950            },
951            store[0]
952        );
953    }
954
955    #[test]
956    fn normalize_store_single_run_timestamp_math() {
957        // With a single run of events (no non-initial or non-terminal `glean.restarted`),
958        // ensure the timestamp math works.
959        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0 ))
960        let (glean, _dir) = new_glean(None);
961
962        let store_name = "store-name";
963        let glean_restarted = StoredEvent {
964            event: RecordedEvent {
965                timestamp: 2,
966                category: "glean".into(),
967                name: "restarted".into(),
968                extra: None,
969            },
970            execution_counter: None,
971        };
972        let timestamps = [20, 40, 200];
973        let not_glean_restarted = StoredEvent {
974            event: RecordedEvent {
975                timestamp: timestamps[0],
976                category: "category".into(),
977                name: "name".into(),
978                extra: None,
979            },
980            execution_counter: None,
981        };
982        let mut store = vec![
983            glean_restarted.clone(),
984            not_glean_restarted.clone(),
985            StoredEvent {
986                event: RecordedEvent {
987                    timestamp: timestamps[1],
988                    ..not_glean_restarted.event.clone()
989                },
990                execution_counter: None,
991            },
992            StoredEvent {
993                event: RecordedEvent {
994                    timestamp: timestamps[2],
995                    ..not_glean_restarted.event.clone()
996                },
997                execution_counter: None,
998            },
999            glean_restarted,
1000        ];
1001
1002        glean
1003            .event_storage()
1004            .normalize_store(&glean, store_name, &mut store, glean.start_time());
1005        assert_eq!(3, store.len());
1006        for (timestamp, event) in timestamps.iter().zip(store.iter()) {
1007            assert_eq!(
1008                &StoredEvent {
1009                    event: RecordedEvent {
1010                        timestamp: timestamp - timestamps[0],
1011                        ..not_glean_restarted.clone().event
1012                    },
1013                    execution_counter: None
1014                },
1015                event
1016            );
1017        }
1018    }
1019
1020    #[test]
1021    fn normalize_store_multi_run_timestamp_math() {
1022        // With multiple runs of events (separated by `glean.restarted`),
1023        // ensure the timestamp math works.
1024        // (( works = Initial event gets to be 0, subsequent events get normalized to that 0.
        //            Subsequent runs figure it out via glean.startup.date and ping_info.start_time ))
1026        let (glean, _dir) = new_glean(None);
1027
1028        let store_name = "store-name";
1029        let glean_restarted = StoredEvent {
1030            event: RecordedEvent {
1031                category: "glean".into(),
1032                name: "restarted".into(),
1033                ..Default::default()
1034            },
1035            execution_counter: None,
1036        };
1037        let not_glean_restarted = StoredEvent {
1038            event: RecordedEvent {
1039                category: "category".into(),
1040                name: "name".into(),
1041                ..Default::default()
1042            },
1043            execution_counter: None,
1044        };
1045
1046        // This scenario represents a run of three events followed by an hour between runs,
1047        // followed by one final event.
1048        let timestamps = [20, 40, 200, 12];
1049        let ecs = [0, 1];
1050        let some_hour = 16;
1051        let startup_date = FixedOffset::east_opt(0)
1052            .unwrap()
1053            .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) // TimeUnit::Minute -- don't put seconds
1054            .unwrap();
1055        let glean_start_time = startup_date.with_hour(some_hour - 1);
1056        let restarted_ts = 2;
1057        let mut store = vec![
1058            StoredEvent {
1059                event: RecordedEvent {
1060                    timestamp: timestamps[0],
1061                    ..not_glean_restarted.event.clone()
1062                },
1063                execution_counter: Some(ecs[0]),
1064            },
1065            StoredEvent {
1066                event: RecordedEvent {
1067                    timestamp: timestamps[1],
1068                    ..not_glean_restarted.event.clone()
1069                },
1070                execution_counter: Some(ecs[0]),
1071            },
1072            StoredEvent {
1073                event: RecordedEvent {
1074                    timestamp: timestamps[2],
1075                    ..not_glean_restarted.event.clone()
1076                },
1077                execution_counter: Some(ecs[0]),
1078            },
1079            StoredEvent {
1080                event: RecordedEvent {
1081                    extra: Some(
1082                        [(
1083                            "glean.startup.date".into(),
1084                            get_iso_time_string(startup_date, TimeUnit::Minute),
1085                        )]
1086                        .into(),
1087                    ),
1088                    timestamp: restarted_ts,
1089                    ..glean_restarted.event.clone()
1090                },
1091                execution_counter: Some(ecs[1]),
1092            },
1093            StoredEvent {
1094                event: RecordedEvent {
1095                    timestamp: timestamps[3],
1096                    ..not_glean_restarted.event.clone()
1097                },
1098                execution_counter: Some(ecs[1]),
1099            },
1100        ];
1101
1102        glean.event_storage().normalize_store(
1103            &glean,
1104            store_name,
1105            &mut store,
1106            glean_start_time.unwrap(),
1107        );
1108        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1109
1110        // Let's check the first three.
1111        for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1112            assert_eq!(
1113                StoredEvent {
1114                    event: RecordedEvent {
1115                        timestamp: timestamp - timestamps[0],
1116                        ..not_glean_restarted.event.clone()
1117                    },
1118                    execution_counter: None,
1119                },
1120                event
1121            );
1122        }
1123        // The fourth should be a glean.restarted and have a realtime-based timestamp.
1124        let hour_in_millis = 3600000;
1125        assert_eq!(
1126            store[3],
1127            StoredEvent {
1128                event: RecordedEvent {
1129                    timestamp: hour_in_millis,
1130                    ..glean_restarted.event
1131                },
1132                execution_counter: None,
1133            }
1134        );
1135        // The fifth should have a timestamp based on the new origin.
1136        assert_eq!(
1137            store[4],
1138            StoredEvent {
1139                event: RecordedEvent {
1140                    timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1141                    ..not_glean_restarted.event
1142                },
1143                execution_counter: None,
1144            }
1145        );
1146    }
1147
1148    #[test]
1149    fn normalize_store_multi_run_client_clocks() {
1150        // With multiple runs of events (separated by `glean.restarted`),
1151        // ensure the timestamp math works. Even when the client clock goes backwards.
1152        let (glean, _dir) = new_glean(None);
1153
1154        let store_name = "store-name";
1155        let glean_restarted = StoredEvent {
1156            event: RecordedEvent {
1157                category: "glean".into(),
1158                name: "restarted".into(),
1159                ..Default::default()
1160            },
1161            execution_counter: None,
1162        };
1163        let not_glean_restarted = StoredEvent {
1164            event: RecordedEvent {
1165                category: "category".into(),
1166                name: "name".into(),
1167                ..Default::default()
1168            },
1169            execution_counter: None,
1170        };
1171
        // This scenario represents a run of two events, then the client clock going
        // back one hour between runs, followed by two more events.
1174        let timestamps = [20, 40, 12, 200];
1175        let ecs = [0, 1];
1176        let some_hour = 10;
1177        let startup_date = FixedOffset::east_opt(0)
1178            .unwrap()
1179            .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) // TimeUnit::Minute -- don't put seconds
1180            .unwrap();
1181        let glean_start_time = startup_date.with_hour(some_hour + 1);
1182        let restarted_ts = 2;
1183        let mut store = vec![
1184            StoredEvent {
1185                event: RecordedEvent {
1186                    timestamp: timestamps[0],
1187                    ..not_glean_restarted.event.clone()
1188                },
1189                execution_counter: Some(ecs[0]),
1190            },
1191            StoredEvent {
1192                event: RecordedEvent {
1193                    timestamp: timestamps[1],
1194                    ..not_glean_restarted.event.clone()
1195                },
1196                execution_counter: Some(ecs[0]),
1197            },
1198            StoredEvent {
1199                event: RecordedEvent {
1200                    extra: Some(
1201                        [(
1202                            "glean.startup.date".into(),
1203                            get_iso_time_string(startup_date, TimeUnit::Minute),
1204                        )]
1205                        .into(),
1206                    ),
1207                    timestamp: restarted_ts,
1208                    ..glean_restarted.event.clone()
1209                },
1210                execution_counter: Some(ecs[1]),
1211            },
1212            StoredEvent {
1213                event: RecordedEvent {
1214                    timestamp: timestamps[2],
1215                    ..not_glean_restarted.event.clone()
1216                },
1217                execution_counter: Some(ecs[1]),
1218            },
1219            StoredEvent {
1220                event: RecordedEvent {
1221                    timestamp: timestamps[3],
1222                    ..not_glean_restarted.event.clone()
1223                },
1224                execution_counter: Some(ecs[1]),
1225            },
1226        ];
1227
1228        glean.event_storage().normalize_store(
1229            &glean,
1230            store_name,
1231            &mut store,
1232            glean_start_time.unwrap(),
1233        );
1234        assert_eq!(5, store.len()); // 4 "real" events plus 1 `glean.restarted`
1235
1236        // Let's check the first two.
1237        for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1238            assert_eq!(
1239                StoredEvent {
1240                    event: RecordedEvent {
1241                        timestamp: timestamp - timestamps[0],
1242                        ..not_glean_restarted.event.clone()
1243                    },
1244                    execution_counter: None,
1245                },
1246                event
1247            );
1248        }
1249        // The third should be a glean.restarted. Its timestamp should be
1250        // one larger than the largest timestamp seen so far (because that's
1251        // how we ensure monotonic timestamps when client clocks go backwards).
1252        assert_eq!(
1253            store[2],
1254            StoredEvent {
1255                event: RecordedEvent {
1256                    timestamp: store[1].event.timestamp + 1,
1257                    ..glean_restarted.event
1258                },
1259                execution_counter: None,
1260            }
1261        );
        // The fourth should have a timestamp based on the new origin.
1263        assert_eq!(
1264            store[3],
1265            StoredEvent {
1266                event: RecordedEvent {
1267                    timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1268                    ..not_glean_restarted.event
1269                },
1270                execution_counter: None,
1271            }
1272        );
1273        // And we should have an InvalidValue on glean.restarted to show for it.
1274        assert_eq!(
1275            Ok(1),
1276            test_get_num_recorded_errors(
1277                &glean,
1278                &CommonMetricData {
1279                    name: "restarted".into(),
1280                    category: "glean".into(),
1281                    send_in_pings: vec![store_name.into()],
1282                    lifetime: Lifetime::Ping,
1283                    ..Default::default()
1284                }
1285                .into(),
1286                ErrorType::InvalidValue
1287            )
1288        );
1289    }
1290
1291    #[test]
1292    fn normalize_store_non_zero_ec() {
1293        // After the first run, execution_counter will likely be non-zero.
1294        // Ensure normalizing a store that begins with non-zero ec works.
1295        let (glean, _dir) = new_glean(None);
1296
1297        let store_name = "store-name";
1298        let glean_restarted = StoredEvent {
1299            event: RecordedEvent {
1300                timestamp: 2,
1301                category: "glean".into(),
1302                name: "restarted".into(),
1303                extra: None,
1304            },
1305            execution_counter: Some(2),
1306        };
1307        let not_glean_restarted = StoredEvent {
1308            event: RecordedEvent {
1309                timestamp: 20,
1310                category: "category".into(),
1311                name: "name".into(),
1312                extra: None,
1313            },
1314            execution_counter: Some(2),
1315        };
1316        let glean_restarted_2 = StoredEvent {
1317            event: RecordedEvent {
1318                timestamp: 2,
1319                category: "glean".into(),
1320                name: "restarted".into(),
1321                extra: None,
1322            },
1323            execution_counter: Some(3),
1324        };
1325        let mut store = vec![
1326            glean_restarted,
1327            not_glean_restarted.clone(),
1328            glean_restarted_2,
1329        ];
1330        let glean_start_time = glean.start_time();
1331
1332        glean
1333            .event_storage()
1334            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1335
1336        assert_eq!(1, store.len());
1337        assert_eq!(
1338            StoredEvent {
1339                event: RecordedEvent {
1340                    timestamp: 0,
1341                    ..not_glean_restarted.event
1342                },
1343                execution_counter: None
1344            },
1345            store[0]
1346        );
1347        // And we should have no InvalidState errors on glean.restarted.
1348        assert!(test_get_num_recorded_errors(
1349            &glean,
1350            &CommonMetricData {
1351                name: "restarted".into(),
1352                category: "glean".into(),
1353                send_in_pings: vec![store_name.into()],
1354                lifetime: Lifetime::Ping,
1355                ..Default::default()
1356            }
1357            .into(),
1358            ErrorType::InvalidState
1359        )
1360        .is_err());
1361        // (and, just because we're here, double-check there are no InvalidValue either).
1362        assert!(test_get_num_recorded_errors(
1363            &glean,
1364            &CommonMetricData {
1365                name: "restarted".into(),
1366                category: "glean".into(),
1367                send_in_pings: vec![store_name.into()],
1368                lifetime: Lifetime::Ping,
1369                ..Default::default()
1370            }
1371            .into(),
1372            ErrorType::InvalidValue
1373        )
1374        .is_err());
1375    }
1376
1377    #[test]
1378    fn normalize_store_clamps_timestamp() {
1379        let (glean, _dir) = new_glean(None);
1380
1381        let store_name = "store-name";
1382        let event = RecordedEvent {
1383            category: "category".into(),
1384            name: "name".into(),
1385            ..Default::default()
1386        };
1387
1388        let timestamps = [
1389            0,
1390            (i64::MAX / 2) as u64,
1391            i64::MAX as _,
1392            (i64::MAX as u64) + 1,
1393        ];
1394        let mut store = timestamps
1395            .into_iter()
1396            .map(|timestamp| StoredEvent {
1397                event: RecordedEvent {
1398                    timestamp,
1399                    ..event.clone()
1400                },
1401                execution_counter: None,
1402            })
1403            .collect();
1404
1405        let glean_start_time = glean.start_time();
1406        glean
1407            .event_storage()
1408            .normalize_store(&glean, store_name, &mut store, glean_start_time);
1409        assert_eq!(4, store.len());
1410
1411        assert_eq!(0, store[0].event.timestamp);
1412        assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
1413        assert_eq!((i64::MAX as u64), store[2].event.timestamp);
1414        assert_eq!((i64::MAX as u64), store[3].event.timestamp);
1415
1416        let error_count = glean
1417            .additional_metrics
1418            .event_timestamp_clamped
1419            .get_value(&glean, "health");
1420        assert_eq!(Some(1), error_count);
1421    }
1422}