remote_settings/
service.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18    client::RemoteState, config::BaseUrl, error::Error, storage::Storage,
19    telemetry::RemoteSettingsTelemetryWrapper, RemoteSettingsClient, RemoteSettingsConfig,
20    RemoteSettingsContext, RemoteSettingsServer, Result,
21};
22
23/// Internal Remote settings service API
24pub struct RemoteSettingsService {
25    inner: Mutex<RemoteSettingsServiceInner>,
26}
27
28struct RemoteSettingsServiceInner {
29    storage_dir: Utf8PathBuf,
30    base_url: BaseUrl,
31    bucket_name: String,
32    app_context: Option<RemoteSettingsContext>,
33    remote_state: RemoteState,
34    telemetry: RemoteSettingsTelemetryWrapper,
35    /// Weakrefs for all clients that we've created.  Note: this stores the
36    /// top-level/public `RemoteSettingsClient` structs rather than `client::RemoteSettingsClient`.
37    /// The reason for this is that we return Arcs to the public struct to the foreign code, so we
38    /// need to use the same type for our weakrefs.  The alternative would be to create 2 Arcs for
39    /// each client, which is wasteful.
40    clients: Vec<Weak<RemoteSettingsClient>>,
41}
42
43impl RemoteSettingsService {
44    /// Construct a [RemoteSettingsService]
45    ///
46    /// This is typically done early in the application-startup process
47    pub fn new(storage_dir: String, config: RemoteSettingsConfig) -> Self {
48        let storage_dir = storage_dir.into();
49        let base_url = config
50            .server
51            .unwrap_or(RemoteSettingsServer::Prod)
52            .get_base_url_with_prod_fallback();
53        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
54
55        Self {
56            inner: Mutex::new(RemoteSettingsServiceInner {
57                storage_dir,
58                base_url,
59                bucket_name,
60                app_context: config.app_context,
61                remote_state: RemoteState::default(),
62                telemetry: RemoteSettingsTelemetryWrapper::noop(),
63                clients: vec![],
64            }),
65        }
66    }
67
68    pub fn set_telemetry(&self, telemetry: RemoteSettingsTelemetryWrapper) {
69        self.inner.lock().telemetry = telemetry;
70    }
71
72    pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
73        let mut inner = self.inner.lock();
74        // Allow using in-memory databases for testing of external crates.
75        let storage = if inner.storage_dir == ":memory:" {
76            Storage::new(inner.storage_dir.clone())
77        } else {
78            Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
79        };
80
81        let client = Arc::new(RemoteSettingsClient::new(
82            inner.base_url.clone(),
83            inner.bucket_name.clone(),
84            collection_name.clone(),
85            inner.app_context.clone(),
86            storage,
87        ));
88        inner.clients.push(Arc::downgrade(&client));
89        client
90    }
91
92    /// Sync collections for all active clients
93    pub fn sync(&self) -> Result<Vec<String>> {
94        // Make sure we only sync each collection once, even if there are multiple clients
95        let mut synced_collections = HashSet::new();
96
97        let mut inner = self.inner.lock();
98        let changes = inner.fetch_changes()?;
99        let change_map: HashMap<_, _> = changes
100            .changes
101            .iter()
102            .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
103            .collect();
104        let bucket_name = inner.bucket_name.clone();
105
106        let active_clients = inner.active_clients();
107        for client in &active_clients {
108            let client = &client.internal;
109            let collection_name = client.collection_name();
110            let cid = format!("{bucket_name}/{collection_name}");
111            if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
112                if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
113                {
114                    if client_last_modified == *server_last_modified {
115                        trace!("skipping up-to-date collection: {collection_name}");
116                        inner.telemetry.report_uptake_up_to_date(&cid, None);
117                        continue;
118                    }
119                }
120            }
121            if synced_collections.insert(collection_name.to_string()) {
122                trace!("syncing collection: {collection_name}");
123                let start_time = std::time::Instant::now();
124                let sync_result = client.sync();
125                let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
126                match &sync_result {
127                    Ok(()) => inner.telemetry.report_uptake_success(&cid, Some(duration)),
128                    Err(e) => inner.telemetry.report_uptake_error(e, &cid),
129                }
130                sync_result?;
131            }
132        }
133
134        // Run SQLite maintenance after sync so SQLite can reclaim pages freed by
135        // attachment cleanup and enable/use incremental auto-vacuum.
136        for client in &active_clients {
137            let client = &client.internal;
138            let collection_name = client.collection_name();
139
140            if synced_collections.contains(collection_name) {
141                trace!("running maintenance for collection: {collection_name}");
142                client.run_maintenance()?;
143            }
144        }
145
146        Ok(synced_collections.into_iter().collect())
147    }
148
149    /// Update the remote settings config
150    ///
151    /// This will cause all current and future clients to use new config and will delete any stored
152    /// records causing the clients to return new results from the new config.
153    pub fn update_config(&self, config: RemoteSettingsConfig) -> Result<()> {
154        let base_url = config
155            .server
156            .unwrap_or(RemoteSettingsServer::Prod)
157            .get_base_url()?;
158        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
159        let mut inner = self.inner.lock();
160        for client in inner.active_clients() {
161            client.internal.update_config(
162                base_url.clone(),
163                bucket_name.clone(),
164                config.app_context.clone(),
165            );
166        }
167        inner.base_url = base_url;
168        inner.bucket_name = bucket_name;
169        inner.app_context = config.app_context;
170        Ok(())
171    }
172
173    pub fn client_url(&self) -> Url {
174        let inner = self.inner.lock();
175        let base_url = inner.base_url.clone();
176        base_url.url().clone()
177    }
178}
179
180impl RemoteSettingsServiceInner {
181    // Find live clients in self.clients
182    //
183    // Also, drop dead weakrefs from the vec
184    fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
185        let mut active_clients = vec![];
186        self.clients.retain(|weak| {
187            if let Some(client) = weak.upgrade() {
188                active_clients.push(client);
189                true
190            } else {
191                false
192            }
193        });
194        active_clients
195    }
196
197    fn fetch_changes(&mut self) -> Result<Changes> {
198        let mut url = self.base_url.clone();
199        url.path_segments_mut()
200            .push("buckets")
201            .push("monitor")
202            .push("collections")
203            .push("changes")
204            .push("changeset");
205        // For now, always use `0` for the expected value.  This means we'll get updates based on
206        // the default TTL of 1 hour.
207        //
208        // Eventually, we should add support for push notifications and use the timestamp from the
209        // notification.
210        url.query_pairs_mut().append_pair("_expected", "0");
211        let url = url.into_inner();
212        trace!("make_request: {url}");
213        self.remote_state.ensure_no_backoff()?;
214
215        let start_time = std::time::Instant::now();
216        let req = Request::get(url);
217        let resp = req.send()?;
218
219        self.remote_state.handle_backoff_hint(&resp)?;
220
221        const TELEMETRY_SOURCE_POLL: &str = "settings-changes-monitoring";
222        if resp.is_success() {
223            let body = resp.json()?;
224            let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
225            self.telemetry
226                .report_uptake_success(TELEMETRY_SOURCE_POLL, Some(duration));
227            Ok(body)
228        } else {
229            let e = Error::response_error(&resp.url, format!("status code: {}", resp.status));
230            self.telemetry
231                .report_uptake_error(&e, TELEMETRY_SOURCE_POLL);
232            Err(e)
233        }
234    }
235}
236
237/// Data from the changes endpoint
238///
239/// https://remote-settings.readthedocs.io/en/latest/client-specifications.html#endpoints
240#[derive(Debug, Deserialize)]
241struct Changes {
242    changes: Vec<ChangesCollection>,
243}
244
245#[derive(Debug, Deserialize)]
246struct ChangesCollection {
247    collection: String,
248    bucket: String,
249    last_modified: u64,
250}
251
#[cfg(test)]
mod test {
    use super::*;
    use crate::telemetry::UptakeEventExtras;
    use crate::{RemoteSettingsConfig, RemoteSettingsServer};
    use mockito::{mock, Matcher};
    use std::sync::Arc;

