1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::error_recording::{record_error, ErrorType};
25use crate::metrics::{DatetimeMetric, TimeUnit};
26use crate::storage::INTERNAL_STORAGE;
27use crate::util::get_iso_time_string;
28use crate::Glean;
29use crate::Result;
30use crate::{CommonMetricData, CounterMetric, Lifetime};
31
/// A single recorded event: its timestamp, the metric identity (category and name)
/// and optional string extras.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
// `Default` is only derived in test builds, where fixtures use `..Default::default()`.
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// Timestamp of the event. `EventDatabase::record` stores the caller-supplied value
    /// as-is; `normalize_store` later rewrites it into an offset.
    /// NOTE(review): presumably milliseconds (the one caller in this file passes
    /// `crate::get_timestamp_ms()`) — confirm.
    pub timestamp: u64,

    /// Category of the event.
    pub category: String,

    /// Name of the event.
    pub name: String,

    /// Optional key/value extras; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
57
/// The form in which events are kept in memory and written to disk: a
/// [`RecordedEvent`] plus the execution counter observed when it was recorded.
#[derive(
    Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
)]
struct StoredEvent {
    /// The event itself; flattened so its fields sit at the top level of the JSON object.
    #[serde(flatten)]
    event: RecordedEvent,

    /// Value of the store's `execution_counter` metric at record time, if any.
    /// Absent in the input deserializes to `None`, and `None` is not serialized.
    /// `normalize_store` takes the value out (leaving `None`) while normalizing.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    pub execution_counter: Option<i32>,
}
74
/// Keeps recorded events per store name, both in memory and in append-mode files
/// inside `path`.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory that holds one events file per store (`<data_path>/events`, see `new`).
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Open file handles for the per-store event files, keyed by store name
    /// (filled lazily by `get_event_store`).
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Held while event files are read, written or deleted.
    file_lock: Mutex<()>,
}
106
107impl MallocSizeOf for EventDatabase {
108 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
109 let mut n = 0;
110 n += self.event_stores.read().unwrap().size_of(ops);
111
112 let map = self.event_store_files.read().unwrap();
113 for store_name in map.keys() {
114 n += store_name.size_of(ops);
115 n += mem::size_of::<File>();
117 }
118 n
119 }
120}
121
122impl EventDatabase {
123 pub fn new(data_path: &Path) -> Result<Self> {
130 let path = data_path.join("events");
131 create_dir_all(&path)?;
132
133 Ok(Self {
134 path,
135 event_stores: RwLock::new(HashMap::new()),
136 event_store_files: RwLock::new(HashMap::new()),
137 file_lock: Mutex::new(()),
138 })
139 }
140
141 pub fn flush_pending_events_on_startup(
167 &self,
168 glean: &Glean,
169 trim_data_to_registered_pings: bool,
170 ) -> bool {
171 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
172 Ok(_) => {
173 let stores_with_events: Vec<String> = {
174 self.event_stores
175 .read()
176 .unwrap()
177 .keys()
178 .map(|x| x.to_owned())
179 .collect() };
181 let has_events_events = stores_with_events.contains(&"events".to_owned());
184 let glean_restarted_stores = if has_events_events {
185 stores_with_events
186 .into_iter()
187 .filter(|store| store != "events")
188 .collect()
189 } else {
190 stores_with_events
191 };
192 if !glean_restarted_stores.is_empty() {
193 for store_name in glean_restarted_stores.iter() {
194 CounterMetric::new(CommonMetricData {
195 name: "execution_counter".into(),
196 category: store_name.into(),
197 send_in_pings: vec![INTERNAL_STORAGE.into()],
198 lifetime: Lifetime::Ping,
199 ..Default::default()
200 })
201 .add_sync(glean, 1);
202 }
203 let glean_restarted = CommonMetricData {
204 name: "restarted".into(),
205 category: "glean".into(),
206 send_in_pings: glean_restarted_stores,
207 lifetime: Lifetime::Ping,
208 ..Default::default()
209 };
210 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
211 let mut extra: HashMap<String, String> =
212 [("glean.startup.date".into(), startup)].into();
213 if glean.with_timestamps() {
214 let now = Utc::now();
215 let precise_timestamp = now.timestamp_millis() as u64;
216 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
217 }
218 self.record(
219 glean,
220 &glean_restarted.into(),
221 crate::get_timestamp_ms(),
222 Some(extra),
223 );
224 }
225 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
226 }
227 Err(err) => {
228 log::warn!("Error loading events from disk: {}", err);
229 false
230 }
231 }
232 }
233
234 fn load_events_from_disk(
235 &self,
236 glean: &Glean,
237 trim_data_to_registered_pings: bool,
238 ) -> Result<()> {
239 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
247 let entry = entry?;
248 if entry.file_type()?.is_file() {
249 let store_name = entry.file_name().into_string()?;
250 log::info!("Loading events for {}", store_name);
251 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
252 log::warn!("Trimming {}'s events", store_name);
253 if let Err(err) = fs::remove_file(entry.path()) {
254 match err.kind() {
255 std::io::ErrorKind::NotFound => {
256 }
258 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
259 }
260 }
261 continue;
262 }
263 let file = BufReader::new(File::open(entry.path())?);
264 db.insert(
265 store_name,
266 file.lines()
267 .map_while(Result::ok)
268 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
269 .collect(),
270 );
271 }
272 }
273 Ok(())
274 }
275
276 pub fn record(
293 &self,
294 glean: &Glean,
295 meta: &CommonMetricDataInternal,
296 timestamp: u64,
297 extra: Option<HashMap<String, String>>,
298 ) -> bool {
299 if !glean.is_upload_enabled() {
301 return false;
302 }
303
304 let mut submit_max_capacity_event_ping = false;
305 {
306 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
308 if !glean.is_ping_enabled(store_name) {
309 continue;
310 }
311
312 let store = db.entry(store_name.to_string()).or_default();
313 let execution_counter = CounterMetric::new(CommonMetricData {
314 name: "execution_counter".into(),
315 category: store_name.into(),
316 send_in_pings: vec![INTERNAL_STORAGE.into()],
317 lifetime: Lifetime::Ping,
318 ..Default::default()
319 })
320 .get_value(glean, INTERNAL_STORAGE);
321 let event = StoredEvent {
323 event: RecordedEvent {
324 timestamp,
325 category: meta.inner.category.to_string(),
326 name: meta.inner.name.to_string(),
327 extra: extra.clone(),
328 },
329 execution_counter,
330 };
331 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
333 self.write_event_to_disk(store_name, &event_json);
334 if store_name == "events" && store.len() == glean.get_max_events() {
335 submit_max_capacity_event_ping = true;
336 }
337 }
338 }
339 if submit_max_capacity_event_ping {
340 glean.submit_ping_by_name("events", Some("max_capacity"));
341 true
342 } else {
343 false
344 }
345 }
346
347 fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
348 let mut map = self.event_store_files.write().unwrap();
350 let entry = map.entry(store_name.to_string());
351
352 match entry {
353 Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
354 Entry::Vacant(entry) => {
355 let file = OpenOptions::new()
356 .create(true)
357 .append(true)
358 .open(self.path.join(store_name))?;
359 let file = Arc::new(file);
360 let entry = entry.insert(file);
361 Ok(Arc::clone(entry))
362 }
363 }
364 }
365
366 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
373 let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
376 let mut file = self.get_event_store(store_name)?;
377 file.write_all(event_json.as_bytes())?;
378 file.write_all(b"\n")?;
379 file.flush()?;
380 Ok::<(), std::io::Error>(())
381 })();
382
383 if let Err(err) = write_res {
384 log::warn!("IO error writing event to store '{}': {}", store_name, err);
385 }
386 }
387
388 fn normalize_store(
417 &self,
418 glean: &Glean,
419 store_name: &str,
420 store: &mut Vec<StoredEvent>,
421 glean_start_time: DateTime<FixedOffset>,
422 ) {
423 let is_glean_restarted =
424 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
425 let glean_restarted_meta = |store_name: &str| CommonMetricData {
426 name: "restarted".into(),
427 category: "glean".into(),
428 send_in_pings: vec![store_name.into()],
429 lifetime: Lifetime::Ping,
430 ..Default::default()
431 };
432 store.sort_by(|a, b| {
434 a.execution_counter
435 .cmp(&b.execution_counter)
436 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
437 .then_with(|| {
438 if is_glean_restarted(&a.event) {
439 Ordering::Less
440 } else {
441 Ordering::Greater
442 }
443 })
444 });
445 let final_event = match store
449 .iter()
450 .rposition(|event| !is_glean_restarted(&event.event))
451 {
452 Some(idx) => idx + 1,
453 _ => 0,
454 };
455 store.drain(final_event..);
456 let first_event = store
457 .iter()
458 .position(|event| !is_glean_restarted(&event.event))
459 .unwrap_or(store.len());
460 store.drain(..first_event);
461 if store.is_empty() {
462 return;
464 }
465 let mut cur_ec = 0;
471 let mut intra_group_offset = store[0].event.timestamp;
473 let mut inter_group_offset = 0;
475 let mut highest_ts = 0;
476 for event in store.iter_mut() {
477 let execution_counter = event.execution_counter.take().unwrap_or(0);
478 if is_glean_restarted(&event.event) {
479 cur_ec = execution_counter;
482 let glean_startup_date = event
483 .event
484 .extra
485 .as_mut()
486 .and_then(|extra| {
487 extra.remove("glean.startup.date").and_then(|date_str| {
488 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
489 .map_err(|_| {
490 record_error(
491 glean,
492 &glean_restarted_meta(store_name).into(),
493 ErrorType::InvalidState,
494 format!("Unparseable glean.startup.date '{}'", date_str),
495 None,
496 );
497 })
498 .ok()
499 })
500 })
501 .unwrap_or(glean_start_time);
502 if event
503 .event
504 .extra
505 .as_ref()
506 .is_some_and(|extra| extra.is_empty())
507 {
508 event.event.extra = None;
510 }
511 let ping_start = DatetimeMetric::new(
512 CommonMetricData {
513 name: format!("{}#start", store_name),
514 category: "".into(),
515 send_in_pings: vec![INTERNAL_STORAGE.into()],
516 lifetime: Lifetime::User,
517 ..Default::default()
518 },
519 TimeUnit::Minute,
520 );
521 let ping_start = ping_start
522 .get_value(glean, INTERNAL_STORAGE)
523 .unwrap_or(glean_start_time);
524 let time_from_ping_start_to_glean_restarted =
525 (glean_startup_date - ping_start).num_milliseconds();
526 intra_group_offset = event.event.timestamp;
527 inter_group_offset =
528 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
529 if inter_group_offset < highest_ts {
530 record_error(
531 glean,
532 &glean_restarted_meta(store_name).into(),
533 ErrorType::InvalidValue,
534 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
535 None,
536 );
537 inter_group_offset = highest_ts + 1;
542 }
543 } else if cur_ec == 0 {
544 cur_ec = execution_counter;
546 }
547 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
548 if execution_counter != cur_ec {
549 record_error(
550 glean,
551 &glean_restarted_meta(store_name).into(),
552 ErrorType::InvalidState,
553 format!(
554 "Inconsistent execution counter {} (expected {})",
555 execution_counter, cur_ec
556 ),
557 None,
558 );
559 cur_ec = execution_counter;
561 }
562
563 if event.event.timestamp > i64::MAX as u64 {
566 glean
567 .additional_metrics
568 .event_timestamp_clamped
569 .add_sync(glean, 1);
570 log::warn!(
571 "Calculated event timestamp was too high. Got: {}, max: {}",
572 event.event.timestamp,
573 i64::MAX,
574 );
575 event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
576 }
577
578 if highest_ts > event.event.timestamp {
579 record_error(
582 glean,
583 &glean_restarted_meta(store_name).into(),
584 ErrorType::InvalidState,
585 format!(
586 "Inconsistent previous highest timestamp {} (expected <= {})",
587 highest_ts, event.event.timestamp
588 ),
589 None,
590 );
591 }
593 highest_ts = event.event.timestamp
594 }
595 }
596
597 pub fn snapshot_as_json(
609 &self,
610 glean: &Glean,
611 store_name: &str,
612 clear_store: bool,
613 ) -> Option<JsonValue> {
614 let result = {
615 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
617 if !store.is_empty() {
618 let mut clone;
621 let store = if clear_store {
622 store
623 } else {
624 clone = store.clone();
625 &mut clone
626 };
627 self.normalize_store(glean, store_name, store, glean.start_time());
629 Some(json!(store))
630 } else {
631 log::warn!("Unexpectly got empty event store for '{}'", store_name);
632 None
633 }
634 })
635 };
636
637 if clear_store {
638 self.event_stores
639 .write()
640 .unwrap() .remove(&store_name.to_string());
642 self.event_store_files
643 .write()
644 .unwrap() .remove(&store_name.to_string());
646
647 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
649 match err.kind() {
650 std::io::ErrorKind::NotFound => {
651 }
653 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
654 }
655 }
656 }
657
658 result
659 }
660
661 pub fn clear_all(&self) -> Result<()> {
663 self.event_stores.write().unwrap().clear();
665 self.event_store_files.write().unwrap().clear();
666
667 let _lock = self.file_lock.lock().unwrap();
669 std::fs::remove_dir_all(&self.path)?;
670 create_dir_all(&self.path)?;
671
672 Ok(())
673 }
674
675 pub fn test_get_value<'a>(
682 &'a self,
683 meta: &'a CommonMetricDataInternal,
684 store_name: &str,
685 ) -> Option<Vec<RecordedEvent>> {
686 let value: Vec<RecordedEvent> = self
687 .event_stores
688 .read()
689 .unwrap() .get(&store_name.to_string())
691 .into_iter()
692 .flatten()
693 .map(|stored_event| stored_event.event.clone())
694 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
695 .collect();
696 if !value.is_empty() {
697 Some(value)
698 } else {
699 None
700 }
701 }
702}
703
704#[cfg(test)]
705mod test {
706 use super::*;
707 use crate::test_get_num_recorded_errors;
708 use crate::tests::new_glean;
709 use chrono::{TimeZone, Timelike};
710
#[test]
fn handle_truncated_events_on_disk() {
    let (glean, t) = new_glean(None);

    // Two truncated JSON lines followed by one complete event.
    let raw_lines = [
        "{\"timestamp\": 500",
        "{\"timestamp\"",
        "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
    ];
    let writer = EventDatabase::new(t.path()).unwrap();
    for line in raw_lines {
        writer.write_event_to_disk("events", line);
    }
    drop(writer);

    // A fresh database skips the unparseable lines and keeps the valid event.
    let reader = EventDatabase::new(t.path()).unwrap();
    reader.load_events_from_disk(&glean, false).unwrap();
    let stores = reader.event_stores.read().unwrap();
    assert_eq!(1, stores["events"].len());
}
732
#[test]
fn stable_serialization() {
    let without_extra = RecordedEvent {
        timestamp: 2,
        category: "cat".to_string(),
        name: "name".to_string(),
        extra: None,
    };
    let with_extra = RecordedEvent {
        extra: Some(HashMap::from([(
            "a key".to_string(),
            "a value".to_string(),
        )])),
        ..without_extra.clone()
    };

    // A serialized `RecordedEvent` must read back as a `StoredEvent` without a counter.
    for event in [without_extra, with_extra] {
        let pretty_json = ::serde_json::to_string_pretty(&event).unwrap();
        let round_tripped: StoredEvent = serde_json::from_str(&pretty_json).unwrap();
        assert_eq!(
            StoredEvent {
                event,
                execution_counter: None
            },
            round_tripped
        );
    }
}
769
#[test]
fn deserialize_existing_data() {
    let event_empty_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name"
}
            "#;

    let event_data_json = r#"
{
  "timestamp": 2,
  "category": "cat",
  "name": "name",
  "extra": {
    "a key": "a value"
  }
}
            "#;

    let expect_stored = |extra: Option<HashMap<String, String>>| StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "cat".to_string(),
            name: "name".to_string(),
            extra,
        },
        execution_counter: None,
    };
    let extras = HashMap::from([("a key".to_string(), "a value".to_string())]);

    // Data written without an execution counter must still deserialize.
    let cases = [
        (event_empty_json, expect_stored(None)),
        (event_data_json, expect_stored(Some(extras))),
    ];
    for (json_text, expected) in cases {
        assert_eq!(expected, serde_json::from_str(json_text).unwrap());
    }
}
822
#[test]
fn doesnt_record_when_upload_is_disabled() {
    let (mut glean, dir) = new_glean(None);
    let db = EventDatabase::new(dir.path()).unwrap();

    let test_storage = "store1";
    let test_category = "category";
    let test_name = "name";
    let test_timestamp = 2;
    let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
    let expected = StoredEvent {
        event: RecordedEvent {
            timestamp: test_timestamp,
            category: test_category.to_string(),
            name: test_name.to_string(),
            extra: None,
        },
        execution_counter: None,
    };
    // Snapshot of what is currently held in memory for the test store.
    let stored_events = || -> Vec<StoredEvent> {
        db.event_stores
            .read()
            .unwrap()
            .get(test_storage)
            .unwrap()
            .clone()
    };

    // Upload is enabled: the event is stored.
    db.record(&glean, &test_meta, 2, None);
    let after_first_record = stored_events();
    assert_eq!(&expected, &after_first_record[0]);
    assert_eq!(after_first_record.len(), 1);

    glean.set_upload_enabled(false);

    // Upload is disabled: the store does not grow.
    db.record(&glean, &test_meta, 2, None);
    assert_eq!(stored_events().len(), 1);
}
864
#[test]
fn normalize_store_of_glean_restarted() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
        },
        execution_counter: None,
    };
    let glean_start_time = glean.start_time();

    // A store holding nothing but `glean.restarted` events normalizes to an empty
    // store, no matter how many of them there are.
    for restart_count in 1..=3 {
        let mut store = vec![glean_restarted.clone(); restart_count];
        glean
            .event_storage()
            .normalize_store(&glean, store_name, &mut store, glean_start_time);
        assert!(store.is_empty());
    }
}
904
#[test]
fn normalize_store_of_glean_restarted_on_both_ends() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let make_event = |timestamp: u64, category: &str, name: &str| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: category.into(),
            name: name.into(),
            extra: None,
        },
        execution_counter: None,
    };

    // One ordinary event wrapped in `glean.restarted` events.
    let mut store = vec![
        make_event(2, "glean", "restarted"),
        make_event(20, "category", "name"),
        make_event(2, "glean", "restarted"),
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    // Only the ordinary event survives, rebased to timestamp 0.
    assert_eq!(vec![make_event(0, "category", "name")], store);
}
951
#[test]
fn normalize_store_single_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let glean_restarted = StoredEvent {
        event: RecordedEvent {
            timestamp: 2,
            category: "glean".into(),
            name: "restarted".into(),
            extra: None,
        },
        execution_counter: None,
    };
    let plain_event = |timestamp: u64| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            extra: None,
        },
        execution_counter: None,
    };

    // A single run: ordinary events surrounded by `glean.restarted` events.
    let timestamps: [u64; 3] = [20, 40, 200];
    let mut store = vec![glean_restarted.clone()];
    store.extend(timestamps.iter().map(|&timestamp| plain_event(timestamp)));
    store.push(glean_restarted);

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean.start_time());

    // The restarted events are gone and every timestamp is relative to the first event.
    let expected: Vec<StoredEvent> = timestamps
        .iter()
        .map(|timestamp| plain_event(timestamp - timestamps[0]))
        .collect();
    assert_eq!(expected, store);
}
1016
#[test]
fn normalize_store_multi_run_timestamp_math() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let plain_event = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter,
    };
    let restarted_event = |timestamp: u64,
                           extra: Option<HashMap<String, String>>,
                           execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "glean".into(),
            name: "restarted".into(),
            extra,
        },
        execution_counter,
    };

    // Two runs: three events in the first, a restart plus one event in the second.
    let timestamps: [u64; 4] = [20, 40, 200, 12];
    let ecs = [0, 1];
    let some_hour = 16;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0)
        .unwrap();
    // The ping started exactly one hour before the second run started up.
    let glean_start_time = startup_date.with_hour(some_hour - 1);
    let restarted_ts = 2;
    let startup_extra = HashMap::from([(
        "glean.startup.date".to_string(),
        get_iso_time_string(startup_date, TimeUnit::Minute),
    )]);

    let mut store = vec![
        plain_event(timestamps[0], Some(ecs[0])),
        plain_event(timestamps[1], Some(ecs[0])),
        plain_event(timestamps[2], Some(ecs[0])),
        restarted_event(restarted_ts, Some(startup_extra), Some(ecs[1])),
        plain_event(timestamps[3], Some(ecs[1])),
    ];

    glean.event_storage().normalize_store(
        &glean,
        store_name,
        &mut store,
        glean_start_time.unwrap(),
    );

    let hour_in_millis = 3600000;
    let expected = vec![
        // First run: relative to its first event.
        plain_event(timestamps[0] - timestamps[0], None),
        plain_event(timestamps[1] - timestamps[0], None),
        plain_event(timestamps[2] - timestamps[0], None),
        // The restart sits one hour after the ping start, with its extras removed.
        restarted_event(hour_in_millis, None, None),
        // Second run: relative to the restart, shifted by that hour.
        plain_event(hour_in_millis + timestamps[3] - restarted_ts, None),
    ];
    assert_eq!(expected, store);
}
1144
#[test]
fn normalize_store_multi_run_client_clocks() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let plain_event = |timestamp: u64, execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "category".into(),
            name: "name".into(),
            ..Default::default()
        },
        execution_counter,
    };
    let restarted_event = |timestamp: u64,
                           extra: Option<HashMap<String, String>>,
                           execution_counter: Option<i32>| StoredEvent {
        event: RecordedEvent {
            timestamp,
            category: "glean".into(),
            name: "restarted".into(),
            extra,
        },
        execution_counter,
    };

    // Two runs: two events in the first, a restart plus two events in the second.
    let timestamps: [u64; 4] = [20, 40, 12, 200];
    let ecs = [0, 1];
    let some_hour = 10;
    let startup_date = FixedOffset::east_opt(0)
        .unwrap()
        .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0)
        .unwrap();
    // The ping start lies one hour AFTER the second run's startup date.
    let glean_start_time = startup_date.with_hour(some_hour + 1);
    let restarted_ts = 2;
    let startup_extra = HashMap::from([(
        "glean.startup.date".to_string(),
        get_iso_time_string(startup_date, TimeUnit::Minute),
    )]);

    let mut store = vec![
        plain_event(timestamps[0], Some(ecs[0])),
        plain_event(timestamps[1], Some(ecs[0])),
        restarted_event(restarted_ts, Some(startup_extra), Some(ecs[1])),
        plain_event(timestamps[2], Some(ecs[1])),
        plain_event(timestamps[3], Some(ecs[1])),
    ];

    glean.event_storage().normalize_store(
        &glean,
        store_name,
        &mut store,
        glean_start_time.unwrap(),
    );
    assert_eq!(5, store.len());

    // First run is relative to its first event; the restart is placed one past the
    // highest timestamp seen so far and the next event is relative to the restart.
    let last_of_first_run = timestamps[1] - timestamps[0];
    let restart_at = last_of_first_run + 1;
    let expected_prefix = vec![
        plain_event(timestamps[0] - timestamps[0], None),
        plain_event(last_of_first_run, None),
        restarted_event(restart_at, None, None),
        plain_event(timestamps[2] - restarted_ts + restart_at, None),
    ];
    assert_eq!(&expected_prefix[..], &store[..4]);

    // The clock anomaly is reported exactly once.
    let restarted_meta = CommonMetricData {
        name: "restarted".into(),
        category: "glean".into(),
        send_in_pings: vec![store_name.into()],
        lifetime: Lifetime::Ping,
        ..Default::default()
    };
    assert_eq!(
        Ok(1),
        test_get_num_recorded_errors(&glean, &restarted_meta.into(), ErrorType::InvalidValue)
    );
}
1287
#[test]
fn normalize_store_non_zero_ec() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let make_event =
        |timestamp: u64, category: &str, name: &str, execution_counter: Option<i32>| {
            StoredEvent {
                event: RecordedEvent {
                    timestamp,
                    category: category.into(),
                    name: name.into(),
                    extra: None,
                },
                execution_counter,
            }
        };

    // The first run already has a non-zero execution counter.
    let mut store = vec![
        make_event(2, "glean", "restarted", Some(2)),
        make_event(20, "category", "name", Some(2)),
        make_event(2, "glean", "restarted", Some(3)),
    ];
    let glean_start_time = glean.start_time();

    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    assert_eq!(vec![make_event(0, "category", "name", None)], store);

    // Neither kind of error may have been recorded.
    for error_type in [ErrorType::InvalidState, ErrorType::InvalidValue] {
        let restarted_meta = CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        assert!(
            test_get_num_recorded_errors(&glean, &restarted_meta.into(), error_type).is_err()
        );
    }
}
1373
#[test]
fn normalize_store_clamps_timestamp() {
    let (glean, _dir) = new_glean(None);

    let store_name = "store-name";
    let max_allowed = i64::MAX as u64;
    // The last timestamp is one past the largest value that is allowed.
    let timestamps = [0, max_allowed / 2, max_allowed, max_allowed + 1];
    let mut store: Vec<StoredEvent> = timestamps
        .iter()
        .map(|&timestamp| StoredEvent {
            event: RecordedEvent {
                timestamp,
                category: "category".into(),
                name: "name".into(),
                ..Default::default()
            },
            execution_counter: None,
        })
        .collect();

    let glean_start_time = glean.start_time();
    glean
        .event_storage()
        .normalize_store(&glean, store_name, &mut store, glean_start_time);

    // All events are kept; only the out-of-range timestamp is clamped.
    let normalized: Vec<u64> = store.iter().map(|stored| stored.event.timestamp).collect();
    assert_eq!(
        vec![0, max_allowed / 2, max_allowed, max_allowed],
        normalized
    );

    // The clamping was counted exactly once.
    let error_count = glean
        .additional_metrics
        .event_timestamp_clamped
        .get_value(&glean, "health");
    assert_eq!(Some(1), error_count);
}
1419}