Skip to main content

glean_core/upload/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Manages the pending pings queue and directory.
6//!
7//! * Keeps track of pending pings, loading any unsent ping from disk on startup;
8//! * Exposes [`get_upload_task`](PingUploadManager::get_upload_task) API for
9//!   the platform layer to request next upload task;
10//! * Exposes
11//!   [`process_ping_upload_response`](PingUploadManager::process_ping_upload_response)
12//!   API to check the HTTP response from the ping upload and either delete the
13//!   corresponding ping from disk or re-enqueue it for sending.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
/// How long (in milliseconds) an uploader is told to wait while the pending pings
/// directories are still being scanned.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1_000;
45
/// Counts events within a time interval and reports when a per-interval maximum is reached.
///
/// Used by the upload manager to throttle how many upload tasks are handed out.
#[derive(Debug, MallocSizeOf)]
struct RateLimiter {
    /// The instant the current interval has started.
    ///
    /// `None` until the first interval is started by `get_state`.
    started: Option<Instant>,
    /// The count for the current interval.
    count: u32,
    /// The duration of each interval.
    interval: Duration,
    /// The maximum count per interval.
    max_count: u32,
}
57
/// An enum to represent the current state of the RateLimiter.
#[derive(PartialEq)]
enum RateLimiterState {
    /// The RateLimiter has not reached the maximum count and is still incrementing.
    Incrementing,
    /// The RateLimiter has reached the maximum count for the current interval.
    ///
    /// This variant contains the remaining time (in milliseconds)
    /// until the rate limiter is not throttled anymore.
    Throttled(u64),
}
69
70impl RateLimiter {
71    pub fn new(interval: Duration, max_count: u32) -> Self {
72        Self {
73            started: None,
74            count: 0,
75            interval,
76            max_count,
77        }
78    }
79
80    fn reset(&mut self) {
81        self.started = Some(Instant::now());
82        self.count = 0;
83    }
84
85    fn elapsed(&self) -> Duration {
86        self.started.unwrap().elapsed()
87    }
88
89    // The counter should reset if
90    //
91    // 1. It has never started;
92    // 2. It has been started more than the interval time ago;
93    // 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
94    fn should_reset(&self) -> bool {
95        if self.started.is_none() {
96            return true;
97        }
98
99        // Safe unwrap, we already stated that `self.started` is not `None` above.
100        if self.elapsed() > self.interval {
101            return true;
102        }
103
104        false
105    }
106
107    /// Tries to increment the internal counter.
108    ///
109    /// # Returns
110    ///
111    /// The current state of the RateLimiter.
112    pub fn get_state(&mut self) -> RateLimiterState {
113        if self.should_reset() {
114            self.reset();
115        }
116
117        if self.count == self.max_count {
118            // Note that `remining` can't be a negative number because we just called `reset`,
119            // which will check if it is and reset if so.
120            let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121            return RateLimiterState::Throttled(
122                remaining
123                    .try_into()
124                    .unwrap_or(self.interval.as_secs() * 1000),
125            );
126        }
127
128        self.count += 1;
129        RateLimiterState::Incrementing
130    }
131}
132
/// An enum representing the possible upload tasks to be performed by an uploader.
///
/// When asking for the next ping request to upload,
/// the requester may receive one out of three possible tasks.
#[derive(PartialEq, Eq, Debug)]
pub enum PingUploadTask {
    /// An upload task
    Upload {
        /// The ping request for upload
        /// See [`PingRequest`](struct.PingRequest.html) for more information.
        request: PingRequest,
    },

    /// A flag signaling that the requester should wait and come back later.
    ///
    /// This is returned while the pending pings directories are not done being processed,
    /// and also while uploads are throttled by the upload manager's rate limiter.
    Wait {
        /// The time in milliseconds
        /// the requester should wait before requesting a new task.
        time: u64,
    },

    /// A flag signaling that requester doesn't need to request any more upload tasks at this moment.
    ///
    /// There are three possibilities for this scenario:
    /// * Pending pings queue is empty, no more pings to request;
    /// * Requester has gotten more than MAX_WAIT_ATTEMPTS (3, by default) `PingUploadTask::Wait` responses in a row;
    /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
    ///   recoverable upload failures in the same uploading window (see below)
    ///   and should stop requesting at this moment.
    ///
    /// An "uploading window" starts when a requester gets a new
    /// `PingUploadTask::Upload { request }` response and finishes when they
    /// finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
    Done {
        #[doc(hidden)]
        /// Unused field. Required because UniFFI can't handle variants without fields.
        unused: i8,
    },
}
172
173impl PingUploadTask {
174    /// Whether the current task is an upload task.
175    pub fn is_upload(&self) -> bool {
176        matches!(self, PingUploadTask::Upload { .. })
177    }
178
179    /// Whether the current task is wait task.
180    pub fn is_wait(&self) -> bool {
181        matches!(self, PingUploadTask::Wait { .. })
182    }
183
184    pub(crate) fn done() -> Self {
185        PingUploadTask::Done { unused: 0 }
186    }
187}
188
/// Manages the pending pings queue and directory.
#[derive(Debug)]
pub struct PingUploadManager {
    /// A FIFO queue storing a `PingRequest` for each pending ping.
    queue: RwLock<VecDeque<PingRequest>>,
    /// A manager for the pending pings directories.
    directory_manager: PingDirectoryManager,
    /// A flag signaling if we are done processing the pending pings directories.
    processed_pending_pings: Arc<AtomicBool>,
    /// The pending pings read off-thread from disk, waiting to be moved into `queue`.
    cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
    /// The number of upload failures for the current uploading window.
    recoverable_failure_count: AtomicU32,
    /// The number of times in a row a user has received a `PingUploadTask::Wait` response.
    wait_attempt_count: AtomicU32,
    /// A ping counter to help rate limit the ping uploads.
    ///
    /// To keep resource usage in check,
    /// we may want to limit the amount of pings sent in a given interval.
    rate_limiter: Option<RwLock<RateLimiter>>,
    /// The name of the programming language used by the binding creating this instance of PingUploadManager.
    ///
    /// This will be used to build the value User-Agent header for each ping request.
    language_binding_name: String,
    /// Metrics related to ping uploading.
    upload_metrics: UploadMetrics,
    /// Policies for ping storage, uploading and requests.
    policy: Policy,

