use super::{plan_incoming, ProcessIncomingRecordImpl, ProcessOutgoingRecordImpl, SyncRecord};
use crate::error::*;
use crate::Store;
use rusqlite::{
types::{FromSql, ToSql},
Connection, Transaction,
};
use std::sync::Arc;
use sync15::bso::{IncomingBso, OutgoingBso};
use sync15::engine::{CollSyncIds, CollectionRequest, EngineSyncAssociation, SyncEngine};
use sync15::{telemetry, CollectionName, ServerTimestamp};
use sync_guid::Guid;
pub struct EngineConfig {
pub(crate) namespace: String, pub(crate) collection: CollectionName, }
pub const LAST_SYNC_META_KEY: &str = "last_sync_time";
pub const GLOBAL_SYNCID_META_KEY: &str = "global_sync_id";
pub const COLLECTION_SYNCID_META_KEY: &str = "sync_id";
pub trait SyncEngineStorageImpl<T> {
fn get_incoming_impl(
&self,
enc_key: &Option<String>,
) -> Result<Box<dyn ProcessIncomingRecordImpl<Record = T>>>;
fn reset_storage(&self, conn: &Transaction<'_>) -> Result<()>;
fn get_outgoing_impl(
&self,
enc_key: &Option<String>,
) -> Result<Box<dyn ProcessOutgoingRecordImpl<Record = T>>>;
}
pub struct ConfigSyncEngine<T> {
pub(crate) config: EngineConfig,
pub(crate) store: Arc<Store>,
pub(crate) storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
local_enc_key: Option<String>,
}
impl<T> ConfigSyncEngine<T> {
pub fn new(
config: EngineConfig,
store: Arc<Store>,
storage_impl: Box<dyn SyncEngineStorageImpl<T>>,
) -> Self {
Self {
config,
store,
storage_impl,
local_enc_key: None,
}
}
fn put_meta(&self, conn: &Connection, tail: &str, value: &dyn ToSql) -> Result<()> {
let key = format!("{}.{}", self.config.namespace, tail);
crate::db::store::put_meta(conn, &key, value)
}
fn get_meta<V: FromSql>(&self, conn: &Connection, tail: &str) -> Result<Option<V>> {
let key = format!("{}.{}", self.config.namespace, tail);
crate::db::store::get_meta(conn, &key)
}
fn delete_meta(&self, conn: &Connection, tail: &str) -> Result<()> {
let key = format!("{}.{}", self.config.namespace, tail);
crate::db::store::delete_meta(conn, &key)
}
pub fn reset_local_sync_data(&self) -> Result<()> {
let db = &self.store.db.lock().unwrap();
let tx = db.unchecked_transaction()?;
self.storage_impl.reset_storage(&tx)?;
self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
tx.commit()?;
Ok(())
}
}
impl<T: SyncRecord + std::fmt::Debug> SyncEngine for ConfigSyncEngine<T> {
fn collection_name(&self) -> CollectionName {
self.config.collection.clone()
}
fn set_local_encryption_key(&mut self, key: &str) -> anyhow::Result<()> {
self.local_enc_key = Some(key.to_string());
Ok(())
}
fn prepare_for_sync(
&self,
_get_client_data: &dyn Fn() -> sync15::ClientData,
) -> anyhow::Result<()> {
let db = &self.store.db.lock().unwrap();
let signal = db.begin_interrupt_scope()?;
crate::db::schema::create_empty_sync_temp_tables(&db.writer)?;
signal.err_if_interrupted()?;
Ok(())
}
fn stage_incoming(
&self,
inbound: Vec<IncomingBso>,
telem: &mut telemetry::Engine,
) -> anyhow::Result<()> {
let db = &self.store.db.lock().unwrap();
let signal = db.begin_interrupt_scope()?;
let mut incoming_telemetry = telemetry::EngineIncoming::new();
incoming_telemetry.applied(inbound.len() as u32);
telem.incoming(incoming_telemetry);
let tx = db.writer.unchecked_transaction()?;
let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
incoming_impl.stage_incoming(&tx, inbound, &signal)?;
tx.commit()?;
Ok(())
}
fn apply(
&self,
timestamp: ServerTimestamp,
_telem: &mut telemetry::Engine,
) -> anyhow::Result<Vec<OutgoingBso>> {
let db = &self.store.db.lock().unwrap();
let signal = db.begin_interrupt_scope()?;
let tx = db.writer.unchecked_transaction()?;
let incoming_impl = self.storage_impl.get_incoming_impl(&self.local_enc_key)?;
let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
for state in incoming_impl.fetch_incoming_states(&tx)? {
signal.err_if_interrupted()?;
let action = plan_incoming(&*incoming_impl, &tx, state)?;
super::apply_incoming_action(&*incoming_impl, &tx, action)?;
}
self.put_meta(&tx, LAST_SYNC_META_KEY, ×tamp.as_millis())?;
incoming_impl.finish_incoming(&tx)?;
let outgoing = outgoing_impl.fetch_outgoing_records(&tx)?;
tx.commit()?;
Ok(outgoing)
}
fn set_uploaded(&self, new_timestamp: ServerTimestamp, ids: Vec<Guid>) -> anyhow::Result<()> {
let db = &self.store.db.lock().unwrap();
self.put_meta(&db.writer, LAST_SYNC_META_KEY, &new_timestamp.as_millis())?;
let tx = db.writer.unchecked_transaction()?;
let outgoing_impl = self.storage_impl.get_outgoing_impl(&self.local_enc_key)?;
outgoing_impl.finish_synced_items(&tx, ids)?;
tx.commit()?;
Ok(())
}
fn get_collection_request(
&self,
server_timestamp: ServerTimestamp,
) -> anyhow::Result<Option<CollectionRequest>> {
let db = &self.store.db.lock().unwrap();
let since = ServerTimestamp(
self.get_meta::<i64>(&db.writer, LAST_SYNC_META_KEY)?
.unwrap_or_default(),
);
Ok(if since == server_timestamp {
None
} else {
Some(
CollectionRequest::new(self.collection_name())
.full()
.newer_than(since),
)
})
}
fn get_sync_assoc(&self) -> anyhow::Result<EngineSyncAssociation> {
let db = &self.store.db.lock().unwrap();
let global = self.get_meta(&db.writer, GLOBAL_SYNCID_META_KEY)?;
let coll = self.get_meta(&db.writer, COLLECTION_SYNCID_META_KEY)?;
Ok(if let (Some(global), Some(coll)) = (global, coll) {
EngineSyncAssociation::Connected(CollSyncIds { global, coll })
} else {
EngineSyncAssociation::Disconnected
})
}
fn reset(&self, assoc: &EngineSyncAssociation) -> anyhow::Result<()> {
let db = &self.store.db.lock().unwrap();
let tx = db.unchecked_transaction()?;
self.storage_impl.reset_storage(&tx)?;
self.put_meta(&tx, LAST_SYNC_META_KEY, &0)?;
match assoc {
EngineSyncAssociation::Disconnected => {
self.delete_meta(&tx, GLOBAL_SYNCID_META_KEY)?;
self.delete_meta(&tx, COLLECTION_SYNCID_META_KEY)?;
}
EngineSyncAssociation::Connected(ids) => {
self.put_meta(&tx, GLOBAL_SYNCID_META_KEY, &ids.global)?;
self.put_meta(&tx, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
}
}
tx.commit()?;
Ok(())
}
fn wipe(&self) -> anyhow::Result<()> {
log::warn!("not implemented as there isn't a valid use case for it");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::credit_cards::add_internal_credit_card;
use crate::db::credit_cards::tests::{
get_all, insert_tombstone_record, test_insert_mirror_record,
};
use crate::db::models::credit_card::InternalCreditCard;
use crate::db::schema::create_empty_sync_temp_tables;
use crate::encryption::EncryptorDecryptor;
use crate::sync::{IncomingBso, UnknownFields};
use sql_support::ConnExt;
impl InternalCreditCard {
pub fn into_test_incoming_bso(
self,
encdec: &EncryptorDecryptor,
unknown_fields: UnknownFields,
) -> IncomingBso {
let mut payload = self.into_payload(encdec).expect("is json");
payload.entry.unknown_fields = unknown_fields;
IncomingBso::from_test_content(payload)
}
}
fn create_engine() -> ConfigSyncEngine<InternalCreditCard> {
let store = crate::db::store::Store::new_memory();
crate::sync::credit_card::create_engine(Arc::new(store))
}
pub fn clear_cc_tables(conn: &Connection) -> rusqlite::Result<(), rusqlite::Error> {
conn.execute_all(&[
"DELETE FROM credit_cards_data;",
"DELETE FROM credit_cards_mirror;",
"DELETE FROM credit_cards_tombstones;",
"DELETE FROM moz_meta;",
])
}
#[test]
fn test_credit_card_engine_apply_timestamp() -> Result<()> {
let mut credit_card_engine = create_engine();
let test_key = crate::encryption::create_autofill_key().unwrap();
credit_card_engine
.set_local_encryption_key(&test_key)
.unwrap();
{
create_empty_sync_temp_tables(&credit_card_engine.store.db.lock().unwrap())?;
}
let mut telem = telemetry::Engine::new("whatever");
let last_sync = 24;
let result = credit_card_engine.apply(ServerTimestamp::from_millis(last_sync), &mut telem);
assert!(result.is_ok());
let conn = &credit_card_engine.store.db.lock().unwrap().writer;
assert_eq!(
credit_card_engine.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?,
Some(last_sync)
);
Ok(())
}
#[test]
fn test_credit_card_engine_get_sync_assoc() -> Result<()> {
let credit_card_engine = create_engine();
let result = credit_card_engine.get_sync_assoc();
assert!(result.is_ok());
assert_eq!(result.unwrap(), EngineSyncAssociation::Disconnected);
let global_guid = Guid::new("AAAA");
let coll_guid = Guid::new("AAAA");
let ids = CollSyncIds {
global: global_guid,
coll: coll_guid,
};
{
let conn = &credit_card_engine.store.db.lock().unwrap().writer;
credit_card_engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
credit_card_engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
}
let result = credit_card_engine.get_sync_assoc();
assert!(result.is_ok());
assert_eq!(result.unwrap(), EngineSyncAssociation::Connected(ids));
Ok(())
}
#[test]
fn test_engine_sync_reset() -> Result<()> {
let engine = create_engine();
let encdec = EncryptorDecryptor::new_with_random_key().unwrap();
let cc = InternalCreditCard {
guid: Guid::random(),
cc_name: "Ms Jane Doe".to_string(),
cc_number_enc: encdec.encrypt("12341232412341234", "cc_number")?,
cc_number_last_4: "1234".to_string(),
cc_exp_month: 12,
cc_exp_year: 2021,
cc_type: "visa".to_string(),
..Default::default()
};
{
let db = &engine.store.db.lock().unwrap();
let tx = db.writer.unchecked_transaction()?;
add_internal_credit_card(&tx, &cc)?;
test_insert_mirror_record(
&tx,
cc.clone()
.into_test_incoming_bso(&encdec, Default::default()),
);
insert_tombstone_record(&tx, Guid::random().to_string())?;
tx.commit()?;
}
let global_guid = Guid::new("AAAA");
let coll_guid = Guid::new("AAAA");
let ids = CollSyncIds {
global: global_guid.clone(),
coll: coll_guid.clone(),
};
{
let conn = &engine.store.db.lock().unwrap().writer;
engine.put_meta(conn, GLOBAL_SYNCID_META_KEY, &ids.global)?;
engine.put_meta(conn, COLLECTION_SYNCID_META_KEY, &ids.coll)?;
}
engine
.reset(&EngineSyncAssociation::Disconnected)
.expect("should work");
{
let conn = &engine.store.db.lock().unwrap().writer;
assert!(get_all(conn, "credit_cards_mirror".to_string())?.is_empty());
assert!(get_all(conn, "credit_cards_tombstones".to_string())?.is_empty());
let expected_sync_time = 0;
assert_eq!(
engine
.get_meta::<i64>(conn, LAST_SYNC_META_KEY)?
.unwrap_or(1),
expected_sync_time
);
assert!(engine
.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?
.is_none());
assert!(engine
.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?
.is_none());
clear_cc_tables(conn)?;
let tx = conn.unchecked_transaction()?;
add_internal_credit_card(&tx, &cc)?;
test_insert_mirror_record(&tx, cc.into_test_incoming_bso(&encdec, Default::default()));
insert_tombstone_record(&tx, Guid::random().to_string())?;
tx.commit()?;
}
engine
.reset(&EngineSyncAssociation::Connected(ids))
.expect("should work");
let conn = &engine.store.db.lock().unwrap().writer;
let retrieved_global_sync_id = engine.get_meta::<String>(conn, GLOBAL_SYNCID_META_KEY)?;
assert_eq!(
retrieved_global_sync_id.unwrap_or_default(),
global_guid.to_string()
);
let retrieved_coll_sync_id = engine.get_meta::<String>(conn, COLLECTION_SYNCID_META_KEY)?;
assert_eq!(
retrieved_coll_sync_id.unwrap_or_default(),
coll_guid.to_string()
);
Ok(())
}
}