1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::session::{EventSessionContext, SessionMetadata};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
33#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
35#[cfg_attr(test, derive(Default))]
36pub struct RecordedEvent {
37 pub timestamp: u64,
41
42 pub category: String,
46
47 pub name: String,
51
52 #[serde(skip_serializing_if = "Option::is_none")]
56 pub extra: Option<HashMap<String, String>>,
57
58 #[serde(skip_serializing_if = "Option::is_none")]
63 #[serde(default)]
64 pub session: Option<SessionMetadata>,
65}
66
67#[derive(
69 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
70)]
71struct StoredEvent {
72 #[serde(flatten)]
73 event: RecordedEvent,
74
75 #[serde(default)]
80 #[serde(skip_serializing_if = "Option::is_none")]
81 pub execution_counter: Option<i32>,
82}
83
/// Storage for recorded events, kept both in memory and in one file per store.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory that holds the per-store event files.
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Cache of opened (append-mode) event files, keyed by store name.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Held while reading, writing or removing event files.
    file_lock: Mutex<()>,
}
115
116impl MallocSizeOf for EventDatabase {
117 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
118 let mut n = 0;
119 n += self.event_stores.read().unwrap().size_of(ops);
120
121 let map = self.event_store_files.read().unwrap();
122 for store_name in map.keys() {
123 n += store_name.size_of(ops);
124 n += mem::size_of::<File>();
126 }
127 n
128 }
129}
130
131impl EventDatabase {
132 pub fn new(data_path: &Path) -> Result<Self> {
139 let path = data_path.join("events");
140 create_dir_all(&path)?;
141
142 Ok(Self {
143 path,
144 event_stores: RwLock::new(HashMap::new()),
145 event_store_files: RwLock::new(HashMap::new()),
146 file_lock: Mutex::new(()),
147 })
148 }
149
150 pub fn flush_pending_events_on_startup(
176 &self,
177 glean: &Glean,
178 trim_data_to_registered_pings: bool,
179 ) -> bool {
180 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
181 Ok(_) => {
182 let stores_with_events: Vec<String> = {
183 self.event_stores
184 .read()
185 .unwrap()
186 .keys()
187 .map(|x| x.to_owned())
188 .collect() };
190 let has_events_events = stores_with_events.contains(&"events".to_owned());
193 let glean_restarted_stores = if has_events_events {
194 stores_with_events
195 .into_iter()
196 .filter(|store| store != "events")
197 .collect()
198 } else {
199 stores_with_events
200 };
201 if !glean_restarted_stores.is_empty() {
202 for store_name in glean_restarted_stores.iter() {
203 CounterMetric::new(CommonMetricData {
204 name: "execution_counter".into(),
205 category: store_name.into(),
206 send_in_pings: vec![INTERNAL_STORAGE.into()],
207 lifetime: Lifetime::Ping,
208 ..Default::default()
209 })
210 .add_sync(glean, 1);
211 }
212 let glean_restarted = CommonMetricData {
213 name: "restarted".into(),
214 category: "glean".into(),
215 send_in_pings: glean_restarted_stores,
216 lifetime: Lifetime::Ping,
217 ..Default::default()
218 };
219 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
220 let mut extra: HashMap<String, String> =
221 [("glean.startup.date".into(), startup)].into();
222 if glean.with_timestamps() {
223 let now = Utc::now();
224 let precise_timestamp = now.timestamp_millis() as u64;
225 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
226 }
227 self.record(
228 glean,
229 &glean_restarted.into(),
230 crate::get_timestamp_ms(),
231 Some(extra),
232 EventSessionContext::OutOfSession,
233 );
234 }
235 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
236 }
237 Err(err) => {
238 log::warn!("Error loading events from disk: {}", err);
239 false
240 }
241 }
242 }
243
244 fn load_events_from_disk(
245 &self,
246 glean: &Glean,
247 trim_data_to_registered_pings: bool,
248 ) -> Result<()> {
249 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
257 let entry = entry?;
258 if entry.file_type()?.is_file() {
259 let store_name = entry.file_name().into_string()?;
260 log::info!("Loading events for {}", store_name);
261 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
262 log::warn!("Trimming {}'s events", store_name);
263 if let Err(err) = fs::remove_file(entry.path()) {
264 match err.kind() {
265 std::io::ErrorKind::NotFound => {
266 }
268 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
269 }
270 }
271 continue;
272 }
273 let file = BufReader::new(File::open(entry.path())?);
274 db.insert(
275 store_name,
276 file.lines()
277 .map_while(Result::ok)
278 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
279 .collect(),
280 );
281 }
282 }
283 Ok(())
284 }
285
286 pub fn record(
305 &self,
306 glean: &Glean,
307 meta: &CommonMetricDataInternal,
308 timestamp: u64,
309 extra: Option<HashMap<String, String>>,
310 ctx: EventSessionContext,
311 ) -> bool {
312 if !glean.is_upload_enabled() {
314 return false;
315 }
316
317 let session = match ctx {
319 EventSessionContext::OutOfSession => None,
320 EventSessionContext::InSession(session_meta) => Some(session_meta),
321 };
322
323 let mut submit_max_capacity_event_ping = false;
324 {
325 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
327 if !glean.is_ping_enabled(store_name) {
328 continue;
329 }
330
331 let store = db.entry(store_name.to_string()).or_default();
332 let execution_counter = CounterMetric::new(CommonMetricData {
333 name: "execution_counter".into(),
334 category: store_name.into(),
335 send_in_pings: vec![INTERNAL_STORAGE.into()],
336 lifetime: Lifetime::Ping,
337 ..Default::default()
338 })
339 .get_value(glean, INTERNAL_STORAGE);
340 let event = StoredEvent {
342 event: RecordedEvent {
343 timestamp,
344 category: meta.inner.category.to_string(),
345 name: meta.inner.name.to_string(),
346 extra: extra.clone(),
347 session: session.clone(),
348 },
349 execution_counter,
350 };
351 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
353 self.write_event_to_disk(store_name, &event_json);
354 if store_name == "events" && store.len() == glean.get_max_events() {
355 submit_max_capacity_event_ping = true;
356 }
357 }
358 }
359 if submit_max_capacity_event_ping {
360 glean.submit_ping_by_name("events", Some("max_capacity"));
361 true
362 } else {
363 false
364 }
365 }
366
367 fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
368 let mut map = self.event_store_files.write().unwrap();
370 let entry = map.entry(store_name.to_string());
371
372 match entry {
373 Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
374 Entry::Vacant(entry) => {
375 let file = OpenOptions::new()
376 .create(true)
377 .append(true)
378 .open(self.path.join(store_name))?;
379 let file = Arc::new(file);
380 let entry = entry.insert(file);
381 Ok(Arc::clone(entry))
382 }
383 }
384 }
385
386 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
393 let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
396 let mut file = self.get_event_store(store_name)?;
397 file.write_all(event_json.as_bytes())?;
398 file.write_all(b"\n")?;
399 file.flush()?;
400 Ok::<(), std::io::Error>(())
401 })();
402
403 if let Err(err) = write_res {
404 log::warn!("IO error writing event to store '{}': {}", store_name, err);
405 }
406 }
407
408 fn normalize_store(
437 &self,
438 glean: &Glean,
439 store_name: &str,
440 store: &mut Vec<StoredEvent>,
441 glean_start_time: DateTime<FixedOffset>,
442 ) {
443 let is_glean_restarted =
444 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
445 let glean_restarted_meta = |store_name: &str| CommonMetricData {
446 name: "restarted".into(),
447 category: "glean".into(),
448 send_in_pings: vec![store_name.into()],
449 lifetime: Lifetime::Ping,
450 ..Default::default()
451 };
452 store.sort_by(|a, b| {
454 a.execution_counter
455 .cmp(&b.execution_counter)
456 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
457 .then_with(|| {
458 if is_glean_restarted(&a.event) {
459 Ordering::Less
460 } else {
461 Ordering::Greater
462 }
463 })
464 });
465 let final_event = match store
469 .iter()
470 .rposition(|event| !is_glean_restarted(&event.event))
471 {
472 Some(idx) => idx + 1,
473 _ => 0,
474 };
475 store.drain(final_event..);
476 let first_event = store
477 .iter()
478 .position(|event| !is_glean_restarted(&event.event))
479 .unwrap_or(store.len());
480 store.drain(..first_event);
481 if store.is_empty() {
482 return;
484 }
485 let mut cur_ec = 0;
491 let mut intra_group_offset = store[0].event.timestamp;
493 let mut inter_group_offset = 0;
495 let mut highest_ts = 0;
496 for event in store.iter_mut() {
497 let execution_counter = event.execution_counter.take().unwrap_or(0);
498 if is_glean_restarted(&event.event) {
499 cur_ec = execution_counter;
502 let glean_startup_date = event
503 .event
504 .extra
505 .as_mut()
506 .and_then(|extra| {
507 extra.remove("glean.startup.date").and_then(|date_str| {
508 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
509 .map_err(|_| {
510 record_error(
511 glean,
512 &glean_restarted_meta(store_name).into(),
513 ErrorType::InvalidState,
514 format!("Unparseable glean.startup.date '{}'", date_str),
515 None,
516 );
517 })
518 .ok()
519 })
520 })
521 .unwrap_or(glean_start_time);
522 if event
523 .event
524 .extra
525 .as_ref()
526 .is_some_and(|extra| extra.is_empty())
527 {
528 event.event.extra = None;
530 }
531 let ping_start = DatetimeMetric::new(
532 CommonMetricData {
533 name: format!("{}#start", store_name),
534 category: "".into(),
535 send_in_pings: vec![INTERNAL_STORAGE.into()],
536 lifetime: Lifetime::User,
537 ..Default::default()
538 },
539 TimeUnit::Minute,
540 );
541 let ping_start = ping_start
542 .get_value(glean, INTERNAL_STORAGE)
543 .unwrap_or(glean_start_time);
544 let time_from_ping_start_to_glean_restarted =
545 (glean_startup_date - ping_start).num_milliseconds();
546 intra_group_offset = event.event.timestamp;
547 inter_group_offset =
548 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
549 if inter_group_offset < highest_ts {
550 record_error(
551 glean,
552 &glean_restarted_meta(store_name).into(),
553 ErrorType::InvalidValue,
554 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
555 None,
556 );
557 inter_group_offset = highest_ts + 1;
562 }
563 } else if cur_ec == 0 {
564 cur_ec = execution_counter;
566 }
567 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
568 if execution_counter != cur_ec {
569 record_error(
570 glean,
571 &glean_restarted_meta(store_name).into(),
572 ErrorType::InvalidState,
573 format!(
574 "Inconsistent execution counter {} (expected {})",
575 execution_counter, cur_ec
576 ),
577 None,
578 );
579 cur_ec = execution_counter;
581 }
582
583 if event.event.timestamp > i64::MAX as u64 {
586 glean
587 .additional_metrics
588 .event_timestamp_clamped
589 .add_sync(glean, 1);
590 log::warn!(
591 "Calculated event timestamp was too high. Got: {}, max: {}",
592 event.event.timestamp,
593 i64::MAX,
594 );
595 event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
596 }
597
598 if highest_ts > event.event.timestamp {
599 record_error(
602 glean,
603 &glean_restarted_meta(store_name).into(),
604 ErrorType::InvalidState,
605 format!(
606 "Inconsistent previous highest timestamp {} (expected <= {})",
607 highest_ts, event.event.timestamp
608 ),
609 None,
610 );
611 }
613 highest_ts = event.event.timestamp
614 }
615 }
616
617 pub fn snapshot_as_json(
629 &self,
630 glean: &Glean,
631 store_name: &str,
632 clear_store: bool,
633 ) -> Option<JsonValue> {
634 let result = {
635 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
637 if !store.is_empty() {
638 let mut clone;
641 let store = if clear_store {
642 store
643 } else {
644 clone = store.clone();
645 &mut clone
646 };
647 self.normalize_store(glean, store_name, store, glean.start_time());
649 Some(json!(store))
650 } else {
651 log::warn!("Unexpectly got empty event store for '{}'", store_name);
652 None
653 }
654 })
655 };
656
657 if clear_store {
658 self.event_stores
659 .write()
660 .unwrap() .remove(&store_name.to_string());
662 self.event_store_files
663 .write()
664 .unwrap() .remove(&store_name.to_string());
666
667 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
669 match err.kind() {
670 std::io::ErrorKind::NotFound => {
671 }
673 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
674 }
675 }
676 }
677
678 result
679 }
680
681 pub fn clear_all(&self) -> Result<()> {
683 self.event_stores.write().unwrap().clear();
685 self.event_store_files.write().unwrap().clear();
686
687 let _lock = self.file_lock.lock().unwrap();
689 std::fs::remove_dir_all(&self.path)?;
690 create_dir_all(&self.path)?;
691
692 Ok(())
693 }
694
695 pub fn test_get_value<'a>(
702 &'a self,
703 meta: &'a CommonMetricDataInternal,
704 store_name: &str,
705 ) -> Option<Vec<RecordedEvent>> {
706 let value: Vec<RecordedEvent> = self
707 .event_stores
708 .read()
709 .unwrap() .get(&store_name.to_string())
711 .into_iter()
712 .flatten()
713 .map(|stored_event| stored_event.event.clone())
714 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
715 .collect();
716 if !value.is_empty() {
717 Some(value)
718 } else {
719 None
720 }
721 }
722}
723
724#[cfg(test)]
725mod test {
726 use super::*;
727 use crate::metrics::RemoteSettingsConfig;
728 use crate::test_get_num_recorded_errors;
729 use crate::tests::new_glean;
730 use chrono::{TimeZone, Timelike};
731
/// Lines that are not complete JSON events are skipped when loading from disk.
#[test]
fn handle_truncated_events_on_disk() {
    let (glean, t) = new_glean(None);

    // Two truncated lines followed by one complete event.
    let lines = [
        "{\"timestamp\": 500",
        "{\"timestamp\"",
        "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
    ];
    {
        let writer = EventDatabase::new(t.path()).unwrap();
        for line in lines {
            writer.write_event_to_disk("events", line);
        }
    }

    {
        let reader = EventDatabase::new(t.path()).unwrap();
        reader.load_events_from_disk(&glean, false).unwrap();
        let stores = reader.event_stores.read().unwrap();
        assert_eq!(1, stores["events"].len());
    }
}
753
/// A serialized `RecordedEvent` deserializes into a `StoredEvent` without an
/// execution counter, with and without extras.
#[test]
fn stable_serialization() {
    let event_empty = RecordedEvent {
        timestamp: 2,
        category: "cat".to_string(),
        name: "name".to_string(),
        extra: None,
        session: None,
    };
    let event_data = RecordedEvent {
        extra: Some(HashMap::from([(
            "a key".to_string(),
            "a value".to_string(),
        )])),
        ..event_empty.clone()
    };

    for event in [event_empty, event_data] {
        let pretty_json = ::serde_json::to_string_pretty(&event).unwrap();
        let roundtripped: StoredEvent = serde_json::from_str(&pretty_json).unwrap();
        assert_eq!(
            StoredEvent {
                event,
                execution_counter: None
            },
            roundtripped
        );
    }
}
792
/// JSON without an `execution_counter` (and without `session`) still
/// deserializes into a `StoredEvent`.
#[test]
fn deserialize_existing_data() {
    let base_event = RecordedEvent {
        timestamp: 2,
        category: "cat".to_string(),
        name: "name".to_string(),
        extra: None,
        session: None,
    };
    let event_with_extra = RecordedEvent {
        extra: Some(HashMap::from([(
            "a key".to_string(),
            "a value".to_string(),
        )])),
        ..base_event.clone()
    };

    let cases = [
        (
            r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name"
}
        "#,
            base_event,
        ),
        (
            r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name",
  "extra": {
    "a key": "a value"
  }
}
        "#,
            event_with_extra,
        ),
    ];

    for (json, event) in cases {
        let parsed: StoredEvent = serde_json::from_str(json).unwrap();
        assert_eq!(
            StoredEvent {
                event,
                execution_counter: None
            },
            parsed
        );
    }
}
847
/// Events are stored while upload is enabled and ignored once it is disabled.
#[test]
fn doesnt_record_when_upload_is_disabled() {
    let (mut glean, dir) = new_glean(None);
    let db = EventDatabase::new(dir.path()).unwrap();

    let (test_storage, test_category, test_name) = ("store1", "category", "name");
    let test_timestamp = 2;
    let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
    let expected = StoredEvent {
        event: RecordedEvent {
            timestamp: test_timestamp,
            category: test_category.to_string(),
            name: test_name.to_string(),
            extra: None,
            session: None,
        },
        execution_counter: None,
    };

    // Upload enabled: the event ends up in the store.
    db.record(
        &glean,
        &test_meta,
        test_timestamp,
        None,
        EventSessionContext::OutOfSession,
    );
    {
        let event_stores = db.event_stores.read().unwrap();
        let recorded = event_stores.get(test_storage).unwrap();
        assert_eq!(&expected, &recorded[0]);
        assert_eq!(recorded.len(), 1);
    }

    glean.set_upload_enabled(false);

    // Upload disabled: the store stays as it was.
    db.record(
        &glean,
        &test_meta,
        test_timestamp,
        None,
        EventSessionContext::OutOfSession,
    );
    {
        let event_stores = db.event_stores.read().unwrap();
        let recorded = event_stores.get(test_storage).unwrap();
        assert_eq!(recorded.len(), 1);
    }
}
902
/// A store made up of only `glean.restarted` events normalizes to nothing.
#[test]
fn normalize_store_of_glean_restarted() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
            session: None,
        },
        execution_counter: None,
    };
    let glean_start_time = glean.start_time();

    // One, two and three restart events in a row.
    for copies in 1..=3 {
        let mut store = vec![glean_restarted.clone(); copies];
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());
    }
}
943
/// `glean.restarted` events at both ends are dropped; the event in between
/// stays and is rebased to timestamp 0.
#[test]
fn normalize_store_of_glean_restarted_on_both_ends() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let event_named = |timestamp: u64, category: &str, name: &str| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: category.into(),
            name: name.into(),
            extra: None,
            session: None,
        },
        execution_counter: None,
    };
    let glean_restarted = event_named(2, "glean", "restarted");

    let mut store = vec![
        glean_restarted.clone(),
        event_named(20, "category", "name"),
        glean_restarted,
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    assert_eq!(1, store.len());
    assert_eq!(event_named(0, "category", "name"), store[0]);
}
992
/// Within a single run, timestamps become relative to the first kept event.
#[test]
fn normalize_store_single_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
            session: None,
        },
        execution_counter: None,
    };
    let plain_event = |timestamp: u64| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            extra: None,
            session: None,
        },
        execution_counter: None,
    };
    let timestamps: [u64; 3] = [20, 40, 200];

    // restart, the three plain events, restart
    let mut store = vec![glean_restarted.clone()];
    store.extend(timestamps.iter().map(|&timestamp| plain_event(timestamp)));
    store.push(glean_restarted);

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean.start_time());

    assert_eq!(3, store.len());
    for (timestamp, event) in timestamps.iter().zip(store.iter()) {
        assert_eq!(&plain_event(timestamp - timestamps[0]), event);
    }
}
1059
/// Across a restart, the second run's timestamps are shifted by the time
/// between ping start and the recorded startup date (one hour here).
#[test]
fn normalize_store_multi_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let plain_event = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter,
    };
    let restarted_event = |timestamp: u64,
                           extra: Option<HashMap<String, String>>,
                           execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "glean".into(),
            name: "restarted".into(),
            extra,
            ..Default::default()
        },
        execution_counter,
    };

    let timestamps: [u64; 4] = [20, 40, 200, 12];
    let ecs = [0, 1];
    let some_hour = 16;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0)
        .unwrap();
    // Glean "started" one hour before the recorded startup date.
    let glean_start_time = startup_date.with_hour(some_hour - 1).unwrap();
    let restarted_ts = 2;
    let startup_extra: HashMap<String, String> = [(
        "glean.startup.date".into(),
        get_iso_time_string(startup_date, TimeUnit::Minute),
    )]
    .into();

    let mut store = vec![
        plain_event(timestamps[0], Some(ecs[0])),
        plain_event(timestamps[1], Some(ecs[0])),
        plain_event(timestamps[2], Some(ecs[0])),
        restarted_event(restarted_ts, Some(startup_extra), Some(ecs[1])),
        plain_event(timestamps[3], Some(ecs[1])),
    ];

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);
    assert_eq!(5, store.len());

    // First run: relative to the first event.
    for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.iter()) {
        assert_eq!(&plain_event(timestamp - timestamps[0], None), event);
    }

    // Second run: offset by one hour.
    let hour_in_millis = 3600000;
    assert_eq!(store[3], restarted_event(hour_in_millis, None, None));
    assert_eq!(
        store[4],
        plain_event(hour_in_millis + timestamps[3] - restarted_ts, None)
    );
}
1187
/// When the startup date lies before the ping start, the second run is
/// placed right after the first one and one `InvalidValue` error is recorded.
#[test]
fn normalize_store_multi_run_client_clocks() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let plain_event = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter,
    };
    let restarted_event = |timestamp: u64,
                           extra: Option<HashMap<String, String>>,
                           execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "glean".into(),
            name: "restarted".into(),
            extra,
            ..Default::default()
        },
        execution_counter,
    };

    let timestamps: [u64; 4] = [20, 40, 12, 200];
    let ecs = [0, 1];
    let some_hour = 10;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0)
        .unwrap();
    // Glean "started" one hour after the recorded startup date.
    let glean_start_time = startup_date.with_hour(some_hour + 1).unwrap();
    let restarted_ts = 2;
    let startup_extra: HashMap<String, String> = [(
        "glean.startup.date".into(),
        get_iso_time_string(startup_date, TimeUnit::Minute),
    )]
    .into();

    let mut store = vec![
        plain_event(timestamps[0], Some(ecs[0])),
        plain_event(timestamps[1], Some(ecs[0])),
        restarted_event(restarted_ts, Some(startup_extra), Some(ecs[1])),
        plain_event(timestamps[2], Some(ecs[1])),
        plain_event(timestamps[3], Some(ecs[1])),
    ];

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);
    assert_eq!(5, store.len());

    // First run: relative to the first event.
    for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.iter()) {
        assert_eq!(&plain_event(timestamp - timestamps[0], None), event);
    }

    // The restart lands one past the previous event.
    assert_eq!(
        store[2],
        restarted_event(store[1].event.timestamp + 1, None, None)
    );
    assert_eq!(
        store[3],
        plain_event(
            timestamps[2] - restarted_ts + store[2].event.timestamp,
            None
        )
    );

    let restarted_meta = CommonMetricData {
        name: "restarted".into(),
        category: "glean".into(),
        send_in_pings: vec![store_name.into()],
        lifetime: Lifetime::Ping,
        ..Default::default()
    };
    assert_eq!(
        Ok(1),
        test_get_num_recorded_errors(&glean, &restarted_meta.into(), ErrorType::InvalidValue)
    );
}
1330
/// A store whose execution counters start above zero normalizes without
/// recording any error.
#[test]
fn normalize_store_non_zero_ec() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let event_named =
        |timestamp: u64, category: &str, name: &str, execution_counter: Option<i32>| StoredEvent {
            event: RecordedEvent {
                timestamp,
                category: category.into(),
                name: name.into(),
                extra: None,
                session: None,
            },
            execution_counter,
        };

    let mut store = vec![
        event_named(2, "glean", "restarted", Some(2)),
        event_named(20, "category", "name", Some(2)),
        event_named(2, "glean", "restarted", Some(3)),
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    assert_eq!(1, store.len());
    assert_eq!(event_named(0, "category", "name", None), store[0]);

    // Neither kind of error may have been recorded.
    for error_type in [ErrorType::InvalidState, ErrorType::InvalidValue] {
        let restarted_meta = CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        assert!(
            test_get_num_recorded_errors(&glean, &restarted_meta.into(), error_type).is_err()
        );
    }
}
1419
/// Timestamps above `i64::MAX` are clamped to `i64::MAX`; others are kept.
#[test]
fn normalize_store_clamps_timestamp() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let event_at = |timestamp: u64| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter: None,
    };

    let timestamps = [
        0,
        (i64::MAX / 2) as u64,
        i64::MAX as u64,
        (i64::MAX as u64) + 1,
    ];
    let mut store: Vec<StoredEvent> = timestamps.into_iter().map(event_at).collect();

    let glean_start_time = glean.start_time();
    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);
    assert_eq!(4, store.len());

    let expected = [
        0,
        (i64::MAX / 2) as u64,
        i64::MAX as u64,
        i64::MAX as u64,
    ];
    for (want, event) in expected.into_iter().zip(store.iter()) {
        assert_eq!(want, event.event.timestamp);
    }
}
1459
/// With the clamp metric enabled through server knobs, clamping one
/// timestamp counts exactly once in the `health` store.
#[test]
fn normalize_store_clamps_timestamp_metric_enabled() {
    let (glean, _dir) = new_glean(None);

    let mut cfg = RemoteSettingsConfig::default();
    cfg.metrics_enabled
        .insert("glean.error.event_timestamp_clamped".to_string(), true);
    glean.apply_server_knobs_config(cfg);

    let store_name = "store-name";
    let event_at = |timestamp: u64| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter: None,
    };

    // The second timestamp is one past the allowed maximum.
    let mut store = vec![event_at(0), event_at((i64::MAX as u64) + 1)];

    let glean_start_time = glean.start_time();
    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);
    assert_eq!(2, store.len());

    assert_eq!(0, store[0].event.timestamp);
    assert_eq!((i64::MAX as u64), store[1].event.timestamp);

    let error_count = glean
        .additional_metrics
        .event_timestamp_clamped
        .get_value(&glean, "health");
    assert_eq!(Some(1), error_count);
}
1503}