    /// Pings handed out for upload whose upload response has not been processed yet.
    ///
    /// Keyed by document id. The value holds the timer ids started on the
    /// `send_success` and `send_failure` metrics, in that order.
    in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
}
220
221impl MallocSizeOf for PingUploadManager {
222    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223        let shallow_size = {
224            let queue = self.queue.read().unwrap();
225            if ops.has_malloc_enclosing_size_of() {
226                if let Some(front) = queue.front() {
227                    // SAFETY: The front element is a valid interior pointer and thus valid to pass
228                    // to an external function.
229                    unsafe { ops.malloc_enclosing_size_of(front) }
230                } else {
231                    // This assumes that no memory is allocated when the VecDeque is empty.
232                    0
233                }
234            } else {
235                // If `ops` can't estimate the size of a pointer,
236                // we can estimate the allocation size by the size of each element and the
237                // allocated capacity.
238                queue.capacity() * mem::size_of::<PingRequest>()
239            }
240        };
241
242        let mut n = shallow_size
243            + self.directory_manager.size_of(ops)
244            + mem::size_of::<AtomicBool>() // Allocated inside the `self.processed_pending_pings` `Arc`.
245            + self.cached_pings.read().unwrap().size_of(ops)
246            + self.rate_limiter.as_ref().map(|rl| {
247                let lock = rl.read().unwrap();
248                (*lock).size_of(ops)
249            }).unwrap_or(0)
250            + self.language_binding_name.size_of(ops)
251            + self.upload_metrics.size_of(ops)
252            + self.policy.size_of(ops);
253
254        let in_flight = self.in_flight.read().unwrap();
255        n += in_flight.size_of(ops);
256
257        n
258    }
259}
260
261impl PingUploadManager {
262    /// Creates a new PingUploadManager.
263    ///
264    /// # Arguments
265    ///
266    /// * `data_path` - Path to the pending pings directory.
267    /// * `language_binding_name` - The name of the language binding calling this managers instance.
268    ///
269    /// # Panics
270    ///
271    /// Will panic if unable to spawn a new thread.
272    pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
273        Self {
274            queue: RwLock::new(VecDeque::new()),
275            directory_manager: PingDirectoryManager::new(data_path),
276            processed_pending_pings: Arc::new(AtomicBool::new(false)),
277            cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
278            recoverable_failure_count: AtomicU32::new(0),
279            wait_attempt_count: AtomicU32::new(0),
280            rate_limiter: None,
281            language_binding_name: language_binding_name.into(),
282            upload_metrics: UploadMetrics::new(),
283            policy: Policy::default(),
284            in_flight: RwLock::new(HashMap::default()),
285        }
286    }
287
288    /// Spawns a new thread and processes the pending pings directories,
289    /// filling up the queue with whatever pings are in there.
290    ///
291    /// # Returns
292    ///
293    /// The `JoinHandle` to the spawned thread
294    pub fn scan_pending_pings_directories(
295        &self,
296        trigger_upload: bool,
297    ) -> std::thread::JoinHandle<()> {
298        let local_manager = self.directory_manager.clone();
299        let local_cached_pings = self.cached_pings.clone();
300        let local_flag = self.processed_pending_pings.clone();
301        crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
302            {
303                // Be sure to drop local_cached_pings lock before triggering upload.
304                let mut local_cached_pings = local_cached_pings
305                    .write()
306                    .expect("Can't write to pending pings cache.");
307                local_cached_pings.extend(local_manager.process_dirs());
308                local_flag.store(true, Ordering::SeqCst);
309            }
310            if trigger_upload {
311                crate::dispatcher::launch(|| {
312                    if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
313                        if let Err(e) = state.callbacks.trigger_upload() {
314                            log::error!(
315                                "Triggering upload after pending ping scan failed. Error: {}",
316                                e
317                            );
318                        }
319                    }
320                });
321            }
322        })
323        .expect("Unable to spawn thread to process pings directories.")
324    }
325
326    /// Creates a new upload manager with no limitations, for tests.
327    #[cfg(test)]
328    pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
329        let mut upload_manager = Self::new(data_path, "Test");
330
331        // Disable all policies for tests, if necessary individuals tests can re-enable them.
332        upload_manager.policy.set_max_recoverable_failures(None);
333        upload_manager.policy.set_max_wait_attempts(None);
334        upload_manager.policy.set_max_ping_body_size(None);
335        upload_manager
336            .policy
337            .set_max_pending_pings_directory_size(None);
338        upload_manager.policy.set_max_pending_pings_count(None);
339
340        // When building for tests, always scan the pending pings directories and do it sync.
341        upload_manager
342            .scan_pending_pings_directories(false)
343            .join()
344            .unwrap();
345
346        upload_manager
347    }
348
349    fn processed_pending_pings(&self) -> bool {
350        self.processed_pending_pings.load(Ordering::SeqCst)
351    }
352
353    fn recoverable_failure_count(&self) -> u32 {
354        self.recoverable_failure_count.load(Ordering::SeqCst)
355    }
356
357    fn wait_attempt_count(&self) -> u32 {
358        self.wait_attempt_count.load(Ordering::SeqCst)
359    }
360
361    /// Attempts to build a ping request from a ping file payload.
362    ///
363    /// Returns the `PingRequest` or `None` if unable to build,
364    /// in which case it will delete the ping file and record an error.
365    fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
366        let PingPayload {
367            document_id,
368            upload_path: path,
369            json_body: body,
370            headers,
371            body_has_info_sections,
372            ping_name,
373            uploader_capabilities,
374        } = ping;
375        let mut request = PingRequest::builder(
376            &self.language_binding_name,
377            self.policy.max_ping_body_size(),
378        )
379        .document_id(&document_id)
380        .path(path)
381        .body(body)
382        .body_has_info_sections(body_has_info_sections)
383        .ping_name(ping_name)
384        .uploader_capabilities(uploader_capabilities);
385
386        if let Some(headers) = headers {
387            request = request.headers(headers);
388        }
389
390        match request.build() {
391            Ok(request) => Some(request),
392            Err(e) => {
393                log::warn!("Error trying to build ping request: {}", e);
394                self.directory_manager.delete_file(&document_id);
395
396                // Record the error.
397                // Currently the only possible error is PingBodyOverflow.
398                if let ErrorKind::PingBodyOverflow(s) = e.kind() {
399                    self.upload_metrics
400                        .discarded_exceeding_pings_size
401                        .accumulate_sync(glean, *s as i64 / 1024);
402                }
403
404                None
405            }
406        }
407    }
408
409    /// Enqueue a ping for upload.
410    pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
411        let mut queue = self
412            .queue
413            .write()
414            .expect("Can't write to pending pings queue.");
415
416        let PingPayload {
417            ref document_id,
418            upload_path: ref path,
419            ..
420        } = ping;
421        // Checks if a ping with this `document_id` is already enqueued.
422        if queue
423            .iter()
424            .any(|request| request.document_id.as_str() == document_id)
425        {
426            log::warn!(
427                "Attempted to enqueue a duplicate ping {} at {}.",
428                document_id,
429                path
430            );
431            return;
432        }
433
434        {
435            let in_flight = self.in_flight.read().unwrap();
436            if in_flight.contains_key(document_id) {
437                log::warn!(
438                    "Attempted to enqueue an in-flight ping {} at {}.",
439                    document_id,
440                    path
441                );
442                self.upload_metrics
443                    .in_flight_pings_dropped
444                    .add_sync(glean, 0);
445                return;
446            }
447        }
448
449        log::trace!("Enqueuing ping {} at {}", document_id, path);
450        if let Some(request) = self.build_ping_request(glean, ping) {
451            queue.push_back(request)
452        }
453    }
454
    /// Moves the pings cached by the off-thread directory scan into the upload queue.
    ///
    /// The size of the PENDING_PINGS_DIRECTORY directory will be calculated
    /// (by accumulating each ping's size in that directory)
    /// and in case we exceed the quotas defined by the upload policy
    /// (maximum directory size in bytes and maximum number of pending pings),
    /// the oldest outstanding pings get deleted and are not enqueued.
    ///
    /// The size of the DELETION_REQUEST_PINGS_DIRECTORY will not be calculated
    /// and no deletion-request pings will be deleted. Deletion request pings
    /// are not very common and usually don't contain any data,
    /// we don't expect that directory to ever reach quota.
    /// Most importantly, we don't want to ever delete deletion-request pings.
    ///
    /// Deletion-request pings are enqueued before the remaining pending pings.
    /// Nothing happens when the cache is empty.
    ///
    /// # Arguments
    ///
    /// * `glean` - The Glean object holding the database.
    fn enqueue_cached_pings(&self, glean: &Glean) {
        // The cache write lock is held for the whole call, including while enqueueing.
        let mut cached_pings = self
            .cached_pings
            .write()
            .expect("Can't write to pending pings cache.");

        if cached_pings.len() > 0 {
            let mut pending_pings_directory_size: u64 = 0;
            let mut pending_pings_count = 0;
            // Once set, every older ping visited afterwards is deleted too.
            let mut deleting = false;
            // Label recorded in `pending_pings_deleted`; fixed by whichever quota trips first.
            let mut delete_reason: Option<&'static str> = None;

            let total = cached_pings.pending_pings.len() as u64;
            self.upload_metrics
                .pending_pings
                .add_sync(glean, total.try_into().unwrap_or(0));

            if total > self.policy.max_pending_pings_count() {
                log::warn!(
                    "More than {} pending pings in the directory, will delete {} old pings.",
                    self.policy.max_pending_pings_count(),
                    total - self.policy.max_pending_pings_count()
                );
            }

            // The pending pings vector is sorted by date in ascending order (oldest -> newest).
            // We need to calculate the size of the pending pings directory
            // and delete the **oldest** pings in case quota is reached.
            // Thus, we reverse the order of the pending pings vector,
            // so that we iterate in descending order (newest -> oldest).
            cached_pings.pending_pings.reverse();
            cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
                pending_pings_count += 1;
                // Every ping's size is accumulated, including pings deleted below.
                pending_pings_directory_size += file_size;

                // We don't want to spam the log for every ping over the quota.
                // Size is checked first; if both limits are exceeded simultaneously,
                // size_quota takes precedence as the recorded reason.
                if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
                    log::warn!(
                        "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
                        self.policy.max_pending_pings_directory_size()
                    );
                    deleting = true;
                    delete_reason = Some("size_quota");
                }

                // Once we reach the number of allowed pings we start deleting,
                // no matter what size.
                // We already log this before the loop.
                if !deleting && pending_pings_count > self.policy.max_pending_pings_count() {
                    deleting = true;
                    delete_reason = Some("count_quota");
                }

                // A ping whose file could not be deleted is kept, and therefore still enqueued.
                if deleting && self.directory_manager.delete_file(document_id) {
                    self.upload_metrics
                        .deleted_pings_after_quota_hit
                        .add_sync(glean, 1);
                    if let Some(reason) = delete_reason {
                        self.upload_metrics
                            .pending_pings_deleted
                            .get(reason)
                            .add_sync(glean, 1);
                    }
                    return false;
                }

                true
            });
            // After calculating the size of the pending pings directory,
            // we record the calculated number and reverse the pings array back for enqueueing.
            cached_pings.pending_pings.reverse();
            self.upload_metrics
                .pending_pings_directory_size
                .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);

            // Enqueue all deletion-request pings first,
            // then the remaining pending pings (oldest first).
            cached_pings
                .deletion_request_pings
                .drain(..)
                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
            cached_pings
                .pending_pings
                .drain(..)
                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
        }
    }
