use crate::metrics::{DatetimeMetric, StringMetric, TimeUnit};
use crate::storage::INTERNAL_STORAGE;
use crate::util::local_now_with_offset;
use crate::{CommonMetricData, Glean, Lifetime};
use chrono::prelude::*;
use chrono::Duration;
use once_cell::sync::Lazy;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::JoinHandle;
/// Hour of the day at which the "metrics" ping is due.
///
/// It is combined with the date of a `DateTime<FixedOffset>` as `HH:00:00`,
/// so it is interpreted in whatever offset that `now` value carries.
const SCHEDULED_HOUR: u32 = 4;
/// Process-wide cancellation state shared by `schedule`, `cancel` and the
/// scheduler thread: a `cancelled` flag (`true` once `cancel` was called) and
/// the condvar the scheduler thread waits on.
///
/// `clippy::mutex_atomic` is allowed because the flag is handed to the
/// `Condvar` as a `MutexGuard`, so it cannot be replaced by an atomic.
#[allow(clippy::mutex_atomic)]
static TASK_CONDVAR: Lazy<Arc<(Mutex<bool>, Condvar)>> =
Lazy::new(|| Arc::new((Mutex::new(false), Condvar::new())));
/// Abstraction over submitting a "metrics" ping, so the scheduling logic can
/// be exercised with a stand-in submitter (see the `test` module).
trait MetricsPingSubmitter {
/// Submits a "metrics" ping.
///
/// * `glean` - the Glean instance to submit through.
/// * `reason` - optional reason string attached to the submission.
/// * `now` - the time associated with this submission; the production
///   implementation stores it as the last-sent time.
fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>);
}
/// Abstraction over starting the recurring scheduler, so `schedule_internal`
/// can be tested without spawning a real scheduler thread.
trait MetricsPingScheduler {
/// Starts scheduling future "metrics" pings.
///
/// * `submitter` - used to submit the ping once it is due; it must be
///   `Send + 'static` because the production scheduler moves it to a thread.
/// * `now` - reference time from which the first wait is computed.
/// * `when` - which due time to wait for first.
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
);
}
/// Production submitter: sends the real "metrics" ping through Glean.
struct GleanMetricsPingSubmitter {}

