remote_settings/
service.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18    client::RemoteState, config::BaseUrl, error::Error, storage::Storage,
19    telemetry::RemoteSettingsTelemetryWrapper, RemoteSettingsClient, RemoteSettingsConfig,
20    RemoteSettingsContext, RemoteSettingsServer, Result,
21};
22
23/// Internal Remote settings service API
24pub struct RemoteSettingsService {
25    inner: Mutex<RemoteSettingsServiceInner>,
26}
27
28struct RemoteSettingsServiceInner {
29    storage_dir: Utf8PathBuf,
30    base_url: BaseUrl,
31    bucket_name: String,
32    app_context: Option<RemoteSettingsContext>,
33    remote_state: RemoteState,
34    telemetry: RemoteSettingsTelemetryWrapper,
35    /// Weakrefs for all clients that we've created.  Note: this stores the
36    /// top-level/public `RemoteSettingsClient` structs rather than `client::RemoteSettingsClient`.
37    /// The reason for this is that we return Arcs to the public struct to the foreign code, so we
38    /// need to use the same type for our weakrefs.  The alternative would be to create 2 Arcs for
39    /// each client, which is wasteful.
40    clients: Vec<Weak<RemoteSettingsClient>>,
41}
42
43impl RemoteSettingsService {
44    /// Construct a [RemoteSettingsService]
45    ///
46    /// This is typically done early in the application-startup process
47    pub fn new(storage_dir: String, config: RemoteSettingsConfig) -> Self {
48        let storage_dir = storage_dir.into();
49        let base_url = config
50            .server
51            .unwrap_or(RemoteSettingsServer::Prod)
52            .get_base_url_with_prod_fallback();
53        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
54
55        Self {
56            inner: Mutex::new(RemoteSettingsServiceInner {
57                storage_dir,
58                base_url,
59                bucket_name,
60                app_context: config.app_context,
61                remote_state: RemoteState::default(),
62                telemetry: RemoteSettingsTelemetryWrapper::noop(),
63                clients: vec![],
64            }),
65        }
66    }
67
68    pub fn set_telemetry(&self, telemetry: RemoteSettingsTelemetryWrapper) {
69        self.inner.lock().telemetry = telemetry;
70    }
71
72    pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
73        let mut inner = self.inner.lock();
74        // Allow using in-memory databases for testing of external crates.
75        let storage = if inner.storage_dir == ":memory:" {
76            Storage::new(inner.storage_dir.clone())
77        } else {
78            Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
79        };
80
81        let client = Arc::new(RemoteSettingsClient::new(
82            inner.base_url.clone(),
83            inner.bucket_name.clone(),
84            collection_name.clone(),
85            inner.app_context.clone(),
86            storage,
87        ));
88        inner.clients.push(Arc::downgrade(&client));
89        client
90    }
91
92    /// Sync collections for all active clients
93    pub fn sync(&self) -> Result<Vec<String>> {
94        // Make sure we only sync each collection once, even if there are multiple clients
95        let mut synced_collections = HashSet::new();
96
97        let mut inner = self.inner.lock();
98        let changes = inner.fetch_changes()?;
99        let change_map: HashMap<_, _> = changes
100            .changes
101            .iter()
102            .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
103            .collect();
104        let bucket_name = inner.bucket_name.clone();
105
106        for client in inner.active_clients() {
107            let client = &client.internal;
108            let collection_name = client.collection_name();
109            let cid = format!("{bucket_name}/{collection_name}");
110            if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
111                if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
112                {
113                    if client_last_modified == *server_last_modified {
114                        trace!("skipping up-to-date collection: {collection_name}");
115                        inner.telemetry.report_uptake_up_to_date(&cid, None);
116                        continue;
117                    }
118                }
119            }
120            if synced_collections.insert(collection_name.to_string()) {
121                trace!("syncing collection: {collection_name}");
122                let start_time = std::time::Instant::now();
123                let sync_result = client.sync();
124                let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
125                match &sync_result {
126                    Ok(()) => inner.telemetry.report_uptake_success(&cid, Some(duration)),
127                    Err(e) => inner.telemetry.report_uptake_error(e, &cid),
128                }
129                sync_result?;
130            }
131        }
132        Ok(synced_collections.into_iter().collect())
133    }
134
135    /// Update the remote settings config
136    ///
137    /// This will cause all current and future clients to use new config and will delete any stored
138    /// records causing the clients to return new results from the new config.
139    pub fn update_config(&self, config: RemoteSettingsConfig) -> Result<()> {
140        let base_url = config
141            .server
142            .unwrap_or(RemoteSettingsServer::Prod)
143            .get_base_url()?;
144        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
145        let mut inner = self.inner.lock();
146        for client in inner.active_clients() {
147            client.internal.update_config(
148                base_url.clone(),
149                bucket_name.clone(),
150                config.app_context.clone(),
151            );
152        }
153        inner.base_url = base_url;
154        inner.bucket_name = bucket_name;
155        inner.app_context = config.app_context;
156        Ok(())
157    }
158
159    pub fn client_url(&self) -> Url {
160        let inner = self.inner.lock();
161        let base_url = inner.base_url.clone();
162        base_url.url().clone()
163    }
164}
165
166impl RemoteSettingsServiceInner {
167    // Find live clients in self.clients
168    //
169    // Also, drop dead weakrefs from the vec
170    fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
171        let mut active_clients = vec![];
172        self.clients.retain(|weak| {
173            if let Some(client) = weak.upgrade() {
174                active_clients.push(client);
175                true
176            } else {
177                false
178            }
179        });
180        active_clients
181    }
182
183    fn fetch_changes(&mut self) -> Result<Changes> {
184        let mut url = self.base_url.clone();
185        url.path_segments_mut()
186            .push("buckets")
187            .push("monitor")
188            .push("collections")
189            .push("changes")
190            .push("changeset");
191        // For now, always use `0` for the expected value.  This means we'll get updates based on
192        // the default TTL of 1 hour.
193        //
194        // Eventually, we should add support for push notifications and use the timestamp from the
195        // notification.
196        url.query_pairs_mut().append_pair("_expected", "0");
197        let url = url.into_inner();
198        trace!("make_request: {url}");
199        self.remote_state.ensure_no_backoff()?;
200
201        let start_time = std::time::Instant::now();
202        let req = Request::get(url);
203        let resp = req.send()?;
204
205        self.remote_state.handle_backoff_hint(&resp)?;
206
207        const TELEMETRY_SOURCE_POLL: &str = "settings-changes-monitoring";
208        if resp.is_success() {
209            let body = resp.json()?;
210            let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
211            self.telemetry
212                .report_uptake_success(TELEMETRY_SOURCE_POLL, Some(duration));
213            Ok(body)
214        } else {
215            let e = Error::response_error(&resp.url, format!("status code: {}", resp.status));
216            self.telemetry
217                .report_uptake_error(&e, TELEMETRY_SOURCE_POLL);
218            Err(e)
219        }
220    }
221}
222
223/// Data from the changes endpoint
224///
225/// https://remote-settings.readthedocs.io/en/latest/client-specifications.html#endpoints
226#[derive(Debug, Deserialize)]
227struct Changes {
228    changes: Vec<ChangesCollection>,
229}
230
231#[derive(Debug, Deserialize)]
232struct ChangesCollection {
233    collection: String,
234    bucket: String,
235    last_modified: u64,
236}
237
238#[cfg(test)]
239mod test {
240    use super::*;
241    use crate::telemetry::UptakeEventExtras;
242    use crate::{RemoteSettingsConfig, RemoteSettingsServer};
243    use mockito::{mock, Matcher};
244    use std::sync::Arc;
245
246    /// Telemetry implementation that records all events for later assertion.
247    struct FakeTelemetry {
248        events: std::sync::Mutex<Vec<UptakeEventExtras>>,
249    }
250
251    impl FakeTelemetry {
252        fn new() -> Self {
253            Self {
254                events: std::sync::Mutex::new(Vec::new()),
255            }
256        }
257    }
258
259    impl crate::telemetry::RemoteSettingsTelemetry for FakeTelemetry {
260        fn report_uptake(&self, extras: UptakeEventExtras) {
261            self.events.lock().unwrap().push(extras);
262        }
263    }
264
265    fn make_service(server_url: &str) -> (RemoteSettingsService, Arc<FakeTelemetry>) {
266        let service = RemoteSettingsService::new(
267            ":memory:".into(),
268            RemoteSettingsConfig {
269                server: Some(RemoteSettingsServer::Custom {
270                    url: server_url.into(),
271                }),
272                ..Default::default()
273            },
274        );
275        let telemetry: Arc<FakeTelemetry> = Arc::new(FakeTelemetry::new());
276        service.set_telemetry(RemoteSettingsTelemetryWrapper::new(telemetry.clone()));
277        (service, telemetry)
278    }
279
280    fn mock_monitor_changes(collection: &str, timestamp: u64) -> mockito::Mock {
281        mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
282            .match_query(Matcher::Any)
283            .with_status(200)
284            .with_header("content-type", "application/json")
285            .with_body(format!(
286                r#"{{"timestamp": {timestamp}, "changes": [{{"collection": "{collection}", "bucket": "main", "last_modified": {timestamp}}}]}}"#
287            ))
288            .create()
289    }
290
291    fn mock_changeset(collection: &str, timestamp: u64) -> mockito::Mock {
292        mock(
293            "GET",
294            format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
295        )
296        .match_query(Matcher::Any)
297        .with_status(200)
298        .with_header("content-type", "application/json")
299        .with_body(format!(
300            r#"{{"changes": [], "timestamp": {timestamp}, "metadata": {{"bucket": "main", "signatures": []}}}}"#
301        ))
302        .create()
303    }
304
305    fn mock_changeset_error(bucket: &str, collection: &str) -> mockito::Mock {
306        mock(
307            "GET",
308            format!("/v1/buckets/{bucket}/collections/{collection}/changeset").as_str(),
309        )
310        .match_query(Matcher::Any)
311        .with_status(500)
312        .with_body("server error")
313        .create()
314    }
315
316    #[test]
317    fn test_telemetry_network_error_on_changes_failure() {
318        viaduct_dev::init_backend_dev();
319        mock_changeset_error("monitor", "changes");
320
321        let (service, telemetry) = make_service(&mockito::server_url());
322        let _ = service.sync();
323
324        let events = telemetry.events.lock().unwrap();
325        assert_eq!(events.len(), 1);
326        assert_eq!(
327            events[0].source,
328            Some("settings-changes-monitoring".to_string())
329        );
330        assert_eq!(events[0].value, Some("network_error".to_string()));
331        assert_eq!(events[0].error_name, Some("ResponseError".to_string()));
332        assert!(events[0].error_name.is_some());
333    }
334
335    #[test]
336    fn test_telemetry_on_changes_success() {
337        viaduct_dev::init_backend_dev();
338        let _changes = mock_monitor_changes("cid", 42);
339
340        let (service, telemetry) = make_service(&mockito::server_url());
341        let _ = service.sync();
342
343        let events = telemetry.events.lock().unwrap();
344        assert_eq!(events.len(), 1);
345        assert_eq!(
346            events[0].source,
347            Some("settings-changes-monitoring".to_string())
348        );
349        assert_eq!(events[0].value, Some("success".to_string()));
350        assert!(events[0].duration.is_some());
351    }
352
353    #[cfg(not(feature = "signatures"))]
354    #[test]
355    fn test_telemetry_on_collection_success() {
356        viaduct_dev::init_backend_dev();
357        let collection = "cid";
358        let timestamp = 1774420582054u64;
359        let _changes = mock_monitor_changes(collection, timestamp);
360        let _changeset = mock_changeset(collection, timestamp);
361
362        let (service, telemetry) = make_service(&mockito::server_url());
363        let _client = service.make_client(collection.into());
364        let _ = service.sync();
365
366        let events = telemetry.events.lock().unwrap();
367        assert_eq!(events.len(), 2);
368        assert_eq!(
369            events[0].source,
370            Some("settings-changes-monitoring".to_string())
371        );
372        assert_eq!(events[1].source, Some(format!("main/{collection}")));
373        assert_eq!(events[1].value, Some("success".to_string()));
374        assert!(events[1].duration.is_some());
375    }
376
377    #[cfg(not(feature = "signatures"))]
378    #[test]
379    fn test_telemetry_on_collection_up_to_date() {
380        viaduct_dev::init_backend_dev();
381        let collection = "cid";
382        let timestamp = 1774420582054u64;
383        let _changes = mock_monitor_changes(collection, timestamp);
384        let _changeset = mock_changeset(collection, timestamp);
385
386        let (service, telemetry) = make_service(&mockito::server_url());
387        let _client = service.make_client(collection.into());
388
389        // First sync: populates local storage with timestamp.
390        let _ = service.sync();
391        let events_before = telemetry.events.lock().unwrap().len();
392        // Second sync.
393        let _ = service.sync();
394
395        let events = telemetry.events.lock().unwrap();
396        assert_eq!(events.len() - events_before, 2);
397        assert_eq!(
398            events[events_before].source,
399            Some("settings-changes-monitoring".to_string())
400        );
401        assert_eq!(
402            events[events_before + 1].source,
403            Some(format!("main/{collection}"))
404        );
405        assert_eq!(
406            events[events_before + 1].value,
407            Some("up_to_date".to_string())
408        );
409    }
410
411    #[test]
412    fn test_telemetry_on_collection_error() {
413        viaduct_dev::init_backend_dev();
414        let collection = "cid";
415        let timestamp = 1774420582054u64;
416        let _changes = mock_monitor_changes(collection, timestamp);
417        let _changeset = mock_changeset_error("main", collection);
418
419        let (service, telemetry) = make_service(&mockito::server_url());
420        let _client = service.make_client(collection.into());
421        let _ = service.sync();
422
423        let events = telemetry.events.lock().unwrap();
424        assert_eq!(events.len(), 2);
425        assert_eq!(
426            events[0].source,
427            Some("settings-changes-monitoring".to_string())
428        );
429        assert_eq!(events[0].value, Some("success".to_string()));
430        assert_eq!(events[1].source, Some(format!("main/{collection}")));
431        assert_eq!(events[1].value, Some("network_error".to_string()));
432        assert_eq!(events[1].error_name, Some("ResponseError".to_string()));
433    }
434
435    #[cfg(feature = "signatures")]
436    #[test]
437    fn test_telemetry_on_collection_signature_error() {
438        viaduct_dev::init_backend_dev();
439        let collection = "cid";
440        let timestamp = 1774420582054u64;
441        let _changes = mock_monitor_changes(collection, timestamp);
442        let _changeset = mock_changeset(collection, timestamp);
443
444        let (service, telemetry) = make_service(&mockito::server_url());
445        let _client = service.make_client(collection.into());
446        let _ = service.sync();
447
448        let events = telemetry.events.lock().unwrap();
449        assert_eq!(events.len(), 2);
450        assert_eq!(
451            events[0].source,
452            Some("settings-changes-monitoring".to_string())
453        );
454        assert_eq!(events[1].source, Some(format!("main/{collection}")));
455        assert_eq!(events[1].value, Some("signature_error".to_string()));
456        assert_eq!(
457            events[1].error_name,
458            Some("IncompleteSignatureDataError".to_string())
459        );
460    }
461}