sync15/client/
sync.rs

/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::{CollectionUpdate, GlobalState, LocalCollStateMachine, Sync15StorageClient};
6use crate::clients_engine;
7use crate::engine::SyncEngine;
8use crate::error::{info, warn, Error};
9use crate::telemetry;
10use crate::KeyBundle;
11use interrupt_support::Interruptee;
12
13#[allow(clippy::too_many_arguments)]
14pub fn synchronize_with_clients_engine(
15    client: &Sync15StorageClient,
16    global_state: &GlobalState,
17    root_sync_key: &KeyBundle,
18    clients: Option<&clients_engine::Engine<'_>>,
19    engine: &dyn SyncEngine,
20    fully_atomic: bool,
21    telem_engine: &mut telemetry::Engine,
22    interruptee: &dyn Interruptee,
23) -> Result<(), Error> {
24    let collection = engine.collection_name();
25    info!("Syncing collection {}", collection);
26
27    // our global state machine is ready - get the collection machine going.
28    let coll_state = match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? {
29        Some(coll_state) => coll_state,
30        None => {
31            // XXX - this is either "error" or "declined".
32            warn!(
33                "can't setup for the {} collection - hopefully it works later",
34                collection
35            );
36            return Ok(());
37        }
38    };
39
40    if let Some(clients) = clients {
41        engine.prepare_for_sync(&|| clients.get_client_data())?;
42    }
43    interruptee.err_if_interrupted()?;
44    // We assume an "engine" manages exactly one "collection" with the engine's name.
45    match engine.get_collection_request(coll_state.last_modified)? {
46        None => {
47            info!("skipping incoming for {} - not needed.", collection);
48        }
49        Some(collection_request) => {
50            // Ideally we would "batch" incoming records (eg, fetch just 1000 at a time)
51            // and ask the engine to "stage" them as they come in - but currently we just read
52            // them all in one request.
53
54            // Doing this batching will involve specifying a "limit=" param and
55            // "x-if-unmodified-since" for each request, looking for an
56            // "X-Weave-Next-Offset header in the response and using that in subsequent
57            // requests.
58            // See https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html#syncstorage-paging
59            //
60            // But even if we had that, we need to deal with a 412 response on a subsequent batch,
61            // so we can't know if we've staged *every* record for that timestamp; the next
62            // sync must use an earlier one.
63            //
64            // For this reason, an engine can't really trust a server timestamp until the
65            // very end when we know we've staged them all.
66            let incoming = super::fetch_incoming(client, &coll_state, collection_request)?;
67            info!("Downloaded {} remote changes", incoming.len());
68            engine.stage_incoming(incoming, telem_engine)?;
69            interruptee.err_if_interrupted()?;
70        }
71    };
72
73    // Should consider adding a new `fetch_outgoing()` and having `apply()` only apply.
74    // It *might* even make sense to only call `apply()` when something was staged,
75    // but that's not clear - see the discussion at
76    // https://github.com/mozilla/application-services/pull/5441/files/f36274f455a6299f10e7ce56b167882c369aa806#r1189267540
77    info!("Applying changes");
78    let outgoing = engine.apply(coll_state.last_modified, telem_engine)?;
79    interruptee.err_if_interrupted()?;
80
81    // XXX - this upload strategy is buggy due to batching. With enough records, we will commit
82    // 2 batches on the server. If the second fails, we get an Err<> here, so can't tell the
83    // engine about the successful server batch commit.
84    // Most stuff below should be called per-batch rather than at the successful end of all
85    // batches, but that's not trivial.
86    info!("Uploading {} outgoing changes", outgoing.len());
87    let upload_info = CollectionUpdate::new_from_changeset(
88        client,
89        &coll_state,
90        collection,
91        outgoing,
92        fully_atomic,
93    )?
94    .upload()?;
95    info!(
96        "Upload success ({} records success, {} records failed)",
97        upload_info.successful_ids.len(),
98        upload_info.failed_ids.len()
99    );
100
101    let mut telem_outgoing = telemetry::EngineOutgoing::new();
102    telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
103    telem_outgoing.failed(upload_info.failed_ids.len());
104    telem_engine.outgoing(telem_outgoing);
105
106    engine.set_uploaded(upload_info.modified_timestamp, upload_info.successful_ids)?;
107
108    // The above should all be per-batch :(
109
110    engine.sync_finished()?;
111
112    info!("Sync finished!");
113    Ok(())
114}