// places/history_sync/engine.rs
use crate::db::{PlacesDb, SharedPlacesDb};
use crate::error::*;
use crate::storage::history::{delete_everything, history_sync::reset};
use crate::storage::{get_meta, put_meta};
use interrupt_support::SqlInterruptScope;
use std::sync::Arc;
use sync15::bso::{IncomingBso, OutgoingBso};
use sync15::engine::{
CollSyncIds, CollectionRequest, EngineSyncAssociation, RequestOrder, SyncEngine,
};
use sync15::{telemetry, Guid, ServerTimestamp};
use super::plan::{apply_plan, finish_plan, get_planned_outgoing};
use super::MAX_INCOMING_PLACES;
/// Meta-table key under which this engine stores the server timestamp (in
/// milliseconds) of the last sync; written via `put_meta` and read back via
/// `get_meta` when building the next collection request.
pub const LAST_SYNC_META_KEY: &str = "history_last_sync_time";
/// Meta-table key for the global sync ID; read by `get_sync_assoc` to decide
/// whether the engine is connected.
// NOTE(review): the writer of this key is not in this file — presumably the
// storage-level `reset`; confirm.
pub const GLOBAL_SYNCID_META_KEY: &str = "history_global_sync_id";
/// Meta-table key for the history collection's sync ID; read by
/// `get_sync_assoc` together with [`GLOBAL_SYNCID_META_KEY`].
pub const COLLECTION_SYNCID_META_KEY: &str = "history_sync_id";
/// Applies a batch of incoming records to `db` and reports the outcome.
///
/// A fresh `telemetry::EngineIncoming` is handed to `apply_plan` so it can
/// record what happened to the batch; once the plan has been applied
/// successfully, those numbers are attached to `telem`.
///
/// # Errors
///
/// Propagates any error from `apply_plan`. In that case nothing is recorded
/// on `telem`.
fn do_apply_incoming(
    db: &PlacesDb,
    scope: &SqlInterruptScope,
    inbound: Vec<IncomingBso>,
    telem: &mut telemetry::Engine,
) -> Result<()> {
    let mut stats = telemetry::EngineIncoming::new();
    apply_plan(db, inbound, &mut stats, scope)?;
    // Only reached on success: hand the collected numbers to the engine record.
    telem.incoming(stats);
    Ok(())
}
/// Finalizes a sync after the outgoing records have been uploaded.
///
/// Logs how many records were uploaded, runs `finish_plan`, stores
/// `new_timestamp` (as milliseconds) under [`LAST_SYNC_META_KEY`], and then
/// issues a `wal_checkpoint` pragma with the value `PASSIVE`.
///
/// # Errors
///
/// Returns the first error raised by `finish_plan`, `put_meta`, or the pragma
/// update; later steps are skipped once one fails.
fn do_sync_finished(
    db: &PlacesDb,
    new_timestamp: ServerTimestamp,
    records_synced: Vec<Guid>,
) -> Result<()> {
    let uploaded = records_synced.len();
    log::info!("sync completed after uploading {} records", uploaded);

    finish_plan(db)?;

    let last_sync_millis = new_timestamp.as_millis();
    put_meta(db, LAST_SYNC_META_KEY, &last_sync_millis)?;

    db.pragma_update(None, "wal_checkpoint", "PASSIVE")?;
    Ok(())
}
/// Sync engine for the "history" collection, backed by a shared places
/// database handle.
pub struct HistorySyncEngine {
/// Shared database handle; each engine operation calls `lock()` on it to
/// obtain the connection it works on.
pub db: Arc<SharedPlacesDb>,
/// Interrupt scope obtained from `db.begin_interrupt_scope()` when the engine
/// is constructed; passed along when applying incoming records.
pub(crate) scope: SqlInterruptScope,
}
impl HistorySyncEngine {
    /// Builds an engine around the shared database handle `db`.
    ///
    /// # Errors
    ///
    /// Fails if an interrupt scope cannot be started on `db`.
    pub fn new(db: Arc<SharedPlacesDb>) -> Result<Self> {
        // Acquire the scope first, while `db` is still only borrowed; the
        // handle is moved into the struct afterwards.
        let scope = db.begin_interrupt_scope()?;
        Ok(Self { db, scope })
    }
}
impl SyncEngine for HistorySyncEngine {
fn collection_name(&self) -> std::borrow::Cow<'static, str> {
"history".into()
}
fn stage_incoming(
&self,
inbound: Vec<IncomingBso>,
telem: &mut telemetry::Engine,
) -> anyhow::Result<()> {
let conn = self.db.lock();
do_apply_incoming(&conn, &self.scope, inbound, telem)?;
Ok(())
}
fn apply(
&self,
timestamp: ServerTimestamp,
_telem: &mut telemetry::Engine,
) -> anyhow::Result<Vec<OutgoingBso>> {
let conn = self.db.lock();
put_meta(&conn, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
Ok(get_planned_outgoing(&conn)?)
}
fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
Ok(do_sync_finished(&self.db.lock(), new_timestamp, ids)?)
}
fn sync_finished(&self) -> anyhow::Result<()> {
Ok(())
}
fn get_collection_request(
&self,
server_timestamp: ServerTimestamp,
) -> anyhow::Result<Option<CollectionRequest>> {
let conn = self.db.lock();
let since =
ServerTimestamp(get_meta::<i64>(&conn, LAST_SYNC_META_KEY)?.unwrap_or_default());
Ok(if since == server_timestamp {
None
} else {
Some(
CollectionRequest::new("history".into())
.full()
.newer_than(since)
.limit(MAX_INCOMING_PLACES, RequestOrder::Newest),
)
})
}
fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
let conn = self.db.lock();
let global = get_meta(&conn, GLOBAL_SYNCID_META_KEY)?;
let coll = get_meta(&conn, COLLECTION_SYNCID_META_KEY)?;
Ok(if let (Some(global), Some(coll)) = (global, coll) {
EngineSyncAssociation::Connected(CollSyncIds { global, coll })
} else {
EngineSyncAssociation::Disconnected
})
}
fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
reset(&self.db.lock(), assoc)?;
Ok(())
}
fn wipe(&self) -> anyhow::Result<()> {
delete_everything(&self.db.lock())?;
Ok(())
}
}