sync15/client/
coll_update.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::{
6    request::{NormalResponseHandler, UploadInfo},
7    CollState, Sync15ClientResponse, Sync15StorageClient,
8};
9use crate::bso::{IncomingBso, OutgoingBso, OutgoingEncryptedBso};
10use crate::engine::CollectionRequest;
11use crate::error::{self, Error, Result};
12use crate::{CollectionName, KeyBundle, ServerTimestamp};
13
14fn encrypt_outgoing(o: Vec<OutgoingBso>, key: &KeyBundle) -> Result<Vec<OutgoingEncryptedBso>> {
15    o.into_iter()
16        .map(|change| change.into_encrypted(key))
17        .collect()
18}
19
20pub fn fetch_incoming(
21    client: &Sync15StorageClient,
22    state: &CollState,
23    collection_request: CollectionRequest,
24) -> Result<Vec<IncomingBso>> {
25    let (records, _timestamp) = match client.get_encrypted_records(collection_request)? {
26        Sync15ClientResponse::Success {
27            record,
28            last_modified,
29            ..
30        } => (record, last_modified),
31        other => return Err(other.create_storage_error()),
32    };
33    let mut result = Vec::with_capacity(records.len());
34    for record in records {
35        // if we see a HMAC error, we've made an explicit decision to
36        // NOT handle it here, but restart the global state machine.
37        // That should cause us to re-read crypto/keys and things should
38        // work (although if for some reason crypto/keys was updated but
39        // not all storage was wiped we are probably screwed.)
40        result.push(record.into_decrypted(&state.key)?);
41    }
42    Ok(result)
43}
44
45pub struct CollectionUpdate<'a> {
46    client: &'a Sync15StorageClient,
47    state: &'a CollState,
48    collection: CollectionName,
49    xius: ServerTimestamp,
50    to_update: Vec<OutgoingEncryptedBso>,
51    fully_atomic: bool,
52}
53
54impl<'a> CollectionUpdate<'a> {
55    pub fn new(
56        client: &'a Sync15StorageClient,
57        state: &'a CollState,
58        collection: CollectionName,
59        xius: ServerTimestamp,
60        records: Vec<OutgoingEncryptedBso>,
61        fully_atomic: bool,
62    ) -> CollectionUpdate<'a> {
63        CollectionUpdate {
64            client,
65            state,
66            collection,
67            xius,
68            to_update: records,
69            fully_atomic,
70        }
71    }
72
73    pub fn new_from_changeset(
74        client: &'a Sync15StorageClient,
75        state: &'a CollState,
76        collection: CollectionName,
77        changeset: Vec<OutgoingBso>,
78        fully_atomic: bool,
79    ) -> Result<CollectionUpdate<'a>> {
80        let to_update = encrypt_outgoing(changeset, &state.key)?;
81        Ok(CollectionUpdate::new(
82            client,
83            state,
84            collection,
85            state.last_modified,
86            to_update,
87            fully_atomic,
88        ))
89    }
90
91    /// Returns a list of the IDs that failed if allowed_dropped_records is true, otherwise
92    /// returns an empty vec.
93    pub fn upload(self) -> error::Result<UploadInfo> {
94        let mut failed = vec![];
95        let mut q = self.client.new_post_queue(
96            &self.collection,
97            &self.state.config,
98            self.xius,
99            NormalResponseHandler::new(!self.fully_atomic),
100        )?;
101
102        for record in self.to_update.into_iter() {
103            let enqueued = q.enqueue(&record)?;
104            if !enqueued && self.fully_atomic {
105                return Err(Error::RecordTooLargeError);
106            }
107        }
108
109        q.flush(true)?;
110        let mut info = q.completed_upload_info();
111        info.failed_ids.append(&mut failed);
112        if self.fully_atomic {
113            assert_eq!(
114                info.failed_ids.len(),
115                0,
116                "Bug: Should have failed by now if we aren't allowing dropped records"
117            );
118        }
119        Ok(info)
120    }
121}