places/history_sync/
engine.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5use crate::db::{PlacesDb, SharedPlacesDb};
6use crate::error::*;
7use crate::storage::history::{delete_everything, history_sync::reset};
8use crate::storage::{get_meta, put_meta};
9use interrupt_support::SqlInterruptScope;
10use std::sync::Arc;
11use sync15::bso::{IncomingBso, OutgoingBso};
12use sync15::engine::{
13    CollSyncIds, CollectionRequest, EngineSyncAssociation, RequestOrder, SyncEngine,
14};
15use sync15::{telemetry, Guid, ServerTimestamp};
16
17use super::plan::{apply_plan, finish_plan, get_planned_outgoing};
18use super::MAX_INCOMING_PLACES;
19
20pub const LAST_SYNC_META_KEY: &str = "history_last_sync_time";
21// Note that all engines in this crate should use a *different* meta key
22// for the global sync ID, because engines are reset individually.
23pub const GLOBAL_SYNCID_META_KEY: &str = "history_global_sync_id";
24pub const COLLECTION_SYNCID_META_KEY: &str = "history_sync_id";
25
26fn do_apply_incoming(
27    db: &PlacesDb,
28    scope: &SqlInterruptScope,
29    inbound: Vec<IncomingBso>,
30    telem: &mut telemetry::Engine,
31) -> Result<()> {
32    let mut incoming_telemetry = telemetry::EngineIncoming::new();
33    apply_plan(db, inbound, &mut incoming_telemetry, scope)?;
34    telem.incoming(incoming_telemetry);
35    Ok(())
36}
37
38fn do_sync_finished(
39    db: &PlacesDb,
40    new_timestamp: ServerTimestamp,
41    records_synced: Vec<Guid>,
42) -> Result<()> {
43    info!(
44        "sync completed after uploading {} records",
45        records_synced.len()
46    );
47    finish_plan(db)?;
48
49    // write timestamp to reflect what we just wrote.
50    // XXX - should clean up transactions, but we *are not* in a transaction
51    // here, so this value applies immediately.
52    put_meta(db, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
53
54    db.pragma_update(None, "wal_checkpoint", "PASSIVE")?;
55
56    Ok(())
57}
58
59// Short-lived struct that's constructed each sync
60pub struct HistorySyncEngine {
61    pub db: Arc<SharedPlacesDb>,
62    // We should stage these in a temp table! For now though we just hold them
63    // in memory.
64    // Public because we use it in the [PlacesApi] sync methods.  We can probably make this private
65    // once all syncing goes through the sync manager.
66    pub(crate) scope: SqlInterruptScope,
67}
68
69impl HistorySyncEngine {
70    pub fn new(db: Arc<SharedPlacesDb>) -> Result<Self> {
71        Ok(Self {
72            scope: db.begin_interrupt_scope()?,
73            db,
74        })
75    }
76}
77
78impl SyncEngine for HistorySyncEngine {
79    fn collection_name(&self) -> std::borrow::Cow<'static, str> {
80        "history".into()
81    }
82
83    fn stage_incoming(
84        &self,
85        inbound: Vec<IncomingBso>,
86        telem: &mut telemetry::Engine,
87    ) -> anyhow::Result<()> {
88        // This is minor abuse of the engine concept, but for each "stage_incoming" call we
89        // just apply it directly. We can't advance our timestamp, which means if we are
90        // interrupted we'll re-download and re-apply them, but that will be fine in practice.
91        let conn = self.db.lock();
92        do_apply_incoming(&conn, &self.scope, inbound, telem)?;
93        Ok(())
94    }
95
96    fn apply(
97        &self,
98        timestamp: ServerTimestamp,
99        _telem: &mut telemetry::Engine,
100    ) -> anyhow::Result<Vec<OutgoingBso>> {
101        let conn = self.db.lock();
102        // We know we've seen everything incoming, so it's safe to write the timestamp now.
103        // If we are interrupted creating outgoing BSOs we won't re-apply what we just did.
104        put_meta(&conn, LAST_SYNC_META_KEY, &timestamp.as_millis())?;
105        Ok(get_planned_outgoing(&conn)?)
106    }
107
108    fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
109        Ok(do_sync_finished(&self.db.lock(), new_timestamp, ids)?)
110    }
111
112    fn sync_finished(&self) -> anyhow::Result<()> {
113        Ok(())
114    }
115
116    fn get_collection_request(
117        &self,
118        server_timestamp: ServerTimestamp,
119    ) -> anyhow::Result<Option<CollectionRequest>> {
120        let conn = self.db.lock();
121        let since =
122            ServerTimestamp(get_meta::<i64>(&conn, LAST_SYNC_META_KEY)?.unwrap_or_default());
123        Ok(if since == server_timestamp {
124            None
125        } else {
126            Some(
127                CollectionRequest::new("history".into())
128                    .full()
129                    .newer_than(since)
130                    .limit(MAX_INCOMING_PLACES, RequestOrder::Newest),
131            )
132        })
133    }
134
135    fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
136        let conn = self.db.lock();
137        let global = get_meta(&conn, GLOBAL_SYNCID_META_KEY)?;
138        let coll = get_meta(&conn, COLLECTION_SYNCID_META_KEY)?;
139        Ok(if let (Some(global), Some(coll)) = (global, coll) {
140            EngineSyncAssociation::Connected(CollSyncIds { global, coll })
141        } else {
142            EngineSyncAssociation::Disconnected
143        })
144    }
145
146    fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
147        reset(&self.db.lock(), assoc)?;
148        Ok(())
149    }
150
151    fn wipe(&self) -> anyhow::Result<()> {
152        delete_everything(&self.db.lock())?;
153        Ok(())
154    }
155}