1use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
44const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1000; #[derive(Debug, MallocSizeOf)]
47struct RateLimiter {
48 started: Option<Instant>,
50 count: u32,
52 interval: Duration,
54 max_count: u32,
56}
57
58#[derive(PartialEq)]
60enum RateLimiterState {
61 Incrementing,
63 Throttled(u64),
68}
69
70impl RateLimiter {
71 pub fn new(interval: Duration, max_count: u32) -> Self {
72 Self {
73 started: None,
74 count: 0,
75 interval,
76 max_count,
77 }
78 }
79
80 fn reset(&mut self) {
81 self.started = Some(Instant::now());
82 self.count = 0;
83 }
84
85 fn elapsed(&self) -> Duration {
86 self.started.unwrap().elapsed()
87 }
88
89 fn should_reset(&self) -> bool {
95 if self.started.is_none() {
96 return true;
97 }
98
99 if self.elapsed() > self.interval {
101 return true;
102 }
103
104 false
105 }
106
107 pub fn get_state(&mut self) -> RateLimiterState {
113 if self.should_reset() {
114 self.reset();
115 }
116
117 if self.count == self.max_count {
118 let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121 return RateLimiterState::Throttled(
122 remaining
123 .try_into()
124 .unwrap_or(self.interval.as_secs() * 1000),
125 );
126 }
127
128 self.count += 1;
129 RateLimiterState::Incrementing
130 }
131}
132
133#[derive(PartialEq, Eq, Debug)]
138pub enum PingUploadTask {
139 Upload {
141 request: PingRequest,
144 },
145
146 Wait {
149 time: u64,
152 },
153
154 Done {
167 #[doc(hidden)]
168 unused: i8,
170 },
171}
172
173impl PingUploadTask {
174 pub fn is_upload(&self) -> bool {
176 matches!(self, PingUploadTask::Upload { .. })
177 }
178
179 pub fn is_wait(&self) -> bool {
181 matches!(self, PingUploadTask::Wait { .. })
182 }
183
184 pub(crate) fn done() -> Self {
185 PingUploadTask::Done { unused: 0 }
186 }
187}
188
189#[derive(Debug)]
191pub struct PingUploadManager {
192 queue: RwLock<VecDeque<PingRequest>>,
194 directory_manager: PingDirectoryManager,
196 processed_pending_pings: Arc<AtomicBool>,
198 cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
200 recoverable_failure_count: AtomicU32,
202 wait_attempt_count: AtomicU32,
204 rate_limiter: Option<RwLock<RateLimiter>>,
209 language_binding_name: String,
213 upload_metrics: UploadMetrics,
215 policy: Policy,
217
218 in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
219}
220
221impl MallocSizeOf for PingUploadManager {
222 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223 let shallow_size = {
224 let queue = self.queue.read().unwrap();
225 if ops.has_malloc_enclosing_size_of() {
226 if let Some(front) = queue.front() {
227 unsafe { ops.malloc_enclosing_size_of(front) }
230 } else {
231 0
233 }
234 } else {
235 queue.capacity() * mem::size_of::<PingRequest>()
239 }
240 };
241
242 let mut n = shallow_size
243 + self.directory_manager.size_of(ops)
244 + mem::size_of::<AtomicBool>() + self.cached_pings.read().unwrap().size_of(ops)
246 + self.rate_limiter.as_ref().map(|rl| {
247 let lock = rl.read().unwrap();
248 (*lock).size_of(ops)
249 }).unwrap_or(0)
250 + self.language_binding_name.size_of(ops)
251 + self.upload_metrics.size_of(ops)
252 + self.policy.size_of(ops);
253
254 let in_flight = self.in_flight.read().unwrap();
255 n += in_flight.size_of(ops);
256
257 n
258 }
259}
260
261impl PingUploadManager {
262 pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
273 Self {
274 queue: RwLock::new(VecDeque::new()),
275 directory_manager: PingDirectoryManager::new(data_path),
276 processed_pending_pings: Arc::new(AtomicBool::new(false)),
277 cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
278 recoverable_failure_count: AtomicU32::new(0),
279 wait_attempt_count: AtomicU32::new(0),
280 rate_limiter: None,
281 language_binding_name: language_binding_name.into(),
282 upload_metrics: UploadMetrics::new(),
283 policy: Policy::default(),
284 in_flight: RwLock::new(HashMap::default()),
285 }
286 }
287
288 pub fn scan_pending_pings_directories(
295 &self,
296 trigger_upload: bool,
297 ) -> std::thread::JoinHandle<()> {
298 let local_manager = self.directory_manager.clone();
299 let local_cached_pings = self.cached_pings.clone();
300 let local_flag = self.processed_pending_pings.clone();
301 crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
302 {
303 let mut local_cached_pings = local_cached_pings
305 .write()
306 .expect("Can't write to pending pings cache.");
307 local_cached_pings.extend(local_manager.process_dirs());
308 local_flag.store(true, Ordering::SeqCst);
309 }
310 if trigger_upload {
311 crate::dispatcher::launch(|| {
312 if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
313 if let Err(e) = state.callbacks.trigger_upload() {
314 log::error!(
315 "Triggering upload after pending ping scan failed. Error: {}",
316 e
317 );
318 }
319 }
320 });
321 }
322 })
323 .expect("Unable to spawn thread to process pings directories.")
324 }
325
326 #[cfg(test)]
328 pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
329 let mut upload_manager = Self::new(data_path, "Test");
330
331 upload_manager.policy.set_max_recoverable_failures(None);
333 upload_manager.policy.set_max_wait_attempts(None);
334 upload_manager.policy.set_max_ping_body_size(None);
335 upload_manager
336 .policy
337 .set_max_pending_pings_directory_size(None);
338 upload_manager.policy.set_max_pending_pings_count(None);
339
340 upload_manager
342 .scan_pending_pings_directories(false)
343 .join()
344 .unwrap();
345
346 upload_manager
347 }
348
349 fn processed_pending_pings(&self) -> bool {
350 self.processed_pending_pings.load(Ordering::SeqCst)
351 }
352
353 fn recoverable_failure_count(&self) -> u32 {
354 self.recoverable_failure_count.load(Ordering::SeqCst)
355 }
356
357 fn wait_attempt_count(&self) -> u32 {
358 self.wait_attempt_count.load(Ordering::SeqCst)
359 }
360
361 fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
366 let PingPayload {
367 document_id,
368 upload_path: path,
369 json_body: body,
370 headers,
371 body_has_info_sections,
372 ping_name,
373 uploader_capabilities,
374 } = ping;
375 let mut request = PingRequest::builder(
376 &self.language_binding_name,
377 self.policy.max_ping_body_size(),
378 )
379 .document_id(&document_id)
380 .path(path)
381 .body(body)
382 .body_has_info_sections(body_has_info_sections)
383 .ping_name(ping_name)
384 .uploader_capabilities(uploader_capabilities);
385
386 if let Some(headers) = headers {
387 request = request.headers(headers);
388 }
389
390 match request.build() {
391 Ok(request) => Some(request),
392 Err(e) => {
393 log::warn!("Error trying to build ping request: {}", e);
394 self.directory_manager.delete_file(&document_id);
395
396 if let ErrorKind::PingBodyOverflow(s) = e.kind() {
399 self.upload_metrics
400 .discarded_exceeding_pings_size
401 .accumulate_sync(glean, *s as i64 / 1024);
402 }
403
404 None
405 }
406 }
407 }
408
409 pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
411 let mut queue = self
412 .queue
413 .write()
414 .expect("Can't write to pending pings queue.");
415
416 let PingPayload {
417 ref document_id,
418 upload_path: ref path,
419 ..
420 } = ping;
421 if queue
423 .iter()
424 .any(|request| request.document_id.as_str() == document_id)
425 {
426 log::warn!(
427 "Attempted to enqueue a duplicate ping {} at {}.",
428 document_id,
429 path
430 );
431 return;
432 }
433
434 {
435 let in_flight = self.in_flight.read().unwrap();
436 if in_flight.contains_key(document_id) {
437 log::warn!(
438 "Attempted to enqueue an in-flight ping {} at {}.",
439 document_id,
440 path
441 );
442 self.upload_metrics
443 .in_flight_pings_dropped
444 .add_sync(glean, 0);
445 return;
446 }
447 }
448
449 log::trace!("Enqueuing ping {} at {}", document_id, path);
450 if let Some(request) = self.build_ping_request(glean, ping) {
451 queue.push_back(request)
452 }
453 }
454
455 fn enqueue_cached_pings(&self, glean: &Glean) {
472 let mut cached_pings = self
473 .cached_pings
474 .write()
475 .expect("Can't write to pending pings cache.");
476
477 if cached_pings.len() > 0 {
478 let mut pending_pings_directory_size: u64 = 0;
479 let mut pending_pings_count = 0;
480 let mut deleting = false;
481 let mut delete_reason: Option<&'static str> = None;
482
483 let total = cached_pings.pending_pings.len() as u64;
484 self.upload_metrics
485 .pending_pings
486 .add_sync(glean, total.try_into().unwrap_or(0));
487
488 if total > self.policy.max_pending_pings_count() {
489 log::warn!(
490 "More than {} pending pings in the directory, will delete {} old pings.",
491 self.policy.max_pending_pings_count(),
492 total - self.policy.max_pending_pings_count()
493 );
494 }
495
496 cached_pings.pending_pings.reverse();
502 cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
503 pending_pings_count += 1;
504 pending_pings_directory_size += file_size;
505
506 if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
510 log::warn!(
511 "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
512 self.policy.max_pending_pings_directory_size()
513 );
514 deleting = true;
515 delete_reason = Some("size_quota");
516 }
517
518 if !deleting && pending_pings_count > self.policy.max_pending_pings_count() {
522 deleting = true;
523 delete_reason = Some("count_quota");
524 }
525
526 if deleting && self.directory_manager.delete_file(document_id) {
527 self.upload_metrics
528 .deleted_pings_after_quota_hit
529 .add_sync(glean, 1);
530 if let Some(reason) = delete_reason {
531 self.upload_metrics
532 .pending_pings_deleted
533 .get(reason)
534 .add_sync(glean, 1);
535 }
536 return false;
537 }
538
539 true
540 });
541 cached_pings.pending_pings.reverse();
544 self.upload_metrics
545 .pending_pings_directory_size
546 .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
547
548 cached_pings
551 .deletion_request_pings
552 .drain(..)
553 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
554 cached_pings
555 .pending_pings
556 .drain(..)
557 .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
558 }
559 }
560
561 pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
574 self.rate_limiter = Some(RwLock::new(RateLimiter::new(
575 Duration::from_secs(interval),
576 max_tasks,
577 )));
578 }
579
580 pub(crate) fn set_max_pending_pings_count(&mut self, n: u64) {
581 self.policy.set_max_pending_pings_count(Some(n));
582 }
583
584 pub(crate) fn set_max_pending_pings_directory_size(&mut self, n: u64) {
585 self.policy.set_max_pending_pings_directory_size(Some(n));
586 }
587
588 pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
597 if let Some(ping) = self.directory_manager.process_file(document_id) {
598 self.enqueue_ping(glean, ping);
599 }
600 }
601
602 pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
604 log::trace!("Clearing ping queue");
605 let mut queue = self
606 .queue
607 .write()
608 .expect("Can't write to pending pings queue.");
609
610 queue.retain(|ping| ping.is_deletion_request());
611 log::trace!(
612 "{} pings left in the queue (only deletion-request expected)",
613 queue.len()
614 );
615 queue
616 }
617
618 fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
619 let wait_or_done = |time: u64| {
624 self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
625 if self.wait_attempt_count() > self.policy.max_wait_attempts() {
626 PingUploadTask::done()
627 } else {
628 PingUploadTask::Wait { time }
629 }
630 };
631
632 if !self.processed_pending_pings() {
633 log::info!(
634 "Tried getting an upload task, but processing is ongoing. Will come back later."
635 );
636 return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
637 }
638
639 self.enqueue_cached_pings(glean);
641
642 if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
643 log::warn!(
644 "Reached maximum recoverable failures for the current uploading window. You are done."
645 );
646 return PingUploadTask::done();
647 }
648
649 let mut queue = self
650 .queue
651 .write()
652 .expect("Can't write to pending pings queue.");
653 match queue.front() {
654 Some(request) => {
655 if let Some(rate_limiter) = &self.rate_limiter {
656 let mut rate_limiter = rate_limiter
657 .write()
658 .expect("Can't write to the rate limiter.");
659 if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
660 log::info!(
661 "Tried getting an upload task, but we are throttled at the moment."
662 );
663 return wait_or_done(remaining);
664 }
665 }
666
667 log::info!(
668 "New upload task with id {} (path: {})",
669 request.document_id,
670 request.path
671 );
672
673 if log_ping {
674 if let Some(body) = request.pretty_body() {
675 chunked_log_info(&request.path, &body);
676 } else {
677 chunked_log_info(&request.path, "<invalid ping payload>");
678 }
679 }
680
681 {
682 let mut in_flight = self.in_flight.write().unwrap();
686 let success_id = self.upload_metrics.send_success.start_sync();
687 let failure_id = self.upload_metrics.send_failure.start_sync();
688 in_flight.insert(request.document_id.clone(), (success_id, failure_id));
689 }
690
691 let mut request = queue.pop_front().unwrap();
692
693 request
695 .headers
696 .insert("Date".to_string(), create_date_header_value(Utc::now()));
697
698 PingUploadTask::Upload { request }
699 }
700 None => {
701 log::info!("No more pings to upload! You are done.");
702 PingUploadTask::done()
703 }
704 }
705 }
706
707 pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
718 let task = self.get_upload_task_internal(glean, log_ping);
719
720 if !task.is_wait() && self.wait_attempt_count() > 0 {
721 self.wait_attempt_count.store(0, Ordering::SeqCst);
722 }
723
724 if !task.is_upload() && self.recoverable_failure_count() > 0 {
725 self.recoverable_failure_count.store(0, Ordering::SeqCst);
726 }
727
728 task
729 }
730
731 pub fn process_ping_upload_response(
770 &self,
771 glean: &Glean,
772 document_id: &str,
773 status: UploadResult,
774 ) -> UploadTaskAction {
775 use UploadResult::*;
776
777 let stop_time = zeitstempel::now_awake();
778
779 if let Some(label) = status.get_label() {
780 let metric = self.upload_metrics.ping_upload_failure.get(label);
781 metric.add_sync(glean, 1);
782 }
783
784 let send_ids = {
785 let mut lock = self.in_flight.write().unwrap();
786 lock.remove(document_id)
787 };
788
789 if send_ids.is_none() {
790 self.upload_metrics.missing_send_ids.add_sync(glean, 1);
791 }
792
793 match status {
794 HttpStatus { code } if (200..=299).contains(&code) => {
795 log::info!("Ping {} successfully sent {}.", document_id, code);
796 if let Some((success_id, failure_id)) = send_ids {
797 self.upload_metrics
798 .send_success
799 .set_stop_and_accumulate(glean, success_id, stop_time);
800 self.upload_metrics.send_failure.cancel_sync(failure_id);
801 }
802 self.directory_manager.delete_file(document_id);
803 }
804
805 UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
806 log::warn!(
807 "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
808 document_id,
809 status
810 );
811 if let Some((success_id, failure_id)) = send_ids {
812 self.upload_metrics.send_success.cancel_sync(success_id);
813 self.upload_metrics
814 .send_failure
815 .set_stop_and_accumulate(glean, failure_id, stop_time);
816 }
817 self.directory_manager.delete_file(document_id);
818 }
819
820 RecoverableFailure { .. } | HttpStatus { .. } => {
821 log::warn!(
822 "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
823 document_id,
824 status
825 );
826 if let Some((success_id, failure_id)) = send_ids {
827 self.upload_metrics.send_success.cancel_sync(success_id);
828 self.upload_metrics
829 .send_failure
830 .set_stop_and_accumulate(glean, failure_id, stop_time);
831 }
832 self.enqueue_ping_from_file(glean, document_id);
833 self.recoverable_failure_count
834 .fetch_add(1, Ordering::SeqCst);
835 }
836
837 Done { .. } => {
838 log::debug!("Uploader signaled Done. Exiting.");
839 if let Some((success_id, failure_id)) = send_ids {
840 self.upload_metrics.send_success.cancel_sync(success_id);
841 self.upload_metrics.send_failure.cancel_sync(failure_id);
842 }
843 return UploadTaskAction::End;
844 }
845 };
846
847 UploadTaskAction::Next
848 }
849}
850
851#[cfg(target_os = "android")]
853pub fn chunked_log_info(path: &str, payload: &str) {
854 const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;
858
859 if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
863 log::info!("Glean ping to URL: {}\n{}", path, payload);
864 return;
865 }
866
867 let mut start = 0;
870 let mut end = MAX_LOG_PAYLOAD_SIZE_BYTES;
871 let mut chunk_idx = 1;
872 let total_chunks = payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1;
874
875 while end < payload.len() {
876 for _ in 0..4 {
879 if payload.is_char_boundary(end) {
880 break;
881 }
882 end -= 1;
883 }
884
885 log::info!(
886 "Glean ping to URL: {} [Part {} of {}]\n{}",
887 path,
888 chunk_idx,
889 total_chunks,
890 &payload[start..end]
891 );
892
893 start = end;
895 end = end + MAX_LOG_PAYLOAD_SIZE_BYTES;
896 chunk_idx += 1;
897 }
898
899 if start < payload.len() {
901 log::info!(
902 "Glean ping to URL: {} [Part {} of {}]\n{}",
903 path,
904 chunk_idx,
905 total_chunks,
906 &payload[start..]
907 );
908 }
909}
910
911#[cfg(not(target_os = "android"))]
913pub fn chunked_log_info(_path: &str, payload: &str) {
914 log::info!("{}", payload)
915}
916
917#[cfg(test)]
918mod test {
919 use std::thread;
920 use uuid::Uuid;
921
922 use super::*;
923 use crate::metrics::PingType;
924 use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
925
926 const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
927
928 #[test]
929 fn doesnt_error_when_there_are_no_pending_pings() {
930 let (glean, _t) = new_glean(None);
931
932 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
935 }
936
937 #[test]
938 fn returns_ping_request_when_there_is_one() {
939 let (glean, dir) = new_glean(None);
940
941 let upload_manager = PingUploadManager::no_policy(dir.path());
942
943 upload_manager.enqueue_ping(
945 &glean,
946 PingPayload {
947 document_id: Uuid::new_v4().to_string(),
948 upload_path: PATH.into(),
949 json_body: "".into(),
950 headers: None,
951 body_has_info_sections: true,
952 ping_name: "ping-name".into(),
953 uploader_capabilities: vec![],
954 },
955 );
956
957 let task = upload_manager.get_upload_task(&glean, false);
960 assert!(task.is_upload());
961 }
962
963 #[test]
964 fn returns_as_many_ping_requests_as_there_are() {
965 let (glean, dir) = new_glean(None);
966
967 let upload_manager = PingUploadManager::no_policy(dir.path());
968
969 let n = 10;
971 for _ in 0..n {
972 upload_manager.enqueue_ping(
973 &glean,
974 PingPayload {
975 document_id: Uuid::new_v4().to_string(),
976 upload_path: PATH.into(),
977 json_body: "".into(),
978 headers: None,
979 body_has_info_sections: true,
980 ping_name: "ping-name".into(),
981 uploader_capabilities: vec![],
982 },
983 );
984 }
985
986 for _ in 0..n {
988 let task = upload_manager.get_upload_task(&glean, false);
989 assert!(task.is_upload());
990 }
991
992 assert_eq!(
994 upload_manager.get_upload_task(&glean, false),
995 PingUploadTask::done()
996 );
997 }
998
999 #[test]
1000 fn limits_the_number_of_pings_when_there_is_rate_limiting() {
1001 let (glean, dir) = new_glean(None);
1002
1003 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1004
1005 let max_pings_per_interval = 10;
1007 upload_manager.set_rate_limiter(3, 10);
1008
1009 for _ in 0..max_pings_per_interval {
1011 upload_manager.enqueue_ping(
1012 &glean,
1013 PingPayload {
1014 document_id: Uuid::new_v4().to_string(),
1015 upload_path: PATH.into(),
1016 json_body: "".into(),
1017 headers: None,
1018 body_has_info_sections: true,
1019 ping_name: "ping-name".into(),
1020 uploader_capabilities: vec![],
1021 },
1022 );
1023 }
1024
1025 for _ in 0..max_pings_per_interval {
1027 let task = upload_manager.get_upload_task(&glean, false);
1028 assert!(task.is_upload());
1029 }
1030
1031 upload_manager.enqueue_ping(
1033 &glean,
1034 PingPayload {
1035 document_id: Uuid::new_v4().to_string(),
1036 upload_path: PATH.into(),
1037 json_body: "".into(),
1038 headers: None,
1039 body_has_info_sections: true,
1040 ping_name: "ping-name".into(),
1041 uploader_capabilities: vec![],
1042 },
1043 );
1044
1045 match upload_manager.get_upload_task(&glean, false) {
1047 PingUploadTask::Wait { time } => {
1048 thread::sleep(Duration::from_millis(time));
1050 }
1051 _ => panic!("Expected upload manager to return a wait task!"),
1052 };
1053
1054 let task = upload_manager.get_upload_task(&glean, false);
1055 assert!(task.is_upload());
1056 }
1057
1058 #[test]
1059 fn clearing_the_queue_works_correctly() {
1060 let (glean, dir) = new_glean(None);
1061
1062 let upload_manager = PingUploadManager::no_policy(dir.path());
1063
1064 for _ in 0..10 {
1066 upload_manager.enqueue_ping(
1067 &glean,
1068 PingPayload {
1069 document_id: Uuid::new_v4().to_string(),
1070 upload_path: PATH.into(),
1071 json_body: "".into(),
1072 headers: None,
1073 body_has_info_sections: true,
1074 ping_name: "ping-name".into(),
1075 uploader_capabilities: vec![],
1076 },
1077 );
1078 }
1079
1080 drop(upload_manager.clear_ping_queue());
1082
1083 assert_eq!(
1085 upload_manager.get_upload_task(&glean, false),
1086 PingUploadTask::done()
1087 );
1088 }
1089
1090 #[test]
1091 fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1092 let (mut glean, _t) = new_glean(None);
1093
1094 let ping_type = PingType::new(
1096 "test",
1097 true,
1098 true,
1099 true,
1100 true,
1101 true,
1102 vec![],
1103 vec![],
1104 true,
1105 vec![],
1106 );
1107 glean.register_ping_type(&ping_type);
1108
1109 let n = 10;
1111 for _ in 0..n {
1112 ping_type.submit_sync(&glean, None);
1113 }
1114
1115 glean
1116 .internal_pings
1117 .deletion_request
1118 .submit_sync(&glean, None);
1119
1120 drop(glean.upload_manager.clear_ping_queue());
1122
1123 let upload_task = glean.get_upload_task();
1124 match upload_task {
1125 PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1126 _ => panic!("Expected upload manager to return the next request!"),
1127 }
1128
1129 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1131 }
1132
1133 #[test]
1134 fn fills_up_queue_successfully_from_disk() {
1135 let (mut glean, dir) = new_glean(None);
1136
1137 let ping_type = PingType::new(
1139 "test",
1140 true,
1141 true,
1142 true,
1143 true,
1144 true,
1145 vec![],
1146 vec![],
1147 true,
1148 vec![],
1149 );
1150 glean.register_ping_type(&ping_type);
1151
1152 let n = 10;
1154 for _ in 0..n {
1155 ping_type.submit_sync(&glean, None);
1156 }
1157
1158 let upload_manager = PingUploadManager::no_policy(dir.path());
1160
1161 for _ in 0..n {
1163 let task = upload_manager.get_upload_task(&glean, false);
1164 assert!(task.is_upload());
1165 }
1166
1167 assert_eq!(
1169 upload_manager.get_upload_task(&glean, false),
1170 PingUploadTask::done()
1171 );
1172 }
1173
1174 #[test]
1175 fn processes_correctly_success_upload_response() {
1176 let (mut glean, dir) = new_glean(None);
1177
1178 let ping_type = PingType::new(
1180 "test",
1181 true,
1182 true,
1183 true,
1184 true,
1185 true,
1186 vec![],
1187 vec![],
1188 true,
1189 vec![],
1190 );
1191 glean.register_ping_type(&ping_type);
1192
1193 ping_type.submit_sync(&glean, None);
1195
1196 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1198
1199 match glean.get_upload_task() {
1201 PingUploadTask::Upload { request } => {
1202 let document_id = request.document_id;
1204 glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1205 assert!(!pending_pings_dir.join(document_id).exists());
1207 }
1208 _ => panic!("Expected upload manager to return the next request!"),
1209 }
1210
1211 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1213 }
1214
1215 #[test]
1216 fn processes_correctly_client_error_upload_response() {
1217 let (mut glean, dir) = new_glean(None);
1218
1219 let ping_type = PingType::new(
1221 "test",
1222 true,
1223 true,
1224 true,
1225 true,
1226 true,
1227 vec![],
1228 vec![],
1229 true,
1230 vec![],
1231 );
1232 glean.register_ping_type(&ping_type);
1233
1234 ping_type.submit_sync(&glean, None);
1236
1237 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1239
1240 match glean.get_upload_task() {
1242 PingUploadTask::Upload { request } => {
1243 let document_id = request.document_id;
1245 glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1246 assert!(!pending_pings_dir.join(document_id).exists());
1248 }
1249 _ => panic!("Expected upload manager to return the next request!"),
1250 }
1251
1252 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1254 }
1255
1256 #[test]
1257 fn processes_correctly_server_error_upload_response() {
1258 let (mut glean, _t) = new_glean(None);
1259
1260 let ping_type = PingType::new(
1262 "test",
1263 true,
1264 true,
1265 true,
1266 true,
1267 true,
1268 vec![],
1269 vec![],
1270 true,
1271 vec![],
1272 );
1273 glean.register_ping_type(&ping_type);
1274
1275 ping_type.submit_sync(&glean, None);
1277
1278 match glean.get_upload_task() {
1280 PingUploadTask::Upload { request } => {
1281 let document_id = request.document_id;
1283 glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1284 match glean.get_upload_task() {
1286 PingUploadTask::Upload { request } => {
1287 assert_eq!(document_id, request.document_id);
1288 }
1289 _ => panic!("Expected upload manager to return the next request!"),
1290 }
1291 }
1292 _ => panic!("Expected upload manager to return the next request!"),
1293 }
1294
1295 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1297 }
1298
1299 #[test]
1300 fn processes_correctly_unrecoverable_upload_response() {
1301 let (mut glean, dir) = new_glean(None);
1302
1303 let ping_type = PingType::new(
1305 "test",
1306 true,
1307 true,
1308 true,
1309 true,
1310 true,
1311 vec![],
1312 vec![],
1313 true,
1314 vec![],
1315 );
1316 glean.register_ping_type(&ping_type);
1317
1318 ping_type.submit_sync(&glean, None);
1320
1321 let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1323
1324 match glean.get_upload_task() {
1326 PingUploadTask::Upload { request } => {
1327 let document_id = request.document_id;
1329 glean.process_ping_upload_response(
1330 &document_id,
1331 UploadResult::unrecoverable_failure(),
1332 );
1333 assert!(!pending_pings_dir.join(document_id).exists());
1335 }
1336 _ => panic!("Expected upload manager to return the next request!"),
1337 }
1338
1339 assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1341 }
1342
1343 #[test]
1344 fn new_pings_are_added_while_upload_in_progress() {
1345 let (glean, dir) = new_glean(None);
1346
1347 let upload_manager = PingUploadManager::no_policy(dir.path());
1348
1349 let doc1 = Uuid::new_v4().to_string();
1350 let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1351
1352 let doc2 = Uuid::new_v4().to_string();
1353 let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1354
1355 upload_manager.enqueue_ping(
1357 &glean,
1358 PingPayload {
1359 document_id: doc1.clone(),
1360 upload_path: path1,
1361 json_body: "".into(),
1362 headers: None,
1363 body_has_info_sections: true,
1364 ping_name: "test-ping".into(),
1365 uploader_capabilities: vec![],
1366 },
1367 );
1368
1369 let req = match upload_manager.get_upload_task(&glean, false) {
1371 PingUploadTask::Upload { request } => request,
1372 _ => panic!("Expected upload manager to return the next request!"),
1373 };
1374 assert_eq!(doc1, req.document_id);
1375
1376 upload_manager.enqueue_ping(
1378 &glean,
1379 PingPayload {
1380 document_id: doc2.clone(),
1381 upload_path: path2,
1382 json_body: "".into(),
1383 headers: None,
1384 body_has_info_sections: true,
1385 ping_name: "test-ping".into(),
1386 uploader_capabilities: vec![],
1387 },
1388 );
1389
1390 upload_manager.process_ping_upload_response(
1392 &glean,
1393 &req.document_id,
1394 UploadResult::http_status(200),
1395 );
1396
1397 let req = match upload_manager.get_upload_task(&glean, false) {
1399 PingUploadTask::Upload { request } => request,
1400 _ => panic!("Expected upload manager to return the next request!"),
1401 };
1402 assert_eq!(doc2, req.document_id);
1403
1404 upload_manager.process_ping_upload_response(
1406 &glean,
1407 &req.document_id,
1408 UploadResult::http_status(200),
1409 );
1410
1411 assert_eq!(
1413 upload_manager.get_upload_task(&glean, false),
1414 PingUploadTask::done()
1415 );
1416 }
1417
1418 #[test]
1419 fn adds_debug_view_header_to_requests_when_tag_is_set() {
1420 let (mut glean, _t) = new_glean(None);
1421
1422 glean.set_debug_view_tag("valid-tag");
1423
1424 let ping_type = PingType::new(
1426 "test",
1427 true,
1428 true,
1429 true,
1430 true,
1431 true,
1432 vec![],
1433 vec![],
1434 true,
1435 vec![],
1436 );
1437 glean.register_ping_type(&ping_type);
1438
1439 ping_type.submit_sync(&glean, None);
1441
1442 match glean.get_upload_task() {
1444 PingUploadTask::Upload { request } => {
1445 assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1446 }
1447 _ => panic!("Expected upload manager to return the next request!"),
1448 }
1449 }
1450
1451 #[test]
1452 fn duplicates_are_not_enqueued() {
1453 let (glean, dir) = new_glean(None);
1454
1455 let upload_manager = PingUploadManager::no_policy(dir.path());
1458
1459 let doc_id = Uuid::new_v4().to_string();
1460 let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1461
1462 upload_manager.enqueue_ping(
1464 &glean,
1465 PingPayload {
1466 document_id: doc_id.clone(),
1467 upload_path: path.clone(),
1468 json_body: "".into(),
1469 headers: None,
1470 body_has_info_sections: true,
1471 ping_name: "test-ping".into(),
1472 uploader_capabilities: vec![],
1473 },
1474 );
1475 upload_manager.enqueue_ping(
1476 &glean,
1477 PingPayload {
1478 document_id: doc_id,
1479 upload_path: path,
1480 json_body: "".into(),
1481 headers: None,
1482 body_has_info_sections: true,
1483 ping_name: "test-ping".into(),
1484 uploader_capabilities: vec![],
1485 },
1486 );
1487
1488 let task = upload_manager.get_upload_task(&glean, false);
1490 assert!(task.is_upload());
1491
1492 assert_eq!(
1494 upload_manager.get_upload_task(&glean, false),
1495 PingUploadTask::done()
1496 );
1497 }
1498
1499 #[test]
1500 fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1501 let (mut glean, dir) = new_glean(None);
1502
1503 let ping_type = PingType::new(
1505 "test",
1506 true,
1507 true,
1508 true,
1509 true,
1510 true,
1511 vec![],
1512 vec![],
1513 true,
1514 vec![],
1515 );
1516 glean.register_ping_type(&ping_type);
1517
1518 let n = 5;
1520 for _ in 0..n {
1521 ping_type.submit_sync(&glean, None);
1522 }
1523
1524 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1525
1526 let max_recoverable_failures = 3;
1528 upload_manager
1529 .policy
1530 .set_max_recoverable_failures(Some(max_recoverable_failures));
1531
1532 for _ in 0..max_recoverable_failures {
1534 match upload_manager.get_upload_task(&glean, false) {
1535 PingUploadTask::Upload { request } => {
1536 upload_manager.process_ping_upload_response(
1537 &glean,
1538 &request.document_id,
1539 UploadResult::recoverable_failure(),
1540 );
1541 }
1542 _ => panic!("Expected upload manager to return the next request!"),
1543 }
1544 }
1545
1546 assert_eq!(
1549 upload_manager.get_upload_task(&glean, false),
1550 PingUploadTask::done()
1551 );
1552
1553 for _ in 0..n {
1555 let task = upload_manager.get_upload_task(&glean, false);
1556 assert!(task.is_upload());
1557 }
1558 }
1559
1560 #[test]
1561 fn quota_is_enforced_when_enqueueing_cached_pings() {
1562 let (mut glean, dir) = new_glean(None);
1563
1564 let ping_type = PingType::new(
1566 "test",
1567 true,
1568 true,
1569 true,
1570 true,
1571 true,
1572 vec![],
1573 vec![],
1574 true,
1575 vec![],
1576 );
1577 glean.register_ping_type(&ping_type);
1578
1579 let n = 10;
1581 for _ in 0..n {
1582 ping_type.submit_sync(&glean, None);
1583 }
1584
1585 let directory_manager = PingDirectoryManager::new(dir.path());
1586 let pending_pings = directory_manager.process_dirs().pending_pings;
1587 let (_, newest_ping) = &pending_pings.last().unwrap();
1590 let PingPayload {
1591 document_id: newest_ping_id,
1592 ..
1593 } = &newest_ping;
1594
1595 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1597
1598 upload_manager
1605 .policy
1606 .set_max_pending_pings_directory_size(Some(500));
1607
1608 match upload_manager.get_upload_task(&glean, false) {
1612 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1613 _ => panic!("Expected upload manager to return the next request!"),
1614 }
1615
1616 assert_eq!(
1619 upload_manager.get_upload_task(&glean, false),
1620 PingUploadTask::done()
1621 );
1622
1623 assert_eq!(
1625 n - 1,
1626 upload_manager
1627 .upload_metrics
1628 .deleted_pings_after_quota_hit
1629 .get_value(&glean, Some("metrics"))
1630 .unwrap()
1631 );
1632 assert_eq!(
1633 n,
1634 upload_manager
1635 .upload_metrics
1636 .pending_pings
1637 .get_value(&glean, Some("metrics"))
1638 .unwrap()
1639 );
1640 }
1641
1642 #[test]
1643 fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1644 let (mut glean, dir) = new_glean(None);
1645
1646 let ping_type = PingType::new(
1648 "test",
1649 true,
1650 true,
1651 true,
1652 true,
1653 true,
1654 vec![],
1655 vec![],
1656 true,
1657 vec![],
1658 );
1659 glean.register_ping_type(&ping_type);
1660
1661 let count_quota = 3;
1663 let n = 10;
1665
1666 for _ in 0..n {
1668 ping_type.submit_sync(&glean, None);
1669 }
1670
1671 let directory_manager = PingDirectoryManager::new(dir.path());
1672 let pending_pings = directory_manager.process_dirs().pending_pings;
1673 let expected_pings = pending_pings
1676 .iter()
1677 .rev()
1678 .take(count_quota)
1679 .map(|(_, ping)| ping.document_id.clone())
1680 .collect::<Vec<_>>();
1681
1682 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1684
1685 upload_manager
1686 .policy
1687 .set_max_pending_pings_count(Some(count_quota as u64));
1688
1689 for ping_id in expected_pings.iter().rev() {
1693 match upload_manager.get_upload_task(&glean, false) {
1694 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1695 _ => panic!("Expected upload manager to return the next request!"),
1696 }
1697 }
1698
1699 assert_eq!(
1702 upload_manager.get_upload_task(&glean, false),
1703 PingUploadTask::done()
1704 );
1705
1706 assert_eq!(
1708 (n - count_quota) as i32,
1709 upload_manager
1710 .upload_metrics
1711 .deleted_pings_after_quota_hit
1712 .get_value(&glean, Some("metrics"))
1713 .unwrap()
1714 );
1715 assert_eq!(
1716 n as i32,
1717 upload_manager
1718 .upload_metrics
1719 .pending_pings
1720 .get_value(&glean, Some("metrics"))
1721 .unwrap()
1722 );
1723 }
1724
1725 #[test]
1726 fn size_and_count_quota_work_together_size_first() {
1727 let (mut glean, dir) = new_glean(None);
1728
1729 let ping_type = PingType::new(
1731 "test",
1732 true,
1733 true,
1734 true,
1735 true,
1736 true,
1737 vec![],
1738 vec![],
1739 true,
1740 vec![],
1741 );
1742 glean.register_ping_type(&ping_type);
1743
1744 let expected_number_of_pings = 3;
1745 let n = 10;
1747
1748 for _ in 0..n {
1750 ping_type.submit_sync(&glean, None);
1751 }
1752
1753 let directory_manager = PingDirectoryManager::new(dir.path());
1754 let pending_pings = directory_manager.process_dirs().pending_pings;
1755 let expected_pings = pending_pings
1758 .iter()
1759 .rev()
1760 .take(expected_number_of_pings)
1761 .map(|(_, ping)| ping.document_id.clone())
1762 .collect::<Vec<_>>();
1763
1764 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1766
1767 upload_manager
1770 .policy
1771 .set_max_pending_pings_directory_size(Some(1300));
1772 upload_manager.policy.set_max_pending_pings_count(Some(5));
1773
1774 for ping_id in expected_pings.iter().rev() {
1778 match upload_manager.get_upload_task(&glean, false) {
1779 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1780 _ => panic!("Expected upload manager to return the next request!"),
1781 }
1782 }
1783
1784 assert_eq!(
1787 upload_manager.get_upload_task(&glean, false),
1788 PingUploadTask::done()
1789 );
1790
1791 assert_eq!(
1793 (n - expected_number_of_pings) as i32,
1794 upload_manager
1795 .upload_metrics
1796 .deleted_pings_after_quota_hit
1797 .get_value(&glean, Some("metrics"))
1798 .unwrap()
1799 );
1800 assert_eq!(
1801 n as i32,
1802 upload_manager
1803 .upload_metrics
1804 .pending_pings
1805 .get_value(&glean, Some("metrics"))
1806 .unwrap()
1807 );
1808 assert_eq!(
1810 (n - expected_number_of_pings) as i32,
1811 upload_manager
1812 .upload_metrics
1813 .pending_pings_deleted
1814 .get("size_quota")
1815 .get_value(&glean, Some("health"))
1816 .unwrap()
1817 );
1818 assert!(upload_manager
1819 .upload_metrics
1820 .pending_pings_deleted
1821 .get("count_quota")
1822 .get_value(&glean, Some("health"))
1823 .is_none());
1824 }
1825
1826 #[test]
1827 fn size_and_count_quota_work_together_count_first() {
1828 let (mut glean, dir) = new_glean(None);
1829
1830 let ping_type = PingType::new(
1832 "test",
1833 true,
1834 true,
1835 true,
1836 true,
1837 true,
1838 vec![],
1839 vec![],
1840 true,
1841 vec![],
1842 );
1843 glean.register_ping_type(&ping_type);
1844
1845 let expected_number_of_pings = 2;
1846 let n = 10;
1848
1849 for _ in 0..n {
1851 ping_type.submit_sync(&glean, None);
1852 }
1853
1854 let directory_manager = PingDirectoryManager::new(dir.path());
1855 let pending_pings = directory_manager.process_dirs().pending_pings;
1856 let expected_pings = pending_pings
1859 .iter()
1860 .rev()
1861 .take(expected_number_of_pings)
1862 .map(|(_, ping)| ping.document_id.clone())
1863 .collect::<Vec<_>>();
1864
1865 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1867
1868 upload_manager
1870 .policy
1871 .set_max_pending_pings_directory_size(Some(100_000));
1872 upload_manager.policy.set_max_pending_pings_count(Some(2));
1873
1874 for ping_id in expected_pings.iter().rev() {
1878 match upload_manager.get_upload_task(&glean, false) {
1879 PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1880 _ => panic!("Expected upload manager to return the next request!"),
1881 }
1882 }
1883
1884 assert_eq!(
1887 upload_manager.get_upload_task(&glean, false),
1888 PingUploadTask::done()
1889 );
1890
1891 assert_eq!(
1893 (n - expected_number_of_pings) as i32,
1894 upload_manager
1895 .upload_metrics
1896 .deleted_pings_after_quota_hit
1897 .get_value(&glean, Some("metrics"))
1898 .unwrap()
1899 );
1900 assert_eq!(
1901 n as i32,
1902 upload_manager
1903 .upload_metrics
1904 .pending_pings
1905 .get_value(&glean, Some("metrics"))
1906 .unwrap()
1907 );
1908 assert_eq!(
1910 (n - expected_number_of_pings) as i32,
1911 upload_manager
1912 .upload_metrics
1913 .pending_pings_deleted
1914 .get("count_quota")
1915 .get_value(&glean, Some("health"))
1916 .unwrap()
1917 );
1918 assert!(upload_manager
1919 .upload_metrics
1920 .pending_pings_deleted
1921 .get("size_quota")
1922 .get_value(&glean, Some("health"))
1923 .is_none());
1924 }
1925
1926 #[test]
1927 fn pending_pings_deleted_is_not_recorded_when_quota_not_hit() {
1928 let (mut glean, dir) = new_glean(None);
1929
1930 let ping_type = PingType::new(
1931 "test",
1932 true,
1933 true,
1934 true,
1935 true,
1936 true,
1937 vec![],
1938 vec![],
1939 true,
1940 vec![],
1941 );
1942 glean.register_ping_type(&ping_type);
1943
1944 for _ in 0..3 {
1946 ping_type.submit_sync(&glean, None);
1947 }
1948
1949 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1950 upload_manager.policy.set_max_pending_pings_count(Some(10));
1951 upload_manager
1952 .policy
1953 .set_max_pending_pings_directory_size(Some(1024 * 1024));
1954
1955 upload_manager.get_upload_task(&glean, false);
1956
1957 assert!(upload_manager
1958 .upload_metrics
1959 .pending_pings_deleted
1960 .get("count_quota")
1961 .get_value(&glean, Some("health"))
1962 .is_none());
1963 assert!(upload_manager
1964 .upload_metrics
1965 .pending_pings_deleted
1966 .get("size_quota")
1967 .get_value(&glean, Some("health"))
1968 .is_none());
1969 }
1970
1971 #[test]
1972 fn pending_pings_config_overrides_are_applied() {
1973 let (_, dir) = new_glean(None);
1974
1975 let mut upload_manager = PingUploadManager::new(dir.path(), "test");
1976
1977 let custom_count: u64 = 42;
1978 let custom_size: u64 = 999_999;
1979 upload_manager.set_max_pending_pings_count(custom_count);
1980 upload_manager.set_max_pending_pings_directory_size(custom_size);
1981
1982 assert_eq!(
1983 custom_count,
1984 upload_manager.policy.max_pending_pings_count()
1985 );
1986 assert_eq!(
1987 custom_size,
1988 upload_manager.policy.max_pending_pings_directory_size()
1989 );
1990 }
1991
1992 #[test]
1993 fn maximum_wait_attemps_is_enforced() {
1994 let (glean, dir) = new_glean(None);
1995
1996 let mut upload_manager = PingUploadManager::no_policy(dir.path());
1997
1998 let max_wait_attempts = 3;
2000 upload_manager
2001 .policy
2002 .set_max_wait_attempts(Some(max_wait_attempts));
2003
2004 let secs_per_interval = 5;
2010 let max_pings_per_interval = 1;
2011 upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
2012
2013 upload_manager.enqueue_ping(
2015 &glean,
2016 PingPayload {
2017 document_id: Uuid::new_v4().to_string(),
2018 upload_path: PATH.into(),
2019 json_body: "".into(),
2020 headers: None,
2021 body_has_info_sections: true,
2022 ping_name: "ping-name".into(),
2023 uploader_capabilities: vec![],
2024 },
2025 );
2026 upload_manager.enqueue_ping(
2027 &glean,
2028 PingPayload {
2029 document_id: Uuid::new_v4().to_string(),
2030 upload_path: PATH.into(),
2031 json_body: "".into(),
2032 headers: None,
2033 body_has_info_sections: true,
2034 ping_name: "ping-name".into(),
2035 uploader_capabilities: vec![],
2036 },
2037 );
2038
2039 match upload_manager.get_upload_task(&glean, false) {
2041 PingUploadTask::Upload { .. } => {}
2042 _ => panic!("Expected upload manager to return the next request!"),
2043 }
2044
2045 for _ in 0..max_wait_attempts {
2049 let task = upload_manager.get_upload_task(&glean, false);
2050 assert!(task.is_wait());
2051 }
2052
2053 assert_eq!(
2056 upload_manager.get_upload_task(&glean, false),
2057 PingUploadTask::done()
2058 );
2059
2060 thread::sleep(Duration::from_secs(secs_per_interval));
2062
2063 let task = upload_manager.get_upload_task(&glean, false);
2065 assert!(task.is_upload());
2066
2067 assert_eq!(
2069 upload_manager.get_upload_task(&glean, false),
2070 PingUploadTask::done()
2071 );
2072 }
2073
2074 #[test]
2075 fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
2076 let (glean, dir) = new_glean(None);
2077 let upload_manager = PingUploadManager::new(dir.path(), "test");
2078 match upload_manager.get_upload_task(&glean, false) {
2079 PingUploadTask::Wait { time } => {
2080 assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
2081 }
2082 _ => panic!("Expected upload manager to return a wait task!"),
2083 };
2084 }
2085
2086 #[test]
2087 fn cannot_enqueue_ping_while_its_being_processed() {
2088 let (glean, dir) = new_glean(None);
2089
2090 let upload_manager = PingUploadManager::no_policy(dir.path());
2091
2092 let identifier = &Uuid::new_v4();
2094 let ping = PingPayload {
2095 document_id: identifier.to_string(),
2096 upload_path: PATH.into(),
2097 json_body: "".into(),
2098 headers: None,
2099 body_has_info_sections: true,
2100 ping_name: "ping-name".into(),
2101 uploader_capabilities: vec![],
2102 };
2103 upload_manager.enqueue_ping(&glean, ping);
2104 assert!(upload_manager.get_upload_task(&glean, false).is_upload());
2105
2106 let ping = PingPayload {
2108 document_id: identifier.to_string(),
2109 upload_path: PATH.into(),
2110 json_body: "".into(),
2111 headers: None,
2112 body_has_info_sections: true,
2113 ping_name: "ping-name".into(),
2114 uploader_capabilities: vec![],
2115 };
2116 upload_manager.enqueue_ping(&glean, ping);
2117
2118 assert_eq!(
2120 upload_manager.get_upload_task(&glean, false),
2121 PingUploadTask::done()
2122 );
2123
2124 upload_manager.process_ping_upload_response(
2126 &glean,
2127 &identifier.to_string(),
2128 UploadResult::http_status(200),
2129 );
2130 }
2131}