impl MetricsPingSubmitter for GleanMetricsPingSubmitter {
    /// Submits the "metrics" ping with `reason`, then stores `now` in the
    /// internal last-sent-time metric.
    fn submit_metrics_ping(&self, glean: &Glean, reason: Option<&str>, now: DateTime<FixedOffset>) {
        glean.submit_ping_by_name("metrics", reason);
        // Record the send time only after the submission call has returned.
        let last_sent_time = get_last_sent_time_metric();
        last_sent_time.set_sync_chrono(glean, now);
    }
}
struct GleanMetricsPingScheduler {}
impl MetricsPingScheduler for GleanMetricsPingScheduler {
fn start_scheduler(
&self,
submitter: impl MetricsPingSubmitter + Send + 'static,
now: DateTime<FixedOffset>,
when: When,
) {
start_scheduler(submitter, now, when);
}
}
/// Entry point for scheduling "metrics" pings.
///
/// Clears any earlier cancellation (logging a debug message if there was one)
/// and runs the scheduling decision for the current local time with the
/// production submitter and scheduler.
///
/// # Panics
///
/// Panics if the cancellation mutex is poisoned.
pub fn schedule(glean: &Glean) {
    let now = local_now_with_offset();

    let (cancelled_lock, _) = &**TASK_CONDVAR;
    let was_cancelled = *cancelled_lock.lock().unwrap();
    if was_cancelled {
        log::debug!("Told to schedule, but already cancelled. Are we in a test?");
    }
    // Un-cancel so that a freshly started scheduler thread does not exit at once.
    *cancelled_lock.lock().unwrap() = false;

    schedule_internal(
        glean,
        GleanMetricsPingSubmitter {},
        GleanMetricsPingScheduler {},
        now,
    )
}
/// Cancels scheduling: sets the shared `cancelled` flag and wakes every thread
/// waiting on the scheduler condvar so it can observe the flag.
///
/// # Panics
///
/// Panics if the cancellation mutex is poisoned.
pub fn cancel() {
    // One `*` for the `Lazy`, the second for the `Arc`.
    let shared = &**TASK_CONDVAR;
    {
        let mut cancelled = shared.0.lock().unwrap();
        *cancelled = true;
    }
    // The guard is released before waiters are notified.
    shared.1.notify_all();
}
/// Decides whether a "metrics" ping has to be submitted right now and which
/// due time the scheduler should wait for next.
///
/// The checks run in this order:
///
/// 1. A stored build that differs from `glean.app_build`: store the new build,
///    submit with reason `"upgrade"` and reschedule. Nothing else is checked.
/// 2. No stored build at all: store the current one and carry on.
/// 3. A ping was already sent on `now`'s calendar date: schedule for tomorrow.
/// 4. `now` is later than today's due time: submit with reason `"overdue"`
///    and reschedule.
/// 5. Otherwise: schedule for today's due time.
///
/// * `glean` - Glean instance holding the internal bookkeeping metrics.
/// * `submitter` - submits the ping; ownership passes to the scheduler.
/// * `scheduler` - starts the recurring schedule.
/// * `now` - the time to make the decision for.
fn schedule_internal(
    glean: &Glean,
    submitter: impl MetricsPingSubmitter + Send + 'static,
    scheduler: impl MetricsPingScheduler,
    now: DateTime<FixedOffset>,
) {
    let build_metric = get_last_sent_build_metric();
    match build_metric.get_value(glean, Some(INTERNAL_STORAGE)) {
        Some(previous_build) if previous_build != glean.app_build => {
            build_metric.set_sync(glean, &glean.app_build);
            log::info!("App build changed. Sending 'metrics' ping");
            submitter.submit_metrics_ping(glean, Some("upgrade"), now);
            scheduler.start_scheduler(submitter, now, When::Reschedule);
            return;
        }
        // Same build as last time: nothing to update.
        Some(_) => {}
        // No build recorded yet: record the current one.
        None => {
            build_metric.set_sync(glean, &glean.app_build);
        }
    }

    let last_sent_time = get_last_sent_time_metric().get_value(glean, INTERNAL_STORAGE);
    if let Some(last_sent) = last_sent_time {
        log::info!("The 'metrics' ping was last sent on {}", last_sent);
    }

    let today = now.date();
    let already_sent_today = matches!(last_sent_time, Some(sent) if sent.date() == today);
    if already_sent_today {
        log::info!("The 'metrics' ping was already sent today, {}", now);
        scheduler.start_scheduler(submitter, now, When::Tomorrow);
        return;
    }

    let due_time = today.and_hms(SCHEDULED_HOUR, 0, 0);
    if now > due_time {
        log::info!("Sending the 'metrics' ping immediately, {}", now);
        submitter.submit_metrics_ping(glean, Some("overdue"), now);
        scheduler.start_scheduler(submitter, now, When::Reschedule);
    } else {
        log::info!("The 'metrics' collection is scheduled for today, {}", now);
        scheduler.start_scheduler(submitter, now, When::Today);
    }
}
/// Which due time the scheduler should wait for, and why.
#[derive(Debug, PartialEq)]
enum When {
// Fire at `SCHEDULED_HOUR` on the same calendar date as `now`.
Today,
// Fire at `SCHEDULED_HOUR` on the calendar date after `now`.
Tomorrow,
// Same timing as `Tomorrow` but reported with its own reason string;
// used after a ping has just been submitted.
Reschedule,
}
impl When {
fn until(&self, now: DateTime<FixedOffset>) -> std::time::Duration {
let fire_date = match self {
Self::Today => now.date().and_hms(SCHEDULED_HOUR, 0, 0),
Self::Tomorrow | Self::Reschedule => {
(now.date() + Duration::days(1)).and_hms(SCHEDULED_HOUR, 0, 0)
}
};
(fire_date - now)
.to_std()
.unwrap_or_else(|_| std::time::Duration::from_millis(0))
}
fn reason(&self) -> &'static str {
match self {
Self::Today => "today",
Self::Tomorrow => "tomorrow",
Self::Reschedule => "reschedule",
}
}
}
/// Spawns the `glean.mps` thread, which repeatedly waits for the next due time
/// and submits the "metrics" ping when it arrives.
///
/// Each loop iteration waits on `TASK_CONDVAR` for `when.until(now)`:
///
/// * if the shared `cancelled` flag is set (see `cancel`), the thread exits;
/// * if the wait fails (poisoned lock), the thread logs a warning and exits;
/// * if the wait times out, the ping is submitted through `submitter` with
///   `when.reason()` and the time of submission, and `when` becomes
///   `When::Reschedule`.
///
/// After every iteration `now` is refreshed from `local_now_with_offset`.
///
/// * `submitter` - submits the ping; moved into the spawned thread.
/// * `now` - reference time for computing the first wait.
/// * `when` - which due time to wait for first.
///
/// Returns the join handle of the spawned thread.
///
/// # Panics
///
/// Panics if the thread cannot be spawned. The spawned thread itself panics if
/// the global Glean is missing, or one of the locks is poisoned, when a ping is due.
fn start_scheduler(
    submitter: impl MetricsPingSubmitter + Send + 'static,
    now: DateTime<FixedOffset>,
    when: When,
) -> JoinHandle<()> {
    let pair = Arc::clone(&TASK_CONDVAR);
    std::thread::Builder::new()
        .name("glean.mps".into())
        .spawn(move || {
            let (cancelled_lock, condvar) = &*pair;
            let mut when = when;
            let mut now = now;
            loop {
                let dur = when.until(now);
                log::info!("Scheduling for {} after {:?}, reason {:?}", now, dur, when);

                // The guard on `cancelled` lives only for this statement, so it
                // is released before the global Glean lock is taken below.
                let timed_out = match condvar.wait_timeout_while(
                    cancelled_lock.lock().unwrap(),
                    dur,
                    |cancelled| !*cancelled,
                ) {
                    Err(err) => {
                        log::warn!("Condvar wait failure. MPS exiting. {}", err);
                        break;
                    }
                    Ok((cancelled, wait_result)) => {
                        if *cancelled {
                            log::info!("Metrics Ping Scheduler cancelled. Exiting.");
                            break;
                        }
                        if !wait_result.timed_out() {
                            // `wait_timeout_while` only returns `Ok` once the
                            // predicate is false or the timeout has elapsed.
                            log::warn!("Spurious wakeup of the MPS condvar should be impossible.");
                        }
                        wait_result.timed_out()
                    }
                };

                if timed_out {
                    log::info!("Time to submit our metrics ping, {:?}", when);
                    let glean = crate::core::global_glean()
                        .expect("Global Glean not present when trying to send scheduled 'metrics' ping?!")
                        .lock()
                        .unwrap();
                    // `now` still holds the time the wait began, which may be up
                    // to a day ago. Report the actual submission time so the
                    // stored last-sent date matches the day the ping went out.
                    let submitted_at = local_now_with_offset();
                    submitter.submit_metrics_ping(&glean, Some(when.reason()), submitted_at);
                    when = When::Reschedule;
                }
                now = local_now_with_offset();
            }
        })
        .expect("Unable to spawn Metrics Ping Scheduler thread.")
}
/// Builds the internal `mps.last_sent_time` datetime metric, which records
/// when the "metrics" ping was last submitted.
///
/// The metric has user lifetime, minute resolution, and is kept in
/// `INTERNAL_STORAGE`.
fn get_last_sent_time_metric() -> DatetimeMetric {
    let meta = CommonMetricData {
        category: "mps".into(),
        name: "last_sent_time".into(),
        lifetime: Lifetime::User,
        send_in_pings: vec![INTERNAL_STORAGE.into()],
        ..Default::default()
    };
    DatetimeMetric::new(meta, TimeUnit::Minute)
}
/// Builds the internal `mps.last_sent_build` string metric, which records the
/// app build seen by the scheduler the last time it ran.
///
/// The metric has user lifetime and is kept in `INTERNAL_STORAGE`.
fn get_last_sent_build_metric() -> StringMetric {
    let meta = CommonMetricData {
        category: "mps".into(),
        name: "last_sent_build".into(),
        lifetime: Lifetime::User,
        send_in_pings: vec![INTERNAL_STORAGE.into()],
        ..Default::default()
    };
    StringMetric::new(meta)
}
#[cfg(test)]
mod test {
use super::*;
use crate::tests::new_glean;
use std::sync::atomic::{AtomicU32, Ordering};
/// Test double for `MetricsPingSubmitter`: instead of submitting anything it
/// hands each call to a validator closure and counts the calls.
struct ValidatingSubmitter<F: Fn(DateTime<FixedOffset>, Option<&str>)> {
// Called with `(now, reason)` for every submission request.
submit_validator: F,
// Incremented once per submission request, after the validator returns.
validator_run_count: Arc<AtomicU32>,
}
/// Test double for `MetricsPingScheduler`: instead of spawning a thread it
/// hands each call to a validator closure and counts the calls.
struct ValidatingScheduler<F: Fn(DateTime<FixedOffset>, When)> {
// Called with `(now, when)` for every scheduling request.
schedule_validator: F,
// Incremented once per scheduling request, after the validator returns.
validator_run_count: Arc<AtomicU32>,
}
impl<F: Fn(DateTime<FixedOffset>, Option<&str>)> MetricsPingSubmitter for ValidatingSubmitter<F> {
    /// Runs the validator on `(now, reason)` and then bumps the call counter.
    /// The Glean instance is ignored.
    fn submit_metrics_ping(
        &self,
        _glean: &Glean,
        reason: Option<&str>,
        now: DateTime<FixedOffset>,
    ) {
        let Self {
            submit_validator: validate,
            validator_run_count: calls,
        } = self;
        // Validate first: a panicking validator must not be counted.
        validate(now, reason);
        calls.fetch_add(1, Ordering::Relaxed);
    }
}
impl<F: Fn(DateTime<FixedOffset>, When)> MetricsPingScheduler for ValidatingScheduler<F> {
    /// Runs the validator on `(now, when)` and then bumps the call counter.
    /// The submitter is dropped without being used.
    fn start_scheduler(
        &self,
        _submitter: impl MetricsPingSubmitter + Send + 'static,
        now: DateTime<FixedOffset>,
        when: When,
    ) {
        let Self {
            schedule_validator: validate,
            validator_run_count: calls,
        } = self;
        // Validate first: a panicking validator must not be counted.
        validate(now, when);
        calls.fetch_add(1, Ordering::Relaxed);
    }
}
/// Builds a validating submitter and a validating scheduler together with the
/// shared counters that track how often each one was invoked.
///
/// Returns `(submitter, submitter_count, scheduler, scheduler_count)`; both
/// counters start at zero.
fn new_proxies<F1, F2>(
    submit_validator: F1,
    schedule_validator: F2,
) -> (
    ValidatingSubmitter<F1>,
    Arc<AtomicU32>,
    ValidatingScheduler<F2>,
    Arc<AtomicU32>,
)
where
    F1: Fn(DateTime<FixedOffset>, Option<&str>),
    F2: Fn(DateTime<FixedOffset>, When),
{
    let submitter_count = Arc::new(AtomicU32::new(0));
    let scheduler_count = Arc::new(AtomicU32::new(0));

    // Each proxy gets its own handle to the counter the caller keeps.
    let submitter = ValidatingSubmitter {
        submit_validator,
        validator_run_count: Arc::clone(&submitter_count),
    };
    let scheduler = ValidatingScheduler {
        schedule_validator,
        validator_run_count: Arc::clone(&scheduler_count),
    };

    (submitter, submitter_count, scheduler, scheduler_count)
}
/// On a first run (no build stored) the current build gets stored, and since
/// no ping was ever sent and the due time has passed, an "overdue" ping is
/// submitted and rescheduled.
#[test]
fn first_run_last_sent_build() {
    let (mut glean, _t) = new_glean(None);
    glean.app_build = "a build".into();

    let lsb_metric = get_last_sent_build_metric();
    let stored_before = lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE));
    assert_eq!(None, stored_before);

    // One second past the due time.
    let utc = FixedOffset::east(0);
    let fake_now = utc.ymd(2022, 11, 15).and_hms(SCHEDULED_HOUR, 0, 1);

    let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
        |_, reason| assert_eq!(reason, Some("overdue")),
        |_, when| assert_eq!(when, When::Reschedule),
    );
    schedule_internal(&glean, submitter, scheduler, fake_now);

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, submissions);
    let schedulings = scheduler_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, schedulings);

    let stored_after = lsb_metric.get_value(&glean, Some(INTERNAL_STORAGE));
    assert_eq!(Some(glean.app_build.to_string()), stored_after);
}
/// A stored build that differs from the running one triggers an "upgrade"
/// ping followed by a reschedule.
#[test]
fn different_app_builds_submit_and_reschedule() {
    let (mut glean, _t) = new_glean(None);
    glean.app_build = "a build".into();

    let lsb_metric = get_last_sent_build_metric();
    lsb_metric.set_sync(&glean, "a different build");

    let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
        |_, reason| assert_eq!(reason, Some("upgrade")),
        |_, when| assert_eq!(when, When::Reschedule),
    );
    let now = local_now_with_offset();
    schedule_internal(&glean, submitter, scheduler, now);

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, submissions);
    let schedulings = scheduler_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, schedulings);
}
/// Case #1: a ping was already sent on the current date, so nothing is
/// submitted and the next ping is scheduled for tomorrow.
#[test]
fn case_1_no_submit_but_schedule_tomorrow() {
    let (glean, _t) = new_glean(None);

    let utc = FixedOffset::east(0);
    let fake_now = utc.ymd(2021, 4, 30).and_hms(14, 36, 14);
    // Pretend the last ping went out at exactly `fake_now`.
    let last_sent = get_last_sent_time_metric();
    last_sent.set_sync_chrono(&glean, fake_now);

    let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
        |_, reason| panic!("Case #1 shouldn't submit a ping! reason: {:?}", reason),
        |_, when| assert_eq!(when, When::Tomorrow),
    );
    schedule_internal(&glean, submitter, scheduler, fake_now);

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(0, submissions);
    let schedulings = scheduler_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, schedulings);
}
/// Case #2: the last ping was sent yesterday and today's due time has passed,
/// so an "overdue" ping is submitted and rescheduled.
#[test]
fn case_2_submit_ping_and_reschedule() {
    let (glean, _t) = new_glean(None);

    // Yesterday, one second past the due time.
    let utc = FixedOffset::east(0);
    let fake_yesterday = utc.ymd(2021, 4, 29).and_hms(SCHEDULED_HOUR, 0, 1);
    let last_sent = get_last_sent_time_metric();
    last_sent.set_sync_chrono(&glean, fake_yesterday);
    let fake_now = fake_yesterday + Duration::days(1);

    let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
        |_, reason| assert_eq!(reason, Some("overdue")),
        |_, when| assert_eq!(when, When::Reschedule),
    );
    schedule_internal(&glean, submitter, scheduler, fake_now);

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, submissions);
    let schedulings = scheduler_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, schedulings);
}
/// Case #3: the last ping was sent yesterday but today's due time has not
/// been reached, so nothing is submitted and the ping is scheduled for today.
#[test]
fn case_3_no_submit_but_schedule_today() {
    let (glean, _t) = new_glean(None);

    // Yesterday, almost an hour before the due time.
    let utc = FixedOffset::east(0);
    let fake_yesterday = utc.ymd(2021, 4, 29).and_hms(SCHEDULED_HOUR - 1, 0, 1);
    let last_sent = get_last_sent_time_metric();
    last_sent.set_sync_chrono(&glean, fake_yesterday);
    let fake_now = fake_yesterday + Duration::days(1);

    let (submitter, submitter_count, scheduler, scheduler_count) = new_proxies(
        |_, reason| panic!("Case #3 shouldn't submit a ping! reason: {:?}", reason),
        |_, when| assert_eq!(when, When::Today),
    );
    schedule_internal(&glean, submitter, scheduler, fake_now);

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(0, submissions);
    let schedulings = scheduler_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, schedulings);
}
/// Spot-checks `When::until` and `When::reason`.
#[test]
fn when_gets_at_least_some_date_math_correct() {
    let secs = std::time::Duration::from_secs;
    let now = FixedOffset::east(0).ymd(2021, 4, 30).and_hms(15, 2, 10);

    // Today's due time is already behind us: no wait at all.
    assert_eq!(secs(0), When::Today.until(now));

    // One hour before the due time.
    let earlier = now.date().and_hms(SCHEDULED_HOUR - 1, 0, 0);
    assert_eq!(secs(3600), When::Today.until(earlier));

    // 15:02:10 until 04:00:00 the next day is 12h 57m 50s.
    let until_next_due = secs(46670);
    assert_eq!(until_next_due, When::Tomorrow.until(now));
    assert_eq!(until_next_due, When::Reschedule.until(now));

    // Same timing, different reason.
    assert_eq!(When::Tomorrow.until(now), When::Reschedule.until(now));
    assert_ne!(When::Tomorrow.reason(), When::Reschedule.reason());
}
// Held by the tests below that start a real scheduler thread, so that tests
// touching the global `TASK_CONDVAR` state do not run at the same time.
static SCHEDULER_TEST_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
/// A scheduler thread started for any `When` variant exits on `cancel()`
/// without submitting a ping.
#[test]
fn cancellable_tasks_can_be_cancelled() {
    let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
    let (cancelled_lock, _condvar) = &**TASK_CONDVAR;

    // Two hours before the due time, so no variant is due right away.
    let now = FixedOffset::east(0)
        .ymd(2021, 4, 30)
        .and_hms(SCHEDULED_HOUR - 2, 0, 0);

    let proxy_factory = || {
        new_proxies(
            |_, reason| {
                panic!(
                    "Shouldn't submit when testing scheduler. reason: {:?}",
                    reason
                )
            },
            |_, _| panic!("Not even using the scheduler this time."),
        )
    };

    for when in [When::Today, When::Tomorrow, When::Reschedule] {
        // Un-cancel before every run; the previous iteration left the flag set.
        *cancelled_lock.lock().unwrap() = false;

        let (submitter, submitter_count, _, _) = proxy_factory();
        let handle = start_scheduler(submitter, now, when);
        super::cancel();
        handle.join().unwrap();

        assert_eq!(0, submitter_count.swap(0, Ordering::Relaxed));
    }
}
/// A scheduler started for `When::Today` after the due time submits exactly
/// one ping straight away; the validator then cancels the scheduler so the
/// thread can be joined.
#[test]
fn immediate_task_runs_immediately() {
    let _test_lock = SCHEDULER_TEST_MUTEX.lock().unwrap();
    {
        // Make sure a cancellation left over from another test is cleared.
        let (cancelled_lock, _condvar) = &**TASK_CONDVAR;
        *cancelled_lock.lock().unwrap() = false;
    }

    let (glean, _t) = new_glean(None);
    assert!(
        !glean.schedule_metrics_pings,
        "Real schedulers not allowed in tests!"
    );
    // The scheduler thread fetches the global Glean when the ping is due.
    let setup_result = crate::core::setup_glean(glean);
    assert!(setup_result.is_ok());

    // Well past the due time, so `When::Today` has nothing to wait for.
    let utc = FixedOffset::east(0);
    let now = utc.ymd(2021, 4, 20).and_hms(15, 42, 0);

    let (submitter, submitter_count, _, _) = new_proxies(
        move |_, reason| {
            assert_eq!(reason, Some("today"));
            // Cancel from another thread so the scheduler exits on its next wait.
            std::thread::spawn(super::cancel);
        },
        |_, _| panic!("Not using the scheduler this time."),
    );

    let handle = start_scheduler(submitter, now, When::Today);
    handle.join().unwrap();

    let submissions = submitter_count.swap(0, Ordering::Relaxed);
    assert_eq!(1, submissions);
}
}