sync15/client/
storage_client.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::request::{
6    BatchPoster, InfoCollections, InfoConfiguration, PostQueue, PostResponse, PostResponseHandler,
7};
8use super::token;
9use crate::bso::{IncomingBso, IncomingEncryptedBso, OutgoingBso, OutgoingEncryptedBso};
10use crate::engine::{CollectionPost, CollectionRequest};
11use crate::error::{self, debug, info, trace, warn, Error, ErrorResponse};
12use crate::record_types::MetaGlobalRecord;
13use crate::{CollectionName, Guid, ServerTimestamp};
14use serde_json::Value;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU32, Ordering};
17use url::Url;
18use viaduct::{
19    header_names::{self, AUTHORIZATION},
20    Method, Request, Response,
21};
22
/// A response from a request made by a Sync15StorageClient, encapsulating all
/// the variants users of this client need to care about.
///
/// `T` is the type the JSON body of a successful response is deserialized
/// into.
#[derive(Debug, Clone)]
pub enum Sync15ClientResponse<T> {
    /// The server answered with a success status.
    Success {
        /// HTTP status code of the response.
        status: u16,
        /// The response body, deserialized from JSON.
        record: T,
        /// Timestamp parsed from the response's `X_LAST_MODIFIED` header.
        last_modified: ServerTimestamp,
        /// Path component of the URL the response came from.
        route: String,
    },
    /// The server answered with a non-success status.
    Error(ErrorResponse),
}
35
36fn parse_seconds(seconds_str: &str) -> Option<u32> {
37    let secs = seconds_str.parse::<f64>().ok()?.ceil();
38    // Note: u32 doesn't impl TryFrom<f64> :(
39    if !secs.is_finite() || secs < 0.0 || secs > f64::from(u32::MAX) {
40        warn!("invalid backoff value: {}", secs);
41        None
42    } else {
43        Some(secs as u32)
44    }
45}
46
47impl<T> Sync15ClientResponse<T> {
48    pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result<Self>
49    where
50        for<'a> T: serde::de::Deserialize<'a>,
51    {
52        let route: String = resp.url.path().into();
53        // Android seems to respect retry_after even on success requests, so we
54        // will too if it's present. This also lets us handle both backoff-like
55        // properties in the same place.
56        let retry_after = resp
57            .headers
58            .get(header_names::RETRY_AFTER)
59            .and_then(parse_seconds);
60
61        let backoff = resp
62            .headers
63            .get(header_names::X_WEAVE_BACKOFF)
64            .and_then(parse_seconds);
65
66        if let Some(b) = backoff {
67            backoff_listener.note_backoff(b);
68        }
69        if let Some(ra) = retry_after {
70            backoff_listener.note_retry_after(ra);
71        }
72
73        Ok(if resp.is_success() {
74            let record: T = resp.json()?;
75            let last_modified = resp
76                .headers
77                .get(header_names::X_LAST_MODIFIED)
78                .and_then(|s| ServerTimestamp::from_str(s).ok())
79                .ok_or(Error::MissingServerTimestamp)?;
80            info!(
81                "Successful request to \"{}\", incoming x-last-modified={:?}",
82                route, last_modified
83            );
84
85            Sync15ClientResponse::Success {
86                status: resp.status,
87                record,
88                last_modified,
89                route,
90            }
91        } else {
92            let status = resp.status;
93            info!("Request \"{}\" was an error (status={})", route, status);
94            match status {
95                404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }),
96                401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }),
97                412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }),
98                500..=600 => {
99                    Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status })
100                }
101                _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }),
102            }
103        })
104    }
105
106    pub fn create_storage_error(self) -> Error {
107        let inner = match self {
108            Sync15ClientResponse::Success { status, route, .. } => {
109                // This should never happen as callers are expected to have
110                // already special-cased this response, so warn if it does.
111                // (or maybe we could panic?)
112                warn!("Converting success response into an error");
113                ErrorResponse::RequestFailed { status, route }
114            }
115            Sync15ClientResponse::Error(e) => e,
116        };
117        Error::StorageHttpError(inner)
118    }
119}
120
/// The parameters needed to construct a `Sync15StorageClient`.
///
/// All three values are handed to `token::TokenProvider::new` by
/// `Sync15StorageClient::new`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Sync15StorageClientInit {
    /// Key identifier passed to the token provider.
    pub key_id: String,
    /// Access token passed to the token provider.
    pub access_token: String,
    /// URL of the token server passed to the token provider.
    pub tokenserver_url: Url,
}
127
/// A trait containing the methods required to run through the setup state
/// machine. This is factored out into a separate trait to make mocking
/// easier.
///
/// The route names mentioned below are the ones used by the
/// `Sync15StorageClient` implementation in this file.
pub trait SetupStorageClient {
    /// Fetches the server configuration (`info/configuration`).
    fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>>;
    /// Fetches the collection info (`info/collections`).
    fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>>;
    /// Fetches and decodes the `meta/global` record.
    fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>>;
    /// Fetches the (still encrypted) `crypto/keys` record.
    fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<IncomingEncryptedBso>>;

