1use std::cmp::Ordering;
6use std::collections::hash_map::Entry;
7use std::collections::HashMap;
8use std::fs::{create_dir_all, File, OpenOptions};
9use std::io::BufReader;
10use std::io::Write;
11use std::io::{self, BufRead};
12use std::path::{Path, PathBuf};
13use std::sync::{Arc, Mutex, RwLock};
14use std::{fs, mem};
15
16use chrono::{DateTime, FixedOffset, Utc};
17
18use malloc_size_of::MallocSizeOf;
19use malloc_size_of_derive::MallocSizeOf;
20use serde::{Deserialize, Serialize};
21use serde_json::{json, Value as JsonValue};
22
23use crate::common_metric_data::CommonMetricDataInternal;
24use crate::coverage::record_coverage;
25use crate::error_recording::{record_error, ErrorType};
26use crate::metrics::{DatetimeMetric, TimeUnit};
27use crate::storage::INTERNAL_STORAGE;
28use crate::util::get_iso_time_string;
29use crate::Glean;
30use crate::Result;
31use crate::{CommonMetricData, CounterMetric, Lifetime};
32
/// A single recorded event: a timestamp, a category/name pair and optional extras.
///
/// `Default` is only derived in test builds.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, MallocSizeOf)]
#[cfg_attr(test, derive(Default))]
pub struct RecordedEvent {
    /// Timestamp of the event.
    ///
    /// NOTE(review): the only in-file producer passes `crate::get_timestamp_ms()`,
    /// so this is presumably milliseconds -- confirm against other callers.
    /// `normalize_store` later rewrites this value in place.
    pub timestamp: u64,

    /// The event's category.
    pub category: String,

    /// The event's name.
    pub name: String,

