// sync15/client/coll_update.rs
1use super::{
6 request::{NormalResponseHandler, UploadInfo},
7 CollState, Sync15ClientResponse, Sync15StorageClient,
8};
9use crate::bso::{IncomingBso, OutgoingBso, OutgoingEncryptedBso};
10use crate::engine::CollectionRequest;
11use crate::error::{self, Error, Result};
12use crate::{CollectionName, KeyBundle, ServerTimestamp};
13
14fn encrypt_outgoing(o: Vec<OutgoingBso>, key: &KeyBundle) -> Result<Vec<OutgoingEncryptedBso>> {
15 o.into_iter()
16 .map(|change| change.into_encrypted(key))
17 .collect()
18}
19
20pub fn fetch_incoming(
21 client: &Sync15StorageClient,
22 state: &CollState,
23 collection_request: CollectionRequest,
24) -> Result<Vec<IncomingBso>> {
25 let (records, _timestamp) = match client.get_encrypted_records(collection_request)? {
26 Sync15ClientResponse::Success {
27 record,
28 last_modified,
29 ..
30 } => (record, last_modified),
31 other => return Err(other.create_storage_error()),
32 };
33 let mut result = Vec::with_capacity(records.len());
34 for record in records {
35 result.push(record.into_decrypted(&state.key)?);
41 }
42 Ok(result)
43}
44
45pub struct CollectionUpdate<'a> {
46 client: &'a Sync15StorageClient,
47 state: &'a CollState,
48 collection: CollectionName,
49 xius: ServerTimestamp,
50 to_update: Vec<OutgoingEncryptedBso>,
51 fully_atomic: bool,
52}
53
54impl<'a> CollectionUpdate<'a> {
55 pub fn new(
56 client: &'a Sync15StorageClient,
57 state: &'a CollState,
58 collection: CollectionName,
59 xius: ServerTimestamp,
60 records: Vec<OutgoingEncryptedBso>,
61 fully_atomic: bool,
62 ) -> CollectionUpdate<'a> {
63 CollectionUpdate {
64 client,
65 state,
66 collection,
67 xius,
68 to_update: records,
69 fully_atomic,
70 }
71 }
72
73 pub fn new_from_changeset(
74 client: &'a Sync15StorageClient,
75 state: &'a CollState,
76 collection: CollectionName,
77 changeset: Vec<OutgoingBso>,
78 fully_atomic: bool,
79 ) -> Result<CollectionUpdate<'a>> {
80 let to_update = encrypt_outgoing(changeset, &state.key)?;
81 Ok(CollectionUpdate::new(
82 client,
83 state,
84 collection,
85 state.last_modified,
86 to_update,
87 fully_atomic,
88 ))
89 }
90
91 pub fn upload(self) -> error::Result<UploadInfo> {
94 let mut failed = vec![];
95 let mut q = self.client.new_post_queue(
96 &self.collection,
97 &self.state.config,
98 self.xius,
99 NormalResponseHandler::new(!self.fully_atomic),
100 )?;
101
102 for record in self.to_update.into_iter() {
103 let enqueued = q.enqueue(&record)?;
104 if !enqueued && self.fully_atomic {
105 return Err(Error::RecordTooLargeError);
106 }
107 }
108
109 q.flush(true)?;
110 let mut info = q.completed_upload_info();
111 info.failed_ids.append(&mut failed);
112 if self.fully_atomic {
113 assert_eq!(
114 info.failed_ids.len(),
115 0,
116 "Bug: Should have failed by now if we aren't allowing dropped records"
117 );
118 }
119 Ok(info)
120 }
121}