    /// Uploads `global` as the `meta/global` record and returns the
    /// last-modified timestamp reported by the server. `xius` is sent as the
    /// `X_IF_UNMODIFIED_SINCE` header value.
    fn put_meta_global(
        &self,
        xius: ServerTimestamp,
        global: &MetaGlobalRecord,
    ) -> error::Result<ServerTimestamp>;
    /// Uploads `keys` as the `crypto/keys` record. `xius` is sent as the
    /// `X_IF_UNMODIFIED_SINCE` header value.
    fn put_crypto_keys(
        &self,
        xius: ServerTimestamp,
        keys: &OutgoingEncryptedBso,
    ) -> error::Result<()>;
    /// Deletes everything stored remotely for this user.
    fn wipe_all_remote(&self) -> error::Result<()>;
}
149
/// Backoff values reported by the server, stored in atomics so they can be
/// updated through a shared reference.
///
/// Both values start at zero and are cleared again by `reset`.
#[derive(Debug, Default)]
pub struct BackoffState {
    /// Seconds noted via `note_backoff` (fed from the `X_WEAVE_BACKOFF`
    /// header). Presumably the largest value seen so far; that depends on
    /// `util::atomic_update_max`, which is not defined in this file.
    pub backoff_secs: AtomicU32,
    /// Seconds noted via `note_retry_after` (fed from the `RETRY_AFTER`
    /// header); same caveat as above.
    pub retry_after_secs: AtomicU32,
}
155
/// Shared, reference-counted handle to a `BackoffState`.
pub(crate) type BackoffListener = std::sync::Arc<BackoffState>;
157
158pub(crate) fn new_backoff_listener() -> BackoffListener {
159    std::sync::Arc::new(BackoffState::default())
160}
161
162impl BackoffState {
163    pub fn note_backoff(&self, noted: u32) {
164        super::util::atomic_update_max(&self.backoff_secs, noted)
165    }
166
167    pub fn note_retry_after(&self, noted: u32) {
168        super::util::atomic_update_max(&self.retry_after_secs, noted)
169    }
170
171    pub fn get_backoff_secs(&self) -> u32 {
172        self.backoff_secs.load(Ordering::SeqCst)
173    }
174
175    pub fn get_retry_after_secs(&self) -> u32 {
176        self.retry_after_secs.load(Ordering::SeqCst)
177    }
178
179    pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option<std::time::Duration> {
180        let bo = self.get_backoff_secs();
181        let ra = self.get_retry_after_secs();
182        let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) });
183        if secs > 0 {
184            Some(std::time::Duration::from_secs(secs))
185        } else {
186            None
187        }
188    }
189
190    pub fn reset(&self) {
191        self.backoff_secs.store(0, Ordering::SeqCst);
192        self.retry_after_secs.store(0, Ordering::SeqCst);
193    }
194}
195
// meta/global is a clear-text Bso (ie, it has a String `payload` which holds a MetaGlobalRecord).
// We don't use the 'content' helpers for it because we want JSON errors to be fatal
// (ie, we don't need tombstones and can't just skip a malformed record).
type IncMetaGlobalBso = IncomingBso;
type OutMetaGlobalBso = OutgoingBso;
201
/// A client for a Sync 1.5 storage server.
#[derive(Debug)]
pub struct Sync15StorageClient {
    /// Supplies the API endpoint and the authorization header for requests.
    tsc: token::TokenProvider,
    /// Receives the backoff values found in the headers of every response.
    pub(crate) backoff: BackoffListener,
}
207
208impl SetupStorageClient for Sync15StorageClient {
209    fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>> {
210        self.relative_storage_request(Method::Get, "info/configuration")
211    }
212
213    fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> {
214        self.relative_storage_request(Method::Get, "info/collections")
215    }
216
217    fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> {
218        let got: Sync15ClientResponse<IncMetaGlobalBso> =
219            self.relative_storage_request(Method::Get, "storage/meta/global")?;
220        Ok(match got {
221            Sync15ClientResponse::Success {
222                record,
223                last_modified,
224                route,
225                status,
226            } => {
227                debug!(
228                    "Got meta global with modified = {}; last-modified = {}",
229                    record.envelope.modified, last_modified
230                );
231                Sync15ClientResponse::Success {
232                    record: serde_json::from_str(&record.payload)?,
233                    last_modified,
234                    route,
235                    status,
236                }
237            }
238            Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e),
239        })
240    }
241
242    fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<IncomingEncryptedBso>> {
243        self.relative_storage_request(Method::Get, "storage/crypto/keys")
244    }
245
246    fn put_meta_global(
247        &self,
248        xius: ServerTimestamp,
249        global: &MetaGlobalRecord,
250    ) -> error::Result<ServerTimestamp> {
251        let bso = OutMetaGlobalBso::new(Guid::new("global").into(), global)?;
252        self.put("storage/meta/global", xius, &bso)
253    }
254
255    fn put_crypto_keys(
256        &self,
257        xius: ServerTimestamp,
258        keys: &OutgoingEncryptedBso,
259    ) -> error::Result<()> {
260        self.put("storage/crypto/keys", xius, keys)?;
261        Ok(())
262    }
263
264    fn wipe_all_remote(&self) -> error::Result<()> {
265        let s = self.tsc.api_endpoint()?;
266        let url = Url::parse(&s)?;
267
268        let req = self.build_request(Method::Delete, url)?;
269        match self.exec_request::<Value>(req, false) {
270            Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
271            | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
272            Ok(resp) => Err(resp.create_storage_error()),
273            Err(e) => Err(e),
274        }
275    }
276}
277
278impl Sync15StorageClient {
279    pub fn new(init_params: Sync15StorageClientInit) -> error::Result<Sync15StorageClient> {
280        rc_crypto::ensure_initialized();
281        let tsc = token::TokenProvider::new(
282            init_params.tokenserver_url,
283            init_params.access_token,
284            init_params.key_id,
285        );
286        Ok(Sync15StorageClient {
287            tsc,
288            backoff: new_backoff_listener(),
289        })
290    }
291
292    pub fn get_encrypted_records(
293        &self,
294        collection_request: CollectionRequest,
295    ) -> error::Result<Sync15ClientResponse<Vec<IncomingEncryptedBso>>> {
296        self.collection_request(Method::Get, collection_request)
297    }
298
299    #[inline]
300    fn authorized(&self, req: Request) -> error::Result<Request> {
301        let hawk_header_value = self.tsc.authorization(&req)?;
302        Ok(req.header(AUTHORIZATION, hawk_header_value)?)
303    }
304
305    // TODO: probably want a builder-like API to do collection requests (e.g. something
306    // that occupies roughly the same conceptual role as the Collection class in desktop)
307    fn build_request(&self, method: Method, url: Url) -> error::Result<Request> {
308        self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?)
309    }
310
311    fn relative_storage_request<P, T>(
312        &self,
313        method: Method,
314        relative_path: P,
315    ) -> error::Result<Sync15ClientResponse<T>>
316    where
317        P: AsRef<str>,
318        for<'a> T: serde::de::Deserialize<'a>,
319    {
320        let s = self.tsc.api_endpoint()? + "/";
321        let url = Url::parse(&s)?.join(relative_path.as_ref())?;
322        self.exec_request(self.build_request(method, url)?, false)
323    }
324
325    fn exec_request<T>(
326        &self,
327        req: Request,
328        require_success: bool,
329    ) -> error::Result<Sync15ClientResponse<T>>
330    where
331        for<'a> T: serde::de::Deserialize<'a>,
332    {
333        trace!(
334            "request: {} {} ({:?})",
335            req.method,
336            req.url.path(),
337            req.url.query()
338        );
339        let resp = req.send()?;
340
341        let result = Sync15ClientResponse::from_response(resp, &self.backoff)?;
342        match result {
343            Sync15ClientResponse::Success { .. } => Ok(result),
344            _ => {
345                if require_success {
346                    Err(result.create_storage_error())
347                } else {
348                    Ok(result)
349                }
350            }
351        }
352    }
353
354    fn collection_request<T>(
355        &self,
356        method: Method,
357        r: CollectionRequest,
358    ) -> error::Result<Sync15ClientResponse<T>>
359    where
360        for<'a> T: serde::de::Deserialize<'a>,
361    {
362        let url = build_collection_request_url(Url::parse(&self.tsc.api_endpoint()?)?, r)?;
363        self.exec_request(self.build_request(method, url)?, false)
364    }
365
366    pub fn new_post_queue<'a, F: PostResponseHandler>(
367        &'a self,
368        coll: &'a CollectionName,
369        config: &InfoConfiguration,
370        ts: ServerTimestamp,
371        on_response: F,
372    ) -> error::Result<PostQueue<PostWrapper<'a>, F>> {
373        let pw = PostWrapper { client: self, coll };
374        Ok(PostQueue::new(config, ts, pw, on_response))
375    }
376
377    fn put<P, B>(
378        &self,
379        relative_path: P,
380        xius: ServerTimestamp,
381        body: &B,
382    ) -> error::Result<ServerTimestamp>
383    where
384        P: AsRef<str>,
385        B: serde::ser::Serialize,
386    {
387        let s = self.tsc.api_endpoint()? + "/";
388        let url = Url::parse(&s)?.join(relative_path.as_ref())?;
389
390        let req = self
391            .build_request(Method::Put, url)?
392            .json(body)
393            .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?;
394
395        let resp = self.exec_request::<Value>(req, true)?;
396        // Note: we pass `true` for require_success, so this panic never happens.
397        if let Sync15ClientResponse::Success { last_modified, .. } = resp {
398            Ok(last_modified)
399        } else {
400            unreachable!("Error returned exec_request when `require_success` was true");
401        }
402    }
403
404    pub fn hashed_uid(&self) -> error::Result<String> {
405        self.tsc.hashed_uid()
406    }
407
408    pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> {
409        let s = self.tsc.api_endpoint()? + "/";
410        let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?;
411        debug!("Wiping: {:?}", url);
412        let req = self.build_request(Method::Delete, url)?;
413        match self.exec_request::<Value>(req, false) {
414            Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
415            | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
416            Ok(resp) => Err(resp.create_storage_error()),
417            Err(e) => Err(e),
418        }
419    }
420}
421
/// Binds a storage client to a single collection so that a `PostQueue` can
/// post batches to it (see the `BatchPoster` impl).
pub struct PostWrapper<'a> {
    /// The client used to build and send the POST requests.
    client: &'a Sync15StorageClient,
    /// The collection the records are posted to.
    coll: &'a CollectionName,
}
426
427impl BatchPoster for PostWrapper<'_> {
428    fn post<T, O>(
429        &self,
430        bytes: Vec<u8>,
431        xius: ServerTimestamp,
432        batch: Option<String>,
433        commit: bool,
434        _: &PostQueue<T, O>,
435    ) -> error::Result<PostResponse> {
436        let r = CollectionPost::new(self.coll.clone())
437            .batch(batch)
438            .commit(commit);
439        let url = build_collection_post_url(Url::parse(&self.client.tsc.api_endpoint()?)?, r)?;
440
441        let req = self
442            .client
443            .build_request(Method::Post, url)?
444            .header(header_names::CONTENT_TYPE, "application/json")?
445            .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?
446            .body(bytes);
447        self.client.exec_request(req, false)
448    }
449}
450
451fn build_collection_url(mut base_url: Url, collection: CollectionName) -> error::Result<Url> {
452    base_url
453        .path_segments_mut()
454        .map_err(|_| Error::UnacceptableUrl("Storage server URL is not a base".to_string()))?
455        .extend(&["storage", &collection]);
456
457    // This is strange but just accessing query_pairs_mut makes you have
458    // a trailing question mark on your url. I don't think anything bad
459    // would happen here, but I don't know, and also, it looks dumb so
460    // I'd rather not have it.
461    if base_url.query() == Some("") {
462        base_url.set_query(None);
463    }
464    Ok(base_url)
465}
466
467fn build_collection_request_url(mut base_url: Url, r: CollectionRequest) -> error::Result<Url> {
468    let mut pairs = base_url.query_pairs_mut();
469    if r.full {
470        pairs.append_pair("full", "1");
471    }
472    if let Some(ids) = &r.ids {
473        // Most ids are 12 characters, and we comma separate them, so 13.
474        let mut buf = String::with_capacity(ids.len() * 13);
475        for (i, id) in ids.iter().enumerate() {
476            if i > 0 {
477                buf.push(',');
478            }
479            buf.push_str(id.as_str());
480        }
481        pairs.append_pair("ids", &buf);
482    }
483    if let Some(ts) = r.older {
484        pairs.append_pair("older", &ts.to_string());
485    }
486    if let Some(ts) = r.newer {
487        pairs.append_pair("newer", &ts.to_string());
488    }
489    if let Some(l) = r.limit {
490        pairs.append_pair("sort", l.order.as_str());
491        pairs.append_pair("limit", &l.num.to_string());
492    }
493    pairs.finish();
494    drop(pairs);
495    build_collection_url(base_url, r.collection)
496}
497
498#[cfg(feature = "sync-client")]
499fn build_collection_post_url(mut base_url: Url, r: CollectionPost) -> error::Result<Url> {
500    let mut pairs = base_url.query_pairs_mut();
501    if let Some(batch) = &r.batch {
502        pairs.append_pair("batch", batch);
503    }
504    if r.commit {
505        pairs.append_pair("commit", "true");
506    }
507    pairs.finish();
508    drop(pairs);
509    build_collection_url(base_url, r.collection)
510}
511
#[cfg(test)]
mod test {
    use super::*;

