sync15/engine/bridged_engine.rs
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use super::{CollSyncIds, EngineSyncAssociation, SyncEngine};
use crate::bso::{IncomingBso, OutgoingBso};
use crate::Guid;
use crate::{telemetry, ServerTimestamp};
use anyhow::Result;
/// A BridgedEngine acts as a bridge between application-services' Rust-implemented
/// sync engines and sync engines as defined by Desktop Firefox.
///
/// [Desktop Firefox has an abstract implementation of a Sync
/// Engine](https://searchfox.org/mozilla-central/source/services/sync/modules/engines.js)
/// with a number of functions each engine is expected to override. Engines
/// implemented in Rust use a different shape (specifically, the
/// [SyncEngine](crate::SyncEngine) trait), so this BridgedEngine trait adapts
/// between the two.
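///
/// A rough sketch of the call order desktop is expected to drive, pieced together
/// from the per-method docs below (illustrative only; `engine`, `incoming_batches`,
/// `uploaded_ids` and `server_modified_millis` are placeholders):
///
/// ```ignore
/// // Before the sync proper, desktop consults `last_sync()` and
/// // `ensure_current_sync_id()` to decide what to fetch and whether to reset.
/// engine.sync_started()?;
/// for batch in incoming_batches {
///     engine.store_incoming(batch)?; // once per incoming batch
/// }
/// let outgoing = engine.apply()?; // reconcile; returns records to upload
/// // ...upload `outgoing.records`, then for each successfully uploaded batch:
/// engine.set_uploaded(server_modified_millis, &uploaded_ids)?;
/// engine.set_last_sync(server_modified_millis)?;
/// engine.sync_finished()?;
/// ```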
pub trait BridgedEngine: Send + Sync {
/// Returns the last sync time, in milliseconds, for this engine's
/// collection. This is called before each sync, to determine the lower
/// bound for new records to fetch from the server.
fn last_sync(&self) -> Result<i64>;
/// Sets the last sync time, in milliseconds. This is called throughout
/// the sync, to fast-forward the stored last sync time to match the
/// timestamp on the uploaded records.
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
/// Returns the sync ID for this engine's collection. This is only used in
/// tests.
fn sync_id(&self) -> Result<Option<String>>;
/// Resets the sync ID for this engine's collection, returning the new ID.
/// As a side effect, implementations should reset all local Sync state,
/// as in `reset`.
    /// (Note that bridged engines never maintain the "global" guid - that's all managed
    /// by the consumer of the bridged engine (i.e. desktop); bridged engines only care
    /// about the per-collection one.)
fn reset_sync_id(&self) -> Result<String>;
/// Ensures that the locally stored sync ID for this engine's collection
/// matches the `new_sync_id` from the server. If the two don't match,
/// implementations should reset all local Sync state, as in `reset`.
/// This method returns the assigned sync ID, which can be either the
/// `new_sync_id`, or a different one if the engine wants to force other
/// devices to reset their Sync state for this collection the next time they
/// sync.
fn ensure_current_sync_id(&self, new_sync_id: &str) -> Result<String>;
    /// Tells the tabs engine about recent FxA devices. A bit of a leaky abstraction
    /// as it only makes sense for tabs.
    /// The arg is a JSON-serialized `ClientData` struct.
fn prepare_for_sync(&self, _client_data: &str) -> Result<()> {
Ok(())
}
/// Indicates that the engine is about to start syncing. This is called
/// once per sync, and always before `store_incoming`.
fn sync_started(&self) -> Result<()>;
    /// Stages a batch of incoming Sync records. This is called multiple
    /// times per sync, once for each batch. Implementations can use this as
    /// an opportunity to check whether the sync was aborted, and cancel any
    /// pending work.
fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()>;
/// Applies all staged records, reconciling changes on both sides and
/// resolving conflicts. Returns a list of records to upload.
fn apply(&self) -> Result<ApplyResults>;
/// Indicates that the given record IDs were uploaded successfully to the
/// server. This is called multiple times per sync, once for each batch
/// upload.
fn set_uploaded(&self, server_modified_millis: i64, ids: &[Guid]) -> Result<()>;
/// Indicates that all records have been uploaded. At this point, any record
    /// IDs marked for upload that haven't been passed to `set_uploaded` can be
/// assumed to have failed: for example, because the server rejected a record
/// with an invalid TTL or sort index.
fn sync_finished(&self) -> Result<()>;
/// Resets all local Sync state, including any change flags, mirrors, and
/// the last sync time, such that the next sync is treated as a first sync
/// with all new local data. Does not erase any local user data.
fn reset(&self) -> Result<()>;
/// Erases all local user data for this collection, and any Sync metadata.
/// This method is destructive, and unused for most collections.
fn wipe(&self) -> Result<()>;
}
// This is an adaptor trait - the idea is that engines can implement this
// trait along with SyncEngine and get a BridgedEngine for free. It's temporary
// so we can land this trait without needing to update desktop.
// Longer term, we should remove both this trait and BridgedEngine entirely, sucking up
// the breaking change for desktop. The main blocker to this is moving desktop away
// from the explicit timestamp handling and moving closer to the `get_collection_request`
// model.
pub trait BridgedEngineAdaptor: Send + Sync {
    // These are the main mismatches between the two engine traits.
fn last_sync(&self) -> Result<i64>;
fn set_last_sync(&self, last_sync_millis: i64) -> Result<()>;
fn sync_started(&self) -> Result<()> {
Ok(())
}
fn engine(&self) -> &dyn SyncEngine;
}
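// A rough sketch of how an engine is expected to hook this up, assuming a
// hypothetical `MyEngine` type that already implements `SyncEngine` and persists
// its own last-sync timestamp (none of these names exist in this crate):
//
//     struct MyAdaptor {
//         engine: MyEngine,
//     }
//
//     impl BridgedEngineAdaptor for MyAdaptor {
//         fn last_sync(&self) -> Result<i64> {
//             self.engine.get_last_sync_millis()
//         }
//         fn set_last_sync(&self, millis: i64) -> Result<()> {
//             self.engine.put_last_sync_millis(millis)
//         }
//         fn engine(&self) -> &dyn SyncEngine {
//             &self.engine
//         }
//     }
//
// With that in place, the blanket impl below supplies the full `BridgedEngine`
// surface by delegating to the wrapped `SyncEngine`.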
impl<A: BridgedEngineAdaptor> BridgedEngine for A {
    fn last_sync(&self) -> Result<i64> {
        // Both traits declare `last_sync`, so delegate explicitly to the adaptor's.
        BridgedEngineAdaptor::last_sync(self)
    }
    fn set_last_sync(&self, last_sync_millis: i64) -> Result<()> {
        BridgedEngineAdaptor::set_last_sync(self, last_sync_millis)
    }
fn sync_id(&self) -> Result<Option<String>> {
Ok(match self.engine().get_sync_assoc()? {
EngineSyncAssociation::Disconnected => None,
EngineSyncAssociation::Connected(c) => Some(c.coll.into()),
})
}
fn reset_sync_id(&self) -> Result<String> {
        // Note that bridged engines never maintain the "global" guid - that's all
        // managed by desktop; bridged engines only care about the per-collection one.
let global = Guid::empty();
let coll = Guid::random();
self.engine()
.reset(&EngineSyncAssociation::Connected(CollSyncIds {
global,
coll: coll.clone(),
}))?;
Ok(coll.to_string())
}
fn ensure_current_sync_id(&self, sync_id: &str) -> Result<String> {
let engine = self.engine();
let assoc = engine.get_sync_assoc()?;
if matches!(assoc, EngineSyncAssociation::Connected(c) if c.coll == sync_id) {
log::debug!("ensure_current_sync_id is current");
} else {
let new_coll_ids = CollSyncIds {
global: Guid::empty(),
coll: sync_id.into(),
};
engine.reset(&EngineSyncAssociation::Connected(new_coll_ids))?;
}
Ok(sync_id.to_string())
}
fn prepare_for_sync(&self, client_data: &str) -> Result<()> {
        // The unwrap here is unfortunate, but can hopefully go away if we can
        // start using the `ClientData` type instead of the string.
self.engine()
.prepare_for_sync(&|| serde_json::from_str::<crate::ClientData>(client_data).unwrap())
}
    fn sync_started(&self) -> Result<()> {
        BridgedEngineAdaptor::sync_started(self)
    }
fn store_incoming(&self, incoming_records: Vec<IncomingBso>) -> Result<()> {
let engine = self.engine();
let mut telem = telemetry::Engine::new(engine.collection_name());
engine.stage_incoming(incoming_records, &mut telem)
}
fn apply(&self) -> Result<ApplyResults> {
let engine = self.engine();
let mut telem = telemetry::Engine::new(engine.collection_name());
        // Desktop tells a bridged engine to apply the records without telling it
        // the server timestamp, and once applied, explicitly calls `set_last_sync()`
        // with that timestamp. So this adaptor needs to call `apply()` with a
        // placeholder timestamp and hope that the later call with the correct
        // timestamp does come. This isn't ideal, as it means the timestamp is updated
        // in a different transaction, but nothing too bad should happen if that call
        // never comes - we'll just end up applying the same records again next sync.
let records = engine.apply(ServerTimestamp::from_millis(0), &mut telem)?;
Ok(ApplyResults {
records,
num_reconciled: telem
.get_incoming()
.as_ref()
.map(|i| i.get_reconciled() as usize),
})
}
fn set_uploaded(&self, millis: i64, ids: &[Guid]) -> Result<()> {
self.engine()
.set_uploaded(ServerTimestamp::from_millis(millis), ids.to_vec())
}
fn sync_finished(&self) -> Result<()> {
self.engine().sync_finished()
}
fn reset(&self) -> Result<()> {
self.engine().reset(&EngineSyncAssociation::Disconnected)
}
fn wipe(&self) -> Result<()> {
self.engine().wipe()
}
}
// TODO: We should see if we can remove this to reduce the number of types engines need to deal
// with. num_reconciled is only used for telemetry on desktop.
#[derive(Debug, Default)]
pub struct ApplyResults {
    /// The list of outgoing records to upload.
pub records: Vec<OutgoingBso>,
/// The number of incoming records whose contents were merged because they
/// changed on both sides. None indicates we aren't reporting this
/// information.
pub num_reconciled: Option<usize>,
}
impl ApplyResults {
pub fn new(records: Vec<OutgoingBso>, num_reconciled: impl Into<Option<usize>>) -> Self {
Self {
records,
num_reconciled: num_reconciled.into(),
}
}
}
// Shorthand for engines that don't care.
impl From<Vec<OutgoingBso>> for ApplyResults {
fn from(records: Vec<OutgoingBso>) -> Self {
Self {
records,
num_reconciled: None,
}
}
}
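// Illustrative only - two ways a bridged engine might build an `ApplyResults`
// (`outgoing_a` and `outgoing_b` are placeholder `Vec<OutgoingBso>` values):
//
//     let with_count = ApplyResults::new(outgoing_a, 3);
//     let without_count: ApplyResults = outgoing_b.into(); // num_reconciled is None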