    /// Telemetry implementation that records all events for later assertion.
    struct FakeTelemetry {
        events: std::sync::Mutex<Vec<UptakeEventExtras>>,
    }

    impl FakeTelemetry {
        /// Create a recorder with no events.
        fn new() -> Self {
            Self {
                events: std::sync::Mutex::new(Vec::new()),
            }
        }
    }

    impl crate::telemetry::RemoteSettingsTelemetry for FakeTelemetry {
        fn report_uptake(&self, extras: UptakeEventExtras) {
            self.events.lock().unwrap().push(extras);
        }
    }

    /// Build a service that uses in-memory storage, talks to `server_url` and records its
    /// telemetry into the returned [`FakeTelemetry`].
    fn make_service(server_url: &str) -> (RemoteSettingsService, Arc<FakeTelemetry>) {
        let service = RemoteSettingsService::new(
            ":memory:".into(),
            RemoteSettingsConfig {
                server: Some(RemoteSettingsServer::Custom {
                    url: server_url.into(),
                }),
                ..Default::default()
            },
        );
        let telemetry: Arc<FakeTelemetry> = Arc::new(FakeTelemetry::new());
        service.set_telemetry(RemoteSettingsTelemetryWrapper::new(telemetry.clone()));
        (service, telemetry)
    }

