1use std::{
6 collections::{HashMap, HashSet},
7 sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18 client::RemoteState, config::BaseUrl, error::Error, storage::Storage,
19 telemetry::RemoteSettingsTelemetryWrapper, RemoteSettingsClient, RemoteSettingsConfig,
20 RemoteSettingsContext, RemoteSettingsServer, Result,
21};
22
23pub struct RemoteSettingsService {
25 inner: Mutex<RemoteSettingsServiceInner>,
26}
27
28struct RemoteSettingsServiceInner {
29 storage_dir: Utf8PathBuf,
30 base_url: BaseUrl,
31 bucket_name: String,
32 app_context: Option<RemoteSettingsContext>,
33 remote_state: RemoteState,
34 telemetry: RemoteSettingsTelemetryWrapper,
35 clients: Vec<Weak<RemoteSettingsClient>>,
41}
42
43impl RemoteSettingsService {
44 pub fn new(storage_dir: String, config: RemoteSettingsConfig) -> Self {
48 let storage_dir = storage_dir.into();
49 let base_url = config
50 .server
51 .unwrap_or(RemoteSettingsServer::Prod)
52 .get_base_url_with_prod_fallback();
53 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
54
55 Self {
56 inner: Mutex::new(RemoteSettingsServiceInner {
57 storage_dir,
58 base_url,
59 bucket_name,
60 app_context: config.app_context,
61 remote_state: RemoteState::default(),
62 telemetry: RemoteSettingsTelemetryWrapper::noop(),
63 clients: vec![],
64 }),
65 }
66 }
67
68 pub fn set_telemetry(&self, telemetry: RemoteSettingsTelemetryWrapper) {
69 self.inner.lock().telemetry = telemetry;
70 }
71
72 pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
73 let mut inner = self.inner.lock();
74 let storage = if inner.storage_dir == ":memory:" {
76 Storage::new(inner.storage_dir.clone())
77 } else {
78 Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
79 };
80
81 let client = Arc::new(RemoteSettingsClient::new(
82 inner.base_url.clone(),
83 inner.bucket_name.clone(),
84 collection_name.clone(),
85 inner.app_context.clone(),
86 storage,
87 ));
88 inner.clients.push(Arc::downgrade(&client));
89 client
90 }
91
92 pub fn sync(&self) -> Result<Vec<String>> {
94 let mut synced_collections = HashSet::new();
96
97 let mut inner = self.inner.lock();
98 let changes = inner.fetch_changes()?;
99 let change_map: HashMap<_, _> = changes
100 .changes
101 .iter()
102 .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
103 .collect();
104 let bucket_name = inner.bucket_name.clone();
105
106 for client in inner.active_clients() {
107 let client = &client.internal;
108 let collection_name = client.collection_name();
109 let cid = format!("{bucket_name}/{collection_name}");
110 if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
111 if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
112 {
113 if client_last_modified == *server_last_modified {
114 trace!("skipping up-to-date collection: {collection_name}");
115 inner.telemetry.report_uptake_up_to_date(&cid, None);
116 continue;
117 }
118 }
119 }
120 if synced_collections.insert(collection_name.to_string()) {
121 trace!("syncing collection: {collection_name}");
122 let start_time = std::time::Instant::now();
123 let sync_result = client.sync();
124 let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
125 match &sync_result {
126 Ok(()) => inner.telemetry.report_uptake_success(&cid, Some(duration)),
127 Err(e) => inner.telemetry.report_uptake_error(e, &cid),
128 }
129 sync_result?;
130 }
131 }
132 Ok(synced_collections.into_iter().collect())
133 }
134
135 pub fn update_config(&self, config: RemoteSettingsConfig) -> Result<()> {
140 let base_url = config
141 .server
142 .unwrap_or(RemoteSettingsServer::Prod)
143 .get_base_url()?;
144 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
145 let mut inner = self.inner.lock();
146 for client in inner.active_clients() {
147 client.internal.update_config(
148 base_url.clone(),
149 bucket_name.clone(),
150 config.app_context.clone(),
151 );
152 }
153 inner.base_url = base_url;
154 inner.bucket_name = bucket_name;
155 inner.app_context = config.app_context;
156 Ok(())
157 }
158
159 pub fn client_url(&self) -> Url {
160 let inner = self.inner.lock();
161 let base_url = inner.base_url.clone();
162 base_url.url().clone()
163 }
164}
165
166impl RemoteSettingsServiceInner {
167 fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
171 let mut active_clients = vec![];
172 self.clients.retain(|weak| {
173 if let Some(client) = weak.upgrade() {
174 active_clients.push(client);
175 true
176 } else {
177 false
178 }
179 });
180 active_clients
181 }
182
183 fn fetch_changes(&mut self) -> Result<Changes> {
184 let mut url = self.base_url.clone();
185 url.path_segments_mut()
186 .push("buckets")
187 .push("monitor")
188 .push("collections")
189 .push("changes")
190 .push("changeset");
191 url.query_pairs_mut().append_pair("_expected", "0");
197 let url = url.into_inner();
198 trace!("make_request: {url}");
199 self.remote_state.ensure_no_backoff()?;
200
201 let start_time = std::time::Instant::now();
202 let req = Request::get(url);
203 let resp = req.send()?;
204
205 self.remote_state.handle_backoff_hint(&resp)?;
206
207 const TELEMETRY_SOURCE_POLL: &str = "settings-changes-monitoring";
208 if resp.is_success() {
209 let body = resp.json()?;
210 let duration: u64 = start_time.elapsed().as_millis().try_into().unwrap_or(0);
211 self.telemetry
212 .report_uptake_success(TELEMETRY_SOURCE_POLL, Some(duration));
213 Ok(body)
214 } else {
215 let e = Error::response_error(&resp.url, format!("status code: {}", resp.status));
216 self.telemetry
217 .report_uptake_error(&e, TELEMETRY_SOURCE_POLL);
218 Err(e)
219 }
220 }
221}
222
223#[derive(Debug, Deserialize)]
227struct Changes {
228 changes: Vec<ChangesCollection>,
229}
230
231#[derive(Debug, Deserialize)]
232struct ChangesCollection {
233 collection: String,
234 bucket: String,
235 last_modified: u64,
236}
237
238#[cfg(test)]
239mod test {
240 use super::*;
241 use crate::telemetry::UptakeEventExtras;
242 use crate::{RemoteSettingsConfig, RemoteSettingsServer};
243 use mockito::{mock, Matcher};
244 use std::sync::Arc;
245
246 struct FakeTelemetry {
248 events: std::sync::Mutex<Vec<UptakeEventExtras>>,
249 }
250
251 impl FakeTelemetry {
252 fn new() -> Self {
253 Self {
254 events: std::sync::Mutex::new(Vec::new()),
255 }
256 }
257 }
258
259 impl crate::telemetry::RemoteSettingsTelemetry for FakeTelemetry {
260 fn report_uptake(&self, extras: UptakeEventExtras) {
261 self.events.lock().unwrap().push(extras);
262 }
263 }
264
265 fn make_service(server_url: &str) -> (RemoteSettingsService, Arc<FakeTelemetry>) {
266 let service = RemoteSettingsService::new(
267 ":memory:".into(),
268 RemoteSettingsConfig {
269 server: Some(RemoteSettingsServer::Custom {
270 url: server_url.into(),
271 }),
272 ..Default::default()
273 },
274 );
275 let telemetry: Arc<FakeTelemetry> = Arc::new(FakeTelemetry::new());
276 service.set_telemetry(RemoteSettingsTelemetryWrapper::new(telemetry.clone()));
277 (service, telemetry)
278 }
279
280 fn mock_monitor_changes(collection: &str, timestamp: u64) -> mockito::Mock {
281 mock("GET", "/v1/buckets/monitor/collections/changes/changeset")
282 .match_query(Matcher::Any)
283 .with_status(200)
284 .with_header("content-type", "application/json")
285 .with_body(format!(
286 r#"{{"timestamp": {timestamp}, "changes": [{{"collection": "{collection}", "bucket": "main", "last_modified": {timestamp}}}]}}"#
287 ))
288 .create()
289 }
290
291 fn mock_changeset(collection: &str, timestamp: u64) -> mockito::Mock {
292 mock(
293 "GET",
294 format!("/v1/buckets/main/collections/{collection}/changeset").as_str(),
295 )
296 .match_query(Matcher::Any)
297 .with_status(200)
298 .with_header("content-type", "application/json")
299 .with_body(format!(
300 r#"{{"changes": [], "timestamp": {timestamp}, "metadata": {{"bucket": "main", "signatures": []}}}}"#
301 ))
302 .create()
303 }
304
305 fn mock_changeset_error(bucket: &str, collection: &str) -> mockito::Mock {
306 mock(
307 "GET",
308 format!("/v1/buckets/{bucket}/collections/{collection}/changeset").as_str(),
309 )
310 .match_query(Matcher::Any)
311 .with_status(500)
312 .with_body("server error")
313 .create()
314 }
315
316 #[test]
317 fn test_telemetry_network_error_on_changes_failure() {
318 viaduct_dev::init_backend_dev();
319 mock_changeset_error("monitor", "changes");
320
321 let (service, telemetry) = make_service(&mockito::server_url());
322 let _ = service.sync();
323
324 let events = telemetry.events.lock().unwrap();
325 assert_eq!(events.len(), 1);
326 assert_eq!(
327 events[0].source,
328 Some("settings-changes-monitoring".to_string())
329 );
330 assert_eq!(events[0].value, Some("network_error".to_string()));
331 assert_eq!(events[0].error_name, Some("ResponseError".to_string()));
332 assert!(events[0].error_name.is_some());
333 }
334
335 #[test]
336 fn test_telemetry_on_changes_success() {
337 viaduct_dev::init_backend_dev();
338 let _changes = mock_monitor_changes("cid", 42);
339
340 let (service, telemetry) = make_service(&mockito::server_url());
341 let _ = service.sync();
342
343 let events = telemetry.events.lock().unwrap();
344 assert_eq!(events.len(), 1);
345 assert_eq!(
346 events[0].source,
347 Some("settings-changes-monitoring".to_string())
348 );
349 assert_eq!(events[0].value, Some("success".to_string()));
350 assert!(events[0].duration.is_some());
351 }
352
353 #[cfg(not(feature = "signatures"))]
354 #[test]
355 fn test_telemetry_on_collection_success() {
356 viaduct_dev::init_backend_dev();
357 let collection = "cid";
358 let timestamp = 1774420582054u64;
359 let _changes = mock_monitor_changes(collection, timestamp);
360 let _changeset = mock_changeset(collection, timestamp);
361
362 let (service, telemetry) = make_service(&mockito::server_url());
363 let _client = service.make_client(collection.into());
364 let _ = service.sync();
365
366 let events = telemetry.events.lock().unwrap();
367 assert_eq!(events.len(), 2);
368 assert_eq!(
369 events[0].source,
370 Some("settings-changes-monitoring".to_string())
371 );
372 assert_eq!(events[1].source, Some(format!("main/{collection}")));
373 assert_eq!(events[1].value, Some("success".to_string()));
374 assert!(events[1].duration.is_some());
375 }
376
377 #[cfg(not(feature = "signatures"))]
378 #[test]
379 fn test_telemetry_on_collection_up_to_date() {
380 viaduct_dev::init_backend_dev();
381 let collection = "cid";
382 let timestamp = 1774420582054u64;
383 let _changes = mock_monitor_changes(collection, timestamp);
384 let _changeset = mock_changeset(collection, timestamp);
385
386 let (service, telemetry) = make_service(&mockito::server_url());
387 let _client = service.make_client(collection.into());
388
389 let _ = service.sync();
391 let events_before = telemetry.events.lock().unwrap().len();
392 let _ = service.sync();
394
395 let events = telemetry.events.lock().unwrap();
396 assert_eq!(events.len() - events_before, 2);
397 assert_eq!(
398 events[events_before].source,
399 Some("settings-changes-monitoring".to_string())
400 );
401 assert_eq!(
402 events[events_before + 1].source,
403 Some(format!("main/{collection}"))
404 );
405 assert_eq!(
406 events[events_before + 1].value,
407 Some("up_to_date".to_string())
408 );
409 }
410
411 #[test]
412 fn test_telemetry_on_collection_error() {
413 viaduct_dev::init_backend_dev();
414 let collection = "cid";
415 let timestamp = 1774420582054u64;
416 let _changes = mock_monitor_changes(collection, timestamp);
417 let _changeset = mock_changeset_error("main", collection);
418
419 let (service, telemetry) = make_service(&mockito::server_url());
420 let _client = service.make_client(collection.into());
421 let _ = service.sync();
422
423 let events = telemetry.events.lock().unwrap();
424 assert_eq!(events.len(), 2);
425 assert_eq!(
426 events[0].source,
427 Some("settings-changes-monitoring".to_string())
428 );
429 assert_eq!(events[0].value, Some("success".to_string()));
430 assert_eq!(events[1].source, Some(format!("main/{collection}")));
431 assert_eq!(events[1].value, Some("network_error".to_string()));
432 assert_eq!(events[1].error_name, Some("ResponseError".to_string()));
433 }
434
435 #[cfg(feature = "signatures")]
436 #[test]
437 fn test_telemetry_on_collection_signature_error() {
438 viaduct_dev::init_backend_dev();
439 let collection = "cid";
440 let timestamp = 1774420582054u64;
441 let _changes = mock_monitor_changes(collection, timestamp);
442 let _changeset = mock_changeset(collection, timestamp);
443
444 let (service, telemetry) = make_service(&mockito::server_url());
445 let _client = service.make_client(collection.into());
446 let _ = service.sync();
447
448 let events = telemetry.events.lock().unwrap();
449 assert_eq!(events.len(), 2);
450 assert_eq!(
451 events[0].source,
452 Some("settings-changes-monitoring".to_string())
453 );
454 assert_eq!(events[1].source, Some(format!("main/{collection}")));
455 assert_eq!(events[1].value, Some("signature_error".to_string()));
456 assert_eq!(
457 events[1].error_name,
458 Some("IncompleteSignatureDataError".to_string())
459 );
460 }
461}