Skip to main content

glean_core/upload/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Manages the pending pings queue and directory.
6//!
7//! * Keeps track of pending pings, loading any unsent ping from disk on startup;
8//! * Exposes [`get_upload_task`](PingUploadManager::get_upload_task) API for
9//!   the platform layer to request next upload task;
10//! * Exposes
11//!   [`process_ping_upload_response`](PingUploadManager::process_ping_upload_response)
12//!   API to check the HTTP response from the ping upload and either delete the
13//!   corresponding ping from disk or re-enqueue it for sending.
14
15use std::collections::HashMap;
16use std::collections::VecDeque;
17use std::mem;
18use std::path::PathBuf;
19use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
20use std::sync::{Arc, RwLock, RwLockWriteGuard};
21use std::time::{Duration, Instant};
22
23use chrono::Utc;
24use malloc_size_of::MallocSizeOf;
25use malloc_size_of_derive::MallocSizeOf;
26
27use crate::error::ErrorKind;
28use crate::TimerId;
29use crate::{internal_metrics::UploadMetrics, Glean};
30pub use directory::process_metadata;
31use directory::{PingDirectoryManager, PingPayloadsByDirectory};
32use policy::Policy;
33use request::create_date_header_value;
34
35pub use directory::{PingMetadata, PingPayload};
36pub use request::{HeaderMap, PingRequest};
37pub use result::{UploadResult, UploadTaskAction};
38
39mod directory;
40mod policy;
41mod request;
42mod result;
43
/// Wait time (in milliseconds) handed to a requester that asks for an upload
/// task while the pending pings directories are still being processed.
const WAIT_TIME_FOR_PING_PROCESSING: u64 = 1_000;
45
46#[derive(Debug, MallocSizeOf)]
47struct RateLimiter {
48    /// The instant the current interval has started.
49    started: Option<Instant>,
50    /// The count for the current interval.
51    count: u32,
52    /// The duration of each interval.
53    interval: Duration,
54    /// The maximum count per interval.
55    max_count: u32,
56}
57
/// The state reported by the `RateLimiter` when it is asked to count a request.
#[derive(PartialEq)]
enum RateLimiterState {
    /// The maximum count has not been reached yet; the request was counted.
    Incrementing,
    /// The maximum count for the current interval has been reached.
    ///
    /// Carries the time left (in milliseconds)
    /// before the rate limiter stops throttling.
    Throttled(u64),
}
69
70impl RateLimiter {
71    pub fn new(interval: Duration, max_count: u32) -> Self {
72        Self {
73            started: None,
74            count: 0,
75            interval,
76            max_count,
77        }
78    }
79
80    fn reset(&mut self) {
81        self.started = Some(Instant::now());
82        self.count = 0;
83    }
84
85    fn elapsed(&self) -> Duration {
86        self.started.unwrap().elapsed()
87    }
88
89    // The counter should reset if
90    //
91    // 1. It has never started;
92    // 2. It has been started more than the interval time ago;
93    // 3. Something goes wrong while trying to calculate the elapsed time since the last reset.
94    fn should_reset(&self) -> bool {
95        if self.started.is_none() {
96            return true;
97        }
98
99        // Safe unwrap, we already stated that `self.started` is not `None` above.
100        if self.elapsed() > self.interval {
101            return true;
102        }
103
104        false
105    }
106
107    /// Tries to increment the internal counter.
108    ///
109    /// # Returns
110    ///
111    /// The current state of the RateLimiter.
112    pub fn get_state(&mut self) -> RateLimiterState {
113        if self.should_reset() {
114            self.reset();
115        }
116
117        if self.count == self.max_count {
118            // Note that `remining` can't be a negative number because we just called `reset`,
119            // which will check if it is and reset if so.
120            let remaining = self.interval.as_millis() - self.elapsed().as_millis();
121            return RateLimiterState::Throttled(
122                remaining
123                    .try_into()
124                    .unwrap_or(self.interval.as_secs() * 1000),
125            );
126        }
127
128        self.count += 1;
129        RateLimiterState::Incrementing
130    }
131}
132
133/// An enum representing the possible upload tasks to be performed by an uploader.
134///
135/// When asking for the next ping request to upload,
136/// the requester may receive one out of three possible tasks.
137#[derive(PartialEq, Eq, Debug)]
138pub enum PingUploadTask {
139    /// An upload task
140    Upload {
141        /// The ping request for upload
142        /// See [`PingRequest`](struct.PingRequest.html) for more information.
143        request: PingRequest,
144    },
145
146    /// A flag signaling that the pending pings directories are not done being processed,
147    /// thus the requester should wait and come back later.
148    Wait {
149        /// The time in milliseconds
150        /// the requester should wait before requesting a new task.
151        time: u64,
152    },
153
154    /// A flag signaling that requester doesn't need to request any more upload tasks at this moment.
155    ///
156    /// There are three possibilities for this scenario:
157    /// * Pending pings queue is empty, no more pings to request;
158    /// * Requester has gotten more than MAX_WAIT_ATTEMPTS (3, by default) `PingUploadTask::Wait` responses in a row;
159    /// * Requester has reported more than MAX_RECOVERABLE_FAILURES_PER_UPLOADING_WINDOW
160    ///   recoverable upload failures on the same uploading window (see below)
161    ///   and should stop requesting at this moment.
162    ///
163    /// An "uploading window" starts when a requester gets a new
164    /// `PingUploadTask::Upload(PingRequest)` response and finishes when they
165    /// finally get a `PingUploadTask::Done` or `PingUploadTask::Wait` response.
166    Done {
167        #[doc(hidden)]
168        /// Unused field. Required because UniFFI can't handle variants without fields.
169        unused: i8,
170    },
171}
172
173impl PingUploadTask {
174    /// Whether the current task is an upload task.
175    pub fn is_upload(&self) -> bool {
176        matches!(self, PingUploadTask::Upload { .. })
177    }
178
179    /// Whether the current task is wait task.
180    pub fn is_wait(&self) -> bool {
181        matches!(self, PingUploadTask::Wait { .. })
182    }
183
184    pub(crate) fn done() -> Self {
185        PingUploadTask::Done { unused: 0 }
186    }
187}
188
189/// Manages the pending pings queue and directory.
190#[derive(Debug)]
191pub struct PingUploadManager {
192    /// A FIFO queue storing a `PingRequest` for each pending ping.
193    queue: RwLock<VecDeque<PingRequest>>,
194    /// A manager for the pending pings directories.
195    directory_manager: PingDirectoryManager,
196    /// A flag signaling if we are done processing the pending pings directories.
197    processed_pending_pings: Arc<AtomicBool>,
198    /// A vector to store the pending pings processed off-thread.
199    cached_pings: Arc<RwLock<PingPayloadsByDirectory>>,
200    /// The number of upload failures for the current uploading window.
201    recoverable_failure_count: AtomicU32,
202    /// The number or times in a row a user has received a `PingUploadTask::Wait` response.
203    wait_attempt_count: AtomicU32,
204    /// A ping counter to help rate limit the ping uploads.
205    ///
206    /// To keep resource usage in check,
207    /// we may want to limit the amount of pings sent in a given interval.
208    rate_limiter: Option<RwLock<RateLimiter>>,
209    /// The name of the programming language used by the binding creating this instance of PingUploadManager.
210    ///
211    /// This will be used to build the value User-Agent header for each ping request.
212    language_binding_name: String,
213    /// Metrics related to ping uploading.
214    upload_metrics: UploadMetrics,
215    /// Policies for ping storage, uploading and requests.
216    policy: Policy,
217
218    in_flight: RwLock<HashMap<String, (TimerId, TimerId)>>,
219}
220
221impl MallocSizeOf for PingUploadManager {
222    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
223        let shallow_size = {
224            let queue = self.queue.read().unwrap();
225            if ops.has_malloc_enclosing_size_of() {
226                if let Some(front) = queue.front() {
227                    // SAFETY: The front element is a valid interior pointer and thus valid to pass
228                    // to an external function.
229                    unsafe { ops.malloc_enclosing_size_of(front) }
230                } else {
231                    // This assumes that no memory is allocated when the VecDeque is empty.
232                    0
233                }
234            } else {
235                // If `ops` can't estimate the size of a pointer,
236                // we can estimate the allocation size by the size of each element and the
237                // allocated capacity.
238                queue.capacity() * mem::size_of::<PingRequest>()
239            }
240        };
241
242        let mut n = shallow_size
243            + self.directory_manager.size_of(ops)
244            + mem::size_of::<AtomicBool>() // Allocated inside the `self.processed_pending_pings` `Arc`.
245            + self.cached_pings.read().unwrap().size_of(ops)
246            + self.rate_limiter.as_ref().map(|rl| {
247                let lock = rl.read().unwrap();
248                (*lock).size_of(ops)
249            }).unwrap_or(0)
250            + self.language_binding_name.size_of(ops)
251            + self.upload_metrics.size_of(ops)
252            + self.policy.size_of(ops);
253
254        let in_flight = self.in_flight.read().unwrap();
255        n += in_flight.size_of(ops);
256
257        n
258    }
259}
260
261impl PingUploadManager {
262    /// Creates a new PingUploadManager.
263    ///
264    /// # Arguments
265    ///
266    /// * `data_path` - Path to the pending pings directory.
267    /// * `language_binding_name` - The name of the language binding calling this managers instance.
268    ///
269    /// # Panics
270    ///
271    /// Will panic if unable to spawn a new thread.
272    pub fn new<P: Into<PathBuf>>(data_path: P, language_binding_name: &str) -> Self {
273        Self {
274            queue: RwLock::new(VecDeque::new()),
275            directory_manager: PingDirectoryManager::new(data_path),
276            processed_pending_pings: Arc::new(AtomicBool::new(false)),
277            cached_pings: Arc::new(RwLock::new(PingPayloadsByDirectory::default())),
278            recoverable_failure_count: AtomicU32::new(0),
279            wait_attempt_count: AtomicU32::new(0),
280            rate_limiter: None,
281            language_binding_name: language_binding_name.into(),
282            upload_metrics: UploadMetrics::new(),
283            policy: Policy::default(),
284            in_flight: RwLock::new(HashMap::default()),
285        }
286    }
287
288    /// Spawns a new thread and processes the pending pings directories,
289    /// filling up the queue with whatever pings are in there.
290    ///
291    /// # Returns
292    ///
293    /// The `JoinHandle` to the spawned thread
294    pub fn scan_pending_pings_directories(
295        &self,
296        trigger_upload: bool,
297    ) -> std::thread::JoinHandle<()> {
298        let local_manager = self.directory_manager.clone();
299        let local_cached_pings = self.cached_pings.clone();
300        let local_flag = self.processed_pending_pings.clone();
301        crate::thread::spawn("glean.ping_directory_manager.process_dir", move || {
302            {
303                // Be sure to drop local_cached_pings lock before triggering upload.
304                let mut local_cached_pings = local_cached_pings
305                    .write()
306                    .expect("Can't write to pending pings cache.");
307                local_cached_pings.extend(local_manager.process_dirs());
308                local_flag.store(true, Ordering::SeqCst);
309            }
310            if trigger_upload {
311                crate::dispatcher::launch(|| {
312                    if let Some(state) = crate::maybe_global_state().and_then(|s| s.lock().ok()) {
313                        if let Err(e) = state.callbacks.trigger_upload() {
314                            log::error!(
315                                "Triggering upload after pending ping scan failed. Error: {}",
316                                e
317                            );
318                        }
319                    }
320                });
321            }
322        })
323        .expect("Unable to spawn thread to process pings directories.")
324    }
325
326    /// Creates a new upload manager with no limitations, for tests.
327    #[cfg(test)]
328    pub fn no_policy<P: Into<PathBuf>>(data_path: P) -> Self {
329        let mut upload_manager = Self::new(data_path, "Test");
330
331        // Disable all policies for tests, if necessary individuals tests can re-enable them.
332        upload_manager.policy.set_max_recoverable_failures(None);
333        upload_manager.policy.set_max_wait_attempts(None);
334        upload_manager.policy.set_max_ping_body_size(None);
335        upload_manager
336            .policy
337            .set_max_pending_pings_directory_size(None);
338        upload_manager.policy.set_max_pending_pings_count(None);
339
340        // When building for tests, always scan the pending pings directories and do it sync.
341        upload_manager
342            .scan_pending_pings_directories(false)
343            .join()
344            .unwrap();
345
346        upload_manager
347    }
348
349    fn processed_pending_pings(&self) -> bool {
350        self.processed_pending_pings.load(Ordering::SeqCst)
351    }
352
353    fn recoverable_failure_count(&self) -> u32 {
354        self.recoverable_failure_count.load(Ordering::SeqCst)
355    }
356
357    fn wait_attempt_count(&self) -> u32 {
358        self.wait_attempt_count.load(Ordering::SeqCst)
359    }
360
361    /// Attempts to build a ping request from a ping file payload.
362    ///
363    /// Returns the `PingRequest` or `None` if unable to build,
364    /// in which case it will delete the ping file and record an error.
365    fn build_ping_request(&self, glean: &Glean, ping: PingPayload) -> Option<PingRequest> {
366        let PingPayload {
367            document_id,
368            upload_path: path,
369            json_body: body,
370            headers,
371            body_has_info_sections,
372            ping_name,
373            uploader_capabilities,
374        } = ping;
375        let mut request = PingRequest::builder(
376            &self.language_binding_name,
377            self.policy.max_ping_body_size(),
378        )
379        .document_id(&document_id)
380        .path(path)
381        .body(body)
382        .body_has_info_sections(body_has_info_sections)
383        .ping_name(ping_name)
384        .uploader_capabilities(uploader_capabilities);
385
386        if let Some(headers) = headers {
387            request = request.headers(headers);
388        }
389
390        match request.build() {
391            Ok(request) => Some(request),
392            Err(e) => {
393                log::warn!("Error trying to build ping request: {}", e);
394                self.directory_manager.delete_file(&document_id);
395
396                // Record the error.
397                // Currently the only possible error is PingBodyOverflow.
398                if let ErrorKind::PingBodyOverflow(s) = e.kind() {
399                    self.upload_metrics
400                        .discarded_exceeding_pings_size
401                        .accumulate_sync(glean, *s as i64 / 1024);
402                }
403
404                None
405            }
406        }
407    }
408
409    /// Enqueue a ping for upload.
410    pub fn enqueue_ping(&self, glean: &Glean, ping: PingPayload) {
411        let mut queue = self
412            .queue
413            .write()
414            .expect("Can't write to pending pings queue.");
415
416        let PingPayload {
417            ref document_id,
418            upload_path: ref path,
419            ..
420        } = ping;
421        // Checks if a ping with this `document_id` is already enqueued.
422        if queue
423            .iter()
424            .any(|request| request.document_id.as_str() == document_id)
425        {
426            log::warn!(
427                "Attempted to enqueue a duplicate ping {} at {}.",
428                document_id,
429                path
430            );
431            return;
432        }
433
434        {
435            let in_flight = self.in_flight.read().unwrap();
436            if in_flight.contains_key(document_id) {
437                log::warn!(
438                    "Attempted to enqueue an in-flight ping {} at {}.",
439                    document_id,
440                    path
441                );
442                self.upload_metrics
443                    .in_flight_pings_dropped
444                    .add_sync(glean, 0);
445                return;
446            }
447        }
448
449        log::trace!("Enqueuing ping {} at {}", document_id, path);
450        if let Some(request) = self.build_ping_request(glean, ping) {
451            queue.push_back(request)
452        }
453    }
454
455    /// Enqueues pings that might have been cached.
456    ///
457    /// The size of the PENDING_PINGS_DIRECTORY directory will be calculated
458    /// (by accumulating each ping's size in that directory)
459    /// and in case we exceed the quota, defined by the `quota` arg,
460    /// outstanding pings get deleted and are not enqueued.
461    ///
462    /// The size of the DELETION_REQUEST_PINGS_DIRECTORY will not be calculated
463    /// and no deletion-request pings will be deleted. Deletion request pings
464    /// are not very common and usually don't contain any data,
465    /// we don't expect that directory to ever reach quota.
466    /// Most importantly, we don't want to ever delete deletion-request pings.
467    ///
468    /// # Arguments
469    ///
470    /// * `glean` - The Glean object holding the database.
471    fn enqueue_cached_pings(&self, glean: &Glean) {
472        let mut cached_pings = self
473            .cached_pings
474            .write()
475            .expect("Can't write to pending pings cache.");
476
477        if cached_pings.len() > 0 {
478            let mut pending_pings_directory_size: u64 = 0;
479            let mut pending_pings_count = 0;
480            let mut deleting = false;
481
482            let total = cached_pings.pending_pings.len() as u64;
483            self.upload_metrics
484                .pending_pings
485                .add_sync(glean, total.try_into().unwrap_or(0));
486
487            if total > self.policy.max_pending_pings_count() {
488                log::warn!(
489                    "More than {} pending pings in the directory, will delete {} old pings.",
490                    self.policy.max_pending_pings_count(),
491                    total - self.policy.max_pending_pings_count()
492                );
493            }
494
495            // The pending pings vector is sorted by date in ascending order (oldest -> newest).
496            // We need to calculate the size of the pending pings directory
497            // and delete the **oldest** pings in case quota is reached.
498            // Thus, we reverse the order of the pending pings vector,
499            // so that we iterate in descending order (newest -> oldest).
500            cached_pings.pending_pings.reverse();
501            cached_pings.pending_pings.retain(|(file_size, PingPayload {document_id, ..})| {
502                pending_pings_count += 1;
503                pending_pings_directory_size += file_size;
504
505                // We don't want to spam the log for every ping over the quota.
506                if !deleting && pending_pings_directory_size > self.policy.max_pending_pings_directory_size() {
507                    log::warn!(
508                        "Pending pings directory has reached the size quota of {} bytes, outstanding pings will be deleted.",
509                        self.policy.max_pending_pings_directory_size()
510                    );
511                    deleting = true;
512                }
513
514                // Once we reach the number of allowed pings we start deleting,
515                // no matter what size.
516                // We already log this before the loop.
517                if pending_pings_count > self.policy.max_pending_pings_count() {
518                    deleting = true;
519                }
520
521                if deleting && self.directory_manager.delete_file(document_id) {
522                    self.upload_metrics
523                        .deleted_pings_after_quota_hit
524                        .add_sync(glean, 1);
525                    return false;
526                }
527
528                true
529            });
530            // After calculating the size of the pending pings directory,
531            // we record the calculated number and reverse the pings array back for enqueueing.
532            cached_pings.pending_pings.reverse();
533            self.upload_metrics
534                .pending_pings_directory_size
535                .accumulate_sync(glean, pending_pings_directory_size as i64 / 1024);
536
537            // Enqueue the remaining pending pings and
538            // enqueue all deletion-request pings.
539            cached_pings
540                .deletion_request_pings
541                .drain(..)
542                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
543            cached_pings
544                .pending_pings
545                .drain(..)
546                .for_each(|(_, ping)| self.enqueue_ping(glean, ping));
547        }
548    }
549
550    /// Adds rate limiting capability to this upload manager.
551    ///
552    /// The rate limiter will limit the amount of calls to `get_upload_task` per interval.
553    ///
554    /// Setting this will restart count and timer in case there was a previous rate limiter set
555    /// (e.g. if we have reached the current limit and call this function, we start counting again
556    /// and the caller is allowed to asks for tasks).
557    ///
558    /// # Arguments
559    ///
560    /// * `interval` - the amount of seconds in each rate limiting window.
561    /// * `max_tasks` - the maximum amount of task requests allowed per interval.
562    pub fn set_rate_limiter(&mut self, interval: u64, max_tasks: u32) {
563        self.rate_limiter = Some(RwLock::new(RateLimiter::new(
564            Duration::from_secs(interval),
565            max_tasks,
566        )));
567    }
568
569    /// Reads a ping file, creates a `PingRequest` and adds it to the queue.
570    ///
571    /// Duplicate requests won't be added.
572    ///
573    /// # Arguments
574    ///
575    /// * `glean` - The Glean object holding the database.
576    /// * `document_id` - The UUID of the ping in question.
577    pub fn enqueue_ping_from_file(&self, glean: &Glean, document_id: &str) {
578        if let Some(ping) = self.directory_manager.process_file(document_id) {
579            self.enqueue_ping(glean, ping);
580        }
581    }
582
583    /// Clears the pending pings queue, leaves the deletion-request pings.
584    pub fn clear_ping_queue(&self) -> RwLockWriteGuard<'_, VecDeque<PingRequest>> {
585        log::trace!("Clearing ping queue");
586        let mut queue = self
587            .queue
588            .write()
589            .expect("Can't write to pending pings queue.");
590
591        queue.retain(|ping| ping.is_deletion_request());
592        log::trace!(
593            "{} pings left in the queue (only deletion-request expected)",
594            queue.len()
595        );
596        queue
597    }
598
599    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
600        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
601        //
602        // We want to limit the amount of PingUploadTask::Wait returned in a row,
603        // in case we reach MAX_WAIT_ATTEMPTS we want to actually return PingUploadTask::Done.
604        let wait_or_done = |time: u64| {
605            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
606            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
607                PingUploadTask::done()
608            } else {
609                PingUploadTask::Wait { time }
610            }
611        };
612
613        if !self.processed_pending_pings() {
614            log::info!(
615                "Tried getting an upload task, but processing is ongoing. Will come back later."
616            );
617            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
618        }
619
620        // This is a no-op in case there are no cached pings.
621        self.enqueue_cached_pings(glean);
622
623        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
624            log::warn!(
625                "Reached maximum recoverable failures for the current uploading window. You are done."
626            );
627            return PingUploadTask::done();
628        }
629
630        let mut queue = self
631            .queue
632            .write()
633            .expect("Can't write to pending pings queue.");
634        match queue.front() {
635            Some(request) => {
636                if let Some(rate_limiter) = &self.rate_limiter {
637                    let mut rate_limiter = rate_limiter
638                        .write()
639                        .expect("Can't write to the rate limiter.");
640                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
641                        log::info!(
642                            "Tried getting an upload task, but we are throttled at the moment."
643                        );
644                        return wait_or_done(remaining);
645                    }
646                }
647
648                log::info!(
649                    "New upload task with id {} (path: {})",
650                    request.document_id,
651                    request.path
652                );
653
654                if log_ping {
655                    if let Some(body) = request.pretty_body() {
656                        chunked_log_info(&request.path, &body);
657                    } else {
658                        chunked_log_info(&request.path, "<invalid ping payload>");
659                    }
660                }
661
662                {
663                    // Synchronous timer starts.
664                    // We're in the uploader thread anyway.
665                    // But also: No data is stored on disk.
666                    let mut in_flight = self.in_flight.write().unwrap();
667                    let success_id = self.upload_metrics.send_success.start_sync();
668                    let failure_id = self.upload_metrics.send_failure.start_sync();
669                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
670                }
671
672                let mut request = queue.pop_front().unwrap();
673
674                // Adding the `Date` header just before actual upload happens.
675                request
676                    .headers
677                    .insert("Date".to_string(), create_date_header_value(Utc::now()));
678
679                PingUploadTask::Upload { request }
680            }
681            None => {
682                log::info!("No more pings to upload! You are done.");
683                PingUploadTask::done()
684            }
685        }
686    }
687
688    /// Gets the next `PingUploadTask`.
689    ///
690    /// # Arguments
691    ///
692    /// * `glean` - The Glean object holding the database.
693    /// * `log_ping` - Whether to log the ping before returning.
694    ///
695    /// # Returns
696    ///
697    /// The next [`PingUploadTask`](enum.PingUploadTask.html).
698    pub fn get_upload_task(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
699        let task = self.get_upload_task_internal(glean, log_ping);
700
701        if !task.is_wait() && self.wait_attempt_count() > 0 {
702            self.wait_attempt_count.store(0, Ordering::SeqCst);
703        }
704
705        if !task.is_upload() && self.recoverable_failure_count() > 0 {
706            self.recoverable_failure_count.store(0, Ordering::SeqCst);
707        }
708
709        task
710    }
711
712    /// Processes the response from an attempt to upload a ping.
713    ///
714    /// Based on the HTTP status of said response,
715    /// the possible outcomes are:
716    ///
717    /// * **200 - 299 Success**
    ///   Any status on the 2XX range is considered a successful upload,