    /// Mock the monitor/changes endpoint so that it lists `collection` in bucket `main` with
    /// `timestamp`.  The caller must keep the returned mock alive for as long as it is needed.
    fn mock_monitor_changes(collection: &str, timestamp: u64) -> mockito::Mock {
        mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
            .match_query(Matcher::Any)
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(format!(
                r#"{{"timestamp": {timestamp}, "changes": [{{"collection": "{collection}", "bucket": "main", "last_modified": {timestamp}}}]}}"#
            ))
            .create()
    }

    /// Mock a successful changeset for `collection` in bucket `main` that carries no records
    /// and no signatures.
    fn mock_changeset(collection: &str, timestamp: u64) -> mockito::Mock {
        mock(
            "GET",
            format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
        )
        .match_query(Matcher::Any)
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(format!(
            r#"{{"changes": [], "timestamp": {timestamp}, "metadata": {{"bucket": "main", "signatures": []}}}}"#
        ))
        .create()
    }

    /// Mock a 500 response for the changeset of `collection` in `bucket`.
    fn mock_changeset_error(bucket: &str, collection: &str) -> mockito::Mock {
        mock(
            "GET",
            format!("/v1/buckets/{bucket}/collections/{collection}/changeset").as_str(),
        )
        .match_query(Matcher::Any)
        .with_status(500)
        .with_body("server error")
        .create()
    }

    #[test]
    fn test_telemetry_network_error_on_changes_failure() {
        viaduct_dev::init_backend_dev();
        // Bind the mock: mockito unregisters a mock as soon as its handle is dropped, so an
        // unbound call would leave the 500 response inactive for the request below.
        let _changes = mock_changeset_error("monitor", "changes");

        let (service, telemetry) = make_service(&mockito::server_url());
        let _ = service.sync();

        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(events[0].value, Some("network_error".to_string()));
        assert_eq!(events[0].error_name, Some("ResponseError".to_string()));
    }

    #[test]
    fn test_telemetry_on_changes_success() {
        viaduct_dev::init_backend_dev();
        let _changes = mock_monitor_changes("cid", 42);

        // No client is created, so the poll itself is the only thing that reports.
        let (service, telemetry) = make_service(&mockito::server_url());
        let _ = service.sync();

        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(events[0].value, Some("success".to_string()));
        assert!(events[0].duration.is_some());
    }

    #[cfg(not(feature = "signatures"))]
    #[test]
    fn test_telemetry_on_collection_success() {
        viaduct_dev::init_backend_dev();
        let collection = "cid";
        let timestamp = 1774420582054u64;
        let _changes = mock_monitor_changes(collection, timestamp);
        let _changeset = mock_changeset(collection, timestamp);

        let (service, telemetry) = make_service(&mockito::server_url());
        let _client = service.make_client(collection.into());
        let _ = service.sync();

        // One event for the poll, one for the collection.
        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(events[1].source, Some(format!("main/{collection}")));
        assert_eq!(events[1].value, Some("success".to_string()));
        assert!(events[1].duration.is_some());
    }

