1use std::fs::{self, create_dir_all, File};
8use std::io::{BufRead, BufReader, Write};
9use std::path::{Path, PathBuf};
10
11use log::info;
12use serde_json::{json, Value as JsonValue};
13
14use crate::common_metric_data::{CommonMetricData, Lifetime};
15use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit};
16use crate::storage::{StorageManager, INTERNAL_STORAGE};
17use crate::upload::{HeaderMap, PingMetadata};
18use crate::util::{get_iso_time_string, local_now_with_offset};
19use crate::{Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
20
21pub struct Ping<'a> {
23 pub doc_id: &'a str,
25 pub name: &'a str,
27 pub url_path: &'a str,
29 pub content: JsonValue,
31 pub headers: HeaderMap,
33 pub includes_info_sections: bool,
35 pub schedules_pings: Vec<String>,
37 pub uploader_capabilities: Vec<String>,
39}
40
41pub struct PingMaker;
43
44fn merge(a: &mut JsonValue, b: &JsonValue) {
45 match (a, b) {
46 (&mut JsonValue::Object(ref mut a), JsonValue::Object(b)) => {
47 for (k, v) in b {
48 merge(a.entry(k.clone()).or_insert(JsonValue::Null), v);
49 }
50 }
51 (a, b) => {
52 *a = b.clone();
53 }
54 }
55}
56
57impl Default for PingMaker {
58 fn default() -> Self {
59 Self::new()
60 }
61}
62
63impl PingMaker {
64 pub fn new() -> Self {
66 Self
67 }
68
69 fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize {
71 if !glean.is_ping_enabled(storage_name) {
73 return 0;
74 }
75
76 let seq = CounterMetric::new(CommonMetricData {
78 name: format!("{}#sequence", storage_name),
79 category: "".into(),
81 send_in_pings: vec![INTERNAL_STORAGE.into()],
82 lifetime: Lifetime::User,
83 ..Default::default()
84 });
85
86 let current_seq = match StorageManager.snapshot_metric(
87 glean.storage(),
88 INTERNAL_STORAGE,
89 &seq.meta().identifier(glean),
90 seq.meta().inner.lifetime,
91 ) {
92 Some(Metric::Counter(i)) => i,
93 _ => 0,
94 };
95
96 seq.add_sync(glean, 1);
98
99 current_seq as usize
100 }
101
102 fn get_start_end_times(
104 &self,
105 glean: &Glean,
106 storage_name: &str,
107 time_unit: TimeUnit,
108 ) -> (String, String) {
109 let start_time = DatetimeMetric::new(
110 CommonMetricData {
111 name: format!("{}#start", storage_name),
112 category: "".into(),
113 send_in_pings: vec![INTERNAL_STORAGE.into()],
114 lifetime: Lifetime::User,
115 ..Default::default()
116 },
117 time_unit,
118 );
119
120 let start_time_data = start_time
123 .get_value(glean, INTERNAL_STORAGE)
124 .unwrap_or_else(|| glean.start_time());
125 let end_time_data = local_now_with_offset();
126
127 start_time.set_sync_chrono(glean, end_time_data);
129
130 let start_time_data = get_iso_time_string(start_time_data, time_unit);
132 let end_time_data = get_iso_time_string(end_time_data, time_unit);
133 (start_time_data, end_time_data)
134 }
135
136 fn get_ping_info(
137 &self,
138 glean: &Glean,
139 storage_name: &str,
140 reason: Option<&str>,
141 precision: TimeUnit,
142 ) -> JsonValue {
143 let (start_time, end_time) = self.get_start_end_times(glean, storage_name, precision);
144 let mut map = json!({
145 "seq": self.get_ping_seq(glean, storage_name),
146 "start_time": start_time,
147 "end_time": end_time,
148 });
149
150 if let Some(reason) = reason {
151 map.as_object_mut()
152 .unwrap() .insert("reason".to_string(), JsonValue::String(reason.to_string()));
154 };
155
156 if let Some(experiment_data) =
158 StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE)
159 {
160 map.as_object_mut()
161 .unwrap() .insert("experiments".to_string(), experiment_data);
163 };
164
165 if let Some(config_json) = glean
167 .additional_metrics
168 .server_knobs_config
169 .get_value(glean, INTERNAL_STORAGE)
170 {
171 let server_knobs_config = serde_json::from_str(&config_json).unwrap();
174 map.as_object_mut()
175 .unwrap() .insert("server_knobs_config".to_string(), server_knobs_config);
177 }
178
179 map
180 }
181
182 fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue {
183 let mut map = json!({
185 "telemetry_sdk_build": crate::GLEAN_VERSION,
186 });
187
188 if let Some(client_info) =
190 StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true)
191 {
192 let client_info_obj = client_info.as_object().unwrap(); for (_metric_type, metrics) in client_info_obj {
194 merge(&mut map, metrics);
195 }
196 let map = map.as_object_mut().unwrap(); let mut attribution = serde_json::Map::new();
198 let mut distribution = serde_json::Map::new();
199 map.retain(|name, value| {
200 let mut split = name.split('.');
202 let category = split.next();
203 let name = split.next();
204 if let (Some(category), Some(name)) = (category, name) {
205 if category == "attribution" {
206 attribution.insert(name.into(), value.take());
207 false
208 } else if category == "distribution" {
209 distribution.insert(name.into(), value.take());
210 false
211 } else {
212 true
213 }
214 } else {
215 true
216 }
217 });
218 if !attribution.is_empty() {
219 map.insert("attribution".into(), serde_json::Value::from(attribution));
220 }
221 if !distribution.is_empty() {
222 map.insert("distribution".into(), serde_json::Value::from(distribution));
223 }
224 } else {
225 log::warn!("Empty client info data.");
226 }
227
228 if !include_client_id {
229 map.as_object_mut().unwrap().remove("client_id");
231 }
232
233 json!(map)
234 }
235
236 fn get_headers(&self, glean: &Glean) -> HeaderMap {
249 let mut headers_map = HeaderMap::new();
250
251 if let Some(debug_view_tag) = glean.debug_view_tag() {
252 headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string());
253 }
254
255 if let Some(source_tags) = glean.source_tags() {
256 headers_map.insert("X-Source-Tags".to_string(), source_tags.join(","));
257 }
258
259 headers_map
260 }
261
262 pub fn collect<'a>(
277 &self,
278 glean: &Glean,
279 ping: &'a PingType,
280 reason: Option<&str>,
281 doc_id: &'a str,
282 url_path: &'a str,
283 ) -> Option<Ping<'a>> {
284 info!("Collecting {}", ping.name());
285 let database = glean.storage();
286
287 let write_samples = database.write_timings.replace(Vec::with_capacity(64));
290 if !write_samples.is_empty() {
291 glean
292 .database_metrics
293 .write_time
294 .accumulate_samples_sync(glean, &write_samples);
295 }
296
297 let mut metrics_data = StorageManager.snapshot_as_json(database, ping.name(), true);
298
299 let events_data = glean
300 .event_storage()
301 .snapshot_as_json(glean, ping.name(), true);
302
303 let uploader_capabilities = ping.uploader_capabilities();
310 if !uploader_capabilities.is_empty() {
311 if metrics_data.is_none() && (ping.send_if_empty() || events_data.is_some()) {
312 metrics_data = Some(json!({}))
313 }
314
315 if let Some(map) = metrics_data.as_mut().and_then(|o| o.as_object_mut()) {
316 let lists = map
317 .entry("string_list")
318 .or_insert_with(|| json!({}))
319 .as_object_mut()
320 .unwrap();
321
322 lists.insert(
323 "glean.ping.uploader_capabilities".to_string(),
324 json!(uploader_capabilities),
325 );
326 }
327 }
328
329 if (!ping.include_client_id() || !ping.send_if_empty() || !ping.include_info_sections())
333 && glean.test_get_experimentation_id().is_some()
334 && metrics_data.is_some()
335 {
336 let metrics = metrics_data.as_mut().unwrap().as_object_mut().unwrap();
339 let metrics_count = metrics.len();
340 let strings = metrics.get_mut("string").unwrap().as_object_mut().unwrap();
341 let string_count = strings.len();
342
343 let empty_payload = events_data.is_none() && metrics_count == 1 && string_count == 1;
345 if !ping.include_client_id() || (!ping.send_if_empty() && empty_payload) {
346 strings.remove("glean.client.annotation.experimentation_id");
347 }
348
349 if strings.is_empty() {
350 metrics.remove("string");
351 }
352
353 if metrics.is_empty() {
354 metrics_data = None;
355 }
356 }
357
358 let is_empty = metrics_data.is_none() && events_data.is_none();
359 if !ping.send_if_empty() && is_empty {
360 info!("Storage for {} empty. Bailing out.", ping.name());
361 return None;
362 } else if ping.name() == "events" && events_data.is_none() {
363 info!("No events for 'events' ping. Bailing out.");
364 return None;
365 } else if is_empty {
366 info!(
367 "Storage for {} empty. Ping will still be sent.",
368 ping.name()
369 );
370 }
371
372 let precision = if ping.precise_timestamps() {
373 TimeUnit::Millisecond
374 } else {
375 TimeUnit::Minute
376 };
377
378 let mut json = if ping.include_info_sections() {
379 let ping_info = self.get_ping_info(glean, ping.name(), reason, precision);
380 let client_info = self.get_client_info(glean, ping.include_client_id());
381
382 json!({
383 "ping_info": ping_info,
384 "client_info": client_info
385 })
386 } else {
387 json!({})
388 };
389
390 let json_obj = json.as_object_mut()?;
391 if let Some(metrics_data) = metrics_data {
392 json_obj.insert("metrics".to_string(), metrics_data);
393 }
394 if let Some(events_data) = events_data {
395 json_obj.insert("events".to_string(), events_data);
396 }
397
398 Some(Ping {
399 content: json,
400 name: ping.name(),
401 doc_id,
402 url_path,
403 headers: self.get_headers(glean),
404 includes_info_sections: ping.include_info_sections(),
405 schedules_pings: ping.schedules_pings().to_vec(),
406 uploader_capabilities: ping.uploader_capabilities().to_vec(),
407 })
408 }
409
410 fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf> {
415 let pings_dir = match ping_type {
417 Some("deletion-request") => data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
418 _ => data_path.join(PENDING_PINGS_DIRECTORY),
419 };
420
421 create_dir_all(&pings_dir)?;
422 Ok(pings_dir)
423 }
424
425 fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf> {
430 let pings_dir = data_path.join("tmp");
431 create_dir_all(&pings_dir)?;
432 Ok(pings_dir)
433 }
434
435 pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> {
437 let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?;
438 let temp_dir = self.get_tmp_dir(data_path)?;
439
440 let temp_ping_path = temp_dir.join(ping.doc_id);
443 let ping_path = pings_dir.join(ping.doc_id);
444
445 log::debug!(
446 "Storing ping '{}' at '{}'",
447 ping.doc_id,
448 ping_path.display()
449 );
450
451 {
452 let mut file = File::create(&temp_ping_path)?;
453 file.write_all(ping.url_path.as_bytes())?;
454 file.write_all(b"\n")?;
455 file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?;
456 file.write_all(b"\n")?;
457 let metadata = PingMetadata {
458 headers: Some(ping.headers.clone()),
463 body_has_info_sections: Some(ping.includes_info_sections),
464 ping_name: Some(ping.name.to_string()),
465 uploader_capabilities: Some(ping.uploader_capabilities.clone()),
466 };
467 file.write_all(::serde_json::to_string(&metadata)?.as_bytes())?;
468 }
469
470 if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) {
471 log::warn!(
472 "Unable to move '{}' to '{}",
473 temp_ping_path.display(),
474 ping_path.display()
475 );
476 return Err(e);
477 }
478
479 Ok(())
480 }
481
482 pub fn clear_pending_pings(&self, data_path: &Path, ping_names: &[&str]) -> Result<()> {
484 let pings_dir = self.get_pings_dir(data_path, None)?;
485
486 let entries = pings_dir.read_dir()?;
489 for entry in entries.filter_map(|entry| entry.ok()) {
490 if let Ok(file_type) = entry.file_type() {
491 if !file_type.is_file() {
492 continue;
493 }
494 } else {
495 continue;
496 }
497
498 let file = match File::open(entry.path()) {
499 Ok(file) => file,
500 Err(_) => {
501 continue;
502 }
503 };
504
505 let mut lines = BufReader::new(file).lines();
506 if let (Some(Ok(path)), Some(Ok(_body)), Ok(metadata)) =
507 (lines.next(), lines.next(), lines.next().transpose())
508 {
509 let PingMetadata { ping_name, .. } = metadata
510 .and_then(|m| crate::upload::process_metadata(&path, &m))
511 .unwrap_or_default();
512 let ping_name =
513 ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
514
515 if ping_names.contains(&&ping_name[..]) {
516 _ = fs::remove_file(entry.path());
517 }
518 } else {
519 continue;
520 }
521 }
522
523 log::debug!("All pending pings deleted");
524
525 Ok(())
526 }
527}
528
529#[cfg(test)]
530mod test {
531 use super::*;
532 use crate::tests::new_glean;
533
534 #[test]
535 fn sequence_numbers_should_be_reset_when_toggling_uploading() {
536 let (mut glean, _t) = new_glean(None);
537 let ping_maker = PingMaker::new();
538
539 assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
540 assert_eq!(1, ping_maker.get_ping_seq(&glean, "store1"));
541
542 glean.set_upload_enabled(false);
543 assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
544 assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
545
546 glean.set_upload_enabled(true);
547 assert_eq!(0, ping_maker.get_ping_seq(&glean, "store1"));
548 assert_eq!(1, ping_maker.get_ping_seq(&glean, "store1"));
549 }
550
551 #[test]
552 fn test_server_knobs_config_appears_in_ping_info() {
553 use crate::metrics::RemoteSettingsConfig;
554 use std::collections::HashMap;
555
556 let (glean, _t) = new_glean(None);
557
558 let mut metrics_enabled = HashMap::new();
560 metrics_enabled.insert("test.counter".to_string(), true);
561
562 let mut pings_enabled = HashMap::new();
563 pings_enabled.insert("custom".to_string(), false);
564
565 let config = RemoteSettingsConfig {
566 metrics_enabled,
567 pings_enabled,
568 event_threshold: Some(41),
569 };
570 glean.apply_server_knobs_config(config);
571
572 let ping_maker = PingMaker::new();
574 let ping_info = ping_maker.get_ping_info(&glean, "store1", None, TimeUnit::Minute);
575
576 let server_knobs = &ping_info["server_knobs_config"];
577 assert_eq!(server_knobs["metrics_enabled"]["test.counter"], true);
578 assert_eq!(server_knobs["pings_enabled"]["custom"], false);
579 assert_eq!(server_knobs["event_threshold"], 41);
580 }
581
582 #[test]
583 fn test_server_knobs_not_included_when_no_config() {
584 let (glean, _t) = new_glean(None);
585
586 let ping_maker = PingMaker::new();
587 let ping_info = ping_maker.get_ping_info(&glean, "store1", None, TimeUnit::Minute);
588
589 assert!(ping_info.get("server_knobs_config").is_none());
590 }
591
592 #[test]
593 fn test_server_knobs_appears_in_all_pings() {
594 use crate::metrics::RemoteSettingsConfig;
595 use std::collections::HashMap;
596
597 let (glean, _t) = new_glean(None);
598
599 let mut metrics_enabled = HashMap::new();
600 metrics_enabled.insert("test.counter".to_string(), true);
601
602 let config = RemoteSettingsConfig {
603 metrics_enabled,
604 ..Default::default()
605 };
606 glean.apply_server_knobs_config(config);
607
608 let ping_maker = PingMaker::new();
610 let ping_info1 = ping_maker.get_ping_info(&glean, "store1", None, TimeUnit::Minute);
611 let ping_info2 = ping_maker.get_ping_info(&glean, "store2", None, TimeUnit::Minute);
612
613 assert_eq!(
614 ping_info1["server_knobs_config"]["metrics_enabled"]["test.counter"],
615 true
616 );
617 assert_eq!(
618 ping_info2["server_knobs_config"]["metrics_enabled"]["test.counter"],
619 true
620 );
621 }
622
623 #[test]
624 fn test_server_knobs_config_omits_empty_fields() {
625 use crate::metrics::RemoteSettingsConfig;
626 use std::collections::HashMap;
627
628 let (glean, _t) = new_glean(None);
629
630 let mut metrics_enabled = HashMap::new();
632 metrics_enabled.insert("test.counter".to_string(), true);
633
634 let config = RemoteSettingsConfig {
635 metrics_enabled,
636 ..Default::default()
637 };
638 glean.apply_server_knobs_config(config);
639
640 let ping_maker = PingMaker::new();
641 let ping_info = ping_maker.get_ping_info(&glean, "store1", None, TimeUnit::Minute);
642
643 let server_knobs = &ping_info["server_knobs_config"];
644 assert_eq!(server_knobs["metrics_enabled"]["test.counter"], true);
646 assert!(server_knobs.get("pings_enabled").is_none());
648 assert!(server_knobs.get("event_threshold").is_none());
649 }
650}