719    ///   which means the corresponding ping file can be deleted.
720    ///   _Known 2XX status:_
721    ///   * 200 - OK. Request accepted into the pipeline.
722    ///
723    /// * **400 - 499 Unrecoverable error**
724    ///   Any status on the 4XX range means something our client did is not correct.
725    ///   It is unlikely that the client is going to recover from this by retrying,
726    ///   so in this case the corresponding ping file can also be deleted.
727    ///   _Known 4XX status:_
728    ///   * 404 - not found - POST/PUT to an unknown namespace
729    ///   * 405 - wrong request type (anything other than POST/PUT)
730    ///   * 411 - missing content-length header
    ///   * 413 - request body too large (note that if we have badly-behaved clients that
    ///           retry on 4XX, we should send back 202 on body/path too long).
733    ///   * 414 - request path too long (See above)
734    ///
735    /// * **Any other error**
736    ///   For any other error, a warning is logged and the ping is re-enqueued.
737    ///   _Known other errors:_
738    ///   * 500 - internal error
739    ///
740    /// # Note
741    ///
742    /// The disk I/O performed by this function is not done off-thread,
743    /// as it is expected to be called off-thread by the platform.
744    ///
745    /// # Arguments
746    ///
747    /// * `glean` - The Glean object holding the database.
748    /// * `document_id` - The UUID of the ping in question.
749    /// * `status` - The HTTP status of the response.
750    pub fn process_ping_upload_response(
751        &self,
752        glean: &Glean,
753        document_id: &str,
754        status: UploadResult,
755    ) -> UploadTaskAction {
756        use UploadResult::*;
757
758        let stop_time = zeitstempel::now_awake();
759
760        if let Some(label) = status.get_label() {
761            let metric = self.upload_metrics.ping_upload_failure.get(label);
762            metric.add_sync(glean, 1);
763        }
764
765        let send_ids = {
766            let mut lock = self.in_flight.write().unwrap();
767            lock.remove(document_id)
768        };
769
770        if send_ids.is_none() {
771            self.upload_metrics.missing_send_ids.add_sync(glean, 1);
772        }
773
774        match status {
775            HttpStatus { code } if (200..=299).contains(&code) => {
776                log::info!("Ping {} successfully sent {}.", document_id, code);
777                if let Some((success_id, failure_id)) = send_ids {
778                    self.upload_metrics
779                        .send_success
780                        .set_stop_and_accumulate(glean, success_id, stop_time);
781                    self.upload_metrics.send_failure.cancel_sync(failure_id);
782                }
783                self.directory_manager.delete_file(document_id);
784            }
785
786            UnrecoverableFailure { .. } | HttpStatus { code: 400..=499 } | Incapable { .. } => {
787                log::warn!(
788                    "Unrecoverable upload failure while attempting to send ping {}. Error was {:?}",
789                    document_id,
790                    status
791                );
792                if let Some((success_id, failure_id)) = send_ids {
793                    self.upload_metrics.send_success.cancel_sync(success_id);
794                    self.upload_metrics
795                        .send_failure
796                        .set_stop_and_accumulate(glean, failure_id, stop_time);
797                }
798                self.directory_manager.delete_file(document_id);
799            }
800
801            RecoverableFailure { .. } | HttpStatus { .. } => {
802                log::warn!(
803                    "Recoverable upload failure while attempting to send ping {}, will retry. Error was {:?}",
804                    document_id,
805                    status
806                );
807                if let Some((success_id, failure_id)) = send_ids {
808                    self.upload_metrics.send_success.cancel_sync(success_id);
809                    self.upload_metrics
810                        .send_failure
811                        .set_stop_and_accumulate(glean, failure_id, stop_time);
812                }
813                self.enqueue_ping_from_file(glean, document_id);
814                self.recoverable_failure_count
815                    .fetch_add(1, Ordering::SeqCst);
816            }
817
818            Done { .. } => {
819                log::debug!("Uploader signaled Done. Exiting.");
820                if let Some((success_id, failure_id)) = send_ids {
821                    self.upload_metrics.send_success.cancel_sync(success_id);
822                    self.upload_metrics.send_failure.cancel_sync(failure_id);
823                }
824                return UploadTaskAction::End;
825            }
826        };
827
828        UploadTaskAction::Next
829    }
830}
831
/// Splits log message into chunks on Android.
///
/// When `path` and `payload` together fit into a single logcat payload they are
/// logged in one message. Otherwise the payload is cut into pieces of at most
/// 4000 bytes, never in the middle of a UTF-8 character, and every piece is
/// logged with the path and a `[Part i of n]` marker where `n` is the exact
/// number of pieces.
///
/// # Arguments
///
/// * `path` - The upload path of the ping, used as a prefix of every message.
/// * `payload` - The ping payload to log.
#[cfg(target_os = "android")]
pub fn chunked_log_info(path: &str, payload: &str) {
    // Since the logcat ring buffer size is configurable, but its 'max payload' size is not,
    // we must break apart long pings into chunks no larger than the max payload size of 4076b.
    // We leave some head space for our prefix.
    const MAX_LOG_PAYLOAD_SIZE_BYTES: usize = 4000;

    // If the length of the ping will fit within one logcat payload, then we can
    // short-circuit here and avoid some overhead, otherwise we must split up the
    // message so that we don't truncate it.
    if path.len() + payload.len() <= MAX_LOG_PAYLOAD_SIZE_BYTES {
        log::info!("Glean ping to URL: {}\n{}", path, payload);
        return;
    }

    // Split before logging so that the reported total is exact. Estimating it from
    // the byte length is wrong when the length is a multiple of the chunk size or
    // when chunks get shortened to land on a character boundary.
    let mut chunks = Vec::with_capacity(payload.len() / MAX_LOG_PAYLOAD_SIZE_BYTES + 1);
    let mut rest = payload;
    while rest.len() > MAX_LOG_PAYLOAD_SIZE_BYTES {
        // Walk back to the closest char boundary.
        // It's UTF-8, so it is at most 3 bytes away; index 0 is always a boundary.
        let mut split_at = MAX_LOG_PAYLOAD_SIZE_BYTES;
        while !rest.is_char_boundary(split_at) {
            split_at -= 1;
        }

        let (chunk, tail) = rest.split_at(split_at);
        chunks.push(chunk);
        rest = tail;
    }
    // Whatever is left fits into a single message.
    if !rest.is_empty() {
        chunks.push(rest);
    }

    let total_chunks = chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        log::info!(
            "Glean ping to URL: {} [Part {} of {}]\n{}",
            path,
            idx + 1,
            total_chunks,
            chunk
        );
    }
}
891
892/// Logs payload in one go (all other OS).
893#[cfg(not(target_os = "android"))]
894pub fn chunked_log_info(_path: &str, payload: &str) {
895    log::info!("{}", payload)
896}
897
898#[cfg(test)]
899mod test {
900    use std::thread;
901    use uuid::Uuid;
902
903    use super::*;
904    use crate::metrics::PingType;
905    use crate::{tests::new_glean, PENDING_PINGS_DIRECTORY};
906
    // Dummy upload path reused by the tests below that build a `PingPayload` by hand.
    const PATH: &str = "/submit/app_id/ping_name/schema_version/doc_id";
908
909    #[test]
910    fn doesnt_error_when_there_are_no_pending_pings() {
911        let (glean, _t) = new_glean(None);
912
913        // Try and get the next request.
914        // Verify request was not returned
915        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
916    }
917
918    #[test]
919    fn returns_ping_request_when_there_is_one() {
920        let (glean, dir) = new_glean(None);
921
922        let upload_manager = PingUploadManager::no_policy(dir.path());
923
924        // Enqueue a ping
925        upload_manager.enqueue_ping(
926            &glean,
927            PingPayload {
928                document_id: Uuid::new_v4().to_string(),
929                upload_path: PATH.into(),
930                json_body: "".into(),
931                headers: None,
932                body_has_info_sections: true,
933                ping_name: "ping-name".into(),
934                uploader_capabilities: vec![],
935            },
936        );
937
938        // Try and get the next request.
939        // Verify request was returned
940        let task = upload_manager.get_upload_task(&glean, false);
941        assert!(task.is_upload());
942    }
943
944    #[test]
945    fn returns_as_many_ping_requests_as_there_are() {
946        let (glean, dir) = new_glean(None);
947
948        let upload_manager = PingUploadManager::no_policy(dir.path());
949
950        // Enqueue a ping multiple times
951        let n = 10;
952        for _ in 0..n {
953            upload_manager.enqueue_ping(
954                &glean,
955                PingPayload {
956                    document_id: Uuid::new_v4().to_string(),
957                    upload_path: PATH.into(),
958                    json_body: "".into(),
959                    headers: None,
960                    body_has_info_sections: true,
961                    ping_name: "ping-name".into(),
962                    uploader_capabilities: vec![],
963                },
964            );
965        }
966
967        // Verify a request is returned for each submitted ping
968        for _ in 0..n {
969            let task = upload_manager.get_upload_task(&glean, false);
970            assert!(task.is_upload());
971        }
972
973        // Verify that after all requests are returned, none are left
974        assert_eq!(
975            upload_manager.get_upload_task(&glean, false),
976            PingUploadTask::done()
977        );
978    }
979
980    #[test]
981    fn limits_the_number_of_pings_when_there_is_rate_limiting() {
982        let (glean, dir) = new_glean(None);
983
984        let mut upload_manager = PingUploadManager::no_policy(dir.path());
985
986        // Add a rate limiter to the upload mangager with max of 10 pings every 3 seconds.
987        let max_pings_per_interval = 10;
988        upload_manager.set_rate_limiter(3, 10);
989
990        // Enqueue the max number of pings allowed per uploading window
991        for _ in 0..max_pings_per_interval {
992            upload_manager.enqueue_ping(
993                &glean,
994                PingPayload {
995                    document_id: Uuid::new_v4().to_string(),
996                    upload_path: PATH.into(),
997                    json_body: "".into(),
998                    headers: None,
999                    body_has_info_sections: true,
1000                    ping_name: "ping-name".into(),
1001                    uploader_capabilities: vec![],
1002                },
1003            );
1004        }
1005
1006        // Verify a request is returned for each submitted ping
1007        for _ in 0..max_pings_per_interval {
1008            let task = upload_manager.get_upload_task(&glean, false);
1009            assert!(task.is_upload());
1010        }
1011
1012        // Enqueue just one more ping
1013        upload_manager.enqueue_ping(
1014            &glean,
1015            PingPayload {
1016                document_id: Uuid::new_v4().to_string(),
1017                upload_path: PATH.into(),
1018                json_body: "".into(),
1019                headers: None,
1020                body_has_info_sections: true,
1021                ping_name: "ping-name".into(),
1022                uploader_capabilities: vec![],
1023            },
1024        );
1025
1026        // Verify that we are indeed told to wait because we are at capacity
1027        match upload_manager.get_upload_task(&glean, false) {
1028            PingUploadTask::Wait { time } => {
1029                // Wait for the uploading window to reset
1030                thread::sleep(Duration::from_millis(time));
1031            }
1032            _ => panic!("Expected upload manager to return a wait task!"),
1033        };
1034
1035        let task = upload_manager.get_upload_task(&glean, false);
1036        assert!(task.is_upload());
1037    }
1038
1039    #[test]
1040    fn clearing_the_queue_works_correctly() {
1041        let (glean, dir) = new_glean(None);
1042
1043        let upload_manager = PingUploadManager::no_policy(dir.path());
1044
1045        // Enqueue a ping multiple times
1046        for _ in 0..10 {
1047            upload_manager.enqueue_ping(
1048                &glean,
1049                PingPayload {
1050                    document_id: Uuid::new_v4().to_string(),
1051                    upload_path: PATH.into(),
1052                    json_body: "".into(),
1053                    headers: None,
1054                    body_has_info_sections: true,
1055                    ping_name: "ping-name".into(),
1056                    uploader_capabilities: vec![],
1057                },
1058            );
1059        }
1060
1061        // Clear the queue
1062        drop(upload_manager.clear_ping_queue());
1063
1064        // Verify there really isn't any ping in the queue
1065        assert_eq!(
1066            upload_manager.get_upload_task(&glean, false),
1067            PingUploadTask::done()
1068        );
1069    }
1070
1071    #[test]
1072    fn clearing_the_queue_doesnt_clear_deletion_request_pings() {
1073        let (mut glean, _t) = new_glean(None);
1074
1075        // Register a ping for testing
1076        let ping_type = PingType::new(
1077            "test",
1078            true,
1079            /* send_if_empty */ true,
1080            true,
1081            true,
1082            true,
1083            vec![],
1084            vec![],
1085            true,
1086            vec![],
1087        );
1088        glean.register_ping_type(&ping_type);
1089
1090        // Submit the ping multiple times
1091        let n = 10;
1092        for _ in 0..n {
1093            ping_type.submit_sync(&glean, None);
1094        }
1095
1096        glean
1097            .internal_pings
1098            .deletion_request
1099            .submit_sync(&glean, None);
1100
1101        // Clear the queue
1102        drop(glean.upload_manager.clear_ping_queue());
1103
1104        let upload_task = glean.get_upload_task();
1105        match upload_task {
1106            PingUploadTask::Upload { request } => assert!(request.is_deletion_request()),
1107            _ => panic!("Expected upload manager to return the next request!"),
1108        }
1109
1110        // Verify there really isn't any other pings in the queue
1111        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1112    }
1113
1114    #[test]
1115    fn fills_up_queue_successfully_from_disk() {
1116        let (mut glean, dir) = new_glean(None);
1117
1118        // Register a ping for testing
1119        let ping_type = PingType::new(
1120            "test",
1121            true,
1122            /* send_if_empty */ true,
1123            true,
1124            true,
1125            true,
1126            vec![],
1127            vec![],
1128            true,
1129            vec![],
1130        );
1131        glean.register_ping_type(&ping_type);
1132
1133        // Submit the ping multiple times
1134        let n = 10;
1135        for _ in 0..n {
1136            ping_type.submit_sync(&glean, None);
1137        }
1138
1139        // Create a new upload manager pointing to the same data_path as the glean instance.
1140        let upload_manager = PingUploadManager::no_policy(dir.path());
1141
1142        // Verify the requests were properly enqueued
1143        for _ in 0..n {
1144            let task = upload_manager.get_upload_task(&glean, false);
1145            assert!(task.is_upload());
1146        }
1147
1148        // Verify that after all requests are returned, none are left
1149        assert_eq!(
1150            upload_manager.get_upload_task(&glean, false),
1151            PingUploadTask::done()
1152        );
1153    }
1154
1155    #[test]
1156    fn processes_correctly_success_upload_response() {
1157        let (mut glean, dir) = new_glean(None);
1158
1159        // Register a ping for testing
1160        let ping_type = PingType::new(
1161            "test",
1162            true,
1163            /* send_if_empty */ true,
1164            true,
1165            true,
1166            true,
1167            vec![],
1168            vec![],
1169            true,
1170            vec![],
1171        );
1172        glean.register_ping_type(&ping_type);
1173
1174        // Submit a ping
1175        ping_type.submit_sync(&glean, None);
1176
1177        // Get the pending ping directory path
1178        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1179
1180        // Get the submitted PingRequest
1181        match glean.get_upload_task() {
1182            PingUploadTask::Upload { request } => {
1183                // Simulate the processing of a sucessfull request
1184                let document_id = request.document_id;
1185                glean.process_ping_upload_response(&document_id, UploadResult::http_status(200));
1186                // Verify file was deleted
1187                assert!(!pending_pings_dir.join(document_id).exists());
1188            }
1189            _ => panic!("Expected upload manager to return the next request!"),
1190        }
1191
1192        // Verify that after request is returned, none are left
1193        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1194    }
1195
1196    #[test]
1197    fn processes_correctly_client_error_upload_response() {
1198        let (mut glean, dir) = new_glean(None);
1199
1200        // Register a ping for testing
1201        let ping_type = PingType::new(
1202            "test",
1203            true,
1204            /* send_if_empty */ true,
1205            true,
1206            true,
1207            true,
1208            vec![],
1209            vec![],
1210            true,
1211            vec![],
1212        );
1213        glean.register_ping_type(&ping_type);
1214
1215        // Submit a ping
1216        ping_type.submit_sync(&glean, None);
1217
1218        // Get the pending ping directory path
1219        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1220
1221        // Get the submitted PingRequest
1222        match glean.get_upload_task() {
1223            PingUploadTask::Upload { request } => {
1224                // Simulate the processing of a client error
1225                let document_id = request.document_id;
1226                glean.process_ping_upload_response(&document_id, UploadResult::http_status(404));
1227                // Verify file was deleted
1228                assert!(!pending_pings_dir.join(document_id).exists());
1229            }
1230            _ => panic!("Expected upload manager to return the next request!"),
1231        }
1232
1233        // Verify that after request is returned, none are left
1234        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1235    }
1236
1237    #[test]
1238    fn processes_correctly_server_error_upload_response() {
1239        let (mut glean, _t) = new_glean(None);
1240
1241        // Register a ping for testing
1242        let ping_type = PingType::new(
1243            "test",
1244            true,
1245            /* send_if_empty */ true,
1246            true,
1247            true,
1248            true,
1249            vec![],
1250            vec![],
1251            true,
1252            vec![],
1253        );
1254        glean.register_ping_type(&ping_type);
1255
1256        // Submit a ping
1257        ping_type.submit_sync(&glean, None);
1258
1259        // Get the submitted PingRequest
1260        match glean.get_upload_task() {
1261            PingUploadTask::Upload { request } => {
1262                // Simulate the processing of a client error
1263                let document_id = request.document_id;
1264                glean.process_ping_upload_response(&document_id, UploadResult::http_status(500));
1265                // Verify this ping was indeed re-enqueued
1266                match glean.get_upload_task() {
1267                    PingUploadTask::Upload { request } => {
1268                        assert_eq!(document_id, request.document_id);
1269                    }
1270                    _ => panic!("Expected upload manager to return the next request!"),
1271                }
1272            }
1273            _ => panic!("Expected upload manager to return the next request!"),
1274        }
1275
1276        // Verify that after request is returned, none are left
1277        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1278    }
1279
1280    #[test]
1281    fn processes_correctly_unrecoverable_upload_response() {
1282        let (mut glean, dir) = new_glean(None);
1283
1284        // Register a ping for testing
1285        let ping_type = PingType::new(
1286            "test",
1287            true,
1288            /* send_if_empty */ true,
1289            true,
1290            true,
1291            true,
1292            vec![],
1293            vec![],
1294            true,
1295            vec![],
1296        );
1297        glean.register_ping_type(&ping_type);
1298
1299        // Submit a ping
1300        ping_type.submit_sync(&glean, None);
1301
1302        // Get the pending ping directory path
1303        let pending_pings_dir = dir.path().join(PENDING_PINGS_DIRECTORY);
1304
1305        // Get the submitted PingRequest
1306        match glean.get_upload_task() {
1307            PingUploadTask::Upload { request } => {
1308                // Simulate the processing of a client error
1309                let document_id = request.document_id;
1310                glean.process_ping_upload_response(
1311                    &document_id,
1312                    UploadResult::unrecoverable_failure(),
1313                );
1314                // Verify file was deleted
1315                assert!(!pending_pings_dir.join(document_id).exists());
1316            }
1317            _ => panic!("Expected upload manager to return the next request!"),
1318        }
1319
1320        // Verify that after request is returned, none are left
1321        assert_eq!(glean.get_upload_task(), PingUploadTask::done());
1322    }
1323
1324    #[test]
1325    fn new_pings_are_added_while_upload_in_progress() {
1326        let (glean, dir) = new_glean(None);
1327
1328        let upload_manager = PingUploadManager::no_policy(dir.path());
1329
1330        let doc1 = Uuid::new_v4().to_string();
1331        let path1 = format!("/submit/app_id/test-ping/1/{}", doc1);
1332
1333        let doc2 = Uuid::new_v4().to_string();
1334        let path2 = format!("/submit/app_id/test-ping/1/{}", doc2);
1335
1336        // Enqueue a ping
1337        upload_manager.enqueue_ping(
1338            &glean,
1339            PingPayload {
1340                document_id: doc1.clone(),
1341                upload_path: path1,
1342                json_body: "".into(),
1343                headers: None,
1344                body_has_info_sections: true,
1345                ping_name: "test-ping".into(),
1346                uploader_capabilities: vec![],
1347            },
1348        );
1349
1350        // Try and get the first request.
1351        let req = match upload_manager.get_upload_task(&glean, false) {
1352            PingUploadTask::Upload { request } => request,
1353            _ => panic!("Expected upload manager to return the next request!"),
1354        };
1355        assert_eq!(doc1, req.document_id);
1356
1357        // Schedule the next one while the first one is "in progress"
1358        upload_manager.enqueue_ping(
1359            &glean,
1360            PingPayload {
1361                document_id: doc2.clone(),
1362                upload_path: path2,
1363                json_body: "".into(),
1364                headers: None,
1365                body_has_info_sections: true,
1366                ping_name: "test-ping".into(),
1367                uploader_capabilities: vec![],
1368            },
1369        );
1370
1371        // Mark as processed
1372        upload_manager.process_ping_upload_response(
1373            &glean,
1374            &req.document_id,
1375            UploadResult::http_status(200),
1376        );
1377
1378        // Get the second request.
1379        let req = match upload_manager.get_upload_task(&glean, false) {
1380            PingUploadTask::Upload { request } => request,
1381            _ => panic!("Expected upload manager to return the next request!"),
1382        };
1383        assert_eq!(doc2, req.document_id);
1384
1385        // Mark as processed
1386        upload_manager.process_ping_upload_response(
1387            &glean,
1388            &req.document_id,
1389            UploadResult::http_status(200),
1390        );
1391
1392        // ... and then we're done.
1393        assert_eq!(
1394            upload_manager.get_upload_task(&glean, false),
1395            PingUploadTask::done()
1396        );
1397    }
1398
1399    #[test]
1400    fn adds_debug_view_header_to_requests_when_tag_is_set() {
1401        let (mut glean, _t) = new_glean(None);
1402
1403        glean.set_debug_view_tag("valid-tag");
1404
1405        // Register a ping for testing
1406        let ping_type = PingType::new(
1407            "test",
1408            true,
1409            /* send_if_empty */ true,
1410            true,
1411            true,
1412            true,
1413            vec![],
1414            vec![],
1415            true,
1416            vec![],
1417        );
1418        glean.register_ping_type(&ping_type);
1419
1420        // Submit a ping
1421        ping_type.submit_sync(&glean, None);
1422
1423        // Get the submitted PingRequest
1424        match glean.get_upload_task() {
1425            PingUploadTask::Upload { request } => {
1426                assert_eq!(request.headers.get("X-Debug-ID").unwrap(), "valid-tag")
1427            }
1428            _ => panic!("Expected upload manager to return the next request!"),
1429        }
1430    }
1431
1432    #[test]
1433    fn duplicates_are_not_enqueued() {
1434        let (glean, dir) = new_glean(None);
1435
1436        // Create a new upload manager so that we have access to its functions directly,
1437        // make it synchronous so we don't have to manually wait for the scanning to finish.
1438        let upload_manager = PingUploadManager::no_policy(dir.path());
1439
1440        let doc_id = Uuid::new_v4().to_string();
1441        let path = format!("/submit/app_id/test-ping/1/{}", doc_id);
1442
1443        // Try to enqueue a ping with the same doc_id twice
1444        upload_manager.enqueue_ping(
1445            &glean,
1446            PingPayload {
1447                document_id: doc_id.clone(),
1448                upload_path: path.clone(),
1449                json_body: "".into(),
1450                headers: None,
1451                body_has_info_sections: true,
1452                ping_name: "test-ping".into(),
1453                uploader_capabilities: vec![],
1454            },
1455        );
1456        upload_manager.enqueue_ping(
1457            &glean,
1458            PingPayload {
1459                document_id: doc_id,
1460                upload_path: path,
1461                json_body: "".into(),
1462                headers: None,
1463                body_has_info_sections: true,
1464                ping_name: "test-ping".into(),
1465                uploader_capabilities: vec![],
1466            },
1467        );
1468
1469        // Get a task once
1470        let task = upload_manager.get_upload_task(&glean, false);
1471        assert!(task.is_upload());
1472
1473        // There should be no more queued tasks
1474        assert_eq!(
1475            upload_manager.get_upload_task(&glean, false),
1476            PingUploadTask::done()
1477        );
1478    }
1479
1480    #[test]
1481    fn maximum_of_recoverable_errors_is_enforced_for_uploading_window() {
1482        let (mut glean, dir) = new_glean(None);
1483
1484        // Register a ping for testing
1485        let ping_type = PingType::new(
1486            "test",
1487            true,
1488            /* send_if_empty */ true,
1489            true,
1490            true,
1491            true,
1492            vec![],
1493            vec![],
1494            true,
1495            vec![],
1496        );
1497        glean.register_ping_type(&ping_type);
1498
1499        // Submit the ping multiple times
1500        let n = 5;
1501        for _ in 0..n {
1502            ping_type.submit_sync(&glean, None);
1503        }
1504
1505        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1506
1507        // Set a policy for max recoverable failures, this is usually disabled for tests.
1508        let max_recoverable_failures = 3;
1509        upload_manager
1510            .policy
1511            .set_max_recoverable_failures(Some(max_recoverable_failures));
1512
1513        // Return the max recoverable error failures in a row
1514        for _ in 0..max_recoverable_failures {
1515            match upload_manager.get_upload_task(&glean, false) {
1516                PingUploadTask::Upload { request } => {
1517                    upload_manager.process_ping_upload_response(
1518                        &glean,
1519                        &request.document_id,
1520                        UploadResult::recoverable_failure(),
1521                    );
1522                }
1523                _ => panic!("Expected upload manager to return the next request!"),
1524            }
1525        }
1526
1527        // Verify that after returning the max amount of recoverable failures,
1528        // we are done even though we haven't gotten all the enqueued requests.
1529        assert_eq!(
1530            upload_manager.get_upload_task(&glean, false),
1531            PingUploadTask::done()
1532        );
1533
1534        // Verify all requests are returned when we try again.
1535        for _ in 0..n {
1536            let task = upload_manager.get_upload_task(&glean, false);
1537            assert!(task.is_upload());
1538        }
1539    }
1540
1541    #[test]
1542    fn quota_is_enforced_when_enqueueing_cached_pings() {
1543        let (mut glean, dir) = new_glean(None);
1544
1545        // Register a ping for testing
1546        let ping_type = PingType::new(
1547            "test",
1548            true,
1549            /* send_if_empty */ true,
1550            true,
1551            true,
1552            true,
1553            vec![],
1554            vec![],
1555            true,
1556            vec![],
1557        );
1558        glean.register_ping_type(&ping_type);
1559
1560        // Submit the ping multiple times
1561        let n = 10;
1562        for _ in 0..n {
1563            ping_type.submit_sync(&glean, None);
1564        }
1565
1566        let directory_manager = PingDirectoryManager::new(dir.path());
1567        let pending_pings = directory_manager.process_dirs().pending_pings;
1568        // The pending pings array is sorted by date in ascending order,
1569        // the newest element is the last one.
1570        let (_, newest_ping) = &pending_pings.last().unwrap();
1571        let PingPayload {
1572            document_id: newest_ping_id,
1573            ..
1574        } = &newest_ping;
1575
1576        // Create a new upload manager pointing to the same data_path as the glean instance.
1577        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1578
1579        // Set the quota to just a little over the size on an empty ping file.
1580        // This way we can check that one ping is kept and all others are deleted.
1581        //
1582        // From manual testing I figured out an empty ping file is 324bytes,
1583        // I am setting this a little over just so that minor changes to the ping structure
1584        // don't immediatelly break this.
1585        upload_manager
1586            .policy
1587            .set_max_pending_pings_directory_size(Some(500));
1588
1589        // Get a task once
1590        // One ping should have been enqueued.
1591        // Make sure it is the newest ping.
1592        match upload_manager.get_upload_task(&glean, false) {
1593            PingUploadTask::Upload { request } => assert_eq!(&request.document_id, newest_ping_id),
1594            _ => panic!("Expected upload manager to return the next request!"),
1595        }
1596
1597        // Verify that no other requests were returned,
1598        // they should all have been deleted because pending pings quota was hit.
1599        assert_eq!(
1600            upload_manager.get_upload_task(&glean, false),
1601            PingUploadTask::done()
1602        );
1603
1604        // Verify that the correct number of deleted pings was recorded
1605        assert_eq!(
1606            n - 1,
1607            upload_manager
1608                .upload_metrics
1609                .deleted_pings_after_quota_hit
1610                .get_value(&glean, Some("metrics"))
1611                .unwrap()
1612        );
1613        assert_eq!(
1614            n,
1615            upload_manager
1616                .upload_metrics
1617                .pending_pings
1618                .get_value(&glean, Some("metrics"))
1619                .unwrap()
1620        );
1621    }
1622
1623    #[test]
1624    fn number_quota_is_enforced_when_enqueueing_cached_pings() {
1625        let (mut glean, dir) = new_glean(None);
1626
1627        // Register a ping for testing
1628        let ping_type = PingType::new(
1629            "test",
1630            true,
1631            /* send_if_empty */ true,
1632            true,
1633            true,
1634            true,
1635            vec![],
1636            vec![],
1637            true,
1638            vec![],
1639        );
1640        glean.register_ping_type(&ping_type);
1641
1642        // How many pings we allow at maximum
1643        let count_quota = 3;
1644        // The number of pings we fill the pending pings directory with.
1645        let n = 10;
1646
1647        // Submit the ping multiple times
1648        for _ in 0..n {
1649            ping_type.submit_sync(&glean, None);
1650        }
1651
1652        let directory_manager = PingDirectoryManager::new(dir.path());
1653        let pending_pings = directory_manager.process_dirs().pending_pings;
1654        // The pending pings array is sorted by date in ascending order,
1655        // the newest element is the last one.
1656        let expected_pings = pending_pings
1657            .iter()
1658            .rev()
1659            .take(count_quota)
1660            .map(|(_, ping)| ping.document_id.clone())
1661            .collect::<Vec<_>>();
1662
1663        // Create a new upload manager pointing to the same data_path as the glean instance.
1664        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1665
1666        upload_manager
1667            .policy
1668            .set_max_pending_pings_count(Some(count_quota as u64));
1669
1670        // Get a task once
1671        // One ping should have been enqueued.
1672        // Make sure it is the newest ping.
1673        for ping_id in expected_pings.iter().rev() {
1674            match upload_manager.get_upload_task(&glean, false) {
1675                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1676                _ => panic!("Expected upload manager to return the next request!"),
1677            }
1678        }
1679
1680        // Verify that no other requests were returned,
1681        // they should all have been deleted because pending pings quota was hit.
1682        assert_eq!(
1683            upload_manager.get_upload_task(&glean, false),
1684            PingUploadTask::done()
1685        );
1686
1687        // Verify that the correct number of deleted pings was recorded
1688        assert_eq!(
1689            (n - count_quota) as i32,
1690            upload_manager
1691                .upload_metrics
1692                .deleted_pings_after_quota_hit
1693                .get_value(&glean, Some("metrics"))
1694                .unwrap()
1695        );
1696        assert_eq!(
1697            n as i32,
1698            upload_manager
1699                .upload_metrics
1700                .pending_pings
1701                .get_value(&glean, Some("metrics"))
1702                .unwrap()
1703        );
1704    }
1705
1706    #[test]
1707    fn size_and_count_quota_work_together_size_first() {
1708        let (mut glean, dir) = new_glean(None);
1709
1710        // Register a ping for testing
1711        let ping_type = PingType::new(
1712            "test",
1713            true,
1714            /* send_if_empty */ true,
1715            true,
1716            true,
1717            true,
1718            vec![],
1719            vec![],
1720            true,
1721            vec![],
1722        );
1723        glean.register_ping_type(&ping_type);
1724
1725        let expected_number_of_pings = 3;
1726        // The number of pings we fill the pending pings directory with.
1727        let n = 10;
1728
1729        // Submit the ping multiple times
1730        for _ in 0..n {
1731            ping_type.submit_sync(&glean, None);
1732        }
1733
1734        let directory_manager = PingDirectoryManager::new(dir.path());
1735        let pending_pings = directory_manager.process_dirs().pending_pings;
1736        // The pending pings array is sorted by date in ascending order,
1737        // the newest element is the last one.
1738        let expected_pings = pending_pings
1739            .iter()
1740            .rev()
1741            .take(expected_number_of_pings)
1742            .map(|(_, ping)| ping.document_id.clone())
1743            .collect::<Vec<_>>();
1744
1745        // Create a new upload manager pointing to the same data_path as the glean instance.
1746        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1747
1748        // From manual testing we figured out a basically empty ping file is 399 bytes,
1749        // so this allows 3 pings with some headroom in case of future changes.
1750        upload_manager
1751            .policy
1752            .set_max_pending_pings_directory_size(Some(1300));
1753        upload_manager.policy.set_max_pending_pings_count(Some(5));
1754
1755        // Get a task once
1756        // One ping should have been enqueued.
1757        // Make sure it is the newest ping.
1758        for ping_id in expected_pings.iter().rev() {
1759            match upload_manager.get_upload_task(&glean, false) {
1760                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1761                _ => panic!("Expected upload manager to return the next request!"),
1762            }
1763        }
1764
1765        // Verify that no other requests were returned,
1766        // they should all have been deleted because pending pings quota was hit.
1767        assert_eq!(
1768            upload_manager.get_upload_task(&glean, false),
1769            PingUploadTask::done()
1770        );
1771
1772        // Verify that the correct number of deleted pings was recorded
1773        assert_eq!(
1774            (n - expected_number_of_pings) as i32,
1775            upload_manager
1776                .upload_metrics
1777                .deleted_pings_after_quota_hit
1778                .get_value(&glean, Some("metrics"))
1779                .unwrap()
1780        );
1781        assert_eq!(
1782            n as i32,
1783            upload_manager
1784                .upload_metrics
1785                .pending_pings
1786                .get_value(&glean, Some("metrics"))
1787                .unwrap()
1788        );
1789    }
1790
1791    #[test]
1792    fn size_and_count_quota_work_together_count_first() {
1793        let (mut glean, dir) = new_glean(None);
1794
1795        // Register a ping for testing
1796        let ping_type = PingType::new(
1797            "test",
1798            true,
1799            /* send_if_empty */ true,
1800            true,
1801            true,
1802            true,
1803            vec![],
1804            vec![],
1805            true,
1806            vec![],
1807        );
1808        glean.register_ping_type(&ping_type);
1809
1810        let expected_number_of_pings = 2;
1811        // The number of pings we fill the pending pings directory with.
1812        let n = 10;
1813
1814        // Submit the ping multiple times
1815        for _ in 0..n {
1816            ping_type.submit_sync(&glean, None);
1817        }
1818
1819        let directory_manager = PingDirectoryManager::new(dir.path());
1820        let pending_pings = directory_manager.process_dirs().pending_pings;
1821        // The pending pings array is sorted by date in ascending order,
1822        // the newest element is the last one.
1823        let expected_pings = pending_pings
1824            .iter()
1825            .rev()
1826            .take(expected_number_of_pings)
1827            .map(|(_, ping)| ping.document_id.clone())
1828            .collect::<Vec<_>>();
1829
1830        // Create a new upload manager pointing to the same data_path as the glean instance.
1831        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1832
1833        // From manual testing we figured out an empty ping file is 324bytes,
1834        // so this allows 3 pings.
1835        upload_manager
1836            .policy
1837            .set_max_pending_pings_directory_size(Some(1000));
1838        upload_manager.policy.set_max_pending_pings_count(Some(2));
1839
1840        // Get a task once
1841        // One ping should have been enqueued.
1842        // Make sure it is the newest ping.
1843        for ping_id in expected_pings.iter().rev() {
1844            match upload_manager.get_upload_task(&glean, false) {
1845                PingUploadTask::Upload { request } => assert_eq!(&request.document_id, ping_id),
1846                _ => panic!("Expected upload manager to return the next request!"),
1847            }
1848        }
1849
1850        // Verify that no other requests were returned,
1851        // they should all have been deleted because pending pings quota was hit.
1852        assert_eq!(
1853            upload_manager.get_upload_task(&glean, false),
1854            PingUploadTask::done()
1855        );
1856
1857        // Verify that the correct number of deleted pings was recorded
1858        assert_eq!(
1859            (n - expected_number_of_pings) as i32,
1860            upload_manager
1861                .upload_metrics
1862                .deleted_pings_after_quota_hit
1863                .get_value(&glean, Some("metrics"))
1864                .unwrap()
1865        );
1866        assert_eq!(
1867            n as i32,
1868            upload_manager
1869                .upload_metrics
1870                .pending_pings
1871                .get_value(&glean, Some("metrics"))
1872                .unwrap()
1873        );
1874    }
1875
1876    #[test]
1877    fn maximum_wait_attemps_is_enforced() {
1878        let (glean, dir) = new_glean(None);
1879
1880        let mut upload_manager = PingUploadManager::no_policy(dir.path());
1881
1882        // Define a max_wait_attemps policy, this is disabled for tests by default.
1883        let max_wait_attempts = 3;
1884        upload_manager
1885            .policy
1886            .set_max_wait_attempts(Some(max_wait_attempts));
1887
1888        // Add a rate limiter to the upload mangager with max of 1 ping 5secs.
1889        //
1890        // We arbitrarily set the maximum pings per interval to a very low number,
1891        // when the rate limiter reaches it's limit get_upload_task returns a PingUploadTask::Wait,
1892        // which will allow us to test the limitations around returning too many of those in a row.
1893        let secs_per_interval = 5;
1894        let max_pings_per_interval = 1;
1895        upload_manager.set_rate_limiter(secs_per_interval, max_pings_per_interval);
1896
1897        // Enqueue two pings
1898        upload_manager.enqueue_ping(
1899            &glean,
1900            PingPayload {
1901                document_id: Uuid::new_v4().to_string(),
1902                upload_path: PATH.into(),
1903                json_body: "".into(),
1904                headers: None,
1905                body_has_info_sections: true,
1906                ping_name: "ping-name".into(),
1907                uploader_capabilities: vec![],
1908            },
1909        );
1910        upload_manager.enqueue_ping(
1911            &glean,
1912            PingPayload {
1913                document_id: Uuid::new_v4().to_string(),
1914                upload_path: PATH.into(),
1915                json_body: "".into(),
1916                headers: None,
1917                body_has_info_sections: true,
1918                ping_name: "ping-name".into(),
1919                uploader_capabilities: vec![],
1920            },
1921        );
1922
1923        // Get the first ping, it should be returned normally.
1924        match upload_manager.get_upload_task(&glean, false) {
1925            PingUploadTask::Upload { .. } => {}
1926            _ => panic!("Expected upload manager to return the next request!"),
1927        }
1928
1929        // Try to get the next ping,
1930        // we should be throttled and thus get a PingUploadTask::Wait.
1931        // Check that we are indeed allowed to get this response as many times as expected.
1932        for _ in 0..max_wait_attempts {
1933            let task = upload_manager.get_upload_task(&glean, false);
1934            assert!(task.is_wait());
1935        }
1936
1937        // Check that after we get PingUploadTask::Wait the allowed number of times,
1938        // we then get PingUploadTask::Done.
1939        assert_eq!(
1940            upload_manager.get_upload_task(&glean, false),
1941            PingUploadTask::done()
1942        );
1943
1944        // Wait for the rate limiter to allow upload tasks again.
1945        thread::sleep(Duration::from_secs(secs_per_interval));
1946
1947        // Check that we are allowed again to get pings.
1948        let task = upload_manager.get_upload_task(&glean, false);
1949        assert!(task.is_upload());
1950
1951        // And once we are done we don't need to wait anymore.
1952        assert_eq!(
1953            upload_manager.get_upload_task(&glean, false),
1954            PingUploadTask::done()
1955        );
1956    }
1957
1958    #[test]
1959    fn wait_task_contains_expected_wait_time_when_pending_pings_dir_not_processed_yet() {
1960        let (glean, dir) = new_glean(None);
1961        let upload_manager = PingUploadManager::new(dir.path(), "test");
1962        match upload_manager.get_upload_task(&glean, false) {
1963            PingUploadTask::Wait { time } => {
1964                assert_eq!(time, WAIT_TIME_FOR_PING_PROCESSING);
1965            }
1966            _ => panic!("Expected upload manager to return a wait task!"),
1967        };
1968    }
1969
1970    #[test]
1971    fn cannot_enqueue_ping_while_its_being_processed() {
1972        let (glean, dir) = new_glean(None);
1973
1974        let upload_manager = PingUploadManager::no_policy(dir.path());
1975
1976        // Enqueue a ping and start processing it
1977        let identifier = &Uuid::new_v4();
1978        let ping = PingPayload {
1979            document_id: identifier.to_string(),
1980            upload_path: PATH.into(),
1981            json_body: "".into(),
1982            headers: None,
1983            body_has_info_sections: true,
1984            ping_name: "ping-name".into(),
1985            uploader_capabilities: vec![],
1986        };
1987        upload_manager.enqueue_ping(&glean, ping);
1988        assert!(upload_manager.get_upload_task(&glean, false).is_upload());
1989
1990        // Attempt to re-enqueue the same ping
1991        let ping = PingPayload {
1992            document_id: identifier.to_string(),
1993            upload_path: PATH.into(),
1994            json_body: "".into(),
1995            headers: None,
1996            body_has_info_sections: true,
1997            ping_name: "ping-name".into(),
1998            uploader_capabilities: vec![],
1999        };
2000        upload_manager.enqueue_ping(&glean, ping);
2001
2002        // No new pings should have been enqueued so the upload task is Done.
2003        assert_eq!(
2004            upload_manager.get_upload_task(&glean, false),
2005            PingUploadTask::done()
2006        );
2007
2008        // Process the upload response
2009        upload_manager.process_ping_upload_response(
2010            &glean,
2011            &identifier.to_string(),
2012            UploadResult::http_status(200),
2013        );
2014    }
2015}