1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
45use crate::db::{PlacesDb, SharedPlacesDb};
6use crate::error::*;
7use crate::storage::history::{delete_everything, history_sync::reset};
8use crate::storage::{get_meta, put_meta};
9use interrupt_support::SqlInterruptScope;
10use std::sync::Arc;
11use sync15::bso::{IncomingBso, OutgoingBso};
12use sync15::engine::{
13 CollSyncIds, CollectionRequest, EngineSyncAssociation, RequestOrder, SyncEngine,
14};
15use sync15::{telemetry, Guid, ServerTimestamp};
1617use super::plan::{apply_plan, finish_plan, get_planned_outgoing};
18use super::MAX_INCOMING_PLACES;
1920pub const LAST_SYNC_META_KEY: &str = "history_last_sync_time";
21// Note that all engines in this crate should use a *different* meta key
22// for the global sync ID, because engines are reset individually.
23pub const GLOBAL_SYNCID_META_KEY: &str = "history_global_sync_id";
24pub const COLLECTION_SYNCID_META_KEY: &str = "history_sync_id";
2526fn do_apply_incoming(
27 db: &PlacesDb,
28 scope: &SqlInterruptScope,
29 inbound: Vec<IncomingBso>,
30 telem: &mut telemetry::Engine,
31) -> Result<()> {
32let mut incoming_telemetry = telemetry::EngineIncoming::new();
33 apply_plan(db, inbound, &mut incoming_telemetry, scope)?;
34 telem.incoming(incoming_telemetry);
35Ok(())
36}
3738fn do_sync_finished(
39 db: &PlacesDb,
40 new_timestamp: ServerTimestamp,
41 records_synced: Vec<Guid>,
42) -> Result<()> {
43info!(
44"sync completed after uploading {} records",
45 records_synced.len()
46 );
47 finish_plan(db)?;
4849// write timestamp to reflect what we just wrote.
50 // XXX - should clean up transactions, but we *are not* in a transaction
51 // here, so this value applies immediately.
52put_meta(db, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
5354 db.pragma_update(None, "wal_checkpoint", "PASSIVE")?;
5556Ok(())
57}
5859// Short-lived struct that's constructed each sync
60pub struct HistorySyncEngine {
61pub db: Arc<SharedPlacesDb>,
62// We should stage these in a temp table! For now though we just hold them
63 // in memory.
64 // Public because we use it in the [PlacesApi] sync methods. We can probably make this private
65 // once all syncing goes through the sync manager.
66pub(crate) scope: SqlInterruptScope,
67}
6869impl HistorySyncEngine {
70pub fn new(db: Arc<SharedPlacesDb>) -> Result<Self> {
71Ok(Self {
72 scope: db.begin_interrupt_scope()?,
73 db,
74 })
75 }
76}
7778impl SyncEngine for HistorySyncEngine {
79fn collection_name(&self) -> std::borrow::Cow<'static, str> {
80"history".into()
81 }
8283fn stage_incoming(
84&self,
85 inbound: Vec<IncomingBso>,
86 telem: &mut telemetry::Engine,
87 ) -> anyhow::Result<()> {
88// This is minor abuse of the engine concept, but for each "stage_incoming" call we
89 // just apply it directly. We can't advance our timestamp, which means if we are
90 // interrupted we'll re-download and re-apply them, but that will be fine in practice.
91let conn = self.db.lock();
92 do_apply_incoming(&conn, &self.scope, inbound, telem)?;
93Ok(())
94 }
9596fn apply(
97&self,
98 timestamp: ServerTimestamp,
99 _telem: &mut telemetry::Engine,
100 ) -> anyhow::Result<Vec<OutgoingBso>> {
101let conn = self.db.lock();
102// We know we've seen everything incoming, so it's safe to write the timestamp now.
103 // If we are interrupted creating outgoing BSOs we won't re-apply what we just did.
104put_meta(&conn, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
105Ok(get_planned_outgoing(&conn)?)
106 }
107108fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
109Ok(do_sync_finished(&self.db.lock(), new_timestamp, ids)?)
110 }
111112fn sync_finished(&self) -> anyhow::Result<()> {
113Ok(())
114 }
115116fn get_collection_request(
117&self,
118 server_timestamp: ServerTimestamp,
119 ) -> anyhow::Result<Option<CollectionRequest>> {
120let conn = self.db.lock();
121let since =
122 ServerTimestamp(get_meta::<i64>(&conn, LAST_SYNC_META_KEY)?.unwrap_or_default());
123Ok(if since == server_timestamp {
124None
125} else {
126Some(
127 CollectionRequest::new("history".into())
128 .full()
129 .newer_than(since)
130 .limit(MAX_INCOMING_PLACES, RequestOrder::Newest),
131 )
132 })
133 }
134135fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
136let conn = self.db.lock();
137let global = get_meta(&conn, GLOBAL_SYNCID_META_KEY)?;
138let coll = get_meta(&conn, COLLECTION_SYNCID_META_KEY)?;
139Ok(if let (Some(global), Some(coll)) = (global, coll) {
140 EngineSyncAssociation::Connected(CollSyncIds { global, coll })
141 } else {
142 EngineSyncAssociation::Disconnected
143 })
144 }
145146fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
147 reset(&self.db.lock(), assoc)?;
148Ok(())
149 }
150151fn wipe(&self) -> anyhow::Result<()> {
152 delete_everything(&self.db.lock())?;
153Ok(())
154 }
155}