1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
44const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; #[derive(Debug, MallocSizeOf)]
47struct RateLimiter {
48 started: Option<Instant>,
50 count: u32,
52 interval: Duration,
54 max_count: u32,
56}
57
58#[derive(PartialEq)]
60enum RateLimiterState {
61 Incrementing,
63 Throttled(u64),
68}
69
70impl RateLimiter {
71 pub fn new(interval: Duration, max_count: u32) -> Self {
72 Self {
73 started: None,
74 count: 0,
75 interval,
76 max_count,
77 }
78 }
79
80 fn reset(&mut self) {
81 self.started = Some(Instant::now());
82 self.count = 0;
83 }
84
85 fn elapsed(&self) -> Duration {
86 self.started.unwrap().elapsed()
87 }
88
89 fn should_reset(&self) -> bool {
95 if self.started.is_none() {
96 return true;
97 }
98
99 if self.elapsed() > self.interval {
101 return true;
102 }
103
104 false
105 }
106
107 pub fn get_state(&mut self) -> RateLimiterState {
113 if self.should_reset() {
114 self.reset();
115 }
116
117 if self.count == self.max_count {
118 let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121 return RateLimiterState::Throttled(
122 remaining
123 .try_into()
124 .unwrap_or(self.interval.as_secs() * 1000),
125 );
126 }
127
128 self.count += 1;
129 RateLimiterState::Incrementing
130 }
131}
132
133#[derive(PartialEq, Eq, Debug)]
138pub enum PingUploadTask {
139 Upload {
141 request: PingRequest,
144 },
145
146 Wait {
149 time: u64,
152 },
153
154 Done {
167 #[doc(hidden)]
168 unused: i8,
170 },
171}
172
173impl PingUploadTask {
174 pub fn is_upload(&self) -> bool {
176 matches!(self, PingUploadTask::Upload { .. })
177 }
178
179 pub fn is_wait(&self) -> bool {
181 matches!(self, PingUploadTask::Wait { .. })
182 }
183
184 pub(crate) fn done() -> Self {
185 PingUploadTask::Done { unused: 0 }
186 }
187}
188
189#[derive(Debug)]
191pub struct PingUploadManager {
192 queue: RwLock<VecDeque<PingRequest>>,
194 directory_manager: PingDirectoryManager,
196 processed_pending_pings: Arc<AtomicBool>,
198 cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
200 recoverable_failure_count: AtomicU32,
202 wait_attempt_count: AtomicU32,
204 rate_limiter: Option<RwLock<RateLimiter>>,
209 language_binding_name: String,
213 upload_metrics: UploadMetrics,
215 policy: Policy,
217
218 in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
219}
220
221impl MallocSizeOf for PingUploadManager {
222 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223 let shallow_size = {
224 let queue = self.queue.read().unwrap();
225 if ops.has_malloc_enclosing_size_of() {
226 if let Some(front) = queue.front() {
227 unsafe { ops.malloc_enclosing_size_of(front) }
230 } else {
231 0
233 }
234 } else {
235 queue.capacity() * mem::size_of::<PingRequest>()
239 }
240 };
241
242 let mut n = shallow_size
243 + self.directory_manager.size_of(ops)
244 + mem::size_of::<AtomicBool>() + self.cached_pings.read().unwrap().size_of(ops)
246 + self.rate_limiter.as_ref().map(|rl| {
247 let lock = rl.read().unwrap();
248 (*lock).size_of(ops)
249 }).unwrap_or(0)
250 + self.language_binding_name.size_of(ops)
251 + self.upload_metrics.size_of(ops)
252 + self.policy.size_of(ops);
253
254 let in_flight = self.in_flight.read().unwrap();
255 n += in_flight.size_of(ops);
256
257 n
258 }
259}
260
261impl PingUploadManager {
262 pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
273 Self {
274 queue: RwLock::new(VecDeque::new()),
275 directory_manager: PingDirectoryManager::new(data_path),
276 processed_pending_pings: Arc::new(AtomicBool::new(false)),
277 cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
278 recoverable_failure_count: AtomicU32::new(0),
279 wait_attempt_count: AtomicU32::new(0),
280 rate_limiter: None,
281 language_binding_name: language_binding_name.into(),
282 upload_metrics: UploadMetrics::new(),
283 policy: Policy::default(),
284 in_flight: RwLock::new(HashMap::default()),
285 }
286 }
287
288 pub fn scan_pending_pings_directories(
295 &self,
296 trigger_upload: bool,
297 ) -> std::thread::JoinHandle<()> {
298 let local_manager = self.directory_manager.clone();
299 let local_cached_pings = self.cached_pings.clone();
300 let local_flag = self.processed_pending_pings.clone();
301 crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
302 {
303 let mut local_cached_pings = local_cached_pings
305 .write()
306 .expect("Can't write to pending pings cache.");
307 local_cached_pings.extend(local_manager.process_dirs());
308 local_flag.store(true, Ordering::SeqCst);
309 }
310 if trigger_upload {
311 crate::dispatcher::launch(|| {
312 if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
313 if let Err(e) = state.callbacks.trigger_upload() {
314 log::error!(
315 "Triggering upload after pending ping scan failed. Error: {}",
316 e
317 );
318 }
319 }
320 });
321 }
322 })
323 .expect("Unable to spawn thread to process pings directories.")
324 }
325
326 #[cfg(test)]
328 pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
329 let mut upload_manager = Self::new(data_path, "Test");
330
331 upload_manager.policy.set_max_recoverable_failures(None);
333 upload_manager.policy.set_max_wait_attempts(None);
334 upload_manager.policy.set_max_ping_body_size(None);
335 upload_manager
336 .policy
337 .set_max_pending_pings_directory_size(None);
338 upload_manager.policy.set_max_pending_pings_count(None);
339
340 upload_manager
342 .scan_pending_pings_directories(false)
343 .join()
344 .unwrap();
345
346 upload_manager
347 }
348
349 fn processed_pending_pings(&self) -> bool {
350 self.processed_pending_pings.load(Ordering::SeqCst)
351 }
352
353 fn recoverable_failure_count(&self) -> u32 {
354 self.recoverable_failure_count.load(Ordering::SeqCst)
355 }
356
357 fn wait_attempt_count(&self) -> u32 {
358 self.wait_attempt_count.load(Ordering::SeqCst)
359 }
360
361 fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
366 let PingPayload {
367 document_id,
368 upload_path: path,
369 json_body: body,
370 headers,
371 body_has_info_sections,
372 ping_name,
373 uploader_capabilities,
374 } = ping;
375 let mut request = PingRequest::builder(
376 &self.language_binding_name,
377 self.policy.max_ping_body_size(),
378 )
379 .document_id(&document_id)
380 .path(path)
381 .body(body)
382 .body_has_info_sections(body_has_info_sections)
383 .ping_name(ping_name)
384 .uploader_capabilities(uploader_capabilities);
385
386 if let Some(headers) = headers {
387 request = request.headers(headers);
388 }
389
390 match request.build() {
391 Ok(request) => Some(request),
392 Err(e) => {
393 log::warn!("Error trying to build ping request: {}", e);
394 self.directory_manager.delete_file(&document_id);
395
396 if let ErrorKind::PingBodyOverflow(s) = e.kind() {
399 self.upload_metrics
400 .discarded_exceeding_pings_size
401 .accumulate_sync(glean, *s as i64 / 1024);
402 }
403
404 None
405 }
406 }
407 }
408
409 pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
411 let mut queue = self
412 .queue
413 .write()
414 .expect("Can't write to pending pings queue.");
415
416 let PingPayload {
417 ref document_id,
418 upload_path: ref path,
419 ..
420 } = ping;
421 if queue
423 .iter()
424 .any(|request| request.document_id.as_str() == document_id)
425 {
426 log::warn!(
427 "Attempted to enqueue a duplicate ping {} at {}.",
428 document_id,
429 path
430 );
431 return;
432 }
433
434 {
435 let in_flight = self.in_flight.read().unwrap();
436 if in_flight.contains_key(document_id) {
437 log::warn!(
438 "Attempted to enqueue an in-flight ping {} at {}.",
439 document_id,
440 path
441 );
442 self.upload_metrics
443 .in_flight_pings_dropped
444 .add_sync(glean, 0);
445 return;
446 }
447 }
448
449 log::trace!("Enqueuing ping {} at {}", document_id, path);
450 if let Some(request) = self.build_ping_request(glean, ping) {
451 queue.push_back(request)
452 }
453 }
454
455 fn enqueue_cached_pings(&self, glean: &Glean) {
472 let mut cached_pings = self
473 .cached_pings
474 .write()
475 .expect("Can't write to pending pings cache.");
476
477 if cached_pings.len() > 0 {
478 let mut pending_pings_directory_size: u64 = 0;
479 let mut pending_pings_count = 0;
480 let mut deleting = false;
481
482 let total = cached_pings.pending_pings.len() as u64;
483 self.upload_metrics
484 .pending_pings
485 .add_sync(glean, total.try_into().unwrap_or(0));
486
487 if total > self.policy.max_pending_pings_count() {
488 log::warn!(
489 "More than {} pending pings in the directory, will delete {} old pings.",
490 self.policy.max_pending_pings_count(),
491 total - self.policy.max_pending_pings_count()
492 );
493 }
494
495 cached_pings.pending_pings.reverse();
501 cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
502 pending_pings_count += 1;
503 pending_pings_directory_size += file_size;
504
505 if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
507 log::warn!(
508 "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
509 self.policy.max_pending_pings_directory_size()
510 );
511 deleting = true;
512 }
513
514 if pending_pings_count > self.policy.max_pending_pings_count() {
518 deleting = true;
519 }
520
521 if deleting && self.directory_manager.delete_file(document_id) {
522 self.upload_metrics
523 .deleted_pings_after_quota_hit
524 .add_sync(glean, 1);
525 return false;
526 }
527
528 true
529 });
530 cached_pings.pending_pings.reverse();
533 self.upload_metrics
534 .pending_pings_directory_size
535 .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
536
537 cached_pings
540 .deletion_request_pings
541 .drain(..)
542 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
543 cached_pings
544 .pending_pings
545 .drain(..)
546 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
547 }
548 }
549
550 pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
563 self.rate_limiter = Some(RwLock::new(RateLimiter::new(
564 Duration::from_secs(interval),
565 max_tasks,
566 )));
567 }
568
569 pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
578 if let Some(ping) = self.directory_manager.process_file(document_id) {
579 self.enqueue_ping(glean, ping);
580 }
581 }
582
583 pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
585 log::trace!("Clearing ping queue");
586 let mut queue = self
587 .queue
588 .write()
589 .expect("Can't write to pending pings queue.");
590
591 queue.retain(|ping| ping.is_deletion_request());
592 log::trace!(
593 "{} pings left in the queue (only deletion-request expected)",
594 queue.len()
595 );
596 queue
597 }
598
599 fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
600 let wait_or_done = |time: u64| {
605 self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
606 if self.wait_attempt_count() > self.policy.max_wait_attempts() {
607 PingUploadTask::done()
608 } else {
609 PingUploadTask::Wait { time }
610 }
611 };
612
613 if !self.processed_pending_pings() {
614 log::info!(
615 "Tried getting an upload task, but processing is ongoing. Will come back later."
616 );
617 return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
618 }
619
620 self.enqueue_cached_pings(glean);
622
623 if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
624 log::warn!(
625 "Reached maximum recoverable failures for the current uploading window. You are done."
626 );
627 return PingUploadTask::done();
628 }
629
630 let mut queue = self
631 .queue
632 .write()
633 .expect("Can't write to pending pings queue.");
634 match queue.front() {
635 Some(request) => {
636 if let Some(rate_limiter) = &self.rate_limiter {
637 let mut rate_limiter = rate_limiter
638 .write()
639 .expect("Can't write to the rate limiter.");
640 if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
641 log::info!(
642 "Tried getting an upload task, but we are throttled at the moment."
643 );
644 return wait_or_done(remaining);
645 }
646 }
647
648 log::info!(
649 "New upload task with id {} (path: {})",
650 request.document_id,
651 request.path
652 );
653
654 if log_ping {
655 if let Some(body) = request.pretty_body() {
656 chunked_log_info(&request.path, &body);
657 } else {
658 chunked_log_info(&request.path, "<invalid ping payload>");
659 }
660 }
661
662 {
663 let mut in_flight = self.in_flight.write().unwrap();
667 let success_id = self.upload_metrics.send_success.start_sync();
668 let failure_id = self.upload_metrics.send_failure.start_sync();
669 in_flight.insert(request.document_id.clone(), (success_id, failure_id));
670 }
671
672 let mut request = queue.pop_front().unwrap();
673
674 request
676 .headers
677 .insert("Date".to_string(), create_date_header_value(Utc::now()));
678
679 PingUploadTask::Upload { request }
680 }
681 None => {
682 log::info!("No more pings to upload! You are done.");
683 PingUploadTask::done()
684 }
685 }
686 }
687
688 pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
699 let task = self.get_upload_task_internal(glean, log_ping);
700
701 if !task.is_wait() && self.wait_attempt_count() > 0 {
702 self.wait_attempt_count.store(0, Ordering::SeqCst);
703 }
704
705 if !task.is_upload() && self.recoverable_failure_count() > 0 {
706 self.recoverable_failure_count.store(0, Ordering::SeqCst);
707 }
708
709 task
710 }
711
712 pub fn process_ping_upload_response(
751 &self,
752 glean: &Glean,
753 document_id: &str,
754 status: UploadResult,
755 ) -> UploadTaskAction {
756 use UploadResult::*;
757
758 let stop_time = zeitstempel::now_awake();
759
760 if let Some(label) = status.get_label() {
761 let metric = self.upload_metrics.ping_upload_failure.get(label);
762 metric.add_sync(glean, 1);
763 }
764
765 let send_ids = {
766 let mut lock = self.in_flight.write().unwrap();
767 lock.remove(document_id)
768 };
769
770 if send_ids.is_none() {
771 self.upload_metrics.missing_send_ids.add_sync(glean, 1);
772 }
773
774 match status {
775 HttpStatus { code } if (200..=299).contains(&code) => {
776 log::info!("Ping {} successfully sent {}.", document_id, code);
777 if let Some((success_id, failure_id)) = send_ids {
778 self.upload_metrics
779 .send_success
780 .set_stop_and_accumulate(glean, success_id, stop_time);
781 self.upload_metrics.send_failure.cancel_sync(failure_id);
782 }
783 self.directory_manager.delete_file(document_id);
784 }
785
786 UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
787 log::warn!(
788 "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
789 document_id,
790 status
791 );
792 if let Some((success_id, failure_id)) = send_ids {
793 self.upload_metrics.send_success.cancel_sync(success_id);
794 self.upload_metrics
795 .send_failure
796 .set_stop_and_accumulate(glean, failure_id, stop_time);
797 }
798 self.directory_manager.delete_file(document_id);
799 }
800
801 RecoverableFailure { .. } | HttpStatus { .. } => {
802 log::warn!(
803 "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
804 document_id,
805 status
806 );
807 if let Some((success_id, failure_id)) = send_ids {
808 self.upload_metrics.send_success.cancel_sync(success_id);
809 self.upload_metrics
810 .send_failure
811 .set_stop_and_accumulate(glean, failure_id, stop_time);
812 }
813 self.enqueue_ping_from_file(glean, document_id);
814 self.recoverable_failure_count
815 .fetch_add(1, Ordering::SeqCst);
816 }
817
818 Done { .. } => {
819 log::debug!("Uploader signaled Done. Exiting.");
820 if let Some((success_id, failure_id)) = send_ids {
821 self.upload_metrics.send_success.cancel_sync(success_id);
822 self.upload_metrics.send_failure.cancel_sync(failure_id);
823 }
824 return UploadTaskAction::End;
825 }
826 };
827
828 UploadTaskAction::Next
829 }
830}
831
832#[cfg(target_os = "android")]
834pub fn chunked_log_info(path: &str, payload: &str) {
835 const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;
839
840 if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
844 log::info!("Glean ping to URL: {}\n{}", path, payload);
845 return;
846 }
847
848 let mut start = 0;
851 let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
852 let mut chunk_idx = 1;
853 let total_chunks = payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1;
855
856 while end < payload.len() {
857 for _ in 0..4 {
860 if payload.is_char_boundary(end) {
861 break;
862 }
863 end -= 1;
864 }
865
866 log::info!(
867 "Glean ping to URL: {} [Part {} of {}]\n{}",
868 path,
869 chunk_idx,
870 total_chunks,
871 &payload[start..end]
872 );
873
874 start = end;
876 end = end + MAX_LOG_PAYLOAD_SIZE_BYTES;
877 chunk_idx += 1;
878 }
879
880 if start < payload.len() {
882 log::info!(
883 "Glean ping to URL: {} [Part {} of {}]\n{}",
884 path,
885 chunk_idx,
886 total_chunks,
887 &payload[start..]
888 );
889 }
890}
891
892#[cfg(not(target_os = "android"))]
894pub fn chunked_log_info(_path: &str, payload: &str) {
895 log::info!("{}", payload)
896}
897
898#[cfg(test)]
899mod test {
900 use std::thread;
901 use uuid::Uuid;
902
903 use super::*;
904 use crate::metrics::PingType;
905 use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
906
907 const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
908
909 #[test]
910 fn doesnt_error_when_there_are_no_pending_pings() {
911 let (glean, _t) = new_glean(None);
912
913 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
916 }
917
918 #[test]
919 fn returns_ping_request_when_there_is_one() {
920 let (glean, dir) = new_glean(None);
921
922 let upload_manager = PingUploadManager::no_policy(dir.path());
923
924 upload_manager.enqueue_ping(
926 &glean,
927 PingPayload {
928 document_id: Uuid::new_v4().to_string(),
929 upload_path: PATH.into(),
930 json_body: "".into(),
931 headers: None,
932 body_has_info_sections: true,
933 ping_name: "ping-name".into(),
934 uploader_capabilities: vec![],
935 },
936 );
937
938 let task = upload_manager.get_upload_task(&glean, false);
941 assert!(task.is_upload());
942 }
943
944 #[test]
945 fn returns_as_many_ping_requests_as_there_are() {
946 let (glean, dir) = new_glean(None);
947
948 let upload_manager = PingUploadManager::no_policy(dir.path());
949
950 let n = 10;
952 for _ in 0..n {
953 upload_manager.enqueue_ping(
954 &glean,
955 PingPayload {
956 document_id: Uuid::new_v4().to_string(),
957 upload_path: PATH.into(),
958 json_body: "".into(),
959 headers: None,
960 body_has_info_sections: true,
961 ping_name: "ping-name".into(),
962 uploader_capabilities: vec![],
963 },
964 );
965 }
966
967 for _ in 0..n {
969 let task = upload_manager.get_upload_task(&glean, false);
970 assert!(task.is_upload());
971 }
972
973 assert_eq!(
975 upload_manager.get_upload_task(&glean, false),
976 PingUploadTask::done()
977 );
978 }
979
980 #[test]
981 fn limits_the_number_of_pings_when_there_is_rate_limiting() {
982 let (glean, dir) = new_glean(None);
983
984 let mut upload_manager = PingUploadManager::no_policy(dir.path());
985
986 let max_pings_per_interval = 10;
988 upload_manager.set_rate_limiter(3, 10);
989
990 for _ in 0..max_pings_per_interval {
992 upload_manager.enqueue_ping(
993 &glean,
994 PingPayload {
995 document_id: Uuid::new_v4().to_string(),
996 upload_path: PATH.into(),
997 json_body: "".into(),
998 headers: None,
999 body_has_info_sections: true,
1000 ping_name: "ping-name".into(),
1001 uploader_capabilities: vec![],
1002 },
1003 );
1004 }
1005
1006 for _ in 0..max_pings_per_interval {
1008 let task = upload_manager.get_upload_task(&glean, false);
1009 assert!(task.is_upload());
1010 }
1011
1012 upload_manager.enqueue_ping(
1014 &glean,
1015 PingPayload {
1016 document_id: Uuid::new_v4().to_string(),
1017 upload_path: PATH.into(),
1018 json_body: "".into(),
1019 headers: None,
1020 body_has_info_sections: true,
1021 ping_name: "ping-name".into(),
1022 uploader_capabilities: vec![],
1023 },
1024 );
1025
1026 match upload_manager.get_upload_task(&glean, false) {
1028 PingUploadTask::Wait { time } => {
1029 thread::sleep(Duration::from_millis(time));
1031 }
1032 _ => panic!("Expected upload manager to return a wait task!"),
1033 };
1034
1035 let task = upload_manager.get_upload_task(&glean, false);
1036 assert!(task.is_upload());
1037 }
1038
1039 #[test]
1040 fn clearing_the_queue_works_correctly() {
1041 let (glean, dir) = new_glean(None);
1042
1043 let upload_manager = PingUploadManager::no_policy(dir.path());
1044
1045 for _ in 0..10 {
1047 upload_manager.enqueue_ping(
1048 &glean,
1049 PingPayload {
1050 document_id: Uuid::new_v4().to_string(),
1051 upload_path: PATH.into(),
1052 json_body: "".into(),
1053 headers: None,
1054 body_has_info_sections: true,
1055 ping_name: "ping-name".into(),
1056 uploader_capabilities: vec![],
1057 },
1058 );
1059 }
1060
1061 drop(upload_manager.clear_ping_queue());
1063
1064 assert_eq!(
1066 upload_manager.get_upload_task(&glean, false),
1067 PingUploadTask::done()
1068 );
1069 }
1070
1071 #[test]
1072 fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1073 let (mut glean, _t) = new_glean(None);
1074
1075 let ping_type = PingType::new(
1077 "test",
1078 true,
1079 true,
1080 true,
1081 true,
1082 true,
1083 vec![],
1084 vec![],
1085 true,
1086 vec![],
1087 );
1088 glean.register_ping_type(&ping_type);
1089
1090 let n = 10;
1092 for _ in 0..n {
1093 ping_type.submit_sync(&glean, None);
1094 }
1095
1096 glean
1097 .internal_pings
1098 .deletion_request
1099 .submit_sync(&glean, None);
1100
1101 drop(glean.upload_manager.clear_ping_queue());
1103
1104 let upload_task = glean.get_upload_task();
1105 match upload_task {
1106 PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1107 _ => panic!("Expected upload manager to return the next request!"),
1108 }
1109
1110 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1112 }
1113
1114 #[test]
1115 fn fills_up_queue_successfully_from_disk() {
1116 let (mut glean, dir) = new_glean(None);
1117
1118 let ping_type = PingType::new(
1120 "test",
1121 true,
1122 true,
1123 true,
1124 true,
1125 true,
1126 vec![],
1127 vec![],
1128 true,
1129 vec![],
1130 );
1131 glean.register_ping_type(&ping_type);
1132
1133 let n = 10;
1135 for _ in 0..n {
1136 ping_type.submit_sync(&glean, None);
1137 }
1138
1139 let upload_manager = PingUploadManager::no_policy(dir.path());
1141
1142 for _ in 0..n {
1144 let task = upload_manager.get_upload_task(&glean, false);
1145 assert!(task.is_upload());
1146 }
1147
1148 assert_eq!(
1150 upload_manager.get_upload_task(&glean, false),
1151 PingUploadTask::done()
1152 );
1153 }
1154
1155 #[test]
1156 fn processes_correctly_success_upload_response() {
1157 let (mut glean, dir) = new_glean(None);
1158
1159 let ping_type = PingType::new(
1161 "test",
1162 true,
1163 true,
1164 true,
1165 true,
1166 true,
1167 vec![],
1168 vec![],
1169 true,
1170 vec![],
1171 );
1172 glean.register_ping_type(&ping_type);
1173
1174 ping_type.submit_sync(&glean, None);
1176
1177 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1179
1180 match glean.get_upload_task() {
1182 PingUploadTask::Upload { request } => {
1183 let document_id = request.document_id;
1185 glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1186 assert!(!pending_pings_dir.join(document_id).exists());
1188 }
1189 _ => panic!("Expected upload manager to return the next request!"),
1190 }
1191
1192 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1194 }
1195
1196 #[test]
1197 fn processes_correctly_client_error_upload_response() {
1198 let (mut glean, dir) = new_glean(None);
1199
1200 let ping_type = PingType::new(
1202 "test",
1203 true,
1204 true,
1205 true,
1206 true,
1207 true,
1208 vec![],
1209 vec![],
1210 true,
1211 vec![],
1212 );
1213 glean.register_ping_type(&ping_type);
1214
1215 ping_type.submit_sync(&glean, None);
1217
1218 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1220
1221 match glean.get_upload_task() {
1223 PingUploadTask::Upload { request } => {
1224 let document_id = request.document_id;
1226 glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1227 assert!(!pending_pings_dir.join(document_id).exists());
1229 }
1230 _ => panic!("Expected upload manager to return the next request!"),
1231 }
1232
1233 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1235 }
1236
1237 #[test]
1238 fn processes_correctly_server_error_upload_response() {
1239 let (mut glean, _t) = new_glean(None);
1240
1241 let ping_type = PingType::new(
1243 "test",
1244 true,
1245 true,
1246 true,
1247 true,
1248 true,
1249 vec![],
1250 vec![],
1251 true,
1252 vec![],
1253 );
1254 glean.register_ping_type(&ping_type);
1255
1256 ping_type.submit_sync(&glean, None);
1258
1259 match glean.get_upload_task() {
1261 PingUploadTask::Upload { request } => {
1262 let document_id = request.document_id;
1264 glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1265 match glean.get_upload_task() {
1267 PingUploadTask::Upload { request } => {
1268 assert_eq!(document_id, request.document_id);
1269 }
1270 _ => panic!("Expected upload manager to return the next request!"),
1271 }
1272 }
1273 _ => panic!("Expected upload manager to return the next request!"),
1274 }
1275
1276 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1278 }
1279
1280 #[test]
1281 fn processes_correctly_unrecoverable_upload_response() {
1282 let (mut glean, dir) = new_glean(None);
1283
1284 let ping_type = PingType::new(
1286 "test",
1287 true,
1288 true,
1289 true,
1290 true,
1291 true,
1292 vec![],
1293 vec![],
1294 true,
1295 vec![],
1296 );
1297 glean.register_ping_type(&ping_type);
1298
1299 ping_type.submit_sync(&glean, None);
1301
1302 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1304
1305 match glean.get_upload_task() {
1307 PingUploadTask::Upload { request } => {
1308 let document_id = request.document_id;
1310 glean.process_ping_upload_response(
1311 &document_id,
1312 UploadResult::unrecoverable_failure(),
1313 );
1314 assert!(!pending_pings_dir.join(document_id).exists());
1316 }
1317 _ => panic!("Expected upload manager to return the next request!"),
1318 }
1319
1320 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1322 }
1323
1324 #[test]
1325 fn new_pings_are_added_while_upload_in_progress() {
1326 let (glean, dir) = new_glean(None);
1327
1328 let upload_manager = PingUploadManager::no_policy(dir.path());
1329
1330 let doc1 = Uuid::new_v4().to_string();
1331 let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1332
1333 let doc2 = Uuid::new_v4().to_string();
1334 let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1335
1336 upload_manager.enqueue_ping(
1338 &glean,
1339 PingPayload {
1340 document_id: doc1.clone(),
1341 upload_path: path1,
1342 json_body: "".into(),
1343 headers: None,
1344 body_has_info_sections: true,
1345 ping_name: "test-ping".into(),
1346 uploader_capabilities: vec![],
1347 },
1348 );
1349
1350 let req = match upload_manager.get_upload_task(&glean, false) {
1352 PingUploadTask::Upload { request } => request,
1353 _ => panic!("Expected upload manager to return the next request!"),
1354 };
1355 assert_eq!(doc1, req.document_id);
1356
1357 upload_manager.enqueue_ping(
1359 &glean,
1360 PingPayload {
1361 document_id: doc2.clone(),
1362 upload_path: path2,
1363 json_body: "".into(),
1364 headers: None,
1365 body_has_info_sections: true,
1366 ping_name: "test-ping".into(),
1367 uploader_capabilities: vec![],
1368 },
1369 );
1370
1371 upload_manager.process_ping_upload_response(
1373 &glean,
1374 &req.document_id,
1375 UploadResult::http_status(200),
1376 );
1377
1378 let req = match upload_manager.get_upload_task(&glean, false) {
1380 PingUploadTask::Upload { request } => request,
1381 _ => panic!("Expected upload manager to return the next request!"),
1382 };
1383 assert_eq!(doc2, req.document_id);
1384
1385 upload_manager.process_ping_upload_response(
1387 &glean,
1388 &req.document_id,
1389 UploadResult::http_status(200),
1390 );
1391
1392 assert_eq!(
1394 upload_manager.get_upload_task(&glean, false),
1395 PingUploadTask::done()
1396 );
1397 }
1398
1399 #[test]
1400 fn adds_debug_view_header_to_requests_when_tag_is_set() {
1401 let (mut glean, _t) = new_glean(None);
1402
1403 glean.set_debug_view_tag("valid-tag");
1404
1405 let ping_type = PingType::new(
1407 "test",
1408 true,
1409 true,
1410 true,
1411 true,
1412 true,
1413 vec![],
1414 vec![],
1415 true,
1416 vec![],
1417 );
1418 glean.register_ping_type(&ping_type);
1419
1420 ping_type.submit_sync(&glean, None);
1422
1423 match glean.get_upload_task() {
1425 PingUploadTask::Upload { request } => {
1426 assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1427 }
1428 _ => panic!("Expected upload manager to return the next request!"),
1429 }
1430 }
1431
1432 #[test]
1433 fn duplicates_are_not_enqueued() {
1434 let (glean, dir) = new_glean(None);
1435
1436 let upload_manager = PingUploadManager::no_policy(dir.path());
1439
1440 let doc_id = Uuid::new_v4().to_string();
1441 let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1442
1443 upload_manager.enqueue_ping(
1445 &glean,
1446 PingPayload {
1447 document_id: doc_id.clone(),
1448 upload_path: path.clone(),
1449 json_body: "".into(),
1450 headers: None,
1451 body_has_info_sections: true,
1452 ping_name: "test-ping".into(),
1453 uploader_capabilities: vec![],
1454 },
1455 );
1456 upload_manager.enqueue_ping(
1457 &glean,
1458 PingPayload {
1459 document_id: doc_id,
1460 upload_path: path,
1461 json_body: "".into(),
1462 headers: None,
1463 body_has_info_sections: true,
1464 ping_name: "test-ping".into(),
1465 uploader_capabilities: vec![],
1466 },
1467 );
1468
1469 let task = upload_manager.get_upload_task(&glean, false);
1471 assert!(task.is_upload());
1472
1473 assert_eq!(
1475 upload_manager.get_upload_task(&glean, false),
1476 PingUploadTask::done()
1477 );
1478 }
1479
1480 #[test]
1481 fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1482 let (mut glean, dir) = new_glean(None);
1483
1484 let ping_type = PingType::new(
1486 "test",
1487 true,
1488 true,
1489 true,
1490 true,
1491 true,
1492 vec![],
1493 vec![],
1494 true,
1495 vec![],
1496 );
1497 glean.register_ping_type(&ping_type);
1498
1499 let n = 5;
1501 for _ in 0..n {
1502 ping_type.submit_sync(&glean, None);
1503 }
1504
1505 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1506
1507 let max_recoverable_failures = 3;
1509 upload_manager
1510 .policy
1511 .set_max_recoverable_failures(Some(max_recoverable_failures));
1512
1513 for _ in 0..max_recoverable_failures {
1515 match upload_manager.get_upload_task(&glean, false) {
1516 PingUploadTask::Upload { request } => {
1517 upload_manager.process_ping_upload_response(
1518 &glean,
1519 &request.document_id,
1520 UploadResult::recoverable_failure(),
1521 );
1522 }
1523 _ => panic!("Expected upload manager to return the next request!"),
1524 }
1525 }
1526
1527 assert_eq!(
1530 upload_manager.get_upload_task(&glean, false),
1531 PingUploadTask::done()
1532 );
1533
1534 for _ in 0..n {
1536 let task = upload_manager.get_upload_task(&glean, false);
1537 assert!(task.is_upload());
1538 }
1539 }
1540
1541 #[test]
1542 fn quota_is_enforced_when_enqueueing_cached_pings() {
1543 let (mut glean, dir) = new_glean(None);
1544
1545 let ping_type = PingType::new(
1547 "test",
1548 true,
1549 true,
1550 true,
1551 true,
1552 true,
1553 vec![],
1554 vec![],
1555 true,
1556 vec![],
1557 );
1558 glean.register_ping_type(&ping_type);
1559
1560 let n = 10;
1562 for _ in 0..n {
1563 ping_type.submit_sync(&glean, None);
1564 }
1565
1566 let directory_manager = PingDirectoryManager::new(dir.path());
1567 let pending_pings = directory_manager.process_dirs().pending_pings;
1568 let (_, newest_ping) = &pending_pings.last().unwrap();
1571 let PingPayload {
1572 document_id: newest_ping_id,
1573 ..
1574 } = &newest_ping;
1575
1576 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1578
1579 upload_manager
1586 .policy
1587 .set_max_pending_pings_directory_size(Some(500));
1588
1589 match upload_manager.get_upload_task(&glean, false) {
1593 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1594 _ => panic!("Expected upload manager to return the next request!"),
1595 }
1596
1597 assert_eq!(
1600 upload_manager.get_upload_task(&glean, false),
1601 PingUploadTask::done()
1602 );
1603
1604 assert_eq!(
1606 n - 1,
1607 upload_manager
1608 .upload_metrics
1609 .deleted_pings_after_quota_hit
1610 .get_value(&glean, Some("metrics"))
1611 .unwrap()
1612 );
1613 assert_eq!(
1614 n,
1615 upload_manager
1616 .upload_metrics
1617 .pending_pings
1618 .get_value(&glean, Some("metrics"))
1619 .unwrap()
1620 );
1621 }
1622
1623 #[test]
1624 fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1625 let (mut glean, dir) = new_glean(None);
1626
1627 let ping_type = PingType::new(
1629 "test",
1630 true,
1631 true,
1632 true,
1633 true,
1634 true,
1635 vec![],
1636 vec![],
1637 true,
1638 vec![],
1639 );
1640 glean.register_ping_type(&ping_type);
1641
1642 let count_quota = 3;
1644 let n = 10;
1646
1647 for _ in 0..n {
1649 ping_type.submit_sync(&glean, None);
1650 }
1651
1652 let directory_manager = PingDirectoryManager::new(dir.path());
1653 let pending_pings = directory_manager.process_dirs().pending_pings;
1654 let expected_pings = pending_pings
1657 .iter()
1658 .rev()
1659 .take(count_quota)
1660 .map(|(_, ping)| ping.document_id.clone())
1661 .collect::<Vec<_>>();
1662
1663 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1665
1666 upload_manager
1667 .policy
1668 .set_max_pending_pings_count(Some(count_quota as u64));
1669
1670 for ping_id in expected_pings.iter().rev() {
1674 match upload_manager.get_upload_task(&glean, false) {
1675 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1676 _ => panic!("Expected upload manager to return the next request!"),
1677 }
1678 }
1679
1680 assert_eq!(
1683 upload_manager.get_upload_task(&glean, false),
1684 PingUploadTask::done()
1685 );
1686
1687 assert_eq!(
1689 (n - count_quota) as i32,
1690 upload_manager
1691 .upload_metrics
1692 .deleted_pings_after_quota_hit
1693 .get_value(&glean, Some("metrics"))
1694 .unwrap()
1695 );
1696 assert_eq!(
1697 n as i32,
1698 upload_manager
1699 .upload_metrics
1700 .pending_pings
1701 .get_value(&glean, Some("metrics"))
1702 .unwrap()
1703 );
1704 }
1705
1706 #[test]
1707 fn size_and_count_quota_work_together_size_first() {
1708 let (mut glean, dir) = new_glean(None);
1709
1710 let ping_type = PingType::new(
1712 "test",
1713 true,
1714 true,
1715 true,
1716 true,
1717 true,
1718 vec![],
1719 vec![],
1720 true,
1721 vec![],
1722 );
1723 glean.register_ping_type(&ping_type);
1724
1725 let expected_number_of_pings = 3;
1726 let n = 10;
1728
1729 for _ in 0..n {
1731 ping_type.submit_sync(&glean, None);
1732 }
1733
1734 let directory_manager = PingDirectoryManager::new(dir.path());
1735 let pending_pings = directory_manager.process_dirs().pending_pings;
1736 let expected_pings = pending_pings
1739 .iter()
1740 .rev()
1741 .take(expected_number_of_pings)
1742 .map(|(_, ping)| ping.document_id.clone())
1743 .collect::<Vec<_>>();
1744
1745 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1747
1748 upload_manager
1751 .policy
1752 .set_max_pending_pings_directory_size(Some(1300));
1753 upload_manager.policy.set_max_pending_pings_count(Some(5));
1754
1755 for ping_id in expected_pings.iter().rev() {
1759 match upload_manager.get_upload_task(&glean, false) {
1760 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1761 _ => panic!("Expected upload manager to return the next request!"),
1762 }
1763 }
1764
1765 assert_eq!(
1768 upload_manager.get_upload_task(&glean, false),
1769 PingUploadTask::done()
1770 );
1771
1772 assert_eq!(
1774 (n - expected_number_of_pings) as i32,
1775 upload_manager
1776 .upload_metrics
1777 .deleted_pings_after_quota_hit
1778 .get_value(&glean, Some("metrics"))
1779 .unwrap()
1780 );
1781 assert_eq!(
1782 n as i32,
1783 upload_manager
1784 .upload_metrics
1785 .pending_pings
1786 .get_value(&glean, Some("metrics"))
1787 .unwrap()
1788 );
1789 }
1790
1791 #[test]
1792 fn size_and_count_quota_work_together_count_first() {
1793 let (mut glean, dir) = new_glean(None);
1794
1795 let ping_type = PingType::new(
1797 "test",
1798 true,
1799 true,
1800 true,
1801 true,
1802 true,
1803 vec![],
1804 vec![],
1805 true,
1806 vec![],
1807 );
1808 glean.register_ping_type(&ping_type);
1809
1810 let expected_number_of_pings = 2;
1811 let n = 10;
1813
1814 for _ in 0..n {
1816 ping_type.submit_sync(&glean, None);
1817 }
1818
1819 let directory_manager = PingDirectoryManager::new(dir.path());
1820 let pending_pings = directory_manager.process_dirs().pending_pings;
1821 let expected_pings = pending_pings
1824 .iter()
1825 .rev()
1826 .take(expected_number_of_pings)
1827 .map(|(_, ping)| ping.document_id.clone())
1828 .collect::<Vec<_>>();
1829
1830 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1832
1833 upload_manager
1836 .policy
1837 .set_max_pending_pings_directory_size(Some(1000));
1838 upload_manager.policy.set_max_pending_pings_count(Some(2));
1839
1840 for ping_id in expected_pings.iter().rev() {
1844 match upload_manager.get_upload_task(&glean, false) {
1845 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1846 _ => panic!("Expected upload manager to return the next request!"),
1847 }
1848 }
1849
1850 assert_eq!(
1853 upload_manager.get_upload_task(&glean, false),
1854 PingUploadTask::done()
1855 );
1856
1857 assert_eq!(
1859 (n - expected_number_of_pings) as i32,
1860 upload_manager
1861 .upload_metrics
1862 .deleted_pings_after_quota_hit
1863 .get_value(&glean, Some("metrics"))
1864 .unwrap()
1865 );
1866 assert_eq!(
1867 n as i32,
1868 upload_manager
1869 .upload_metrics
1870 .pending_pings
1871 .get_value(&glean, Some("metrics"))
1872 .unwrap()
1873 );
1874 }
1875
1876 #[test]
1877 fn maximum_wait_attemps_is_enforced() {
1878 let (glean, dir) = new_glean(None);
1879
1880 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1881
1882 let max_wait_attempts = 3;
1884 upload_manager
1885 .policy
1886 .set_max_wait_attempts(Some(max_wait_attempts));
1887
1888 let secs_per_interval = 5;
1894 let max_pings_per_interval = 1;
1895 upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1896
1897 upload_manager.enqueue_ping(
1899 &glean,
1900 PingPayload {
1901 document_id: Uuid::new_v4().to_string(),
1902 upload_path: PATH.into(),
1903 json_body: "".into(),
1904 headers: None,
1905 body_has_info_sections: true,
1906 ping_name: "ping-name".into(),
1907 uploader_capabilities: vec![],
1908 },
1909 );
1910 upload_manager.enqueue_ping(
1911 &glean,
1912 PingPayload {
1913 document_id: Uuid::new_v4().to_string(),
1914 upload_path: PATH.into(),
1915 json_body: "".into(),
1916 headers: None,
1917 body_has_info_sections: true,
1918 ping_name: "ping-name".into(),
1919 uploader_capabilities: vec![],
1920 },
1921 );
1922
1923 match upload_manager.get_upload_task(&glean, false) {
1925 PingUploadTask::Upload { .. } => {}
1926 _ => panic!("Expected upload manager to return the next request!"),
1927 }
1928
1929 for _ in 0..max_wait_attempts {
1933 let task = upload_manager.get_upload_task(&glean, false);
1934 assert!(task.is_wait());
1935 }
1936
1937 assert_eq!(
1940 upload_manager.get_upload_task(&glean, false),
1941 PingUploadTask::done()
1942 );
1943
1944 thread::sleep(Duration::from_secs(secs_per_interval));
1946
1947 let task = upload_manager.get_upload_task(&glean, false);
1949 assert!(task.is_upload());
1950
1951 assert_eq!(
1953 upload_manager.get_upload_task(&glean, false),
1954 PingUploadTask::done()
1955 );
1956 }
1957
1958 #[test]
1959 fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1960 let (glean, dir) = new_glean(None);
1961 let upload_manager = PingUploadManager::new(dir.path(), "test");
1962 match upload_manager.get_upload_task(&glean, false) {
1963 PingUploadTask::Wait { time } => {
1964 assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1965 }
1966 _ => panic!("Expected upload manager to return a wait task!"),
1967 };
1968 }
1969
1970 #[test]
1971 fn cannot_enqueue_ping_while_its_being_processed() {
1972 let (glean, dir) = new_glean(None);
1973
1974 let upload_manager = PingUploadManager::no_policy(dir.path());
1975
1976 let identifier = &Uuid::new_v4();
1978 let ping = PingPayload {
1979 document_id: identifier.to_string(),
1980 upload_path: PATH.into(),
1981 json_body: "".into(),
1982 headers: None,
1983 body_has_info_sections: true,
1984 ping_name: "ping-name".into(),
1985 uploader_capabilities: vec![],
1986 };
1987 upload_manager.enqueue_ping(&glean, ping);
1988 assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1989
1990 let ping = PingPayload {
1992 document_id: identifier.to_string(),
1993 upload_path: PATH.into(),
1994 json_body: "".into(),
1995 headers: None,
1996 body_has_info_sections: true,
1997 ping_name: "ping-name".into(),
1998 uploader_capabilities: vec![],
1999 };
2000 upload_manager.enqueue_ping(&glean, ping);
2001
2002 assert_eq!(
2004 upload_manager.get_upload_task(&glean, false),
2005 PingUploadTask::done()
2006 );
2007
2008 upload_manager.process_ping_upload_response(
2010 &glean,
2011 &identifier.to_string(),
2012 UploadResult::http_status(200),
2013 );
2014 }
2015}