    /// Compile-time check that the client can be sent across threads.
    #[test]
    fn test_send() {
        // This fails to compile if the type is not `Send`.
        fn ensure_send<T: Send>() {}
        ensure_send::<Sync15StorageClient>();
    }

    /// Rounding, rejection and range limits of `parse_seconds`.
    #[test]
    fn test_parse_seconds() {
        let cases: [(&str, Option<u32>); 11] = [
            ("1", Some(1)),
            ("1.4", Some(2)),
            ("1.5", Some(2)),
            ("3600.0", Some(3600)),
            ("3600", Some(3600)),
            ("-1", None),
            ("inf", None),
            ("-inf", None),
            ("one-thousand", None),
            ("4294967295", Some(4294967295)),
            ("4294967296", None),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(parse_seconds(input), *expected, "input: {:?}", input);
        }
    }

    /// URLs produced for collection GET requests.
    #[test]
    fn test_query_building() {
        use crate::engine::RequestOrder;
        let base = Url::parse("https://example.com/sync").unwrap();

        // No parameters: no query string (and no dangling `?`).
        let plain_request = CollectionRequest::new("foo".into());
        let plain_url = build_collection_request_url(base.clone(), plain_request).unwrap();
        assert_eq!(plain_url.as_str(), "https://example.com/sync/storage/foo");

        // Ids are comma-joined and percent-encoded.
        let id_request = CollectionRequest::new("wutang".into())
            .full()
            .ids(&["rza", "gza"]);
        let id_url = build_collection_request_url(base.clone(), id_request).unwrap();
        assert_eq!(
            id_url.as_str(),
            "https://example.com/sync/storage/wutang?full=1&ids=rza%2Cgza"
        );

        // Parameters come out in a fixed order, whatever order they were set in.
        let complex_request = CollectionRequest::new("specific".into())
            .full()
            .limit(10, RequestOrder::Oldest)
            .older_than(ServerTimestamp(9_876_540))
            .newer_than(ServerTimestamp(1_234_560));
        let complex_url = build_collection_request_url(base, complex_request).unwrap();
        assert_eq!(
            complex_url.as_str(),
            "https://example.com/sync/storage/specific?full=1&older=9876.54&newer=1234.56&sort=oldest&limit=10"
        );
    }

    /// URLs produced for collection POST requests.
    #[cfg(feature = "sync-client")]
    #[test]
    fn test_post_query_building() {
        let base = Url::parse("https://example.com/sync").unwrap();

        let plain_url =
            build_collection_post_url(base.clone(), CollectionPost::new("foo".into())).unwrap();
        assert_eq!(plain_url.as_str(), "https://example.com/sync/storage/foo");

        let start_post = CollectionPost::new("bar".into())
            .batch(Some("true".into()))
            .commit(false);
        let start_url = build_collection_post_url(base.clone(), start_post).unwrap();
        assert_eq!(
            start_url.as_str(),
            "https://example.com/sync/storage/bar?batch=true"
        );

        let commit_post = CollectionPost::new("asdf".into())
            .batch(Some("1234abc".into()))
            .commit(true);
        let commit_url = build_collection_post_url(base, commit_post).unwrap();
        assert_eq!(
            commit_url.as_str(),
            "https://example.com/sync/storage/asdf?batch=1234abc&commit=true"
        );
    }
}