    /// Optional string key/value pairs attached to the event.
    ///
    /// Left out of the serialized form entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub extra: Option<HashMap<String, String>>,
}
58
59#[derive(
61 Debug, Clone, Deserialize, Serialize, PartialEq, Eq, malloc_size_of_derive::MallocSizeOf,
62)]
63struct StoredEvent {
64 #[serde(flatten)]
65 event: RecordedEvent,
66
67 #[serde(default)]
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub execution_counter: Option<i32>,
74}
75
/// Storage for recorded events: an in-memory map of stores mirrored by one file
/// per store inside `path`.
#[derive(Debug)]
pub struct EventDatabase {
    /// Directory holding the per-store event files (`<data_path>/events`, see `new`).
    pub path: PathBuf,
    /// In-memory events, keyed by store name.
    event_stores: RwLock<HashMap<String, Vec<StoredEvent>>>,
    /// Already-opened (append-mode) files, keyed by store name.
    event_store_files: RwLock<HashMap<String, Arc<File>>>,
    /// Taken around every operation in this file that reads, writes or deletes
    /// the event files on disk.
    file_lock: Mutex<()>,
}
107
108impl MallocSizeOf for EventDatabase {
109 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
110 let mut n = 0;
111 n += self.event_stores.read().unwrap().size_of(ops);
112
113 let map = self.event_store_files.read().unwrap();
114 for store_name in map.keys() {
115 n += store_name.size_of(ops);
116 n += mem::size_of::<File>();
118 }
119 n
120 }
121}
122
123impl EventDatabase {
124 pub fn new(data_path: &Path) -> Result<Self> {
131 let path = data_path.join("events");
132 create_dir_all(&path)?;
133
134 Ok(Self {
135 path,
136 event_stores: RwLock::new(HashMap::new()),
137 event_store_files: RwLock::new(HashMap::new()),
138 file_lock: Mutex::new(()),
139 })
140 }
141
142 pub fn flush_pending_events_on_startup(
168 &self,
169 glean: &Glean,
170 trim_data_to_registered_pings: bool,
171 ) -> bool {
172 match self.load_events_from_disk(glean, trim_data_to_registered_pings) {
173 Ok(_) => {
174 let stores_with_events: Vec<String> = {
175 self.event_stores
176 .read()
177 .unwrap()
178 .keys()
179 .map(|x| x.to_owned())
180 .collect() };
182 let has_events_events = stores_with_events.contains(&"events".to_owned());
185 let glean_restarted_stores = if has_events_events {
186 stores_with_events
187 .into_iter()
188 .filter(|store| store != "events")
189 .collect()
190 } else {
191 stores_with_events
192 };
193 if !glean_restarted_stores.is_empty() {
194 for store_name in glean_restarted_stores.iter() {
195 CounterMetric::new(CommonMetricData {
196 name: "execution_counter".into(),
197 category: store_name.into(),
198 send_in_pings: vec![INTERNAL_STORAGE.into()],
199 lifetime: Lifetime::Ping,
200 ..Default::default()
201 })
202 .add_sync(glean, 1);
203 }
204 let glean_restarted = CommonMetricData {
205 name: "restarted".into(),
206 category: "glean".into(),
207 send_in_pings: glean_restarted_stores,
208 lifetime: Lifetime::Ping,
209 ..Default::default()
210 };
211 let startup = get_iso_time_string(glean.start_time(), TimeUnit::Minute);
212 let mut extra: HashMap<String, String> =
213 [("glean.startup.date".into(), startup)].into();
214 if glean.with_timestamps() {
215 let now = Utc::now();
216 let precise_timestamp = now.timestamp_millis() as u64;
217 extra.insert("glean_timestamp".to_string(), precise_timestamp.to_string());
218 }
219 self.record(
220 glean,
221 &glean_restarted.into(),
222 crate::get_timestamp_ms(),
223 Some(extra),
224 );
225 }
226 has_events_events && glean.submit_ping_by_name("events", Some("startup"))
227 }
228 Err(err) => {
229 log::warn!("Error loading events from disk: {}", err);
230 false
231 }
232 }
233 }
234
235 fn load_events_from_disk(
236 &self,
237 glean: &Glean,
238 trim_data_to_registered_pings: bool,
239 ) -> Result<()> {
240 let mut db = self.event_stores.write().unwrap(); let _lock = self.file_lock.lock().unwrap(); for entry in fs::read_dir(&self.path)? {
248 let entry = entry?;
249 if entry.file_type()?.is_file() {
250 let store_name = entry.file_name().into_string()?;
251 log::info!("Loading events for {}", store_name);
252 if trim_data_to_registered_pings && glean.get_ping_by_name(&store_name).is_none() {
253 log::warn!("Trimming {}'s events", store_name);
254 if let Err(err) = fs::remove_file(entry.path()) {
255 match err.kind() {
256 std::io::ErrorKind::NotFound => {
257 }
259 _ => log::warn!("Error trimming events file '{}': {}", store_name, err),
260 }
261 }
262 continue;
263 }
264 let file = BufReader::new(File::open(entry.path())?);
265 db.insert(
266 store_name,
267 file.lines()
268 .map_while(Result::ok)
269 .filter_map(|line| serde_json::from_str::<StoredEvent>(&line).ok())
270 .collect(),
271 );
272 }
273 }
274 Ok(())
275 }
276
277 pub fn record(
294 &self,
295 glean: &Glean,
296 meta: &CommonMetricDataInternal,
297 timestamp: u64,
298 extra: Option<HashMap<String, String>>,
299 ) -> bool {
300 if !glean.is_upload_enabled() {
302 return false;
303 }
304
305 let mut submit_max_capacity_event_ping = false;
306 {
307 let mut db = self.event_stores.write().unwrap(); for store_name in meta.inner.send_in_pings.iter() {
309 if !glean.is_ping_enabled(store_name) {
310 continue;
311 }
312
313 let store = db.entry(store_name.to_string()).or_default();
314 let execution_counter = CounterMetric::new(CommonMetricData {
315 name: "execution_counter".into(),
316 category: store_name.into(),
317 send_in_pings: vec![INTERNAL_STORAGE.into()],
318 lifetime: Lifetime::Ping,
319 ..Default::default()
320 })
321 .get_value(glean, INTERNAL_STORAGE);
322 let event = StoredEvent {
324 event: RecordedEvent {
325 timestamp,
326 category: meta.inner.category.to_string(),
327 name: meta.inner.name.to_string(),
328 extra: extra.clone(),
329 },
330 execution_counter,
331 };
332 let event_json = serde_json::to_string(&event).unwrap(); store.push(event);
334 self.write_event_to_disk(store_name, &event_json);
335 if store_name == "events" && store.len() == glean.get_max_events() {
336 submit_max_capacity_event_ping = true;
337 }
338 }
339 }
340 if submit_max_capacity_event_ping {
341 glean.submit_ping_by_name("events", Some("max_capacity"));
342 true
343 } else {
344 false
345 }
346 }
347
348 fn get_event_store(&self, store_name: &str) -> Result<Arc<File>, io::Error> {
349 let mut map = self.event_store_files.write().unwrap();
351 let entry = map.entry(store_name.to_string());
352
353 match entry {
354 Entry::Occupied(entry) => Ok(Arc::clone(entry.get())),
355 Entry::Vacant(entry) => {
356 let file = OpenOptions::new()
357 .create(true)
358 .append(true)
359 .open(self.path.join(store_name))?;
360 let file = Arc::new(file);
361 let entry = entry.insert(file);
362 Ok(Arc::clone(entry))
363 }
364 }
365 }
366
367 fn write_event_to_disk(&self, store_name: &str, event_json: &str) {
374 let _lock = self.file_lock.lock().unwrap(); let write_res = (|| {
377 let mut file = self.get_event_store(store_name)?;
378 file.write_all(event_json.as_bytes())?;
379 file.write_all(b"\n")?;
380 file.flush()?;
381 Ok::<(), std::io::Error>(())
382 })();
383
384 if let Err(err) = write_res {
385 log::warn!("IO error writing event to store '{}': {}", store_name, err);
386 }
387 }
388
389 fn normalize_store(
418 &self,
419 glean: &Glean,
420 store_name: &str,
421 store: &mut Vec<StoredEvent>,
422 glean_start_time: DateTime<FixedOffset>,
423 ) {
424 let is_glean_restarted =
425 |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
426 let glean_restarted_meta = |store_name: &str| CommonMetricData {
427 name: "restarted".into(),
428 category: "glean".into(),
429 send_in_pings: vec![store_name.into()],
430 lifetime: Lifetime::Ping,
431 ..Default::default()
432 };
433 store.sort_by(|a, b| {
435 a.execution_counter
436 .cmp(&b.execution_counter)
437 .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
438 .then_with(|| {
439 if is_glean_restarted(&a.event) {
440 Ordering::Less
441 } else {
442 Ordering::Greater
443 }
444 })
445 });
446 let final_event = match store
450 .iter()
451 .rposition(|event| !is_glean_restarted(&event.event))
452 {
453 Some(idx) => idx + 1,
454 _ => 0,
455 };
456 store.drain(final_event..);
457 let first_event = store
458 .iter()
459 .position(|event| !is_glean_restarted(&event.event))
460 .unwrap_or(store.len());
461 store.drain(..first_event);
462 if store.is_empty() {
463 return;
465 }
466 let mut cur_ec = 0;
472 let mut intra_group_offset = store[0].event.timestamp;
474 let mut inter_group_offset = 0;
476 let mut highest_ts = 0;
477 for event in store.iter_mut() {
478 let execution_counter = event.execution_counter.take().unwrap_or(0);
479 if is_glean_restarted(&event.event) {
480 cur_ec = execution_counter;
483 let glean_startup_date = event
484 .event
485 .extra
486 .as_mut()
487 .and_then(|extra| {
488 extra.remove("glean.startup.date").and_then(|date_str| {
489 DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
490 .map_err(|_| {
491 record_error(
492 glean,
493 &glean_restarted_meta(store_name).into(),
494 ErrorType::InvalidState,
495 format!("Unparseable glean.startup.date '{}'", date_str),
496 None,
497 );
498 })
499 .ok()
500 })
501 })
502 .unwrap_or(glean_start_time);
503 if event
504 .event
505 .extra
506 .as_ref()
507 .is_some_and(|extra| extra.is_empty())
508 {
509 event.event.extra = None;
511 }
512 let ping_start = DatetimeMetric::new(
513 CommonMetricData {
514 name: format!("{}#start", store_name),
515 category: "".into(),
516 send_in_pings: vec![INTERNAL_STORAGE.into()],
517 lifetime: Lifetime::User,
518 ..Default::default()
519 },
520 TimeUnit::Minute,
521 );
522 let ping_start = ping_start
523 .get_value(glean, INTERNAL_STORAGE)
524 .unwrap_or(glean_start_time);
525 let time_from_ping_start_to_glean_restarted =
526 (glean_startup_date - ping_start).num_milliseconds();
527 intra_group_offset = event.event.timestamp;
528 inter_group_offset =
529 u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
530 if inter_group_offset < highest_ts {
531 record_error(
532 glean,
533 &glean_restarted_meta(store_name).into(),
534 ErrorType::InvalidValue,
535 format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
536 None,
537 );
538 inter_group_offset = highest_ts + 1;
543 }
544 } else if cur_ec == 0 {
545 cur_ec = execution_counter;
547 }
548 event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
549 if execution_counter != cur_ec {
550 record_error(
551 glean,
552 &glean_restarted_meta(store_name).into(),
553 ErrorType::InvalidState,
554 format!(
555 "Inconsistent execution counter {} (expected {})",
556 execution_counter, cur_ec
557 ),
558 None,
559 );
560 cur_ec = execution_counter;
562 }
563
564 if event.event.timestamp > i64::MAX as u64 {
567 glean
568 .additional_metrics
569 .event_timestamp_clamped
570 .add_sync(glean, 1);
571 log::warn!(
572 "Calculated event timestamp was too high. Got: {}, max: {}",
573 event.event.timestamp,
574 i64::MAX,
575 );
576 event.event.timestamp = event.event.timestamp.clamp(0, i64::MAX as u64);
577 }
578
579 if highest_ts > event.event.timestamp {
580 record_error(
583 glean,
584 &glean_restarted_meta(store_name).into(),
585 ErrorType::InvalidState,
586 format!(
587 "Inconsistent previous highest timestamp {} (expected <= {})",
588 highest_ts, event.event.timestamp
589 ),
590 None,
591 );
592 }
594 highest_ts = event.event.timestamp
595 }
596 }
597
598 pub fn snapshot_as_json(
610 &self,
611 glean: &Glean,
612 store_name: &str,
613 clear_store: bool,
614 ) -> Option<JsonValue> {
615 let result = {
616 let mut db = self.event_stores.write().unwrap(); db.get_mut(&store_name.to_string()).and_then(|store| {
618 if !store.is_empty() {
619 let mut clone;
622 let store = if clear_store {
623 store
624 } else {
625 clone = store.clone();
626 &mut clone
627 };
628 self.normalize_store(glean, store_name, store, glean.start_time());
630 Some(json!(store))
631 } else {
632 log::warn!("Unexpectly got empty event store for '{}'", store_name);
633 None
634 }
635 })
636 };
637
638 if clear_store {
639 self.event_stores
640 .write()
641 .unwrap() .remove(&store_name.to_string());
643 self.event_store_files
644 .write()
645 .unwrap() .remove(&store_name.to_string());
647
648 let _lock = self.file_lock.lock().unwrap(); if let Err(err) = fs::remove_file(self.path.join(store_name)) {
650 match err.kind() {
651 std::io::ErrorKind::NotFound => {
652 }
654 _ => log::warn!("Error removing events queue file '{}': {}", store_name, err),
655 }
656 }
657 }
658
659 result
660 }
661
662 pub fn clear_all(&self) -> Result<()> {
664 self.event_stores.write().unwrap().clear();
666 self.event_store_files.write().unwrap().clear();
667
668 let _lock = self.file_lock.lock().unwrap();
670 std::fs::remove_dir_all(&self.path)?;
671 create_dir_all(&self.path)?;
672
673 Ok(())
674 }
675
676 pub fn test_get_value<'a>(
683 &'a self,
684 meta: &'a CommonMetricDataInternal,
685 store_name: &str,
686 ) -> Option<Vec<RecordedEvent>> {
687 record_coverage(&meta.base_identifier());
688
689 let value: Vec<RecordedEvent> = self
690 .event_stores
691 .read()
692 .unwrap() .get(&store_name.to_string())
694 .into_iter()
695 .flatten()
696 .map(|stored_event| stored_event.event.clone())
697 .filter(|event| event.name == meta.inner.name && event.category == meta.inner.category)
698 .collect();
699 if !value.is_empty() {
700 Some(value)
701 } else {
702 None
703 }
704 }
705}
706
707#[cfg(test)]
708mod test {
709 use super::*;
710 use crate::test_get_num_recorded_errors;
711 use crate::tests::new_glean;
712 use chrono::{TimeZone, Timelike};
713
714 #[test]
715 fn handle_truncated_events_on_disk() {
716 let (glean, t) = new_glean(None);
717
718 {
719 let db = EventDatabase::new(t.path()).unwrap();
720 db.write_event_to_disk("events", "{\"timestamp\": 500");
721 db.write_event_to_disk("events", "{\"timestamp\"");
722 db.write_event_to_disk(
723 "events",
724 "{\"timestamp\": 501, \"category\": \"ui\", \"name\": \"click\"}",
725 );
726 }
727
728 {
729 let db = EventDatabase::new(t.path()).unwrap();
730 db.load_events_from_disk(&glean, false).unwrap();
731 let events = &db.event_stores.read().unwrap()["events"];
732 assert_eq!(1, events.len());
733 }
734 }
735
736 #[test]
737 fn stable_serialization() {
738 let event_empty = RecordedEvent {
739 timestamp: 2,
740 category: "cat".to_string(),
741 name: "name".to_string(),
742 extra: None,
743 };
744
745 let mut data = HashMap::new();
746 data.insert("a key".to_string(), "a value".to_string());
747 let event_data = RecordedEvent {
748 timestamp: 2,
749 category: "cat".to_string(),
750 name: "name".to_string(),
751 extra: Some(data),
752 };
753
754 let event_empty_json = ::serde_json::to_string_pretty(&event_empty).unwrap();
755 let event_data_json = ::serde_json::to_string_pretty(&event_data).unwrap();
756
757 assert_eq!(
758 StoredEvent {
759 event: event_empty,
760 execution_counter: None
761 },
762 serde_json::from_str(&event_empty_json).unwrap()
763 );
764 assert_eq!(
765 StoredEvent {
766 event: event_data,
767 execution_counter: None
768 },
769 serde_json::from_str(&event_data_json).unwrap()
770 );
771 }
772
773 #[test]
774 fn deserialize_existing_data() {
775 let event_empty_json = r#"
776{
777 "timestamp": 2,
778 "category": "cat",
779 "name": "name"
780}
781 "#;
782
783 let event_data_json = r#"
784{
785 "timestamp": 2,
786 "category": "cat",
787 "name": "name",
788 "extra": {
789 "a key": "a value"
790 }
791}
792 "#;
793
794 let event_empty = RecordedEvent {
795 timestamp: 2,
796 category: "cat".to_string(),
797 name: "name".to_string(),
798 extra: None,
799 };
800
801 let mut data = HashMap::new();
802 data.insert("a key".to_string(), "a value".to_string());
803 let event_data = RecordedEvent {
804 timestamp: 2,
805 category: "cat".to_string(),
806 name: "name".to_string(),
807 extra: Some(data),
808 };
809
810 assert_eq!(
811 StoredEvent {
812 event: event_empty,
813 execution_counter: None
814 },
815 serde_json::from_str(event_empty_json).unwrap()
816 );
817 assert_eq!(
818 StoredEvent {
819 event: event_data,
820 execution_counter: None
821 },
822 serde_json::from_str(event_data_json).unwrap()
823 );
824 }
825
826 #[test]
827 fn doesnt_record_when_upload_is_disabled() {
828 let (mut glean, dir) = new_glean(None);
829 let db = EventDatabase::new(dir.path()).unwrap();
830
831 let test_storage = "store1";
832 let test_category = "category";
833 let test_name = "name";
834 let test_timestamp = 2;
835 let test_meta = CommonMetricDataInternal::new(test_category, test_name, test_storage);
836 let event_data = RecordedEvent {
837 timestamp: test_timestamp,
838 category: test_category.to_string(),
839 name: test_name.to_string(),
840 extra: None,
841 };
842
843 db.record(&glean, &test_meta, 2, None);
846 {
847 let event_stores = db.event_stores.read().unwrap();
848 assert_eq!(
849 &StoredEvent {
850 event: event_data,
851 execution_counter: None
852 },
853 &event_stores.get(test_storage).unwrap()[0]
854 );
855 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
856 }
857
858 glean.set_upload_enabled(false);
859
860 db.record(&glean, &test_meta, 2, None);
862 {
863 let event_stores = db.event_stores.read().unwrap();
864 assert_eq!(event_stores.get(test_storage).unwrap().len(), 1);
865 }
866 }
867
868 #[test]
869 fn normalize_store_of_glean_restarted() {
870 let (glean, _dir) = new_glean(None);
872
873 let store_name = "store-name";
874 let glean_restarted = StoredEvent {
875 event: RecordedEvent {
876 timestamp: 2,
877 category: "glean".into(),
878 name: "restarted".into(),
879 extra: None,
880 },
881 execution_counter: None,
882 };
883 let mut store = vec![glean_restarted.clone()];
884 let glean_start_time = glean.start_time();
885
886 glean
887 .event_storage()
888 .normalize_store(&glean, store_name, &mut store, glean_start_time);
889 assert!(store.is_empty());
890
891 let mut store = vec![glean_restarted.clone(), glean_restarted.clone()];
892 glean
893 .event_storage()
894 .normalize_store(&glean, store_name, &mut store, glean_start_time);
895 assert!(store.is_empty());
896
897 let mut store = vec![
898 glean_restarted.clone(),
899 glean_restarted.clone(),
900 glean_restarted,
901 ];
902 glean
903 .event_storage()
904 .normalize_store(&glean, store_name, &mut store, glean_start_time);
905 assert!(store.is_empty());
906 }
907
908 #[test]
909 fn normalize_store_of_glean_restarted_on_both_ends() {
910 let (glean, _dir) = new_glean(None);
912
913 let store_name = "store-name";
914 let glean_restarted = StoredEvent {
915 event: RecordedEvent {
916 timestamp: 2,
917 category: "glean".into(),
918 name: "restarted".into(),
919 extra: None,
920 },
921 execution_counter: None,
922 };
923 let not_glean_restarted = StoredEvent {
924 event: RecordedEvent {
925 timestamp: 20,
926 category: "category".into(),
927 name: "name".into(),
928 extra: None,
929 },
930 execution_counter: None,
931 };
932 let mut store = vec![
933 glean_restarted.clone(),
934 not_glean_restarted.clone(),
935 glean_restarted,
936 ];
937 let glean_start_time = glean.start_time();
938
939 glean
940 .event_storage()
941 .normalize_store(&glean, store_name, &mut store, glean_start_time);
942 assert_eq!(1, store.len());
943 assert_eq!(
944 StoredEvent {
945 event: RecordedEvent {
946 timestamp: 0,
947 ..not_glean_restarted.event
948 },
949 execution_counter: None
950 },
951 store[0]
952 );
953 }
954
955 #[test]
956 fn normalize_store_single_run_timestamp_math() {
957 let (glean, _dir) = new_glean(None);
961
962 let store_name = "store-name";
963 let glean_restarted = StoredEvent {
964 event: RecordedEvent {
965 timestamp: 2,
966 category: "glean".into(),
967 name: "restarted".into(),
968 extra: None,
969 },
970 execution_counter: None,
971 };
972 let timestamps = [20, 40, 200];
973 let not_glean_restarted = StoredEvent {
974 event: RecordedEvent {
975 timestamp: timestamps[0],
976 category: "category".into(),
977 name: "name".into(),
978 extra: None,
979 },
980 execution_counter: None,
981 };
982 let mut store = vec![
983 glean_restarted.clone(),
984 not_glean_restarted.clone(),
985 StoredEvent {
986 event: RecordedEvent {
987 timestamp: timestamps[1],
988 ..not_glean_restarted.event.clone()
989 },
990 execution_counter: None,
991 },
992 StoredEvent {
993 event: RecordedEvent {
994 timestamp: timestamps[2],
995 ..not_glean_restarted.event.clone()
996 },
997 execution_counter: None,
998 },
999 glean_restarted,
1000 ];
1001
1002 glean
1003 .event_storage()
1004 .normalize_store(&glean, store_name, &mut store, glean.start_time());
1005 assert_eq!(3, store.len());
1006 for (timestamp, event) in timestamps.iter().zip(store.iter()) {
1007 assert_eq!(
1008 &StoredEvent {
1009 event: RecordedEvent {
1010 timestamp: timestamp - timestamps[0],
1011 ..not_glean_restarted.clone().event
1012 },
1013 execution_counter: None
1014 },
1015 event
1016 );
1017 }
1018 }
1019
1020 #[test]
1021 fn normalize_store_multi_run_timestamp_math() {
1022 let (glean, _dir) = new_glean(None);
1027
1028 let store_name = "store-name";
1029 let glean_restarted = StoredEvent {
1030 event: RecordedEvent {
1031 category: "glean".into(),
1032 name: "restarted".into(),
1033 ..Default::default()
1034 },
1035 execution_counter: None,
1036 };
1037 let not_glean_restarted = StoredEvent {
1038 event: RecordedEvent {
1039 category: "category".into(),
1040 name: "name".into(),
1041 ..Default::default()
1042 },
1043 execution_counter: None,
1044 };
1045
1046 let timestamps = [20, 40, 200, 12];
1049 let ecs = [0, 1];
1050 let some_hour = 16;
1051 let startup_date = FixedOffset::east_opt(0)
1052 .unwrap()
1053 .with_ymd_and_hms(2022, 11, 24, some_hour, 29, 0) .unwrap();
1055 let glean_start_time = startup_date.with_hour(some_hour - 1);
1056 let restarted_ts = 2;
1057 let mut store = vec![
1058 StoredEvent {
1059 event: RecordedEvent {
1060 timestamp: timestamps[0],
1061 ..not_glean_restarted.event.clone()
1062 },
1063 execution_counter: Some(ecs[0]),
1064 },
1065 StoredEvent {
1066 event: RecordedEvent {
1067 timestamp: timestamps[1],
1068 ..not_glean_restarted.event.clone()
1069 },
1070 execution_counter: Some(ecs[0]),
1071 },
1072 StoredEvent {
1073 event: RecordedEvent {
1074 timestamp: timestamps[2],
1075 ..not_glean_restarted.event.clone()
1076 },
1077 execution_counter: Some(ecs[0]),
1078 },
1079 StoredEvent {
1080 event: RecordedEvent {
1081 extra: Some(
1082 [(
1083 "glean.startup.date".into(),
1084 get_iso_time_string(startup_date, TimeUnit::Minute),
1085 )]
1086 .into(),
1087 ),
1088 timestamp: restarted_ts,
1089 ..glean_restarted.event.clone()
1090 },
1091 execution_counter: Some(ecs[1]),
1092 },
1093 StoredEvent {
1094 event: RecordedEvent {
1095 timestamp: timestamps[3],
1096 ..not_glean_restarted.event.clone()
1097 },
1098 execution_counter: Some(ecs[1]),
1099 },
1100 ];
1101
1102 glean.event_storage().normalize_store(
1103 &glean,
1104 store_name,
1105 &mut store,
1106 glean_start_time.unwrap(),
1107 );
1108 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 1].iter().zip(store.clone()) {
1112 assert_eq!(
1113 StoredEvent {
1114 event: RecordedEvent {
1115 timestamp: timestamp - timestamps[0],
1116 ..not_glean_restarted.event.clone()
1117 },
1118 execution_counter: None,
1119 },
1120 event
1121 );
1122 }
1123 let hour_in_millis = 3600000;
1125 assert_eq!(
1126 store[3],
1127 StoredEvent {
1128 event: RecordedEvent {
1129 timestamp: hour_in_millis,
1130 ..glean_restarted.event
1131 },
1132 execution_counter: None,
1133 }
1134 );
1135 assert_eq!(
1137 store[4],
1138 StoredEvent {
1139 event: RecordedEvent {
1140 timestamp: hour_in_millis + timestamps[3] - restarted_ts,
1141 ..not_glean_restarted.event
1142 },
1143 execution_counter: None,
1144 }
1145 );
1146 }
1147
1148 #[test]
1149 fn normalize_store_multi_run_client_clocks() {
1150 let (glean, _dir) = new_glean(None);
1153
1154 let store_name = "store-name";
1155 let glean_restarted = StoredEvent {
1156 event: RecordedEvent {
1157 category: "glean".into(),
1158 name: "restarted".into(),
1159 ..Default::default()
1160 },
1161 execution_counter: None,
1162 };
1163 let not_glean_restarted = StoredEvent {
1164 event: RecordedEvent {
1165 category: "category".into(),
1166 name: "name".into(),
1167 ..Default::default()
1168 },
1169 execution_counter: None,
1170 };
1171
1172 let timestamps = [20, 40, 12, 200];
1175 let ecs = [0, 1];
1176 let some_hour = 10;
1177 let startup_date = FixedOffset::east_opt(0)
1178 .unwrap()
1179 .with_ymd_and_hms(2022, 11, 25, some_hour, 37, 0) .unwrap();
1181 let glean_start_time = startup_date.with_hour(some_hour + 1);
1182 let restarted_ts = 2;
1183 let mut store = vec![
1184 StoredEvent {
1185 event: RecordedEvent {
1186 timestamp: timestamps[0],
1187 ..not_glean_restarted.event.clone()
1188 },
1189 execution_counter: Some(ecs[0]),
1190 },
1191 StoredEvent {
1192 event: RecordedEvent {
1193 timestamp: timestamps[1],
1194 ..not_glean_restarted.event.clone()
1195 },
1196 execution_counter: Some(ecs[0]),
1197 },
1198 StoredEvent {
1199 event: RecordedEvent {
1200 extra: Some(
1201 [(
1202 "glean.startup.date".into(),
1203 get_iso_time_string(startup_date, TimeUnit::Minute),
1204 )]
1205 .into(),
1206 ),
1207 timestamp: restarted_ts,
1208 ..glean_restarted.event.clone()
1209 },
1210 execution_counter: Some(ecs[1]),
1211 },
1212 StoredEvent {
1213 event: RecordedEvent {
1214 timestamp: timestamps[2],
1215 ..not_glean_restarted.event.clone()
1216 },
1217 execution_counter: Some(ecs[1]),
1218 },
1219 StoredEvent {
1220 event: RecordedEvent {
1221 timestamp: timestamps[3],
1222 ..not_glean_restarted.event.clone()
1223 },
1224 execution_counter: Some(ecs[1]),
1225 },
1226 ];
1227
1228 glean.event_storage().normalize_store(
1229 &glean,
1230 store_name,
1231 &mut store,
1232 glean_start_time.unwrap(),
1233 );
1234 assert_eq!(5, store.len()); for (timestamp, event) in timestamps[..timestamps.len() - 2].iter().zip(store.clone()) {
1238 assert_eq!(
1239 StoredEvent {
1240 event: RecordedEvent {
1241 timestamp: timestamp - timestamps[0],
1242 ..not_glean_restarted.event.clone()
1243 },
1244 execution_counter: None,
1245 },
1246 event
1247 );
1248 }
1249 assert_eq!(
1253 store[2],
1254 StoredEvent {
1255 event: RecordedEvent {
1256 timestamp: store[1].event.timestamp + 1,
1257 ..glean_restarted.event
1258 },
1259 execution_counter: None,
1260 }
1261 );
1262 assert_eq!(
1264 store[3],
1265 StoredEvent {
1266 event: RecordedEvent {
1267 timestamp: timestamps[2] - restarted_ts + store[2].event.timestamp,
1268 ..not_glean_restarted.event
1269 },
1270 execution_counter: None,
1271 }
1272 );
1273 assert_eq!(
1275 Ok(1),
1276 test_get_num_recorded_errors(
1277 &glean,
1278 &CommonMetricData {
1279 name: "restarted".into(),
1280 category: "glean".into(),
1281 send_in_pings: vec![store_name.into()],
1282 lifetime: Lifetime::Ping,
1283 ..Default::default()
1284 }
1285 .into(),
1286 ErrorType::InvalidValue
1287 )
1288 );
1289 }
1290
1291 #[test]
1292 fn normalize_store_non_zero_ec() {
1293 let (glean, _dir) = new_glean(None);
1296
1297 let store_name = "store-name";
1298 let glean_restarted = StoredEvent {
1299 event: RecordedEvent {
1300 timestamp: 2,
1301 category: "glean".into(),
1302 name: "restarted".into(),
1303 extra: None,
1304 },
1305 execution_counter: Some(2),
1306 };
1307 let not_glean_restarted = StoredEvent {
1308 event: RecordedEvent {
1309 timestamp: 20,
1310 category: "category".into(),
1311 name: "name".into(),
1312 extra: None,
1313 },
1314 execution_counter: Some(2),
1315 };
1316 let glean_restarted_2 = StoredEvent {
1317 event: RecordedEvent {
1318 timestamp: 2,
1319 category: "glean".into(),
1320 name: "restarted".into(),
1321 extra: None,
1322 },
1323 execution_counter: Some(3),
1324 };
1325 let mut store = vec![
1326 glean_restarted,
1327 not_glean_restarted.clone(),
1328 glean_restarted_2,
1329 ];
1330 let glean_start_time = glean.start_time();
1331
1332 glean
1333 .event_storage()
1334 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1335
1336 assert_eq!(1, store.len());
1337 assert_eq!(
1338 StoredEvent {
1339 event: RecordedEvent {
1340 timestamp: 0,
1341 ..not_glean_restarted.event
1342 },
1343 execution_counter: None
1344 },
1345 store[0]
1346 );
1347 assert!(test_get_num_recorded_errors(
1349 &glean,
1350 &CommonMetricData {
1351 name: "restarted".into(),
1352 category: "glean".into(),
1353 send_in_pings: vec![store_name.into()],
1354 lifetime: Lifetime::Ping,
1355 ..Default::default()
1356 }
1357 .into(),
1358 ErrorType::InvalidState
1359 )
1360 .is_err());
1361 assert!(test_get_num_recorded_errors(
1363 &glean,
1364 &CommonMetricData {
1365 name: "restarted".into(),
1366 category: "glean".into(),
1367 send_in_pings: vec![store_name.into()],
1368 lifetime: Lifetime::Ping,
1369 ..Default::default()
1370 }
1371 .into(),
1372 ErrorType::InvalidValue
1373 )
1374 .is_err());
1375 }
1376
1377 #[test]
1378 fn normalize_store_clamps_timestamp() {
1379 let (glean, _dir) = new_glean(None);
1380
1381 let store_name = "store-name";
1382 let event = RecordedEvent {
1383 category: "category".into(),
1384 name: "name".into(),
1385 ..Default::default()
1386 };
1387
1388 let timestamps = [
1389 0,
1390 (i64::MAX / 2) as u64,
1391 i64::MAX as _,
1392 (i64::MAX as u64) + 1,
1393 ];
1394 let mut store = timestamps
1395 .into_iter()
1396 .map(|timestamp| StoredEvent {
1397 event: RecordedEvent {
1398 timestamp,
1399 ..event.clone()
1400 },
1401 execution_counter: None,
1402 })
1403 .collect();
1404
1405 let glean_start_time = glean.start_time();
1406 glean
1407 .event_storage()
1408 .normalize_store(&glean, store_name, &mut store, glean_start_time);
1409 assert_eq!(4, store.len());
1410
1411 assert_eq!(0, store[0].event.timestamp);
1412 assert_eq!((i64::MAX / 2) as u64, store[1].event.timestamp);
1413 assert_eq!((i64::MAX as u64), store[2].event.timestamp);
1414 assert_eq!((i64::MAX as u64), store[3].event.timestamp);
1415
1416 let error_count = glean
1417 .additional_metrics
1418 .event_timestamp_clamped
1419 .get_value(&glean, "health");
1420 assert_eq!(Some(1), error_count);
1421 }
1422}