remote_settings/
service.rs
1use std::{
6 collections::{HashMap, HashSet},
7 sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use viaduct::Request;
15
16use crate::{
17 client::RemoteState, config::BaseUrl, error::Error, storage::Storage, RemoteSettingsClient,
18 RemoteSettingsConfig2, RemoteSettingsContext, RemoteSettingsServer, Result,
19};
20
21pub struct RemoteSettingsService {
23 inner: Mutex<RemoteSettingsServiceInner>,
24}
25
26struct RemoteSettingsServiceInner {
27 storage_dir: Utf8PathBuf,
28 base_url: BaseUrl,
29 bucket_name: String,
30 app_context: Option<RemoteSettingsContext>,
31 remote_state: RemoteState,
32 clients: Vec<Weak<RemoteSettingsClient>>,
38}
39
40impl RemoteSettingsService {
41 pub fn new(storage_dir: String, config: RemoteSettingsConfig2) -> Self {
45 let storage_dir = storage_dir.into();
46 let base_url = config
47 .server
48 .unwrap_or(RemoteSettingsServer::Prod)
49 .get_base_url_with_prod_fallback();
50 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
51
52 Self {
53 inner: Mutex::new(RemoteSettingsServiceInner {
54 storage_dir,
55 base_url,
56 bucket_name,
57 app_context: config.app_context,
58 remote_state: RemoteState::default(),
59 clients: vec![],
60 }),
61 }
62 }
63
64 pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
65 let mut inner = self.inner.lock();
66 let storage = if inner.storage_dir == ":memory:" {
68 Storage::new(inner.storage_dir.clone())
69 } else {
70 Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
71 };
72
73 let client = Arc::new(RemoteSettingsClient::new(
74 inner.base_url.clone(),
75 inner.bucket_name.clone(),
76 collection_name.clone(),
77 inner.app_context.clone(),
78 storage,
79 ));
80 inner.clients.push(Arc::downgrade(&client));
81 client
82 }
83
84 pub fn sync(&self) -> Result<Vec<String>> {
86 let mut synced_collections = HashSet::new();
88
89 let mut inner = self.inner.lock();
90 let changes = inner.fetch_changes()?;
91 let change_map: HashMap<_, _> = changes
92 .changes
93 .iter()
94 .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
95 .collect();
96 let bucket_name = inner.bucket_name.clone();
97
98 for client in inner.active_clients() {
99 let client = &client.internal;
100 let collection_name = client.collection_name();
101 if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
102 if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
103 {
104 if client_last_modified != *server_last_modified {
105 trace!("skipping up-to-date collection: {collection_name}");
106 continue;
107 }
108 }
109 }
110 if synced_collections.insert(collection_name.to_string()) {
111 trace!("syncing collection: {collection_name}");
112 client.sync()?;
113 }
114 }
115 Ok(synced_collections.into_iter().collect())
116 }
117
118 pub fn update_config(&self, config: RemoteSettingsConfig2) -> Result<()> {
123 let base_url = config
124 .server
125 .unwrap_or(RemoteSettingsServer::Prod)
126 .get_base_url()?;
127 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
128 let mut inner = self.inner.lock();
129 for client in inner.active_clients() {
130 client.internal.update_config(
131 base_url.clone(),
132 bucket_name.clone(),
133 config.app_context.clone(),
134 )?;
135 }
136 inner.base_url = base_url;
137 inner.bucket_name = bucket_name;
138 inner.app_context = config.app_context;
139 Ok(())
140 }
141}
142
143impl RemoteSettingsServiceInner {
144 fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
148 let mut active_clients = vec![];
149 self.clients.retain(|weak| {
150 if let Some(client) = weak.upgrade() {
151 active_clients.push(client);
152 true
153 } else {
154 false
155 }
156 });
157 active_clients
158 }
159
160 fn fetch_changes(&mut self) -> Result<Changes> {
161 let mut url = self.base_url.clone();
162 url.path_segments_mut()
163 .push("buckets")
164 .push("monitor")
165 .push("collections")
166 .push("changes")
167 .push("changeset");
168 url.query_pairs_mut().append_pair("_expected", "0");
174 let url = url.into_inner();
175 trace!("make_request: {url}");
176 self.remote_state.ensure_no_backoff()?;
177
178 let req = Request::get(url);
179 let resp = req.send()?;
180
181 self.remote_state.handle_backoff_hint(&resp)?;
182
183 if resp.is_success() {
184 Ok(resp.json()?)
185 } else {
186 Err(Error::ResponseError(format!(
187 "status code: {}",
188 resp.status
189 )))
190 }
191 }
192}
193
194#[derive(Debug, Deserialize)]
198struct Changes {
199 changes: Vec<ChangesCollection>,
200}
201
202#[derive(Debug, Deserialize)]
203struct ChangesCollection {
204 collection: String,
205 bucket: String,
206 last_modified: u64,
207}