remote_settings/
service.rs1use std::{
6 collections::{HashMap, HashSet},
7 sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18 client::RemoteState, config::BaseUrl, error::Error, storage::Storage, RemoteSettingsClient,
19 RemoteSettingsConfig2, RemoteSettingsContext, RemoteSettingsServer, Result,
20};
21
22pub struct RemoteSettingsService {
24 inner: Mutex<RemoteSettingsServiceInner>,
25}
26
27struct RemoteSettingsServiceInner {
28 storage_dir: Utf8PathBuf,
29 base_url: BaseUrl,
30 bucket_name: String,
31 app_context: Option<RemoteSettingsContext>,
32 remote_state: RemoteState,
33 clients: Vec<Weak<RemoteSettingsClient>>,
39}
40
41impl RemoteSettingsService {
42 pub fn new(storage_dir: String, config: RemoteSettingsConfig2) -> Self {
46 let storage_dir = storage_dir.into();
47 let base_url = config
48 .server
49 .unwrap_or(RemoteSettingsServer::Prod)
50 .get_base_url_with_prod_fallback();
51 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
52
53 Self {
54 inner: Mutex::new(RemoteSettingsServiceInner {
55 storage_dir,
56 base_url,
57 bucket_name,
58 app_context: config.app_context,
59 remote_state: RemoteState::default(),
60 clients: vec![],
61 }),
62 }
63 }
64
65 pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
66 let mut inner = self.inner.lock();
67 let storage = if inner.storage_dir == ":memory:" {
69 Storage::new(inner.storage_dir.clone())
70 } else {
71 Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
72 };
73
74 let client = Arc::new(RemoteSettingsClient::new(
75 inner.base_url.clone(),
76 inner.bucket_name.clone(),
77 collection_name.clone(),
78 inner.app_context.clone(),
79 storage,
80 ));
81 inner.clients.push(Arc::downgrade(&client));
82 client
83 }
84
85 pub fn sync(&self) -> Result<Vec<String>> {
87 let mut synced_collections = HashSet::new();
89
90 let mut inner = self.inner.lock();
91 let changes = inner.fetch_changes()?;
92 let change_map: HashMap<_, _> = changes
93 .changes
94 .iter()
95 .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
96 .collect();
97 let bucket_name = inner.bucket_name.clone();
98
99 for client in inner.active_clients() {
100 let client = &client.internal;
101 let collection_name = client.collection_name();
102 if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
103 if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
104 {
105 if client_last_modified == *server_last_modified {
106 trace!("skipping up-to-date collection: {collection_name}");
107 continue;
108 }
109 }
110 }
111 if synced_collections.insert(collection_name.to_string()) {
112 trace!("syncing collection: {collection_name}");
113 client.sync()?;
114 }
115 }
116 Ok(synced_collections.into_iter().collect())
117 }
118
119 pub fn update_config(&self, config: RemoteSettingsConfig2) -> Result<()> {
124 let base_url = config
125 .server
126 .unwrap_or(RemoteSettingsServer::Prod)
127 .get_base_url()?;
128 let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
129 let mut inner = self.inner.lock();
130 for client in inner.active_clients() {
131 client.internal.update_config(
132 base_url.clone(),
133 bucket_name.clone(),
134 config.app_context.clone(),
135 )?;
136 }
137 inner.base_url = base_url;
138 inner.bucket_name = bucket_name;
139 inner.app_context = config.app_context;
140 Ok(())
141 }
142
143 pub fn client_url(&self) -> Url {
144 let inner = self.inner.lock();
145 let base_url = inner.base_url.clone();
146 base_url.url().clone()
147 }
148}
149
150impl RemoteSettingsServiceInner {
151 fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
155 let mut active_clients = vec![];
156 self.clients.retain(|weak| {
157 if let Some(client) = weak.upgrade() {
158 active_clients.push(client);
159 true
160 } else {
161 false
162 }
163 });
164 active_clients
165 }
166
167 fn fetch_changes(&mut self) -> Result<Changes> {
168 let mut url = self.base_url.clone();
169 url.path_segments_mut()
170 .push("buckets")
171 .push("monitor")
172 .push("collections")
173 .push("changes")
174 .push("changeset");
175 url.query_pairs_mut().append_pair("_expected", "0");
181 let url = url.into_inner();
182 trace!("make_request: {url}");
183 self.remote_state.ensure_no_backoff()?;
184
185 let req = Request::get(url);
186 let resp = req.send()?;
187
188 self.remote_state.handle_backoff_hint(&resp)?;
189
190 if resp.is_success() {
191 Ok(resp.json()?)
192 } else {
193 Err(Error::ResponseError(format!(
194 "status code: {}",
195 resp.status
196 )))
197 }
198 }
199}
200
201#[derive(Debug, Deserialize)]
205struct Changes {
206 changes: Vec<ChangesCollection>,
207}
208
209#[derive(Debug, Deserialize)]
210struct ChangesCollection {
211 collection: String,
212 bucket: String,
213 last_modified: u64,
214}