1use super::request::{
6 BatchPoster, InfoCollections, InfoConfiguration, PostQueue, PostResponse, PostResponseHandler,
7};
8use super::token;
9use crate::bso::{IncomingBso, IncomingEncryptedBso, OutgoingBso, OutgoingEncryptedBso};
10use crate::engine::{CollectionPost, CollectionRequest};
11use crate::error::{self, debug, info, trace, warn, Error, ErrorResponse};
12use crate::record_types::MetaGlobalRecord;
13use crate::{CollectionName, Guid, ServerTimestamp};
14use serde_json::Value;
15use std::str::FromStr;
16use std::sync::atomic::{AtomicU32, Ordering};
17use url::Url;
18use viaduct::{
19 header_names::{self, AUTHORIZATION},
20 Method, Request, Response,
21};
22
23#[derive(Debug, Clone)]
26pub enum Sync15ClientResponse<T> {
27 Success {
28 status: u16,
29 record: T,
30 last_modified: ServerTimestamp,
31 route: String,
32 },
33 Error(ErrorResponse),
34}
35
36fn parse_seconds(seconds_str: &str) -> Option<u32> {
37 let secs = seconds_str.parse::<f64>().ok()?.ceil();
38 if !secs.is_finite() || secs < 0.0 || secs > f64::from(u32::MAX) {
40 warn!("invalid backoff value: {}", secs);
41 None
42 } else {
43 Some(secs as u32)
44 }
45}
46
47impl<T> Sync15ClientResponse<T> {
48 pub fn from_response(resp: Response, backoff_listener: &BackoffListener) -> error::Result<Self>
49 where
50 for<'a> T: serde::de::Deserialize<'a>,
51 {
52 let route: String = resp.url.path().into();
53 let retry_after = resp
57 .headers
58 .get(header_names::RETRY_AFTER)
59 .and_then(parse_seconds);
60
61 let backoff = resp
62 .headers
63 .get(header_names::X_WEAVE_BACKOFF)
64 .and_then(parse_seconds);
65
66 if let Some(b) = backoff {
67 backoff_listener.note_backoff(b);
68 }
69 if let Some(ra) = retry_after {
70 backoff_listener.note_retry_after(ra);
71 }
72
73 Ok(if resp.is_success() {
74 let record: T = resp.json()?;
75 let last_modified = resp
76 .headers
77 .get(header_names::X_LAST_MODIFIED)
78 .and_then(|s| ServerTimestamp::from_str(s).ok())
79 .ok_or(Error::MissingServerTimestamp)?;
80 info!(
81 "Successful request to \"{}\", incoming x-last-modified={:?}",
82 route, last_modified
83 );
84
85 Sync15ClientResponse::Success {
86 status: resp.status,
87 record,
88 last_modified,
89 route,
90 }
91 } else {
92 let status = resp.status;
93 info!("Request \"{}\" was an error (status={})", route, status);
94 match status {
95 404 => Sync15ClientResponse::Error(ErrorResponse::NotFound { route }),
96 401 => Sync15ClientResponse::Error(ErrorResponse::Unauthorized { route }),
97 412 => Sync15ClientResponse::Error(ErrorResponse::PreconditionFailed { route }),
98 500..=600 => {
99 Sync15ClientResponse::Error(ErrorResponse::ServerError { route, status })
100 }
101 _ => Sync15ClientResponse::Error(ErrorResponse::RequestFailed { route, status }),
102 }
103 })
104 }
105
106 pub fn create_storage_error(self) -> Error {
107 let inner = match self {
108 Sync15ClientResponse::Success { status, route, .. } => {
109 warn!("Converting success response into an error");
113 ErrorResponse::RequestFailed { status, route }
114 }
115 Sync15ClientResponse::Error(e) => e,
116 };
117 Error::StorageHttpError(inner)
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
122pub struct Sync15StorageClientInit {
123 pub key_id: String,
124 pub access_token: String,
125 pub tokenserver_url: Url,
126}
127
128pub trait SetupStorageClient {
132 fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>>;
133 fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>>;
134 fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>>;
135 fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<IncomingEncryptedBso>>;
136
137 fn put_meta_global(
138 &self,
139 xius: ServerTimestamp,
140 global: &MetaGlobalRecord,
141 ) -> error::Result<ServerTimestamp>;
142 fn put_crypto_keys(
143 &self,
144 xius: ServerTimestamp,
145 keys: &OutgoingEncryptedBso,
146 ) -> error::Result<()>;
147 fn wipe_all_remote(&self) -> error::Result<()>;
148}
149
150#[derive(Debug, Default)]
151pub struct BackoffState {
152 pub backoff_secs: AtomicU32,
153 pub retry_after_secs: AtomicU32,
154}
155
156pub(crate) type BackoffListener = std::sync::Arc<BackoffState>;
157
158pub(crate) fn new_backoff_listener() -> BackoffListener {
159 std::sync::Arc::new(BackoffState::default())
160}
161
162impl BackoffState {
163 pub fn note_backoff(&self, noted: u32) {
164 super::util::atomic_update_max(&self.backoff_secs, noted)
165 }
166
167 pub fn note_retry_after(&self, noted: u32) {
168 super::util::atomic_update_max(&self.retry_after_secs, noted)
169 }
170
171 pub fn get_backoff_secs(&self) -> u32 {
172 self.backoff_secs.load(Ordering::SeqCst)
173 }
174
175 pub fn get_retry_after_secs(&self) -> u32 {
176 self.retry_after_secs.load(Ordering::SeqCst)
177 }
178
179 pub fn get_required_wait(&self, ignore_soft_backoff: bool) -> Option<std::time::Duration> {
180 let bo = self.get_backoff_secs();
181 let ra = self.get_retry_after_secs();
182 let secs = u64::from(if ignore_soft_backoff { ra } else { bo.max(ra) });
183 if secs > 0 {
184 Some(std::time::Duration::from_secs(secs))
185 } else {
186 None
187 }
188 }
189
190 pub fn reset(&self) {
191 self.backoff_secs.store(0, Ordering::SeqCst);
192 self.retry_after_secs.store(0, Ordering::SeqCst);
193 }
194}
195
196type IncMetaGlobalBso = IncomingBso;
200type OutMetaGlobalBso = OutgoingBso;
201
202#[derive(Debug)]
203pub struct Sync15StorageClient {
204 tsc: token::TokenProvider,
205 pub(crate) backoff: BackoffListener,
206}
207
208impl SetupStorageClient for Sync15StorageClient {
209 fn fetch_info_configuration(&self) -> error::Result<Sync15ClientResponse<InfoConfiguration>> {
210 self.relative_storage_request(Method::Get, "info/configuration")
211 }
212
213 fn fetch_info_collections(&self) -> error::Result<Sync15ClientResponse<InfoCollections>> {
214 self.relative_storage_request(Method::Get, "info/collections")
215 }
216
217 fn fetch_meta_global(&self) -> error::Result<Sync15ClientResponse<MetaGlobalRecord>> {
218 let got: Sync15ClientResponse<IncMetaGlobalBso> =
219 self.relative_storage_request(Method::Get, "storage/meta/global")?;
220 Ok(match got {
221 Sync15ClientResponse::Success {
222 record,
223 last_modified,
224 route,
225 status,
226 } => {
227 debug!(
228 "Got meta global with modified = {}; last-modified = {}",
229 record.envelope.modified, last_modified
230 );
231 Sync15ClientResponse::Success {
232 record: serde_json::from_str(&record.payload)?,
233 last_modified,
234 route,
235 status,
236 }
237 }
238 Sync15ClientResponse::Error(e) => Sync15ClientResponse::Error(e),
239 })
240 }
241
242 fn fetch_crypto_keys(&self) -> error::Result<Sync15ClientResponse<IncomingEncryptedBso>> {
243 self.relative_storage_request(Method::Get, "storage/crypto/keys")
244 }
245
246 fn put_meta_global(
247 &self,
248 xius: ServerTimestamp,
249 global: &MetaGlobalRecord,
250 ) -> error::Result<ServerTimestamp> {
251 let bso = OutMetaGlobalBso::new(Guid::new("global").into(), global)?;
252 self.put("storage/meta/global", xius, &bso)
253 }
254
255 fn put_crypto_keys(
256 &self,
257 xius: ServerTimestamp,
258 keys: &OutgoingEncryptedBso,
259 ) -> error::Result<()> {
260 self.put("storage/crypto/keys", xius, keys)?;
261 Ok(())
262 }
263
264 fn wipe_all_remote(&self) -> error::Result<()> {
265 let s = self.tsc.api_endpoint()?;
266 let url = Url::parse(&s)?;
267
268 let req = self.build_request(Method::Delete, url)?;
269 match self.exec_request::<Value>(req, false) {
270 Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
271 | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
272 Ok(resp) => Err(resp.create_storage_error()),
273 Err(e) => Err(e),
274 }
275 }
276}
277
278impl Sync15StorageClient {
279 pub fn new(init_params: Sync15StorageClientInit) -> error::Result<Sync15StorageClient> {
280 rc_crypto::ensure_initialized();
281 let tsc = token::TokenProvider::new(
282 init_params.tokenserver_url,
283 init_params.access_token,
284 init_params.key_id,
285 );
286 Ok(Sync15StorageClient {
287 tsc,
288 backoff: new_backoff_listener(),
289 })
290 }
291
292 pub fn get_encrypted_records(
293 &self,
294 collection_request: CollectionRequest,
295 ) -> error::Result<Sync15ClientResponse<Vec<IncomingEncryptedBso>>> {
296 self.collection_request(Method::Get, collection_request)
297 }
298
299 #[inline]
300 fn authorized(&self, req: Request) -> error::Result<Request> {
301 let hawk_header_value = self.tsc.authorization(&req)?;
302 Ok(req.header(AUTHORIZATION, hawk_header_value)?)
303 }
304
305 fn build_request(&self, method: Method, url: Url) -> error::Result<Request> {
308 self.authorized(Request::new(method, url).header(header_names::ACCEPT, "application/json")?)
309 }
310
311 fn relative_storage_request<P, T>(
312 &self,
313 method: Method,
314 relative_path: P,
315 ) -> error::Result<Sync15ClientResponse<T>>
316 where
317 P: AsRef<str>,
318 for<'a> T: serde::de::Deserialize<'a>,
319 {
320 let s = self.tsc.api_endpoint()? + "/";
321 let url = Url::parse(&s)?.join(relative_path.as_ref())?;
322 self.exec_request(self.build_request(method, url)?, false)
323 }
324
325 fn exec_request<T>(
326 &self,
327 req: Request,
328 require_success: bool,
329 ) -> error::Result<Sync15ClientResponse<T>>
330 where
331 for<'a> T: serde::de::Deserialize<'a>,
332 {
333 trace!(
334 "request: {} {} ({:?})",
335 req.method,
336 req.url.path(),
337 req.url.query()
338 );
339 let resp = req.send()?;
340
341 let result = Sync15ClientResponse::from_response(resp, &self.backoff)?;
342 match result {
343 Sync15ClientResponse::Success { .. } => Ok(result),
344 _ => {
345 if require_success {
346 Err(result.create_storage_error())
347 } else {
348 Ok(result)
349 }
350 }
351 }
352 }
353
354 fn collection_request<T>(
355 &self,
356 method: Method,
357 r: CollectionRequest,
358 ) -> error::Result<Sync15ClientResponse<T>>
359 where
360 for<'a> T: serde::de::Deserialize<'a>,
361 {
362 let url = build_collection_request_url(Url::parse(&self.tsc.api_endpoint()?)?, r)?;
363 self.exec_request(self.build_request(method, url)?, false)
364 }
365
366 pub fn new_post_queue<'a, F: PostResponseHandler>(
367 &'a self,
368 coll: &'a CollectionName,
369 config: &InfoConfiguration,
370 ts: ServerTimestamp,
371 on_response: F,
372 ) -> error::Result<PostQueue<PostWrapper<'a>, F>> {
373 let pw = PostWrapper { client: self, coll };
374 Ok(PostQueue::new(config, ts, pw, on_response))
375 }
376
377 fn put<P, B>(
378 &self,
379 relative_path: P,
380 xius: ServerTimestamp,
381 body: &B,
382 ) -> error::Result<ServerTimestamp>
383 where
384 P: AsRef<str>,
385 B: serde::ser::Serialize,
386 {
387 let s = self.tsc.api_endpoint()? + "/";
388 let url = Url::parse(&s)?.join(relative_path.as_ref())?;
389
390 let req = self
391 .build_request(Method::Put, url)?
392 .json(body)
393 .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?;
394
395 let resp = self.exec_request::<Value>(req, true)?;
396 if let Sync15ClientResponse::Success { last_modified, .. } = resp {
398 Ok(last_modified)
399 } else {
400 unreachable!("Error returned exec_request when `require_success` was true");
401 }
402 }
403
404 pub fn hashed_uid(&self) -> error::Result<String> {
405 self.tsc.hashed_uid()
406 }
407
408 pub(crate) fn wipe_remote_engine(&self, engine: &str) -> error::Result<()> {
409 let s = self.tsc.api_endpoint()? + "/";
410 let url = Url::parse(&s)?.join(&format!("storage/{}", engine))?;
411 debug!("Wiping: {:?}", url);
412 let req = self.build_request(Method::Delete, url)?;
413 match self.exec_request::<Value>(req, false) {
414 Ok(Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }))
415 | Ok(Sync15ClientResponse::Success { .. }) => Ok(()),
416 Ok(resp) => Err(resp.create_storage_error()),
417 Err(e) => Err(e),
418 }
419 }
420}
421
422pub struct PostWrapper<'a> {
423 client: &'a Sync15StorageClient,
424 coll: &'a CollectionName,
425}
426
427impl BatchPoster for PostWrapper<'_> {
428 fn post<T, O>(
429 &self,
430 bytes: Vec<u8>,
431 xius: ServerTimestamp,
432 batch: Option<String>,
433 commit: bool,
434 _: &PostQueue<T, O>,
435 ) -> error::Result<PostResponse> {
436 let r = CollectionPost::new(self.coll.clone())
437 .batch(batch)
438 .commit(commit);
439 let url = build_collection_post_url(Url::parse(&self.client.tsc.api_endpoint()?)?, r)?;
440
441 let req = self
442 .client
443 .build_request(Method::Post, url)?
444 .header(header_names::CONTENT_TYPE, "application/json")?
445 .header(header_names::X_IF_UNMODIFIED_SINCE, format!("{}", xius))?
446 .body(bytes);
447 self.client.exec_request(req, false)
448 }
449}
450
451fn build_collection_url(mut base_url: Url, collection: CollectionName) -> error::Result<Url> {
452 base_url
453 .path_segments_mut()
454 .map_err(|_| Error::UnacceptableUrl("Storage server URL is not a base".to_string()))?
455 .extend(&["storage", &collection]);
456
457 if base_url.query() == Some("") {
462 base_url.set_query(None);
463 }
464 Ok(base_url)
465}
466
467fn build_collection_request_url(mut base_url: Url, r: CollectionRequest) -> error::Result<Url> {
468 let mut pairs = base_url.query_pairs_mut();
469 if r.full {
470 pairs.append_pair("full", "1");
471 }
472 if let Some(ids) = &r.ids {
473 let mut buf = String::with_capacity(ids.len() * 13);
475 for (i, id) in ids.iter().enumerate() {
476 if i > 0 {
477 buf.push(',');
478 }
479 buf.push_str(id.as_str());
480 }
481 pairs.append_pair("ids", &buf);
482 }
483 if let Some(ts) = r.older {
484 pairs.append_pair("older", &ts.to_string());
485 }
486 if let Some(ts) = r.newer {
487 pairs.append_pair("newer", &ts.to_string());
488 }
489 if let Some(l) = r.limit {
490 pairs.append_pair("sort", l.order.as_str());
491 pairs.append_pair("limit", &l.num.to_string());
492 }
493 pairs.finish();
494 drop(pairs);
495 build_collection_url(base_url, r.collection)
496}
497
498#[cfg(feature = "sync-client")]
499fn build_collection_post_url(mut base_url: Url, r: CollectionPost) -> error::Result<Url> {
500 let mut pairs = base_url.query_pairs_mut();
501 if let Some(batch) = &r.batch {
502 pairs.append_pair("batch", batch);
503 }
504 if r.commit {
505 pairs.append_pair("commit", "true");
506 }
507 pairs.finish();
508 drop(pairs);
509 build_collection_url(base_url, r.collection)
510}
511
512#[cfg(test)]
513mod test {
514 use super::*;
515 #[test]
516 fn test_send() {
517 fn ensure_send<T: Send>() {}
518 ensure_send::<Sync15StorageClient>();
520 }
521
522 #[test]
523 fn test_parse_seconds() {
524 assert_eq!(parse_seconds("1"), Some(1));
525 assert_eq!(parse_seconds("1.4"), Some(2));
526 assert_eq!(parse_seconds("1.5"), Some(2));
527 assert_eq!(parse_seconds("3600.0"), Some(3600));
528 assert_eq!(parse_seconds("3600"), Some(3600));
529 assert_eq!(parse_seconds("-1"), None);
530 assert_eq!(parse_seconds("inf"), None);
531 assert_eq!(parse_seconds("-inf"), None);
532 assert_eq!(parse_seconds("one-thousand"), None);
533 assert_eq!(parse_seconds("4294967295"), Some(4294967295));
534 assert_eq!(parse_seconds("4294967296"), None);
535 }
536
537 #[test]
538 fn test_query_building() {
539 use crate::engine::RequestOrder;
540 let base = Url::parse("https://example.com/sync").unwrap();
541
542 let empty =
543 build_collection_request_url(base.clone(), CollectionRequest::new("foo".into()))
544 .unwrap();
545 assert_eq!(empty.as_str(), "https://example.com/sync/storage/foo");
546
547 let idreq = build_collection_request_url(
548 base.clone(),
549 CollectionRequest::new("wutang".into())
550 .full()
551 .ids(&["rza", "gza"]),
552 )
553 .unwrap();
554 assert_eq!(
555 idreq.as_str(),
556 "https://example.com/sync/storage/wutang?full=1&ids=rza%2Cgza"
557 );
558
559 let complex = build_collection_request_url(
560 base,
561 CollectionRequest::new("specific".into())
562 .full()
563 .limit(10, RequestOrder::Oldest)
564 .older_than(ServerTimestamp(9_876_540))
565 .newer_than(ServerTimestamp(1_234_560)),
566 )
567 .unwrap();
568 assert_eq!(complex.as_str(),
569 "https://example.com/sync/storage/specific?full=1&older=9876.54&newer=1234.56&sort=oldest&limit=10");
570 }
571
572 #[cfg(feature = "sync-client")]
573 #[test]
574 fn test_post_query_building() {
575 let base = Url::parse("https://example.com/sync").unwrap();
576
577 let empty =
578 build_collection_post_url(base.clone(), CollectionPost::new("foo".into())).unwrap();
579 assert_eq!(empty.as_str(), "https://example.com/sync/storage/foo");
580 let batch_start = build_collection_post_url(
581 base.clone(),
582 CollectionPost::new("bar".into())
583 .batch(Some("true".into()))
584 .commit(false),
585 )
586 .unwrap();
587 assert_eq!(
588 batch_start.as_str(),
589 "https://example.com/sync/storage/bar?batch=true"
590 );
591 let batch_commit = build_collection_post_url(
592 base,
593 CollectionPost::new("asdf".into())
594 .batch(Some("1234abc".into()))
595 .commit(true),
596 )
597 .unwrap();
598 assert_eq!(
599 batch_commit.as_str(),
600 "https://example.com/sync/storage/asdf?batch=1234abc&commit=true"
601 );
602 }
603}