sync15/client/sync.rs
1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use super::{CollectionUpdate, GlobalState, LocalCollStateMachine, Sync15StorageClient};
6use crate::clients_engine;
7use crate::engine::SyncEngine;
8use crate::error::{info, warn, Error};
9use crate::telemetry;
10use crate::KeyBundle;
11use interrupt_support::Interruptee;
12
13#[allow(clippy::too_many_arguments)]
14pub fn synchronize_with_clients_engine(
15 client: &Sync15StorageClient,
16 global_state: &GlobalState,
17 root_sync_key: &KeyBundle,
18 clients: Option<&clients_engine::Engine<'_>>,
19 engine: &dyn SyncEngine,
20 fully_atomic: bool,
21 telem_engine: &mut telemetry::Engine,
22 interruptee: &dyn Interruptee,
23) -> Result<(), Error> {
24 let collection = engine.collection_name();
25 info!("Syncing collection {}", collection);
26
27 // our global state machine is ready - get the collection machine going.
28 let coll_state = match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? {
29 Some(coll_state) => coll_state,
30 None => {
31 // XXX - this is either "error" or "declined".
32 warn!(
33 "can't setup for the {} collection - hopefully it works later",
34 collection
35 );
36 return Ok(());
37 }
38 };
39
40 if let Some(clients) = clients {
41 engine.prepare_for_sync(&|| clients.get_client_data())?;
42 }
43 interruptee.err_if_interrupted()?;
44 // We assume an "engine" manages exactly one "collection" with the engine's name.
45 match engine.get_collection_request(coll_state.last_modified)? {
46 None => {
47 info!("skipping incoming for {} - not needed.", collection);
48 }
49 Some(collection_request) => {
50 // Ideally we would "batch" incoming records (eg, fetch just 1000 at a time)
51 // and ask the engine to "stage" them as they come in - but currently we just read
52 // them all in one request.
53
54 // Doing this batching will involve specifying a "limit=" param and
55 // "x-if-unmodified-since" for each request, looking for an
56 // "X-Weave-Next-Offset header in the response and using that in subsequent
57 // requests.
58 // See https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html#syncstorage-paging
59 //
60 // But even if we had that, we need to deal with a 412 response on a subsequent batch,
61 // so we can't know if we've staged *every* record for that timestamp; the next
62 // sync must use an earlier one.
63 //
64 // For this reason, an engine can't really trust a server timestamp until the
65 // very end when we know we've staged them all.
66 let incoming = super::fetch_incoming(client, &coll_state, collection_request)?;
67 info!("Downloaded {} remote changes", incoming.len());
68 engine.stage_incoming(incoming, telem_engine)?;
69 interruptee.err_if_interrupted()?;
70 }
71 };
72
73 // Should consider adding a new `fetch_outgoing()` and having `apply()` only apply.
74 // It *might* even make sense to only call `apply()` when something was staged,
75 // but that's not clear - see the discussion at
76 // https://github.com/mozilla/application-services/pull/5441/files/f36274f455a6299f10e7ce56b167882c369aa806#r1189267540
77 info!("Applying changes");
78 let outgoing = engine.apply(coll_state.last_modified, telem_engine)?;
79 interruptee.err_if_interrupted()?;
80
81 // XXX - this upload strategy is buggy due to batching. With enough records, we will commit
82 // 2 batches on the server. If the second fails, we get an Err<> here, so can't tell the
83 // engine about the successful server batch commit.
84 // Most stuff below should be called per-batch rather than at the successful end of all
85 // batches, but that's not trivial.
86 info!("Uploading {} outgoing changes", outgoing.len());
87 let upload_info = CollectionUpdate::new_from_changeset(
88 client,
89 &coll_state,
90 collection,
91 outgoing,
92 fully_atomic,
93 )?
94 .upload()?;
95 info!(
96 "Upload success ({} records success, {} records failed)",
97 upload_info.successful_ids.len(),
98 upload_info.failed_ids.len()
99 );
100
101 let mut telem_outgoing = telemetry::EngineOutgoing::new();
102 telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
103 telem_outgoing.failed(upload_info.failed_ids.len());
104 telem_engine.outgoing(telem_outgoing);
105
106 engine.set_uploaded(upload_info.modified_timestamp, upload_info.successful_ids)?;
107
108 // The above should all be per-batch :(
109
110 engine.sync_finished()?;
111
112 info!("Sync finished!");
113 Ok(())
114}