    #[cfg(not(feature = "signatures"))]
    #[test]
    fn test_telemetry_on_collection_up_to_date() {
        viaduct_dev::init_backend_dev();
        let collection = "cid";
        let timestamp = 1774420582054u64;
        let _changes = mock_monitor_changes(collection, timestamp);
        let _changeset = mock_changeset(collection, timestamp);

        let (service, telemetry) = make_service(&mockito::server_url());
        let _client = service.make_client(collection.into());

        // First sync: populates local storage with timestamp.
        let _ = service.sync();
        let events_before = telemetry.events.lock().unwrap().len();
        // Second sync.
        let _ = service.sync();

        // Only look at the events added by the second sync.
        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len() - events_before, 2);
        assert_eq!(
            events[events_before].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(
            events[events_before + 1].source,
            Some(format!("main/{collection}"))
        );
        assert_eq!(
            events[events_before + 1].value,
            Some("up_to_date".to_string())
        );
    }

    #[test]
    fn test_telemetry_on_collection_error() {
        viaduct_dev::init_backend_dev();
        let collection = "cid";
        let timestamp = 1774420582054u64;
        let _changes = mock_monitor_changes(collection, timestamp);
        let _changeset = mock_changeset_error("main", collection);

        let (service, telemetry) = make_service(&mockito::server_url());
        let _client = service.make_client(collection.into());
        let _ = service.sync();

        // The poll succeeds; the collection changeset is the request that fails.
        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(events[0].value, Some("success".to_string()));
        assert_eq!(events[1].source, Some(format!("main/{collection}")));
        assert_eq!(events[1].value, Some("network_error".to_string()));
        assert_eq!(events[1].error_name, Some("ResponseError".to_string()));
    }