560
561    /// Adds rate limiting capability to this upload manager.
562    ///
563    /// The rate limiter will limit the amount of calls to `get_upload_task` per interval.
564    ///
565    /// Setting this will restart count and timer in case there was a previous rate limiter set
566    /// (e.g. if we have reached the current limit and call this function, we start counting again
567    /// and the caller is allowed to asks for tasks).
568    ///
569    /// # Arguments
570    ///
571    /// * `interval` - the amount of seconds in each rate limiting window.
572    /// * `max_tasks` - the maximum amount of task requests allowed per interval.
573    pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
574        self.rate_limiter = Some(RwLock::new(RateLimiter::new(
575            Duration::from_secs(interval),
576            max_tasks,
577        )));
578    }
579
580    pub(crate) fn set_max_pending_pings_count(&mut self, n: u64) {
581        self.policy.set_max_pending_pings_count(Some(n));
582    }
583
584    pub(crate) fn set_max_pending_pings_directory_size(&mut self, n: u64) {
585        self.policy.set_max_pending_pings_directory_size(Some(n));
586    }
587
588    /// Reads a ping file, creates a `PingRequest` and adds it to the queue.
589    ///
590    /// Duplicate requests won't be added.
591    ///
592    /// # Arguments
593    ///
594    /// * `glean` - The Glean object holding the database.
595    /// * `document_id` - The UUID of the ping in question.
596    pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
597        if let Some(ping) = self.directory_manager.process_file(document_id) {
598            self.enqueue_ping(glean, ping);
599        }
600    }
601
602    /// Clears the pending pings queue, leaves the deletion-request pings.
603    pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
604        log::trace!("Clearing ping queue");
605        let mut queue = self
606            .queue
607            .write()
608            .expect("Can't write to pending pings queue.");
609
610        queue.retain(|ping| ping.is_deletion_request());
611        log::trace!(
612            "{} pings left in the queue (only deletion-request expected)",
613            queue.len()
614        );
615        queue
616    }
617
    /// Computes the next upload task, without resetting the per-window counters
    /// (that is done by `get_upload_task`).
    ///
    /// Checks, in order:
    /// 1. the pending pings directories have been scanned (otherwise wait, or done);
    /// 2. cached pings are moved into the queue;
    /// 3. the recoverable failure budget for this uploading window is not exhausted;
    /// 4. the queue is non-empty, and the rate limiter (if any) is not throttling.
    ///
    /// On success, the front request is removed from the queue, success/failure timers
    /// are started for it, and it is returned with a fresh `Date` header.
    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
        //
        // We want to limit the amount of PingUploadTask::Wait returned in a row:
        // once the number of waits in a row exceeds the policy's maximum wait attempts,
        // we return PingUploadTask::Done instead.
        let wait_or_done = |time: u64| {
            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
                PingUploadTask::done()
            } else {
                PingUploadTask::Wait { time }
            }
        };

        if !self.processed_pending_pings() {
            log::info!(
                "Tried getting an upload task, but processing is ongoing. Will come back later."
            );
            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
        }

        // This is a no-op in case there are no cached pings.
        self.enqueue_cached_pings(glean);

        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
            log::warn!(
                "Reached maximum recoverable failures for the current uploading window. You are done."
            );
            return PingUploadTask::done();
        }

        // Held until the function returns; the `in_flight` lock below is taken while holding it.
        let mut queue = self
            .queue
            .write()
            .expect("Can't write to pending pings queue.");
        match queue.front() {
            Some(request) => {
                // The rate limiter is only consulted when there is something to upload.
                if let Some(rate_limiter) = &self.rate_limiter {
                    let mut rate_limiter = rate_limiter
                        .write()
                        .expect("Can't write to the rate limiter.");
                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
                        log::info!(
                            "Tried getting an upload task, but we are throttled at the moment."
                        );
                        return wait_or_done(remaining);
                    }
                }

                log::info!(
                    "New upload task with id {} (path: {})",
                    request.document_id,
                    request.path
                );

                if log_ping {
                    if let Some(body) = request.pretty_body() {
                        chunked_log_info(&request.path, &body);
                    } else {
                        chunked_log_info(&request.path, "<invalid ping payload>");
                    }
                }

                {
                    // Synchronous timer starts.
                    // We're in the uploader thread anyway.
                    // But also: No data is stored on disk.
                    // Both timers are started now; `process_ping_upload_response` later
                    // stops one of them and cancels the other, depending on the outcome.
                    let mut in_flight = self.in_flight.write().unwrap();
                    let success_id = self.upload_metrics.send_success.start_sync();
                    let failure_id = self.upload_metrics.send_failure.start_sync();
                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
                }

                // Cannot fail: `front()` returned `Some` and the queue is still write-locked.
                let mut request = queue.pop_front().unwrap();

                // Adding the `Date` header just before actual upload happens.
                request
                    .headers
                    .insert("Date".to_string(), create_date_header_value(Utc::now()));

                PingUploadTask::Upload { request }
            }
            None => {
                log::info!("No more pings to upload! You are done.");
                PingUploadTask::done()
            }
        }
    }
