remote_settings/
service.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::{Arc, Weak},
8};
9
10use camino::Utf8PathBuf;
11use error_support::trace;
12use parking_lot::Mutex;
13use serde::Deserialize;
14use viaduct::Request;
15
16use crate::{
17    client::RemoteState, config::BaseUrl, error::Error, storage::Storage, RemoteSettingsClient,
18    RemoteSettingsConfig2, RemoteSettingsContext, RemoteSettingsServer, Result,
19};
20
21/// Internal Remote settings service API
22pub struct RemoteSettingsService {
23    inner: Mutex<RemoteSettingsServiceInner>,
24}
25
26struct RemoteSettingsServiceInner {
27    storage_dir: Utf8PathBuf,
28    base_url: BaseUrl,
29    bucket_name: String,
30    app_context: Option<RemoteSettingsContext>,
31    remote_state: RemoteState,
32    /// Weakrefs for all clients that we've created.  Note: this stores the
33    /// top-level/public `RemoteSettingsClient` structs rather than `client::RemoteSettingsClient`.
34    /// The reason for this is that we return Arcs to the public struct to the foreign code, so we
35    /// need to use the same type for our weakrefs.  The alternative would be to create 2 Arcs for
36    /// each client, which is wasteful.
37    clients: Vec<Weak<RemoteSettingsClient>>,
38}
39
40impl RemoteSettingsService {
41    /// Construct a [RemoteSettingsService]
42    ///
43    /// This is typically done early in the application-startup process
44    pub fn new(storage_dir: String, config: RemoteSettingsConfig2) -> Self {
45        let storage_dir = storage_dir.into();
46        let base_url = config
47            .server
48            .unwrap_or(RemoteSettingsServer::Prod)
49            .get_base_url_with_prod_fallback();
50        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
51
52        Self {
53            inner: Mutex::new(RemoteSettingsServiceInner {
54                storage_dir,
55                base_url,
56                bucket_name,
57                app_context: config.app_context,
58                remote_state: RemoteState::default(),
59                clients: vec![],
60            }),
61        }
62    }
63
64    pub fn make_client(&self, collection_name: String) -> Arc<RemoteSettingsClient> {
65        let mut inner = self.inner.lock();
66        // Allow using in-memory databases for testing of external crates.
67        let storage = if inner.storage_dir == ":memory:" {
68            Storage::new(inner.storage_dir.clone())
69        } else {
70            Storage::new(inner.storage_dir.join(format!("{collection_name}.sql")))
71        };
72
73        let client = Arc::new(RemoteSettingsClient::new(
74            inner.base_url.clone(),
75            inner.bucket_name.clone(),
76            collection_name.clone(),
77            inner.app_context.clone(),
78            storage,
79        ));
80        inner.clients.push(Arc::downgrade(&client));
81        client
82    }
83
84    /// Sync collections for all active clients
85    pub fn sync(&self) -> Result<Vec<String>> {
86        // Make sure we only sync each collection once, even if there are multiple clients
87        let mut synced_collections = HashSet::new();
88
89        let mut inner = self.inner.lock();
90        let changes = inner.fetch_changes()?;
91        let change_map: HashMap<_, _> = changes
92            .changes
93            .iter()
94            .map(|c| ((c.collection.as_str(), &c.bucket), c.last_modified))
95            .collect();
96        let bucket_name = inner.bucket_name.clone();
97
98        for client in inner.active_clients() {
99            let client = &client.internal;
100            let collection_name = client.collection_name();
101            if let Some(client_last_modified) = client.get_last_modified_timestamp()? {
102                if let Some(server_last_modified) = change_map.get(&(collection_name, &bucket_name))
103                {
104                    if client_last_modified != *server_last_modified {
105                        trace!("skipping up-to-date collection: {collection_name}");
106                        continue;
107                    }
108                }
109            }
110            if synced_collections.insert(collection_name.to_string()) {
111                trace!("syncing collection: {collection_name}");
112                client.sync()?;
113            }
114        }
115        Ok(synced_collections.into_iter().collect())
116    }
117
118    /// Update the remote settings config
119    ///
120    /// This will cause all current and future clients to use new config and will delete any stored
121    /// records causing the clients to return new results from the new config.
122    pub fn update_config(&self, config: RemoteSettingsConfig2) -> Result<()> {
123        let base_url = config
124            .server
125            .unwrap_or(RemoteSettingsServer::Prod)
126            .get_base_url()?;
127        let bucket_name = config.bucket_name.unwrap_or_else(|| String::from("main"));
128        let mut inner = self.inner.lock();
129        for client in inner.active_clients() {
130            client.internal.update_config(
131                base_url.clone(),
132                bucket_name.clone(),
133                config.app_context.clone(),
134            )?;
135        }
136        inner.base_url = base_url;
137        inner.bucket_name = bucket_name;
138        inner.app_context = config.app_context;
139        Ok(())
140    }
141}
142
143impl RemoteSettingsServiceInner {
144    // Find live clients in self.clients
145    //
146    // Also, drop dead weakrefs from the vec
147    fn active_clients(&mut self) -> Vec<Arc<RemoteSettingsClient>> {
148        let mut active_clients = vec![];
149        self.clients.retain(|weak| {
150            if let Some(client) = weak.upgrade() {
151                active_clients.push(client);
152                true
153            } else {
154                false
155            }
156        });
157        active_clients
158    }
159
160    fn fetch_changes(&mut self) -> Result<Changes> {
161        let mut url = self.base_url.clone();
162        url.path_segments_mut()
163            .push("buckets")
164            .push("monitor")
165            .push("collections")
166            .push("changes")
167            .push("changeset");
168        // For now, always use `0` for the expected value.  This means we'll get updates based on
169        // the default TTL of 1 hour.
170        //
171        // Eventually, we should add support for push notifications and use the timestamp from the
172        // notification.
173        url.query_pairs_mut().append_pair("_expected", "0");
174        let url = url.into_inner();
175        trace!("make_request: {url}");
176        self.remote_state.ensure_no_backoff()?;
177
178        let req = Request::get(url);
179        let resp = req.send()?;
180
181        self.remote_state.handle_backoff_hint(&resp)?;
182
183        if resp.is_success() {
184            Ok(resp.json()?)
185        } else {
186            Err(Error::ResponseError(format!(
187                "status code: {}",
188                resp.status
189            )))
190        }
191    }
192}
193
194/// Data from the changes endpoint
195///
196/// https://remote-settings.readthedocs.io/en/latest/client-specifications.html#endpoints
197#[derive(Debug, Deserialize)]
198struct Changes {
199    changes: Vec<ChangesCollection>,
200}
201
202#[derive(Debug, Deserialize)]
203struct ChangesCollection {
204    collection: String,
205    bucket: String,
206    last_modified: u64,
207}