remote_settings/
service.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use url::Url;
15use viaduct::Request;
16
17use crate::{
18    client::RemoteState, config::BaseUrl, error::Error, storage::Storage, RemoteSettingsClient,
19    RemoteSettingsConfig2, RemoteSettingsContext, RemoteSettingsServer, Result,
20};
21
22/// Internal Remote settings service API
23pub struct RemoteSettingsService {
24    inner: Mutex<RemoteSettingsServiceInner>,
25}
26
27struct RemoteSettingsServiceInner {
28    storage_dir: Utf8PathBuf,
29    base_url: BaseUrl,
30    bucket_name: String,
31    app_context: Option<RemoteSettingsContext>,
32    remote_state: RemoteState,
33    /// Weakrefs for all clients that we've created.  Note: this stores the
34    /// top-level/public `RemoteSettingsClient` structs rather than `client::RemoteSettingsClient`.
35    /// The reason for this is that we return Arcs to the public struct to the foreign code, so we
36    /// need to use the same type for our weakrefs.  The alternative would be to create 2 Arcs for
37    /// each client, which is wasteful.
38    clients: Vec<Weak<RemoteSettingsClient>>,
39}
40
41impl RemoteSettingsService {
42    /// Construct a [RemoteSettingsService]
43    ///
44    /// This is typically done early in the application-startup process
45    pub fn new(storage_dir: String, config: RemoteSettingsConfig2) -> Self {
46        let storage_dir = storage_dir.into();
47        let base_url = config
48            .server
49            .unwrap_or(RemoteSettingsServer::Prod)
50            .get_base_url_with_prod_fallback();
51        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
52
53        Self {
54            inner: Mutex::new(RemoteSettingsServiceInner {
55                storage_dir,
56                base_url,
57                bucket_name,
58                app_context: config.app_context,
59                remote_state: RemoteState::default(),
60                clients: vec![],
61            }),
62        }
63    }
64
65    pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
66        let mut inner = self.inner.lock();
67        // Allow using in-memory databases for testing of external crates.
68        let storage = if inner.storage_dir == ":memory:" {
69            Storage::new(inner.storage_dir.clone())
70        } else {
71            Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
72        };
73
74        let client = Arc::new(RemoteSettingsClient::new(
75            inner.base_url.clone(),
76            inner.bucket_name.clone(),
77            collection_name.clone(),
78            inner.app_context.clone(),
79            storage,
80        ));
81        inner.clients.push(Arc::downgrade(&client));
82        client
83    }
84
85    /// Sync collections for all active clients
86    pub fn sync(&self) -> Result<Vec<String>> {
87        // Make sure we only sync each collection once, even if there are multiple clients
88        let mut synced_collections = HashSet::new();
89
90        let mut inner = self.inner.lock();
91        let changes = inner.fetch_changes()?;
92        let change_map: HashMap<_, _> = changes
93            .changes
94            .iter()
95            .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
96            .collect();
97        let bucket_name = inner.bucket_name.clone();
98
99        for client in inner.active_clients() {
100            let client = &client.internal;
101            let collection_name = client.collection_name();
102            if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
103                if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
104                {
105                    if client_last_modified == *server_last_modified {
106                        trace!("skipping up-to-date collection: {collection_name}");
107                        continue;
108                    }
109                }
110            }
111            if synced_collections.insert(collection_name.to_string()) {
112                trace!("syncing collection: {collection_name}");
113                client.sync()?;
114            }
115        }
116        Ok(synced_collections.into_iter().collect())
117    }
118
119    /// Update the remote settings config
120    ///
121    /// This will cause all current and future clients to use new config and will delete any stored
122    /// records causing the clients to return new results from the new config.
123    pub fn update_config(&self, config: RemoteSettingsConfig2) -> Result<()> {
124        let base_url = config
125            .server
126            .unwrap_or(RemoteSettingsServer::Prod)
127            .get_base_url()?;
128        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
129        let mut inner = self.inner.lock();
130        for client in inner.active_clients() {
131            client.internal.update_config(
132                base_url.clone(),
133                bucket_name.clone(),
134                config.app_context.clone(),
135            )?;
136        }
137        inner.base_url = base_url;
138        inner.bucket_name = bucket_name;
139        inner.app_context = config.app_context;
140        Ok(())
141    }
142
143    pub fn client_url(&self) -> Url {
144        let inner = self.inner.lock();
145        let base_url = inner.base_url.clone();
146        base_url.url().clone()
147    }
148}
149
150impl RemoteSettingsServiceInner {
151    // Find live clients in self.clients
152    //
153    // Also, drop dead weakrefs from the vec
154    fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
155        let mut active_clients = vec![];
156        self.clients.retain(|weak| {
157            if let Some(client) = weak.upgrade() {
158                active_clients.push(client);
159                true
160            } else {
161                false
162            }
163        });
164        active_clients
165    }
166
167    fn fetch_changes(&mut self) -> Result<Changes> {
168        let mut url = self.base_url.clone();
169        url.path_segments_mut()
170            .push("buckets")
171            .push("monitor")
172            .push("collections")
173            .push("changes")
174            .push("changeset");
175        // For now, always use `0` for the expected value.  This means we'll get updates based on
176        // the default TTL of 1 hour.
177        //
178        // Eventually, we should add support for push notifications and use the timestamp from the
179        // notification.
180        url.query_pairs_mut().append_pair("_expected", "0");
181        let url = url.into_inner();
182        trace!("make_request: {url}");
183        self.remote_state.ensure_no_backoff()?;
184
185        let req = Request::get(url);
186        let resp = req.send()?;
187
188        self.remote_state.handle_backoff_hint(&resp)?;
189
190        if resp.is_success() {
191            Ok(resp.json()?)
192        } else {
193            Err(Error::ResponseError(format!(
194                "status code: {}",
195                resp.status
196            )))
197        }
198    }
199}
200
201/// Data from the changes endpoint
202///
203/// https://remote-settings.readthedocs.io/en/latest/client-specifications.html#endpoints
204#[derive(Debug, Deserialize)]
205struct Changes {
206    changes: Vec<ChangesCollection>,
207}
208
209#[derive(Debug, Deserialize)]
210struct ChangesCollection {
211    collection: String,
212    bucket: String,
213    last_modified: u64,
214}