706
707    /// Gets the next `PingUploadTask`.
708    ///
709    /// # Arguments
710    ///
711    /// * `glean` - The Glean object holding the database.
712    /// * `log_ping` - Whether to log the ping before returning.
713    ///
714    /// # Returns
715    ///
716    /// The next [`PingUploadTask`](enum.PingUploadTask.html).
717    pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
718        let task = self.get_upload_task_internal(glean, log_ping);
719
720        if !task.is_wait() && self.wait_attempt_count() > 0 {
721            self.wait_attempt_count.store(0, Ordering::SeqCst);
722        }
723
724        if !task.is_upload() && self.recoverable_failure_count() > 0 {
725            self.recoverable_failure_count.store(0, Ordering::SeqCst);
726        }
727
728        task
729    }
730
731    /// Processes the response from an attempt to upload a ping.
732    ///
733    /// Based on the HTTP status of said response,
734    /// the possible outcomes are:
735    ///
736    /// * **200 - 299 Success**
737    ///   Any status on the 2XX range is considered a succesful upload,
738    ///   which means the corresponding ping file can be deleted.
739    ///   _Known 2XX status:_
740    ///   * 200 - OK. Request accepted into the pipeline.
741    ///
742    /// * **400 - 499 Unrecoverable error**
743    ///   Any status on the 4XX range means something our client did is not correct.
744    ///   It is unlikely that the client is going to recover from this by retrying,
745    ///   so in this case the corresponding ping file can also be deleted.
746    ///   _Known 4XX status:_
747    ///   * 404 - not found - POST/PUT to an unknown namespace
748    ///   * 405 - wrong request type (anything other than POST/PUT)
749    ///   * 411 - missing content-length header
750    ///   * 413 - request body too large Note that if we have badly-behaved clients that
751    ///           retry on 4XX, we should send back 202 on body/path too long).
752    ///   * 414 - request path too long (See above)
753    ///
754    /// * **Any other error**
755    ///   For any other error, a warning is logged and the ping is re-enqueued.
756    ///   _Known other errors:_
757    ///   * 500 - internal error
758    ///
759    /// # Note
760    ///
761    /// The disk I/O performed by this function is not done off-thread,
762    /// as it is expected to be called off-thread by the platform.
763    ///
764    /// # Arguments
765    ///
766    /// * `glean` - The Glean object holding the database.
767    /// * `document_id` - The UUID of the ping in question.
768    /// * `status` - The HTTP status of the response.
769    pub fn process_ping_upload_response(
770        &self,
771        glean: &Glean,
772        document_id: &str,
773        status: UploadResult,
774    ) -> UploadTaskAction {
775        use UploadResult::*;
776
777        let stop_time = zeitstempel::now_awake();
778
779        if let Some(label) = status.get_label() {
780            let metric = self.upload_metrics.ping_upload_failure.get(label);
781            metric.add_sync(glean, 1);
782        }
783
784        let send_ids = {
785            let mut lock = self.in_flight.write().unwrap();
786            lock.remove(document_id)
787        };
788
789        if send_ids.is_none() {
790            self.upload_metrics.missing_send_ids.add_sync(glean, 1);
791        }
792
793        match status {
794            HttpStatus { code } if (200..=299).contains(&code) => {
795                log::info!("Ping {} successfully sent {}.", document_id, code);
796                if let Some((success_id, failure_id)) = send_ids {
797                    self.upload_metrics
798                        .send_success
799                        .set_stop_and_accumulate(glean, success_id, stop_time);
800                    self.upload_metrics.send_failure.cancel_sync(failure_id);
801                }
802                self.directory_manager.delete_file(document_id);
803            }
804
805            UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
806                log::warn!(
807                    "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
808                    document_id,
809                    status
810                );
811                if let Some((success_id, failure_id)) = send_ids {
812                    self.upload_metrics.send_success.cancel_sync(success_id);
813                    self.upload_metrics
814                        .send_failure
815                        .set_stop_and_accumulate(glean, failure_id, stop_time);
816                }
817                self.directory_manager.delete_file(document_id);
818            }
819
820            RecoverableFailure { .. } | HttpStatus { .. } => {
821                log::warn!(
822                    "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
823                    document_id,
824                    status
825                );
826                if let Some((success_id, failure_id)) = send_ids {
827                    self.upload_metrics.send_success.cancel_sync(success_id);
828                    self.upload_metrics
829                        .send_failure
830                        .set_stop_and_accumulate(glean, failure_id, stop_time);
831                }
832                self.enqueue_ping_from_file(glean, document_id);
833                self.recoverable_failure_count
834                    .fetch_add(1, Ordering::SeqCst);
835            }
836
837            Done { .. } => {
838                log::debug!("Uploader signaled Done. Exiting.");
839                if let Some((success_id, failure_id)) = send_ids {
840                    self.upload_metrics.send_success.cancel_sync(success_id);
841                    self.upload_metrics.send_failure.cancel_sync(failure_id);
842                }
843                return UploadTaskAction::End;
844            }
845        };
846
847        UploadTaskAction::Next
848    }
849}
850
/// Logs a ping on Android, splitting it into several log records when needed.
///
/// The logcat ring buffer size is configurable, but its maximum payload size per entry
/// (4076 bytes) is not. Long pings must therefore be split so they are not truncated.
///
/// When `path` and `payload` together fit in `MAX_LOG_PAYLOAD_SIZE_BYTES`, a single
/// record is written. Otherwise the payload is split on UTF-8 character boundaries into
/// parts of at most `MAX_LOG_PAYLOAD_SIZE_BYTES` bytes. Each part is logged with the path
/// and a `[Part i of n]` counter, where `n` is the exact number of parts logged.
///
/// # Arguments
///
/// * `path` - The upload path of the ping, included in every record.
/// * `payload` - The ping body to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // Stay below logcat's 4076-byte limit, leaving some head room for the prefix.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    // If the whole message fits within one logcat payload, short-circuit and avoid
    // the splitting overhead.
    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split up front so the `[Part i of n]` counter reflects the actual number of parts,
    // including parts shortened so that they end on a character boundary.
    let parts = split_on_char_boundaries(payload, MAX_LOG_PAYLOAD_SIZE_BYTES);
    let total_parts = parts.len();
    for (index, part) in parts.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            index + 1,
            total_parts,
            part
        );
    }
}