    #[cfg(feature = "signatures")]
    #[test]
    fn test_telemetry_on_collection_signature_error() {
        viaduct_dev::init_backend_dev();
        let collection = "cid";
        let timestamp = 1774420582054u64;
        let _changes = mock_monitor_changes(collection, timestamp);
        // The mocked changeset carries an empty `signatures` list.
        let _changeset = mock_changeset(collection, timestamp);

        let (service, telemetry) = make_service(&mockito::server_url());
        let _client = service.make_client(collection.into());
        let _ = service.sync();

        let events = telemetry.events.lock().unwrap();
        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].source,
            Some("settings-changes-monitoring".to_string())
        );
        assert_eq!(events[1].source, Some(format!("main/{collection}")));
        assert_eq!(events[1].value, Some("signature_error".to_string()));
        assert_eq!(
            events[1].error_name,
            Some("IncompleteSignatureDataError".to_string())
        );
    }

    #[cfg(not(feature = "signatures"))]
    #[test]
    fn test_sync_maintenance_shrinks_db_after_attachment_cleanup() -> Result<()> {
        use crate::RemoteSettingsRecord;
        use sha2::Digest;
        viaduct_dev::init_backend_dev();

        // This test needs an on-disk database so that its file size can be measured.
        let collection = "cid";
        let temp_dir = tempfile::tempdir().expect("create temp dir");
        let db_path = temp_dir.path().join(format!("{collection}.sql"));

        let attachment_data = vec![0x41; 5 * 1024 * 1024];
        let attachment_hash = format!("{:x}", sha2::Sha256::digest(&attachment_data));

        // NOTE(review): this record advertises `attachments/big.bin`, while the record passed
        // to `get_attachment` below (and the attachment mock) use `attachments/big` -- confirm
        // the difference is intentional.
        let attachment_record = format!(
            r#"{{
                "id": "record-with-attachment",
                "last_modified": 100,
                "attachment": {{
                    "filename": "big.bin",
                    "mimetype": "application/octet-stream",
                    "location": "attachments/big.bin",
                    "hash": "{attachment_hash}",
                    "size": {}
                }}
            }}"#,
            attachment_data.len()
        );

        // First sync creates a record that references the big attachment.
        let _changes_1 = mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
            .match_query(Matcher::Any)
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(format!(
                r#"{{
                    "timestamp": 100,
                    "changes": [
                        {{"collection": "{collection}", "bucket": "main", "last_modified": 100}}
                    ]
                }}"#
            ))
            .create();

        let _changeset_1 = mock(
            "GET",
            format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
        )
        .match_query(Matcher::Any)
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(format!(
            r#"{{
                "changes": [{attachment_record}],
                "timestamp": 100,
                "metadata": {{"bucket": "main", "signatures": []}}
            }}"#
        ))
        .create();

        let service = RemoteSettingsService::new(
            temp_dir.path().to_string_lossy().to_string(),
            RemoteSettingsConfig {
                server: Some(RemoteSettingsServer::Custom {
                    url: mockito::server_url(),
                }),
                ..Default::default()
            },
        );

        let client = service.make_client(collection.into());

        service.sync()?;

        // Mock attachment discovery and download.
        let _root = mock("GET", "/v1/")
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(format!(
                r#"{{
                    "capabilities": {{
                        "attachments": {{
                            "base_url": "{}/"
                        }}
                    }}
                }}"#,
                mockito::server_url()
            ))
            .create();

        // Path matches `location: "attachments/big"` joined against the base URL above.
        let _attachment = mock("GET", "/attachments/big")
            .with_status(200)
            .with_body(attachment_data.clone())
            .create();

        // Store the large attachment so the DB becomes bloated.
        client.internal.get_attachment(&RemoteSettingsRecord {
            id: "record-with-attachment".to_string(),
            last_modified: 100,
            deleted: false,
            attachment: Some(crate::Attachment {
                filename: "big".to_string(),
                mimetype: "application/octet-stream".to_string(),
                location: "attachments/big".to_string(),
                hash: attachment_hash.clone(),
                size: attachment_data.len() as u64,
            }),
            fields: serde_json::Map::new(),
        })?;

        let size_with_attachment = std::fs::metadata(&db_path)
            .expect("db exists after first sync")
            .len();

        assert!(
            size_with_attachment > 4 * 1024 * 1024,
            "DB should contain the large attachment; size={size_with_attachment}"
        );

        // Drop first-sync mocks explicitly so mockito doesn't re-match the second sync's
        // changeset request against them. Mockito matches by registration order, so leftover
        // mocks for the same URL would shadow the second-sync mocks.
        drop(_changes_1);
        drop(_changeset_1);

        // Second sync tombstones the record. This deletes the attachment row, and
        // post-sync maintenance should compact the database.
        let _changes_2 = mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
            .match_query(Matcher::Any)
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(format!(
                r#"{{
                    "timestamp": 200,
                    "changes": [
                        {{"collection": "{collection}", "bucket": "main", "last_modified": 200}}
                    ]
                }}"#
            ))
            .create();

        let _changeset_2 = mock(
            "GET",
            format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
        )
        .match_query(Matcher::Any)
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(
            r#"{
                "changes": [
                    {
                        "id": "record-with-attachment",
                        "last_modified": 200,
                        "deleted": true
                    }
                ],
                "timestamp": 200,
                "metadata": {"bucket": "main", "signatures": []}
            }"#,
        )
        .create();

        service.sync()?;

        let size_after_cleanup_and_maintenance = std::fs::metadata(&db_path)
            .expect("db exists after second sync")
            .len();

        assert!(
            size_after_cleanup_and_maintenance < size_with_attachment,
            "maintenance should reclaim at least some space after deleting attachment; before={size_with_attachment}, after={size_after_cleanup_and_maintenance}"
        );

        // Sanity-check that maintenance enabled incremental auto-vacuum.
        let conn = rusqlite::Connection::open(&db_path).expect("open collection db");
        let auto_vacuum: u32 = conn
            .query_row("PRAGMA auto_vacuum", [], |row| row.get(0))
            .expect("query auto_vacuum");

        // `PRAGMA auto_vacuum` reports 2 for incremental mode.
        assert_eq!(auto_vacuum, 2);

        Ok(())
    }
}
661}