// places/history_sync/engine.rs
use crate::db::{PlacesDb, SharedPlacesDb};
6use crate::error::*;
7use crate::storage::history::{delete_everything, history_sync::reset};
8use crate::storage::{get_meta, put_meta};
9use interrupt_support::SqlInterruptScope;
10use std::sync::Arc;
11use sync15::bso::{IncomingBso, OutgoingBso};
12use sync15::engine::{
13 CollSyncIds, CollectionRequest, EngineSyncAssociation, RequestOrder, SyncEngine,
14};
15use sync15::{telemetry, Guid, ServerTimestamp};
16
17use super::plan::{apply_plan, finish_plan, get_planned_outgoing};
18use super::MAX_INCOMING_PLACES;
19
/// Metadata key (used with `get_meta`/`put_meta`) under which the server
/// timestamp of the last history sync is stored, in milliseconds.
pub const LAST_SYNC_META_KEY: &str = "history_last_sync_time";
/// Metadata key under which the global sync ID associated with the history
/// engine is stored.
pub const GLOBAL_SYNCID_META_KEY: &str = "history_global_sync_id";
/// Metadata key under which the sync ID of the history collection is stored.
pub const COLLECTION_SYNCID_META_KEY: &str = "history_sync_id";
25
26fn do_apply_incoming(
27 db: &PlacesDb,
28 scope: &SqlInterruptScope,
29 inbound: Vec<IncomingBso>,
30 telem: &mut telemetry::Engine,
31) -> Result<()> {
32 let mut incoming_telemetry = telemetry::EngineIncoming::new();
33 apply_plan(db, inbound, &mut incoming_telemetry, scope)?;
34 telem.incoming(incoming_telemetry);
35 Ok(())
36}
37
38fn do_sync_finished(
39 db: &PlacesDb,
40 new_timestamp: ServerTimestamp,
41 records_synced: Vec<Guid>,
42) -> Result<()> {
43 info!(
44 "sync completed after uploading {} records",
45 records_synced.len()
46 );
47 finish_plan(db)?;
48
49 put_meta(db, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
53
54 db.pragma_update(None, "wal_checkpoint", "PASSIVE")?;
55
56 Ok(())
57}
58
59pub struct HistorySyncEngine {
61 pub db: Arc<SharedPlacesDb>,
62 pub(crate) scope: SqlInterruptScope,
67}
68
69impl HistorySyncEngine {
70 pub fn new(db: Arc<SharedPlacesDb>) -> Result<Self> {
71 Ok(Self {
72 scope: db.begin_interrupt_scope()?,
73 db,
74 })
75 }
76}
77
78impl SyncEngine for HistorySyncEngine {
79 fn collection_name(&self) -> std::borrow::Cow<'static, str> {
80 "history".into()
81 }
82
83 fn stage_incoming(
84 &self,
85 inbound: Vec<IncomingBso>,
86 telem: &mut telemetry::Engine,
87 ) -> anyhow::Result<()> {
88 let conn = self.db.lock();
92 do_apply_incoming(&conn, &self.scope, inbound, telem)?;
93 Ok(())
94 }
95
96 fn apply(
97 &self,
98 timestamp: ServerTimestamp,
99 _telem: &mut telemetry::Engine,
100 ) -> anyhow::Result<Vec<OutgoingBso>> {
101 let conn = self.db.lock();
102 put_meta(&conn, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
105 Ok(get_planned_outgoing(&conn)?)
106 }
107
108 fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
109 Ok(do_sync_finished(&self.db.lock(), new_timestamp, ids)?)
110 }
111
112 fn sync_finished(&self) -> anyhow::Result<()> {
113 Ok(())
114 }
115
116 fn get_collection_request(
117 &self,
118 server_timestamp: ServerTimestamp,
119 ) -> anyhow::Result<Option<CollectionRequest>> {
120 let conn = self.db.lock();
121 let since =
122 ServerTimestamp(get_meta::<i64>(&conn, LAST_SYNC_META_KEY)?.unwrap_or_default());
123 Ok(if since == server_timestamp {
124 None
125 } else {
126 Some(
127 CollectionRequest::new("history".into())
128 .full()
129 .newer_than(since)
130 .limit(MAX_INCOMING_PLACES, RequestOrder::Newest),
131 )
132 })
133 }
134
135 fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
136 let conn = self.db.lock();
137 let global = get_meta(&conn, GLOBAL_SYNCID_META_KEY)?;
138 let coll = get_meta(&conn, COLLECTION_SYNCID_META_KEY)?;
139 Ok(if let (Some(global), Some(coll)) = (global, coll) {
140 EngineSyncAssociation::Connected(CollSyncIds { global, coll })
141 } else {
142 EngineSyncAssociation::Disconnected
143 })
144 }
145
146 fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
147 reset(&self.db.lock(), assoc)?;
148 Ok(())
149 }
150
151 fn wipe(&self) -> anyhow::Result<()> {
152 delete_everything(&self.db.lock())?;
153 Ok(())
154 }
155}