1use std::{
6 collections::{HashMap, HashSet},
7 sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18 client::RemoteState, config::BaseUrl, error::Error, storage::Storage,
19 telemetry::RemoteSettingsTelemetryWrapper, RemoteSettingsClient, RemoteSettingsConfig,
20 RemoteSettingsContext, RemoteSettingsServer, Result,
21};
22
23pub struct RemoteSettingsService {
25 inner: Mutex<RemoteSettingsServiceInner>,
26}
27
28struct RemoteSettingsServiceInner {
29 storage_dir: Utf8PathBuf,
30 base_url: BaseUrl,
31 bucket_name: String,
32 app_context: Option<RemoteSettingsContext>,
33 remote_state: RemoteState,
34 telemetry: RemoteSettingsTelemetryWrapper,
35 clients: Vec<Weak<RemoteSettingsClient>>,
41}
42
43impl RemoteSettingsService {
44 pub fn new(storage_dir: String, config: RemoteSettingsConfig) -> Self {
48 let storage_dir = storage_dir.into();
49 let base_url = config
50 .server
51 .unwrap_or(RemoteSettingsServer::Prod)
52 .get_base_url_with_prod_fallback();
53 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
54
55 Self {
56 inner: Mutex::new(RemoteSettingsServiceInner {
57 storage_dir,
58 base_url,
59 bucket_name,
60 app_context: config.app_context,
61 remote_state: RemoteState::default(),
62 telemetry: RemoteSettingsTelemetryWrapper::noop(),
63 clients: vec![],
64 }),
65 }
66 }
67
68 pub fn set_telemetry(&self, telemetry: RemoteSettingsTelemetryWrapper) {
69 self.inner.lock().telemetry = telemetry;
70 }
71
72 pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
73 let mut inner = self.inner.lock();
74 let storage = if inner.storage_dir == ":memory:" {
76 Storage::new(inner.storage_dir.clone())
77 } else {
78 Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
79 };
80
81 let client = Arc::new(RemoteSettingsClient::new(
82 inner.base_url.clone(),
83 inner.bucket_name.clone(),
84 collection_name.clone(),
85 inner.app_context.clone(),
86 storage,
87 ));
88 inner.clients.push(Arc::downgrade(&client));
89 client
90 }
91
92 pub fn sync(&self) -> Result<Vec<String>> {
94 let mut synced_collections = HashSet::new();
96
97 let mut inner = self.inner.lock();
98 let changes = inner.fetch_changes()?;
99 let change_map: HashMap<_, _> = changes
100 .changes
101 .iter()
102 .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
103 .collect();
104 let bucket_name = inner.bucket_name.clone();
105
106 let active_clients = inner.active_clients();
107 for client in &active_clients {
108 let client = &client.internal;
109 let collection_name = client.collection_name();
110 let cid = format!("{bucket_name}/{collection_name}");
111 if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
112 if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
113 {
114 if client_last_modified == *server_last_modified {
115 trace!("skipping up-to-date collection: {collection_name}");
116 inner.telemetry.report_uptake_up_to_date(&cid, None);
117 continue;
118 }
119 }
120 }
121 if synced_collections.insert(collection_name.to_string()) {
122 trace!("syncing collection: {collection_name}");
123 let start_time = std::time::Instant::now();
124 let sync_result = client.sync();
125 let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
126 match &sync_result {
127 Ok(()) => inner.telemetry.report_uptake_success(&cid, Some(duration)),
128 Err(e) => inner.telemetry.report_uptake_error(e, &cid),
129 }
130 sync_result?;
131 }
132 }
133
134 for client in &active_clients {
137 let client = &client.internal;
138 let collection_name = client.collection_name();
139
140 if synced_collections.contains(collection_name) {
141 trace!("running maintenance for collection: {collection_name}");
142 client.run_maintenance()?;
143 }
144 }
145
146 Ok(synced_collections.into_iter().collect())
147 }
148
149 pub fn update_config(&self, config: RemoteSettingsConfig) -> Result<()> {
154 let base_url = config
155 .server
156 .unwrap_or(RemoteSettingsServer::Prod)
157 .get_base_url()?;
158 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
159 let mut inner = self.inner.lock();
160 for client in inner.active_clients() {
161 client.internal.update_config(
162 base_url.clone(),
163 bucket_name.clone(),
164 config.app_context.clone(),
165 );
166 }
167 inner.base_url = base_url;
168 inner.bucket_name = bucket_name;
169 inner.app_context = config.app_context;
170 Ok(())
171 }
172
173 pub fn client_url(&self) -> Url {
174 let inner = self.inner.lock();
175 let base_url = inner.base_url.clone();
176 base_url.url().clone()
177 }
178}
179
180impl RemoteSettingsServiceInner {
181 fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
185 let mut active_clients = vec![];
186 self.clients.retain(|weak| {
187 if let Some(client) = weak.upgrade() {
188 active_clients.push(client);
189 true
190 } else {
191 false
192 }
193 });
194 active_clients
195 }
196
197 fn fetch_changes(&mut self) -> Result<Changes> {
198 let mut url = self.base_url.clone();
199 url.path_segments_mut()
200 .push("buckets")
201 .push("monitor")
202 .push("collections")
203 .push("changes")
204 .push("changeset");
205 url.query_pairs_mut().append_pair("_expected", "0");
211 let url = url.into_inner();
212 trace!("make_request: {url}");
213 self.remote_state.ensure_no_backoff()?;
214
215 let start_time = std::time::Instant::now();
216 let req = Request::get(url);
217 let resp = req.send()?;
218
219 self.remote_state.handle_backoff_hint(&resp)?;
220
221 const TELEMETRY_SOURCE_POLL: &str = "settings-changes-monitoring";
222 if resp.is_success() {
223 let body = resp.json()?;
224 let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
225 self.telemetry
226 .report_uptake_success(TELEMETRY_SOURCE_POLL, Some(duration));
227 Ok(body)
228 } else {
229 let e = Error::response_error(&resp.url, format!("status code: {}", resp.status));
230 self.telemetry
231 .report_uptake_error(&e, TELEMETRY_SOURCE_POLL);
232 Err(e)
233 }
234 }
235}
236
237#[derive(Debug, Deserialize)]
241struct Changes {
242 changes: Vec<ChangesCollection>,
243}
244
245#[derive(Debug, Deserialize)]
246struct ChangesCollection {
247 collection: String,
248 bucket: String,
249 last_modified: u64,
250}
251
252#[cfg(test)]
253mod test {
254 use super::*;
255 use crate::telemetry::UptakeEventExtras;
256 use crate::{RemoteSettingsConfig, RemoteSettingsServer};
257 use mockito::{mock, Matcher};
258 use std::sync::Arc;
259
260 struct FakeTelemetry {
262 events: std::sync::Mutex<Vec<UptakeEventExtras>>,
263 }
264
265 impl FakeTelemetry {
266 fn new() -> Self {
267 Self {
268 events: std::sync::Mutex::new(Vec::new()),
269 }
270 }
271 }
272
273 impl crate::telemetry::RemoteSettingsTelemetry for FakeTelemetry {
274 fn report_uptake(&self, extras: UptakeEventExtras) {
275 self.events.lock().unwrap().push(extras);
276 }
277 }
278
279 fn make_service(server_url: &str) -> (RemoteSettingsService, Arc<FakeTelemetry>) {
280 let service = RemoteSettingsService::new(
281 ":memory:".into(),
282 RemoteSettingsConfig {
283 server: Some(RemoteSettingsServer::Custom {
284 url: server_url.into(),
285 }),
286 ..Default::default()
287 },
288 );
289 let telemetry: Arc<FakeTelemetry> = Arc::new(FakeTelemetry::new());
290 service.set_telemetry(RemoteSettingsTelemetryWrapper::new(telemetry.clone()));
291 (service, telemetry)
292 }
293
294 fn mock_monitor_changes(collection: &str, timestamp: u64) -> mockito::Mock {
295 mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
296 .match_query(Matcher::Any)
297 .with_status(200)
298 .with_header("content-type", "application/json")
299 .with_body(format!(
300 r#"{{"timestamp": {timestamp}, "changes": [{{"collection": "{collection}", "bucket": "main", "last_modified": {timestamp}}}]}}"#
301 ))
302 .create()
303 }
304
305 fn mock_changeset(collection: &str, timestamp: u64) -> mockito::Mock {
306 mock(
307 "GET",
308 format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
309 )
310 .match_query(Matcher::Any)
311 .with_status(200)
312 .with_header("content-type", "application/json")
313 .with_body(format!(
314 r#"{{"changes": [], "timestamp": {timestamp}, "metadata": {{"bucket": "main", "signatures": []}}}}"#
315 ))
316 .create()
317 }
318
319 fn mock_changeset_error(bucket: &str, collection: &str) -> mockito::Mock {
320 mock(
321 "GET",
322 format!("/v1/buckets/{bucket}/collections/{collection}/changeset").as_str(),
323 )
324 .match_query(Matcher::Any)
325 .with_status(500)
326 .with_body("server error")
327 .create()
328 }
329
330 #[test]
331 fn test_telemetry_network_error_on_changes_failure() {
332 viaduct_dev::init_backend_dev();
333 mock_changeset_error("monitor", "changes");
334
335 let (service, telemetry) = make_service(&mockito::server_url());
336 let _ = service.sync();
337
338 let events = telemetry.events.lock().unwrap();
339 assert_eq!(events.len(), 1);
340 assert_eq!(
341 events[0].source,
342 Some("settings-changes-monitoring".to_string())
343 );
344 assert_eq!(events[0].value, Some("network_error".to_string()));
345 assert_eq!(events[0].error_name, Some("ResponseError".to_string()));
346 assert!(events[0].error_name.is_some());
347 }
348
349 #[test]
350 fn test_telemetry_on_changes_success() {
351 viaduct_dev::init_backend_dev();
352 let _changes = mock_monitor_changes("cid", 42);
353
354 let (service, telemetry) = make_service(&mockito::server_url());
355 let _ = service.sync();
356
357 let events = telemetry.events.lock().unwrap();
358 assert_eq!(events.len(), 1);
359 assert_eq!(
360 events[0].source,
361 Some("settings-changes-monitoring".to_string())
362 );
363 assert_eq!(events[0].value, Some("success".to_string()));
364 assert!(events[0].duration.is_some());
365 }
366
367 #[cfg(not(feature = "signatures"))]
368 #[test]
369 fn test_telemetry_on_collection_success() {
370 viaduct_dev::init_backend_dev();
371 let collection = "cid";
372 let timestamp = 1774420582054u64;
373 let _changes = mock_monitor_changes(collection, timestamp);
374 let _changeset = mock_changeset(collection, timestamp);
375
376 let (service, telemetry) = make_service(&mockito::server_url());
377 let _client = service.make_client(collection.into());
378 let _ = service.sync();
379
380 let events = telemetry.events.lock().unwrap();
381 assert_eq!(events.len(), 2);
382 assert_eq!(
383 events[0].source,
384 Some("settings-changes-monitoring".to_string())
385 );
386 assert_eq!(events[1].source, Some(format!("main/{collection}")));
387 assert_eq!(events[1].value, Some("success".to_string()));
388 assert!(events[1].duration.is_some());
389 }
390
391 #[cfg(not(feature = "signatures"))]
392 #[test]
393 fn test_telemetry_on_collection_up_to_date() {
394 viaduct_dev::init_backend_dev();
395 let collection = "cid";
396 let timestamp = 1774420582054u64;
397 let _changes = mock_monitor_changes(collection, timestamp);
398 let _changeset = mock_changeset(collection, timestamp);
399
400 let (service, telemetry) = make_service(&mockito::server_url());
401 let _client = service.make_client(collection.into());
402
403 let _ = service.sync();
405 let events_before = telemetry.events.lock().unwrap().len();
406 let _ = service.sync();
408
409 let events = telemetry.events.lock().unwrap();
410 assert_eq!(events.len() - events_before, 2);
411 assert_eq!(
412 events[events_before].source,
413 Some("settings-changes-monitoring".to_string())
414 );
415 assert_eq!(
416 events[events_before + 1].source,
417 Some(format!("main/{collection}"))
418 );
419 assert_eq!(
420 events[events_before + 1].value,
421 Some("up_to_date".to_string())
422 );
423 }
424
425 #[test]
426 fn test_telemetry_on_collection_error() {
427 viaduct_dev::init_backend_dev();
428 let collection = "cid";
429 let timestamp = 1774420582054u64;
430 let _changes = mock_monitor_changes(collection, timestamp);
431 let _changeset = mock_changeset_error("main", collection);
432
433 let (service, telemetry) = make_service(&mockito::server_url());
434 let _client = service.make_client(collection.into());
435 let _ = service.sync();
436
437 let events = telemetry.events.lock().unwrap();
438 assert_eq!(events.len(), 2);
439 assert_eq!(
440 events[0].source,
441 Some("settings-changes-monitoring".to_string())
442 );
443 assert_eq!(events[0].value, Some("success".to_string()));
444 assert_eq!(events[1].source, Some(format!("main/{collection}")));
445 assert_eq!(events[1].value, Some("network_error".to_string()));
446 assert_eq!(events[1].error_name, Some("ResponseError".to_string()));
447 }
448
449 #[cfg(feature = "signatures")]
450 #[test]
451 fn test_telemetry_on_collection_signature_error() {
452 viaduct_dev::init_backend_dev();
453 let collection = "cid";
454 let timestamp = 1774420582054u64;
455 let _changes = mock_monitor_changes(collection, timestamp);
456 let _changeset = mock_changeset(collection, timestamp);
457
458 let (service, telemetry) = make_service(&mockito::server_url());
459 let _client = service.make_client(collection.into());
460 let _ = service.sync();
461
462 let events = telemetry.events.lock().unwrap();
463 assert_eq!(events.len(), 2);
464 assert_eq!(
465 events[0].source,
466 Some("settings-changes-monitoring".to_string())
467 );
468 assert_eq!(events[1].source, Some(format!("main/{collection}")));
469 assert_eq!(events[1].value, Some("signature_error".to_string()));
470 assert_eq!(
471 events[1].error_name,
472 Some("IncompleteSignatureDataError".to_string())
473 );
474 }
475
476 #[cfg(not(feature = "signatures"))]
477 #[test]
478 fn test_sync_maintenance_shrinks_db_after_attachment_cleanup() -> Result<()> {
479 use crate::RemoteSettingsRecord;
480 use sha2::Digest;
481 viaduct_dev::init_backend_dev();
482
483 let collection = "cid";
484 let temp_dir = tempfile::tempdir().expect("create temp dir");
485 let db_path = temp_dir.path().join(format!("{collection}.sql"));
486
487 let attachment_data = vec![0x41; 5 * 1024 * 1024];
488 let attachment_hash = format!("{:x}", sha2::Sha256::digest(&attachment_data));
489
490 let attachment_record = format!(
491 r#"{{
492 "id": "record-with-attachment",
493 "last_modified": 100,
494 "attachment": {{
495 "filename": "big.bin",
496 "mimetype": "application/octet-stream",
497 "location": "attachments/big.bin",
498 "hash": "{attachment_hash}",
499 "size": {}
500 }}
501 }}"#,
502 attachment_data.len()
503 );
504
505 let _changes_1 = mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
507 .match_query(Matcher::Any)
508 .with_status(200)
509 .with_header("content-type", "application/json")
510 .with_body(format!(
511 r#"{{
512 "timestamp": 100,
513 "changes": [
514 {{"collection": "{collection}", "bucket": "main", "last_modified": 100}}
515 ]
516 }}"#
517 ))
518 .create();
519
520 let _changeset_1 = mock(
521 "GET",
522 format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
523 )
524 .match_query(Matcher::Any)
525 .with_status(200)
526 .with_header("content-type", "application/json")
527 .with_body(format!(
528 r#"{{
529 "changes": [{attachment_record}],
530 "timestamp": 100,
531 "metadata": {{"bucket": "main", "signatures": []}}
532 }}"#
533 ))
534 .create();
535
536 let service = RemoteSettingsService::new(
537 temp_dir.path().to_string_lossy().to_string(),
538 RemoteSettingsConfig {
539 server: Some(RemoteSettingsServer::Custom {
540 url: mockito::server_url(),
541 }),
542 ..Default::default()
543 },
544 );
545
546 let client = service.make_client(collection.into());
547
548 service.sync()?;
549
550 let _root = mock("GET", "/v1/")
552 .with_status(200)
553 .with_header("content-type", "application/json")
554 .with_body(format!(
555 r#"{{
556 "capabilities": {{
557 "attachments": {{
558 "base_url": "{}/"
559 }}
560 }}
561 }}"#,
562 mockito::server_url()
563 ))
564 .create();
565
566 let _attachment = mock("GET", "/attachments/big")
568 .with_status(200)
569 .with_body(attachment_data.clone())
570 .create();
571
572 client.internal.get_attachment(&RemoteSettingsRecord {
574 id: "record-with-attachment".to_string(),
575 last_modified: 100,
576 deleted: false,
577 attachment: Some(crate::Attachment {
578 filename: "big".to_string(),
579 mimetype: "application/octet-stream".to_string(),
580 location: "attachments/big".to_string(),
581 hash: attachment_hash.clone(),
582 size: attachment_data.len() as u64,
583 }),
584 fields: serde_json::Map::new(),
585 })?;
586
587 let size_with_attachment = std::fs::metadata(&db_path)
588 .expect("db exists after first sync")
589 .len();
590
591 assert!(
592 size_with_attachment > 4 * 1024 * 1024,
593 "DB should contain the large attachment; size={size_with_attachment}"
594 );
595
596 drop(_changes_1);
600 drop(_changeset_1);
601
602 let _changes_2 = mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
605 .match_query(Matcher::Any)
606 .with_status(200)
607 .with_header("content-type", "application/json")
608 .with_body(format!(
609 r#"{{
610 "timestamp": 200,
611 "changes": [
612 {{"collection": "{collection}", "bucket": "main", "last_modified": 200}}
613 ]
614 }}"#
615 ))
616 .create();
617
618 let _changeset_2 = mock(
619 "GET",
620 format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
621 )
622 .match_query(Matcher::Any)
623 .with_status(200)
624 .with_header("content-type", "application/json")
625 .with_body(
626 r#"{
627 "changes": [
628 {
629 "id": "record-with-attachment",
630 "last_modified": 200,
631 "deleted": true
632 }
633 ],
634 "timestamp": 200,
635 "metadata": {"bucket": "main", "signatures": []}
636 }"#,
637 )
638 .create();
639
640 service.sync()?;
641
642 let size_after_cleanup_and_maintenance = std::fs::metadata(&db_path)
643 .expect("db exists after second sync")
644 .len();
645
646 assert!(
647 size_after_cleanup_and_maintenance < size_with_attachment,
648 "maintenance should reclaim at least some space after deleting attachment; before={size_with_attachment}, after={size_after_cleanup_and_maintenance}"
649 );
650
651 let conn = rusqlite::Connection::open(&db_path).expect("open collection db");
653 let auto_vacuum: u32 = conn
654 .query_row("PRAGMA auto_vacuum", [], |row| row.get(0))
655 .expect("query auto_vacuum");
656
657 assert_eq!(auto_vacuum, 2);
658
659 Ok(())
660 }
661}