sync15/client/
sync.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use super::{CollectionUpdate, GlobalState, LocalCollStateMachine, Sync15StorageClient};
use crate::clients_engine;
use crate::engine::SyncEngine;
use crate::error::Error;
use crate::telemetry;
use crate::KeyBundle;
use interrupt_support::Interruptee;

#[allow(clippy::too_many_arguments)]
pub fn synchronize_with_clients_engine(
    client: &Sync15StorageClient,
    global_state: &GlobalState,
    root_sync_key: &KeyBundle,
    clients: Option<&clients_engine::Engine<'_>>,
    engine: &dyn SyncEngine,
    fully_atomic: bool,
    telem_engine: &mut telemetry::Engine,
    interruptee: &dyn Interruptee,
) -> Result<(), Error> {
    let collection = engine.collection_name();
    log::info!("Syncing collection {}", collection);

    // our global state machine is ready - get the collection machine going.
    let coll_state = match LocalCollStateMachine::get_state(engine, global_state, root_sync_key)? {
        Some(coll_state) => coll_state,
        None => {
            // XXX - this is either "error" or "declined".
            log::warn!(
                "can't setup for the {} collection - hopefully it works later",
                collection
            );
            return Ok(());
        }
    };

    if let Some(clients) = clients {
        engine.prepare_for_sync(&|| clients.get_client_data())?;
    }
    interruptee.err_if_interrupted()?;
    // We assume an "engine" manages exactly one "collection" with the engine's name.
    match engine.get_collection_request(coll_state.last_modified)? {
        None => {
            log::info!("skipping incoming for {} - not needed.", collection);
        }
        Some(collection_request) => {
            // Ideally we would "batch" incoming records (eg, fetch just 1000 at a time)
            // and ask the engine to "stage" them as they come in - but currently we just read
            // them all in one request.

            // Doing this batching will involve specifying a "limit=" param and
            // "x-if-unmodified-since" for each request, looking for an
            // "X-Weave-Next-Offset header in the response and using that in subsequent
            // requests.
            // See https://mozilla-services.readthedocs.io/en/latest/storage/apis-1.5.html#syncstorage-paging
            //
            // But even if we had that, we need to deal with a 412 response on a subsequent batch,
            // so we can't know if we've staged *every* record for that timestamp; the next
            // sync must use an earlier one.
            //
            // For this reason, an engine can't really trust a server timestamp until the
            // very end when we know we've staged them all.
            let incoming = super::fetch_incoming(client, &coll_state, collection_request)?;
            log::info!("Downloaded {} remote changes", incoming.len());
            engine.stage_incoming(incoming, telem_engine)?;
            interruptee.err_if_interrupted()?;
        }
    };

    // Should consider adding a new `fetch_outgoing()` and having `apply()` only apply.
    // It *might* even make sense to only call `apply()` when something was staged,
    // but that's not clear - see the discussion at
    // https://github.com/mozilla/application-services/pull/5441/files/f36274f455a6299f10e7ce56b167882c369aa806#r1189267540
    log::info!("Applying changes");
    let outgoing = engine.apply(coll_state.last_modified, telem_engine)?;
    interruptee.err_if_interrupted()?;

    // XXX - this upload strategy is buggy due to batching. With enough records, we will commit
    // 2 batches on the server. If the second fails, we get an Err<> here, so can't tell the
    // engine about the successful server batch commit.
    // Most stuff below should be called per-batch rather than at the successful end of all
    // batches, but that's not trivial.
    log::info!("Uploading {} outgoing changes", outgoing.len());
    let upload_info = CollectionUpdate::new_from_changeset(
        client,
        &coll_state,
        collection,
        outgoing,
        fully_atomic,
    )?
    .upload()?;
    log::info!(
        "Upload success ({} records success, {} records failed)",
        upload_info.successful_ids.len(),
        upload_info.failed_ids.len()
    );

    let mut telem_outgoing = telemetry::EngineOutgoing::new();
    telem_outgoing.sent(upload_info.successful_ids.len() + upload_info.failed_ids.len());
    telem_outgoing.failed(upload_info.failed_ids.len());
    telem_engine.outgoing(telem_outgoing);

    engine.set_uploaded(upload_info.modified_timestamp, upload_info.successful_ids)?;

    // The above should all be per-batch :(

    engine.sync_finished()?;

    log::info!("Sync finished!");
    Ok(())
}