/// Splits `s` into consecutive slices of at most `max_bytes` bytes each, never cutting
/// through a UTF-8 character.
///
/// Each slice is as long as possible. When a cut would land inside a character, it moves
/// back to that character's first byte. If even the first character of the remaining
/// text is longer than `max_bytes`, that single character becomes its own slice. This
/// keeps the loop making progress, but such a slice exceeds `max_bytes`.
///
/// The slices concatenate back to `s`. An empty `s` yields no slices.
#[cfg(any(target_os = "android", test))]
// Only `chunked_log_info` on Android calls this; on other targets it exists solely so it
// can be unit-tested.
#[cfg_attr(not(target_os = "android"), allow(dead_code))]
fn split_on_char_boundaries(s: &str, max_bytes: usize) -> Vec<&str> {
    let mut parts = Vec::with_capacity(s.len() / max_bytes.max(1) + 1);
    let mut rest = s;

    while rest.len() > max_bytes {
        // `cut < rest.len()` here, and index 0 is always a boundary, so this terminates.
        // For valid UTF-8 it steps back at most 3 bytes.
        let mut cut = max_bytes;
        while !rest.is_char_boundary(cut) {
            cut -= 1;
        }
        if cut == 0 {
            // `max_bytes` is smaller than the first character: emit that character whole.
            cut = rest.chars().next().map_or(rest.len(), char::len_utf8);
        }
        let (head, tail) = rest.split_at(cut);
        parts.push(head);
        rest = tail;
    }

    // Whatever is left fits in a single part.
    if !rest.is_empty() {
        parts.push(rest);
    }
    parts
}
910
911/// Logs payload in one go (all other OS).
912#[cfg(not(target_os = "android"))]
913pub fn chunked_log_info(_path: &str, payload: &str) {
914    log::info!("{}", payload)
915}
916
917#[cfg(test)]
918mod test {
919    use std::thread;
920    use uuid::Uuid;
921
922    use super::*;
923    use crate::metrics::PingType;
924    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
925
926    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
927
928    #[test]
929    fn doesnt_error_when_there_are_no_pending_pings() {
930        let (glean, _t) = new_glean(None);
931
932        // Try and get the next request.
933        // Verify request was not returned
934        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
935    }
936
937    #[test]
938    fn returns_ping_request_when_there_is_one() {
939        let (glean, dir) = new_glean(None);
940
941        let upload_manager = PingUploadManager::no_policy(dir.path());
942
943        // Enqueue a ping
944        upload_manager.enqueue_ping(
945            &glean,
946            PingPayload {
947                document_id: Uuid::new_v4().to_string(),
948                upload_path: PATH.into(),
949                json_body: "".into(),
950                headers: None,
951                body_has_info_sections: true,
952                ping_name: "ping-name".into(),
953                uploader_capabilities: vec![],
954            },
955        );
956
957        // Try and get the next request.
958        // Verify request was returned
959        let task = upload_manager.get_upload_task(&glean, false);
960        assert!(task.is_upload());
961    }
962
963    #[test]
964    fn returns_as_many_ping_requests_as_there_are() {
965        let (glean, dir) = new_glean(None);
966
967        let upload_manager = PingUploadManager::no_policy(dir.path());
968
969        // Enqueue a ping multiple times
970        let n = 10;
971        for _ in 0..n {
972            upload_manager.enqueue_ping(
973                &glean,
974                PingPayload {
975                    document_id: Uuid::new_v4().to_string(),
976                    upload_path: PATH.into(),
977                    json_body: "".into(),
978                    headers: None,
979                    body_has_info_sections: true,
980                    ping_name: "ping-name".into(),
981                    uploader_capabilities: vec![],
982                },
983            );
984        }
985
986        // Verify a request is returned for each submitted ping
987        for _ in 0..n {
988            let task = upload_manager.get_upload_task(&glean, false);
989            assert!(task.is_upload());
990        }
991
992        // Verify that after all requests are returned, none are left
993        assert_eq!(
994            upload_manager.get_upload_task(&glean, false),
995            PingUploadTask::done()
996        );
997    }
998
999    #[test]
1000    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
1001        let (glean, dir) = new_glean(None);
1002
1003        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1004
1005        // Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
1006        let max_pings_per_interval = 10;
1007        upload_manager.set_rate_limiter(3, 10);
1008
1009        // Enqueue the max number of pings allowed per uploading window
1010        for _ in 0..max_pings_per_interval {
1011            upload_manager.enqueue_ping(
1012                &glean,
1013                PingPayload {
1014                    document_id: Uuid::new_v4().to_string(),
1015                    upload_path: PATH.into(),
1016                    json_body: "".into(),
1017                    headers: None,
1018                    body_has_info_sections: true,
1019                    ping_name: "ping-name".into(),
1020                    uploader_capabilities: vec![],
1021                },
1022            );
1023        }
1024
1025        // Verify a request is returned for each submitted ping
1026        for _ in 0..max_pings_per_interval {
1027            let task = upload_manager.get_upload_task(&glean, false);
1028            assert!(task.is_upload());
1029        }
1030
1031        // Enqueue just one more ping
1032        upload_manager.enqueue_ping(
1033            &glean,
1034            PingPayload {
1035                document_id: Uuid::new_v4().to_string(),
1036                upload_path: PATH.into(),
1037                json_body: "".into(),
1038                headers: None,
1039                body_has_info_sections: true,
1040                ping_name: "ping-name".into(),
1041                uploader_capabilities: vec![],
1042            },
1043        );
1044
1045        // Verify that we are indeed told to wait because we are at capacity
1046        match upload_manager.get_upload_task(&glean, false) {
1047            PingUploadTask::Wait { time } => {
1048                // Wait for the uploading window to reset
1049                thread::sleep(Duration::from_millis(time));
1050            }
1051            _ => panic!("Expected upload manager to return a wait task!"),
1052        };
1053
1054        let task = upload_manager.get_upload_task(&glean, false);
1055        assert!(task.is_upload());
1056    }
1057
1058    #[test]
1059    fn clearing_the_queue_works_correctly() {
1060        let (glean, dir) = new_glean(None);
1061
1062        let upload_manager = PingUploadManager::no_policy(dir.path());
1063
1064        // Enqueue a ping multiple times
1065        for _ in 0..10 {
1066            upload_manager.enqueue_ping(
1067                &glean,
1068                PingPayload {
1069                    document_id: Uuid::new_v4().to_string(),
1070                    upload_path: PATH.into(),
1071                    json_body: "".into(),
1072                    headers: None,
1073                    body_has_info_sections: true,
1074                    ping_name: "ping-name".into(),
1075                    uploader_capabilities: vec![],
1076                },
1077            );
1078        }
1079
1080        // Clear the queue
1081        drop(upload_manager.clear_ping_queue());
1082
1083        // Verify there really isn't any ping in the queue
1084        assert_eq!(
1085            upload_manager.get_upload_task(&glean, false),
1086            PingUploadTask::done()
1087        );
1088    }
1089
1090    #[test]
1091    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1092        let (mut glean, _t) = new_glean(None);
1093
1094        // Register a ping for testing
1095        let ping_type = PingType::new(
1096            "test",
1097            true,
1098            /* send_if_empty */ true,
1099            true,
1100            true,
1101            true,
1102            vec![],
1103            vec![],
1104            true,
1105            vec![],
1106        );
1107        glean.register_ping_type(&ping_type);
1108
1109        // Submit the ping multiple times
1110        let n = 10;
1111        for _ in 0..n {
1112            ping_type.submit_sync(&glean, None);
1113        }
1114
1115        glean
1116            .internal_pings
1117            .deletion_request
1118            .submit_sync(&glean, None);
1119
1120        // Clear the queue
1121        drop(glean.upload_manager.clear_ping_queue());
1122
1123        let upload_task = glean.get_upload_task();
1124        match upload_task {
1125            PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1126            _ => panic!("Expected upload manager to return the next request!"),
1127        }
1128
1129        // Verify there really isn't any other pings in the queue
1130        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1131    }
1132
1133    #[test]
1134    fn fills_up_queue_successfully_from_disk() {
1135        let (mut glean, dir) = new_glean(None);
1136
1137        // Register a ping for testing
1138        let ping_type = PingType::new(
1139            "test",
1140            true,
1141            /* send_if_empty */ true,
1142            true,
1143            true,
1144            true,
1145            vec![],
1146            vec![],
1147            true,
1148            vec![],
1149        );
1150        glean.register_ping_type(&ping_type);
1151
1152        // Submit the ping multiple times
1153        let n = 10;
1154        for _ in 0..n {
1155            ping_type.submit_sync(&glean, None);
1156        }
1157
1158        // Create a new upload manager pointing to the same data_path as the glean instance.
1159        let upload_manager = PingUploadManager::no_policy(dir.path());
1160
1161        // Verify the requests were properly enqueued
1162        for _ in 0..n {
1163            let task = upload_manager.get_upload_task(&glean, false);
1164            assert!(task.is_upload());
1165        }
1166
1167        // Verify that after all requests are returned, none are left
1168        assert_eq!(
1169            upload_manager.get_upload_task(&glean, false),
1170            PingUploadTask::done()
1171        );
1172    }
1173
1174    #[test]
1175    fn processes_correctly_success_upload_response() {
1176        let (mut glean, dir) = new_glean(None);
1177
1178        // Register a ping for testing
1179        let ping_type = PingType::new(
1180            "test",
1181            true,
1182            /* send_if_empty */ true,
1183            true,
1184            true,
1185            true,
1186            vec![],
1187            vec![],
1188            true,
1189            vec![],
1190        );
1191        glean.register_ping_type(&ping_type);
1192
1193        // Submit a ping
1194        ping_type.submit_sync(&glean, None);
1195
1196        // Get the pending ping directory path
1197        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1198
1199        // Get the submitted PingRequest
1200        match glean.get_upload_task() {
1201            PingUploadTask::Upload { request } => {
1202                // Simulate the processing of a sucessfull request
1203                let document_id = request.document_id;
1204                glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1205                // Verify file was deleted
1206                assert!(!pending_pings_dir.join(document_id).exists());
1207            }
1208            _ => panic!("Expected upload manager to return the next request!"),
1209        }
1210
1211        // Verify that after request is returned, none are left
1212        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1213    }
1214
1215    #[test]
1216    fn processes_correctly_client_error_upload_response() {
1217        let (mut glean, dir) = new_glean(None);
1218
1219        // Register a ping for testing
1220        let ping_type = PingType::new(
1221            "test",
1222            true,
1223            /* send_if_empty */ true,
1224            true,
1225            true,
1226            true,
1227            vec![],
1228            vec![],
1229            true,
1230            vec![],
1231        );
1232        glean.register_ping_type(&ping_type);
1233
1234        // Submit a ping
1235        ping_type.submit_sync(&glean, None);
1236
1237        // Get the pending ping directory path
1238        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1239
1240        // Get the submitted PingRequest
1241        match glean.get_upload_task() {
1242            PingUploadTask::Upload { request } => {
1243                // Simulate the processing of a client error
1244                let document_id = request.document_id;
1245                glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1246                // Verify file was deleted
1247                assert!(!pending_pings_dir.join(document_id).exists());
1248            }
1249            _ => panic!("Expected upload manager to return the next request!"),
1250        }
1251
1252        // Verify that after request is returned, none are left
1253        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1254    }
1255
1256    #[test]
1257    fn processes_correctly_server_error_upload_response() {
1258        let (mut glean, _t) = new_glean(None);
1259
1260        // Register a ping for testing
1261        let ping_type = PingType::new(
1262            "test",
1263            true,
1264            /* send_if_empty */ true,
1265            true,
1266            true,
1267            true,
1268            vec![],
1269            vec![],
1270            true,
1271            vec![],
1272        );
1273        glean.register_ping_type(&ping_type);
1274
1275        // Submit a ping
1276        ping_type.submit_sync(&glean, None);
1277
1278        // Get the submitted PingRequest
1279        match glean.get_upload_task() {
1280            PingUploadTask::Upload { request } => {
1281                // Simulate the processing of a client error
1282                let document_id = request.document_id;
1283                glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1284                // Verify this ping was indeed re-enqueued
1285                match glean.get_upload_task() {
1286                    PingUploadTask::Upload { request } => {
1287                        assert_eq!(document_id, request.document_id);
1288                    }
1289                    _ => panic!("Expected upload manager to return the next request!"),
1290                }
1291            }
1292            _ => panic!("Expected upload manager to return the next request!"),
1293        }
1294
1295        // Verify that after request is returned, none are left
1296        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1297    }
1298
1299    #[test]
1300    fn processes_correctly_unrecoverable_upload_response() {
1301        let (mut glean, dir) = new_glean(None);
1302
1303        // Register a ping for testing
1304        let ping_type = PingType::new(
1305            "test",
1306            true,
1307            /* send_if_empty */ true,
1308            true,
1309            true,
1310            true,
1311            vec![],
1312            vec![],
1313            true,
1314            vec![],
1315        );
1316        glean.register_ping_type(&ping_type);
1317
1318        // Submit a ping
1319        ping_type.submit_sync(&glean, None);
1320
1321        // Get the pending ping directory path
1322        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1323
1324        // Get the submitted PingRequest
1325        match glean.get_upload_task() {
1326            PingUploadTask::Upload { request } => {
1327                // Simulate the processing of a client error
1328                let document_id = request.document_id;
1329                glean.process_ping_upload_response(
1330                    &document_id,
1331                    UploadResult::unrecoverable_failure(),
1332                );
1333                // Verify file was deleted
1334                assert!(!pending_pings_dir.join(document_id).exists());
1335            }
1336            _ => panic!("Expected upload manager to return the next request!"),
1337        }
1338
1339        // Verify that after request is returned, none are left
1340        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1341    }
1342
1343    #[test]
1344    fn new_pings_are_added_while_upload_in_progress() {
1345        let (glean, dir) = new_glean(None);
1346
1347        let upload_manager = PingUploadManager::no_policy(dir.path());
1348
1349        let doc1 = Uuid::new_v4().to_string();
1350        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1351
1352        let doc2 = Uuid::new_v4().to_string();
1353        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1354
1355        // Enqueue a ping
1356        upload_manager.enqueue_ping(
1357            &glean,
1358            PingPayload {
1359                document_id: doc1.clone(),
1360                upload_path: path1,
1361                json_body: "".into(),
1362                headers: None,
1363                body_has_info_sections: true,
1364                ping_name: "test-ping".into(),
1365                uploader_capabilities: vec![],
1366            },
1367        );
1368
1369        // Try and get the first request.
1370        let req = match upload_manager.get_upload_task(&glean, false) {
1371            PingUploadTask::Upload { request } => request,
1372            _ => panic!("Expected upload manager to return the next request!"),
1373        };
1374        assert_eq!(doc1, req.document_id);
1375
1376        // Schedule the next one while the first one is "in progress"
1377        upload_manager.enqueue_ping(
1378            &glean,
1379            PingPayload {
1380                document_id: doc2.clone(),
1381                upload_path: path2,
1382                json_body: "".into(),
1383                headers: None,
1384                body_has_info_sections: true,
1385                ping_name: "test-ping".into(),
1386                uploader_capabilities: vec![],
1387            },
1388        );
1389
1390        // Mark as processed
1391        upload_manager.process_ping_upload_response(
1392            &glean,
1393            &req.document_id,
1394            UploadResult::http_status(200),
1395        );
1396
1397        // Get the second request.
1398        let req = match upload_manager.get_upload_task(&glean, false) {
1399            PingUploadTask::Upload { request } => request,
1400            _ => panic!("Expected upload manager to return the next request!"),
1401        };
1402        assert_eq!(doc2, req.document_id);
1403
1404        // Mark as processed
1405        upload_manager.process_ping_upload_response(
1406            &glean,
1407            &req.document_id,
1408            UploadResult::http_status(200),
1409        );
1410
1411        // ... and then we're done.
1412        assert_eq!(
1413            upload_manager.get_upload_task(&glean, false),
1414            PingUploadTask::done()
1415        );
1416    }
1417
1418    #[test]
1419    fn adds_debug_view_header_to_requests_when_tag_is_set() {
1420        let (mut glean, _t) = new_glean(None);
1421
1422        glean.set_debug_view_tag("valid-tag");
1423
1424        // Register a ping for testing
1425        let ping_type = PingType::new(
1426            "test",
1427            true,
1428            /* send_if_empty */ true,
1429            true,
1430            true,
1431            true,
1432            vec![],
1433            vec![],
1434            true,
1435            vec![],
1436        );
1437        glean.register_ping_type(&ping_type);
1438
1439        // Submit a ping
1440        ping_type.submit_sync(&glean, None);
1441
1442        // Get the submitted PingRequest
1443        match glean.get_upload_task() {
1444            PingUploadTask::Upload { request } => {
1445                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1446            }
1447            _ => panic!("Expected upload manager to return the next request!"),
1448        }
1449    }
1450
1451    #[test]
1452    fn duplicates_are_not_enqueued() {
1453        let (glean, dir) = new_glean(None);
1454
1455        // Create a new upload manager so that we have access to its functions directly,
1456        // make it synchronous so we don't have to manually wait for the scanning to finish.
1457        let upload_manager = PingUploadManager::no_policy(dir.path());
1458
1459        let doc_id = Uuid::new_v4().to_string();
1460        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1461
1462        // Try to enqueue a ping with the same doc_id twice
1463        upload_manager.enqueue_ping(
1464            &glean,
1465            PingPayload {
1466                document_id: doc_id.clone(),
1467                upload_path: path.clone(),
1468                json_body: "".into(),
1469                headers: None,
1470                body_has_info_sections: true,
1471                ping_name: "test-ping".into(),
1472                uploader_capabilities: vec![],
1473            },
1474        );
1475        upload_manager.enqueue_ping(
1476            &glean,
1477            PingPayload {
1478                document_id: doc_id,
1479                upload_path: path,
1480                json_body: "".into(),
1481                headers: None,
1482                body_has_info_sections: true,
1483                ping_name: "test-ping".into(),
1484                uploader_capabilities: vec![],
1485            },
1486        );
1487
1488        // Get a task once
1489        let task = upload_manager.get_upload_task(&glean, false);
1490        assert!(task.is_upload());
1491
1492        // There should be no more queued tasks
1493        assert_eq!(
1494            upload_manager.get_upload_task(&glean, false),
1495            PingUploadTask::done()
1496        );
1497    }
1498
1499    #[test]
1500    fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1501        let (mut glean, dir) = new_glean(None);
1502
1503        // Register a ping for testing
1504        let ping_type = PingType::new(
1505            "test",
1506            true,
1507            /* send_if_empty */ true,
1508            true,
1509            true,
1510            true,
1511            vec![],
1512            vec![],
1513            true,
1514            vec![],
1515        );
1516        glean.register_ping_type(&ping_type);
1517
1518        // Submit the ping multiple times
1519        let n = 5;
1520        for _ in 0..n {
1521            ping_type.submit_sync(&glean, None);
1522        }
1523
1524        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1525
1526        // Set a policy for max recoverable failures, this is usually disabled for tests.
1527        let max_recoverable_failures = 3;
1528        upload_manager
1529            .policy
1530            .set_max_recoverable_failures(Some(max_recoverable_failures));
1531
1532        // Return the max recoverable error failures in a row
1533        for _ in 0..max_recoverable_failures {
1534            match upload_manager.get_upload_task(&glean, false) {
1535                PingUploadTask::Upload { request } => {
1536                    upload_manager.process_ping_upload_response(
1537                        &glean,
1538                        &request.document_id,
1539                        UploadResult::recoverable_failure(),
1540                    );
1541                }
1542                _ => panic!("Expected upload manager to return the next request!"),
1543            }
1544        }
1545
1546        // Verify that after returning the max amount of recoverable failures,
1547        // we are done even though we haven't gotten all the enqueued requests.
1548        assert_eq!(
1549            upload_manager.get_upload_task(&glean, false),
1550            PingUploadTask::done()
1551        );
1552
1553        // Verify all requests are returned when we try again.
1554        for _ in 0..n {
1555            let task = upload_manager.get_upload_task(&glean, false);
1556            assert!(task.is_upload());
1557        }
1558    }
1559
1560    #[test]
1561    fn quota_is_enforced_when_enqueueing_cached_pings() {
1562        let (mut glean, dir) = new_glean(None);
1563
1564        // Register a ping for testing
1565        let ping_type = PingType::new(
1566            "test",
1567            true,
1568            /* send_if_empty */ true,
1569            true,
1570            true,
1571            true,
1572            vec![],
1573            vec![],
1574            true,
1575            vec![],
1576        );
1577        glean.register_ping_type(&ping_type);
1578
1579        // Submit the ping multiple times
1580        let n = 10;
1581        for _ in 0..n {
1582            ping_type.submit_sync(&glean, None);
1583        }
1584
1585        let directory_manager = PingDirectoryManager::new(dir.path());
1586        let pending_pings = directory_manager.process_dirs().pending_pings;
1587        // The pending pings array is sorted by date in ascending order,
1588        // the newest element is the last one.
1589        let (_, newest_ping) = &pending_pings.last().unwrap();
1590        let PingPayload {
1591            document_id: newest_ping_id,
1592            ..
1593        } = &newest_ping;
1594
1595        // Create a new upload manager pointing to the same data_path as the glean instance.
1596        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1597
1598        // Set the quota to just a little over the size on an empty ping file.
1599        // This way we can check that one ping is kept and all others are deleted.
1600        //
1601        // From manual testing I figured out an empty ping file is 324bytes,
1602        // I am setting this a little over just so that minor changes to the ping structure
1603        // don't immediatelly break this.
1604        upload_manager
1605            .policy
1606            .set_max_pending_pings_directory_size(Some(500));
1607
1608        // Get a task once
1609        // One ping should have been enqueued.
1610        // Make sure it is the newest ping.
1611        match upload_manager.get_upload_task(&glean, false) {
1612            PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1613            _ => panic!("Expected upload manager to return the next request!"),
1614        }
1615
1616        // Verify that no other requests were returned,
1617        // they should all have been deleted because pending pings quota was hit.
1618        assert_eq!(
1619            upload_manager.get_upload_task(&glean, false),
1620            PingUploadTask::done()
1621        );
1622
1623        // Verify that the correct number of deleted pings was recorded
1624        assert_eq!(
1625            n - 1,
1626            upload_manager
1627                .upload_metrics
1628                .deleted_pings_after_quota_hit
1629                .get_value(&glean, Some("metrics"))
1630                .unwrap()
1631        );
1632        assert_eq!(
1633            n,
1634            upload_manager
1635                .upload_metrics
1636                .pending_pings
1637                .get_value(&glean, Some("metrics"))
1638                .unwrap()
1639        );
1640    }
1641
1642    #[test]
1643    fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1644        let (mut glean, dir) = new_glean(None);
1645
1646        // Register a ping for testing
1647        let ping_type = PingType::new(
1648            "test",
1649            true,
1650            /* send_if_empty */ true,
1651            true,
1652            true,
1653            true,
1654            vec![],
1655            vec![],
1656            true,
1657            vec![],
1658        );
1659        glean.register_ping_type(&ping_type);
1660
1661        // How many pings we allow at maximum
1662        let count_quota = 3;
1663        // The number of pings we fill the pending pings directory with.
1664        let n = 10;
1665
1666        // Submit the ping multiple times
1667        for _ in 0..n {
1668            ping_type.submit_sync(&glean, None);
1669        }
1670
1671        let directory_manager = PingDirectoryManager::new(dir.path());
1672        let pending_pings = directory_manager.process_dirs().pending_pings;
1673        // The pending pings array is sorted by date in ascending order,
1674        // the newest element is the last one.
1675        let expected_pings = pending_pings
1676            .iter()
1677            .rev()
1678            .take(count_quota)
1679            .map(|(_, ping)| ping.document_id.clone())
1680            .collect::<Vec<_>>();
1681
1682        // Create a new upload manager pointing to the same data_path as the glean instance.
1683        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1684
1685        upload_manager
1686            .policy
1687            .set_max_pending_pings_count(Some(count_quota as u64));
1688
1689        // Get a task once
1690        // One ping should have been enqueued.
1691        // Make sure it is the newest ping.
1692        for ping_id in expected_pings.iter().rev() {
1693            match upload_manager.get_upload_task(&glean, false) {
1694                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1695                _ => panic!("Expected upload manager to return the next request!"),
1696            }
1697        }
1698
1699        // Verify that no other requests were returned,
1700        // they should all have been deleted because pending pings quota was hit.
1701        assert_eq!(
1702            upload_manager.get_upload_task(&glean, false),
1703            PingUploadTask::done()
1704        );
1705
1706        // Verify that the correct number of deleted pings was recorded
1707        assert_eq!(
1708            (n - count_quota) as i32,
1709            upload_manager
1710                .upload_metrics
1711                .deleted_pings_after_quota_hit
1712                .get_value(&glean, Some("metrics"))
1713                .unwrap()
1714        );
1715        assert_eq!(
1716            n as i32,
1717            upload_manager
1718                .upload_metrics
1719                .pending_pings
1720                .get_value(&glean, Some("metrics"))
1721                .unwrap()
1722        );
1723    }
1724
1725    #[test]
1726    fn size_and_count_quota_work_together_size_first() {
1727        let (mut glean, dir) = new_glean(None);
1728
1729        // Register a ping for testing
1730        let ping_type = PingType::new(
1731            "test",
1732            true,
1733            /* send_if_empty */ true,
1734            true,
1735            true,
1736            true,
1737            vec![],
1738            vec![],
1739            true,
1740            vec![],
1741        );
1742        glean.register_ping_type(&ping_type);
1743
1744        let expected_number_of_pings = 3;
1745        // The number of pings we fill the pending pings directory with.
1746        let n = 10;
1747
1748        // Submit the ping multiple times
1749        for _ in 0..n {
1750            ping_type.submit_sync(&glean, None);
1751        }
1752
1753        let directory_manager = PingDirectoryManager::new(dir.path());
1754        let pending_pings = directory_manager.process_dirs().pending_pings;
1755        // The pending pings array is sorted by date in ascending order,
1756        // the newest element is the last one.
1757        let expected_pings = pending_pings
1758            .iter()
1759            .rev()
1760            .take(expected_number_of_pings)
1761            .map(|(_, ping)| ping.document_id.clone())
1762            .collect::<Vec<_>>();
1763
1764        // Create a new upload manager pointing to the same data_path as the glean instance.
1765        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1766
1767        // From manual testing we figured out a basically empty ping file is 399 bytes,
1768        // so this allows 3 pings with some headroom in case of future changes.
1769        upload_manager
1770            .policy
1771            .set_max_pending_pings_directory_size(Some(1300));
1772        upload_manager.policy.set_max_pending_pings_count(Some(5));
1773
1774        // Get a task once
1775        // One ping should have been enqueued.
1776        // Make sure it is the newest ping.
1777        for ping_id in expected_pings.iter().rev() {
1778            match upload_manager.get_upload_task(&glean, false) {
1779                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1780                _ => panic!("Expected upload manager to return the next request!"),
1781            }
1782        }
1783
1784        // Verify that no other requests were returned,
1785        // they should all have been deleted because pending pings quota was hit.
1786        assert_eq!(
1787            upload_manager.get_upload_task(&glean, false),
1788            PingUploadTask::done()
1789        );
1790
1791        // Verify that the correct number of deleted pings was recorded
1792        assert_eq!(
1793            (n - expected_number_of_pings) as i32,
1794            upload_manager
1795                .upload_metrics
1796                .deleted_pings_after_quota_hit
1797                .get_value(&glean, Some("metrics"))
1798                .unwrap()
1799        );
1800        assert_eq!(
1801            n as i32,
1802            upload_manager
1803                .upload_metrics
1804                .pending_pings
1805                .get_value(&glean, Some("metrics"))
1806                .unwrap()
1807        );
1808        // Verify the labeled deletion counter attributes deletions to size_quota
1809        assert_eq!(
1810            (n - expected_number_of_pings) as i32,
1811            upload_manager
1812                .upload_metrics
1813                .pending_pings_deleted
1814                .get("size_quota")
1815                .get_value(&glean, Some("health"))
1816                .unwrap()
1817        );
1818        assert!(upload_manager
1819            .upload_metrics
1820            .pending_pings_deleted
1821            .get("count_quota")
1822            .get_value(&glean, Some("health"))
1823            .is_none());
1824    }
1825
1826    #[test]
1827    fn size_and_count_quota_work_together_count_first() {
1828        let (mut glean, dir) = new_glean(None);
1829
1830        // Register a ping for testing
1831        let ping_type = PingType::new(
1832            "test",
1833            true,
1834            /* send_if_empty */ true,
1835            true,
1836            true,
1837            true,
1838            vec![],
1839            vec![],
1840            true,
1841            vec![],
1842        );
1843        glean.register_ping_type(&ping_type);
1844
1845        let expected_number_of_pings = 2;
1846        // The number of pings we fill the pending pings directory with.
1847        let n = 10;
1848
1849        // Submit the ping multiple times
1850        for _ in 0..n {
1851            ping_type.submit_sync(&glean, None);
1852        }
1853
1854        let directory_manager = PingDirectoryManager::new(dir.path());
1855        let pending_pings = directory_manager.process_dirs().pending_pings;
1856        // The pending pings array is sorted by date in ascending order,
1857        // the newest element is the last one.
1858        let expected_pings = pending_pings
1859            .iter()
1860            .rev()
1861            .take(expected_number_of_pings)
1862            .map(|(_, ping)| ping.document_id.clone())
1863            .collect::<Vec<_>>();
1864
1865        // Create a new upload manager pointing to the same data_path as the glean instance.
1866        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1867
1868        // Set a large enough size quota so it never triggers before the count quota does.
1869        upload_manager
1870            .policy
1871            .set_max_pending_pings_directory_size(Some(100_000));
1872        upload_manager.policy.set_max_pending_pings_count(Some(2));
1873
1874        // Get a task once
1875        // One ping should have been enqueued.
1876        // Make sure it is the newest ping.
1877        for ping_id in expected_pings.iter().rev() {
1878            match upload_manager.get_upload_task(&glean, false) {
1879                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1880                _ => panic!("Expected upload manager to return the next request!"),
1881            }
1882        }
1883
1884        // Verify that no other requests were returned,
1885        // they should all have been deleted because pending pings quota was hit.
1886        assert_eq!(
1887            upload_manager.get_upload_task(&glean, false),
1888            PingUploadTask::done()
1889        );
1890
1891        // Verify that the correct number of deleted pings was recorded
1892        assert_eq!(
1893            (n - expected_number_of_pings) as i32,
1894            upload_manager
1895                .upload_metrics
1896                .deleted_pings_after_quota_hit
1897                .get_value(&glean, Some("metrics"))
1898                .unwrap()
1899        );
1900        assert_eq!(
1901            n as i32,
1902            upload_manager
1903                .upload_metrics
1904                .pending_pings
1905                .get_value(&glean, Some("metrics"))
1906                .unwrap()
1907        );
1908        // Verify the labeled deletion counter attributes deletions to count_quota
1909        assert_eq!(
1910            (n - expected_number_of_pings) as i32,
1911            upload_manager
1912                .upload_metrics
1913                .pending_pings_deleted
1914                .get("count_quota")
1915                .get_value(&glean, Some("health"))
1916                .unwrap()
1917        );
1918        assert!(upload_manager
1919            .upload_metrics
1920            .pending_pings_deleted
1921            .get("size_quota")
1922            .get_value(&glean, Some("health"))
1923            .is_none());
1924    }
1925
1926    #[test]
1927    fn pending_pings_deleted_is_not_recorded_when_quota_not_hit() {
1928        let (mut glean, dir) = new_glean(None);
1929
1930        let ping_type = PingType::new(
1931            "test",
1932            true,
1933            /* send_if_empty */ true,
1934            true,
1935            true,
1936            true,
1937            vec![],
1938            vec![],
1939            true,
1940            vec![],
1941        );
1942        glean.register_ping_type(&ping_type);
1943
1944        // Submit fewer pings than any quota.
1945        for _ in 0..3 {
1946            ping_type.submit_sync(&glean, None);
1947        }
1948
1949        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1950        upload_manager.policy.set_max_pending_pings_count(Some(10));
1951        upload_manager
1952            .policy
1953            .set_max_pending_pings_directory_size(Some(1024 * 1024));
1954
1955        upload_manager.get_upload_task(&glean, false);
1956
1957        assert!(upload_manager
1958            .upload_metrics
1959            .pending_pings_deleted
1960            .get("count_quota")
1961            .get_value(&glean, Some("health"))
1962            .is_none());
1963        assert!(upload_manager
1964            .upload_metrics
1965            .pending_pings_deleted
1966            .get("size_quota")
1967            .get_value(&glean, Some("health"))
1968            .is_none());
1969    }
1970
1971    #[test]
1972    fn pending_pings_config_overrides_are_applied() {
1973        let (_, dir) = new_glean(None);
1974
1975        let mut upload_manager = PingUploadManager::new(dir.path(), "test");
1976
1977        let custom_count: u64 = 42;
1978        let custom_size: u64 = 999_999;
1979        upload_manager.set_max_pending_pings_count(custom_count);
1980        upload_manager.set_max_pending_pings_directory_size(custom_size);
1981
1982        assert_eq!(
1983            custom_count,
1984            upload_manager.policy.max_pending_pings_count()
1985        );
1986        assert_eq!(
1987            custom_size,
1988            upload_manager.policy.max_pending_pings_directory_size()
1989        );
1990    }
1991
1992    #[test]
1993    fn maximum_wait_attemps_is_enforced() {
1994        let (glean, dir) = new_glean(None);
1995
1996        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1997
1998        // Define a max_wait_attemps policy, this is disabled for tests by default.
1999        let max_wait_attempts = 3;
2000        upload_manager
2001            .policy
2002            .set_max_wait_attempts(Some(max_wait_attempts));
2003
2004        // Add a rate limiter to the upload mangager with max of 1 ping 5secs.
2005        //
2006        // We arbitrarily set the maximum pings per interval to a very low number,
2007        // when the rate limiter reaches it's limit get_upload_task returns a PingUploadTask::Wait,
2008        // which will allow us to test the limitations around returning too many of those in a row.
2009        let secs_per_interval = 5;
2010        let max_pings_per_interval = 1;
2011        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
2012
2013        // Enqueue two pings
2014        upload_manager.enqueue_ping(
2015            &glean,
2016            PingPayload {
2017                document_id: Uuid::new_v4().to_string(),
2018                upload_path: PATH.into(),
2019                json_body: "".into(),
2020                headers: None,
2021                body_has_info_sections: true,
2022                ping_name: "ping-name".into(),
2023                uploader_capabilities: vec![],
2024            },
2025        );
2026        upload_manager.enqueue_ping(
2027            &glean,
2028            PingPayload {
2029                document_id: Uuid::new_v4().to_string(),
2030                upload_path: PATH.into(),
2031                json_body: "".into(),
2032                headers: None,
2033                body_has_info_sections: true,
2034                ping_name: "ping-name".into(),
2035                uploader_capabilities: vec![],
2036            },
2037        );
2038
2039        // Get the first ping, it should be returned normally.
2040        match upload_manager.get_upload_task(&glean, false) {
2041            PingUploadTask::Upload { .. } => {}
2042            _ => panic!("Expected upload manager to return the next request!"),
2043        }
2044
2045        // Try to get the next ping,
2046        // we should be throttled and thus get a PingUploadTask::Wait.
2047        // Check that we are indeed allowed to get this response as many times as expected.
2048        for _ in 0..max_wait_attempts {
2049            let task = upload_manager.get_upload_task(&glean, false);
2050            assert!(task.is_wait());
2051        }
2052
2053        // Check that after we get PingUploadTask::Wait the allowed number of times,
2054        // we then get PingUploadTask::Done.
2055        assert_eq!(
2056            upload_manager.get_upload_task(&glean, false),
2057            PingUploadTask::done()
2058        );
2059
2060        // Wait for the rate limiter to allow upload tasks again.
2061        thread::sleep(Duration::from_secs(secs_per_interval));
2062
2063        // Check that we are allowed again to get pings.
2064        let task = upload_manager.get_upload_task(&glean, false);
2065        assert!(task.is_upload());
2066
2067        // And once we are done we don't need to wait anymore.
2068        assert_eq!(
2069            upload_manager.get_upload_task(&glean, false),
2070            PingUploadTask::done()
2071        );
2072    }
2073
2074    #[test]
2075    fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
2076        let (glean, dir) = new_glean(None);
2077        let upload_manager = PingUploadManager::new(dir.path(), "test");
2078        match upload_manager.get_upload_task(&glean, false) {
2079            PingUploadTask::Wait { time } => {
2080                assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
2081            }
2082            _ => panic!("Expected upload manager to return a wait task!"),
2083        };
2084    }
2085
2086    #[test]
2087    fn cannot_enqueue_ping_while_its_being_processed() {
2088        let (glean, dir) = new_glean(None);
2089
2090        let upload_manager = PingUploadManager::no_policy(dir.path());
2091
2092        // Enqueue a ping and start processing it
2093        let identifier = &Uuid::new_v4();
2094        let ping = PingPayload {
2095            document_id: identifier.to_string(),
2096            upload_path: PATH.into(),
2097            json_body: "".into(),
2098            headers: None,
2099            body_has_info_sections: true,
2100            ping_name: "ping-name".into(),
2101            uploader_capabilities: vec![],
2102        };
2103        upload_manager.enqueue_ping(&glean, ping);
2104        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
2105
2106        // Attempt to re-enqueue the same ping
2107        let ping = PingPayload {
2108            document_id: identifier.to_string(),
2109            upload_path: PATH.into(),
2110            json_body: "".into(),
2111            headers: None,
2112            body_has_info_sections: true,
2113            ping_name: "ping-name".into(),
2114            uploader_capabilities: vec![],
2115        };
2116        upload_manager.enqueue_ping(&glean, ping);
2117
2118        // No new pings should have been enqueued so the upload task is Done.
2119        assert_eq!(
2120            upload_manager.get_upload_task(&glean, false),
2121            PingUploadTask::done()
2122        );
2123
2124        // Process the upload response
2125        upload_manager.process_ping_upload_response(
2126            &glean,
2127            &identifier.to_string(),
2128            UploadResult::http_status(200),
2129        );
2130    }
2131}